Source code for dfimagetools.recursive_hasher

#!/usr/bin/env python3
"""Helper to recursively calculate a message digest hash of data streams."""

import hashlib
import logging

from dfimagetools import definitions


class RecursiveHasher:
  """Recursively calculates message digest hashes of data streams."""

  # Characters that are escaped in paths and path specifications that are
  # shown to the user: the path segment separator ('/'), the data stream
  # separator (':') and the escape character itself ('\\'), extended with
  # the entries of definitions.NON_PRINTABLE_CHARACTERS.
  _ESCAPE_CHARACTERS = {
      '/': '\\/',
      ':': '\\:',
      '\\': '\\\\'}
  _ESCAPE_CHARACTERS.update(definitions.NON_PRINTABLE_CHARACTERS)

  # Class constant that defines the default read buffer size.
  _READ_BUFFER_SIZE = 16 * 1024 * 1024

  # Set of tuples that contain:
  #   tuple: lookup path represented as a tuple of path segments, which is
  #       the full path without its first path segment
  #   str: data stream name
  _PATHS_TO_IGNORE = frozenset([
      (('$BadClus', ), '$Bad')])

  def __init__(self):
    """Initializes a recursive hasher."""
    super().__init__()
    # Translation table for str.translate() built once per instance.
    self._escape_characters = str.maketrans(self._ESCAPE_CHARACTERS)

  def _CalculateHashDataStream(self, file_entry, data_stream_name):
    """Calculates a message digest hash of the data of the file entry.

    Args:
      file_entry (dfvfs.FileEntry): file entry.
      data_stream_name (str): name of the data stream.

    Returns:
      str: hexadecimal SHA-256 digest of the data stream or None if the file
          entry is a device, pipe or socket, or if the data stream could not
          be opened or read.
    """
    if file_entry.IsDevice() or file_entry.IsPipe() or file_entry.IsSocket():
      # Ignore devices, FIFOs/pipes and sockets.
      return None

    try:
      file_object = file_entry.GetFileObject(data_stream_name=data_stream_name)
    except OSError as exception:
      path_specification_string = file_entry.path_spec.comparable.translate(
          self._escape_characters)
      # Keep a space before "with error" so the escaped path specification
      # does not run into the error text.
      logging.warning((
          f'Unable to open path specification:\n{path_specification_string:s} '
          f'with error: {exception!s}'))
      return None

    if not file_object:
      return None

    hash_context = hashlib.sha256()

    try:
      # Read in chunks so that large data streams are never held in memory
      # as a whole.
      data = file_object.read(self._READ_BUFFER_SIZE)
      while data:
        hash_context.update(data)
        data = file_object.read(self._READ_BUFFER_SIZE)
    except OSError as exception:
      path_specification_string = file_entry.path_spec.comparable.translate(
          self._escape_characters)
      logging.warning((
          f'Unable to read from path specification:\n'
          f'{path_specification_string:s} with error: {exception!s}'))
      return None

    return hash_context.hexdigest()

  def _GetDisplayPath(self, path_segments, data_stream_name):
    """Retrieves a path to display.

    The path segments are escaped and joined with '/'. If a data stream name
    is set it is escaped and appended using ':' as separator.

    Args:
      path_segments (list[str]): path segments of the full path of the file
          entry.
      data_stream_name (str): name of the data stream.

    Returns:
      str: path to display, which is '/' if the resulting path is empty.
    """
    display_path = '/'.join(
        segment.translate(self._escape_characters)
        for segment in path_segments)

    if data_stream_name:
      data_stream_name = data_stream_name.translate(self._escape_characters)
      display_path = ':'.join([display_path, data_stream_name])

    return display_path or '/'

  def CalculateHashesFileEntry(self, file_entry, path_segments):
    """Calculates a hash for every data stream of the file entry.

    This method only iterates the data streams of the given file entry, it
    does not descend into sub file entries itself.

    Args:
      file_entry (dfvfs.FileEntry): file entry.
      path_segments (list[str]): path segments of the full path of the file
          entry.

    Yields:
      tuple[str, str]: display path and hash value, where the hash value is
          'N/A (skipped)' for data streams in _PATHS_TO_IGNORE and None if
          no hash could be calculated.
    """
    # The first path segment is not part of the paths in _PATHS_TO_IGNORE.
    lookup_path = tuple(path_segments[1:])

    for data_stream in file_entry.data_streams:
      if (lookup_path, data_stream.name) in self._PATHS_TO_IGNORE:
        hash_value = 'N/A (skipped)'
      else:
        hash_value = self._CalculateHashDataStream(file_entry, data_stream.name)

      display_path = self._GetDisplayPath(path_segments, data_stream.name)
      yield display_path, hash_value