645 def valueSetter(self, value):
646
647 if isinstance(value, (list, tuple)):
648 if len(value) > 0 and isinstance(value[0], dict):
649 self._value=[]
650 for myfile in value:
651 try:
652 self._value.append(myfile['lfn'])
653 self._resetMetadata(files = [myfile['lfn']])
654 except KeyError:
655 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
656 'Filename (key "lfn") not found in Tier-0 file dictionary: {0}'.format(myfile))
657 for k, v in myfile.items():
658 if k == 'guid':
659 self._setMetadata([myfile['lfn']], {'file_guid': v})
660 elif k == 'events':
661 self._setMetadata([myfile['lfn']], {'nentries': v})
662 elif k == 'checksum':
663 self._setMetadata([myfile['lfn']], {'checksum': v})
664 elif k == 'dsn':
665 if not self._dataset:
666 self.dataset = v
667 elif self.dataset != v:
668 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_DATASET'),
669 'Inconsistent dataset names in Tier-0 dictionary: {0} != {1}'.format(self.dataset, v))
670 else:
671 self._value = list(value)
672 self._getDatasetFromFilename(reset = False)
673 self._resetMetadata()
674 elif value is None:
675 self._value = []
676 return
677 else:
678 try:
679 if value.lower().startswith('lfn'):
680
681 from PyUtils.PoolFile import file_name
682 protocol, pfn = file_name(value)
683 self._value = [pfn]
684 self._getDatasetFromFilename(reset = False)
685 self._resetMetadata()
686 else:
687
688 if self._io == 'output' and ('[' in value) and (']' in value):
689 self._value = [value]
690 else:
691 self._value = value.split(self._splitter)
692 self._getDatasetFromFilename(reset = False)
693 self._resetMetadata()
694 except (AttributeError, TypeError):
695 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
696 'Failed to convert %s to a list' % str(value))
697
698
699 deDuplicatedValue = []
700 for fname in self._value:
701 if fname not in deDuplicatedValue:
702 deDuplicatedValue.append(fname)
703 else:
704 msg.warning("Removing duplicated file {0} from file list".format(fname))
705 if len(self._value) != len(deDuplicatedValue):
706 self._value = deDuplicatedValue
707 msg.warning('File list after duplicate removal: {0}'.format(self._value))
708
709
710
711
712 if len(self._value) > 0:
713 self._urlType = urlType(self._value[0])
714 else:
715 self._urlType = None
716
717
718 if self._io == 'input':
719
723 if self._urlType == 'posix':
724 msg.debug('Found POSIX filesystem input - activating globbing')
725 newValue = []
726 for filename in self._value:
727
728 globbedFiles = glob.glob(filename)
729 if len(globbedFiles) == 0:
730 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
731 'Input file argument {0} globbed to NO input files - probably the file(s) are missing'.format(filename))
732
733 globbedFiles.sort()
734 newValue.extend(globbedFiles)
735
736 self._value = newValue
737 msg.debug ('File input is globbed to %s' % self._value)
738
739 elif self._urlType == 'root':
740 msg.debug('Found root filesystem input - activating globbing')
741 newValue = []
742 for filename in self._value:
743 if str(filename).startswith("root"):
744 msg.debug('Found input file name starting with "root," setting XRD_RUNFORKHANDLER=1, which enables fork handlers for xrootd in direct I/O')
745 os.environ["XRD_RUNFORKHANDLER"] = "1"
746 if str(filename).startswith("https") or str(filename).startswith("davs") or not(str(filename).endswith('/')) and '*' not in filename and '?' not in filename:
747 msg.debug('Seems that only one file was given: {0}'.format(filename))
748 newValue.extend(([filename]))
749 else:
750
751 path = filename
752 fileMask = ''
753 if '*' in filename or '?' in filename:
754 msg.debug('Split input into path for listdir() and a filemask to select available files.')
755 path = filename[0:filename.rfind('/')+1]
756 msg.debug('path: {0}'.format(path))
757 fileMask = filename[filename.rfind('/')+1:len(filename)]
758 msg.debug('Will select according to: {0}'.format(fileMask))
759
760 cmd = ['/afs/cern.ch/project/eos/installation/atlas/bin/eos.select' ]
761 if not os.access ('/afs/cern.ch/project/eos/installation/atlas/bin/eos.select', os.X_OK ):
762 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
763 'No execute access to "eos.select" - could not glob EOS input files.')
764
765 cmd.extend(['ls'])
766 cmd.extend([path])
767
768 myFiles = []
769 try:
770 proc = subprocess.Popen(args = cmd,bufsize = 1, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
771 rc = proc.wait()
772 output = proc.stdout.readlines()
773 if rc!=0:
774 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
775 'EOS list command ("{0!s}") failed: rc {1}, output {2}'.format(cmd, rc, output))
776 msg.debug("eos returned: {0}".format(output))
777 for line in output:
778 if "root" in line:
779 myFiles += [str(path)+str(line.rstrip('\n'))]
780
781 patt = re.compile(fileMask.replace(
'*',
'.*').
replace(
'?',
'.'))
782 for srmFile in myFiles:
783 if fileMask != '':
784 if(patt.search(srmFile)) is not None:
785
786 msg.debug('match: %s',srmFile)
787 newValue.extend(([srmFile]))
788 else:
789 newValue.extend(([srmFile]))
790
791 msg.debug('Selected files: %s', newValue)
792 except (AttributeError, TypeError, OSError):
793 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_RUNTIME_ERROR'),
794 'Failed to convert %s to a list' % str(value))
795 if len(self._value) > 0 and len(newValue) == 0:
796
797 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
798 'Input file argument(s) {0!s} globbed to NO input files - ls command failed')
799 self._value = newValue
800 msg.debug ('File input is globbed to %s' % self._value)
801
802 elif self._multipleOK is False and len(self._value) > 1:
803 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
804 'Multiple file arguments are not supported for {0} (was given: {1}'.format(self, self._value))
805
std::string replace(std::string s, const std::string &s2, const std::string &s3)