Skip to content
Snippets Groups Projects
shared-classes.py 8.47 KiB
"""
Code that is shared between the handlers and validators for different data set types.

Includes a class for reading time series data from Excel, and code for validating BaSynthec data, in particular strains.
"""
import os
import re
import sys
import java.io.File
from java.io import IOException
from java.lang import IllegalArgumentException
from ch.systemsx.cisd.openbis.dss.generic.shared.api.v1.validation import ValidationError, ValidationScriptRunner
from ch.systemsx.cisd.openbis.dss.generic.shared.utils import ExcelFileReader
from ch.systemsx.cisd.common.logging import LogFactory, LogCategory

operationLog = LogFactory.getLogger(LogCategory.OPERATION, ValidationScriptRunner)
OPENBIS_METADATA_SHEET_NAME = "openbis-metadata"
OPENBIS_DATA_SHEET_NAME = "openbis-data"

class TimeSeriesDataExcel:
  """
    An abstraction for accessing time series data following the BaSynthec conventions 
    from an Excel file. This class ported from Java, thus the camelCase naming.
  """
  def __init__(self, file, fileReader):
    self.file = file
    self.fileReader = fileReader
    
  def getRawMetadataLines(self):
    """Get the raw lines of the metadata sheet."""
    try:
      return self.fileReader.readLines(OPENBIS_METADATA_SHEET_NAME);
    except IOException, ex:
      operationLog.error("Could not read data from [file: " + self.file.getPath() + ", sheet: "
          + OPENBIS_METADATA_SHEET_NAME + "]", ex)
    return []

  def getRawDataLines(self):
    """Get the raw lines of the data sheet."""
    try:
      return self.fileReader.readLines(OPENBIS_DATA_SHEET_NAME)
    except IOException, ex:
      operationLog.error("Could not read data from [file: " + file.getPath() + ", sheet: "
          + OPENBIS_DATA_SHEET_NAME + "]", ex)
    return []

  def getMetadataMap(self):
    """
      Return the metadata has a hashmap, with all keys uppercased.
      
      Assumes the metadata sheet corresponds to the following format: [Property] [Value] [... stuff
      that can be ignored], that is the property name is in column 1 and property value is in
      column 2, and everything else can be ignored.
    """
    metadataMap = {}
    metadataLines = self.getRawMetadataLines()
    
    # Skip the first line, this is just the header
    for i in range(1, metadataLines.size()):
      line = metadataLines.get(i)
      value = line[1];
      if "BLANK" == value:
        value = None
      if line[0] is not None:
        metadataMap[line[0].upper()] = value
    return metadataMap
    
def create_time_series_excel(fileName):
  """Factory method for the TimeSeriesData object. Returns None if it cannot be created."""
  file = java.io.File(fileName)
  try:
    workbook = ExcelFileReader.getExcelWorkbook(file)
    fileReader = ExcelFileReader(workbook, True)
    return TimeSeriesDataExcel(file, fileReader)
  except IllegalArgumentException, ex:
    operationLog.error("Could not open file [" + fileName + "] as Excel data.", ex)
  except IOException, ex:
    operationLog.error("Could not open file [" + fileName + "] as Excel data.", ex)
  return None


    
class ValidationHelper:
  """
    Methods for simplifying validation in BaSynthec.
    This class is ported from Java, thus the camelCase naming. 
  """
  def __init__(self, metadataMap, errors):
    self.metadataMap = metadataMap
    self.errors = errors

  def checkIsSpecified(self, property, displayName):
    """Verify that a property is specified; if not, add a validation error to the list."""
    if self.metadataMap.get(property) is None:
      self.errors.append(ValidationError.createFileValidationError("A " + displayName
          + " must be specified."))
      return False
    return True
  
  def validateStrain(self):
    """Verify that the strain is specified and of the correct format"""
    if not self.checkIsSpecified("STRAIN", "strain"):
      return
    strain = self.metadataMap.get("STRAIN")
    if not isStrainIdValid(strain):
      self.errors.append(createFileValidationError("Strain " + strainValidationErrorMessageFragment(strain)))
      
  def validateDefaultHeaderFormat(self):
    """Validate that header format is either not specified or matches default (TIME)"""
    if self.metadataMap.get("HEADER FORMAT") is None:
      return
    format = self.metadataMap.get("HEADER FORMAT")
    expected_format = "TIME"
    if expected_format != format:
      self.errors.append(createFileValidationError("Header format must be " + expected_format + " (not " + format + ")."))
      
  def validateExplicitHeaderFormat(self, expected_format):
    """Validate that header format is specified and matches the expected_format argument"""
    if not self.checkIsSpecified("HEADER FORMAT", "header format"):
      return
    format = self.metadataMap.get("HEADER FORMAT")
    if expected_format != format:
      self.errors.append(createFileValidationError("Header format must be " + expected_format + " (not " + format + ")."))
      
  def validateControlledVocabularyProperty(self, property, displayName, allowedValues, allowedValuesDisplay):
    """Validate that the property is specified and in the list of allowed values"""
    value = self.metadataMap.get(property)
    isControlledVocabularyPropertyValid(value, displayName, allowedValues, allowedValuesDisplay, self.errors)
        
  def validateStartDataRowCol(self):
    if self.checkIsSpecified("START DATA ROW", "Start Data Row"):
      value = self.metadataMap.get("START DATA ROW")
      match = re.match("[0-9]+", value)
      if match is None:
        self.errors.append(createFileValidationError("The Start Data Row must be a number (not " + value + ")."))
    if self.checkIsSpecified("START DATA COL", "Start Data Col"):
      value = self.metadataMap.get("START DATA COL")
      match = re.match("[A-Z]", value)
      if match is None:
        self.errors.append(createFileValidationError("The Start Data Col must be a letter between A and Z (not " + value + ")."))
  
#
# Strain validation stuff
#
strainIdRegexMin = re.compile("^ms|chassis\s*[1-3]|wt 168 trp\+|jjs-mgp254")
strainIdRegexFull = re.compile("^jjs-mgp[0-9]{1,3}|^jjs-din[0-9]{1,3}|^ms|chassis\s*[1-3]|wt 168 trp\+")
strainIds = {}
home_dir = os.environ.get('HOME', '')
if os.path.exists(home_dir + '/var/strainids.txt'):
  for sid in open(home_dir + '/var/strainids.txt').readlines():
    strainIds[sid.strip().lower()] = 1

def _match(regex, strainId):
  match = regex.match(strainId)
  if match is None:
    return False
  return len(match.group(0)) == len(strainId)

def isStrainIdValid(strainId):
  """Return true if the strain id passes validation (has the form specified in the regex and is in Chris' strain db)"""
  strainIdLower = strainId.lower()
  if len(strainIds) > 0:
    return strainIds.has_key(strainIdLower) or _match(strainIdRegexMin, strainIdLower)
  else:
    return _match(strainIdRegexFull, strainIdLower)
  
def strainValidationErrorMessageFragment(strain):
    """Return a sentence fragment describing the strain validation error."""
    return "must be either JJS-MGP[0-999], JJS-DIN[0-999], MS, CHASSIS [1-3], or WT 168 TRP+ (instead of " + strain + ")."

def isControlledVocabularyPropertyValid(value, displayName, allowedValues, allowedValuesDisplay, errors):
    """Validate that the property is specified and in the list of allowed values"""
    if value is None:
      errors.append(ValidationError.createFileValidationError("A " + displayName + " must be specified."))
      return False
    value = value.upper()
    if value not in allowedValues:
      if len(allowedValues) > 1:
        errors.append(createFileValidationError("The " + displayName + " must be one of " + allowedValuesDisplay + " (not " + value + ")."))
        return False
      else:
        errors.append(createFileValidationError("The " + displayName + " must be " + allowedValuesDisplay + " (not " + value + ")."))
        return False
    return True
  
def getInitialDataRowAndCol(metadata):
  """Extract the initial row and column as specified in the metadata. Returns an array with [row, col]."""
  # get the raw value from the map
  first_data_row = metadata.get("START DATA ROW")
  first_data_col = metadata.get("START DATA COL")

  # convert the row numeric string to an int
  if first_data_row is None:
    first_data_row = 0
  else:
    try:
      first_data_row = int(float(first_data_row)) - 1
    except:
      first_data_row = 0

  # convert the column spreadsheet value to an int
  if first_data_col is None:
    first_data_col = 0
  else:
    # columns start at A
    try:
      first_data_col = ord(first_data_col) - ord('A')
    except:
      first_data_cal = 0
  return [first_data_row, first_data_col]