123 """Merge outputs into rank 0"""
124 if mpiConfig
is None:
125 msg.warn(
"trfMPITools.mergeOutputs called when we are not in MPI mode")
127 rank_dir_regex = re.compile(
"rank-([0-9]+)$")
129 int(m.group(1)): m.string
130 for m
in (rank_dir_regex.search(d.path)
for d
in os.scandir(
"..")
if d.is_dir())
131 if m
and int(m.group(1)) > 0
134 msg.info(
"Rank output directories are:\n{}".
format(pprint.pformat(rank_dirs)))
135 all_merge_inputs =
list(
139 lambda f: f.is_file(),
140 it.chain.from_iterable(map(os.scandir, rank_dirs.values())),
146 os.remove(
"PoolFileCatalog.xml")
147 except FileNotFoundError:
149 for dtype, defn
in mpiConfig[
"outputs"].
items():
151 msg.info(f
"Output type is {dtype}")
152 merge_helper = deepcopy(defn)
153 merge_helper.multipleOK =
True
155 for fn
in defn.list_to_remove:
159 except FileNotFoundError:
162 for fn
in defn.value:
163 msg.info(f
"Generating merge list by filtering for {fn} in {all_merge_inputs}")
164 merge_inputs =
sorted(
filter(
lambda s: s.endswith(fn), all_merge_inputs))
166 merge_helper.value.extend(merge_inputs)
167 merge_lists.append((fn, merge_inputs))
169 defn.value = [x[0]
for x
in merge_lists
if len(x[1]) >= 1]
172 msg.info(f
"In rank {getMPIRank()}, not merging")
175 if len(my_merge[1]) < 1:
176 msg.info(f
"In rank {getMPIRank()}, no inputs for ../rank-0/{my_merge[0]}")
177 open(
"done_merging",
"a").close()
180 f
"In rank {getMPIRank()}, merging into ../rank-0/{my_merge[0]}. Inputs are \n{pprint.pformat(my_merge[1])}"
182 merge_helper.selfMerge(f
"../rank-0/{my_merge[0]}", my_merge[1])
184 open(
"done_merging",
"a").close()
186 from functools
import reduce
187 from operator
import and_
188 from time
import sleep
189 import sqlite3
as sq3
190 from glob
import glob
193 conn = sq3.connect(
"mpilog.db")
195 tables = [
"ranks",
"files",
"event_log"]
196 for db
in glob(
"../rank-[1-9]*/mpilog.db"):
197 cur.execute(
"ATTACH DATABASE ? as db", (db,))
199 upsert =
"INSERT OR IGNORE" if table ==
"files" else "INSERT"
200 cur.execute(f
"{upsert} INTO {table} SELECT * from db.{table}")
202 cur.execute(
"DETACH DATABASE db")
207 f
"../rank-{rank}/done_merging" for rank
in range(0, len(merge_lists))
210 check = [os.path.exists(f)
for f
in files_to_check]
211 while not reduce(and_, check):
213 msg.info(
"Waiting for other ranks to finish merging")
214 msg.debug(f
"Looking for {files_to_check}")
215 msg.debug(f
"Result: {check}")
218 check = [os.path.exists(f)
for f
in files_to_check]
219 msg.info(
"All ranks done merging")