Skip to content
Snippets Groups Projects
Commit 81900bfc authored by Adam Laskowski's avatar Adam Laskowski
Browse files

BIS-713: Implementation of Python API

parent 490f75be
No related branches found
No related tags found
1 merge request!40SSDM-13578 : 2PT : Database and V3 Implementation - include the new AFS "free"...
......@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
name = "premise"
from . import imaging
name = "imaging"
__author__ = "ID SIS • ETH Zürich"
__email__ = "openbis-support@id.ethz.ch"
__version__ = "0.0.0"
from . import imaging
# from .pybis import DataSet
# from .pybis import Openbis
\ No newline at end of file
......@@ -15,15 +15,16 @@
from pybis import Openbis
import abc
import time
import json
import base64
import numpy as np
import sys
from urllib.parse import urljoin, urlparse
import requests
import os
from urllib.parse import urljoin
SERVICE_NAME = "imaging"
DEFAULT_SERVICE_NAME = "imaging"
IMAGING_CONFIG_PROP_NAME = "$IMAGING_DATA_CONFIG".lower()
def get_instance(url="http://localhost:8888/openbis"):
base_url = url
......@@ -40,36 +41,22 @@ def get_instance(url="http://localhost:8888/openbis"):
return openbis_instance
def execute_custom_dss_service(openbis, code, parameters):
service_id = {
"@type": "dss.dto.service.id.CustomDssServiceCode",
"permId": code
}
options = {
"@type": "dss.dto.service.CustomDSSServiceExecutionOptions",
"parameters": parameters
}
request = {
"method": "executeCustomDSSService",
"params": [
openbis.token,
service_id,
options
],
}
full_url = urljoin(openbis._get_dss_url(), openbis.dss_v3)
return openbis._post_request_full_url(full_url, request)
class AbstractImagingRequest(metaclass=abc.ABCMeta):
class AbstractImagingClass(metaclass=abc.ABCMeta):
def to_json(self):
return json.dumps(self, default=lambda x: x.__dict__, sort_keys=True, indent=4)
def __str__(self):
return json.dumps(self.__dict__, default=lambda x: x.__dict__)
def __repr__(self):
return json.dumps(self.__dict__, default=lambda x: x.__dict__)
class AbstractImagingRequest(AbstractImagingClass, metaclass=abc.ABCMeta):
@abc.abstractmethod
def _validate_data(self):
return
def __str__(self):
return json.dumps(self.__dict__)
class ImagingDataSetPreview(AbstractImagingRequest):
config: dict
......@@ -95,6 +82,16 @@ class ImagingDataSetPreview(AbstractImagingRequest):
with open(file_path, "wb") as fh:
fh.write(base64.decodebytes(img_data))
@classmethod
def from_dict(cls, data):
if data is None:
return None
preview = cls(None, None, None)
for prop in cls.__annotations__.keys():
attribute = data.get(prop)
preview.__dict__[prop] = attribute
return preview
class ImagingDataSetExport(AbstractImagingRequest):
config: dict
......@@ -138,17 +135,31 @@ class ImagingDataSetMultiExport(AbstractImagingRequest):
f"export->config->{key}: Must not be None!"
class ImagingDataSetControlVisibility:
class ImagingDataSetControlVisibility(AbstractImagingClass):
label: str
values: list[str]
range: list[str]
unit: str
def __init__(self):
def __init__(self, label: str, values: list[str], values_range: list[str], unit: str = None):
self.__dict__["@type"] = "dss.dto.imaging.ImagingDataSetControlVisibility"
class ImagingDataSetControl:
self.label = label
self.values = values
self.range = values_range
self.unit = unit
@classmethod
def from_dict(cls, data):
if data is None:
return None
control = cls(None, None, None, None)
for prop in cls.__annotations__.keys():
attribute = data.get(prop)
control.__dict__[prop] = attribute
return control
class ImagingDataSetControl(AbstractImagingClass):
label: str
section: str
type: str
......@@ -156,16 +167,36 @@ class ImagingDataSetControl:
unit: str
range: list[str]
multiselect: bool
playable: bool | None
playable: bool
speeds: list[int]
visibility: list[ImagingDataSetControlVisibility]
metaData: dict
def __init__(self):
def __init__(self, label: str, control_type: str, values: list[str] = None,
values_range: list[str] = None, multiselect: bool = None):
self.__dict__["@type"] = "dss.dto.imaging.ImagingDataSetControl"
class ImagingDataSetConfig:
self.label = label
self.type = control_type
if control_type.lower() in ["slider", "range"]:
self.range = values_range
elif control_type.lower() == "dropdown":
self.values = values
self.multiselect = multiselect
@classmethod
def from_dict(cls, data):
if data is None:
return None
control = cls(None, "", None, None)
for prop in cls.__annotations__.keys():
attribute = data.get(prop)
if prop == 'visibility' and attribute is not None:
attribute = [ImagingDataSetControlVisibility.from_dict(visibility) for visibility in attribute]
control.__dict__[prop] = attribute
return control
class ImagingDataSetConfig(AbstractImagingClass):
adaptor: str
version: float
speeds: list[int]
......@@ -175,11 +206,34 @@ class ImagingDataSetConfig:
inputs: list[ImagingDataSetControl]
metadata: dict
def __init__(self):
def __init__(self, adaptor: str, version: float, resolutions: list[str], playable: bool,
speeds: list[int] = None, exports: list[ImagingDataSetControl] = None,
inputs: list[ImagingDataSetControl] = None, metadata: dict = None):
self.__dict__["@type"] = "dss.dto.imaging.ImagingDataSetConfig"
class ImagingDataSetImage:
self.adaptor = adaptor
self.version = version
self.resolutions = resolutions
self.playable = playable
if playable:
self.speeds = speeds
self.exports = exports
self.inputs = inputs
self.metadata = metadata
@classmethod
def from_dict(cls, data):
if data is None:
return None
config = cls(None, None, None, None)
for prop in cls.__annotations__.keys():
attribute = data.get(prop)
if prop in ['exports', 'inputs'] and attribute is not None:
attribute = [ImagingDataSetControl.from_dict(control) for control in attribute]
config.__dict__[prop] = attribute
return config
class ImagingDataSetImage(AbstractImagingClass):
previews: list[ImagingDataSetPreview]
config: dict
metadata: dict
......@@ -193,8 +247,20 @@ class ImagingDataSetImage:
def add_preview(self, preview):
self.previews += [preview]
@classmethod
def from_dict(cls, data):
if data is None:
return None
image = cls(None, None, None)
for prop in cls.__annotations__.keys():
attribute = data.get(prop)
if prop == 'images' and attribute is not None:
attribute = [ImagingDataSetPreview.from_dict(image) for image in attribute]
image.__dict__[prop] = attribute
return image
class ImagingDataSetPropertyConfig:
class ImagingDataSetPropertyConfig(AbstractImagingClass):
config: ImagingDataSetConfig
images: list[ImagingDataSetImage]
......@@ -204,88 +270,179 @@ class ImagingDataSetPropertyConfig:
self.config = config
self.images = images if images is not None else []
@classmethod
def from_dict(cls, data):
config = ImagingDataSetConfig.from_dict(data.get('config'))
attr = data.get('images')
images = [ImagingDataSetImage.from_dict(image) for image in attr] if attr is not None else None
return cls(config, images)
def add_image(self, image):
if self.images is None:
self.images = []
self.images += [image]
def get_preview(perm_id: str, index: int, preview: ImagingDataSetPreview) -> ImagingDataSetPreview:
parameters = {
"type": "preview",
"permId": perm_id,
"index": index,
"error": None,
"preview": preview.__dict__
}
service_response = execute_custom_dss_service(o, SERVICE_NAME, parameters)
if service_response['error'] is None:
preview.__dict__ = service_response["preview"]
return preview
else:
raise ValueError(service_response['error'])
def get_export(perm_id: str, index: int, export: ImagingDataSetExport) -> str:
parameters = {
"type": "export",
"permId": perm_id,
"index": index,
"error": None,
"url": None,
"export": export.__dict__
}
service_response = execute_custom_dss_service(o, SERVICE_NAME, parameters)
if service_response['error'] is None:
return service_response['url']
else:
raise ValueError(service_response['error'])
def get_multi_export(exports: list[ImagingDataSetMultiExport]) -> str:
parameters = {
"type": "multi-export",
"error": None,
"url": None,
"exports": [export.__dict__ for export in exports]
}
service_response = execute_custom_dss_service(o, SERVICE_NAME, parameters)
if service_response['error'] is None:
return service_response['url']
else:
raise ValueError(service_response['error'])
o = get_instance()
config_sxm_preview = {
"channel": "z", # usually one of these: ['z', 'I', 'dIdV', 'dIdV_Y']
"x-axis": [0.0, 3.0], # file dependent
"y-axis": [0.0, 3.0], # file dependent
"color-scale": [-700.0, 700.0], # file dependend
"colormap": "cividis", # [gray, YlOrBr, viridis, cividis, inferno, rainbow, Spectral, RdBu, RdGy]
"scaling": "linear", # ['linear', 'logarithmic']
# "mode": 3 # uncomment this if you want to generate random pixel image generation
}
# imaging_preview = ImagingDataSetPreview(preview_format="png", config=config_sxm)
# response = get_preview('20231110130838616-26', 0, imaging_preview)
# print(response)
config_export = {
"include": ['image', 'raw data'],
"image-format": 'original',
"archive-format": "zip",
"resolution": "original"
}
class ImagingControl:
def __init__(self, openbis_instance, service_name=DEFAULT_SERVICE_NAME):
self._openbis = openbis_instance
self._service_name = service_name
def _execute_custom_dss_service(self, parameters):
service_id = {
"@type": "dss.dto.service.id.CustomDssServiceCode",
"permId": self._service_name
}
options = {
"@type": "dss.dto.service.CustomDSSServiceExecutionOptions",
"parameters": parameters
}
request = {
"method": "executeCustomDSSService",
"params": [
self._openbis.token,
service_id,
options
],
}
full_url = urljoin(self._openbis._get_dss_url(), self._openbis.dss_v3)
return self._openbis._post_request_full_url(full_url, request)
def make_preview(self, perm_id: str, index: int, preview: ImagingDataSetPreview) -> ImagingDataSetPreview:
parameters = {
"type": "preview",
"permId": perm_id,
"index": index,
"error": None,
"preview": preview.__dict__
}
service_response = self._execute_custom_dss_service(parameters)
if service_response['error'] is None:
preview.__dict__ = service_response["preview"]
return preview
else:
raise ValueError(service_response['error'])
def get_export_url(self, perm_id: str, export: ImagingDataSetExport, image_index: int = 0) -> str:
parameters = {
"type": "export",
"permId": perm_id,
"index": image_index,
"error": None,
"url": None,
"export": export.__dict__
}
service_response = self._execute_custom_dss_service(parameters)
if service_response['error'] is None:
return service_response['url']
else:
raise ValueError(service_response['error'])
def get_multi_export_url(self, exports: list[ImagingDataSetMultiExport]) -> str:
parameters = {
"type": "multi-export",
"error": None,
"url": None,
"exports": [export.__dict__ for export in exports]
}
service_response = self._execute_custom_dss_service(parameters)
if service_response['error'] is None:
return service_response['url']
else:
raise ValueError(service_response['error'])
def single_export_download(self, perm_id: str, export: ImagingDataSetExport, image_index: int = 0, directory_path=""):
export_url = self.get_export_url(perm_id, export, image_index)
self._download(export_url, directory_path)
def multi_export_download(self, exports: list[ImagingDataSetMultiExport], directory_path=""):
export_url = self.get_multi_export_url(exports)
self._download(export_url, directory_path)
def _download(self, url, directory_path=""):
get_response = requests.get(url, stream=True)
file_name = url.split("/")[-1]
with open(os.path.join(directory_path, file_name), 'wb') as f:
for chunk in get_response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
def get_property_config(self, perm_id: str) -> ImagingDataSetPropertyConfig:
dataset = self._openbis.get_dataset(perm_id)
imaging_property = json.loads(dataset.props[IMAGING_CONFIG_PROP_NAME])
return ImagingDataSetPropertyConfig.from_dict(imaging_property)
def update_property_config(self, perm_id: str, config: ImagingDataSetPropertyConfig):
dataset = self._openbis.get_dataset(perm_id)
dataset.props[IMAGING_CONFIG_PROP_NAME] = config.to_json()
dataset.save()
# o = get_instance()
#
#
# # imaging_preview = ImagingDataSetPreview(preview_format="png", config=config_sxm)
# # response = get_preview('20231110130838616-26', 0, imaging_preview)
# # print(response)
#
# config_export = {
# "include": ['image', 'raw data'],
# "image-format": 'original',
# "archive-format": "zip",
# "resolution": "original"
# }
# # imaging_export = ImagingDataSetExport(config_export)
# # export_response = get_export('20231110130838616-26', 0, imaging_export)
# # print(export_response)
#
#
# # imaging_export1 = ImagingDataSetMultiExport('20231110130838616-26', 0, config_export)
# # imaging_export2 = ImagingDataSetMultiExport('20231110134813653-27', 0, config_export)
# # multi_export_response = get_multi_export([imaging_export1, imaging_export2])
# # print(multi_export_response)
#
#
# # imaging_property_config = get_property_config('20231110134813653-27')
# # print(imaging_property_config.to_json())
#
# ic = ImagingControl(o)
# perm_id = '20231110130838616-26'
# pc = ic.get_property_config(perm_id)
#
#
#
# config_sxm_preview = {
# "channel": "z", # usually one of these: ['z', 'I', 'dIdV', 'dIdV_Y']
# "x-axis": [1.2, 3.0], # file dependent
# "y-axis": [1.2, 3.0], # file dependent
# "color-scale": [-700.0, 700.0], # file dependend
# "colormap": "gray", # [gray, YlOrBr, viridis, cividis, inferno, rainbow, Spectral, RdBu, RdGy]
# "scaling": "linear", # ['linear', 'logarithmic']
# # "mode": 3 # uncomment this if you want to generate random pixel image generation
# }
#
# # imaging_preview = ImagingDataSetPreview(preview_format="png", config=config_sxm_preview)
# #
# # preview = ic.make_preview(perm_id, 0, imaging_preview)
# # pc.images[0].add_preview(preview)
# # ic.update_property_config(perm_id, pc)
# #
# # print(ic.get_property_config(perm_id))
#
#
#
# config_export = {
# "include": ['image', 'raw data'],
# "image-format": 'original',
# "archive-format": "zip",
# "resolution": "original"
# }
# imaging_export = ImagingDataSetExport(config_export)
# export_response = get_export('20231110130838616-26', 0, imaging_export)
# print(export_response)
# imaging_export1 = ImagingDataSetMultiExport('20231110130838616-26', 0, config_export)
# imaging_export2 = ImagingDataSetMultiExport('20231110134813653-27', 0, config_export)
# multi_export_response = get_multi_export([imaging_export1, imaging_export2])
# print(multi_export_response)
# ic.single_export_download(perm_id, imaging_export, 0, '/home/alaskowski/PREMISE')
......
# Copyright ETH 2023 Zürich, Scientific IT Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
import imaging.imaging as imaging
from unittest.mock import MagicMock
import json
IMAGING_CONFIG_PROP_NAME = "$IMAGING_DATA_CONFIG".lower()
PROPER_CONFIG = {
"config": {
"adaptor": "some.adaptor.class.MyAdaptorClass",
"version": 1.0,
"speeds": [1, 2, 5],
"resolutions": ["150dpi", "300dpi"],
"playable": True,
"exports": [{
"label": "Archive",
"type": "Dropdown",
"values": ["zip", "tar.gz"],
"multiselect": False,
"metaData": {}
}],
"inputs": [{
"label": "Option 1",
"section": "section",
"type": "Dropdown",
"values": ["a", "b", "c"],
"multiselect": True,
"playable": False,
"metaData": {}
}, {
"label": "Option 2",
"section": "section",
"type": "Slider",
"unit": "cm",
"range": ["1", "2", "0.1"],
"multiselect": False,
"playable": True,
"speeds": [1, 2, 5],
"metaData": {}
}, {
"label": "Option 2",
"section": "section",
"type": "Range",
"multiselect": False,
"playable": False,
"visibility": [{
"label": "Option 1",
"values": ["a", "b"],
"range": ["1", "2", "0.1"],
"unit": "mm"
}, {
"label": "Option 1",
"values": ["c"],
"range": ["0", "100", "1"],
"unit": "px"
}],
"metaData": {}
}],
"metadata": {}
},
"images": [{
"previews": [{
"config": {},
"format": "png",
"bytes": "base64_encoded_bytes",
"show": False,
"metadata": {}
}],
"config": {},
"metadata": {}
}]
}
class ImagingTestCase(unittest.TestCase):
def setUp(self):
self.dataset_mock = MagicMock()
self.set_dataset_config(PROPER_CONFIG)
self.openbis_mock = MagicMock()
self.openbis_mock.get_dataset.return_value = self.dataset_mock
self.imaging_control = imaging.ImagingControl(self.openbis_mock)
def set_dataset_config(self, config):
json_config = json.dumps(config)
self.dataset_mock.props = {IMAGING_CONFIG_PROP_NAME: json_config}
def test_get_property_config(self):
self.imaging_control.get_property_config('some_perm_id')
if __name__ == '__main__':
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment