Skip to content
Snippets Groups Projects
checksum.py 7.64 KiB
Newer Older
  • Learn to ignore specific revisions
  • #   Copyright ETH 2018 - 2023 Zürich, Scientific IT Services
    # 
    #   Licensed under the Apache License, Version 2.0 (the "License");
    #   you may not use this file except in compliance with the License.
    #   You may obtain a copy of the License at
    # 
    #        http://www.apache.org/licenses/LICENSE-2.0
    #   
    #   Unless required by applicable law or agreed to in writing, software
    #   distributed under the License is distributed on an "AS IS" BASIS,
    #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #   See the License for the specific language governing permissions and
    #   limitations under the License.
    #
    
    import abc
    import hashlib
    import json
    import os
    import zlib

    from .utils import run_shell, cd
    
    # We generate checksums for small files according to what is used by git annex.
    # This ensures that all files in a data set have the same checksum type.
    
    def get_checksum_generator(checksum_type, data_path, metadata_path, default=None):
        """Return the checksum generator that matches ``checksum_type``.

        Args:
            checksum_type: Name of the checksum type; "SHA256", "MD5" and "WORM"
                are recognised.
            data_path: Passed through to the generator's constructor.
            metadata_path: Passed through to the generator's constructor.
            default: Value returned when ``checksum_type`` is not recognised.

        Returns:
            A new generator instance for a recognised type, otherwise ``default``
            (which is ``None`` unless the caller supplied one).
        """
        if checksum_type == "SHA256":
            return ChecksumGeneratorSha256(data_path, metadata_path)
        elif checksum_type == "MD5":
            return ChecksumGeneratorMd5(data_path, metadata_path)
        elif checksum_type == "WORM":
            return ChecksumGeneratorWORM(data_path, metadata_path)
        # Unrecognised type: hand back the caller's fallback, which may be None.
        return default
    
    
    
    def validate_checksum(openbis, files, data_set_id, data_path, metadata_path):
        """Check local files against the checksums recorded for a data set.

        Args:
            openbis: Object providing ``search_files(data_set_id)``, whose result
                has an ``'objects'`` list of file records.
            files: Paths to validate; each must be a ``'path'`` of one of the
                data set's file records.
            data_set_id: Identifier passed to ``openbis.search_files``.
            data_path: Passed to the checksum generators.
            metadata_path: Passed to the checksum generators.

        Returns:
            List of the entries of ``files`` whose computed checksum differs from
            the recorded one. Files without any recorded checksum are skipped.

        Raises:
            KeyError: If a file is not part of the data set.
            ValueError: If a record names a checksum type for which no generator
                is available.
        """
        invalid_files = []
        dataset_files = openbis.search_files(data_set_id)['objects']
        dataset_files_by_path = {
            dataset_file['path']: dataset_file for dataset_file in dataset_files
        }
        for filename in files:
            dataset_file = dataset_files_by_path[filename]

            # data set files have either checksumCRC32 or checksumType and checksum.
            crc32 = dataset_file['checksumCRC32']
            if crc32 is not None and crc32 > 0:
                checksum_generator = ChecksumGeneratorCrc32(data_path, metadata_path)
                expected_checksum = crc32
                checksum = checksum_generator.get_checksum(filename)['crc32']
            elif dataset_file['checksumType'] is not None:
                checksum_generator = get_checksum_generator(dataset_file['checksumType'],
                                                            data_path, metadata_path)
                if checksum_generator is None:
                    # Fail with a clear message instead of an AttributeError on None.
                    raise ValueError(
                        "Checksum type not supported: " + str(dataset_file['checksumType']))
                expected_checksum = dataset_file['checksum']
                checksum = checksum_generator.get_checksum(filename)['checksum']
            else:
                # Nothing recorded to compare against.
                continue

            if checksum != expected_checksum:
                invalid_files.append(filename)
        return invalid_files
    
    class ChecksumGenerator(metaclass=abc.ABCMeta):
        """Abstract base for checksum generators.

        Subclasses implement ``_get_checksum``; callers use ``get_checksum``.
        """

        def __init__(self, data_path, metadata_path=None):
            """Store the data path and the optional metadata path."""
            self.data_path, self.metadata_path = data_path, metadata_path

        def get_checksum(self, file):
            """Return the result of ``_get_checksum(file)``.

            The call is made inside the ``cd(self.data_path)`` context
            (presumably a working-directory switch - confirm in ``utils``).
            """
            with cd(self.data_path):
                checksum_info = self._get_checksum(file)
            return checksum_info

        @abc.abstractmethod
        def _get_checksum(self, file):
            """Compute the checksum information for ``file``; provided by subclasses."""
            return None
    
    class ChecksumGeneratorCrc32(ChecksumGenerator):
        """Checksum generator producing CRC32 values."""

        def _crc32(self, file):
            """Return the CRC32 of ``file`` as an unsigned 32-bit integer.

            The file is read in 64 KiB chunks so it is never loaded whole.
            """
            with open(file, 'rb') as f:
                computed_hash = 0
                for chunk in iter(lambda: f.read(65536), b""):
                    computed_hash = zlib.crc32(chunk, computed_hash)
                return computed_hash & 0xFFFFFFFF

        def _get_checksum(self, file):
            """Return a dict with the ``'crc32'``, ``'fileLength'`` and ``'path'`` of ``file``."""
            result = self._crc32(file)
            return {
                'crc32': result,
                'fileLength': os.path.getsize(file),
                'path': file
            }
    
    class ChecksumGeneratorHashlib(ChecksumGenerator):
        """Base class for generators whose checksum comes from a hashlib-style object.

        Subclasses provide ``hash_function`` (a fresh hash object) and
        ``hash_type`` (the name reported as ``'checksumType'``).
        """

        def hash_function(self):
            """ Implemented in subclass. """
            return None

        def hash_type(self):
            """ Implemented in subclass. """
            return None

        def _get_checksum(self, file):
            """Return checksum, checksum type, file length and path of ``file`` as a dict."""
            return {
                'checksum': self._checksum(file),
                'checksumType': self.hash_type(),
                'fileLength': os.path.getsize(file),
                'path': file
            }

        def _checksum(self, file):
            """Return the hex digest of ``file``, feeding the hash in 4 KiB chunks."""
            hash_function = self.hash_function()
            with open(file, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_function.update(chunk)
            return hash_function.hexdigest()
    
    
    class ChecksumGeneratorSha256(ChecksumGeneratorHashlib):
        """Hashlib-based generator using SHA-256."""

        def hash_type(self):
            """Return the checksum type name, ``'SHA256'``."""
            return 'SHA256'

        def hash_function(self):
            """Return a new ``hashlib.sha256`` hash object."""
            sha256_hash = hashlib.sha256()
            return sha256_hash
    
    
    class ChecksumGeneratorMd5(ChecksumGeneratorHashlib):
        """Hashlib-based generator using MD5."""

        def hash_function(self):
            """Return a new ``hashlib.md5`` hash object."""
            return hashlib.md5()

        def hash_type(self):
            """Return the checksum type name, ``'MD5'``."""
            return 'MD5'
    
    class ChecksumGeneratorWORM(ChecksumGenerator):
        """Generator for 'WORM' checksums, built from file size, mtime and name."""

        def _get_checksum(self, file):
            """Return checksum, checksum type, file length and path of ``file`` as a dict."""
            return {
                'checksum': self.worm(file),
                'checksumType': 'WORM',
                'fileLength': os.path.getsize(file),
                'path': file
            }

        def worm(self, file):
            """Return ``s<size>-m<mtime>--<file>``, with the mtime truncated to whole seconds."""
            modification_time = int(os.path.getmtime(file))
            size = os.path.getsize(file)
            return "s{}-m{}--{}".format(size, modification_time, file)
    
    class ChecksumGeneratorGitAnnex(ChecksumGenerator):

        """ This class generates checksums according to the git annex backend configuration. """

        def __init__(self, data_path, metadata_path):
            """Read the annex backend and choose the fallback generators.

            Args:
                data_path: Used as the git work tree.
                metadata_path: Directory containing the ``.git`` directory.
            """
            self.data_path = data_path
            self.metadata_path = metadata_path
            self.backend = self._get_annex_backend()

            # without a configured backend, every file is handled by CRC32
            self.checksum_generator_replacement = None
            if self.backend is None:
                self.checksum_generator_replacement = ChecksumGeneratorCrc32(self.data_path,
                                                                             self.metadata_path)

            # define which generator to use for files which are not handled by annex
            self.checksum_generator_supplement = get_checksum_generator(
                self.backend, self.data_path, self.metadata_path,
                default=ChecksumGeneratorCrc32(self.data_path, self.metadata_path))

        def _get_checksum(self, file):
            """Return the checksum dict for ``file``.

            Delegates to the replacement generator when one was chosen in
            ``__init__``, otherwise asks git annex.
            """
            if self.checksum_generator_replacement is not None:
                return self.checksum_generator_replacement.get_checksum(file)
            return self.__get_checksum(file)

        def __get_checksum(self, file):
            """Build the checksum dict from ``git annex info -j`` for ``file``.

            Falls back to the supplement generator when annex reports
            'Not a valid object name' or does not report the file as present.
            """
            git_dir = os.path.join(self.metadata_path, '.git')
            annex_result = run_shell(
                ['git', '--work-tree', self.data_path, '--git-dir', git_dir, 'annex', 'info', '-j',
                 file], raise_exception_on_failure=True)

            if 'Not a valid object name' in annex_result.output:
                return self.checksum_generator_supplement.get_checksum(file)
            annex_info = json.loads(annex_result.output)

            # annex_info has no 'present' entry if there is a git repository within
            # the obis repository; treat that like "not present" rather than
            # failing with a KeyError.
            if annex_info.get('present') != True:
                return self.checksum_generator_supplement.get_checksum(file)
            return {
                'checksum': self._get_checksum_from_annex_info(annex_info),
                'checksumType': self.backend,
                'fileLength': os.path.getsize(file),
                'path': file
            }

        def _get_checksum_from_annex_info(self, annex_info):
            """Extract the checksum from ``annex_info['key']`` for the current backend.

            Raises:
                ValueError: If the backend is not MD5, SHA256 or WORM.
            """
            if self.backend in ['MD5', 'SHA256']:
                # text between the first '--' and the following '.'
                return annex_info['key'].split('--')[1].split('.')[0]
            elif self.backend == 'WORM':
                # drops the first five characters (presumably the "WORM-" prefix)
                return annex_info['key'][5:]
            else:
                raise ValueError("Git annex backend not supported: " + self.backend)

        def _get_annex_backend(self):
            """Return the backend named in ``.git/info/attributes``, or None.

            The first line containing 'annex.backend' decides; 'SHA256E' is
            reported as 'SHA256'.
            """
            with cd(self.metadata_path):
                with open('.git/info/attributes') as gitattributes:
                    for line in gitattributes:
                        if 'annex.backend' in line:
                            backend = line.split('=')[1].strip()
                            if backend == 'SHA256E':
                                backend = 'SHA256'
                            return backend
            return None