Source code for veil.config.pipeline

from __future__ import annotations

"""Pipeline configuration dataclasses built on Veil’s frozen-dataclass config system.
"""

from dataclasses import field
from typing import List, Optional

from veil.config.api_server import ApiServerConfig
from veil.config.core.flat_dataclass import create_flat_dataclass
from veil.config.core.frozen_dataclass import frozen_dataclass
from veil.config.datahandler import DataHandlerConfig
from veil.config.entity_detectors import BaseEntityDetectorConfig
from veil.config.entity_resolvers import BaseEntityResolverConfig
from veil.config.evaluator import EvaluatorConfig
from veil.config.masker import MaskerConfig
from veil.config.metric_store import MetricStoreConfig
from veil.config.overlap_resolver import OverlapResolverConfig
from veil.core.enums.pipeline_mode import PipelineMode
from veil.logger import init_logger, set_log_level

logger = init_logger(__name__)


@frozen_dataclass(allow_from_file=True)
class PipelineConfig:
    """Root configuration dataclass for a Veil masking run.

    Construction validates the configuration in ``__post_init__``:

    * the global log level is applied (best effort; failures are logged),
    * ``mode`` is parsed with ``PipelineMode.from_str`` and the section that
      mode needs is checked (``datahandler`` offline, ``api_server`` online),
    * entity type and resolver modules are imported and the global alias map
      is built, so conflicts surface while the config is being constructed.

    Use ``create_from_cli_args`` to build instances from CLI/YAML input.
    """

    # --- Run mode and the mode-specific sections -------------------------
    mode: str = field(
        default="offline",
        metadata={
            "help": "Mode of the pipeline (offline|online). Online exposes an API for real-time masking. 'Offline' is the default and batch processes files with the dataloader."
        },
    )
    datahandler: Optional[DataHandlerConfig] = field(
        default=None,
        metadata={
            "help": "Input/output data handler configuration. Required for offline mode."
        },
    )
    api_server: Optional[ApiServerConfig] = field(
        default=None,
        metadata={"help": "API server configuration. Required for online mode."},
    )

    # --- Detection / resolution stages -----------------------------------
    entity_detectors: List[BaseEntityDetectorConfig] = field(
        default_factory=list,
        metadata={"help": "Entity detector configurations executed in order"},
    )
    entity_resolvers: Optional[List[BaseEntityResolverConfig]] = field(
        default=None,
        metadata={"help": "Optional entity resolvers executed after detection"},
    )

    # --- Supporting components -------------------------------------------
    metric_store: MetricStoreConfig = field(
        default_factory=MetricStoreConfig,
        metadata={"help": "Metric store configuration"},
    )
    masker: MaskerConfig = field(
        default_factory=MaskerConfig,
        metadata={"help": "Masker configuration"},
    )
    overlap_resolver: OverlapResolverConfig = field(
        default_factory=OverlapResolverConfig,
        metadata={"help": "Configuration for the OverlapResolver"},
    )
    evaluator: Optional[EvaluatorConfig] = field(
        default=None,
        metadata={
            "help": "Evaluator configuration. If absent, no evaluation is performed. "
            "Ground truth in input is required."
        },
    )

    # --- Runtime knobs ---------------------------------------------------
    concurrency: int = field(
        default=1,
        metadata={
            "help": "Number of documents to process in parallel in offline mode (threads)."
        },
    )
    log_level: str = field(
        default="INFO",
        metadata={
            "help": "Global log level for Veil (DEBUG, INFO, WARNING, ERROR, CRITICAL)."
        },
    )

    def __post_init__(self) -> None:
        """Validate the configuration and eagerly load entity modules.

        Raises:
            ValueError: If the pipeline runs in offline mode without a
                ``datahandler`` config.
            Exception: Any error raised while parsing ``mode``, importing an
                entity type or resolver module, or building the global alias
                map propagates unchanged.
        """
        # Apply the logging level as early as possible. This stays best
        # effort (a bad level must not abort construction), but the failure
        # is reported instead of being silently discarded.
        level = getattr(self, "log_level", "INFO")
        try:
            set_log_level(level)
        except Exception as exc:
            logger.warning(f"Could not apply log level {level!r}: {exc}")

        mode = PipelineMode.from_str(self.mode.lower())
        if mode == PipelineMode.OFFLINE:
            if self.datahandler is None:
                raise ValueError("Offline mode requires a datahandler config.")
        elif mode == PipelineMode.ONLINE:
            # NOTE(review): the plain attribute assignments below assume
            # frozen_dataclass still permits writes during __post_init__
            # -- confirm against veil.config.core.frozen_dataclass.
            if self.api_server is None:
                logger.warning("No API server config provided, using default.")
                self.api_server = ApiServerConfig()
            # Any supplied data handler is cleared in online mode.
            self.datahandler = None

        # Eagerly load all entity type modules and validate the alias map to
        # fail fast on conflicts. Import errors are deliberately not caught:
        # a broken entity type or resolver module must not fail silently.
        import pkgutil
        from importlib import import_module

        import veil.entity_detectors as detectors_pkg  # type: ignore[import-not-found]
        import veil.entity_resolvers as resolvers_pkg  # type: ignore[import-not-found]

        for _finder, modname, _ispkg in pkgutil.walk_packages(
            detectors_pkg.__path__, detectors_pkg.__name__ + "."
        ):
            if modname.endswith("_entity_type"):
                import_module(modname)

        # Trigger import to register resolvers at import-time.
        for _finder, modname, _ispkg in pkgutil.walk_packages(
            resolvers_pkg.__path__, resolvers_pkg.__name__ + "."
        ):
            import_module(modname)

        from veil.core.base_entity_type import EntityTypeBase

        # Called only for its validation effect; the returned map is unused.
        EntityTypeBase.global_alias_map()

    @classmethod
    def create_from_cli_args(cls) -> List["PipelineConfig"]:
        """Return one or many PipelineConfig instances based on CLI/YAML combos.

        Each flat config produced by the flattened dataclass is rebuilt into
        its original nested form, and the flat config it came from is
        attached to the rebuilt instance as ``__flat_config__``.

        Returns:
            A list with one reconstructed instance per flat config.
        """
        instances = []
        for flat_config in create_flat_dataclass(cls).create_from_cli_args():
            instance = flat_config.reconstruct_original_dataclass()
            # object.__setattr__ bypasses the instance's own __setattr__
            # (presumably because the instance is frozen -- confirm).
            object.__setattr__(instance, "__flat_config__", flat_config)
            instances.append(instance)
        return instances