10 from copy
import deepcopy
15 import itertools
as it
18 import PyJobTransforms.trfExceptions
as trfExceptions
20 msg = logging.getLogger(__name__)
26 """MPI master, MPI worker, or not using MPI"""
36 trfExit.nameToCode(
"TRF_SETUP"), message
42 if mpiConfig
is not None:
43 return int(mpiConfig[
"rank"])
44 if "RANK" not in os.environ:
48 return int(os.environ[
"RANK"])
50 signalError(
"$RANK environment variable is not an integer")
56 if mpiConfig
is not None:
57 return mpiConfig[
"type"]
58 if "RANK" not in os.environ:
61 return MPIType.MPIMASTER
63 return MPIType.MPIWORKER
67 """Check environment is correct if we are in MPI mode, and set up dictionaries"""
69 if "RANK" not in os.environ:
71 "Running in MPI mode but the $RANK environment variable is not set!"
76 "Running in MPI mode with rank {0} but working directory is not called rank-{0}".
format(
82 mpiConfig[
"rank"] = rank
83 mpiConfig[
"type"] = mpiType
84 mpiConfig[
"outputs"] = {
85 dataType: deepcopy(dataDict[dataType])
for dataType
in output
88 output_proc_regex = re.compile(
r"(.+)\[(.*)](.*)")
89 for v
in mpiConfig[
"outputs"].
values():
94 if (
"[" in fn)
and (
"]" in fn):
95 match = output_proc_regex.match(fn)
98 f
"{match.group(1)}{it}{match.group(3)}"
99 for it
in match.group(2).
split(
",")
102 list_to_remove.append(match.group(1))
105 list_to_remove.append(fn)
107 v.list_to_remove =
list(
set(list_to_remove))
119 return mpiConfig[
"outputs"].
values()
123 """Merge outputs into rank 0"""
124 if mpiConfig
is None:
125 msg.warn(
"trfMPITools.mergeOutputs called when we are not in MPI mode")
127 rank_dir_regex = re.compile(
"rank-([0-9]+)$")
129 int(m.group(1)): m.string
130 for m
in (rank_dir_regex.search(d.path)
for d
in os.scandir(
"..")
if d.is_dir())
131 if m
and int(m.group(1)) > 0
134 msg.info(
"Rank output directories are:\n{}".
format(pprint.pformat(rank_dirs)))
135 all_merge_inputs =
list(
139 lambda f: f.is_file(),
140 it.chain.from_iterable(map(os.scandir, rank_dirs.values())),
146 os.remove(
"PoolFileCatalog.xml")
147 except FileNotFoundError:
149 for dtype, defn
in mpiConfig[
"outputs"].
items():
151 msg.info(f
"Output type is {dtype}")
152 merge_helper = deepcopy(defn)
153 merge_helper.multipleOK =
True
155 for fn
in defn.list_to_remove:
159 except FileNotFoundError:
162 for fn
in defn.value:
163 msg.info(f
"Generating merge list by filtering for {fn} in {all_merge_inputs}")
164 merge_inputs =
sorted(
filter(
lambda s: s.endswith(fn), all_merge_inputs))
166 merge_helper.value.extend(merge_inputs)
167 merge_lists.append((fn, merge_inputs))
169 defn.value = [x[0]
for x
in merge_lists
if len(x[1]) >= 1]
172 msg.info(f
"In rank {getMPIRank()}, not merging")
175 if len(my_merge[1]) < 1:
176 msg.info(f
"In rank {getMPIRank()}, no inputs for ../rank-0/{my_merge[0]}")
177 open(
"done_merging",
"a").close()
180 f
"In rank {getMPIRank()}, merging into ../rank-0/{my_merge[0]}. Inputs are \n{pprint.pformat(my_merge[1])}"
182 merge_helper.selfMerge(f
"../rank-0/{my_merge[0]}", my_merge[1])
184 open(
"done_merging",
"a").close()
186 from functools
import reduce
187 from operator
import and_
188 from time
import sleep
189 import sqlite3
as sq3
190 from glob
import glob
193 tables = [
"ranks",
"event_log"]
194 conn = sq3.connect(
"mpilog.db")
196 for db
in glob(
"../rank-[1-9]*/mpilog.db"):
197 cur.execute(
"ATTACH DATABASE ? as db", (db,))
199 cur.execute(f
"INSERT INTO {table} SELECT * from db.{table}")
201 cur.execute(
"DETACH DATABASE db")
206 f
"../rank-{rank}/done_merging" for rank
in range(0, len(merge_lists))
209 check = [os.path.exists(f)
for f
in files_to_check]
210 while not reduce(and_, check):
212 msg.info(
"Waiting for other ranks to finish merging")
213 msg.debug(f
"Looking for {files_to_check}")
214 msg.debug(f
"Result: {check}")
217 check = [os.path.exists(f)
for f
in files_to_check]
218 msg.info(
"All ranks done merging")