#!/usr/bin/env python3
"""Helper to recursively calculate a message digest hash of data streams."""
import hashlib
import logging
from dfimagetools import definitions
[docs]
class RecursiveHasher:
"""Recursively calculates message digest hashes of data streams."""
_ESCAPE_CHARACTERS = {
'/': '\\/',
':': '\\:',
'\\': '\\\\'}
_ESCAPE_CHARACTERS.update(definitions.NON_PRINTABLE_CHARACTERS)
# Class constant that defines the default read buffer size.
_READ_BUFFER_SIZE = 16 * 1024 * 1024
# Frozen set of tuples that each contain:
# tuple[str]: full path represented as a tuple of path segments
# str: data stream name
_PATHS_TO_IGNORE = frozenset([
(('$BadClus', ), '$Bad')])
[docs]
def __init__(self):
    """Initializes a recursive hasher."""
    super().__init__()
    # Build the translation table once, so paths and data stream names can
    # later be escaped with a single str.translate() call.
    translation_table = str.maketrans(self._ESCAPE_CHARACTERS)
    self._escape_characters = translation_table
def _CalculateHashDataStream(self, file_entry, data_stream_name):
    """Calculates a SHA-256 message digest hash of a data stream.

    The data is read in chunks of _READ_BUFFER_SIZE bytes. Failures to open
    or read the data stream are logged as warnings and are not raised.

    Args:
      file_entry (dfvfs.FileEntry): file entry.
      data_stream_name (str): name of the data stream.

    Returns:
      str: hexadecimal SHA-256 digest of the data stream or None if the file
          entry is a device, pipe or socket, if no file-like object is
          available, or if opening or reading the data stream raised OSError.
    """
    if file_entry.IsDevice() or file_entry.IsPipe() or file_entry.IsSocket():
        # Ignore devices, FIFOs/pipes and sockets.
        return None

    try:
        file_object = file_entry.GetFileObject(
            data_stream_name=data_stream_name)
    except OSError as exception:
        path_specification_string = file_entry.path_spec.comparable.translate(
            self._escape_characters)
        # The space before "with error" separates the path specification
        # from the error text, consistent with the read failure warning.
        logging.warning((
            f'Unable to open path specification:\n'
            f'{path_specification_string:s} with error: {exception!s}'))
        return None

    if not file_object:
        return None

    # Only create the hash context once there is data to hash.
    hash_context = hashlib.sha256()

    try:
        data = file_object.read(self._READ_BUFFER_SIZE)
        while data:
            hash_context.update(data)
            data = file_object.read(self._READ_BUFFER_SIZE)
    except OSError as exception:
        path_specification_string = file_entry.path_spec.comparable.translate(
            self._escape_characters)
        logging.warning((
            f'Unable to read from path specification:\n'
            f'{path_specification_string:s} with error: {exception!s}'))
        return None

    return hash_context.hexdigest()
def _GetDisplayPath(self, path_segments, data_stream_name):
    """Retrieves a path to display.

    Every path segment and the data stream name are escaped with the escape
    character translation table before they are joined.

    Args:
      path_segments (list[str]): path segments of the full path of the file
          entry.
      data_stream_name (str): name of the data stream.

    Returns:
      str: escaped path segments joined with "/", followed by ":" and the
          escaped data stream name if one is set, or "/" if the result would
          otherwise be empty.
    """
    display_path = '/'.join(
        segment.translate(self._escape_characters)
        for segment in path_segments)

    if data_stream_name:
        escaped_name = data_stream_name.translate(self._escape_characters)
        display_path = f'{display_path:s}:{escaped_name:s}'

    # An empty result, such as for the root path segments [''], is shown
    # as "/".
    return display_path or '/'
[docs]
def CalculateHashesFileEntry(self, file_entry, path_segments):
    """Calculates the hashes of the data streams of the file entry.

    Data streams whose path and name are listed in _PATHS_TO_IGNORE are not
    read and are reported with a placeholder value instead.

    Args:
      file_entry (dfvfs.FileEntry): file entry.
      path_segments (list[str]): path segments of the full path of the file
          entry.

    Yields:
      tuple[str, str]: display path and hash value, where the hash value is
          the result of _CalculateHashDataStream, which can be None, or
          "N/A (skipped)" for an ignored data stream.
    """
    # The ignore list is keyed on the path without its first segment.
    relative_segments = tuple(path_segments[1:])

    for stream in file_entry.data_streams:
        is_ignored = (relative_segments, stream.name) in self._PATHS_TO_IGNORE
        if is_ignored:
            digest = 'N/A (skipped)'
        else:
            digest = self._CalculateHashDataStream(file_entry, stream.name)

        yield self._GetDisplayPath(path_segments, stream.name), digest