ATLAS Offline Software
python.trfUtils.ParallelJobProcessor Class Reference

ParallelJobProcessor: a multiple-process processor of Job objects. More...

Inheritance diagram for python.trfUtils.ParallelJobProcessor:
Collaboration diagram for python.trfUtils.ParallelJobProcessor:

Public Member Functions

 __init__ (self, jobSubmission=None, numberOfProcesses=multiprocessing.cpu_count(), startMethod="fork")
 initialisation method that accepts submissions and starts pool
 __str__ (self)
 return an object self-description string
 printout (self)
 print in a human-readable way the items of the object self
 submit (self, jobSubmission=None)
 submit a Job object or a JobGroup object for processing
 getResults (self)
 get results of JobGroup object submission
 statusReport (self)
 return a status report string

Public Attributes

 jobSubmission = jobSubmission
 numberOfProcesses = numberOfProcesses
 startMethod = startMethod
 className = self.__class__.__name__
str status = "starting"
 countOfJobs = None
int countOfRemainingJobs = 0
 pool
list listOfNamesOfRemainingJobs = []

Protected Member Functions

 _abort (self)
 abort parallel job processor
 _terminate (self)
 terminate parallel job processor

Detailed Description

ParallelJobProcessor: a multiple-process processor of Job objects.

Parameters
jobSubmission: Job object or JobGroup object for submission
numberOfProcesses: the number of processes in the process pool

Definition at line 869 of file trfUtils.py.

Constructor & Destructor Documentation

◆ __init__()

python.trfUtils.ParallelJobProcessor.__init__ ( self,
jobSubmission = None,
numberOfProcesses = multiprocessing.cpu_count(),
startMethod = "fork" )

initialisation method that accepts submissions and starts pool

This method is the initialisation method of the parallel job processor. It accepts input JobGroup object submissions and prepares a pool of workers.

Definition at line 875 of file trfUtils.py.

def __init__(
    self,
    jobSubmission = None,
    numberOfProcesses = multiprocessing.cpu_count(),
    startMethod = "fork"
):
    self.jobSubmission = jobSubmission
    self.numberOfProcesses = numberOfProcesses
    self.startMethod = startMethod
    self.className = self.__class__.__name__
    msg.debug(f"{self.className}: current process: {multiprocessing.current_process().name}")
    self.status = "starting"
    msg.debug("{notifier}: status: {status}".format(
        notifier = self.className,
        status = self.status)
    )
    self.countOfJobs = None
    self.countOfRemainingJobs = 0

    self.pool = multiprocessing.pool.Pool(
        self.numberOfProcesses,
        initialise_processes,
        (),
        None,
        multiprocessing.get_context(self.startMethod)
    )
    msg.debug("{notifier}: pool of {numberOfProcesses} {units} with start method {startMethod!r} created".format(
        notifier = self.className,
        numberOfProcesses = str(self.numberOfProcesses),
        units = units(quantity = self.numberOfProcesses,
                      unitSingular = "process", unitPlural = "processes"),
        startMethod = self.startMethod,
    ))
    self.status = "ready"
    msg.debug("{notifier}: status: {status}".format(
        notifier = self.className,
        status = self.status
    ))

Member Function Documentation

◆ __str__()

python.trfUtils.ParallelJobProcessor.__str__ ( self)

return an object self-description string

This method returns an object description string consisting of a listing of the items of the object self.

Returns
object description string

Definition at line 921 of file trfUtils.py.

def __str__(self):
    descriptionString = ""
    for key, value in sorted(vars(self).items()):
        descriptionString += str("{key}:{value} ".format(
            key = key,
            value = value
        ))
    return descriptionString
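The sorted-vars idiom used by __str__ can be seen in isolation; the class and attribute names below are hypothetical:

```python
class _Example:
    def __init__(self):
        self.beta = 2
        self.alpha = 1

    def __str__(self):
        # sorted(vars(self).items()) yields instance attributes in
        # deterministic alphabetical order, independent of assignment order.
        return "".join("{key}:{value} ".format(key=k, value=v)
                       for k, v in sorted(vars(self).items()))


description = str(_Example())
```

Sorting makes the description string reproducible, which is useful when comparing debug output across runs.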

◆ _abort()

python.trfUtils.ParallelJobProcessor._abort ( self)
protected

abort parallel job processor

This method aborts the parallel job processor. It is typically used when an exception is raised.

Definition at line 1225 of file trfUtils.py.

def _abort(self):
    self.status = "aborting"
    msg.debug("{notifier}: status: {status}".format(
        notifier = self.className,
        status = self.status
    ))
    self._terminate()

◆ _terminate()

python.trfUtils.ParallelJobProcessor._terminate ( self)
protected

terminate parallel job processor

This method terminates the parallel job processor by terminating its subprocesses. It is typically used both when terminating the processor on successful completion of job processing and when aborting it.

Definition at line 1238 of file trfUtils.py.

def _terminate(self):
    self.status = "terminating"
    msg.debug("{notifier}: status: {status}".format(
        notifier = self.className,
        status = self.status
    ))
    msg.debug("{notifier}: terminating pool of {numberOfProcesses} {units}".format(
        notifier = self.className,
        numberOfProcesses = str(self.numberOfProcesses),
        units = units(
            quantity = self.numberOfProcesses,
            unitSingular = "process",
            unitPlural = "processes"
        )
    ))
    self.pool.terminate()
    self.pool.join()
    self.status = "finished"
    msg.debug("{notifier}: status: {status}".format(
        notifier = self.className,
        status = self.status
    ))
    msg.debug(self.statusReport())

◆ getResults()

python.trfUtils.ParallelJobProcessor.getResults ( self)

get results of JobGroup object submission

This method returns an ordered list of results for jobs submitted.

Returns
ordered list of results for jobs

Definition at line 1022 of file trfUtils.py.

def getResults(self):
    # While the number of jobs remaining is greater than zero, cycle over
    # all jobs in the JobGroup object submission, watching for a timeout
    # of the JobGroup object submission. If a result has not been
    # retrieved for a job (i.e. the Job object does not have a result data
    # attribute), then check if a result is available for the job (using the
    # method multiprocessing.pool.AsyncResult.ready()). If a result is
    # available for the job, then check if the job has run successfully
    # (using the method multiprocessing.pool.AsyncResult.successful()). If
    # the job has not been successful, raise an exception, otherwise, get
    # the result of the job and save it to the result data attribute of the
    # job.
    msg.debug("{notifier}: checking for job {units}".format(
        notifier = self.className,
        units = units(
            quantity = self.countOfRemainingJobs,
            unitSingular = "result",
            unitPlural = "results")
        )
    )
    while self.countOfRemainingJobs > 0:
        # Check for timeout of the job group. If the current timestamp is
        # greater than the job group timeout (derived from the sum of the
        # set of all job timeout specifications in the job group) + the job
        # group submission timestamp, then raise an exception, otherwise
        # cycle over all jobs.
        # Allow time for jobs to complete.
        time.sleep(5.0)
        if self.jobSubmission.timeoutStatus():
            msg.error("{notifier}: job group '{name}' timed out".format(
                notifier = self.className,
                name = self.jobSubmission.name
            ))
            self._abort()
            exceptionMessage = "timeout of a function in list {listOfNamesOfRemainingJobs}".format(
                listOfNamesOfRemainingJobs = self.listOfNamesOfRemainingJobs
            )
            msg.error("{notifier}: exception message: {exceptionMessage}".format(
                notifier = self.className,
                exceptionMessage = exceptionMessage
            ))
            raise trfExceptions.TransformTimeoutException(
                trfExit.nameToCode('TRF_EXEC_TIMEOUT'),
                exceptionMessage
            )
        else:
            for job in self.jobSubmission.jobs:
                if not hasattr(job, 'result'):
                    # If the result of the job is ready...
                    if job.resultGetter.ready():
                        msg.debug(
                            "{notifier}: result ready for job '{name}'".format(
                                notifier = self.className,
                                name = job.name
                            )
                        )
                        job.successStatus = job.resultGetter.successful()
                        msg.debug(
                            "{notifier}: job '{name}' success status: {successStatus}".format(
                                notifier = self.className,
                                name = job.name,
                                successStatus = job.successStatus
                            )
                        )
                        # If the job was successful, create the result data
                        # attribute of the job and save the result to it.
                        if job.successStatus:
                            job.result = job.resultGetter.get()
                            msg.debug(
                                "{notifier}: result of job '{name}': {result}".format(
                                    notifier = self.className,
                                    name = job.name,
                                    result = job.result
                                )
                            )
                            self.countOfRemainingJobs -= 1
                            self.listOfNamesOfRemainingJobs.remove(job.name)
                            msg.debug(
                                "{notifier}: {countOfRemainingJobs} {units} remaining".format(
                                    notifier = self.className,
                                    countOfRemainingJobs = self.countOfRemainingJobs,
                                    units = units(
                                        quantity = self.countOfRemainingJobs,
                                        unitSingular = "job",
                                        unitPlural = "jobs"
                                    )
                                )
                                + f"\n names of remaining jobs: {self.listOfNamesOfRemainingJobs}"
                            )
                        # If the job was not successful, raise an exception
                        # and abort processing.
                        elif not job.successStatus:
                            msg.error(
                                "{notifier}: job '{name}' failed".format(
                                    notifier = self.className,
                                    name = job.name
                                )
                            )
                            self._abort()
                            exceptionMessage = "failure of function '{name}' with arguments {arguments}".format(
                                name = job.workFunction.__name__,
                                arguments = job.workFunctionKeywordArguments
                            )
                            msg.error("{notifier}: exception message: {exceptionMessage}".format(
                                notifier = self.className,
                                exceptionMessage = exceptionMessage
                            ))
                            raise trfExceptions.TransformExecutionException(
                                trfExit.nameToCode('TRF_EXEC_FAIL'),
                                exceptionMessage
                            )
    # All results having been returned, create the 'results' list data
    # attribute of the job group and append all individual job results to
    # it.
    self.jobSubmission.timeStampComplete = time.time()
    self.jobSubmission.completeStatus = True
    msg.debug("{notifier}: all {countOfJobs} {units} complete (timestamp: {timeStampComplete})".format(
        notifier = self.className,
        countOfJobs = self.countOfJobs,
        units = units(
            quantity = self.countOfJobs,
            unitSingular = "job",
            unitPlural = "jobs"
        ),
        timeStampComplete = self.jobSubmission.timeStampComplete
    ))
    self.jobSubmission.processingTime = self.jobSubmission.timeStampComplete - self.jobSubmission.timeStampSubmission
    msg.debug("{notifier}: time taken to process all {countOfJobs} {units}: {processingTime}".format(
        notifier = self.className,
        countOfJobs = self.countOfJobs,
        units = units(
            quantity = self.countOfJobs,
            unitSingular = "job",
            unitPlural = "jobs"
        ),
        processingTime = self.jobSubmission.processingTime
    ))
    for job in self.jobSubmission.jobs:
        self.jobSubmission.results.append(job.result)
    self._terminate()
    return self.jobSubmission.results

◆ printout()

python.trfUtils.ParallelJobProcessor.printout ( self)

print in a human-readable way the items of the object self

This method prints the attributes of the object self in a human-readable way.

Definition at line 933 of file trfUtils.py.

def printout(self):
    printHR(vars(self))

◆ statusReport()

python.trfUtils.ParallelJobProcessor.statusReport ( self)

return a status report string

This method returns a status report string, detailing information on the JobGroup submission and on the job processing status.

Returns
status report string

Definition at line 1168 of file trfUtils.py.

def statusReport(self):
    statusReport = "{notifier}:\n status report:".format(
        notifier = self.className
    )
    # information on parallel job processor
    statusReport += "\n parallel job processor configuration:"
    statusReport += "\n status: {notifier}".format(
        notifier = str(self.status)
    )
    statusReport += "\n number of processes: {notifier}".format(
        notifier = str(self.numberOfProcesses)
    )
    # information on job group submission
    statusReport += "\n job group submission: '{notifier}'".format(
        notifier = self.jobSubmission.name
    )
    statusReport += "\n total number of jobs: {notifier}".format(
        notifier = str(self.countOfJobs)
    )
    statusReport += "\n number of incomplete jobs: {notifier}".format(
        notifier = str(self.countOfRemainingJobs)
    )
    statusReport += "\n names of incomplete jobs: {notifier}".format(
        notifier = self.listOfNamesOfRemainingJobs
    )
    # information on jobs (if existent)
    if self.jobSubmission.jobs:
        statusReport += "\n jobs:"
        for job in self.jobSubmission.jobs:
            statusReport += "\n job '{name}':".format(
                name = job.name
            )
            statusReport += "\n workFunction: '{name}'".format(
                name = job.workFunction.__name__
            )
            statusReport += "\n workFunctionKeywordArguments: '{arguments}'".format(
                arguments = job.workFunctionKeywordArguments
            )
            statusReport += "\n workFunctionTimeout: '{timeout}'".format(
                timeout = job.workFunctionTimeout
            )
            if hasattr(job, 'result'):
                statusReport += "\n result: '{result}'".format(
                    result = job.result
                )
    # statistics of parallel job processor run
    if hasattr(self.jobSubmission, 'processingTime'):
        statusReport += "\n statistics:"
        statusReport += "\n total processing time: {processingTime} s".format(
            processingTime = self.jobSubmission.processingTime
        )
    return statusReport

◆ submit()

python.trfUtils.ParallelJobProcessor.submit ( self,
jobSubmission = None )

submit a Job object or a JobGroup object for processing

This method submits a specified Job object or JobGroup object for processing. On successful submission, it returns the value 0.

Parameters
jobSubmission: Job object or JobGroup object for submission

Definition at line 941 of file trfUtils.py.

def submit(
    self,
    jobSubmission = None
):
    # If the input submission is not None, then update the jobSubmission
    # data attribute to that specified for this method.
    if jobSubmission is not None:
        self.jobSubmission = jobSubmission
    self.status = "submitted"
    msg.debug("{notifier}: status: {status}".format(
        notifier = self.className,
        status = self.status
    ))
    # If the input submission is a Job object, contain it in a JobGroup
    # object.
    if isinstance(self.jobSubmission, Job):
        jobGroup = JobGroup(
            jobs = [self.jobSubmission,],
        )
        self.jobSubmission = jobGroup
    # Count the number of jobs.
    self.countOfJobs = len(self.jobSubmission.jobs)
    self.countOfRemainingJobs = self.countOfJobs
    # Build a contemporary list of the names of jobs.
    self.listOfNamesOfRemainingJobs = []
    for job in self.jobSubmission.jobs:
        self.listOfNamesOfRemainingJobs.append(job.name)
    msg.debug("{notifier}: received job group submission '{name}' of {countOfJobs} {units}".format(
        notifier = self.className,
        name = self.jobSubmission.name,
        countOfJobs = self.countOfJobs,
        units = units(
            quantity = self.countOfRemainingJobs,
            unitSingular = "job",
            unitPlural = "jobs"
        )
    ))
    msg.debug(self.statusReport())
    msg.debug("{notifier}: submitting job group submission '{name}' to pool".format(
        notifier = self.className,
        name = self.jobSubmission.name
    ))
    # Cycle through all jobs in the input submission and apply each to the
    # pool.
    for job in self.jobSubmission.jobs:
        job.timeStampSubmission = time.time()
        msg.debug("{notifier}: job '{name}' submitted to pool".format(
            notifier = self.className,
            name = job.name
        ))
        # Apply the job to the pool, applying the object
        # multiprocessing.pool.ApplyResult (AsyncResult alias thereof)
        # to the job as a data attribute.
        job.resultGetter = self.pool.apply_async(
            func = job.workFunction,
            kwds = job.workFunctionKeywordArguments
        )
    # Prepare monitoring of job group times in order to detect a job group
    # timeout by recording the time of complete submission of the job group.
    self.jobSubmission.timeStampSubmission = time.time()
    msg.debug("{notifier}: job group submission complete: {countOfJobs} {units} submitted to pool (timestamp: {timeStampSubmission})".format(
        notifier = self.className,
        countOfJobs = self.countOfJobs,
        units = units(
            quantity = self.countOfJobs,
            unitSingular = "job",
            unitPlural = "jobs"
        ),
        timeStampSubmission = self.jobSubmission.timeStampSubmission
    ))
    self.status = "processing"
    msg.debug("{notifier}: status: {status}".format(
        notifier = self.className,
        status = self.status
    ))
    return 0

Member Data Documentation

◆ className

python.trfUtils.ParallelJobProcessor.className = self.__class__.__name__

Definition at line 887 of file trfUtils.py.

◆ countOfJobs

python.trfUtils.ParallelJobProcessor.countOfJobs = None

Definition at line 894 of file trfUtils.py.

◆ countOfRemainingJobs

int python.trfUtils.ParallelJobProcessor.countOfRemainingJobs = 0

Definition at line 895 of file trfUtils.py.

◆ jobSubmission

python.trfUtils.ParallelJobProcessor.jobSubmission = jobSubmission

Definition at line 884 of file trfUtils.py.

◆ listOfNamesOfRemainingJobs

list python.trfUtils.ParallelJobProcessor.listOfNamesOfRemainingJobs = []

Definition at line 965 of file trfUtils.py.

◆ numberOfProcesses

python.trfUtils.ParallelJobProcessor.numberOfProcesses = numberOfProcesses

Definition at line 885 of file trfUtils.py.

◆ pool

python.trfUtils.ParallelJobProcessor.pool
Initial value:
= multiprocessing.pool.Pool(
self.numberOfProcesses,
initialise_processes,
(),
None,
multiprocessing.get_context(self.startMethod)
)

Definition at line 897 of file trfUtils.py.

◆ startMethod

python.trfUtils.ParallelJobProcessor.startMethod = startMethod

Definition at line 886 of file trfUtils.py.

◆ status

str python.trfUtils.ParallelJobProcessor.status = "starting"

Definition at line 889 of file trfUtils.py.


The documentation for this class was generated from the following file:
trfUtils.py