Skip to content
Snippets Groups Projects
Commit 1f529c8c authored by kohleman's avatar kohleman
Browse files

- added first tests

- rewrote the barcode length detection

SVN: 34664
parent c5013f4d
No related branches found
No related tags found
No related merge requests found
......@@ -70,6 +70,7 @@ lineending = {'win32':'\r\n', 'linux':'\n', 'mac':'\r'}
COMMA = ','
CSV = ".csv"
class Sequencers:
HISEQ_4000, HISEQ_3000, HISEQ_2500, HISEQ_2000, HISEQ_X, NEXTSEQ_500, MISEQ , UNIDENTIFIED= \
('Illumina HiSeq 4000','Illumina HiSeq 3000','Illumina HiSeq 2500','Illumina HiSeq 2000',
......@@ -90,6 +91,7 @@ def logout (service, logger):
service.logout()
logger.info('Logged out')
def setUpLogger(logPath, logLevel=logging.INFO):
logFileName = 'create_sample_sheet_dict'
d = datetime.now()
......@@ -99,6 +101,7 @@ def setUpLogger(logPath, logLevel=logging.INFO):
logger = logging.getLogger(logFileName)
return logger
def parseOptions(logger):
logger.info('Parsing command line parameters')
parser = OptionParser(version='%prog 1.0')
......@@ -141,7 +144,6 @@ def parseOptions(logger):
action='store_true',
help='Write Sample Sheet to stout. Default: False')
(options, args) = parser.parse_args()
if options.outdir[-1] <> '/':
......@@ -229,7 +231,7 @@ def getDate():
return d.strftime('%A, %d of %B %Y')
def sanitize_string(myString):
    """
    Replaces every run of characters that are not ASCII letters or digits
    with a single underscore.

    myString: the text to clean up
    returns: the cleaned string; an empty input yields an empty string
    """
    return re.sub(r'[^A-Za-z0-9]+', '_', myString)


# The function was renamed from sanitizeString; the old name is kept as an
# alias so that call sites which still use it keep working.
sanitizeString = sanitize_string
......@@ -303,6 +305,15 @@ def get_flowcell (illuminaFlowCellTypeName, flowCellName, service, logger):
return foundSample[0], foundContainedSamples
def get_reverse_complement(sequence):
    """
    Returns the reverse complement of a nucleotide sequence.

    sequence: string made up of the upper-case letters A, C, G and T
    returns: the complemented letters in reverse order ('' for an empty input)
    raises: KeyError for any character other than A, C, G or T
    """
    lookup_table = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G'}
    # join assembles the result in one pass instead of growing a string with +=
    return ''.join([lookup_table[nucleotide] for nucleotide in reversed(sequence)])
def get_model(run_id):
"""
Guesses the sequencer model from the run folder name
......@@ -392,7 +403,7 @@ def get_contained_sample_properties(contained_samples, service):
propertyDict['LANE'] = lane.getCode()
myKey = sanitizeString(parentCode + '_' + lane.getCode())
myKey = sanitize_string(parentCode + '_' + lane.getCode())
parentDict[myKey] = propertyDict
return parentDict, samplesPerLaneDict
......@@ -412,18 +423,6 @@ def transform_sample_to_dict(foundFlowCell):
return flowCellDict
def pickleDemultiplexCommandList(logger, demultiplexCommandList, fileName):
    """
    Pickles the given command list into a file.

    logger: logger used to report success or a file error
    demultiplexCommandList: the object to serialize
    fileName: path of the file to (over)write

    An IOError while opening or writing the file is logged and printed,
    not re-raised.
    """
    import pickle

    try:
        # Pickle streams are binary data, so the file is opened in 'wb';
        # text mode can corrupt the stream on platforms that translate newlines.
        with open(fileName, 'wb') as pickleDemux:
            pickle.dump(demultiplexCommandList, pickleDemux)
        logger.info('Writing file ' + fileName)
    except IOError as err:
        logger.error('File error: ' + str(err))
        print('File error: ' + str(err))
def write_sample_sheet(sampleSheetDict, headerList, myoptions, logger, fileName):
"""
Writes the given dictionary to a csv file. The order does not matter. As the
......@@ -440,13 +439,10 @@ def write_sample_sheet(sampleSheetDict, headerList, myoptions, logger, fileName)
if myoptions.verbose:
print sampleSheetDict[sample][0]
sampleSheetFile.write(sampleSheetDict[sample][0] + newline)
logger.info('Writing file ' + fileName)
except IOError:
logger.error('File error: ' + str(err))
print ('File error: ' + str(err))
return fileName
......@@ -454,35 +450,15 @@ def write_sample_sheet_single_lane(ordered_sample_sheet_dict, flowCellDict,
parentDict, configMap, myoptions, logger, csv_file):
newline = lineending[myoptions.lineending]
header_list = create_header_section (configMap, parentDict, flowCellDict)
for lane in range(1, int(flowCellDict[configMap['laneCount']]) + 1):
per_lane_dict = [ordered_sample_sheet_dict[key] for key in ordered_sample_sheet_dict.keys() if int(key[0]) == lane]
csv_file_path = myoptions.outdir + csv_file + "_" + str(lane) + CSV
try:
with open(csv_file_path, 'wb') as sample_sheet_file:
per_lane_dict = [ordered_sample_sheet_dict[key] for key in ordered_sample_sheet_dict.keys() if int(key[0]) == lane]
print(str(lane))
index1_set = set ()
index2_set = set ()
for line in per_lane_dict:
split = line[0].split(",")
if (len(split[6])):
index1_set.add(len(split[6]))
if len(split[8]) > 0:
index2_set.add(len(split[8]))
print(index1_set)
print(index2_set)
with open(csv_file_path, 'wb') as sample_sheet_file:
for header_element in header_list:
sample_sheet_file.write(header_element + newline)
# per_lane_dict = [ordered_sample_sheet_dict[key] for key in ordered_sample_sheet_dict.keys() if int(key[0]) == lane]
for sample in per_lane_dict:
sample_sheet_file.write(str(sample[0]) + newline)
except IOError:
......@@ -490,14 +466,6 @@ def write_sample_sheet_single_lane(ordered_sample_sheet_dict, flowCellDict,
print ('File error: ' + str(err))
def get_reverse_complement(sequence):
    """
    Returns the reverse complement of a nucleotide sequence.

    sequence: string made up of the upper-case letters A, C, G and T
    returns: the complemented letters in reverse order ('' for an empty input)
    raises: KeyError for any character other than A, C, G or T
    """
    lookup_table = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G'}
    # join assembles the result in one pass instead of growing a string with +=
    return ''.join([lookup_table[nucleotide] for nucleotide in reversed(sequence)])
def create_header_section (configMap, parentDict, flowCellDict):
kitsDict = {"CHIP_SEQ_SAMPLE_PREP" : ["",""],
......@@ -578,71 +546,128 @@ def create_header_section (configMap, parentDict, flowCellDict):
return header_list
def verify_index_length (parentDict, flowCellDict, configMap, logger):
# Works out, for every lane of the flow cell, which index lengths to use.
#
# parentDict:   sample properties keyed by a string whose last character is
#               read as the lane number (see per_lane_list below)
# flowCellDict: flow cell properties; reads 'INDEXREAD', 'INDEXREAD2' and
#               'LANECOUNT', each converted with int()
# configMap:    supplies the property names under 'index1Name' / 'index2Name'
# logger:       receives the per-lane length sets and the chosen lengths
#
# Returns a dict mapping the lane number (int, 1..LANECOUNT) to
# [index1_length, index2_length], each being the minimum of the lengths
# collected for that lane.
#
# NOTE(review): indentation is lost in this view, so it cannot be told whether
# the flow cell lengths are added once per lane (after the sample loop) or
# once per sample (inside it) -- confirm against the original file.
index_length_dict = {}
# NOTE(review): verified_per_lane_dict is never read within this function.
verified_per_lane_dict = []
flowcell_len_index1 = int(flowCellDict['INDEXREAD'])
flowcell_len_index2 = int(flowCellDict['INDEXREAD2'])
print("Flowcell has index length [" + str(flowcell_len_index1) + ", " + str(flowcell_len_index2) + "]")
for lane in range(1,int(flowCellDict['LANECOUNT'])+1):
# The sets collect every distinct index length seen for this lane
index1_set = set ()
index2_set = set ()
index1_length = 0
index2_length = 0
logger.info("Lane: " + str(lane))
# Only the final character of the key is compared, so this can match
# single-digit lane numbers only
per_lane_list = [parentDict[key] for key in parentDict.keys() if int(key[-1]) == lane]
for sample in per_lane_list:
# If no index then just skip this sample
if (configMap['index1Name'] not in sample) or (sample[configMap['index1Name']] == 'NOINDEX'):
continue
index1 = sample[configMap['index1Name']]
index2=""
if configMap['index2Name'] in sample:
index2 = sample[configMap['index2Name']]
index1_set.add(len(index1))
if index2:
index2_set.add(len(index2))
else:
# A sample without a second index contributes length 0, which then becomes
# the minimum taken for index2 below
index2_set.add(0)
# adding the index length of the flow cell to make sure that dual-indexed
# samples also work on a single-indexed run
index1_set.add(flowcell_len_index1)
index2_set.add(flowcell_len_index2)
# The shortest length in each set is used; an empty set leaves the 0 default
if index1_set:
index1_length = min(index1_set)
if index2_set:
index2_length = min(index2_set)
index_length_dict[lane] = [index1_length, index2_length]
logger.info("Index1 Length Set: " + str(index1_set))
logger.info("Index2 Length Set: " + str(index2_set))
logger.info("Final length of index1 " + str(index1_length))
logger.info("Final length of index2 " + str(index2_length))
#print("Lane " + str(lane) + " [" + str(index1_length) + "," + str(index2_length) + "]")
return index_length_dict
def create_sample_sheet_dict(model, parentDict, flowCellDict, configMap, index1Vocabulary, index2Vocabulary, flowCellName):
sampleSheetDict = {}
separator = configMap['separator']
def create_sample_sheet_dict(model, parentDict, flowCellDict, configMap, index1Vocabulary,
index2Vocabulary, flowCellName, logger):
for key in parentDict.keys():
lane = parentDict[key]['LANE'][-1:]
# If no index then just skip this sample
if (configMap['index1Name'] not in parentDict[key]) or (parentDict[key][configMap['index1Name']] == 'NOINDEX'):
continue
index1 = parentDict[key][configMap['index1Name']]
index2=""
if configMap['index2Name'] in parentDict[key]:
index2 = parentDict[key][configMap['index2Name']]
indexNumber = index2Vocabulary[parentDict[key][configMap['index2Name']]].split()[2]
sampleSheetDict = {}
separator = configMap['separator']
try:
kit = parentDict[key][configMap['kit']]
prefix = kitsDict[kit][0]
except:
# print "Missing Kit on " + str(key)
prefix = ""
len_index1 = int(flowCellDict['INDEXREAD'])
len_index2 = int(flowCellDict['INDEXREAD2'])
lane_string =""
if model in HISEQ_LIST or model in Sequencers.MISEQ:
lane_string = lane + separator
if int(flowCellDict['INDEXREAD2']) > 0 and index2:
if model in Sequencers.NEXTSEQ_500:
index2_processed = get_reverse_complement(index2[0:len_index2])
else:
index2_processed = index2
sampleSheetDict[lane + '_' + key] = [
lane_string
+ key + separator
+ key + '_' + sanitizeString(parentDict[key][configMap['externalSampleName']]) + '_' + index1[0:len_index1] + '_' + index2[0:len_index2] + separator
+ separator
+ separator
+ index1Vocabulary[index1].split()[1] + separator
+ index1[0:len_index1] + separator
+ prefix + indexNumber + separator
+ index2_processed + separator
+ key + separator
]
else:
index_length_dict = verify_index_length(parentDict, flowCellDict, configMap, logger)
print(index_length_dict)
for key in parentDict.keys():
lane = parentDict[key]['LANE'][-1:]
# If no index then just skip this sample
if (configMap['index1Name'] not in parentDict[key]) or (parentDict[key][configMap['index1Name']] == 'NOINDEX'):
continue
index1 = parentDict[key][configMap['index1Name']]
index2=""
if configMap['index2Name'] in parentDict[key]:
index2 = parentDict[key][configMap['index2Name']]
indexNumber = index2Vocabulary[parentDict[key][configMap['index2Name']]].split()[2]
try:
kit = parentDict[key][configMap['kit']]
prefix = kitsDict[kit][0]
except:
# print "Missing Kit on " + str(key)
prefix = ""
len_index1 = index_length_dict[int(lane)][0]
len_index2 = index_length_dict[int(lane)][1]
lane_string =""
if model in HISEQ_LIST or model in Sequencers.MISEQ:
lane_string = lane + separator
if int(flowCellDict['INDEXREAD2']) > 0 and len_index2 > 0:
if model in Sequencers.NEXTSEQ_500:
index2_processed = get_reverse_complement(index2[0:len_index2])
else:
index2_processed = index2
sampleSheetDict[lane + '_' + key] = [
lane_string
+ key + separator
+ key + '_' + sanitize_string(parentDict[key][configMap['externalSampleName']]) + '_' + index1[0:len_index1] + '_' + index2[0:len_index2] + separator
+ separator
+ separator
+ index1Vocabulary[index1].split()[1] + separator
+ index1[0:len_index1] + separator
+ prefix + indexNumber + separator
+ index2_processed + separator
+ key + separator
]
else:
sampleSheetDict[lane + '_' + key] = [
lane_string
+ key + separator
+ key + '_' + sanitizeString(parentDict[key][configMap['externalSampleName']]) + '_' + index1[0:len_index1] + separator
+ separator
+ separator
+ index1Vocabulary[index1].split()[1] + separator
+ index1[0:len_index1] + separator
+ key + separator
]
lane_string
+ key + separator
+ key + '_' + sanitize_string(parentDict[key][configMap['externalSampleName']]) + '_' + index1[0:len_index1] + separator
+ separator
+ separator
+ index1Vocabulary[index1].split()[1] + separator
+ index1[0:len_index1] + separator
+ key + separator
]
csv_file_name = configMap['SampleSheetFileName'] + '_' + flowCellName
ordered_sample_sheet_dict = OrderedDict(sorted(sampleSheetDict.items(), key=lambda t: t[0]))
csv_file_name = configMap['SampleSheetFileName'] + '_' + flowCellName
ordered_sample_sheet_dict = OrderedDict(sorted(sampleSheetDict.items(), key=lambda t: t[0]))
return ordered_sample_sheet_dict, csv_file_name
return ordered_sample_sheet_dict, csv_file_name
'''
Main script
......@@ -664,18 +689,19 @@ def main ():
foundFlowCell, containedSamples = get_flowcell(configMap['illuminaFlowCellTypeName'], flowCellName,
service, logger)
parentDict, samplesPerLaneDict = get_contained_sample_properties (containedSamples, service)
parentDict, samplesPerLaneDict = get_contained_sample_properties(containedSamples, service)
logger.info('Found ' + str(len(parentDict)) + ' samples on the flow cell ' + flowCellName)
flowCellName = foundFlowCell.getCode()
flowCellDict = transform_sample_to_dict(foundFlowCell)
model = get_model(flowCellDict['RUN_NAME_FOLDER'])
print("Auto-detected: " + model)
logger.info("Auto-detected: " + model)
index1Vocabulary = get_vocabulary(configMap['index1Name'], service)
index2Vocabulary = get_vocabulary(configMap['index2Name'], service)
ordered_sample_sheet_dict, csv_file_name = create_sample_sheet_dict(model, parentDict,
flowCellDict, configMap, index1Vocabulary, index2Vocabulary, flowCellName)
flowCellDict, configMap, index1Vocabulary, index2Vocabulary, flowCellName, logger)
if myoptions.singlelane:
write_sample_sheet_single_lane(ordered_sample_sheet_dict, flowCellDict,
......
import unittest
import re
from createSampleSheet_bcl2fastq import *
class test_sanitize_string(unittest.TestCase):
    """Checks how sanitize_string rewrites non-alphanumeric characters."""

    def testDefault(self):
        # Every special character between alphanumerics turns into one underscore.
        cleaned = sanitize_string('abc#a$v%c^D&P-')
        self.assertEqual(cleaned, 'abc_a_v_c_D_P_')

    def testOnlySpecialChars(self):
        # A run made up solely of special characters collapses to a single underscore.
        cleaned = sanitize_string('@#$%^&*(')
        self.assertEqual(cleaned, '_')
class test_get_model(unittest.TestCase):
    """Checks the sequencer model that get_model reports for a run folder name."""

    def test_HiseqX(self):
        run_folder = '141121_ST-E00107_0356_AH00C3CCXX'
        detected = get_model(run_folder)
        self.assertEqual(detected, Sequencers.HISEQ_X)

    def test_expectError(self):
        # Negative check: this run folder must not be reported as a NextSeq 500.
        run_folder = '150724_J00121_0017_AH2VYMBBXX'
        detected = get_model(run_folder)
        self.assertNotEqual(detected, Sequencers.NEXTSEQ_500)
class test_get_reverse_complement(unittest.TestCase):
    """Checks get_reverse_complement against one matching and one non-matching value."""

    def test_happyCase(self):
        computed = get_reverse_complement('ACTGAATTTT')
        self.assertEqual(computed, 'AAAATTCAGT', 'Reverse complement is faulty')

    def test_failingCase(self):
        # 'CAGA' is deliberately not the reverse complement of 'ACTG'.
        computed = get_reverse_complement('ACTG')
        self.assertNotEqual(computed, 'CAGA')
class test_get_flowCell(unittest.TestCase):
    """
    Checks get_flowcell, transform_sample_to_dict and
    get_contained_sample_properties for the flow cell 'C7GMNANXX'.

    NOTE(review): setUp opens a service connection from the values returned by
    readConfig, so these tests presumably need a reachable openBIS server
    holding this flow cell -- confirm before running them unattended.
    """

    def setUp(self):
        # Log in and fetch the flow cell once per test method.
        self.myCode = 'C7GMNANXX'
        self.logger = setUpLogger('log/')
        settings = readConfig(self.logger)
        self.service = OpenbisServiceFacadeFactory.tryCreate(
            settings['openbisUserName'],
            settings['openbisPassword'],
            settings['openbisServer'],
            settings['connectionTimeout'])
        found = get_flowcell('ILLUMINA_FLOW_CELL', self.myCode, self.service, self.logger)
        self.flowcell, self.containedSamples = found

    def test_get_flowCell (self):
        # The returned flow cell carries the requested code and eight contained samples.
        self.assertEqual(self.flowcell.getCode(), self.myCode)
        self.assertEqual(self.containedSamples.size(), 8)

        properties = self.flowcell.getProperties()
        self.assertEqual(properties['SEQUENCER'], 'D00535')

        self.flowCellDict = transform_sample_to_dict(self.flowcell)
        self.assertEqual(self.flowCellDict['FLOWCELLTYPE'], 'HiSeq Flow Cell v4')

    def test_get_contained_sample_properties(self):
        # Spot-checks two sample entries and the sample count recorded for lane '2'.
        extracted = get_contained_sample_properties(self.containedSamples, self.service)
        self.parentDict, self.samplesPerLaneDict = extracted

        first_sample = self.parentDict['BSSE_QGF_34778_C7GMNANXX_1']
        self.assertEqual(first_sample['BARCODE'], 'GTCCGC')
        second_sample = self.parentDict['BSSE_QGF_32285_C7GMNANXX_7']
        self.assertEqual(second_sample['CONTACT_PERSON_EMAIL'], 'yann.bourgeois@unibas.ch')
        self.assertEqual(self.samplesPerLaneDict['2'], 23)

    def tearDown(self):
        # Close the session opened in setUp.
        self.service.logout()
        self.logger.info('Logged out')
# class test_get_contained_sample_properties(unittest.TestCase):
# def setUp(self):
# self.flowcell = test_get_flowCell.setUp(self)
#
# class test_verify_index_length(test_get_flowCell):
#
# def setUp(self):
# foundFlowCell, containedSamples = test_get_flowCell()
#
#
# def test_verify_index_length(self):
# foundFlowCell
def main():
    """Hands control to the unittest command line runner."""
    unittest.main()


# Run the tests only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment