4 import os, sys, shutil, uuid
7 from AthenaCommon.Logging
import log
as msg
9 from AthenaMP.AthenaMPConf
import AthMpEvtLoopMgr
11 def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw):
13 from AthenaCommon.AppMgr
import theApp
18 super(MpEvtLoopMgr, self).
__init__(**kw)
20 os.putenv(
'XRD_ENABLEFORKHANDLERS',
'1')
21 os.putenv(
'XRD_RUNFORKHANDLER',
'1')
23 from .AthenaMPFlags
import jobproperties
as jp
36 from AthenaCommon.AppMgr
import theApp
as app
37 app.EventLoop = self.getFullJobOptName()
40 from GaudiSvc.GaudiSvcConf
import FileMgr
41 from AthenaCommon.AppMgr
import ServiceMgr
as svcMgr
42 svcMgr+=FileMgr(LogFile=
"FileManagerLog")
45 if os.path.isfile(
'PoolFileCatalog.xml'):
46 shutil.copyfile(
'PoolFileCatalog.xml',
'PoolFileCatalog.xml.AthenaMP-saved')
51 from .AthenaMPFlags
import jobproperties
as jp
52 import AthenaCommon.ConcurrencyFlags
53 event_range_channel = jp.AthenaMPFlags.EventRangeChannel()
57 msg.warning(
'Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
60 debug_worker = jp.ConcurrencyFlags.DebugWorkers()
61 use_shared_reader = jp.AthenaMPFlags.UseSharedReader()
62 use_shared_writer = jp.AthenaMPFlags.UseSharedWriter()
63 use_parallel_compression = jp.AthenaMPFlags.UseParallelCompression()
64 unique_id = f
"{str(os.getpid())}-{uuid.uuid4().hex}"
67 if strategy ==
'SharedQueue':
68 from AthenaCommon.AthenaCommonFlags
import jobproperties
as ajp
69 if (
not ajp.AthenaCommonFlags.FilesInput.statusOn)
or ajp.AthenaCommonFlags.FilesInput == []:
70 msg.info(
'MP strategy "SharedQueue" will not work without input files when maxEvents=-1. Switching to "RoundRobin" just in case')
71 strategy =
'RoundRobin'
73 if strategy==
'SharedQueue' or strategy==
'RoundRobin':
75 from AthenaCommon.AppMgr
import ServiceMgr
as svcMgr
76 svcMgr.PoolSvc.MaxFilesOpen = 2
77 from AthenaIPCTools.AthenaIPCToolsConf
import AthenaSharedMemoryTool
78 svcMgr.EventSelector.SharedMemoryTool =
AthenaSharedMemoryTool(
"EventStreamingTool", SharedMemoryName=f
"EventStream{unique_id}")
79 if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules:
80 svcMgr.AthenaPoolCnvSvc.InputStreamingTool =
AthenaSharedMemoryTool(
"InputStreamingTool", SharedMemoryName=f
"InputStream{unique_id}")
82 from AthenaCommon.AppMgr
import ServiceMgr
as svcMgr
83 if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
84 from AthenaIPCTools.AthenaIPCToolsConf
import AthenaSharedMemoryTool
85 svcMgr.AthenaPoolCnvSvc.OutputStreamingTool =
AthenaSharedMemoryTool(
"OutputStreamingTool", SharedMemoryName=f
"OutputStream{unique_id}")
86 svcMgr.AthenaPoolCnvSvc.ParallelCompression=use_parallel_compression
88 if strategy==
'SharedQueue':
89 from AthenaMPTools.AthenaMPToolsConf
import SharedEvtQueueProvider
92 EventsBeforeFork=events_before_fork,
93 ChunkSize=chunk_size) ]
98 raise Exception(
'Running pileup digitization in mixed MP+MT currently not supported')
99 from AthenaMPTools.AthenaMPToolsConf
import SharedHiveEvtQueueConsumer
101 EventsBeforeFork=events_before_fork,
102 Debug=debug_worker) ]
104 from AthenaMPTools.AthenaMPToolsConf
import SharedEvtQueueConsumer
106 UseSharedWriter=use_shared_writer,
108 IsRoundRobin=(strategy==
'RoundRobin'),
109 EventsBeforeFork=events_before_fork,
110 ReadEventOrders=jp.AthenaMPFlags.ReadEventOrders(),
111 EventOrdersFile=jp.AthenaMPFlags.EventOrdersFile(),
112 Debug=debug_worker) ]
113 if use_shared_writer:
114 from AthenaMPTools.AthenaMPToolsConf
import SharedWriterTool
117 Debug=debug_worker) ]
118 elif strategy==
'EventService':
119 channelScatterer2Processor =
"AthenaMP_Scatterer2Processor"
120 channelProcessor2EvtSel =
"AthenaMP_Processor2EvtSel"
122 from AthenaMPTools.AthenaMPToolsConf
import EvtRangeScatterer
124 EventRangeChannel = event_range_channel,
125 DoCaching=jp.AthenaMPFlags.EvtRangeScattererCaching()) ]
127 from AthenaMPTools.AthenaMPToolsConf
import EvtRangeProcessor
129 Channel2Scatterer = channelScatterer2Processor,
130 Channel2EvtSel = channelProcessor2EvtSel,
131 Debug=debug_worker) ]
133 msg.warning(
"Unknown strategy. No MP tools will be configured")
136 from .AthenaMPFlags
import jobproperties
as jp
137 from PyUtils.MetaReaderPeeker
import metadata
140 if (jp.AthenaMPFlags.ChunkSize() > 0):
141 chunk_size = jp.AthenaMPFlags.ChunkSize()
142 msg.info(
'Chunk size set to %i', chunk_size)
143 elif 'file_size' in metadata
and metadata[
'file_size']
is not None:
145 if (jp.AthenaMPFlags.UseSharedReader()):
146 msg.info(
'Shared Reader in use, chunk_size set to default (%i)', chunk_size)
148 elif (jp.AthenaMPFlags.ChunkSize() == -1):
149 if (metadata[
'file_comp_alg'] == 2):
150 chunk_size = metadata[
'auto_flush']
151 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
153 msg.info(
'LZMA algorithm not in use, chunk_size set to default (%i)', chunk_size)
155 elif (jp.AthenaMPFlags.ChunkSize() == -2):
156 if (metadata[
'file_comp_alg'] == 1
or metadata[
'file_comp_alg'] == 2):
157 chunk_size = metadata[
'auto_flush']
158 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
160 msg.info(
'LZMA nor ZLIB in use, chunk_size set to default (%i)', chunk_size)
162 elif (jp.AthenaMPFlags.ChunkSize() == -3):
163 if (metadata[
'file_comp_alg'] == 1
or metadata[
'file_comp_alg'] == 2
or metadata[
'file_comp_alg'] == 4):
164 chunk_size = metadata[
'auto_flush']
165 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
167 msg.info(
'LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)', chunk_size)
169 elif (jp.AthenaMPFlags.ChunkSize() <= -4):
170 chunk_size = metadata[
'auto_flush']
171 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
173 msg.warning(
'Invalid ChunkSize, Chunk Size set to default (%i)', chunk_size)