Source code for dfimagetools.artifact_filters

# -*- coding: utf-8 -*-
"""Helper for filtering based on artifact definitions."""

import logging

from artifacts import definitions as artifacts_definitions

from dfvfs.helpers import file_system_searcher as dfvfs_file_system_searcher

from dfimagetools import path_resolver


[docs] class ArtifactDefinitionFiltersGenerator(object): """Generator of filters based on artifact definitions.""" # TODO: passing environment_variables and user_accounts via __init__ is # deprecated.
[docs] def __init__( self, artifacts_registry, environment_variables=None, user_accounts=None): """Initializes an artifact definition filters generator. Args: artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifact definitions registry. environment_variables (Optional[list[EnvironmentVariable]]): environment variables. user_accounts (Optional[list[UserAccount]]]): user accounts. """ super(ArtifactDefinitionFiltersGenerator, self).__init__() self._artifacts_registry = artifacts_registry self._environment_variables = environment_variables self._path_resolver = path_resolver.PathResolver() self._user_accounts = user_accounts
def _BuildFindSpecsFromArtifactDefinition( self, name, environment_variables=None, user_accounts=None): """Builds find specifications from an artifact definition. Args: name (str): name of the artifact definition. environment_variables (Optional[list[EnvironmentVariable]]): environment variables. user_accounts (Optional[list[UserAccount]]): user accounts. Yields: dfvfs.FindSpec: file system (dfVFS) find specification. """ definition = self._artifacts_registry.GetDefinitionByName(name) if not definition: definition = self._artifacts_registry.GetDefinitionByAlias(name) if not definition: logging.warning(f'Undefined artifact definition: {name:s}') else: for source in definition.sources: source_type = source.type_indicator if source_type not in ( artifacts_definitions.TYPE_INDICATOR_ARTIFACT_GROUP, artifacts_definitions.TYPE_INDICATOR_DIRECTORY, artifacts_definitions.TYPE_INDICATOR_FILE, artifacts_definitions.TYPE_INDICATOR_PATH): continue if source_type == artifacts_definitions.TYPE_INDICATOR_DIRECTORY: logging.warning(( f'Use of deprecated source type: directory in artifact ' f'definition: {name:s}')) if source_type == artifacts_definitions.TYPE_INDICATOR_ARTIFACT_GROUP: for source_name in set(source.names): for find_spec in self._BuildFindSpecsFromArtifactDefinition( source_name, environment_variables=environment_variables, user_accounts=user_accounts): yield find_spec elif source_type in ( artifacts_definitions.TYPE_INDICATOR_DIRECTORY, artifacts_definitions.TYPE_INDICATOR_FILE, artifacts_definitions.TYPE_INDICATOR_PATH): for source_path in set(source.paths): for find_spec in self._BuildFindSpecsFromFileSourcePath( source_path, source.separator, environment_variables=environment_variables, user_accounts=user_accounts): yield find_spec def _BuildFindSpecsFromFileSourcePath( self, source_path, path_separator, environment_variables=None, user_accounts=None): """Builds find specifications from a file source type. Args: source_path (str): file system path defined by the source. path_separator (str): file system path segment separator. environment_variables (Optional[list[EnvironmentVariable]]): environment variables. user_accounts (Optional[list[UserAccount]]): user accounts. Yields: dfvfs.FindSpec: file system (dfVFS) find specification. """ for path_glob in self._path_resolver.ExpandGlobStars( source_path, path_separator): for path in self._path_resolver.ExpandUsersVariable( path_glob, path_separator, user_accounts): if '%' in path: path = self._path_resolver.ExpandEnvironmentVariables( path, path_separator, environment_variables) if not path.startswith(path_separator): continue try: find_spec = dfvfs_file_system_searcher.FindSpec( case_sensitive=False, location_glob=path, location_separator=path_separator) except ValueError as exception: logging.error(( f'Unable to build find specification for path: "{path:s}" with ' f'error: {exception!s}')) continue yield find_spec
[docs] def GetFindSpecs( self, names=None, environment_variables=None, user_accounts=None): """Retrieves find specifications for one or more artifact definitions. Args: names (Optional[list[str]]): names of the artifact definitions to filter on. environment_variables (Optional[list[EnvironmentVariable]]): environment variables. user_accounts (Optional[list[UserAccount]]): user accounts. Yields: dfvfs.FindSpec: file system (dfVFS) find specification. """ if self._environment_variables: environment_variables = self._environment_variables if self._user_accounts: user_accounts = self._user_accounts for name in set(names or []): for find_spec in self._BuildFindSpecsFromArtifactDefinition( name, environment_variables=environment_variables, user_accounts=user_accounts): yield find_spec