4 import os, sys, shutil, uuid
7 from AthenaCommon.Logging
import log
as msg
9 from AthenaMP.AthenaMPConf
import AthMpEvtLoopMgr
11 def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw):
13 from AthenaCommon.AppMgr
import theApp
18 super(MpEvtLoopMgr, self).
__init__(**kw)
20 os.putenv(
'XRD_ENABLEFORKHANDLERS',
'1')
21 os.putenv(
'XRD_RUNFORKHANDLER',
'1')
23 from .AthenaMPFlags
import jobproperties
as jp
36 from AthenaCommon.AppMgr
import theApp
as app
37 app.EventLoop = self.getFullJobOptName()
40 from GaudiSvc.GaudiSvcConf
import FileMgr
41 from AthenaCommon.AppMgr
import ServiceMgr
as svcMgr
42 svcMgr+=FileMgr(LogFile=
"FileManagerLog")
45 if os.path.isfile(
'PoolFileCatalog.xml'):
46 shutil.copyfile(
'PoolFileCatalog.xml',
'PoolFileCatalog.xml.AthenaMP-saved')
51 from .AthenaMPFlags
import jobproperties
as jp
52 import AthenaCommon.ConcurrencyFlags
53 event_range_channel = jp.AthenaMPFlags.EventRangeChannel()
57 msg.warning(
'Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
60 debug_worker = jp.ConcurrencyFlags.DebugWorkers()
61 use_shared_reader = jp.AthenaMPFlags.UseSharedReader()
62 use_shared_writer = jp.AthenaMPFlags.UseSharedWriter()
63 use_parallel_compression = jp.AthenaMPFlags.UseParallelCompression()
64 unique_id = f
"{str(os.getpid())}-{uuid.uuid4().hex}"
66 if strategy==
'SharedQueue' or strategy==
'RoundRobin':
68 from AthenaCommon.AppMgr
import ServiceMgr
as svcMgr
69 svcMgr.PoolSvc.MaxFilesOpen = 2
70 from AthenaIPCTools.AthenaIPCToolsConf
import AthenaSharedMemoryTool
71 svcMgr.EventSelector.SharedMemoryTool =
AthenaSharedMemoryTool(
"EventStreamingTool", SharedMemoryName=f
"EventStream{unique_id}")
72 if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules:
73 svcMgr.AthenaPoolCnvSvc.InputStreamingTool =
AthenaSharedMemoryTool(
"InputStreamingTool", SharedMemoryName=f
"InputStream{unique_id}")
75 from AthenaCommon.AppMgr
import ServiceMgr
as svcMgr
76 if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
77 from AthenaIPCTools.AthenaIPCToolsConf
import AthenaSharedMemoryTool
78 svcMgr.AthenaPoolCnvSvc.OutputStreamingTool =
AthenaSharedMemoryTool(
"OutputStreamingTool", SharedMemoryName=f
"OutputStream{unique_id}")
79 svcMgr.AthenaPoolCnvSvc.ParallelCompression=use_parallel_compression
81 from AthenaMPTools.AthenaMPToolsConf
import SharedEvtQueueProvider
84 EventsBeforeFork=events_before_fork,
85 ChunkSize=chunk_size) ]
90 raise Exception(
'Running pileup digitization in mixed MP+MT currently not supported')
91 from AthenaMPTools.AthenaMPToolsConf
import SharedHiveEvtQueueConsumer
93 EventsBeforeFork=events_before_fork,
96 from AthenaMPTools.AthenaMPToolsConf
import SharedEvtQueueConsumer
98 UseSharedWriter=use_shared_writer,
100 IsRoundRobin=(strategy==
'RoundRobin'),
101 EventsBeforeFork=events_before_fork,
102 ReadEventOrders=jp.AthenaMPFlags.ReadEventOrders(),
103 EventOrdersFile=jp.AthenaMPFlags.EventOrdersFile(),
104 Debug=debug_worker) ]
105 if use_shared_writer:
106 from AthenaMPTools.AthenaMPToolsConf
import SharedWriterTool
109 Debug=debug_worker) ]
112 if not use_shared_reader:
115 elif strategy==
'EventService':
116 channelScatterer2Processor =
"AthenaMP_Scatterer2Processor"
117 channelProcessor2EvtSel =
"AthenaMP_Processor2EvtSel"
119 from AthenaMPTools.AthenaMPToolsConf
import EvtRangeScatterer
121 EventRangeChannel = event_range_channel,
122 DoCaching=jp.AthenaMPFlags.EvtRangeScattererCaching()) ]
124 from AthenaMPTools.AthenaMPToolsConf
import EvtRangeProcessor
126 Channel2Scatterer = channelScatterer2Processor,
127 Channel2EvtSel = channelProcessor2EvtSel,
128 Debug=debug_worker) ]
133 msg.warning(
"Unknown strategy. No MP tools will be configured")
136 """ try to install seek-stuff on the EventSelector side """
139 msg.debug(
"setupEvtSelForSeekOps:")
140 if 'AthenaRootComps.ReadAthenaRoot' in sys.modules:
142 msg.info(
'=> Seeking enabled.')
145 if 'AthenaPoolCnvSvc.ReadAthenaPool' not in sys.modules:
147 msg.info(
"Cannot enable 'seeking' b/c module "
148 "[AthenaPoolCnvSvc.ReadAthenaPool] hasn't been imported..." )
149 msg.info(
"Modify your jobOptions to import that module "
150 "(or just ignore this message)" )
153 from AthenaCommon.AppMgr
import theApp, AthAppMgr
154 if theApp.state() != AthAppMgr.State.OFFLINE:
155 msg.info(
"C++ ApplicationMgr already instantiated, probably seeking "
156 "will be ill-configured..." )
157 msg.info(
"EventSelector writers should implement updateHandlers" )
159 from AthenaCommon.AppMgr
import ServiceMgr
as svcMgr
161 collectionType = svcMgr.EventSelector.properties()[
"CollectionType"]
163 if collectionType
in (
"ImplicitROOT", Configurable.propertyNoValue, ):
164 msg.info (
"=> Seeking enabled." )
167 msg.warning(
"Input seeking is not compatible with collection type of %s",
168 svcMgr.EventSelector.properties()[
"CollectionType"] )
169 msg.warning(
"=> Seeking disabled." )
173 from .AthenaMPFlags
import jobproperties
as jp
174 from PyUtils.MetaReaderPeeker
import metadata
177 if (jp.AthenaMPFlags.ChunkSize() > 0):
178 chunk_size = jp.AthenaMPFlags.ChunkSize()
179 msg.info(
'Chunk size set to %i', chunk_size)
180 elif 'file_size' in metadata
and metadata[
'file_size']
is not None:
182 if (jp.AthenaMPFlags.UseSharedReader()):
183 msg.info(
'Shared Reader in use, chunk_size set to default (%i)', chunk_size)
185 elif (jp.AthenaMPFlags.ChunkSize() == -1):
186 if (metadata[
'file_comp_alg'] == 2):
187 chunk_size = metadata[
'auto_flush']
188 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
190 msg.info(
'LZMA algorithm not in use, chunk_size set to default (%i)', chunk_size)
192 elif (jp.AthenaMPFlags.ChunkSize() == -2):
193 if (metadata[
'file_comp_alg'] == 1
or metadata[
'file_comp_alg'] == 2):
194 chunk_size = metadata[
'auto_flush']
195 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
197 msg.info(
'LZMA nor ZLIB in use, chunk_size set to default (%i)', chunk_size)
199 elif (jp.AthenaMPFlags.ChunkSize() == -3):
200 if (metadata[
'file_comp_alg'] == 1
or metadata[
'file_comp_alg'] == 2
or metadata[
'file_comp_alg'] == 4):
201 chunk_size = metadata[
'auto_flush']
202 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
204 msg.info(
'LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)', chunk_size)
206 elif (jp.AthenaMPFlags.ChunkSize() <= -4):
207 chunk_size = metadata[
'auto_flush']
208 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
210 msg.warning(
'Invalid ChunkSize, Chunk Size set to default (%i)', chunk_size)