ATLAS Offline Software
python.trfUtils.ParallelJobProcessor Class Reference

ParallelJobProcessor: a multiple-process processor of Job objects. More...

Inheritance diagram for python.trfUtils.ParallelJobProcessor:
Collaboration diagram for python.trfUtils.ParallelJobProcessor:

Public Member Functions

 __init__ (self, jobSubmission=None, numberOfProcesses=multiprocessing.cpu_count())
 initialisation method that accepts submissions and starts pool
 __str__ (self)
 return an object self-description string
 printout (self)
 print in a human-readable way the items of the object self
 submit (self, jobSubmission=None)
 submit a Job object or a JobGroup object for processing
 getResults (self)
 get results of JobGroup object submission
 statusReport (self)
 return a status report string

Public Attributes

 jobSubmission = jobSubmission
 numberOfProcesses = numberOfProcesses
 className = self.__class__.__name__
str status = "starting"
 countOfJobs = None
int countOfRemainingJobs = 0
 pool
list listOfNamesOfRemainingJobs = []

Protected Member Functions

 _abort (self)
 abort parallel job processor
 _terminate (self)
 terminate parallel job processor

Detailed Description

ParallelJobProcessor: a multiple-process processor of Job objects.

Parameters
jobSubmission        Job object or JobGroup object for submission
numberOfProcesses    the number of processes in the process pool

Definition at line 867 of file trfUtils.py.
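
A minimal usage sketch (assumptions: the module is importable as PyJobTransforms.trfUtils, and the Job and JobGroup constructors accept the keyword arguments shown, which are inferred from the attribute names used by this class; the work function multiply is a hypothetical example and must be picklable):

    from PyJobTransforms.trfUtils import Job, JobGroup, ParallelJobProcessor

    def multiply(a, b):
        # hypothetical work function; must be importable by the worker processes
        return a * b

    jobs = JobGroup(
        name = "example group",
        jobs = [
            Job(
                name = "multiplication",
                workFunction = multiply,
                workFunctionKeywordArguments = {'a': 6, 'b': 7},
                workFunctionTimeout = 10
            )
        ]
    )

    processor = ParallelJobProcessor()       # pool size defaults to cpu_count()
    processor.submit(jobSubmission = jobs)
    results = processor.getResults()         # ordered list of results, e.g. [42]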

Constructor & Destructor Documentation

◆ __init__()

python.trfUtils.ParallelJobProcessor.__init__ ( self,
jobSubmission = None,
numberOfProcesses = multiprocessing.cpu_count() )

initialisation method that accepts submissions and starts pool

This method is the initialisation method of the parallel job processor. It accepts input JobGroup object submissions and prepares a pool of workers.

Definition at line 873 of file trfUtils.py.

877 ):
878 self.jobSubmission = jobSubmission
879 self.numberOfProcesses = numberOfProcesses
880 self.className = self.__class__.__name__
881 self.status = "starting"
882 msg.debug("{notifier}: status: {status}".format(
883 notifier = self.className,
884 status = self.status)
885 )
886 self.countOfJobs = None
887 self.countOfRemainingJobs = 0
888 self.pool = multiprocessing.Pool(
889 self.numberOfProcesses,
890 initialise_processes
891 )
892 msg.debug("{notifier}: pool of {numberOfProcesses} {units} created".format(
893 notifier = self.className,
894 numberOfProcesses = str(self.numberOfProcesses),
895 units = units(quantity = self.numberOfProcesses,
896 unitSingular = "process", unitPlural = "processes")
897 ))
898 self.status = "ready"
899 msg.debug("{notifier}: status: {status}".format(
900 notifier = self.className,
901 status = self.status
902 ))
903
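
For example (a sketch; the initialise_processes pool initializer is set internally, as shown above):

    # construct a processor with an explicit pool size rather than cpu_count()
    processor = ParallelJobProcessor(numberOfProcesses = 4)
    # at this point the pool of 4 worker processes exists and the status is "ready";
    # a Job or JobGroup can be passed to the constructor or later via submit()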

Member Function Documentation

◆ __str__()

python.trfUtils.ParallelJobProcessor.__str__ ( self)

return an object self-description string

This method returns an object description string consisting of a listing of the items of the object self.

Returns
object description string

Definition at line 908 of file trfUtils.py.

908 def __str__(self):
909 descriptionString = ""
910 for key, value in sorted(vars(self).items()):
911 descriptionString += str("{key}:{value} ".format(
912 key = key,
913 value = value
914 ))
915 return descriptionString
916

◆ _abort()

python.trfUtils.ParallelJobProcessor._abort ( self)
protected

abort parallel job processor

This method aborts the parallel job processor. It is typically used when an exception is raised.

Definition at line 1214 of file trfUtils.py.

1214 def _abort(self):
1215 self.status = "aborting"
1216 msg.debug("{notifier}: status: {status}".format(
1217 notifier = self.className,
1218 status = self.status
1219 ))
1220 self._terminate()
1221

◆ _terminate()

python.trfUtils.ParallelJobProcessor._terminate ( self)
protected

terminate parallel job processor

This method terminates the parallel job processor by terminating its worker subprocesses. It is typically used on successful completion of job processing and when aborting the parallel job processor.

Definition at line 1227 of file trfUtils.py.

1227 def _terminate(self):
1228 self.status = "terminating"
1229 msg.debug("{notifier}: status: {status}".format(
1230 notifier = self.className,
1231 status = self.status
1232 ))
1233 msg.debug("{notifier}: terminating pool of {numberOfProcesses} {units}".format(
1234 notifier = self.className,
1235 numberOfProcesses = str(self.numberOfProcesses),
1236 units = units(
1237 quantity = self.numberOfProcesses,
1238 unitSingular = "process",
1239 unitPlural = "processes"
1240 )
1241 ))
1242 self.pool.terminate()
1243 self.pool.join()
1244 self.status = "finished"
1245 msg.debug("{notifier}: status: {status}".format(
1246 notifier = self.className,
1247 status = self.status
1248 ))
1249 msg.debug(self.statusReport())
1250
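
The shutdown above follows the standard multiprocessing pool pattern, terminate() followed by join(); a standalone sketch on a plain pool:

    import multiprocessing

    pool = multiprocessing.Pool(2)
    # ... work would be dispatched here with pool.apply_async(...) ...
    pool.terminate()   # stop the worker processes immediately
    pool.join()        # wait for the worker processes to exit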

◆ getResults()

python.trfUtils.ParallelJobProcessor.getResults ( self)

get results of JobGroup object submission

This method returns an ordered list of results for jobs submitted.

Returns
ordered list of results for jobs

Definition at line 1008 of file trfUtils.py.

1008 def getResults(self):
1009 # While the number of jobs remaining is greater than zero, cycle over
1010 # all jobs in the JobGroup object submission, watching for a
1011 # timeout of the JobGroup object submission. If a result has not been
1012 # retrieved for a job (i.e. the Job object does not have a result data
1013 # attribute), then check if a result is available for the job (using the
1014 # method multiprocessing.pool.AsyncResult.ready()). If a result is
1015 # available for the job, then check if the job has run successfully
1016 # (using the method multiprocessing.pool.AsyncResult.successful()). If
1017 # the job has not been successful, raise an exception, otherwise, get
1018 # the result of the job and save it to the result data attribute of the
1019 # job.
1020 msg.debug("{notifier}: checking for job {units}".format(
1021 notifier = self.className,
1022 units = units(
1023 quantity = self.countOfRemainingJobs,
1024 unitSingular = "result",
1025 unitPlural = "results")
1026 )
1027 )
1028 while self.countOfRemainingJobs > 0:
1029 # Check for timeout of the job group. If the current timestamp is
1030 # greater than the job group timeout (derived from the sum of the
1031 # set of all job timeout specifications in the job group) + the job
1032 # group submission timestamp, then raise an exception, otherwise
1033 # cycle over all jobs.
1034 # Allow time for jobs to complete.
1035 time.sleep(0.25)
1036 if self.jobSubmission.timeoutStatus():
1037 msg.error("{notifier}: job group '{name}' timed out".format(
1038 notifier = self.className,
1039 name = self.jobSubmission.name
1040 ))
1041 self._abort()
1042 exceptionMessage = "timeout of a function in list {listOfNamesOfRemainingJobs}".format(
1043 listOfNamesOfRemainingJobs = self.listOfNamesOfRemainingJobs
1044 )
1045 msg.error("{notifier}: exception message: {exceptionMessage}".format(
1046 notifier = self.className,
1047 exceptionMessage = exceptionMessage
1048 ))
1049 raise trfExceptions.TransformTimeoutException(
1050 trfExit.nameToCode('TRF_EXEC_TIMEOUT'),
1051 exceptionMessage
1052 )
1053 else:
1054 for job in self.jobSubmission.jobs:
1055 self.listOfNamesOfRemainingJobs = []
1056 if not hasattr(job, 'result'):
1057 # Maintain a contemporary list of the names of remaining
1058 # jobs.
1059 self.listOfNamesOfRemainingJobs.append(job.name)
1060 # If the result of the job is ready...
1061 if job.resultGetter.ready():
1062 msg.debug(
1063 "{notifier}: result ready for job '{name}'".format(
1064 notifier = self.className,
1065 name = job.name
1066 )
1067 )
1068 job.successStatus = job.resultGetter.successful()
1069 msg.debug(
1070 "{notifier}: job '{name}' success status: {successStatus}".format(
1071 notifier = self.className,
1072 name = job.name,
1073 successStatus = job.successStatus
1074 )
1075 )
1076 # If the job was successful, create the result data
1077 # attribute of the job and save the result to it.
1078 if job.successStatus:
1079 job.result = job.resultGetter.get()
1080 msg.debug(
1081 "{notifier}: result of job '{name}': {result}".format(
1082 notifier = self.className,
1083 name = job.name,
1084 result = job.result
1085 )
1086 )
1087 self.countOfRemainingJobs -= 1
1088 msg.debug(
1089 "{notifier}: {countOfRemainingJobs} {units} remaining".format(
1090 notifier = self.className,
1091 countOfRemainingJobs = self.countOfRemainingJobs,
1092 units = units(
1093 quantity = self.countOfRemainingJobs,
1094 unitSingular = "job",
1095 unitPlural = "jobs"
1096 )
1097 )
1098 )
1099 # If the job was not successful, raise an exception
1100 # and abort processing.
1101 elif not job.successStatus:
1102 msg.error(
1103 "{notifier}: job '{name}' failed".format(
1104 notifier = self.className,
1105 name = job.name
1106 )
1107 )
1108 self._abort()
1109 exceptionMessage = "failure of function '{name}' with arguments {arguments}".format(
1110 name = job.workFunction.__name__,
1111 arguments = job.workFunctionKeywordArguments
1112 )
1113 msg.error("{notifier}: exception message: {exceptionMessage}".format(
1114 notifier = self.className,
1115 exceptionMessage = exceptionMessage
1116 ))
1117 raise trfExceptions.TransformExecutionException(
1118 trfExit.nameToCode('TRF_EXEC_FAIL'),
1119 exceptionMessage
1120 )
1121 # All results having been returned, create the 'results' list data
1122 # attribute of the job group and append all individual job results to
1123 # it.
1124 self.jobSubmission.timeStampComplete = time.time()
1125 self.jobSubmission.completeStatus = True
1126 msg.debug("{notifier}: all {countOfJobs} {units} complete (timestamp: {timeStampComplete})".format(
1127 notifier = self.className,
1128 countOfJobs = self.countOfJobs,
1129 units = units(
1130 quantity = self.countOfJobs,
1131 unitSingular = "job",
1132 unitPlural = "jobs"
1133 ),
1134 timeStampComplete = self.jobSubmission.timeStampComplete
1135 ))
1136 self.jobSubmission.processingTime = self.jobSubmission.timeStampComplete - self.jobSubmission.timeStampSubmission
1137 msg.debug("{notifier}: time taken to process all {countOfJobs} {units}: {processingTime}".format(
1138 notifier = self.className,
1139 countOfJobs = self.countOfJobs,
1140 units = units(
1141 quantity = self.countOfJobs,
1142 unitSingular = "job",
1143 unitPlural = "jobs"
1144 ),
1145 processingTime = self.jobSubmission.processingTime
1146 ))
1147 for job in self.jobSubmission.jobs:
1148 self.jobSubmission.results.append(job.result)
1149 self._terminate()
1150 return self.jobSubmission.results
1151 self._terminate()
1152
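
The polling above uses the standard multiprocessing.pool.AsyncResult interface (ready(), successful(), get()); a standalone sketch of the same pattern with a hypothetical work function:

    import multiprocessing
    import time

    def square(x):
        # hypothetical work function
        return x * x

    pool = multiprocessing.Pool(2)
    resultGetter = pool.apply_async(func = square, kwds = {'x': 3})

    while not resultGetter.ready():     # result not yet available
        time.sleep(0.25)
    if resultGetter.successful():       # the work function raised no exception
        print(resultGetter.get())       # 9
    pool.terminate()
    pool.join()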

◆ printout()

python.trfUtils.ParallelJobProcessor.printout ( self)

print in a human-readable way the items of the object self

This function prints in a human-readable way the items of the object self.

Definition at line 920 of file trfUtils.py.

920 def printout(self):
921 printHR(vars(self)
922 )
923

◆ statusReport()

python.trfUtils.ParallelJobProcessor.statusReport ( self)

return a status report string

This method returns a status report string, detailing information on the JobGroup submission and on the job processing status.

Returns
status report string

Definition at line 1157 of file trfUtils.py.

1157 def statusReport(self):
1158 statusReport = "\n{notifier}:\n status report:".format(
1159 notifier = self.className
1160 )
1161 # information on parallel job processor
1162 statusReport += "\n parallel job processor configuration:"
1163 statusReport += "\n status: {notifier}".format(
1164 notifier = str(self.status)
1165 )
1166 statusReport += "\n number of processes: {notifier}".format(
1167 notifier = str(self.numberOfProcesses)
1168 )
1169 # information on job group submission
1170 statusReport += "\n job group submission: '{notifier}'".format(
1171 notifier = self.jobSubmission.name
1172 )
1173 statusReport += "\n total number of jobs: {notifier}".format(
1174 notifier = str(self.countOfJobs)
1175 )
1176 statusReport += "\n number of incomplete jobs: {notifier}".format(
1177 notifier = str(self.countOfRemainingJobs)
1178 )
1179 statusReport += "\n names of incomplete jobs: {notifier}".format(
1180 notifier = self.listOfNamesOfRemainingJobs
1181 )
1182 # information on jobs (if existent)
1183 if self.jobSubmission.jobs:
1184 statusReport += "\n jobs:"
1185 for job in self.jobSubmission.jobs:
1186 statusReport += "\n job '{name}':".format(
1187 name = job.name
1188 )
1189 statusReport += "\n workFunction: '{name}'".format(
1190 name = job.workFunction.__name__
1191 )
1192 statusReport += "\n workFunctionKeywordArguments: '{arguments}'".format(
1193 arguments = job.workFunctionKeywordArguments
1194 )
1195 statusReport += "\n workFunctionTimeout: '{timeout}'".format(
1196 timeout = job.workFunctionTimeout
1197 )
1198 if hasattr(job, 'result'):
1199 statusReport += "\n result: '{result}'".format(
1200 result = job.result
1201 )
1202 # statistics of parallel job processor run
1203 if hasattr(self.jobSubmission, 'processingTime'):
1204 statusReport += "\n statistics:"
1205 if hasattr(self.jobSubmission, 'processingTime'):
1206 statusReport += "\n total processing time: {processingTime} s".format(
1207 processingTime = self.jobSubmission.processingTime
1208 )
1209 return statusReport
1210

◆ submit()

python.trfUtils.ParallelJobProcessor.submit ( self,
jobSubmission = None )

submit a Job object or a JobGroup object for processing

This method submits a specified Job object or JobGroup object for processing. On successful submission, it returns the value 0.

Parameters
jobSubmission    Job object or JobGroup object for submission

Definition at line 928 of file trfUtils.py.

931 ):
932 # If the input submission is not None, then update the jobSubmission
933 # data attribute to that specified for this method.
934 if jobSubmission is not None:
935 self.jobSubmission = jobSubmission
936 self.status = "submitted"
937 msg.debug("{notifier}: status: {status}".format(
938 notifier = self.className,
939 status = self.status
940 ))
941 # If the input submission is a Job object, contain it in a JobGroup
942 # object.
943 if isinstance(self.jobSubmission, Job):
944 jobGroup = JobGroup(
945 jobs = [self.jobSubmission,],
946 )
947 self.jobSubmission = jobGroup
948 # Count the number of jobs.
949 self.countOfJobs = len(self.jobSubmission.jobs)
950 self.countOfRemainingJobs = self.countOfJobs
951 # Build a contemporary list of the names of jobs.
952 self.listOfNamesOfRemainingJobs = []
953 for job in self.jobSubmission.jobs:
954 self.listOfNamesOfRemainingJobs.append(job.name)
955 msg.debug("{notifier}: received job group submission '{name}' of {countOfJobs} {units}".format(
956 notifier = self.className,
957 name = self.jobSubmission.name,
958 countOfJobs = self.countOfJobs,
959 units = units(
960 quantity = self.countOfRemainingJobs,
961 unitSingular = "job",
962 unitPlural = "jobs"
963 )
964 ))
965 msg.debug(self.statusReport())
966 msg.debug("{notifier}: submitting job group submission '{name}' to pool".format(
967 notifier = self.className,
968 name = self.jobSubmission.name
969 ))
970 # Cycle through all jobs in the input submission and apply each to the
971 # pool.
972 for job in self.jobSubmission.jobs:
973 job.timeStampSubmission = time.time()
974 msg.debug("{notifier}: job '{name}' submitted to pool".format(
975 notifier = self.className,
976 name = job.name
977 ))
978 # Apply the job to the pool, applying the object pool.ApplyResult
979 # to the job as a data attribute.
980 job.resultGetter = self.pool.apply_async(
981 func = job.workFunction,
982 kwds = job.workFunctionKeywordArguments
983 )
984 # Prepare monitoring of job group times in order to detect a job group
985 # timeout by recording the time of complete submission of the job group.
986 self.jobSubmission.timeStampSubmission = time.time()
987 msg.debug("{notifier}: job group submission complete: {countOfJobs} {units} submitted to pool (timestamp: {timeStampSubmission})".format(
988 notifier = self.className,
989 countOfJobs = self.countOfJobs,
990 units = units(
991 quantity = self.countOfJobs,
992 unitSingular = "job",
993 unitPlural = "jobs"
994 ),
995 timeStampSubmission = self.jobSubmission.timeStampSubmission
996 ))
997 self.status = "processing"
998 msg.debug("{notifier}: status: {status}".format(
999 notifier = self.className,
1000 status = self.status
1001 ))
1002 return 0
1003
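
A bare Job may also be submitted directly; as the code above shows, it is wrapped in a JobGroup internally. A sketch (the Job constructor keywords are assumptions inferred from the attribute names used by this class, and some_work_function is hypothetical):

    job = Job(
        name = "single job",
        workFunction = some_work_function,              # hypothetical, must be picklable
        workFunctionKeywordArguments = {'message': "hello"},
        workFunctionTimeout = 5
    )
    processor = ParallelJobProcessor(numberOfProcesses = 2)
    processor.submit(jobSubmission = job)               # returns 0 on successful submission
    print(processor.statusReport())
    results = processor.getResults()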

Member Data Documentation

◆ className

python.trfUtils.ParallelJobProcessor.className = self.__class__.__name__

Definition at line 880 of file trfUtils.py.

◆ countOfJobs

python.trfUtils.ParallelJobProcessor.countOfJobs = None

Definition at line 886 of file trfUtils.py.

◆ countOfRemainingJobs

int python.trfUtils.ParallelJobProcessor.countOfRemainingJobs = 0

Definition at line 887 of file trfUtils.py.

◆ jobSubmission

python.trfUtils.ParallelJobProcessor.jobSubmission = jobSubmission

Definition at line 878 of file trfUtils.py.

◆ listOfNamesOfRemainingJobs

list python.trfUtils.ParallelJobProcessor.listOfNamesOfRemainingJobs = []

Definition at line 952 of file trfUtils.py.

◆ numberOfProcesses

python.trfUtils.ParallelJobProcessor.numberOfProcesses = numberOfProcesses

Definition at line 879 of file trfUtils.py.

◆ pool

python.trfUtils.ParallelJobProcessor.pool
Initial value:
= multiprocessing.Pool(
self.numberOfProcesses,
initialise_processes
)

Definition at line 888 of file trfUtils.py.

◆ status

str python.trfUtils.ParallelJobProcessor.status = "starting"

Definition at line 881 of file trfUtils.py.


The documentation for this class was generated from the following file:
trfUtils.py