def execute(self):
    """Run the executor's command as a subprocess and echo its output.

    Records start/stop process times in ``self._exeStart`` /
    ``self._exeStop``, forwards every line of the child's combined
    stdout/stderr to ``self._echologger`` and stores the child's exit
    status in ``self._rc``.

    If ``execOnly`` is set to ``True`` in ``self.conf.argdict`` the current
    process is replaced by the command via ``os.execvp`` and this method
    never returns.

    When ``self._memMonitor`` is true a ``prmon`` process is started against
    the child's PID. Failing to start it is not fatal: a warning is logged
    and ``self._memMonitor`` is switched off.

    Raises:
        trfExceptions.TransformExecutionException: if an ``OSError`` is
            raised while launching or reading from the command.
    """
    self._hasExecuted = True
    msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))

    self._exeStart = os.times()
    msg.debug('exeStart time is {0}'.format(self._exeStart))
    if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
        msg.info('execOnly flag is set - execution will now switch, replacing the transform')
        # execvp replaces the current process image; nothing below runs.
        os.execvp(self._cmd[0], self._cmd)

    encargs = {'encoding' : 'utf8'}
    nested = self._alreadyInContainer and self._containerSetup is not None
    # Bound up front so the ``finally`` clause can always inspect it, even
    # when launching the main command fails before the monitor is started.
    mem_proc = None
    try:
        # A nested container is launched from /srv; the original working
        # directory is restored right afterwards, whether or not the launch
        # succeeded.
        if nested:
            msg.info("chdir /srv to launch a nested container for the substep")
            os.chdir("/srv")
        try:
            p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE,
                                 stderr = subprocess.STDOUT, bufsize = 1, **encargs)
        finally:
            if nested:
                msg.info("chdir {} after launching the nested container".format(self._workdir))
                os.chdir(self._workdir)

        if self._memMonitor:
            try:
                # prmon is given relative file names, so its output is
                # created in the current working directory.
                self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
                self._memFullFile = 'prmon.full.' + self._name
                memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', self._memFullFile,
                                     '--json-summary', self._memSummaryFile,
                                     '--log-filename', 'prmon.' + self._name + '.log',
                                     '--interval', '30']
                mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
            except Exception as e:
                # Monitoring is best-effort: carry on without it.
                msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
                self._memMonitor = False

        # Echo output line by line while the child is alive ...
        while p.poll() is None:
            line = p.stdout.readline()
            if line:
                self._echologger.info(line.rstrip())
        # ... then drain whatever is still buffered in the pipe.
        for line in p.stdout:
            self._echologger.info(line.rstrip())

        self._rc = p.returncode
        msg.info('%s executor returns %d', self._name, self._rc)
        self._exeStop = os.times()
        msg.debug('exeStop time is {0}'.format(self._exeStop))
    except OSError as e:
        errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
        msg.error(errMsg)
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg) from e
    finally:
        if self._memMonitor and mem_proc is not None:
            try:
                # NOTE(review): SIGUSR1 presumably asks prmon to finish up and
                # write its summary - confirm against the prmon documentation.
                mem_proc.send_signal(signal.SIGUSR1)
                countWait = 0
                # Wait at most ~1 s (10 x 0.1 s) for the monitor to exit.
                # poll() returns None only while it is still running; an
                # exit status of 0 must end the wait as well.
                while mem_proc.poll() is None and countWait < 10:
                    time.sleep(0.1)
                    countWait += 1
            except OSError:
                # Best-effort shutdown: the monitor may already be gone.
                pass
786
787