8 __author__ =
"Sebastien Binet <binet@cern.ch>"
26 from dbm
import whichdb
28 from .Helpers
import ShutUp
38 try:
from ROOT
import RNTuple
40 return isinstance( obj, RNTuple )
44 """ reverse-engineering of the POOL FileCatalog.
45 allows to retrieve the physical filename from a logical one, provided
46 that the file-id is known to the (real) PoolFileCatalog
48 DefaultCatalog =
"xmlcatalog_file:PoolFileCatalog.xml"
59 super (PoolFileCatalog, self).
__init__()
66 if isinstance(catalog, str):
69 if not isinstance (catalog, (str, list)):
71 "catalog contact string should be a string or a list thereof! (got %r)"%
76 return osp.expanduser(osp.expandvars(x))
78 def _handle_apcfile_old(x):
79 """ return $ATLAS_POOLCOND_PATH/poolcond/x
81 if 'ATLAS_POOLCOND_PATH' not in os.environ:
83 pcp = os.environ[
"ATLAS_POOLCOND_PATH"]
84 if x.startswith(
"apcfile:"):
85 x = x[len(
"apcfile:"):]
86 return osp_exp(osp.join(pcp,
'poolcond', x))
88 def _handle_apcfile(x):
89 """ return $ATLAS_POOLCOND_PATH/x
91 if 'ATLAS_POOLCOND_PATH' not in os.environ:
93 pcp = os.environ[
"ATLAS_POOLCOND_PATH"]
94 if x.startswith(
"apcfile:"):
95 x = x[len(
"apcfile:"):]
96 return osp_exp(osp.join(pcp, x))
def _handle_xmlcatalog_file(x):
    """Strip the 'xmlcatalog_file:' protocol prefix and expand the remaining path."""
    prefix = "xmlcatalog_file:"
    return osp_exp(x[len(prefix):])
101 def _handle_prfile(x):
102 x = x[len(
"prfile:"):]
105 import AthenaCommon.Utils.unixtools
as u
107 os.environ[
'DATAPATH'].
split(os.pathsep),
118 "xmlcatalog_file:": _handle_xmlcatalog_file,
119 "apcfile:": _handle_apcfile,
120 "prfile:": _handle_prfile,
121 "file:": _handle_file,
124 "catalog dispatch keys does not match AllowedProtocols:" \
125 "\n%s\n%s" % (
sorted(cat_dispatch.keys()),
128 from .
import xmldict
129 def _build_catalog(catalog):
132 "sorry PoolFile:PoolFileCatalog only supports %s"
133 " as a protocol for the POOL file catalog (got: '%s')"
# Dispatch on the catalog contact-string's protocol prefix.
# BUG(review): dict.iteritems() was removed in Python 3, and this file is
# clearly Python 3 (f-strings, urllib.parse, print() calls) — this line
# will raise AttributeError and should read cat_dispatch.items().
# Left unchanged here because the loop body is not fully visible.
for protocol, handler in cat_dispatch.iteritems():
    if catalog.startswith(protocol):
143 if not os.path.exists (catalog):
150 root = xmldict.ElementTree.parse (catalog).getroot()
151 return dict(xmldict.xml2dict(root))
154 cat = {
'POOLFILECATALOG':{
'File':[]}}
157 bc = _build_catalog(c)
158 pc = bc.get(
'POOLFILECATALOG',{})
161 files = pc.get(
'File',[])
162 if isinstance(files, dict):
164 cat[
'POOLFILECATALOG'][
'File'].
extend(files)
165 except Exception
as err:
def pfn (self, url_or_fid):
    """Find the physical file name given a URL or a file-id.

    `url_or_fid` may be a single string or a list of strings; a list is
    resolved element-wise and a list of physical file names is returned.
    """
    import os.path as osp
    # Handle list input *before* expansion: os.path.expandvars/expanduser
    # require a string and would raise TypeError on a list.
    # (The original test used types.ListType, which is Python-2 only and
    # no longer exists — use the builtin `list` instead.)
    if isinstance (url_or_fid, list):
        # recurse so each element gets the same ~/$VAR expansion
        return [self.pfn(f) for f in url_or_fid]
    url_or_fid = osp.expanduser(osp.expandvars(url_or_fid))
    return self._pfn(url_or_fid)
185 """find the physical file name given a url or a file-id"""
186 if not (
'POOLFILECATALOG' in self.
catalog):
188 if not (
'File' in self.
catalog[
'POOLFILECATALOG']):
193 files = self.
catalog[
'POOLFILECATALOG'][
'File']
194 if isinstance(files, dict):
198 if url_or_fid.lower().startswith(
'fid:'):
199 url_or_fid = url_or_fid[len(
'fid:'):]
200 if re.compile (
r'\w{8}-\w{4}-\w{4}-\w{4}-\w{12}$').match (url_or_fid):
201 fid = url_or_fid.lower()
206 if f.ID.lower() == fid:
209 if isinstance(pfn, (list,tuple)):
210 match[fid].
append([i.name
for i
in pfn])
212 match[fid].
append([pfn.name])
213 if len(match[fid])==1:
214 return match[fid][0][PFN_IDX]
215 if len(match[fid])>1:
217 "more than one match for FID='%s'!\n%r"%(fid,match)
219 raise KeyError (
"no entry with FID='%s' in catalog" % fid)
222 if url.lower().startswith(
"lfn:"):
223 url = url[len(
"lfn:"):]
229 and f.logical.lfn.name == url):
232 if isinstance(pfn, (list,tuple)):
233 match[url].
append([i.name
for i
in pfn])
235 match[url].
append([pfn.name])
236 if len(match[url])==1:
237 return match[url][0][PFN_IDX]
238 if len(match[url])>1:
240 "more than one match for LFN='%s'!\n%r"%(url,match)
242 raise KeyError (
"no entry with LFN='%s' in catalog" % url)
244 if url.lower().startswith(
"pfn:"):
245 url = url[len(
"pfn:"):]
249 return self.
pfn (url_or_fid)
259 EventData =
"CollectionTree"
260 EventTag =
"POOLCollectionTree"
261 DataHeader =
"POOLContainer"
262 MetaData =
"MetaData"
264 EventData =
"EventData"
265 EventTag =
"EventTag"
266 DataHeader =
"DataHeader"
267 MetaData =
"MetaData"
270 SUPER_DETAILED_BRANCH_SZ =
False
272 POOL_HEADER = TTreeNames.DataHeader
273 EVENT_DATA = TTreeNames.EventData
274 META_DATA = TTreeNames.MetaData
275 HDR_FORMAT =
" %11s %11s %11s %11s %5s %s"
276 ROW_FORMAT =
"%12.3f kb %12.3f kb %12.3f kb %12.3f %8i %s"
280 return not name.startswith(
"##")
and not cls.
isDataHeader(name)
290 return name.startswith(PoolOpts.EVENT_DATA)
294 return "_DAOD_" in name
298 s = (name+
"__").
split(
'_')[2]
299 if s.endswith(
"Form"):
305 return name.startswith(PoolOpts.POOL_HEADER)
and cls.
isAugmentation(name)
310 if PoolOpts.FAST_MODE:
312 if not PoolOpts.SUPER_DETAILED_BRANCH_SZ:
313 return branch.GetTotalSize()
316 for bnum
in range(0, branch.GetWriteBasket()):
317 basket = branch.GetBasket(bnum)
318 brSize += basket.GetObjlen() - 8
322 """take a file name, return the pair (protocol, 'real' file name)
324 fname = os.path.expanduser(os.path.expandvars(fname))
326 def _normalize_uri(uri):
327 if uri.startswith(
'/'):
331 from urllib.parse
import urlsplit
332 url = urlsplit(_normalize_uri(fname))
333 protocol = url.scheme
334 def _normalize(fname):
335 from posixpath
import normpath
336 fname = normpath(fname)
337 if fname.startswith(
'//'): fname = fname[1:]
340 if protocol
in (
'',
'file',
'pfn'):
342 fname = _normalize(url.path)
345 if fname.startswith(
'/castor/'):
347 fname = protocol +
':' + fname
349 elif protocol
in (
'rfio',
'castor'):
351 fname = _normalize(url.path)
352 fname = protocol+
':'+fname
354 elif protocol
in (
'root',
'dcap',
'dcache',
'http',
'https',
'dav',
'davs'):
357 elif protocol
in (
'gsidcap',):
358 protocol =
'gfal:gsidcap'
361 elif protocol
in (
'lfn',
'fid',):
363 from PyUtils.PoolFile
import PoolFileCatalog
as pfc
364 fname = pfc().pfn(protocol+
':'+url.path)
367 elif protocol
in (
'ami',):
369 for token
in (
'ami:',
'//',
'/'):
370 if fname.startswith(token):
371 fname = fname[len(token):]
372 fname =
'ami://' + fname
376 print(f
'## warning: unknown protocol [{protocol}]. we will just return our input')
379 return (protocol, fname)
382 x509_proxy = os.environ.get(
'X509_USER_PROXY',
'')
385 root.TSSLSocket.SetUpSSL(
387 "/etc/grid-security/certificates",
391 print(
"## warning: protocol https is requested but no X509_USER_PROXY was found! (opening the file might fail.)")
396 import PyUtils.RootUtils
as ru
397 root = ru.import_root()
401 re.compile(
'TClass::TClass:0: RuntimeWarning: no dictionary for class.*') ]):
402 root.gSystem.Load(
'libRootCollection')
403 root_open = root.TFile.Open
408 if protocol ==
'https':
410 root_open = root.TWebFile.Open
412 f = root_open(fname,
'READ')
413 if f
is None or not f:
415 raise IOError(errno.ENOENT,
416 'No such file or directory',fname)
421 fmt =
"%s %3i %8.3f %8.3f %8.3f %s"
424 branch.GetListOfBranches().GetSize(),
425 _get_total_size (branch),
426 branch.GetTotBytes(),
427 branch.GetZipBytes(),
431 branches = branch.GetListOfBranches()
433 poolRecord.memSize += _get_total_size (b) / Units.kb
434 if (b.GetZipBytes() < 0.001):
435 poolRecord.memSizeNoZip += _get_total_size (b) / Units.kb
436 poolRecord.diskSize += b.GetZipBytes() / Units.kb
437 poolRecord = retrieveBranchInfos ( b, poolRecord, ident+
" " )
442 memSize = _get_total_size (branch) / Units.kb
443 zipBytes = branch.GetZipBytes()
444 memSizeNoZip = memSize
if zipBytes < 0.001
else 0.
445 diskSize = branch.GetZipBytes() / Units.kb
446 typeName = branch.GetClassName()
447 if not typeName
and (leaf := branch.GetListOfLeaves().At(0)):
448 typeName = leaf.GetTypeName()
449 return PoolRecord(branch.GetName(), memSize, diskSize, memSizeNoZip,
455 """Helper function to read a POOL file and extract the item-list from the
458 `pool_file` the name of the pool file to inspect
459 `verbose` self-explanatory
460 `items_type` what kind of items one is interested in
461 allowed values: 'eventdata' 'metadata'
462 Note: this function is actually executed in a forked sub-process
465 _allowed_values = (
'eventdata',
467 if items_type
not in _allowed_values:
469 "invalid argument for 'items_type'. ",
470 "got: [%s] " % items_type,
471 "(allowed values: %r)" % _allowed_values
473 raise ValueError(err)
475 key =
'%s_items' % items_type
477 import PyUtils.FilePeekerTool
as fpt
478 fp = fpt.FilePeekerTool(f_root)
479 items = fp.getPeekedData(key)
489 DiskSize =
"diskSize"
491 ContainerName =
"name"
495 return [ PoolRecord.Sorter.DiskSize,
496 PoolRecord.Sorter.MemSize,
497 PoolRecord.Sorter.ContainerName ]
499 def __init__(self, name, memSize, diskSize, memSizeNoZip, nEntries, dirType,
500 detailedInfos = "", typeName = None):
501 """Initialize PoolRecord instance.
503 dirType first letter of object type name that may distinguish the types:
504 "T" for TTree, "B" for TBranch,
505 "N" for RNTuple, "F" for RField
507 object.__init__(self)
521 A simple class to retrieve informations about the content of a POOL file.
522 It should be abstracted from the underlying technology used to create this
523 POOL file (Db, ROOT,...).
524 Right now, we are using the easy and loosy solution: going straight to the
529 object.__init__(self)
544 except Exception
as err:
545 print(
"## warning: problem opening PoolFileCatalog:\n%s"%err)
547 traceback.print_exc(err)
551 dbFileName = whichdb( fileName )
552 if dbFileName
not in (
None,
'' ):
554 print(
"## opening file [%s]..." %
str(fileName))
555 db = shelve.open( fileName,
'r' )
557 print(
"## opening file [OK]")
558 report = db[
'report']
561 self.
data = report[
'data']
564 print(
"## opening file [%s]..." %
str(fileName))
567 print(
"## opening file [OK]")
576 print(
"## importing ROOT...")
577 import PyUtils.RootUtils
as ru
578 ROOT = ru.import_root()
581 print(
"## importing ROOT... [DONE]")
585 ROOT.gErrorIgnoreLevel = ROOT.kFatal
589 poolFile = ROOT.TFile.Open( fileName, PoolOpts.READ_MODE )
590 except Exception
as e:
592 print(
"## Failed to open file [%s] !!" % fileName)
595 print(
"## Bailing out...")
596 raise IOError(
"Could not open file [%s]" % fileName)
601 print(
"## Failed to open file [%s] !!" % fileName)
602 msg =
"Could not open file [%s]" % fileName
607 "Invalid POOL file or a Zombie one"
616 for name
in {PoolOpts.TTreeNames.DataHeader, PoolOpts.RNTupleNames.DataHeader}:
617 dhKey = self.
poolFile.FindKey( name )
620 if isinstance(obj, self.
ROOT.TTree):
621 nEntries = obj.GetEntries()
623 nEntries = self.
ROOT.Experimental.RNTupleReader.Open(obj).GetNEntries()
625 raise NotImplementedError(f
"Keys of type {type(obj)!r} not supported")
632 for k
in self.
poolFile.GetListOfKeys():
633 keyname = k.GetName()
635 if isinstance(obj, self.
ROOT.TTree):
636 containerName = obj.GetName()
637 nEntries = obj.GetEntries()
640 reader = self.
ROOT.Experimental.RNTupleReader.Open(obj)
641 containerName = reader.GetDescriptor().GetName()
642 nEntries = reader.GetNEntries()
645 raise NotImplementedError(f
"Keys of type {type(obj)!r} not supported")
646 if containerName
not in containers:
648 containers.append(containerName)
650 if keyname.startswith(PoolOpts.POOL_HEADER)
and not keyname.endswith(
'Form'):
651 self.
dataHeaderA[PoolOpts.augmentationName(keyname)] = \
656 keys.sort (key =
lambda x: x.GetName())
662 if isinstance(obj, self.
ROOT.TTree):
665 inspector = self.
ROOT.Experimental.RNTupleInspector.Create(obj)
666 name = inspector.GetDescriptor().GetName()
668 if PoolOpts.isDataHeader(name):
669 contName =
"DataHeader"
670 if isinstance(obj, self.
ROOT.TTree):
671 memSize = obj.GetTotBytes() / Units.kb
672 diskSize = obj.GetZipBytes() / Units.kb
675 memSizeNoZip = memSize
676 nEntries = obj.GetEntries()
680 br.GetName()
for br
in obj.GetListOfBranches()
681 if br.GetName().
count(
"DataHeader_p") > 0
683 if len(dhBranchNames) == 1:
684 dhBranch = obj.GetBranch(dhBranchNames[0])
685 typeName = dhBranch.GetClassName()
686 if not typeName
and (leaf := dhBranch.GetListOfLeaves().At(0)):
687 typeName = leaf.GetTypeName()
693 typeName = typeName ),
697 poolRecord =
PoolRecord(contName, memSize, diskSize, memSizeNoZip,
703 diskSize = inspector.GetCompressedSize() / Units.kb
704 memSize = inspector.GetUncompressedSize() / Units.kb
708 memSizeNoZip = memSize
709 nEntries = inspector.GetDescriptor().GetNEntries()
710 poolRecord =
PoolRecord(contName, memSize, diskSize, memSizeNoZip,
714 elif PoolOpts.isData(name):
715 if isinstance(obj, self.
ROOT.TTree):
716 if not hasattr(obj,
'GetListOfBranches'):
718 branches = obj.GetListOfBranches()
720 if name
in (PoolOpts.EVENT_DATA, PoolOpts.META_DATA):
722 for branch
in branches:
728 poolRecord.augName = PoolOpts.augmentationName(name)
730 self.
data += [ poolRecord ]
732 descriptor = inspector.GetDescriptor()
734 if name
in {PoolOpts.RNTupleNames.EventData, PoolOpts.RNTupleNames.MetaData}:
736 fieldZeroId = descriptor.GetFieldZeroId()
737 for fieldDescriptor
in descriptor.GetFieldIterable(fieldZeroId):
738 fieldId = fieldDescriptor.GetId()
739 fieldTreeInspector = inspector.GetFieldTreeInspector(fieldId)
740 diskSize = fieldTreeInspector.GetCompressedSize() / Units.kb
741 memSize = fieldTreeInspector.GetUncompressedSize() / Units.kb
742 typeName = fieldDescriptor.GetTypeName()
743 fieldName = fieldDescriptor.GetFieldName()
744 poolRecord =
PoolRecord(fieldName, memSize, diskSize, memSize,
745 descriptor.GetNEntries(),
748 poolRecord.augName = PoolOpts.augmentationName(name)
750 self.
data += [ poolRecord ]
756 return os.linesep.join( [
758 "Size: %12.3f kb" % (self.
_fileInfos[
'size'] / Units.kb),
763 def checkFile(self, sorting = PoolRecord.Sorter.DiskSize):
773 if sorting
in PoolRecord.Sorter.allowedValues():
775 data.sort(key = operator.attrgetter(sorting) )
777 def _get_val(x, dflt=-999.):
778 if PoolOpts.FAST_MODE:
782 totMemSize = _get_val(self.
dataHeader.memSize, dflt=0.)
785 def _safe_div(num,den):
793 print(PoolOpts.HDR_FORMAT % (
"Mem Size",
"Disk Size",
"Size/Evt",
794 "MissZip/Mem",
"items",
795 "(X) Container Name (X=Tree|Branch)" ))
798 print(PoolOpts.ROW_FORMAT % (
802 _get_val (_safe_div(self.
dataHeader.memSizeNoZip,
812 totMemSize += 0.
if PoolOpts.FAST_MODE
else d.memSize
813 totDiskSize += d.diskSize
814 memSizeNoZip = d.memSizeNoZip/d.memSize
if d.memSize != 0.
else 0.
816 totMemSizeA[aug] = totMemSizeA.get(aug,0.) + d.memSize
817 totDiskSizeA[aug] = totDiskSizeA.get(aug,0.) + d.diskSize
819 print(PoolOpts.ROW_FORMAT % (
820 _get_val (d.memSize),
823 _get_val (memSizeNoZip),
825 "("+d.dirType+
") "+d.name
833 print(PoolOpts.ROW_FORMAT % (
834 totMemSizeA[a], totDiskSizeA[a],
838 "Aug Stream: " + (
'MAIN' if a==
'' else a)
841 print(PoolOpts.ROW_FORMAT % (
842 totMemSize, totDiskSize,
845 "TOTAL (POOL containers)"
848 if PoolOpts.FAST_MODE:
849 print(
"::: warning: FAST_MODE was enabled: some columns' content ",)
850 print(
"is meaningless...")
856 print(
"Can't perform a detailedDump with a shelve file as input !")
859 if bufferName
is None:
860 bufferName =
"/dev/stdout"
861 out =
open( bufferName,
"w" )
863 save_stdout_fileno = os.dup (sys.stdout.fileno())
864 os.dup2( out.fileno(), sys.stdout.fileno() )
866 out.write(
"#" * 80 + os.linesep )
867 out.write(
"## detailed dump" + os.linesep )
870 for key
in self.
keys:
872 name = tree.GetName()
874 if PoolOpts.isDataHeader(name)
or \
875 PoolOpts.isData(name):
877 print (
"=== [%s] ===" % name, file=sys.stderr)
879 except Exception
as err:
880 print (
"Caught:",err, file=sys.stderr)
881 print (sys.exc_info()[0], file=sys.stderr)
882 print (sys.exc_info()[1], file=sys.stderr)
886 out.write(
"#" * 80 + os.linesep )
888 out.write(
"#" * 80 + os.linesep )
894 if bufferName !=
"<stdout>":
897 sys.stdout = open (save_stdout_fileno,
'a')
902 Return a PoolRecord according to its (branch) name
903 Raise KeyError if no match is found
905 for data
in self.data:
906 if data.name == name:
908 raise KeyError(
"No PoolRecord with name [%s]" % name)
912 Save all the gathered informations into a python shelve or a CSV file
913 (depending on the @param `fileName` extension)
916 if os.path.splitext(fileName)[-1] ==
'.csv':
917 return self._save_csv_report (fileName)
918 return self._save_shelve_report (fileName)
922 Save all the gathered informations into a python shelve
923 Data can then be read like so:
925 >>> db = shelve.open( 'myfile.dat', 'r' )
926 >>> report = db['report']
927 >>> print ('fileSize:',report['fileSize'])
928 >>> print ('dataHeader/memSize:',report['dataHeader'].memSize)
929 >>> for d in report['data']:
930 ... print ('data:',d.name,d.nEntries,d.memSize)
933 if os.path.exists (fileName):
935 db = shelve.open (fileName)
937 'fileInfos' : self._fileInfos,
938 'nbrEvts' : self.dataHeader.nEntries,
939 'dataHeader' : self.dataHeader,
947 Save all the gathered informations into a CSV file
950 if os.path.exists (fileName):
952 args = {
'newline' :
''}
953 f = open (fileName,
'w', **args)
955 o.writerow ([
'file name', self._fileInfos[
'name']])
956 o.writerow ([
'file size', self._fileInfos[
'size']])
957 o.writerow ([
'nbr evts', self.dataHeader.nEntries])
958 o.writerow ([
'mem size',
'disk size',
'mem size nozip',
'items',
959 'container name',
'branch type'])
962 o.writerow ([d.memSize, d.diskSize, d.memSizeNoZip,
963 d.nEntries, d.name, d.dirType])
968 if self.poolFile
and hasattr(self.poolFile,
'Close'):
970 self.poolFile.Close()
972 except Exception
as err:
973 print(
"WARNING:",err)
980 A helper class to compare 2 POOL files and check that they match, both in
981 terms of containers' content and containers' sizes
984 def __init__(self, refFileName, chkFileName, verbose = False, ignoreList = None, strict = False):
985 object.__init__(self)
989 refFileName = os.path.expandvars( os.path.expanduser( refFileName ) )
990 chkFileName = os.path.expandvars( os.path.expanduser( chkFileName ) )
992 if ignoreList
is None:
999 except Exception
as err:
1000 print(
"## Caught exception [%s] !!" %
str(err.__class__))
1001 print(
"## What:",err)
1002 print(sys.exc_info()[0])
1003 print(sys.exc_info()[1])
1004 err =
"Error while opening POOL files !"
1005 err +=
" chk : %s%s" % ( chkFileName, os.linesep )
1006 err +=
" ref : %s%s" % ( refFileName, os.linesep )
1007 raise Exception(err)
1019 "::: Comparing POOL files...",
1020 " ref : %s" % self.
refFile._fileInfos[
'name'],
1021 " chk : %s" % self.
chkFile._fileInfos[
'name'],
1025 if self.
chkFile.dataHeader.nEntries != \
1026 self.
refFile.dataHeader.nEntries :
1028 "## WARNING: files don't have the same number of entries !!",
1029 " ref : %r" % self.
refFile.dataHeader.nEntries,
1030 " chk : %r" % self.
chkFile.dataHeader.nEntries,
1036 if chkNames != refNames:
1038 "## ERROR: files don't have the same content !!",
1040 addNames = [ n
for n
in chkNames
if n
not in refNames ]
1041 if len( addNames ) > 0:
1042 self.
summary += [
"## collections in 'chk' and not in 'ref'" ]
1044 self.
summary += [
" + %s" % n ]
1045 subNames = [ n
for n
in refNames
if n
not in chkNames ]
1046 if len( subNames ) > 0:
1047 self.
summary += [
"## collections in 'ref' and not in 'chk'" ]
1049 self.
summary += [
" - %s" % n ]
1054 self.
summary += [
"## Ignoring the following:" ]
1058 commonContent = [ d
for d
in chkNames
if (d
in refNames
and d
not in self.
ignList)]
1062 self.
summary += [
"::: comparing common content (mem-size / disk-size)..." ]
1064 for name
in commonContent:
1070 if chkMemSize != refMemSize
or (self.
strict and chkDiskSize != refDiskSize):
1072 "[ERR] %12.3f / %12.3f kb (ref) ==> %12.3f / %12.3f kb (chk) | %s" % \
1073 ( refMemSize,refDiskSize,chkMemSize,chkDiskSize, name )
1078 " [OK] %12.3f/%12.3f kb | %s" % \
1079 ( chkMemSize, chkDiskSize, name )
1086 else: self.
summary += [
"## Comparison : [ERR]" ]
1096 out.writelines( i + os.linesep )
1102 A counter just contains an item list (pairs class-name/sg-key) and the size
1106 object.__init__(self)