def execute(self):
    """Run the executor's command as a subprocess and echo its output.

    Flags the executor as executed, records ``os.times()`` before and after
    the run in ``self._exeStart`` / ``self._exeStop``, forwards every line
    of the child's combined stdout/stderr to ``self._echologger`` and stores
    the child's return code in ``self._rc``.

    If the ``execOnly`` argument is ``True`` the current process is replaced
    with the command via ``os.execvp`` and this method does not return.

    When ``self._memMonitor`` is set, a ``prmon`` process is attached to the
    child's pid; it is sent ``SIGUSR1`` on the way out and given up to about
    one second to finish. If ``prmon`` cannot be spawned, a warning is
    logged and ``self._memMonitor`` is switched off.

    Raises:
        trfExceptions.TransformExecutionException: if launching or running
            the command raised ``OSError``.
    """
    self._hasExecuted = True
    msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))

    self._exeStart = os.times()
    msg.debug('exeStart time is {0}'.format(self._exeStart))
    if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
        msg.info('execOnly flag is set - execution will now switch, replacing the transform')
        os.execvp(self._cmd[0], self._cmd)

    encargs = {'encoding' : 'utf8'}
    # Bound before the try block so the finally clause can tell whether the
    # memory monitor was ever started (the payload launch may fail first).
    mem_proc = None
    try:
        # A nested container is launched from /srv; the working directory is
        # switched only for the duration of the Popen call.
        nestedContainer = self._alreadyInContainer and self._containerSetup is not None
        if nestedContainer:
            msg.info("chdir /srv to launch a nested container for the substep")
            os.chdir("/srv")
        try:
            p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE,
                                 stderr = subprocess.STDOUT, bufsize = 1, **encargs)
        finally:
            # Restore the working directory even if the launch itself failed
            if nestedContainer:
                msg.info("chdir {} after launching the nested container".format(self._workdir))
                os.chdir(self._workdir)

        if self._memMonitor:
            try:
                self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
                self._memFullFile = 'prmon.full.' + self._name
                memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', self._memFullFile,
                                     '--json-summary', self._memSummaryFile,
                                     '--log-filename', 'prmon.' + self._name + '.log',
                                     '--interval', '30']
                mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
            except Exception as e:
                # Monitoring is optional: carry on without it
                msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
                self._memMonitor = False

        # Echo output line by line for as long as the child is alive
        while p.poll() is None:
            try:
                line = p.stdout.readline()
                if line:
                    self._echologger.info(line.rstrip())
            except UnicodeDecodeError as e:
                msg.warning('Exception raised processing athena log: {0}'.format(e))

        # Pick up whatever output is still buffered after the child exited
        for line in p.stdout:
            self._echologger.info(line.rstrip())

        self._rc = p.returncode
        msg.info('%s executor returns %d', self._name, self._rc)
        self._exeStop = os.times()
        msg.debug('exeStop time is {0}'.format(self._exeStop))
    except OSError as e:
        errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
        msg.error(errMsg)
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg) from e
    finally:
        if mem_proc is not None:
            try:
                mem_proc.send_signal(signal.SIGUSR1)
                countWait = 0
                # poll() returns None only while the process is running; a
                # return code of 0 means it has already finished.
                while mem_proc.poll() is None and countWait < 10:
                    time.sleep(0.1)
                    countWait += 1
            except OSError as e:
                # Best effort only: the monitor may already be gone
                msg.debug('Could not signal memory monitor for {0}: {1}'.format(self._name, e))
789
790