Newer
Older
Swen Vermeul
committed
print(json.dumps(request))
raise ValueError(resp['error']['message'])
else:
raise ValueError('request did not return either result nor error')
else:
raise ValueError('general error while performing post request')
""" Log out of openBIS. After logout, the session token is no longer valid.
if self.token is None:
return
"method": "logout",
"params": [self.token],
resp = self._post_request(self.as_v3, logout_request)
Swen Vermeul
committed
self.token = None
self.token_path = None
Swen Vermeul
committed
def login(self, username=None, password=None, save_token=False):
    """Log into openBIS.
    Expects a username and a password and updates the token (session-ID).
    The token is then used for every request.
    Clients may want to store the credentials object in a credentials store after successful login.
    Throw a ValueError with the error message if login failed.
    """
    if password is None:
        # prompt interactively rather than accepting a password on the command line
        import getpass
        password = getpass.getpass()

    login_request = {
        "method": "login",
        "params": [username, password],
    }
    result = self._post_request(self.as_v3, login_request)
    if result is None:
        raise ValueError("login to openBIS failed")

    self.token = result
    if save_token:
        self._save_token()
        # remember the password (retrievable only from whitelisted internal methods)
        self._password(password)
        # update the OPENBIS_TOKEN environment variable, if OPENBIS_URL is identical to self.url
        if os.environ.get('OPENBIS_URL') == self.url:
            os.environ['OPENBIS_TOKEN'] = self.token
    return self.token
def _password(self, password=None, pstore={} ):
"""An elegant way to store passwords which are used later
without giving the user an easy possibility to retrieve it.
"""
import inspect
allowed_methods = ['mount']
if password is not None:
pstore['password'] = password
else:
if inspect.stack()[1][3] in allowed_methods:
return pstore.get('password')
else:
raise Exception("This method can only be called from these internal methods: {}".format(allowed_methods))
def unmount(self, mountpoint=None):
    """Unmount a given mountpoint or unmount the stored mountpoint.
    If the umount command does not work, try the pkill command.
    If still not successful, throw an error message.
    """
    if mountpoint is None and not getattr(self, 'mountpoint', None):
        raise ValueError("please provide a mountpoint to unmount")

    if mountpoint is None:
        mountpoint = self.mountpoint

    full_mountpoint_path = os.path.abspath(os.path.expanduser(mountpoint))
    # nothing to do if the path does not exist ...
    if not os.path.exists(full_mountpoint_path):
        return
    # ... or if it exists but is not a mountpoint
    if not os.path.ismount(full_mountpoint_path):
        return

    # NOTE(review): shell=True with an interpolated path is only safe while
    # mountpoint comes from trusted configuration — verify before exposing
    # this to untrusted input.
    status = subprocess.call('umount {}'.format(full_mountpoint_path), shell=True)
    if status == 1:
        # plain umount failed: kill any lingering sshfs process and retry
        status = subprocess.call(
            'pkill -9 sshfs && umount "{}"'.format(full_mountpoint_path),
            shell = True
        )
    if status == 1:
        raise OSError("could not unmount mountpoint: {} Please try to unmount manually".format(full_mountpoint_path))
    else:
        if VERBOSE: print("Successfully unmounted {}".format(full_mountpoint_path))
        self.mountpoint = None
def is_mounted(self, mountpoint=None):
    """Return True when the given (or previously stored) mountpoint is currently mounted."""
    target = mountpoint if mountpoint is not None else getattr(self, 'mountpoint', None)
    if target is None:
        return False
    return os.path.ismount(target)
def get_mountpoint(self, search_mountpoint=False):
    """Returns the path to the active mountpoint.
    Returns None if no mountpoint is found or if the mountpoint is not mounted anymore.

    search_mountpoint=True: Tries to figure out an existing mountpoint for a given hostname
    (experimental, does not work under Windows yet)
    """
    mountpoint = getattr(self, 'mountpoint', None)
    if mountpoint:
        if self.is_mounted(mountpoint):
            return mountpoint
        # stored mountpoint is stale (no longer mounted)
        return None

    if not search_mountpoint:
        return None

    # try to find out the mountpoint
    import subprocess
    p1 = subprocess.Popen(["mount", "-d"], stdout=subprocess.PIPE)
    p2 = subprocess.Popen(["grep", "--fixed-strings", self.hostname], stdin=p1.stdout, stdout=subprocess.PIPE)
    p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
    output = p2.communicate()[0]
    output = output.decode()
    # output will either be '' (=not mounted) or a string like this:
    # {username}@{hostname}:{path} on {mountpoint} (osxfuse, nodev, nosuid, synchronous, mounted by vermeul)
    try:
        mountpoint = output.split()[2]
        self.mountpoint = mountpoint
        return mountpoint
    except Exception:
        return None
def mount(self,
          username=None, password=None,
          hostname=None, mountpoint=None,
          volname=None, path='/', port=2222,
          kex_algorithms ='+diffie-hellman-group1-sha1'
          ):
    """Mounts openBIS dataStore without being root, using sshfs and fuse. Both
    SSHFS and FUSE must be installed on the system (see below)

    Params:
    username   -- default: the currently used username
    password   -- default: the currently used password
    hostname   -- default: the current hostname
    mountpoint -- default: ~/hostname

    FUSE / SSHFS Installation (requires root privileges):

    Mac OS X
    ========
    Follow the installation instructions on
    https://osxfuse.github.io

    Unix Cent OS 7
    ==============
    $ sudo yum install epel-release
    $ sudo yum --enablerepo=epel -y install fuse-sshfs
    $ user="$(whoami)"
    $ usermod -a -G fuse "$user"
    """
    if self.is_mounted():
        if VERBOSE: print("openBIS dataStore is already mounted on {}".format(self.mountpoint))
        return

    def check_sshfs_is_installed():
        # `sshfs --help` raises OSError/ENOENT when the binary is missing
        import subprocess
        import errno
        try:
            subprocess.call('sshfs --help', shell=True)
        except OSError as e:
            if e.errno == errno.ENOENT:
                raise ValueError('Your system seems not to have SSHFS installed. For Mac OS X, see installation instructions on https://osxfuse.github.io For Unix: $ sudo yum install epel-release && sudo yum --enablerepo=epel -y install fuse-sshfs && user="$(whoami)" && usermod -a -G fuse "$user"')

    check_sshfs_is_installed()

    if username is None: username = self._get_username()
    if not username: raise ValueError("no token available - please provide a username")

    if password is None: password = self._password()
    if not password: raise ValueError("please provide a password")

    if hostname is None: hostname = self.hostname
    if not hostname: raise ValueError("please provide a hostname")

    if mountpoint is None: mountpoint = os.path.join('~', self.hostname)

    # check if mountpoint exists, otherwise create it
    full_mountpoint_path = os.path.abspath(os.path.expanduser(mountpoint))
    if not os.path.exists(full_mountpoint_path):
        os.makedirs(full_mountpoint_path)
    print("full_mountpoint_path: ", full_mountpoint_path)

    from sys import platform
    supported_platforms = ['darwin', 'linux']
    if platform not in supported_platforms:
        raise ValueError("This method is not yet supported on {} plattform".format(platform))

    os_options = {
        "darwin": "-oauto_cache,reconnect,defer_permissions,noappledouble,negative_vncache,volname={} -oStrictHostKeyChecking=no ".format(hostname),
        "linux": "-oauto_cache,reconnect -oStrictHostKeyChecking=no",
    }

    if volname is None:
        volname = hostname

    import subprocess
    args = {
        "username": username,
        "password": password,
        "hostname": hostname,
        "port": port,
        "path": path,
        "mountpoint": mountpoint,
        "volname": volname,
        "os_options": os_options[platform],
        "kex_algorithms": kex_algorithms,
    }

    # NOTE(review): the password is piped through a shell command line and may
    # be visible in process listings — acceptable only for local, trusted use.
    cmd = 'echo "{password}" | sshfs'\
          ' {username}@{hostname}:{path} {mountpoint}' \
          ' -o port={port} -o ssh_command="ssh -oKexAlgorithms={kex_algorithms}" -o password_stdin'\
          ' {os_options}'.format(**args)

    status = subprocess.call(cmd, shell=True)

    if status == 0:
        if VERBOSE: print("Mounted successfully to {}".format(full_mountpoint_path))
        self.mountpoint = full_mountpoint_path
        return self.mountpoint
    raise OSError("mount failed, exit status: ", status)
def get_server_information(self):
    """Fetch (and cache) server information as a ServerInformation object.
    Known keys include: api-version, archiving-configured, authentication-service,
    enabled-technologies, project-samples-enabled.
    """
    # serve from cache if we already asked the server once
    if self.server_information is not None:
        return self.server_information

    request = {
        "method": "getServerInformation",
        "params": [self.token],
    }
    resp = self._post_request(self.as_v3, request)
    if resp is None:
        raise ValueError("Could not get the server information")

    # the server returns plain strings — coerce into more useful types
    for flag in ('archiving-configured', 'project-samples-enabled'):
        if flag in resp:
            resp[flag] = resp[flag] == 'true'
    for csv_key in ('enabled-technologies',):
        if csv_key in resp:
            resp[csv_key] = [item.strip() for item in resp[csv_key].split(',')]

    self.server_information = ServerInformation(resp)
    return self.server_information
def create_permId(self):
    """Have the server generate a new permId"""
    # Request just 1 permId
    request = {
        "method": "createPermIdStrings",
        "params": [self.token, 1],
    }
    resp = self._post_request(self.as_v3, request)
    if resp is not None:
        # server returns a list of permId strings — we asked for exactly one
        return resp[0]
    else:
        raise ValueError("Could not create permId")
def get_datastores(self):
    """ Get a list of all available datastores. Usually there is only one, but in some cases
    there might be multiple servers. If you upload a file, you need to specifiy the datastore you want
    the file to be uploaded to.
    """
    request = {
        "method": "searchDataStores",
        "params": [
            self.token,
            {
                "@type": "as.dto.datastore.search.DataStoreSearchCriteria"
            },
            {
                "@type": "as.dto.datastore.fetchoptions.DataStoreFetchOptions"
            }
        ]
    }
    resp = self._post_request(self.as_v3, request)
    attrs = ['code', 'downloadUrl', 'remoteUrl']
    if len(resp['objects']) == 0:
        raise ValueError("No datastore found!")

    objects = resp['objects']
    parse_jackson(objects)
    datastores = DataFrame(objects)
    self.datastores = datastores[attrs]
    # also return the result so callers don't have to reach into self.datastores
    return datastores[attrs]
def gen_code(self, entity, prefix=""):
""" Get the next sequence number for a Sample, Experiment, DataSet and Material. Other entities are currently not supported.
Usage::
gen_code('SAMPLE', 'SAM-')
gen_code('EXPERIMENT', 'EXP-')
gen_code('DATASET', '')
gen_code('MATERIAL', 'MAT-')
"""
entity = entity.upper()
entity2enum = {
"DATASET" : "DATA_SET",
"OBJECT" : "SAMPLE",
"SAMPLE" : "SAMPLE",
"EXPERIMENT" : "EXPERIMENT",
"COLLECTION" : "EXPERIMENT",
"MATERIAL" : "MATERIAL",
}
if entity not in entity2enum:
raise ValueError("no such entity: {}. Allowed entities are: DATA_SET, SAMPLE, EXPERIMENT, MATERIAL")
request = {
"method": "generateCode",
"params": [
self.token,
prefix,
entity2enum[entity]
]
}
try:
return self._post_request(self.as_v1, request)
except Exception as e:
raise ValueError("Could not generate a code for {}: {}".format(entity, e))
def gen_permId(self, count=1):
    """ Generate a permId (or many permIds) for a dataSet
    """
    request = {
        "method": "createPermIdStrings",
        "params": [
            self.token,
            count
        ]
    }
    try:
        return self._post_request(self.as_v3, request)
    except Exception as e:
        # bug fix: the original referenced an undefined name `entity` here,
        # which raised a NameError instead of the intended ValueError
        raise ValueError("Could not generate permId(s): {}".format(e))
def new_person(self, userId, space=None):
    """ creates an openBIS person
    """
    # if the lookup succeeds the user already exists — only create on failure
    try:
        self.get_person(userId=userId)
    except Exception:
        return Person(self, userId=userId, space=space)
    raise ValueError(
        "There already exists a user with userId={}".format(userId)
    )
def new_group(self, code, description=None, userIds=None):
    """ creates an openBIS authorization group
    (docstring fixed: it previously said "person", copied from new_person)

    code        -- code of the new group
    description -- optional free-text description
    userIds     -- optional list of userIds to add to the group
    """
    return Group(self, code=code, description=description, userIds=userIds)
def get_group(self, code, only_data=False):
    """ Get an openBIS AuthorizationGroup. Returns a Group object.
    """
    # restored: the permId entry of the id dict was lost in this block
    ids = [{
        "@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId",
        "permId": code
    }]

    fetchopts = {
        "@type": "as.dto.authorizationgroup.fetchoptions.AuthorizationGroupFetchOptions"
    }
    for option in ['roleAssignments', 'users', 'registrator']:
        fetchopts[option] = fetch_option[option]
    fetchopts['users']['space'] = fetch_option['space']

    request = {
        "method": "getAuthorizationGroups",
        "params": [
            self.token,
            ids,
            fetchopts
        ]
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No group found!")

    for permid in resp:
        group = resp[permid]
        parse_jackson(group)
        if only_data:
            return group
        return Group(self, data=group)
def get_role_assignments(self, start_with=None, count=None, **search_args):
    """ Get the assigned roles for a given group, person or space
    """
    entity = 'roleAssignment'
    search_criteria = get_type_for_entity(entity, 'search')

    allowed_search_attrs = ['role', 'roleLevel', 'user', 'group', 'person', 'space']
    sub_crit = []
    for attr in search_args:
        if attr not in allowed_search_attrs:
            raise ValueError("unknown search argument {}".format(attr))
        if attr == 'space':
            sub_crit.append(
                _subcriteria_for_code(search_args[attr], 'space')
            )
        elif attr in ['user', 'person']:
            # accept either a plain userId string or a Person-like object
            userId = search_args[attr] if isinstance(search_args[attr], str) else search_args[attr].userId
            sub_crit.append(
                _subcriteria_for_userId(userId)
            )
        elif attr == 'group':
            # accept either a plain group code or a Group-like object
            groupId = search_args[attr] if isinstance(search_args[attr], str) else search_args[attr].code
            # restored: the append() call around this subcriteria was lost
            sub_crit.append(
                _subcriteria_for_permid(groupId, 'authorizationGroup')
            )
        elif attr == 'role':
            # TODO
            raise ValueError("not yet implemented")
        elif attr == 'roleLevel':
            # TODO
            raise ValueError("not yet implemented")

    search_criteria['criteria'] = sub_crit

    method_name = get_method_for_entity(entity, 'search')
    fetchopts = fetch_option[entity]
    fetchopts['from'] = start_with
    fetchopts['count'] = count
    for option in ['space', 'project', 'user', 'authorizationGroup', 'registrator']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": method_name,
        "params": [
            self.token,
            search_criteria,
            fetchopts
        ]
    }

    attrs = ['techId', 'role', 'roleLevel', 'user', 'group', 'space', 'project']
    resp = self._post_request(self.as_v3, request)
    if len(resp['objects']) == 0:
        roles = DataFrame(columns=attrs)
    else:
        objects = resp['objects']
        parse_jackson(objects)
        roles = DataFrame(objects)
        roles['techId'] = roles['id'].map(extract_id)
        roles['user'] = roles['user'].map(extract_userId)
        roles['group'] = roles['authorizationGroup'].map(extract_code)
        roles['space'] = roles['space'].map(extract_code)
        roles['project'] = roles['project'].map(extract_code)

    return Things(
        openbis_obj = self,
        entity='role_assignment',
        df=roles[attrs],
        identifier_name='techId',
        start_with = start_with,
        count = count,
        totalCount = resp.get('totalCount'),
    )
def get_role_assignment(self, techId, only_data=False):
    """ Fetches one assigned role by its techId.
    """
    fetchopts = fetch_option['roleAssignment']
    for option in ['space', 'project', 'user', 'authorizationGroup', 'registrator']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "getRoleAssignments",
        "params": [
            self.token,
            [{
                # bug fix: techId was never put into the request, so the
                # server could not match the requested role assignment
                "techId": str(techId),
                "@type": "as.dto.roleassignment.id.RoleAssignmentTechId"
            }],
            fetchopts
        ]
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No assigned role found for techId={}".format(techId))

    for id in resp:
        data = resp[id]
        parse_jackson(data)
        if only_data:
            return data
        return RoleAssignment(self, data=data)
def assign_role(self, role, **args):
    """ general method to assign a role to either
        - a person
        - a group
    The scope is either
        - the whole instance
        - a space
        - a project
    """
    role = role.upper()
    defs = get_definition_for_entity('roleAssignment')
    if role not in defs['role']:
        raise ValueError("Role should be one of these: {}".format(defs['role']))

    userId = None
    groupId = None
    spaceId = None
    projectId = None

    for arg in args:
        if arg in ['person', 'group', 'space', 'project']:
            # accept either a plain permId string or an object carrying .permId
            permId = args[arg] if isinstance(args[arg], str) else args[arg].permId
            if arg == 'person':
                userId = {
                    "permId": permId,
                    "@type": "as.dto.person.id.PersonPermId"
                }
            elif arg == 'group':
                groupId = {
                    "permId": permId,
                    "@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId"
                }
            elif arg == 'space':
                spaceId = {
                    "permId": permId,
                    "@type": "as.dto.space.id.SpacePermId"
                }
            elif arg == 'project':
                projectId = {
                    "permId": permId,
                    "@type": "as.dto.project.id.ProjectPermId"
                }

    request = {
        "method": "createRoleAssignments",
        "params": [
            # restored: V3 API calls carry the session token as first param,
            # as every other request in this class does
            self.token,
            [ {
                "role": role,
                "userId": userId,
                "authorizationGroupId": groupId,
                "spaceId": spaceId,
                "projectId": projectId,
                "@type": "as.dto.roleassignment.create.RoleAssignmentCreation",
            } ]
        ]
    }
    self._post_request(self.as_v3, request)
    return
def get_groups(self, start_with=None, count=None, **search_args):
    """ Get openBIS AuthorizationGroups. Returns a «Things» object.

    Usage::
        groups = e.get.groups()
        groups[0]             # select first group
        groups['GROUP_NAME']  # select group with this code
        for group in groups:
            ...               # a Group object
        groups.df             # get a DataFrame object of the group list
        print(groups)         # print a nice ASCII table (eg. in IPython)
        groups                # HTML table (in a Jupyter notebook)
    """
    criteria = []
    for search_arg in ['code']:
        # unfortunately, there aren't many search possibilities yet...
        if search_arg in search_args:
            if search_arg == 'code':
                criteria.append(_criteria_for_code(search_args[search_arg]))

    search_criteria = get_search_type_for_entity('authorizationGroup')
    search_criteria['criteria'] = criteria

    fetchopts = fetch_option['authorizationGroup']
    fetchopts['from'] = start_with
    fetchopts['count'] = count
    for option in ['roleAssignments', 'registrator', 'users']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchAuthorizationGroups",
        "params": [
            self.token,
            # restored: the search criteria were built but never sent
            search_criteria,
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)

    attrs = ['permId', 'code', 'description', 'users', 'registrator', 'registrationDate', 'modificationDate']
    if len(resp['objects']) == 0:
        groups = DataFrame(columns=attrs)
    else:
        objects = resp['objects']
        parse_jackson(objects)
        groups = DataFrame(objects)
        groups['permId'] = groups['permId'].map(extract_permid)
        groups['registrator'] = groups['registrator'].map(extract_person)
        groups['users'] = groups['users'].map(extract_userId)
        groups['registrationDate'] = groups['registrationDate'].map(format_timestamp)
        groups['modificationDate'] = groups['modificationDate'].map(format_timestamp)

    return Things(
        openbis_obj = self,
        entity='group',
        df=groups[attrs],
        identifier_name='permId',
        start_with = start_with,
        count = count,
        totalCount = resp.get('totalCount'),
    )
def get_persons(self, start_with=None, count=None, **search_args):
    """ Get openBIS users
    """
    search_criteria = get_search_criteria('person', **search_args)
    fetchopts = fetch_option['person']
    fetchopts['from'] = start_with
    fetchopts['count'] = count
    for option in ['space']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchPersons",
        "params": [
            self.token,
            search_criteria,
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)

    attrs = ['permId', 'userId', 'firstName', 'lastName', 'email', 'space', 'registrationDate', 'active']
    if len(resp['objects']) == 0:
        persons = DataFrame(columns=attrs)
    else:
        objects = resp['objects']
        parse_jackson(objects)
        persons = DataFrame(objects)
        persons['permId'] = persons['permId'].map(extract_permid)
        persons['registrationDate'] = persons['registrationDate'].map(format_timestamp)
        persons['space'] = persons['space'].map(extract_nested_permid)

    return Things(
        openbis_obj = self,
        entity='person',
        df=persons[attrs],
        identifier_name='permId',
        start_with = start_with,
        count = count,
        totalCount = resp.get('totalCount'),
    )

get_users = get_persons  # Alias
def get_person(self, userId, only_data=False):
    """ Get a person (user)
    """
    ids = [{
        "@type": "as.dto.person.id.PersonPermId",
        "permId": userId
    }]
    fetchopts = {
        "@type": "as.dto.person.fetchoptions.PersonFetchOptions"
    }
    for option in ['roleAssignments', 'space']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "getPersons",
        "params": [
            self.token,
            ids,
            fetchopts,
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No person found!")

    for permid in resp:
        person = resp[permid]
        parse_jackson(person)
        if only_data:
            return person
        return Person(self, data=person)

get_user = get_person  # Alias
def get_spaces(self, code=None, start_with=None, count=None):
    """ Get a list of all available spaces (DataFrame object). To create a sample or a
    dataset, you need to specify in which space it should live.
    """
    method = get_method_for_entity('space', 'search')
    search_criteria = _subcriteria_for_code(code, 'space')
    fetchopts = fetch_option['space']
    fetchopts['from'] = start_with
    fetchopts['count'] = count

    request = {
        "method": method,
        "params": [
            self.token,
            search_criteria,
            fetchopts,
        ],
    }
    resp = self._post_request(self.as_v3, request)

    attrs = ['code', 'description', 'registrationDate', 'modificationDate']
    if len(resp['objects']) == 0:
        spaces = DataFrame(columns=attrs)
    else:
        # restored: the objects-parsing lines were lost in this block
        objects = resp['objects']
        parse_jackson(objects)
        spaces = DataFrame(objects)
        spaces['registrationDate'] = spaces['registrationDate'].map(format_timestamp)
        spaces['modificationDate'] = spaces['modificationDate'].map(format_timestamp)

    return Things(
        openbis_obj = self,
        entity = 'space',
        df = spaces[attrs],
        start_with = start_with,
        count = count,
        totalCount = resp.get('totalCount'),
    )
def get_space(self, code, only_data=False):
    """ Returns a Space object for a given identifier.
    """
    code = str(code).upper()
    # serve from the object cache unless raw data was requested
    space = not only_data and self._object_cache(entity='space', code=code)
    if space:
        return space

    fetchopts = {"@type": "as.dto.space.fetchoptions.SpaceFetchOptions"}
    for option in ['registrator']:
        fetchopts[option] = fetch_option[option]

    method = get_method_for_entity('space', 'get')
    request = {
        "method": method,
        "params": [
            self.token,
            [{
                "permId": code,
                "@type": "as.dto.space.id.SpacePermId"
            }],
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No such space: %s" % code)

    for permid in resp:
        if only_data:
            return resp[permid]
        space = Space(self, data=resp[permid])
        if self.use_cache:
            self._object_cache(entity='space', code=code, value=space)
        return space
def get_samples(
    self, identifier=None, code=None, permId=None,
    space=None, project=None, experiment=None, collection=None, type=None,
    start_with=None, count=None,
    withParents=None, withChildren=None, tags=None, attrs=None, props=None,
    **properties
):
    """Returns a DataFrame of all samples for a given space/project/experiment (or any combination)

    Filters
    -------
    type         -- sampleType code or object
    space        -- space code or object
    project      -- project code or object
    experiment   -- experiment code or object
    collection   -- same as above
    tags         -- only return samples with the specified tags

    Paging
    ------
    start_with   -- default=None
    count        -- number of samples that should be fetched. default=None.

    Include in result list
    ----------------------
    withParents  -- the list of parent's permIds in a column 'parents'
    withChildren -- the list of children's permIds in a column 'children'
    attrs        -- list of all desired attributes. Examples:
                    space, project, experiment: just return their identifier
                    space.code, project.code, experiment.code
                    registrator.email, registrator.firstName
                    type.generatedCodePrefix
    props        -- list of all desired properties. Returns an empty string if
                    a) property is not present
                    b) property is not defined for this sampleType
    """
    # 'collection' is an alias for 'experiment'
    if collection is not None:
        experiment = collection

    # NOTE(review): guards restored during reconstruction — assumes the
    # _subcriteria_for helpers should only be called for provided filters;
    # verify against the project helpers.
    sub_criteria = []
    if identifier:
        crit = _subcriteria_for(identifier, 'sample')
        sub_criteria += crit['criteria']
    if space:
        sub_criteria.append(_subcriteria_for(space, 'space'))
    if project:
        sub_criteria.append(_subcriteria_for(project, 'project'))
    if experiment:
        sub_criteria.append(_subcriteria_for(experiment, 'experiment'))
    if withParents:
        sub_criteria.append(_subcriteria_for(withParents, 'sample', 'Parents'))
    if withChildren:
        sub_criteria.append(_subcriteria_for(withChildren, 'sample', 'Children'))
    if properties is not None:
        for prop in properties:
            sub_criteria.append(_subcriteria_for_properties(prop, properties[prop], entity='sample'))
    if type:
        sub_criteria.append(_subcriteria_for_code(type, 'sampleType'))
    if tags:
        sub_criteria.append(_subcriteria_for_tags(tags))
    if code:
        sub_criteria.append(_criteria_for_code(code))
    if permId:
        sub_criteria.append(_common_search("as.dto.common.search.PermIdSearchCriteria", permId))

    criteria = {
        "criteria": sub_criteria,
        "@type": "as.dto.sample.search.SampleSearchCriteria",
        "operator": "AND"
    }

    attrs_fetchoptions = self._get_fetchopts_for_attrs(attrs)

    # build the various fetch options
    fetchopts = fetch_option['sample']
    fetchopts['from'] = start_with
    fetchopts['count'] = count

    options = ['tags', 'properties', 'attachments', 'space', 'experiment', 'registrator', 'modifier', 'dataSets']
    if self.get_server_information().project_samples_enabled:
        options.append('project')
    for option in options:
        fetchopts[option] = fetch_option[option]

    for key in ['parents', 'children', 'container', 'components']:
        fetchopts[key] = {"@type": "as.dto.sample.fetchoptions.SampleFetchOptions"}

    if props is not None:
        fetchopts['properties'] = fetch_option['properties']

    request = {
        "method": "searchSamples",
        "params": [
            self.token,
            criteria,
            fetchopts,
        ],
    }
    resp = self._post_request(self.as_v3, request)

    samples = []
    parse_jackson(resp)
    for obj in resp['objects']:
        sample = Sample(
            openbis_obj = self,
            type = self.get_sample_type(obj['type']['code']),
            data = obj
        )
        samples.append(sample)

    return self._sample_list_for_response(
        response=resp['objects'],
        attrs=attrs,
        props=props,
        start_with=start_with,
        count=count,
        totalCount=resp['totalCount'],
        objects=samples
    )

get_objects = get_samples  # Alias
def _get_fetchopts_for_attrs(self, attrs=None):
if attrs is None:
fetchopts = []
for attr in attrs:
if attr.startswith('space'): fetchopts.append('space')
if attr.startswith('project'): fetchopts.append('project')
if attr.startswith('experiment'): fetchopts.append('experiment')
if attr.startswith('sample'): fetchopts.append('sample')
if attr.startswith('registrator'): fetchopts.append('registrator')
if attr.startswith('modifier'): fetchopts.append('modifier')
return fetchopts
self, code=None, permId=None, type=None, space=None, project=None,
Swen Vermeul
committed
tags=None, is_finished=None, attrs=None, props=None, **properties
Swen Vermeul
committed
"""Returns a DataFrame of all samples for a given space/project (or any combination)