4 import os, sys, shutil, uuid
7 from AthenaCommon.Logging
import log
as msg
9 from AthenaMP.AthenaMPConf
import AthMpEvtLoopMgr
# Constructor of the event-loop-manager configurable. The statements visible
# here forward **kw to the parent initializer, export two XRD_* environment
# variables, register this object as the application's EventLoop, add a FileMgr
# service and back up an existing PoolFileCatalog.xml.
# Parameters (from the signature): name (default 'AthMpEvtLoopMgr'),
# isPileup (default False), plus arbitrary keyword arguments **kw.
# NOTE(review): the statements that consume `name` and `isPileup` are not
# visible in this excerpt — confirm against the complete file.
11 def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw):
13 from AthenaCommon.AppMgr
import theApp
# Delegate to the parent class initializer with the caller's keyword arguments.
18 super(MpEvtLoopMgr, self).
__init__(**kw)
# Export both XRD_* fork-handler switches with the value '1' (presumably
# XRootD client settings — TODO confirm). os.putenv changes the process
# environment directly and does not update os.environ.
20 os.putenv(
'XRD_ENABLEFORKHANDLERS',
'1')
21 os.putenv(
'XRD_RUNFORKHANDLER',
'1')
23 from .AthenaMPFlags
import jobproperties
as jp
# Register this configurable, by its full job-option name, as the event loop
# of the application manager.
36 from AthenaCommon.AppMgr
import theApp
as app
37 app.EventLoop = self.getFullJobOptName()
# Add a FileMgr service whose LogFile property is "FileManagerLog".
40 from GaudiSvc.GaudiSvcConf
import FileMgr
41 from AthenaCommon.AppMgr
import ServiceMgr
as svcMgr
42 svcMgr+=FileMgr(LogFile=
"FileManagerLog")
# Keep a copy of a PoolFileCatalog.xml found in the current working directory
# (relative path) under the '.AthenaMP-saved' suffix.
45 if os.path.isfile(
'PoolFileCatalog.xml'):
46 shutil.copyfile(
'PoolFileCatalog.xml',
'PoolFileCatalog.xml.AthenaMP-saved')
# Strategy-dependent setup of the AthenaMP tools and of the shared-memory
# streaming tools, driven by the AthenaMPFlags / ConcurrencyFlags job
# properties and by the values `strategy` and `events_before_fork`.
# NOTE(review): the enclosing `def` header and several statements (for example
# the openings of the calls whose trailing keyword arguments appear below) are
# not visible in this excerpt — confirm against the complete file.
51 from .AthenaMPFlags
import jobproperties
as jp
# Imported without binding any name that is used directly; presumably this is
# what makes jp.ConcurrencyFlags (read below) available — TODO confirm.
52 import AthenaCommon.ConcurrencyFlags
53 event_range_channel = jp.AthenaMPFlags.EventRangeChannel()
# Reports a chunk size below 1. The message states that the value is then set
# to 1; the guarding test and the assignment are not visible in this excerpt.
57 msg.warning(
'Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
# Snapshot of the job-property flags consulted in the branches below.
60 debug_worker = jp.ConcurrencyFlags.DebugWorkers()
61 use_shared_reader = jp.AthenaMPFlags.UseSharedReader()
62 use_shared_writer = jp.AthenaMPFlags.UseSharedWriter()
63 use_parallel_compression = jp.AthenaMPFlags.UseParallelCompression()
# Identifier built from this process's PID and a random UUID hex string; it is
# embedded in every SharedMemoryName below.
64 unique_id = f
"{str(os.getpid())}-{uuid.uuid4().hex}"
# Setup common to the 'SharedQueue' and 'RoundRobin' strategies.
66 if strategy==
'SharedQueue' or strategy==
'RoundRobin':
68 from AthenaCommon.AppMgr
import ServiceMgr
as svcMgr
69 svcMgr.PoolSvc.MaxFilesOpen = 2
70 from AthenaIPCTools.AthenaIPCToolsConf
import AthenaSharedMemoryTool
# Give the EventSelector a shared-memory tool named "EventStreamingTool".
71 svcMgr.EventSelector.SharedMemoryTool =
AthenaSharedMemoryTool(
"EventStreamingTool", SharedMemoryName=f
"EventStream{unique_id}")
# The input streaming tool is configured only when the module
# AthenaPoolCnvSvc.ReadAthenaPool has already been imported.
72 if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules:
73 svcMgr.AthenaPoolCnvSvc.InputStreamingTool =
AthenaSharedMemoryTool(
"InputStreamingTool", SharedMemoryName=f
"InputStream{unique_id}")
75 from AthenaCommon.AppMgr
import ServiceMgr
as svcMgr
# The output streaming tool and the ParallelCompression setting are applied
# only when the module AthenaPoolCnvSvc.WriteAthenaPool has been imported.
# NOTE(review): any further condition guarding this part is not visible here.
76 if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
77 from AthenaIPCTools.AthenaIPCToolsConf
import AthenaSharedMemoryTool
78 svcMgr.AthenaPoolCnvSvc.OutputStreamingTool =
AthenaSharedMemoryTool(
"OutputStreamingTool", SharedMemoryName=f
"OutputStream{unique_id}")
79 svcMgr.AthenaPoolCnvSvc.ParallelCompression=use_parallel_compression
# NOTE(review): from here on only the trailing keyword arguments of the tool
# constructor calls are visible; the call openings and the target they are
# assigned or appended to are missing from this excerpt.
81 from AthenaMPTools.AthenaMPToolsConf
import SharedEvtQueueProvider
84 EventsBeforeFork=events_before_fork,
85 ChunkSize=chunk_size) ]
# Per its message, rejects pileup digitization in mixed MP+MT running; the
# condition guarding this raise is not visible in this excerpt.
89 raise Exception(
'Running pileup digitization in mixed MP+MT currently not supported')
90 from AthenaMPTools.AthenaMPToolsConf
import SharedHiveEvtQueueConsumer
92 EventsBeforeFork=events_before_fork,
95 from AthenaMPTools.AthenaMPToolsConf
import SharedEvtQueueConsumer
97 UseSharedWriter=use_shared_writer,
# IsRoundRobin is True exactly when the strategy is 'RoundRobin'.
99 IsRoundRobin=(strategy==
'RoundRobin'),
100 EventsBeforeFork=events_before_fork,
101 ReadEventOrders=jp.AthenaMPFlags.ReadEventOrders(),
102 EventOrdersFile=jp.AthenaMPFlags.EventOrdersFile(),
103 Debug=debug_worker) ]
104 if use_shared_writer:
105 from AthenaMPTools.AthenaMPToolsConf
import SharedWriterTool
108 Debug=debug_worker) ]
# NOTE(review): the body of this branch is not visible in this excerpt.
111 if not use_shared_reader:
# 'EventService' strategy: fixed channel names are handed to the
# EvtRangeProcessor below as Channel2Scatterer and Channel2EvtSel.
114 elif strategy==
'EventService':
115 channelScatterer2Processor =
"AthenaMP_Scatterer2Processor"
116 channelProcessor2EvtSel =
"AthenaMP_Processor2EvtSel"
118 from AthenaMPTools.AthenaMPToolsConf
import EvtRangeScatterer
120 EventRangeChannel = event_range_channel,
121 DoCaching=jp.AthenaMPFlags.EvtRangeScattererCaching()) ]
123 from AthenaMPTools.AthenaMPToolsConf
import EvtRangeProcessor
125 Channel2Scatterer = channelScatterer2Processor,
126 Channel2EvtSel = channelProcessor2EvtSel,
127 Debug=debug_worker) ]
# Presumably the fallback for a strategy name not handled above (per the
# message); the `else:` line is not visible in this excerpt.
132 msg.warning(
"Unknown strategy. No MP tools will be configured")
# Decides whether input seeking can be used with the EventSelector and logs the
# outcome. The checks visible here are: which reader modules have been
# imported, whether the ApplicationMgr is still OFFLINE, and the value of the
# EventSelector's "CollectionType" property.
# NOTE(review): the enclosing `def` header, the `else:` lines and any `return`
# statements are not visible in this excerpt — confirm against the complete file.
135 """ try to install seek-stuff on the EventSelector side """
138 msg.debug(
"setupEvtSelForSeekOps:")
# When AthenaRootComps.ReadAthenaRoot has been imported, seeking is reported as
# enabled without further checks being visible here.
139 if 'AthenaRootComps.ReadAthenaRoot' in sys.modules:
141 msg.info(
'=> Seeking enabled.')
# Without AthenaPoolCnvSvc.ReadAthenaPool imported, seeking cannot be enabled;
# the user is only informed (info level).
144 if 'AthenaPoolCnvSvc.ReadAthenaPool' not in sys.modules:
146 msg.info(
"Cannot enable 'seeking' b/c module "
147 "[AthenaPoolCnvSvc.ReadAthenaPool] hasn't been imported..." )
148 msg.info(
"Modify your jobOptions to import that module "
149 "(or just ignore this message)" )
# Informational notice when the application manager has already left the
# OFFLINE state; nothing visible here aborts the setup in that case.
152 from AthenaCommon.AppMgr
import theApp, AthAppMgr
153 if theApp.state() != AthAppMgr.State.OFFLINE:
154 msg.info(
"C++ ApplicationMgr already instantiated, probably seeking "
155 "will be ill-configured..." )
156 msg.info(
"EventSelector writers should implement updateHandlers" )
158 from AthenaCommon.AppMgr
import ServiceMgr
as svcMgr
160 collectionType = svcMgr.EventSelector.properties()[
"CollectionType"]
# Seeking is reported as enabled for "ImplicitROOT" collections or when the
# property holds the Configurable.propertyNoValue marker.
# NOTE(review): `Configurable` is referenced here but its import is not visible
# in this excerpt.
162 if collectionType
in (
"ImplicitROOT", Configurable.propertyNoValue, ):
163 msg.info (
"=> Seeking enabled." )
# Presumably the branch for any other collection type (the `else:` line is not
# visible): warn and report seeking as disabled.
166 msg.warning(
"Input seeking is not compatible with collection type of %s",
167 svcMgr.EventSelector.properties()[
"CollectionType"] )
168 msg.warning(
"=> Seeking disabled." )
# Chooses `chunk_size` from the AthenaMPFlags.ChunkSize flag and from the
# input-file metadata ('file_size', 'file_comp_alg', 'auto_flush').
# Flag values as handled by the visible branches:
#   > 0   use the flag value itself
#   -1    use 'auto_flush' when 'file_comp_alg' is 2
#   -2    use 'auto_flush' when 'file_comp_alg' is 1 or 2
#   -3    use 'auto_flush' when 'file_comp_alg' is 1, 2 or 4
#   <= -4 use 'auto_flush' unconditionally
# The log messages associate these codes with LZMA, ZLIB and LZ4; the branch
# order suggests 2 = LZMA, 1 = ZLIB, 4 = LZ4 — TODO confirm against the ROOT
# compression-algorithm codes.
# NOTE(review): the enclosing `def` header, the initial default assignment of
# chunk_size, the `else:` lines and the final return are not visible in this
# excerpt — confirm against the complete file.
172 from .AthenaMPFlags
import jobproperties
as jp
173 from PyUtils.MetaReaderPeeker
import metadata
# An explicit positive flag value is used as-is.
175 if (jp.AthenaMPFlags.ChunkSize() > 0):
176 chunk_size = jp.AthenaMPFlags.ChunkSize()
177 msg.info(
'Chunk size set to %i', chunk_size)
# Non-positive flag values are interpreted only when 'file_size' metadata is
# available.
178 elif metadata[
'file_size']
is not None:
# With the shared reader, chunk_size is not modified in this branch.
180 if (jp.AthenaMPFlags.UseSharedReader()):
181 msg.info(
'Shared Reader in use, chunk_size set to default (%i)', chunk_size)
183 elif (jp.AthenaMPFlags.ChunkSize() == -1):
184 if (metadata[
'file_comp_alg'] == 2):
185 chunk_size = metadata[
'auto_flush']
186 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
# Presumably the fallback when the compression test above fails (the `else:`
# line is not visible in this excerpt).
188 msg.info(
'LZMA algorithm not in use, chunk_size set to default (%i)', chunk_size)
190 elif (jp.AthenaMPFlags.ChunkSize() == -2):
191 if (metadata[
'file_comp_alg'] == 1
or metadata[
'file_comp_alg'] == 2):
192 chunk_size = metadata[
'auto_flush']
193 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
# Presumably the fallback when the compression test above fails (the `else:`
# line is not visible in this excerpt).
195 msg.info(
'LZMA nor ZLIB in use, chunk_size set to default (%i)', chunk_size)
197 elif (jp.AthenaMPFlags.ChunkSize() == -3):
198 if (metadata[
'file_comp_alg'] == 1
or metadata[
'file_comp_alg'] == 2
or metadata[
'file_comp_alg'] == 4):
199 chunk_size = metadata[
'auto_flush']
200 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
# Presumably the fallback when the compression test above fails (the `else:`
# line is not visible in this excerpt).
202 msg.info(
'LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)', chunk_size)
# Any value of -4 or below takes the auto-flush setting regardless of the
# compression algorithm.
204 elif (jp.AthenaMPFlags.ChunkSize() <= -4):
205 chunk_size = metadata[
'auto_flush']
206 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
# Presumably the final fallback for a flag value matched by no branch above
# (the `else:` line is not visible in this excerpt).
208 msg.warning(
'Invalid ChunkSize, Chunk Size set to default (%i)', chunk_size)