ATLAS Offline Software
Loading...
Searching...
No Matches
Control/AthenaMP/python/PyComps.py
Go to the documentation of this file.
1# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2
3#-----Python imports---#
4import os, sys, shutil, uuid
5
6#-----Athena imports---#
7from AthenaCommon.Logging import log as msg
8
9from AthenaMP.AthenaMPConf import AthMpEvtLoopMgr
11 def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw):
12
13 from AthenaCommon.AppMgr import theApp
14 self.nThreads = theApp._opts.threads
15
16
17 kw['name'] = name
18 super(MpEvtLoopMgr, self).__init__(**kw)
19
20 os.putenv('XRD_ENABLEFORKHANDLERS','1')
21 os.putenv('XRD_RUNFORKHANDLER','1')
22
23 from .AthenaMPFlags import jobproperties as jp
24 self.WorkerTopDir = jp.AthenaMPFlags.WorkerTopDir()
25 self.OutputReportFile = jp.AthenaMPFlags.OutputReportFile()
26 self.CollectSubprocessLogs = jp.AthenaMPFlags.CollectSubprocessLogs()
27 self.Strategy = jp.AthenaMPFlags.Strategy()
28 self.PollingInterval = jp.AthenaMPFlags.PollingInterval()
29 self.MemSamplingInterval = jp.AthenaMPFlags.MemSamplingInterval()
30 self.EventsBeforeFork = jp.AthenaMPFlags.EventsBeforeFork()
31 self.IsPileup = isPileup
32
33 if self.Strategy=='EventService':
34 self.EventsBeforeFork = 0
35
36 from AthenaCommon.AppMgr import theApp as app
37 app.EventLoop = self.getFullJobOptName()
38
39 # Enable FileMgr logging
40 from GaudiSvc.GaudiSvcConf import FileMgr
41 from AthenaCommon.AppMgr import ServiceMgr as svcMgr
42 svcMgr+=FileMgr(LogFile="FileManagerLog")
43
44 # Save PoolFileCatalog.xml if exists in the run directory
45 if os.path.isfile('PoolFileCatalog.xml'):
46 shutil.copyfile('PoolFileCatalog.xml','PoolFileCatalog.xml.AthenaMP-saved')
47
49
50 def configureStrategy(self,strategy,pileup,events_before_fork) -> None :
51 from .AthenaMPFlags import jobproperties as jp
52 import AthenaCommon.ConcurrencyFlags # noqa: F401
53 event_range_channel = jp.AthenaMPFlags.EventRangeChannel()
54
55 chunk_size = getChunkSize()
56 if chunk_size < 1:
57 msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
58 chunk_size = 1
59
60 debug_worker = jp.ConcurrencyFlags.DebugWorkers()
61 use_shared_reader = jp.AthenaMPFlags.UseSharedReader()
62 use_shared_writer = jp.AthenaMPFlags.UseSharedWriter()
63 use_parallel_compression = jp.AthenaMPFlags.UseParallelCompression()
64 unique_id = f"{str(os.getpid())}-{uuid.uuid4().hex}"
65
66 # For e.g. event generation, if we use SharedQueue the job does not complete correctly
67 if strategy == 'SharedQueue':
68 from AthenaCommon.AthenaCommonFlags import jobproperties as ajp
69 if (not ajp.AthenaCommonFlags.FilesInput.statusOn) or ajp.AthenaCommonFlags.FilesInput == []:
70 msg.info('MP strategy "SharedQueue" will not work without input files when maxEvents=-1. Switching to "RoundRobin" just in case')
71 strategy = 'RoundRobin'
72
73 if strategy=='SharedQueue' or strategy=='RoundRobin':
74 if use_shared_reader:
75 from AthenaCommon.AppMgr import ServiceMgr as svcMgr
76 svcMgr.PoolSvc.MaxFilesOpen = 2
77 if use_shared_writer:
78 from AthenaCommon.AppMgr import ServiceMgr as svcMgr
79 if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
80 from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
81 svcMgr.AthenaPoolCnvSvc.OutputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool", SharedMemoryName=f"OutputStream{unique_id}")
82 svcMgr.AthenaPoolCnvSvc.ParallelCompression=use_parallel_compression
83
84 if strategy=='SharedQueue':
85 from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueProvider
86 self.Tools += [ SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
87 IsPileup=pileup,
88 EventsBeforeFork=events_before_fork,
89 ChunkSize=chunk_size) ]
90
91 # In pure MP, self.nThreads may be set to None - we want the pure MP setup in that case
92 if self.nThreads is not None and self.nThreads >= 1:
93 if(pileup):
94 raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
95 from AthenaMPTools.AthenaMPToolsConf import SharedHiveEvtQueueConsumer
96 self.Tools += [ SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
97 EventsBeforeFork=events_before_fork,
98 Debug=debug_worker) ]
99 else:
100 from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueConsumer
101 self.Tools += [ SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
102 UseSharedWriter=use_shared_writer,
103 IsPileup=pileup,
104 IsRoundRobin=(strategy=='RoundRobin'),
105 EventsBeforeFork=events_before_fork,
106 ReadEventOrders=jp.AthenaMPFlags.ReadEventOrders(),
107 EventOrdersFile=jp.AthenaMPFlags.EventOrdersFile(),
108 Debug=debug_worker) ]
109 if use_shared_writer:
110 from AthenaMPTools.AthenaMPToolsConf import SharedWriterTool
111 self.Tools += [ SharedWriterTool(MotherProcess=(events_before_fork>0),
112 IsPileup=pileup,
113 Debug=debug_worker) ]
114 elif strategy=='EventService':
115 channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
116 channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"
117
118 from AthenaMPTools.AthenaMPToolsConf import EvtRangeScatterer
119 self.Tools += [ EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
120 EventRangeChannel = event_range_channel,
121 DoCaching=jp.AthenaMPFlags.EvtRangeScattererCaching()) ]
122
123 from AthenaMPTools.AthenaMPToolsConf import EvtRangeProcessor
124 self.Tools += [ EvtRangeProcessor(IsPileup=pileup,
125 Channel2Scatterer = channelScatterer2Processor,
126 Channel2EvtSel = channelProcessor2EvtSel,
127 Debug=debug_worker) ]
128 else:
129 msg.warning("Unknown strategy. No MP tools will be configured")
130
131def getChunkSize() -> int :
132 from .AthenaMPFlags import jobproperties as jp
133 from PyUtils.MetaReaderPeeker import metadata
134 chunk_size = 1
135 # In jobs without input (e.g. event generation), metadata is an empty dictionary
136 if (jp.AthenaMPFlags.ChunkSize() > 0):
137 chunk_size = jp.AthenaMPFlags.ChunkSize()
138 msg.info('Chunk size set to %i', chunk_size)
139 elif 'file_size' in metadata and metadata['file_size'] is not None:
140 #Don't use auto flush for shared reader
141 if (jp.AthenaMPFlags.UseSharedReader()):
142 msg.info('Shared Reader in use, chunk_size set to default (%i)', chunk_size)
143 #Use auto flush only if file is compressed with LZMA, else use default chunk_size
144 elif (jp.AthenaMPFlags.ChunkSize() == -1):
145 if (metadata['file_comp_alg'] == 2):
146 chunk_size = metadata['auto_flush']
147 msg.info('Chunk size set to auto flush (%i)', chunk_size)
148 else:
149 msg.info('LZMA algorithm not in use, chunk_size set to default (%i)', chunk_size)
150 #Use auto flush only if file is compressed with LZMA or ZLIB, else use default chunk_size
151 elif (jp.AthenaMPFlags.ChunkSize() == -2):
152 if (metadata['file_comp_alg'] == 1 or metadata['file_comp_alg'] == 2):
153 chunk_size = metadata['auto_flush']
154 msg.info('Chunk size set to auto flush (%i)', chunk_size)
155 else:
156 msg.info('LZMA nor ZLIB in use, chunk_size set to default (%i)', chunk_size)
157 #Use auto flush only if file is compressed with LZMA, ZLIB or LZ4, else use default chunk_size
158 elif (jp.AthenaMPFlags.ChunkSize() == -3):
159 if (metadata['file_comp_alg'] == 1 or metadata['file_comp_alg'] == 2 or metadata['file_comp_alg'] == 4):
160 chunk_size = metadata['auto_flush']
161 msg.info('Chunk size set to auto flush (%i)', chunk_size)
162 else:
163 msg.info('LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)', chunk_size)
164 #Use auto flush value for chunk_size, regarldess of compression algorithm
165 elif (jp.AthenaMPFlags.ChunkSize() <= -4):
166 chunk_size = metadata['auto_flush']
167 msg.info('Chunk size set to auto flush (%i)', chunk_size)
168 else:
169 msg.warning('Invalid ChunkSize, Chunk Size set to default (%i)', chunk_size)
170
171 return chunk_size
if(pathvar)
This class provides the IPCTool for SharedMemory objects.
None configureStrategy(self, strategy, pileup, events_before_fork)
__init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw)