def mergeOutputs():
    """Merge per-rank outputs into rank 0.

    Each MPI rank writes its outputs into a sibling directory named
    ``rank-N``.  This routine builds, per output type, a list of
    (target-name, input-files) pairs; rank ``i`` then merges target ``i``
    into ``../rank-0``.  Every rank drops a ``done_merging`` marker file
    when finished.  Rank 0 additionally consolidates the per-rank
    ``mpilog.db`` SQLite databases into its own and busy-waits until all
    merging ranks have produced their marker files.

    No-op (with a warning) when not running in MPI mode
    (``mpiConfig is None``).
    """
    if mpiConfig is None:
        msg.warn("trfMPITools.mergeOutputs called when we are not in MPI mode")
        return

    # Locate sibling rank directories (rank-1, rank-2, ...); rank-0 is the
    # current working directory's sibling and is excluded (> 0 filter).
    rank_dir_regex = re.compile("rank-([0-9]+)$")
    rank_dirs = {
        int(m.group(1)): m.string
        for m in (rank_dir_regex.search(d.path) for d in os.scandir("..") if d.is_dir())
        if m and int(m.group(1)) > 0
    }
    if getMPIRank() == 0:
        msg.info("Rank output directories are:\n{}".format(pprint.pformat(rank_dirs)))

    # Every regular file produced by the other ranks is a merge candidate.
    # NOTE: the original text lost the `map(` opener here (line-number gap
    # in the extraction); restored so the pipeline parses and yields paths.
    all_merge_inputs = list(
        map(
            lambda f: f.path,
            filter(
                lambda f: f.is_file(),
                it.chain.from_iterable(map(os.scandir, rank_dirs.values())),
            ),
        )
    )

    # A stale catalogue from a previous attempt would confuse the merge.
    try:
        os.remove("PoolFileCatalog.xml")
    except FileNotFoundError:
        pass

    # Initialized before the loop so the rank-0 wait below cannot hit a
    # NameError when mpiConfig["outputs"] is empty.
    merge_lists = []
    for dtype, defn in mpiConfig["outputs"].items():
        if getMPIRank() == 0:
            msg.info(f"Output type is {dtype}")
        # Work on a copy so the original output definition is untouched;
        # the helper must accept multiple input files for the merge.
        merge_helper = deepcopy(defn)
        merge_helper.multipleOK = True
        if getMPIRank() == 0:
            # Rank 0 clears files the definition marks for removal;
            # missing files are fine (best-effort cleanup).
            for fn in defn.list_to_remove:
                try:
                    os.remove(fn)
                except FileNotFoundError:
                    pass

        # One (target-name, inputs) pair per expected output file.
        merge_lists = []
        for fn in defn.value:
            msg.info(f"Generating merge list by filtering for {fn} in {all_merge_inputs}")
            merge_inputs = sorted(filter(lambda s: s.endswith(fn), all_merge_inputs))
            merge_helper.value.extend(merge_inputs)
            merge_lists.append((fn, merge_inputs))

        # Keep only targets that actually have inputs to merge.
        defn.value = [x[0] for x in merge_lists if len(x[1]) >= 1]

        # Rank i handles merge target i; ranks beyond the target count sit out.
        if getMPIRank() >= len(merge_lists):
            msg.info(f"In rank {getMPIRank()}, not merging")
            break
        my_merge = merge_lists[getMPIRank()]
        if len(my_merge[1]) < 1:
            msg.info(f"In rank {getMPIRank()}, no inputs for ../rank-0/{my_merge[0]}")
            open("done_merging", "a").close()
            break
        msg.info(
            f"In rank {getMPIRank()}, merging into ../rank-0/{my_merge[0]}. Inputs are \n{pprint.pformat(my_merge[1])}"
        )
        merge_helper.selfMerge(f"../rank-0/{my_merge[0]}", my_merge[1])

    # Signal that this rank has finished its merge work ('touch').
    open("done_merging", "a").close()

    if getMPIRank() == 0:
        from time import sleep
        import sqlite3 as sq3
        from glob import glob

        # Consolidate the per-rank bookkeeping databases into rank 0's.
        conn = sq3.connect("mpilog.db")
        cur = conn.cursor()
        tables = ["ranks", "files", "event_log"]
        for db in glob("../rank-[1-9]*/mpilog.db"):
            cur.execute("ATTACH DATABASE ? as db", (db,))
            for table in tables:
                # 'files' rows may legitimately collide across ranks,
                # so duplicates are silently skipped there.
                upsert = "INSERT OR IGNORE" if table == "files" else "INSERT"
                cur.execute(f"{upsert} INTO {table} SELECT * from db.{table}")
            # Commit before detaching; SQLite refuses to detach a database
            # involved in an open transaction.
            conn.commit()
            cur.execute("DETACH DATABASE db")
        conn.close()

        # Poll until every merging rank has dropped its marker file.
        # all() replaces reduce(and_, ...): same result on non-empty lists
        # and, unlike reduce, well-defined (True) when there are no targets.
        files_to_check = [
            f"../rank-{rank}/done_merging" for rank in range(0, len(merge_lists))
        ]
        count = 0
        check = [os.path.exists(f) for f in files_to_check]
        while not all(check):
            if count % 10 == 0:
                msg.info("Waiting for other ranks to finish merging")
                msg.debug(f"Looking for {files_to_check}")
                msg.debug(f"Result: {check}")
            count += 1
            sleep(6)
            check = [os.path.exists(f) for f in files_to_check]
        msg.info("All ranks done merging")
static void reduce(HepMC::GenEvent *ge, HepMC::GenParticle *gp)
Remove an unwanted particle from the event, collapsing the graph structure consistently.