50 def configureStrategy(self,strategy,pileup,events_before_fork) -> None :
# Set up the AthenaMP tools and shared-memory streaming tools for the
# requested MP strategy.
#
# Parameters (as used by the visible code):
#   strategy           -- compared against 'SharedQueue', 'RoundRobin' and
#                         'EventService'; any other value only logs a warning.
#   pileup             -- forwarded to the tools as IsPileup.
#   events_before_fork -- forwarded to the tools as EventsBeforeFork.
# Returns None.
#
# NOTE(review): this listing has lost its indentation, and in several places
# the line that opens a statement is missing, leaving only trailing keyword
# arguments. Those spots are marked below; nesting and the call targets
# cannot be confirmed from this view.
#
# Flags are read through the package-local jobproperties object.
# AthenaCommon.ConcurrencyFlags is imported but never referenced by name;
# presumably the import makes jp.ConcurrencyFlags (read below) available --
# TODO confirm.
51 from .AthenaMPFlags import jobproperties as jp
52 import AthenaCommon.ConcurrencyFlags
53 event_range_channel = jp.AthenaMPFlags.EventRangeChannel()
54
# getChunkSize() is defined outside this block. Values below 1 are replaced
# by 1 after a warning.
55 chunk_size = getChunkSize()
56 if chunk_size < 1:
57 msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
58 chunk_size = 1
59
# Read the flag values that are passed on to the tools further down.
60 debug_worker = jp.ConcurrencyFlags.DebugWorkers()
61 use_shared_reader = jp.AthenaMPFlags.UseSharedReader()
62 use_shared_writer = jp.AthenaMPFlags.UseSharedWriter()
63 use_parallel_compression = jp.AthenaMPFlags.UseParallelCompression()
# Current PID plus a random UUID hex string; used as a suffix in the
# SharedMemoryName values below (presumably to keep the names of concurrent
# jobs apart -- confirm).
64 unique_id = f"{str(os.getpid())}-{uuid.uuid4().hex}"
65
66
# 'SharedQueue' is replaced by 'RoundRobin' when the FilesInput flag is not
# switched on or compares equal to an empty list.
# NOTE(review): the log message mentions maxEvents=-1, but maxEvents is not
# checked in this condition.
67 if strategy == 'SharedQueue':
68 from AthenaCommon.AthenaCommonFlags import jobproperties as ajp
69 if (not ajp.AthenaCommonFlags.FilesInput.statusOn) or ajp.AthenaCommonFlags.FilesInput == []:
70 msg.info('MP strategy "SharedQueue" will not work without input files when maxEvents=-1. Switching to "RoundRobin" just in case')
71 strategy = 'RoundRobin'
72
# For the two queue-based strategies, attach AthenaSharedMemoryTool instances
# to the services when the shared reader and/or shared writer is enabled.
73 if strategy=='SharedQueue' or strategy=='RoundRobin':
74 if use_shared_reader:
75 from AthenaCommon.AppMgr import ServiceMgr as svcMgr
76 svcMgr.PoolSvc.MaxFilesOpen = 2
77 from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
# NOTE(review): in this listing the assignment below is split across several
# physical lines, with the f-string prefix separated from its literal.
78 svcMgr.EventSelector.SharedMemoryTool =
AthenaSharedMemoryTool(
"EventStreamingTool", SharedMemoryName=f
"EventStream{unique_id}")
# The input streaming tool is only set when the module
# 'AthenaPoolCnvSvc.ReadAthenaPool' has already been imported.
79 if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules:
80 svcMgr.AthenaPoolCnvSvc.InputStreamingTool =
AthenaSharedMemoryTool(
"InputStreamingTool", SharedMemoryName=f
"InputStream{unique_id}")
81 if use_shared_writer:
82 from AthenaCommon.AppMgr import ServiceMgr as svcMgr
# Same pattern for output: only when 'AthenaPoolCnvSvc.WriteAthenaPool' has
# already been imported.
83 if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
84 from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
85 svcMgr.AthenaPoolCnvSvc.OutputStreamingTool =
AthenaSharedMemoryTool(
"OutputStreamingTool", SharedMemoryName=f
"OutputStream{unique_id}")
# NOTE(review): with indentation lost it is unclear whether this assignment
# sits under the sys.modules check above or only under use_shared_writer.
86 svcMgr.AthenaPoolCnvSvc.ParallelCompression=use_parallel_compression
87
# Event queue provider, SharedQueue only.
# NOTE(review): the line that opens the SharedEvtQueueProvider(...) statement
# is not present in this view; only its trailing keyword arguments are.
88 if strategy=='SharedQueue':
89 from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueProvider
91 IsPileup=pileup,
92 EventsBeforeFork=events_before_fork,
93 ChunkSize=chunk_size) ]
94
95
# Consumer selection: the first branch is taken when self.nThreads is set
# (not None) and at least 1, otherwise the else branch below.
# NOTE(review): the condition guarding the raise and the opening line of the
# SharedHiveEvtQueueConsumer(...) statement are not visible here. Judging by
# the message, the raise is presumably limited to the pileup case -- confirm.
96 if self.nThreads is not None and self.nThreads >= 1:
98 raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
99 from AthenaMPTools.AthenaMPToolsConf import SharedHiveEvtQueueConsumer
101 EventsBeforeFork=events_before_fork,
102 Debug=debug_worker) ]
103 else:
# NOTE(review): the opening line of the SharedEvtQueueConsumer(...) statement
# is not visible. IsRoundRobin is True exactly when strategy is 'RoundRobin'.
104 from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueConsumer
106 UseSharedWriter=use_shared_writer,
107 IsPileup=pileup,
108 IsRoundRobin=(strategy=='RoundRobin'),
109 EventsBeforeFork=events_before_fork,
110 ReadEventOrders=jp.AthenaMPFlags.ReadEventOrders(),
111 EventOrdersFile=jp.AthenaMPFlags.EventOrdersFile(),
112 Debug=debug_worker) ]
# Writer tool, only when the shared writer is enabled.
# NOTE(review): the opening line of the SharedWriterTool(...) statement is not
# visible.
113 if use_shared_writer:
114 from AthenaMPTools.AthenaMPToolsConf import SharedWriterTool
116 IsPileup=pileup,
117 Debug=debug_worker) ]
# EventService strategy: a scatterer and a processor, configured with the two
# channel names defined here.
# NOTE(review): the opening lines of the EvtRangeScatterer(...) and
# EvtRangeProcessor(...) statements are not visible, so it cannot be confirmed
# from this view which tool receives which channel beyond the keyword
# arguments shown.
118 elif strategy=='EventService':
119 channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
120 channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"
121
122 from AthenaMPTools.AthenaMPToolsConf import EvtRangeScatterer
124 EventRangeChannel = event_range_channel,
125 DoCaching=jp.AthenaMPFlags.EvtRangeScattererCaching()) ]
126
127 from AthenaMPTools.AthenaMPToolsConf import EvtRangeProcessor
129 Channel2Scatterer = channelScatterer2Processor,
130 Channel2EvtSel = channelProcessor2EvtSel,
131 Debug=debug_worker) ]
# Any other strategy value: nothing is configured, only a warning is logged.
132 else:
133 msg.warning("Unknown strategy. No MP tools will be configured")
134