ATLAS Offline Software
Functions
TestWriteFileMetaDataAOD Namespace Reference

Functions

def writeFileMetaData (flags)
 
def testMetaData (file_name)
 
def main ()
 

Function Documentation

◆ main()

def TestWriteFileMetaDataAOD.main ( )
Run a job writing a file with FileMetaData

Definition at line 52 of file TestWriteFileMetaDataAOD.py.

52 def main():
53  """Run a job writing a file with FileMetaData"""
54  msg = Logging.logging.getLogger("TestFileMetaData")
55 
56  flags = AllConfigFlags.initConfigFlags()
57  flags.Exec.OutputLevel = Constants.DEBUG
58  flags.Input.Files = TestDefaults.defaultTestFiles.AOD_RUN2_DATA
59  flags.Output.AODFileName = "test.pool.root"
60  flags.Output.doWriteAOD = True
61  flags.Concurrency.NumThreads = 4
62  flags.Concurrency.NumConcurrentEvents = 4
63  flags.lock()
64 
65  write = MainServicesConfig.MainServicesCfg(flags)
66  write.merge(PoolReadCfg(flags))
67  write.merge(writeFileMetaData(flags))
68  write.run(100)
69 
70  try:
71  if testMetaData(flags.Output.AODFileName):
72  msg.info("File contains xAOD::FileMetaData")
73  return 0
74  msg.error("File does not contain xAOD::FileMetaData")
75  except ReferenceError:
76  msg.error("Failed to produce output file")
77  except KeyError:
78  msg.error("Failed to get metadata item list from file")
79  return 1
80 
81 

◆ testMetaData()

def TestWriteFileMetaDataAOD.testMetaData (   file_name)
Check that file metadata is in output

Definition at line 42 of file TestWriteFileMetaDataAOD.py.

42 def testMetaData(file_name):
43  """Check that file metadata is in output"""
44  meta = MetaReader.read_metadata([file_name])[file_name]["metadata_items"]
45  file_info_items = [
46  v for _, v in meta.items() if "FileMetaData" in v
47  ]
48  print("found file metadata objects:", file_info_items)
49  return bool(file_info_items)
50 
51 

◆ writeFileMetaData()

def TestWriteFileMetaDataAOD.writeFileMetaData (   flags)
set up an output stream and xAOD::EventInfo

Definition at line 22 of file TestWriteFileMetaDataAOD.py.

22 def writeFileMetaData(flags):
23  """set up an output stream and xAOD::EventInfo"""
24  accumulator = OutputStreamConfig.OutputStreamCfg(flags, streamName="AOD")
25  accumulator.merge(SetupMetaDataForStreamCfg(flags, streamName="AOD"))
26  # AthAnalysis doesn't have the dictionary for ByteStreamMetadata(Container)
27  # Therefore, do not attempt to write it - although this is more of a hack
28  if flags.Common.Project == Project.AthAnalysis:
29  originalList = accumulator.getEventAlgo('StreamAOD').MetadataItemList
30  modifiedList = [s for s in originalList if s != "ByteStreamMetadataContainer#*"]
31  accumulator.getEventAlgo('StreamAOD').MetadataItemList = modifiedList
32 
33  accumulator.merge(
34  xAODEventInfoCnvConfig.EventInfoCnvAlgCfg(
35  flags=flags, inputKey="", disableBeamSpot=flags.Common.Project is not Enums.Project.Athena
36  )
37  )
38 
39  return accumulator
40 
41 
TestWriteFileMetaDataAOD.main
def main()
Definition: TestWriteFileMetaDataAOD.py:52
TestWriteFileMetaDataAOD.testMetaData
def testMetaData(file_name)
Definition: TestWriteFileMetaDataAOD.py:42
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
TestWriteFileMetaDataAOD.writeFileMetaData
def writeFileMetaData(flags)
Definition: TestWriteFileMetaDataAOD.py:22
InfileMetaDataConfig.SetupMetaDataForStreamCfg
def SetupMetaDataForStreamCfg(flags, streamName="", AcceptAlgs=None, createMetadata=None, propagateMetadataFromInput=True, *args, **kwargs)
Definition: InfileMetaDataConfig.py:222
python.PoolReadConfig.PoolReadCfg
def PoolReadCfg(flags)
Definition: PoolReadConfig.py:71
xAOD::bool
setBGCode setTAP setLVL2ErrorBits bool
Definition: TrigDecision_v1.cxx:60