ATLAS Offline Software
Loading...
Searching...
No Matches
Control/AthenaMP/python/PyComps.py
Go to the documentation of this file.
1# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2
3#-----Python imports---#
4import os, sys, shutil, uuid
5
6#-----Athena imports---#
7from AthenaCommon.Logging import log as msg
8
9from AthenaMP.AthenaMPConf import AthMpEvtLoopMgr
def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw):
    """Configure the AthenaMP event loop manager from the AthenaMP job flags.

    Copies the relevant AthenaMPFlags values onto this component, registers
    it as the application's event loop, enables FileMgr logging and keeps a
    backup copy of an existing PoolFileCatalog.xml.

    Args:
        name: Component name, forwarded to the base-class constructor.
        isPileup: Stored in the ``IsPileup`` property.
        **kw: Additional keyword arguments for the base-class constructor.
    """
    from AthenaCommon.AppMgr import theApp

    # Thread count from the command-line options; in pure MP this may be None.
    # It is recorded before the base-class constructor runs.
    self.nThreads = theApp._opts.threads

    kw['name'] = name
    super(MpEvtLoopMgr, self).__init__(**kw)

    # Export the XRD_* fork-handler switches.  Assigning through os.environ
    # still calls putenv() underneath, but also keeps os.environ itself in
    # sync, which a bare os.putenv() call does not.
    os.environ['XRD_ENABLEFORKHANDLERS'] = '1'
    os.environ['XRD_RUNFORKHANDLER'] = '1'

    from .AthenaMPFlags import jobproperties as jp
    mp_flags = jp.AthenaMPFlags
    self.WorkerTopDir = mp_flags.WorkerTopDir()
    self.OutputReportFile = mp_flags.OutputReportFile()
    self.CollectSubprocessLogs = mp_flags.CollectSubprocessLogs()
    self.Strategy = mp_flags.Strategy()
    self.PollingInterval = mp_flags.PollingInterval()
    self.MemSamplingInterval = mp_flags.MemSamplingInterval()
    self.EventsBeforeFork = mp_flags.EventsBeforeFork()
    self.IsPileup = isPileup

    # The EventService strategy always forks before processing any event.
    if self.Strategy == 'EventService':
        self.EventsBeforeFork = 0

    # Register this component as the application's event loop.
    theApp.EventLoop = self.getFullJobOptName()

    # Enable FileMgr logging
    from GaudiSvc.GaudiSvcConf import FileMgr
    from AthenaCommon.AppMgr import ServiceMgr as svcMgr
    svcMgr += FileMgr(LogFile="FileManagerLog")

    # Save PoolFileCatalog.xml if exists in the run directory
    if os.path.isfile('PoolFileCatalog.xml'):
        shutil.copyfile('PoolFileCatalog.xml', 'PoolFileCatalog.xml.AthenaMP-saved')

    # NOTE(review): no call to self.configureStrategy() is visible in this
    # listing, which appears to have dropped a line at this point -- confirm
    # against the original file that the strategy is still configured here.
47
49
def configureStrategy(self, strategy, pileup, events_before_fork) -> None:
    """Attach the AthenaMP tools that implement the requested strategy.

    NOTE(review): the nesting of the blocks below was reconstructed from a
    listing whose indentation had been lost -- confirm against the original.

    Args:
        strategy: 'SharedQueue', 'RoundRobin' or 'EventService'.  Any other
            value only produces a warning and configures no tools.
        pileup: Forwarded as ``IsPileup`` to the tools that accept it.
        events_before_fork: Forwarded as ``EventsBeforeFork``; a positive
            value also sets ``MotherProcess`` on the SharedWriterTool.

    Raises:
        Exception: if ``pileup`` is true while ``self.nThreads`` is >= 1.
    """
    from .AthenaMPFlags import jobproperties as jp
    import AthenaCommon.ConcurrencyFlags # noqa: F401

    mp_flags = jp.AthenaMPFlags
    range_channel = mp_flags.EventRangeChannel()

    events_per_chunk = getChunkSize()
    if events_per_chunk < 1:
        msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', events_per_chunk)
        events_per_chunk = 1

    debug = jp.ConcurrencyFlags.DebugWorkers()
    shared_reader = mp_flags.UseSharedReader()
    shared_writer = mp_flags.UseSharedWriter()
    parallel_compression = mp_flags.UseParallelCompression()
    # Suffix that makes the shared-memory names below unique to this job.
    job_tag = f"{os.getpid()}-{uuid.uuid4().hex}"

    # For e.g. event generation, if we use SharedQueue the job does not complete correctly
    if strategy == 'SharedQueue':
        from AthenaCommon.AthenaCommonFlags import jobproperties as ajp
        files_input = ajp.AthenaCommonFlags.FilesInput
        if not files_input.statusOn or files_input == []:
            msg.info('MP strategy "SharedQueue" will not work without input files when maxEvents=-1. Switching to "RoundRobin" just in case')
            strategy = 'RoundRobin'

    if strategy in ('SharedQueue', 'RoundRobin'):
        if shared_reader:
            from AthenaCommon.AppMgr import ServiceMgr as svcMgr
            svcMgr.PoolSvc.MaxFilesOpen = 2
            from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
            event_stream = AthenaSharedMemoryTool("EventStreamingTool",
                                                  SharedMemoryName=f"EventStream{job_tag}")
            svcMgr.EventSelector.SharedMemoryTool = event_stream
            # Only touch the conversion service if the POOL reading setup was loaded.
            if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules:
                input_stream = AthenaSharedMemoryTool("InputStreamingTool",
                                                      SharedMemoryName=f"InputStream{job_tag}")
                svcMgr.AthenaPoolCnvSvc.InputStreamingTool = input_stream

        if shared_writer:
            from AthenaCommon.AppMgr import ServiceMgr as svcMgr
            # Only touch the conversion service if the POOL writing setup was loaded.
            if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
                from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
                output_stream = AthenaSharedMemoryTool("OutputStreamingTool",
                                                       SharedMemoryName=f"OutputStream{job_tag}")
                svcMgr.AthenaPoolCnvSvc.OutputStreamingTool = output_stream
                svcMgr.AthenaPoolCnvSvc.ParallelCompression = parallel_compression

        if strategy == 'SharedQueue':
            from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueProvider
            queue_provider = SharedEvtQueueProvider(UseSharedReader=shared_reader,
                                                    IsPileup=pileup,
                                                    EventsBeforeFork=events_before_fork,
                                                    ChunkSize=events_per_chunk)
            self.Tools += [ queue_provider ]

        # In pure MP, self.nThreads may be set to None - we want the pure MP setup in that case
        hybrid_mp_mt = self.nThreads is not None and self.nThreads >= 1
        if hybrid_mp_mt:
            if pileup:
                raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
            from AthenaMPTools.AthenaMPToolsConf import SharedHiveEvtQueueConsumer
            consumer = SharedHiveEvtQueueConsumer(UseSharedWriter=shared_writer,
                                                  EventsBeforeFork=events_before_fork,
                                                  Debug=debug)
        else:
            from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueConsumer
            consumer = SharedEvtQueueConsumer(UseSharedReader=shared_reader,
                                              UseSharedWriter=shared_writer,
                                              IsPileup=pileup,
                                              IsRoundRobin=(strategy == 'RoundRobin'),
                                              EventsBeforeFork=events_before_fork,
                                              ReadEventOrders=mp_flags.ReadEventOrders(),
                                              EventOrdersFile=mp_flags.EventOrdersFile(),
                                              Debug=debug)
        self.Tools += [ consumer ]

        if shared_writer:
            from AthenaMPTools.AthenaMPToolsConf import SharedWriterTool
            writer = SharedWriterTool(MotherProcess=(events_before_fork > 0),
                                      IsPileup=pileup,
                                      Debug=debug)
            self.Tools += [ writer ]

    elif strategy == 'EventService':
        # Channel names shared between the scatterer and the processor tools.
        scatterer_to_processor = "AthenaMP_Scatterer2Processor"
        processor_to_evtsel = "AthenaMP_Processor2EvtSel"

        from AthenaMPTools.AthenaMPToolsConf import EvtRangeScatterer
        scatterer = EvtRangeScatterer(ProcessorChannel=scatterer_to_processor,
                                      EventRangeChannel=range_channel,
                                      DoCaching=mp_flags.EvtRangeScattererCaching())
        self.Tools += [ scatterer ]

        from AthenaMPTools.AthenaMPToolsConf import EvtRangeProcessor
        processor = EvtRangeProcessor(IsPileup=pileup,
                                      Channel2Scatterer=scatterer_to_processor,
                                      Channel2EvtSel=processor_to_evtsel,
                                      Debug=debug)
        self.Tools += [ processor ]

    else:
        msg.warning("Unknown strategy. No MP tools will be configured")
134
def getChunkSize() -> int :
    """Derive the event chunk size from the ChunkSize flag and file metadata.

    A positive ``AthenaMPFlags.ChunkSize`` is used as is.  Otherwise, when
    input-file metadata is available and the shared reader is not in use,
    the flag selects when the file's ``auto_flush`` value is taken:

    * ``-1``: only if ``file_comp_alg`` is 2 (LZMA)
    * ``-2``: only if ``file_comp_alg`` is 1 or 2 (ZLIB, LZMA)
    * ``-3``: only if ``file_comp_alg`` is 1, 2 or 4 (ZLIB, LZMA, LZ4)
    * ``<= -4``: always, regardless of compression algorithm

    In every other case the default of 1 is returned.

    Returns:
        The chosen chunk size; this function does not check that it is positive.
    """
    from .AthenaMPFlags import jobproperties as jp
    from PyUtils.MetaReaderPeeker import metadata

    size = 1
    requested = jp.AthenaMPFlags.ChunkSize()

    # An explicit positive request always wins.
    if requested > 0:
        size = requested
        msg.info('Chunk size set to %i', size)
        return size

    # In jobs without input (e.g. event generation), metadata is an empty dictionary
    if 'file_size' not in metadata or metadata['file_size'] is None:
        return size

    # Don't use auto flush for shared reader
    if jp.AthenaMPFlags.UseSharedReader():
        msg.info('Shared Reader in use, chunk_size set to default (%i)', size)
        return size

    # Flag value -> (compression algorithms for which auto flush is used,
    #                message logged when the file uses a different one)
    compression_gated = {
        -1: ((2,), 'LZMA algorithm not in use, chunk_size set to default (%i)'),
        -2: ((1, 2), 'LZMA nor ZLIB in use, chunk_size set to default (%i)'),
        -3: ((1, 2, 4), 'LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)'),
    }

    if requested in compression_gated:
        accepted_algs, fallback_message = compression_gated[requested]
        if metadata['file_comp_alg'] in accepted_algs:
            size = metadata['auto_flush']
            msg.info('Chunk size set to auto flush (%i)', size)
        else:
            msg.info(fallback_message, size)
    elif requested <= -4:
        # Use auto flush value for chunk_size, regardless of compression algorithm
        size = metadata['auto_flush']
        msg.info('Chunk size set to auto flush (%i)', size)
    else:
        msg.warning('Invalid ChunkSize, Chunk Size set to default (%i)', size)

    return size
if(febId1==febId2)
This class provides the IPCTool for SharedMemory objects.
None configureStrategy(self, strategy, pileup, events_before_fork)
__init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw)