#!/usr/bin/env python3
"""Console script to extract data streams."""
import argparse
import logging
import os
import sys
import artifacts
from artifacts import reader as artifacts_reader
from artifacts import registry as artifacts_registry
from dfvfs.lib import errors as dfvfs_errors
from dfimagetools import artifact_filters
from dfimagetools import data_stream_writer
from dfimagetools import file_entry_lister
from dfimagetools import path_filters
from dfimagetools import windows_registry
from dfimagetools.helpers import command_line
[docs]
def _CreateArgumentParser():
    """Creates the command line argument parser of the console script.

    Returns:
        argparse.ArgumentParser: argument parser with the filter, output,
            storage media image and source arguments added.
    """
    argument_parser = argparse.ArgumentParser(
        description=("Extracts data streams from a storage media image.")
    )

    # TODO: add filter group
    argument_parser.add_argument(
        "--artifact_definitions",
        "--artifact-definitions",
        dest="artifact_definitions",
        type=str,
        metavar="PATH",
        action="store",
        help=(
            "Path to a directory or file containing the artifact definition "
            ".yaml files."
        ),
    )
    argument_parser.add_argument(
        "--artifact_filters",
        "--artifact-filters",
        dest="artifact_filters",
        type=str,
        default=None,
        metavar="NAMES",
        action="store",
        help=("Comma separated list of names of artifact definitions to extract."),
    )
    argument_parser.add_argument(
        "--custom_artifact_definitions",
        "--custom-artifact-definitions",
        dest="custom_artifact_definitions",
        type=str,
        metavar="PATH",
        action="store",
        help=(
            "Path to a directory or file containing custom artifact definition "
            ".yaml files. "
        ),
    )
    argument_parser.add_argument(
        "--path",
        dest="path_filter",
        type=str,
        default=None,
        metavar="PATH",
        action="store",
        help="Path of data stream to extract.",
    )

    # TODO: add output group
    argument_parser.add_argument(
        "--no_aliases",
        "--no-aliases",
        dest="use_aliases",
        action="store_false",
        default=True,
        help=(
            "Disable the use of partition and/or volume aliases such as "
            "/apfs{f449e580-e355-4e74-8880-05e46e4e3b1e} and use indices "
            "such as /apfs1 instead."
        ),
    )
    argument_parser.add_argument(
        "-t",
        "--target",
        dest="target",
        action="store",
        metavar="PATH",
        default=None,
        help=(
            "target (or destination) path of a directory where the extracted "
            "data streams should be stored."
        ),
    )

    # TODO: add source group
    command_line.AddStorageMediaImageCLIArguments(argument_parser)

    argument_parser.add_argument(
        "source",
        nargs="?",
        action="store",
        metavar="image.raw",
        default=None,
        help="path of the storage media image.",
    )
    return argument_parser


def _ParseArtifactFilterNames(artifact_filters_string):
    """Splits a comma separated list of artifact definition names.

    Whitespace around the individual names is removed and empty entries are
    dropped, so that "A, B" and "A,B," both result in ["A", "B"].

    Args:
        artifact_filters_string (str): comma separated list of names.

    Returns:
        list[str]: names of the artifact definitions.
    """
    stripped_names = (name.strip() for name in artifact_filters_string.split(","))
    return [name for name in stripped_names if name]


def _ReadArtifactDefinitions(registry, reader, path):
    """Reads artifact definitions from a directory or file into a registry.

    A path that is neither an existing directory nor an existing file is
    ignored.

    Args:
        registry: artifact definitions registry the definitions are read into.
        reader: artifact definitions reader passed to the registry.
        path (str): path of a directory or file containing the definitions.
    """
    if os.path.isdir(path):
        registry.ReadFromDirectory(reader, path)
    elif os.path.isfile(path):
        registry.ReadFromFile(reader, path)


def Main():
    """Entry point of console script to extract data streams.

    Returns:
        int: exit code that is provided to sys.exit().
    """
    argument_parser = _CreateArgumentParser()
    options = argument_parser.parse_args()

    if not options.source:
        print("Source value is missing.")
        print("")
        argument_parser.print_help()
        print("")
        return 1

    artifact_definitions = None
    artifact_filter_names = []

    if options.artifact_filters:
        artifact_definitions = options.artifact_definitions
        if not artifact_definitions:
            # No explicit path was given: try the data directory next to the
            # artifacts module first and /usr/share/artifacts second.
            artifact_definitions = os.path.join(
                os.path.dirname(artifacts.__file__), "data"
            )
            if not os.path.exists(artifact_definitions):
                artifact_definitions = os.path.join("/", "usr", "share", "artifacts")
            if not os.path.exists(artifact_definitions):
                artifact_definitions = None

        if not artifact_definitions and not options.custom_artifact_definitions:
            print(
                "[ERROR] artifact filters were specified but no paths to "
                "artifact definitions were provided."
            )
            print("")
            return 1

        # The names do not depend on the base path specification, hence they
        # are determined once instead of for every file system.
        artifact_filter_names = _ParseArtifactFilterNames(options.artifact_filters)

    elif not options.path_filter:
        print("[ERROR] no extraction filters were specified.")
        print("")
        return 1

    target_path = options.target
    if not target_path:
        # Normalize the source path first so that a trailing path separator
        # does not result in an empty source name.
        source_name = os.path.basename(os.path.normpath(options.source))
        target_path = os.path.join(os.getcwd(), f"{source_name:s}.extracted")

    if not os.path.exists(target_path):
        os.makedirs(target_path)
    elif not os.path.isdir(target_path):
        print("[ERROR] target path is not a directory.")
        print("")
        return 1

    logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")

    mediator, volume_scanner_options = command_line.ParseStorageMediaImageCLIArguments(
        options
    )

    if options.artifact_filters:
        registry = artifacts_registry.ArtifactDefinitionsRegistry()
        reader = artifacts_reader.YamlArtifactsReader()

        if artifact_definitions:
            _ReadArtifactDefinitions(registry, reader, artifact_definitions)

        if options.custom_artifact_definitions:
            _ReadArtifactDefinitions(
                registry, reader, options.custom_artifact_definitions
            )

        filter_generator = artifact_filters.ArtifactDefinitionFiltersGenerator(registry)

    elif options.path_filter:
        filter_generator = path_filters.PathFiltersGenerator(options.path_filter)

        if filter_generator.partition and options.partitions:
            print(
                (
                    "[WARNING] partition specified in path filter will override "
                    "--partitions command line argument."
                )
            )
            print("")

        if filter_generator.partition:
            volume_scanner_options.partitions = [filter_generator.partition]

    entry_lister = file_entry_lister.FileEntryLister(
        mediator=mediator, use_aliases=options.use_aliases
    )
    find_specs_generated = False

    try:
        base_path_specs = entry_lister.GetBasePathSpecs(
            options.source, options=volume_scanner_options
        )
        if not base_path_specs:
            print("No supported file system found in source.")
            print("")
            return 1

        for base_path_spec in base_path_specs:
            find_specs = []
            if options.artifact_filters:
                environment_variables = []
                user_accounts = []

                # System environment variables are only collected when a
                # Windows directory was found for this base path specification.
                windows_directory = entry_lister.GetWindowsDirectory(base_path_spec)
                if windows_directory:
                    winregistry_collector = windows_registry.WindowsRegistryCollector(
                        base_path_spec, windows_directory
                    )
                    environment_variables = (
                        winregistry_collector.CollectSystemEnvironmentVariables()
                    )
                    # TODO: determine user accounts.

                # A copy of the names is passed so that every call receives
                # its own list, as was the case before the names were hoisted.
                find_specs = list(
                    filter_generator.GetFindSpecs(
                        names=list(artifact_filter_names),
                        environment_variables=environment_variables,
                        user_accounts=user_accounts,
                    )
                )

            elif options.path_filter:
                find_specs = list(filter_generator.GetFindSpecs())

            if filter_generator:
                # Skip file systems for which the filter did not result in
                # any find specifications.
                if not find_specs:
                    continue
                find_specs_generated = True

            file_entries_generator = entry_lister.ListFileEntriesWithFindSpecs(
                [base_path_spec], find_specs
            )

            stream_writer = data_stream_writer.DataStreamWriter()
            for file_entry, path_segments in file_entries_generator:
                for data_stream in file_entry.data_streams:
                    display_path = stream_writer.GetDisplayPath(
                        path_segments, data_stream.name
                    )
                    destination_path = stream_writer.GetSanitizedPath(
                        path_segments, data_stream.name, target_path
                    )
                    logging.info(
                        "Extracting: %s to: %s", display_path, destination_path
                    )

                    destination_directory = os.path.dirname(destination_path)
                    os.makedirs(destination_directory, exist_ok=True)

                    stream_writer.WriteDataStream(
                        file_entry, data_stream.name, destination_path
                    )

    except dfvfs_errors.ScannerError as exception:
        print(f"[ERROR] {exception!s}", file=sys.stderr)
        print("")
        return 1

    except (KeyboardInterrupt, dfvfs_errors.UserAbort):
        print("Aborted by user.", file=sys.stderr)
        print("")
        return 1

    if options.artifact_filters and not find_specs_generated:
        print(
            "[ERROR] an extraction filter was specified but no corresponding "
            "find specifications were generated."
        )
        print("")
        return 1

    return 0
if __name__ == "__main__":
    # Hand the exit code returned by Main() to the calling process.
    exit_code = Main()
    sys.exit(exit_code)