Source code for dfimagetools.scripts.extract_data_streams

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Console script to extract data streams."""

import argparse
import logging
import os
import sys

import artifacts

from artifacts import reader as artifacts_reader
from artifacts import registry as artifacts_registry

from dfvfs.lib import errors as dfvfs_errors

from dfimagetools import artifact_filters
from dfimagetools import data_stream_writer
from dfimagetools import file_entry_lister
from dfimagetools import path_filters
from dfimagetools import windows_registry
from dfimagetools.helpers import command_line


[docs] def Main(): """Entry point of console script to extract data streams. Returns: int: exit code that is provided to sys.exit(). """ argument_parser = argparse.ArgumentParser(description=( 'Extracts data streams from a storage media image.')) # TODO: add filter group argument_parser.add_argument( '--artifact_definitions', '--artifact-definitions', dest='artifact_definitions', type=str, metavar='PATH', action='store', help=('Path to a directory or file containing the artifact definition ' '.yaml files.')) argument_parser.add_argument( '--artifact_filters', '--artifact-filters', dest='artifact_filters', type=str, default=None, metavar='NAMES', action='store', help=( 'Comma separated list of names of artifact definitions to extract.')) argument_parser.add_argument( '--custom_artifact_definitions', '--custom-artifact-definitions', dest='custom_artifact_definitions', type=str, metavar='PATH', action='store', help=( 'Path to a directory or file containing custom artifact definition ' '.yaml files. ')) argument_parser.add_argument( '--path', dest='path_filter', type=str, default=None, metavar='PATH', action='store', help='Path of data stream to extract.') # TODO: add output group argument_parser.add_argument( '--no_aliases', '--no-aliases', dest='use_aliases', action='store_false', default=True, help=( 'Disable the use of partition and/or volume aliases such as ' '/apfs{f449e580-e355-4e74-8880-05e46e4e3b1e} and use indices ' 'such as /apfs1 instead.')) argument_parser.add_argument( '-t', '--target', dest='target', action='store', metavar='PATH', default=None, help=( 'target (or destination) path of a directory where the extracted ' 'data streams should be stored.')) # TODO: add source group command_line.AddStorageMediaImageCLIArguments(argument_parser) argument_parser.add_argument( 'source', nargs='?', action='store', metavar='image.raw', default=None, help='path of the storage media image.') options = argument_parser.parse_args() if not options.source: print('Source value is missing.') print('') argument_parser.print_help() print('') return 1 if options.artifact_filters: artifact_definitions = options.artifact_definitions if not artifact_definitions: artifact_definitions = os.path.join( os.path.dirname(artifacts.__file__), 'data') if not os.path.exists(artifact_definitions): artifact_definitions = os.path.join('/', 'usr', 'share', 'artifacts') if not os.path.exists(artifact_definitions): artifact_definitions = None if (not artifact_definitions and not options.custom_artifact_definitions): print('[ERROR] artifact filters were specified but no paths to ' 'artifact definitions were provided.') print('') return 1 elif not options.path_filter: print('[ERROR] no extraction filters were specified.') print('') return 1 target_path = options.target if not target_path: source_name = os.path.basename(options.source) target_path = os.path.join(os.getcwd(), f'{source_name:s}.extracted') if not os.path.exists(target_path): os.makedirs(target_path) elif not os.path.isdir(target_path): print('[ERROR] target path is not a directory.') print('') return 1 logging.basicConfig( level=logging.INFO, format='[%(levelname)s] %(message)s') mediator, volume_scanner_options = ( command_line.ParseStorageMediaImageCLIArguments(options)) if options.artifact_filters: registry = artifacts_registry.ArtifactDefinitionsRegistry() reader = artifacts_reader.YamlArtifactsReader() if artifact_definitions: if os.path.isdir(artifact_definitions): registry.ReadFromDirectory(reader, artifact_definitions) elif os.path.isfile(artifact_definitions): registry.ReadFromFile(reader, artifact_definitions) if options.custom_artifact_definitions: if os.path.isdir(options.custom_artifact_definitions): registry.ReadFromDirectory( reader, options.custom_artifact_definitions) elif os.path.isfile(options.custom_artifact_definitions): registry.ReadFromFile(reader, options.custom_artifact_definitions) filter_generator = artifact_filters.ArtifactDefinitionFiltersGenerator( registry) elif options.path_filter: filter_generator = path_filters.PathFiltersGenerator(options.path_filter) if filter_generator.partition and options.partitions: print(('[WARNING] partition specified in path filter will override ' '--partitions command line argument.')) print('') if filter_generator.partition: volume_scanner_options.partitions = [filter_generator.partition] entry_lister = file_entry_lister.FileEntryLister( mediator=mediator, use_aliases=options.use_aliases) find_specs_generated = False try: base_path_specs = entry_lister.GetBasePathSpecs( options.source, options=volume_scanner_options) if not base_path_specs: print('No supported file system found in source.') print('') return 1 for base_path_spec in base_path_specs: find_specs = [] if options.artifact_filters: environment_variables = [] user_accounts = [] windows_directory = entry_lister.GetWindowsDirectory(base_path_spec) if windows_directory: winregistry_collector = windows_registry.WindowsRegistryCollector( base_path_spec, windows_directory) environment_variables = ( winregistry_collector.CollectSystemEnvironmentVariables()) # TODO: determine user accounts. names = options.artifact_filters.split(',') find_specs = list(filter_generator.GetFindSpecs( names=names, environment_variables=environment_variables, user_accounts=user_accounts)) elif options.path_filter: find_specs = list(filter_generator.GetFindSpecs()) if filter_generator: if not find_specs: continue find_specs_generated = True file_entries_generator = entry_lister.ListFileEntriesWithFindSpecs( [base_path_spec], find_specs) stream_writer = data_stream_writer.DataStreamWriter() for file_entry, path_segments in file_entries_generator: for data_stream in file_entry.data_streams: display_path = stream_writer.GetDisplayPath( path_segments, data_stream.name) destination_path = stream_writer.GetSanitizedPath( path_segments, data_stream.name, target_path) logging.info(f'Extracting: {display_path:s} to: {destination_path:s}') destination_directory = os.path.dirname(destination_path) os.makedirs(destination_directory, exist_ok=True) stream_writer.WriteDataStream( file_entry, data_stream.name, destination_path) except dfvfs_errors.ScannerError as exception: print(f'[ERROR] {exception!s}', file=sys.stderr) print('') return 1 except (KeyboardInterrupt, dfvfs_errors.UserAbort): print('Aborted by user.', file=sys.stderr) print('') return 1 if options.artifact_filters and not find_specs_generated: print('[ERROR] an extraction filter was specified but no corresponding ' 'find specifications were generated.') print('') return 1 return 0
if __name__ == '__main__': sys.exit(Main())