From 1f529c8c4a92c013e3be06b6500530106be68844 Mon Sep 17 00:00:00 2001 From: kohleman <kohleman> Date: Thu, 17 Sep 2015 14:45:16 +0000 Subject: [PATCH] - added first tests - re-written the barcode length detection SVN: 34664 --- .../Jython/createSampleSheet_bcl2fastq.py | 242 ++++++++++-------- .../createSampleSheet_bcl2fastq_Test.py | 94 +++++++ 2 files changed, 228 insertions(+), 108 deletions(-) create mode 100644 deep_sequencing_unit/source/Jython/createSampleSheet_bcl2fastq_Test.py diff --git a/deep_sequencing_unit/source/Jython/createSampleSheet_bcl2fastq.py b/deep_sequencing_unit/source/Jython/createSampleSheet_bcl2fastq.py index 054d388c1f0..630c5641afc 100644 --- a/deep_sequencing_unit/source/Jython/createSampleSheet_bcl2fastq.py +++ b/deep_sequencing_unit/source/Jython/createSampleSheet_bcl2fastq.py @@ -70,6 +70,7 @@ lineending = {'win32':'\r\n', 'linux':'\n', 'mac':'\r'} COMMA = ',' CSV = ".csv" + class Sequencers: HISEQ_4000, HISEQ_3000, HISEQ_2500, HISEQ_2000, HISEQ_X, NEXTSEQ_500, MISEQ , UNIDENTIFIED= \ ('Illumina HiSeq 4000','Illumina HiSeq 3000','Illumina HiSeq 2500','Illumina HiSeq 2000', @@ -90,6 +91,7 @@ def logout (service, logger): service.logout() logger.info('Logged out') + def setUpLogger(logPath, logLevel=logging.INFO): logFileName = 'create_sample_sheet_dict' d = datetime.now() @@ -99,6 +101,7 @@ def setUpLogger(logPath, logLevel=logging.INFO): logger = logging.getLogger(logFileName) return logger + def parseOptions(logger): logger.info('Parsing command line parameters') parser = OptionParser(version='%prog 1.0') @@ -141,7 +144,6 @@ def parseOptions(logger): action='store_true', help='Write Sample Sheet to stout. Default: False') - (options, args) = parser.parse_args() if options.outdir[-1] <> '/': @@ -229,7 +231,7 @@ def getDate(): return d.strftime('%A, %d of %B %Y') -def sanitizeString(myString): +def sanitize_string(myString): return re.sub('[^A-Za-z0-9]+', '_', myString) @@ -303,6 +305,15 @@ def get_flowcell (illuminaFlowCellTypeName, flowCellName, service, logger): return foundSample[0], foundContainedSamples + +def get_reverse_complement(sequence): + lookup_table = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G'} + reverse_complement = '' + for nucleotide in reversed(sequence): + reverse_complement += lookup_table[nucleotide] + return reverse_complement + + def get_model(run_id): """ Guesses the sequencer model from the run folder name @@ -392,7 +403,7 @@ def get_contained_sample_properties(contained_samples, service): propertyDict['LANE'] = lane.getCode() - myKey = sanitizeString(parentCode + '_' + lane.getCode()) + myKey = sanitize_string(parentCode + '_' + lane.getCode()) parentDict[myKey] = propertyDict return parentDict, samplesPerLaneDict @@ -412,18 +423,6 @@ def transform_sample_to_dict(foundFlowCell): return flowCellDict -def pickleDemultiplexCommandList(logger, demultiplexCommandList, fileName): - import pickle - - try: - with open(fileName, 'w') as pickleDemux: - pickle.dump(demultiplexCommandList, pickleDemux) - logger.info('Writing file ' + fileName) - except IOError, err: - logger.error('File error: ' + str(err)) - print ('File error: ' + str(err)) - - def write_sample_sheet(sampleSheetDict, headerList, myoptions, logger, fileName): """ Writes the given dictionary to a csv file. The order does not matter. 
As the @@ -440,13 +439,10 @@ def write_sample_sheet(sampleSheetDict, headerList, myoptions, logger, fileName) if myoptions.verbose: print sampleSheetDict[sample][0] sampleSheetFile.write(sampleSheetDict[sample][0] + newline) - logger.info('Writing file ' + fileName) - except IOError: logger.error('File error: ' + str(err)) print ('File error: ' + str(err)) - return fileName @@ -454,35 +450,15 @@ def write_sample_sheet_single_lane(ordered_sample_sheet_dict, flowCellDict, parentDict, configMap, myoptions, logger, csv_file): newline = lineending[myoptions.lineending] - header_list = create_header_section (configMap, parentDict, flowCellDict) for lane in range(1, int(flowCellDict[configMap['laneCount']]) + 1): + per_lane_dict = [ordered_sample_sheet_dict[key] for key in ordered_sample_sheet_dict.keys() if int(key[0]) == lane] csv_file_path = myoptions.outdir + csv_file + "_" + str(lane) + CSV try: - with open(csv_file_path, 'wb') as sample_sheet_file: - - per_lane_dict = [ordered_sample_sheet_dict[key] for key in ordered_sample_sheet_dict.keys() if int(key[0]) == lane] - print(str(lane)) - - index1_set = set () - index2_set = set () - - for line in per_lane_dict: - split = line[0].split(",") - - if (len(split[6])): - index1_set.add(len(split[6])) - - if len(split[8]) > 0: - index2_set.add(len(split[8])) - - print(index1_set) - print(index2_set) - + with open(csv_file_path, 'wb') as sample_sheet_file: for header_element in header_list: sample_sheet_file.write(header_element + newline) -# per_lane_dict = [ordered_sample_sheet_dict[key] for key in ordered_sample_sheet_dict.keys() if int(key[0]) == lane] for sample in per_lane_dict: sample_sheet_file.write(str(sample[0]) + newline) except IOError: @@ -490,14 +466,6 @@ def write_sample_sheet_single_lane(ordered_sample_sheet_dict, flowCellDict, print ('File error: ' + str(err)) -def get_reverse_complement(sequence): - lookup_table = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G'} - reverse_complement = '' - for nucleotide in reversed(sequence): - reverse_complement += lookup_table[nucleotide] - return reverse_complement - - def create_header_section (configMap, parentDict, flowCellDict): kitsDict = {"CHIP_SEQ_SAMPLE_PREP" : ["",""], @@ -578,71 +546,128 @@ def create_header_section (configMap, parentDict, flowCellDict): return header_list +def verify_index_length (parentDict, flowCellDict, configMap, logger): + + index_length_dict = {} + verified_per_lane_dict = [] + + flowcell_len_index1 = int(flowCellDict['INDEXREAD']) + flowcell_len_index2 = int(flowCellDict['INDEXREAD2']) + + print("Flowcell has index length [" + str(flowcell_len_index1) + ", " + str(flowcell_len_index2) + "]") + + for lane in range(1,int(flowCellDict['LANECOUNT'])+1): + index1_set = set () + index2_set = set () + index1_length = 0 + index2_length = 0 + + logger.info("Lane: " + str(lane)) + per_lane_list = [parentDict[key] for key in parentDict.keys() if int(key[-1]) == lane] + + for sample in per_lane_list: + # If no index then just skip this sample + if (configMap['index1Name'] not in sample) or (sample[configMap['index1Name']] == 'NOINDEX'): + continue + index1 = sample[configMap['index1Name']] + index2="" + if configMap['index2Name'] in sample: + index2 = sample[configMap['index2Name']] + + index1_set.add(len(index1)) + if index2: + index2_set.add(len(index2)) + else: + index2_set.add(0) + + # adding the index length of the flow cell to make sure that dual-indexed + # samples also work on a single-indexed run + index1_set.add(flowcell_len_index1) + 
index2_set.add(flowcell_len_index2) + + if index1_set: + index1_length = min(index1_set) + if index2_set: + index2_length = min(index2_set) + + index_length_dict[lane] = [index1_length, index2_length] + logger.info("Index1 Length Set: " + str(index1_set)) + logger.info("Index2 Length Set: " + str(index2_set)) + logger.info("Final length of index1 " + str(index1_length)) + logger.info("Final length of index2 " + str(index2_length)) + #print("Lane " + str(lane) + " [" + str(index1_length) + "," + str(index2_length) + "]") + + return index_length_dict -def create_sample_sheet_dict(model, parentDict, flowCellDict, configMap, index1Vocabulary, index2Vocabulary, flowCellName): - sampleSheetDict = {} - separator = configMap['separator'] +def create_sample_sheet_dict(model, parentDict, flowCellDict, configMap, index1Vocabulary, + index2Vocabulary, flowCellName, logger): - for key in parentDict.keys(): - lane = parentDict[key]['LANE'][-1:] - # If no index then just skip this sample - if (configMap['index1Name'] not in parentDict[key]) or (parentDict[key][configMap['index1Name']] == 'NOINDEX'): - continue - index1 = parentDict[key][configMap['index1Name']] - index2="" - if configMap['index2Name'] in parentDict[key]: - index2 = parentDict[key][configMap['index2Name']] - indexNumber = index2Vocabulary[parentDict[key][configMap['index2Name']]].split()[2] + sampleSheetDict = {} + separator = configMap['separator'] - try: - kit = parentDict[key][configMap['kit']] - prefix = kitsDict[kit][0] - except: -# print "Missing Kit on " + str(key) - prefix = "" - - len_index1 = int(flowCellDict['INDEXREAD']) - len_index2 = int(flowCellDict['INDEXREAD2']) - - lane_string ="" - if model in HISEQ_LIST or model in Sequencers.MISEQ: - lane_string = lane + separator - - if int(flowCellDict['INDEXREAD2']) > 0 and index2: - if model in Sequencers.NEXTSEQ_500: - index2_processed = get_reverse_complement(index2[0:len_index2]) - else: - index2_processed = index2 - - sampleSheetDict[lane + '_' + key] = [ - lane_string - + key + separator - + key + '_' + sanitizeString(parentDict[key][configMap['externalSampleName']]) + '_' + index1[0:len_index1] + '_' + index2[0:len_index2] + separator - + separator - + separator - + index1Vocabulary[index1].split()[1] + separator - + index1[0:len_index1] + separator - + prefix + indexNumber + separator - + index2_processed + separator - + key + separator - ] - else: + index_length_dict = verify_index_length(parentDict, flowCellDict, configMap, logger) + print(index_length_dict) + + for key in parentDict.keys(): + lane = parentDict[key]['LANE'][-1:] + # If no index then just skip this sample + if (configMap['index1Name'] not in parentDict[key]) or (parentDict[key][configMap['index1Name']] == 'NOINDEX'): + continue + index1 = parentDict[key][configMap['index1Name']] + index2="" + if configMap['index2Name'] in parentDict[key]: + index2 = parentDict[key][configMap['index2Name']] + indexNumber = index2Vocabulary[parentDict[key][configMap['index2Name']]].split()[2] + + try: + kit = parentDict[key][configMap['kit']] + prefix = kitsDict[kit][0] + except: + # print "Missing Kit on " + str(key) + prefix = "" + + len_index1 = index_length_dict[int(lane)][0] + len_index2 = index_length_dict[int(lane)][1] + + lane_string ="" + if model in HISEQ_LIST or model in Sequencers.MISEQ: + lane_string = lane + separator + + if int(flowCellDict['INDEXREAD2']) > 0 and len_index2 > 0: + if model in Sequencers.NEXTSEQ_500: + index2_processed = get_reverse_complement(index2[0:len_index2]) + else: + index2_processed = 
index2 + + sampleSheetDict[lane + '_' + key] = [ + lane_string + + key + separator + + key + '_' + sanitize_string(parentDict[key][configMap['externalSampleName']]) + '_' + index1[0:len_index1] + '_' + index2[0:len_index2] + separator + + separator + + separator + + index1Vocabulary[index1].split()[1] + separator + + index1[0:len_index1] + separator + + prefix + indexNumber + separator + + index2_processed + separator + + key + separator + ] + else: sampleSheetDict[lane + '_' + key] = [ - lane_string - + key + separator - + key + '_' + sanitizeString(parentDict[key][configMap['externalSampleName']]) + '_' + index1[0:len_index1] + separator - + separator - + separator - + index1Vocabulary[index1].split()[1] + separator - + index1[0:len_index1] + separator - + key + separator - ] + lane_string + + key + separator + + key + '_' + sanitize_string(parentDict[key][configMap['externalSampleName']]) + '_' + index1[0:len_index1] + separator + + separator + + separator + + index1Vocabulary[index1].split()[1] + separator + + index1[0:len_index1] + separator + + key + separator + ] - csv_file_name = configMap['SampleSheetFileName'] + '_' + flowCellName - ordered_sample_sheet_dict = OrderedDict(sorted(sampleSheetDict.items(), key=lambda t: t[0])) + csv_file_name = configMap['SampleSheetFileName'] + '_' + flowCellName + ordered_sample_sheet_dict = OrderedDict(sorted(sampleSheetDict.items(), key=lambda t: t[0])) - return ordered_sample_sheet_dict, csv_file_name + return ordered_sample_sheet_dict, csv_file_name ''' Main script @@ -664,18 +689,19 @@ def main (): foundFlowCell, containedSamples = get_flowcell(configMap['illuminaFlowCellTypeName'], flowCellName, service, logger) - parentDict, samplesPerLaneDict = get_contained_sample_properties (containedSamples, service) + parentDict, samplesPerLaneDict = get_contained_sample_properties(containedSamples, service) logger.info('Found ' + str(len(parentDict)) + ' samples on the flow cell ' + flowCellName) flowCellName = foundFlowCell.getCode() flowCellDict = transform_sample_to_dict(foundFlowCell) model = get_model(flowCellDict['RUN_NAME_FOLDER']) print("Auto-detected: " + model) + logger.info("Auto-detected: " + model) index1Vocabulary = get_vocabulary(configMap['index1Name'], service) index2Vocabulary = get_vocabulary(configMap['index2Name'], service) ordered_sample_sheet_dict, csv_file_name = create_sample_sheet_dict(model, parentDict, - flowCellDict, configMap, index1Vocabulary, index2Vocabulary, flowCellName) + flowCellDict, configMap, index1Vocabulary, index2Vocabulary, flowCellName, logger) if myoptions.singlelane: write_sample_sheet_single_lane(ordered_sample_sheet_dict, flowCellDict, diff --git a/deep_sequencing_unit/source/Jython/createSampleSheet_bcl2fastq_Test.py b/deep_sequencing_unit/source/Jython/createSampleSheet_bcl2fastq_Test.py new file mode 100644 index 00000000000..a5b84e58005 --- /dev/null +++ b/deep_sequencing_unit/source/Jython/createSampleSheet_bcl2fastq_Test.py @@ -0,0 +1,94 @@ +import unittest +import re +from createSampleSheet_bcl2fastq import * + +class test_sanitize_string(unittest.TestCase): + + + def testDefault(self): + self.assertEqual(sanitize_string('abc#a$v%c^D&P-'), 'abc_a_v_c_D_P_') + + + def testOnlySpecialChars(self): + self.assertEqual(sanitize_string('@#$%^&*('), '_') + + +class test_get_model(unittest.TestCase): + + def test_HiseqX(self): + self.assertEqual(get_model('141121_ST-E00107_0356_AH00C3CCXX'), Sequencers.HISEQ_X) + + def test_expectError(self): + 
self.assertNotEqual(get_model('150724_J00121_0017_AH2VYMBBXX'), Sequencers.NEXTSEQ_500) + + +class test_get_reverse_complement(unittest.TestCase): + def test_happyCase(self): + self.assertEqual(get_reverse_complement('ACTGAATTTT'), 'AAAATTCAGT', 'Reverse complement is faulty') + + def test_failingCase(self): + self.assertNotEqual(get_reverse_complement('ACTG'), 'CAGA') + + +class test_get_flowCell(unittest.TestCase): + + def setUp(self): + self.myCode = 'C7GMNANXX' + self.logger = setUpLogger('log/') + configDict = readConfig(self.logger) + self.service = OpenbisServiceFacadeFactory.tryCreate(configDict['openbisUserName'], + configDict['openbisPassword'], + configDict['openbisServer'], + configDict['connectionTimeout']) + + self.flowcell, self.containedSamples = get_flowcell('ILLUMINA_FLOW_CELL', + self.myCode, self.service, self.logger) + + + def test_get_flowCell (self): + + self.assertEqual(self.flowcell.getCode(), self.myCode) + self.assertEqual(self.containedSamples.size(), 8) + + fcProp = self.flowcell.getProperties() + self.assertEqual(fcProp['SEQUENCER'], 'D00535') + self.flowCellDict = transform_sample_to_dict(self.flowcell) + self.assertEqual(self.flowCellDict['FLOWCELLTYPE'], 'HiSeq Flow Cell v4') + + + def test_get_contained_sample_properties(self): + self.parentDict, self.samplesPerLaneDict = get_contained_sample_properties( + self.containedSamples, self.service) + self.assertEqual(self.parentDict['BSSE_QGF_34778_C7GMNANXX_1']['BARCODE'], 'GTCCGC') + self.assertEqual(self.parentDict['BSSE_QGF_32285_C7GMNANXX_7']['CONTACT_PERSON_EMAIL'], 'yann.bourgeois@unibas.ch') + self.assertEqual(self.samplesPerLaneDict['2'], 23) + + + def tearDown(self): + self.service.logout() + self.logger.info('Logged out') + + +# class test_get_contained_sample_properties(unittest.TestCase): +# def setUp(self): +# self.flowcell = test_get_flowCell.setUp(self) +# + + + + + +# class test_verify_index_length(test_get_flowCell): +# +# def setUp(self): +# foundFlowCell, containedSamples = test_get_flowCell() +# +# +# def test_verify_index_length(self): +# foundFlowCell + +def main(): + unittest.main() + +if __name__ == '__main__': + main() \ No newline at end of file -- GitLab
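
Note (reviewer illustration, not part of the patch): the rewritten barcode length detection in verify_index_length() collects, per lane, the lengths of every sample barcode plus the flow cell's configured index-read lengths (INDEXREAD / INDEXREAD2) and takes the minimum of each set; create_sample_sheet_dict() then trims each barcode to those per-lane lengths. The sketch below reproduces only that set/min logic with hypothetical sample data and a hypothetical helper name (detect_index_lengths, the demo dict layout), so the behaviour can be exercised without an openBIS connection.

# Hedged sketch of the per-lane index-length detection introduced by this patch.
# The data layout and function name are assumptions for illustration; only the
# set/min logic mirrors verify_index_length() in createSampleSheet_bcl2fastq.py.

def detect_index_lengths(samples_per_lane, flowcell_len_index1, flowcell_len_index2):
    """Return {lane: [len_index1, len_index2]}, the shortest usable index lengths."""
    lengths = {}
    for lane, samples in samples_per_lane.items():
        index1_set = set()
        index2_set = set()
        for sample in samples:
            index1 = sample.get('BARCODE')
            if not index1 or index1 == 'NOINDEX':
                continue  # un-indexed samples do not constrain the lane
            index1_set.add(len(index1))
            index2 = sample.get('INDEX2', '')
            index2_set.add(len(index2) if index2 else 0)
        # Adding the flow cell's own index-read lengths makes sure that
        # dual-indexed samples still work on a single-indexed run.
        index1_set.add(flowcell_len_index1)
        index2_set.add(flowcell_len_index2)
        lengths[lane] = [min(index1_set), min(index2_set)]
    return lengths

if __name__ == '__main__':
    demo = {1: [{'BARCODE': 'GTCCGCAA', 'INDEX2': 'ACGTACGT'},
                {'BARCODE': 'ACAGTG'}]}
    # 8-cycle first index read, no second index read on this flow cell:
    print(detect_index_lengths(demo, 8, 0))  # -> {1: [6, 0]}

As in the patch, an un-indexed lane still gets a defined length because the flow cell's index-read lengths are always added to the sets before the minimum is taken.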