ATLAS Offline Software
Functions
TestWriteFileMetaDataHITS Namespace Reference

Functions

def writeFileMetaData (flags)
 
def testMetaData (file_name)
 
def main ()
 

Function Documentation

◆ main()

def TestWriteFileMetaDataHITS.main ( )
Run a job writing a file with FileMetaData

Definition at line 45 of file TestWriteFileMetaDataHITS.py.

45 def main():
46  """Run a job writing a file with FileMetaData"""
47  msg = Logging.logging.getLogger("TestFileMetaData")
48 
49  flags = AllConfigFlags.initConfigFlags()
50  flags.Exec.OutputLevel = Constants.DEBUG
51  flags.Input.Files = TestDefaults.defaultTestFiles.HITS_RUN2
52  flags.Output.HITSFileName = "test.pool.root"
53  flags.Concurrency.NumThreads = 4
54  flags.Concurrency.NumConcurrentEvents = 4
55  flags.lock()
56 
57  write = MainServicesConfig.MainServicesCfg(flags)
58  write.merge(PoolReadCfg(flags))
59  write.merge(writeFileMetaData(flags))
60  write.run(100)
61 
62  try:
63  if testMetaData(flags.Output.HITSFileName):
64  msg.info("File contains xAOD::FileMetaData")
65  return 0
66  msg.error("File does not contain xAOD::FileMetaData")
67  except ReferenceError:
68  msg.error("Failed to produce output file")
69  except KeyError:
70  msg.error("Failed to get metadata item list from file")
71  return 1
72 
73 

◆ testMetaData()

def TestWriteFileMetaDataHITS.testMetaData (   file_name)
Check that file metadata is in output

Definition at line 35 of file TestWriteFileMetaDataHITS.py.

35 def testMetaData(file_name):
36  """Check that file metadata is in output"""
37  meta = MetaReader.read_metadata([file_name])[file_name]["metadata_items"]
38  file_info_items = [
39  v for _, v in meta.items() if "FileMetaData" in v
40  ]
41  print("found file metadata objects:", file_info_items)
42  return bool(file_info_items)
43 
44 

◆ writeFileMetaData()

def TestWriteFileMetaDataHITS.writeFileMetaData (   flags)
set up an output stream and xAOD::EventInfo

Definition at line 21 of file TestWriteFileMetaDataHITS.py.

21 def writeFileMetaData(flags):
22  """set up an output stream and xAOD::EventInfo"""
23  accumulator = OutputStreamConfig.OutputStreamCfg(flags, streamName="HITS")
24  accumulator.merge(SetupMetaDataForStreamCfg(flags, streamName="HITS"))
25 
26  accumulator.merge(
27  xAODEventInfoCnvConfig.EventInfoCnvAlgCfg(
28  flags=flags, inputKey="", disableBeamSpot=flags.Common.Project is not Enums.Project.Athena
29  )
30  )
31 
32  return accumulator
33 
34 
TestWriteFileMetaDataHITS.testMetaData
def testMetaData(file_name)
Definition: TestWriteFileMetaDataHITS.py:35
TestWriteFileMetaDataHITS.main
def main()
Definition: TestWriteFileMetaDataHITS.py:45
TestWriteFileMetaDataHITS.writeFileMetaData
def writeFileMetaData(flags)
Definition: TestWriteFileMetaDataHITS.py:21
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
InfileMetaDataConfig.SetupMetaDataForStreamCfg
def SetupMetaDataForStreamCfg(flags, streamName="", AcceptAlgs=None, createMetadata=None, propagateMetadataFromInput=True, *args, **kwargs)
Definition: InfileMetaDataConfig.py:222
python.PoolReadConfig.PoolReadCfg
def PoolReadCfg(flags)
Definition: PoolReadConfig.py:69
xAOD::bool
setBGCode setTAP setLVL2ErrorBits bool
Definition: TrigDecision_v1.cxx:60