Source code for dfimagetools.source_analyzer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Helper to recursively check for volumes and file systems."""

import locale
import os

from dfvfs.credentials import manager as credentials_manager
from dfvfs.helpers import source_scanner as dfvfs_source_scanner
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.resolver import resolver

[docs] class SourceAnalyzer(object): """Analyzer to recursively check for volumes and file systems.""" # Class constant that defines the default read buffer size. _READ_BUFFER_SIZE = 32768
[docs] def __init__(self, auto_recurse=True, mediator=None): """Initializes a source analyzer. Args: auto_recurse (Optional[bool]): True if the scan should automatically recurse as far as possible. mediator (Optional[VolumeScannerMediator]): a volume scanner mediator. """ super(SourceAnalyzer, self).__init__() self._auto_recurse = auto_recurse self._encode_errors = 'strict' self._mediator = mediator self._preferred_encoding = locale.getpreferredencoding() self._source_scanner = dfvfs_source_scanner.SourceScanner()
[docs] def Analyze(self, source_path): """Analyzes the source. Args: source_path (str): the source path. Yields: dfvfs.SourceScannerContext: the source scanner context. Raises: RuntimeError: if the source path does not exists, or if the source path is not a file or directory, or if the format of or within the source file is not supported. """ if (not source_path.startswith('\\\\.\\') and not os.path.exists(source_path)): raise RuntimeError(f'No such source: {source_path:s}.') scan_context = dfvfs_source_scanner.SourceScannerContext() scan_path_spec = None scan_context.OpenSourcePath(source_path) while True: self._source_scanner.Scan( scan_context, auto_recurse=self._auto_recurse, scan_path_spec=scan_path_spec) if not scan_context.updated: break if not self._auto_recurse: yield scan_context # The source is a directory or file. if scan_context.source_type in ( dfvfs_definitions.SOURCE_TYPE_DIRECTORY, dfvfs_definitions.SOURCE_TYPE_FILE): break # The source scanner found a locked volume, e.g. an encrypted volume, # and we need a credential to unlock the volume. for locked_scan_node in scan_context.locked_scan_nodes: credentials = credentials_manager.CredentialsManager.GetCredentials( locked_scan_node.path_spec) self._mediator.UnlockEncryptedVolume( self._source_scanner, scan_context, locked_scan_node, credentials) if not self._auto_recurse: scan_node = scan_context.GetUnscannedScanNode() if not scan_node: return scan_path_spec = scan_node.path_spec if self._auto_recurse: yield scan_context
[docs] def WriteScanNode(self, scan_context, scan_node, indentation=''): """Writes the source scanner node to stdout. Args: scan_context (dfvfs.SourceScannerContext): the source scanner context. scan_node (dfvfs.SourceScanNode): the scan node. indentation (Optional[str]): indentation. """ if not scan_node: return values = [] part_index = getattr(scan_node.path_spec, 'part_index', None) if part_index is not None: values.append(f'{part_index:d}') store_index = getattr(scan_node.path_spec, 'store_index', None) if store_index is not None: values.append(f'{store_index:d}') start_offset = getattr(scan_node.path_spec, 'start_offset', None) if start_offset is not None: values.append(f'start offset: {start_offset:d} (0x{start_offset:08x})') location = getattr(scan_node.path_spec, 'location', None) if location is not None: values.append(f'location: {location:s}') values = ', '.join(values) flags = [] if scan_node in scan_context.locked_scan_nodes: flags.append(' [LOCKED]') type_indicator = scan_node.path_spec.type_indicator if type_indicator == dfvfs_definitions.TYPE_INDICATOR_TSK: file_system = resolver.Resolver.OpenFileSystem(scan_node.path_spec) if file_system.IsHFS(): flags.append('[HFS/HFS+/HFSX]') elif file_system.IsNTFS(): flags.append('[NTFS]') flags = ' '.join(flags) print(f'{indentation:s}{type_indicator:s}: {values:s}{flags:s}') indentation = f' {indentation:s}' for sub_scan_node in scan_node.sub_nodes: self.WriteScanNode(scan_context, sub_scan_node, indentation=indentation)