Newer
Older
# Copyright ETH 2018 - 2023 Zürich, Scientific IT Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import abc
import hashlib
import json
import os
import zlib
Adam Laskowski
committed
Adam Laskowski
committed

yvesn
committed
# We generate checksums for small files according to what is used by git annex.
# This ensures that all files in a data set have the same checksum type.

yvesn
committed
def get_checksum_generator(checksum_type, data_path, metadata_path, default=None):
    """Return the checksum generator that corresponds to ``checksum_type``.

    The recognised types are "SHA256", "MD5" and "WORM"; each yields a new
    generator built from ``data_path`` and ``metadata_path``. Any other value
    yields ``default`` unchanged, which is ``None`` unless the caller passed
    a fallback.
    """
    if checksum_type == "SHA256":
        return ChecksumGeneratorSha256(data_path, metadata_path)
    if checksum_type == "MD5":
        return ChecksumGeneratorMd5(data_path, metadata_path)
    if checksum_type == "WORM":
        return ChecksumGeneratorWORM(data_path, metadata_path)
    # Unknown type: hand back the caller's fallback (None when not supplied).
    return default
def validate_checksum(openbis, files, data_set_id, data_path, metadata_path):
    """Return the files whose local checksum differs from the registered one.

    The registered file entries are fetched with
    ``openbis.search_files(data_set_id)['objects']`` and indexed by their
    ``'path'``. For every name in ``files`` the checksum is recomputed with
    the generator matching the registered entry and compared against it.

    :param openbis: object providing ``search_files(data_set_id)``.
    :param files: iterable of file paths to check; each must be a ``'path'``
        of one of the registered entries.
    :param data_set_id: identifier passed to ``openbis.search_files``.
    :param data_path: passed on to the checksum generators.
    :param metadata_path: passed on to the checksum generators.
    :return: list of the file names whose checksum does not match.
    :raises KeyError: if a name in ``files`` is not among the registered paths.
    """
    registered_by_path = {
        entry['path']: entry
        for entry in openbis.search_files(data_set_id)['objects']
    }

    invalid_files = []
    for filename in files:
        dataset_file = registered_by_path[filename]
        # Data set files have either checksumCRC32 or checksumType and checksum.
        crc32 = dataset_file['checksumCRC32']
        if crc32 is not None and crc32 > 0:
            checksum_generator = ChecksumGeneratorCrc32(data_path, metadata_path)
            expected_checksum = crc32
            checksum = checksum_generator.get_checksum(filename)['crc32']
        elif dataset_file['checksumType'] is not None:
            checksum_generator = get_checksum_generator(
                dataset_file['checksumType'], data_path, metadata_path)
            if checksum_generator is None:
                # Unsupported checksum type: there is nothing to compare
                # against, so skip the file instead of failing on None.
                continue
            expected_checksum = dataset_file['checksum']
            checksum = checksum_generator.get_checksum(filename)['checksum']
        else:
            # No checksum registered for this file; it cannot be validated.
            continue

        if checksum != expected_checksum:
            invalid_files.append(filename)
    return invalid_files

yvesn
committed
class ChecksumGenerator(metaclass=abc.ABCMeta):
    """Abstract base of all checksum generators.

    Subclasses implement ``_get_checksum``; callers use ``get_checksum``,
    which runs the implementation inside ``cd(self.data_path)``.
    """

    def __init__(self, data_path, metadata_path=None):
        """Remember the data path and the optional metadata path."""
        self.data_path, self.metadata_path = data_path, metadata_path

    def get_checksum(self, file):
        """Return whatever ``_get_checksum(file)`` produces for ``file``.

        The call is wrapped in the ``cd`` context manager (defined outside
        this class; presumably it switches the working directory to
        ``data_path`` - confirm against its definition).
        """
        with cd(self.data_path):
            return self._get_checksum(file)

    @abc.abstractmethod
    def _get_checksum(self, file):
        """Compute the checksum information for ``file``; subclasses override."""
        return None
Adam Laskowski
committed
class ChecksumGeneratorCrc32(ChecksumGenerator):
    """Checksum generator producing the CRC32 of a file's content."""

    def _crc32(self, file):
        """Return the CRC32 of ``file`` as an unsigned 32-bit integer.

        The file is read in 64 KiB chunks so large files are never loaded
        into memory at once.
        """
        computed_hash = 0
        with open(file, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b""):
                # Feed the running value back in to continue the checksum.
                computed_hash = zlib.crc32(chunk, computed_hash)
        # Mask to 32 bits so the result is always non-negative.
        return computed_hash & 0xFFFFFFFF

    def _get_checksum(self, file):
        """Return a dict with the keys ``'crc32'``, ``'fileLength'`` and ``'path'``."""
        return {
            'crc32': self._crc32(file),
            'fileLength': os.path.getsize(file),
            'path': file
        }
class ChecksumGeneratorHashlib(ChecksumGenerator):
    """Shared implementation for generators that hash file content.

    ``_checksum`` calls ``update`` and ``hexdigest`` on the object returned
    by ``hash_function``; ``hash_type`` supplies the reported checksum type.
    Both hooks return ``None`` here and are overridden by subclasses.
    """

    def hash_function(self):
        """Hook for the hash object to use; returns ``None`` in this class."""
        return None

    def hash_type(self):
        """Hook for the checksum type name; returns ``None`` in this class."""
        return None

    def _get_checksum(self, file):
        """Return a dict with checksum, checksum type, file length and path."""
        digest = self._checksum(file)
        type_name = self.hash_type()
        length = os.path.getsize(file)
        return {
            'checksum': digest,
            'checksumType': type_name,
            'fileLength': length,
            'path': file
        }

    def _checksum(self, file):
        """Return the hex digest of ``file``, reading it in 4096-byte blocks."""
        hasher = self.hash_function()
        with open(file, "rb") as stream:
            while True:
                block = stream.read(4096)
                if block == b"":
                    # End of file reached.
                    break
                hasher.update(block)
        return hasher.hexdigest()
class ChecksumGeneratorSha256(ChecksumGeneratorHashlib):
    """Hashlib-based generator producing SHA256 checksums."""

    def hash_function(self):
        """Return a new ``hashlib.sha256`` object."""
        sha256_hasher = hashlib.sha256()
        return sha256_hasher

    def hash_type(self):
        """Return the checksum type name reported for this generator."""
        return "SHA256"
class ChecksumGeneratorMd5(ChecksumGeneratorHashlib):
    """Hashlib-based generator producing MD5 checksums."""

    def hash_function(self):
        """Return a new ``hashlib.md5`` object."""
        md5_hasher = hashlib.md5()
        return md5_hasher

    def hash_type(self):
        """Return the checksum type name reported for this generator."""
        return 'MD5'
class ChecksumGeneratorWORM(ChecksumGenerator):
    """Generator whose checksum is built from size, modification time and name.

    The file content is never read.
    """

    def _get_checksum(self, file):
        """Return a dict with checksum, the type 'WORM', file length and path."""
        worm_value = self.worm(file)
        length = os.path.getsize(file)
        return {
            'checksum': worm_value,
            'checksumType': 'WORM',
            'fileLength': length,
            'path': file
        }

    def worm(self, file):
        """Return ``s<size>-m<mtime>--<file>`` for ``file``.

        The modification time is truncated to whole seconds.
        """
        mtime_seconds = int(os.path.getmtime(file))
        byte_count = os.path.getsize(file)
        return "s{size}-m{mtime}--{name}".format(
            size=byte_count, mtime=mtime_seconds, name=file)

yvesn
committed
class ChecksumGeneratorGitAnnex(ChecksumGenerator):
    """Generate checksums according to the git annex backend configuration.

    The backend is read from ``.git/info/attributes`` below ``metadata_path``.
    When no backend is configured, every checksum is produced by a CRC32
    generator instead. Otherwise the checksum is taken from the key reported
    by ``git annex info``, and files that annex does not hold are handled by
    a supplement generator of the same checksum type.
    """

    def __init__(self, data_path, metadata_path):
        """Read the annex backend and set up the fallback generators."""
        super().__init__(data_path, metadata_path)
        self.backend = self._get_annex_backend()

        # Without a configured backend, annex is bypassed completely.
        self.checksum_generator_replacement = None
        if self.backend is None:
            self.checksum_generator_replacement = ChecksumGeneratorCrc32(
                self.data_path, self.metadata_path)

        # Define which generator to use for files which are not handled by annex.
        self.checksum_generator_supplement = get_checksum_generator(
            self.backend, self.data_path, self.metadata_path,
            default=ChecksumGeneratorCrc32(self.data_path, self.metadata_path))

    def _get_checksum(self, file):
        """Return the checksum dict for ``file``.

        Uses the replacement generator when one is set, the annex lookup
        otherwise.
        """
        if self.checksum_generator_replacement is not None:
            return self.checksum_generator_replacement.get_checksum(file)
        return self.__get_checksum(file)

    def __get_checksum(self, file):
        """Return the checksum dict for ``file`` based on ``git annex info -j``.

        Falls back to the supplement generator when the command output
        contains 'Not a valid object name', or when annex does not report
        the file as present.
        """
        git_dir = os.path.join(self.metadata_path, '.git')
        annex_result = run_shell(
            ['git', '--work-tree', self.data_path, '--git-dir', git_dir,
             'annex', 'info', '-j', file],
            raise_exception_on_failure=True)

        if 'Not a valid object name' in annex_result.output:
            return self.checksum_generator_supplement.get_checksum(file)

        annex_info = json.loads(annex_result.output)

        # annex_info has no 'present' entry if there is a git repository
        # within the obis repository. A missing entry is treated like a file
        # that is not present in annex, so the supplement generator is used
        # instead of raising KeyError.
        if annex_info.get('present') is not True:
            return self.checksum_generator_supplement.get_checksum(file)

        return {
            'checksum': self._get_checksum_from_annex_info(annex_info),
            'checksumType': self.backend,
            'fileLength': os.path.getsize(file),
            'path': file
        }

    def _get_checksum_from_annex_info(self, annex_info):
        """Extract the checksum from ``annex_info['key']``.

        :raises ValueError: if the backend is not MD5, SHA256 or WORM.
        """
        if self.backend in ['MD5', 'SHA256']:
            # Take the part after '--' and cut it at the first '.'.
            return annex_info['key'].split('--')[1].split('.')[0]
        elif self.backend == 'WORM':
            # Drop the first five characters (presumably the 'WORM-' prefix).
            return annex_info['key'][5:]
        else:
            raise ValueError("Git annex backend not supported: " + self.backend)

    def _get_annex_backend(self):
        """Return the configured annex backend, or ``None`` if none is set.

        The first line of ``.git/info/attributes`` that contains
        'annex.backend' decides; 'SHA256E' is reported as 'SHA256'.
        """
        with cd(self.metadata_path):
            with open('.git/info/attributes') as gitattributes:
                for line in gitattributes:
                    if 'annex.backend' in line:
                        backend = line.split('=')[1].strip()
                        if backend == 'SHA256E':
                            backend = 'SHA256'
                        return backend
        return None