ATLAS Offline Software
Public Member Functions | Public Attributes | Private Member Functions | List of all members
python.trfUtils.ParallelJobProcessor Class Reference

ParallelJobProcessor: a multiple-process processor of Job objects. More...

Inheritance diagram for python.trfUtils.ParallelJobProcessor:
Collaboration diagram for python.trfUtils.ParallelJobProcessor:

Public Member Functions

def __init__ (self, jobSubmission=None, numberOfProcesses=multiprocessing.cpu_count())
 initialisation method that accepts submissions and starts pool More...
def __str__ (self)
 return an object self-description string More...
def printout (self)
 print in a human-readable way the items of the object self More...
def submit (self, jobSubmission=None)
 submit a Job object or a JobGroup object for processing More...
def getResults (self)
 get results of JobGroup object submission More...
def statusReport (self)
 return a status report string More...

Public Attributes


Private Member Functions

def _abort (self)
 abort parallel job processor More...
def _terminate (self)
 terminate parallel job processor More...

Detailed Description

ParallelJobProcessor: a multiple-process processor of Job objects.

jobSubmissionJob object or JobGroup object for submission
numberOfProcessesthe number of processes in the process pool

Definition at line 867 of file

Constructor & Destructor Documentation

◆ __init__()

def python.trfUtils.ParallelJobProcessor.__init__ (   self,
  jobSubmission = None,
  numberOfProcesses = multiprocessing.cpu_count() 

initialisation method that accepts submissions and starts pool

This method is the initialisation method of the parallel job processor. It accepts input JobGroup object submissions and prepares a pool of workers.

Definition at line 873 of file

873  def __init__(
874  self,
875  jobSubmission = None,
876  numberOfProcesses = multiprocessing.cpu_count(), # noqa: B008 (cpu_count is constant)
877  ):
878  self.jobSubmission = jobSubmission
879  self.numberOfProcesses = numberOfProcesses
880  self.className = self.__class__.__name__
881  self.status = "starting"
882  msg.debug("{notifier}: status: {status}".format(
883  notifier = self.className,
884  status = self.status)
885  )
886  self.countOfJobs = None
887  self.countOfRemainingJobs = 0
888  self.pool = multiprocessing.Pool(
889  self.numberOfProcesses,
890  initialise_processes
891  )
892  msg.debug("{notifier}: pool of {numberOfProcesses} {units} created".format(
893  notifier = self.className,
894  numberOfProcesses = str(self.numberOfProcesses),
895  units = units(quantity = self.numberOfProcesses,
896  unitSingular = "process", unitPlural = "processes")
897  ))
898  self.status = "ready"
899  msg.debug("{notifier}: status: {status}".format(
900  notifier = self.className,
901  status = self.status
902  ))

Member Function Documentation

◆ __str__()

def python.trfUtils.ParallelJobProcessor.__str__ (   self)

return an object self-description string

This method returns an object description string consisting of a listing of the items of the object self.

object description string

Definition at line 908 of file

908  def __str__(self):
909  descriptionString = ""
910  for key, value in sorted(vars(self).items()):
911  descriptionString += str("{key}:{value} ".format(
912  key = key,
913  value = value
914  ))
915  return descriptionString

◆ _abort()

def python.trfUtils.ParallelJobProcessor._abort (   self)

abort parallel job processor

This method aborts the parallel job processor. It is used typically when an exception is raised.

Definition at line 1214 of file

1214  def _abort(self):
1215  self.status = "aborting"
1216  msg.debug("{notifier}: status: {status}".format(
1217  notifier = self.className,
1218  status = self.status
1219  ))
1220  self._terminate()

◆ _terminate()

def python.trfUtils.ParallelJobProcessor._terminate (   self)

terminate parallel job processor

This method terminates the parallel job processor. It terminates the subprocesses of the parallel job processor. It is used typically when terminating the parallel job processor on successful completion of job processing and when aborting the parallel job processor.

Definition at line 1227 of file

1227  def _terminate(self):
1228  self.status = "terminating"
1229  msg.debug("{notifier}: status: {status}".format(
1230  notifier = self.className,
1231  status = self.status
1232  ))
1233  msg.debug("{notifier}: terminating pool of {numberOfProcesses} {units}".format(
1234  notifier = self.className,
1235  numberOfProcesses = str(self.numberOfProcesses),
1236  units = units(
1237  quantity = self.numberOfProcesses,
1238  unitSingular = "process",
1239  unitPlural = "processes"
1240  )
1241  ))
1242  self.pool.terminate()
1243  self.pool.join()
1244  self.status = "finished"
1245  msg.debug("{notifier}: status: {status}".format(
1246  notifier = self.className,
1247  status = self.status
1248  ))
1249  msg.debug(self.statusReport())

◆ getResults()

def python.trfUtils.ParallelJobProcessor.getResults (   self)

get results of JobGroup object submission

This method returns an ordered list of results for jobs submitted.

order list of results for jobs

Definition at line 1008 of file

1008  def getResults(self):
1009  # While the number of jobs remaining is greater than zero, cycle over
1010  # all jobs in the JobGroup object submission submission, watching for a
1011  # timeout of the JobGroup object submission. If a result has not been
1012  # retrived for a job (i.e. the Job object does not have a result data
1013  # attribute), then check if a result is available for the job (using the
1014  # method multiprocessing.pool.AsyncResult.ready()). If a result is
1015  # available for the job, then check if the job has run successfully
1016  # (using the method multiprocessing.pool.AsyncResult.successful()). If
1017  # the job has not been successful, raise an exception, otherwise, get
1018  # the result of the job and save it to the result data attribute of the
1019  # job.
1020  msg.debug("{notifier}: checking for job {units}".format(
1021  notifier = self.className,
1022  units = units(
1023  quantity = self.countOfRemainingJobs,
1024  unitSingular = "result",
1025  unitPlural = "results")
1026  )
1027  )
1028  while self.countOfRemainingJobs > 0:
1029  # Check for timeout of the job group. If the current timestamp is
1030  # greater than the job group timeout (derived from the sum of the
1031  # set of all job timeout specifications in the job group) + the job
1032  # group submission timestamp, then raise an excepton, otherwise
1033  # cycle over all jobs.
1034  # Allow time for jobs to complete.
1035  time.sleep(0.25)
1036  if self.jobSubmission.timeoutStatus():
1037  msg.error("{notifier}: job group '{name}' timed out".format(
1038  notifier = self.className,
1039  name =
1040  ))
1041  self._abort()
1042  exceptionMessage = "timeout of a function in list {listOfNamesOfRemainingJobs}".format(
1043  listOfNamesOfRemainingJobs = self.listOfNamesOfRemainingJobs
1044  )
1045  msg.error("{notifier}: exception message: {exceptionMessage}".format(
1046  notifier = self.className,
1047  exceptionMessage = exceptionMessage
1048  ))
1049  raise trfExceptions.TransformTimeoutException(
1050  trfExit.nameToCode('TRF_EXEC_TIMEOUT'),
1051  exceptionMessage
1052  )
1053  else:
1054  for job in
1055  self.listOfNamesOfRemainingJobs = []
1056  if not hasattr(job, 'result'):
1057  # Maintain a contemporary list of the names of remaining
1058  # jobs.
1059  self.listOfNamesOfRemainingJobs.append(
1060  # If the result of the job is ready...
1061  if job.resultGetter.ready():
1062  msg.debug(
1063  "{notifier}: result ready for job '{name}'".format(
1064  notifier = self.className,
1065  name =
1066  )
1067  )
1068  job.successStatus = job.resultGetter.successful()
1069  msg.debug(
1070  "{notifier}: job '{name}' success status: {successStatus}".format(
1071  notifier = self.className,
1072  name =,
1073  successStatus = job.successStatus
1074  )
1075  )
1076  # If the job was successful, create the result data
1077  # attribute of the job and save the result to it.
1078  if job.successStatus:
1079  job.result = job.resultGetter.get()
1080  msg.debug(
1081  "{notifier}: result of job '{name}': {result}".format(
1082  notifier = self.className,
1083  name =,
1084  result = job.result
1085  )
1086  )
1087  self.countOfRemainingJobs -= 1
1088  msg.debug(
1089  "{notifier}: {countOfRemainingJobs} {units} remaining".format(
1090  notifier = self.className,
1091  countOfRemainingJobs = self.countOfRemainingJobs,
1092  units = units(
1093  quantity = self.countOfRemainingJobs,
1094  unitSingular = "job",
1095  unitPlural = "jobs"
1096  )
1097  )
1098  )
1099  # If the job was not successful, raise an exception
1100  # and abort processing.
1101  elif not job.successStatus:
1102  msg.error(
1103  "{notifier}: job '{name}' failed".format(
1104  notifier = self.className,
1105  name =
1106  )
1107  )
1108  self._abort()
1109  exceptionMessage = "failure of function '{name}' with arguments {arguments}".format(
1110  name = job.workFunction.__name__,
1111  arguments = job.workFunctionKeywordArguments
1112  )
1113  msg.error("{notifier}: exception message: {exceptionMessage}".format(
1114  notifier = self.className,
1115  exceptionMessage = exceptionMessage
1116  ))
1117  raise trfExceptions.TransformExecutionException(
1118  trfExit.nameToCode('TRF_EXEC_FAIL'),
1119  exceptionMessage
1120  )
1121  # All results having been returned, create the 'results' list data
1122  # attribute of the job group and append all individual job results to
1123  # it.
1124  self.jobSubmission.timeStampComplete = time.time()
1125  self.jobSubmission.completeStatus = True
1126  msg.debug("{notifier}: all {countOfJobs} {units} complete (timestamp: {timeStampComplete})".format(
1127  notifier = self.className,
1128  countOfJobs = self.countOfJobs,
1129  units = units(
1130  quantity = self.countOfJobs,
1131  unitSingular = "job",
1132  unitPlural = "jobs"
1133  ),
1134  timeStampComplete = self.jobSubmission.timeStampComplete
1135  ))
1136  self.jobSubmission.processingTime = self.jobSubmission.timeStampComplete - self.jobSubmission.timeStampSubmission
1137  msg.debug("{notifier}: time taken to process all {countOfJobs} {units}: {processingTime}".format(
1138  notifier = self.className,
1139  countOfJobs = self.countOfJobs,
1140  units = units(
1141  quantity = self.countOfJobs,
1142  unitSingular = "job",
1143  unitPlural = "jobs"
1144  ),
1145  processingTime = self.jobSubmission.processingTime
1146  ))
1147  for job in
1148  self.jobSubmission.results.append(job.result)
1149  self._terminate()
1150  return self.jobSubmission.results
1151  self._terminate()

◆ printout()

def python.trfUtils.ParallelJobProcessor.printout (   self)

print in a human-readable way the items of the object self

This function prints in a human-readable way the items of the object self.

Definition at line 920 of file

920  def printout(self):
921  printHR(vars(self)
922  )

◆ statusReport()

def python.trfUtils.ParallelJobProcessor.statusReport (   self)

return a status report string

This method returns a status report string, detailing information on the JobGroup submission and on the job processing status.

status report string

Definition at line 1157 of file

1157  def statusReport(self):
1158  statusReport = "\n{notifier}:\n status report:".format(
1159  notifier = self.className
1160  )
1161  # information on parallel job processor
1162  statusReport += "\n parallel job processor configuration:"
1163  statusReport += "\n status: {notifier}".format(
1164  notifier = str(self.status)
1165  )
1166  statusReport += "\n number of processes: {notifier}".format(
1167  notifier = str(self.numberOfProcesses)
1168  )
1169  # information on job group submission
1170  statusReport += "\n job group submission: '{notifier}'".format(
1171  notifier =
1172  )
1173  statusReport += "\n total number of jobs: {notifier}".format(
1174  notifier = str(self.countOfJobs)
1175  )
1176  statusReport += "\n number of incomplete jobs: {notifier}".format(
1177  notifier = str(self.countOfRemainingJobs)
1178  )
1179  statusReport += "\n names of incomplete jobs: {notifier}".format(
1180  notifier = self.listOfNamesOfRemainingJobs
1181  )
1182  # information on jobs (if existent)
1183  if
1184  statusReport += "\n jobs:"
1185  for job in
1186  statusReport += "\n job '{name}':".format(
1187  name =
1188  )
1189  statusReport += "\n workFunction: '{name}'".format(
1190  name = job.workFunction.__name__
1191  )
1192  statusReport += "\n workFunctionKeywordArguments: '{arguments}'".format(
1193  arguments = job.workFunctionKeywordArguments
1194  )
1195  statusReport += "\n workFunctionTimeout: '{timeout}'".format(
1196  timeout = job.workFunctionTimeout
1197  )
1198  if hasattr(job, 'result'):
1199  statusReport += "\n result: '{result}'".format(
1200  result = job.result
1201  )
1202  # statistics of parallel job processor run
1203  if hasattr(self.jobSubmission, 'processingTime'):
1204  statusReport += "\n statistics:"
1205  if hasattr(self.jobSubmission, 'processingTime'):
1206  statusReport += "\n total processing time: {processingTime} s".format(
1207  processingTime = self.jobSubmission.processingTime
1208  )
1209  return statusReport

◆ submit()

def python.trfUtils.ParallelJobProcessor.submit (   self,
  jobSubmission = None 

submit a Job object or a JobGroup object for processing

This method submits a specified Job object or JobGroup object for processing. On successful submission, it returns the value 0.

jobSubmissionJob object or JobGroup object for submission

Definition at line 928 of file

928  def submit(
929  self,
930  jobSubmission = None
931  ):
932  # If the input submission is not None, then update the jobSubmission
933  # data attribute to that specified for this method.
934  if jobSubmission is not None:
935  self.jobSubmission = jobSubmission
936  self.status = "submitted"
937  msg.debug("{notifier}: status: {status}".format(
938  notifier = self.className,
939  status = self.status
940  ))
941  # If the input submission is a Job object, contain it in a JobGroup
942  # object.
943  if isinstance(self.jobSubmission, Job):
944  jobGroup = JobGroup(
945  jobs = [self.jobSubmission,],
946  )
947  self.jobSubmission = jobGroup
948  # Count the number of jobs.
949  self.countOfJobs = len(
950  self.countOfRemainingJobs = self.countOfJobs
951  # Build a contemporary list of the names of jobs.
952  self.listOfNamesOfRemainingJobs = []
953  for job in
954  self.listOfNamesOfRemainingJobs.append(
955  msg.debug("{notifier}: received job group submission '{name}' of {countOfJobs} {units}".format(
956  notifier = self.className,
957  name =,
958  countOfJobs = self.countOfJobs,
959  units = units(
960  quantity = self.countOfRemainingJobs,
961  unitSingular = "job",
962  unitPlural = "jobs"
963  )
964  ))
965  msg.debug(self.statusReport())
966  msg.debug("{notifier}: submitting job group submission '{name}' to pool".format(
967  notifier = self.className,
968  name =
969  ))
970  # Cycle through all jobs in the input submission and apply each to the
971  # pool.
972  for job in
973  job.timeStampSubmission = time.time()
974  msg.debug("{notifier}: job '{name}' submitted to pool".format(
975  notifier = self.className,
976  name =
977  ))
978  # Apply the job to the pool, applying the object pool.ApplyResult
979  # to the job as a data attribute.
980  job.resultGetter = self.pool.apply_async(
981  func = job.workFunction,
982  kwds = job.workFunctionKeywordArguments
983  )
984  # Prepare monitoring of job group times in order to detect a job group
985  # timeout by recording the time of complete submission of the job group.
986  self.jobSubmission.timeStampSubmission = time.time()
987  msg.debug("{notifier}: job group submission complete: {countOfJobs} {units} submitted to pool (timestamp: {timeStampSubmission})".format(
988  notifier = self.className,
989  countOfJobs = self.countOfJobs,
990  units = units(
991  quantity = self.countOfJobs,
992  unitSingular = "job",
993  unitPlural = "jobs"
994  ),
995  timeStampSubmission = self.jobSubmission.timeStampSubmission
996  ))
997  self.status = "processing"
998  msg.debug("{notifier}: status: {status}".format(
999  notifier = self.className,
1000  status = self.status
1001  ))
1002  return 0

Member Data Documentation

◆ className


Definition at line 876 of file

◆ countOfJobs


Definition at line 882 of file

◆ countOfRemainingJobs


Definition at line 883 of file

◆ jobSubmission


Definition at line 874 of file

◆ listOfNamesOfRemainingJobs


Definition at line 949 of file

◆ numberOfProcesses


Definition at line 875 of file

◆ pool


Definition at line 884 of file

◆ status


Definition at line 877 of file

The documentation for this class was generated from the following file:
def printHR(the_object)
print in a human-readable way the items of a given object
bool append
def getResults(infile, test_dict)
def units(quantity=None, unitSingular="unit", unitPlural="units")
return either singular or plural units as appropriate for a given quantity
std::vector< typename T::value_type > sorted(T begin, T end)
Helper function to create a sorted vector from an unsorted one.
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
def __init__(self, base_directory, **kwargs)
Constructor: all process options are set here.
Definition: BTagTrackIpAccessor.cxx:11