
Pipeline

The pipeline module provides the execution framework for running sequential data processing steps with configuration, caching, and logging support.

The usage pattern for the pipeline differs somewhat from the typical numbered scripts you might see elsewhere. There is no monolithic integrated script; instead, there is a standardized data processing pipeline that is configured via YAML files and executed via a runner script.

There are three main components:

  • Setup
      • The point of entry, defined in projects/run.py
      • The pipeline configuration, defined in projects/config.yaml
  • Pipeline Execution
      • The central Pipeline class, defined in src/processing/pipeline.py
      • A set of modular processing steps, defined in src/processing/steps/
  • Data Models and Validation
      • Canonical data models and validation logic, defined in src/data_canon/
      • These can be used standalone, but they are also integrated into the pipeline steps for automatic validation
      • The data models define the expected schema and content for each data table (households, persons, days, trips, tours, vehicles, etc.). Each field is tagged with a set of constraints and the step to which it pertains.
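
As an illustration, a projects/config.yaml along these lines could drive the pipeline. The step names and parameters below are hypothetical; the real schema is whatever the configured steps expect:

```yaml
# Hypothetical sketch -- actual keys depend on the project's steps
project_root: /data/my_survey          # template variable, referenced below
steps:
  load_data:
    parameters:
      input_dir: "{{ project_root }}/raw"
  link_trips:
    parameters:
      max_transfer_minutes: 30         # hypothetical step parameter
```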

Conceptual Diagram

```mermaid
%%{init: {
  "flowchart": {
    "rankSpacing": 15,
    "nodeSpacing": 20
  }
}}%%

flowchart LR

subgraph Setup[**Setup**]
  direction TB
  RunPy([projects/run.py])
  ConfigYAML([projects/config.yaml])
end

RunPy([projects/run.py]) ----> Pipeline
ConfigYAML([projects/config.yaml]) ----> Pipeline

subgraph Pipeline[**Pipeline**]
  direction TB
  LoadData([Load Data]) --> CustomPre([Custom Pre-Processing])
  CustomPre --> Imputation([Imputation])
  Imputation --> JointTrips([Joint Trips])
  JointTrips --> LinkTrips([Link Trips])
  LinkTrips --> ExtractTours([Extract Tours])
  ExtractTours --> Format{Format Output}

  Format --> PUMS
  Format --> ActivitySim --> CnV1 --> ActivitySim
  Format --> DaySim --> CnV2 --> DaySim
  Format --> CTRAMP --> CnV3 --> CTRAMP
  Format --> Standard --> CnV4

  subgraph Output[" "]
    direction TB
    WeightingPipeline --> Modeling
  end

  subgraph WeightingPipeline[**Weighting**]
    direction TB
    PUMS --> Weighting([Weighting]) --> Weights
    Weighting([Weighting])
    Weights[/"Expansion<br>Weights"\]
  end

  subgraph Modeling["Modeling"]
    direction TB

    ActivitySim{{ActivitySim<br>Format}}
    CnV1("Calibration &<br>Validation")

    DaySim{{DaySim<br>Format}}
    CnV2("Calibration &<br>Validation")

    CTRAMP{{CT-RAMP<br>Format}}
    CnV3("Calibration &<br>Validation")

    Standard{{Standard<br>Format}}
    CnV4("Analysis &<br>Validation")
  end
end

Pipeline <----> DataModels

subgraph DataModels[**Data Models**]
  direction LR
  Households[["households"]]
  Persons[["   persons   "]]
  Days[["days"]]
  UnlinkedTrips[["unlinked_trips"]]
  LinkedTrips[["linked_trips"]]
  Tours[["tours"]]
end

style Setup fill:#f9f9f9,stroke:#333,stroke-width:1px,color:#000
style RunPy fill:#e1f5e1,color:#000
style ConfigYAML fill:#fff4e1,color:#000
style Pipeline fill:#e3f2fd,stroke:#333,stroke-width:1px,color:#000
style DataModels fill:#f3e5f5,stroke:#333,stroke-width:1px,color:#000
style Format fill:#fff4e1,color:#000

style Output fill:none,stroke:#333,stroke-width:0px,color:#000

style LoadData color:#000
style CustomPre fill:#f0f0f0,color:#000
style Imputation color:#000
style JointTrips color:#000
style LinkTrips color:#000
style ExtractTours color:#000
style Weighting color:#000
style Format color:#000

style Households fill:#d1c4e9,color:#000,stroke:#000
style Persons fill:#d1c4e9,color:#000,stroke:#000
style Days fill:#d1c4e9,color:#000,stroke:#000
style UnlinkedTrips fill:#d1c4e9,color:#000,stroke:#000
style LinkedTrips fill:#d1c4e9,color:#000,stroke:#000
style Tours fill:#d1c4e9,color:#000,stroke:#000

style ActivitySim fill:#ffe0b2,color:#000
style DaySim fill:#ffe0b2,color:#000
style CTRAMP fill:#ffe0b2,color:#000
style Standard fill:#ffe0b2,color:#000
style PUMS fill:#ffe0b2,color:#000
```

The pipeline system allows you to:

  • Define processing steps as Python functions
  • Configure pipeline execution via YAML files
  • Cache intermediate results for faster re-runs
  • Track and log processing progress
  • Validate data between steps

Pipeline Class

pipeline.pipeline.Pipeline

Class to run a data processing pipeline based on a configuration file.

cache instance-attribute

cache: PipelineCache | None

config_path instance-attribute

config_path = config_path

config instance-attribute

config = self._load_config()

data instance-attribute

steps instance-attribute

steps: dict[str, Callable] = {
    (func.__name__): func for func in (steps or [])
}

_step_status instance-attribute

_step_status: dict[str, dict[str, Any]] = {}

__init__

__init__(
    config_path: str | Path,
    steps: list[Callable] | None = None,
    caching: bool | Path | str = False,
    data_models: dict[str, Any] | None = None,
    log_file_mode: str = "a",
) -> None

Initialize the Pipeline with configuration and custom steps.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config_path` | `str \| Path` | Path to the YAML configuration. | *required* |
| `steps` | `list[Callable] \| None` | Optional list of processing step functions. | `None` |
| `caching` | `bool \| Path \| str` | If `False`, disable caching. If `True`, use the default cache directory `.cache`. If a `str` or `Path`, use the specified directory for caching. | `False` |
| `data_models` | `dict[str, Any] \| None` | Optional dictionary of extra data models for validation. These are added to the default data models in the `CanonicalData` object. | `None` |
| `log_file_mode` | `str` | File open mode for the log file. Use `"a"` to append (default) or `"w"` to overwrite at the start of each run. | `'a'` |

_load_config

_load_config() -> dict[str, Any]

Load the pipeline configuration from a YAML file.

Replaces template variables in the format {{ variable_name }} with their corresponding values defined in the config.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | The configuration dictionary. |
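
The `{{ variable_name }}` substitution can be illustrated with a small sketch (the substitute_templates helper is hypothetical, not the actual loader code):

```python
import re


def substitute_templates(config_text: str, variables: dict[str, str]) -> str:
    """Replace {{ variable_name }} placeholders with values from `variables`."""
    def replace(match: re.Match) -> str:
        name = match.group(1)
        return variables.get(name, match.group(0))  # leave unknown names as-is

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", replace, config_text)


raw = "output_dir: {{ project_root }}/outputs"
resolved = substitute_templates(raw, {"project_root": "/data/survey"})
```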

_scan_cache

_scan_cache() -> None

Scan cache directory to determine which steps have cached data.

For each step in the config, checks if cache exists and reads metadata from the newest cache key directory.

report_status

report_status() -> None

Report the current pipeline status with ASCII flow diagram.

Shows which steps have cached data available:

  • ✓ CACHED: Step has a valid cache
  • ✗ NO CACHE: Step caching is enabled, but no cache exists
  • ∅ NO CACHE (disabled): Step caching is disabled

parse_step_args

parse_step_args(
    step_name: str, step_obj: Callable
) -> dict[str, Any]

Separate the canonical data and parameters.

If an argument's name matches a canonical table, the value is passed from self.data. Otherwise, it is taken from the step configuration's "parameters" section.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `step_name` | `str` | Name of the step. | *required* |
| `step_obj` | `Callable` | The step function or class. | *required* |
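
The routing rule above can be sketched with `inspect` as follows; split_step_args and link_trips here are hypothetical stand-ins for the real method and step:

```python
import inspect
from typing import Any

CANONICAL_TABLES = {"households", "persons", "days", "unlinked_trips",
                    "linked_trips", "tours"}


def split_step_args(
    step_obj: Any, data: dict[str, Any], parameters: dict[str, Any]
) -> dict[str, Any]:
    """Route each argument from pipeline data or step parameters by name."""
    kwargs = {}
    for name in inspect.signature(step_obj).parameters:
        if name in CANONICAL_TABLES:
            kwargs[name] = data[name]        # canonical table from pipeline data
        elif name in parameters:
            kwargs[name] = parameters[name]  # from the step's "parameters" config
    return kwargs


def link_trips(unlinked_trips: list, max_gap: int) -> list:
    return unlinked_trips[:max_gap]


args = split_step_args(link_trips, {"unlinked_trips": [1, 2, 3]}, {"max_gap": 2})
```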

_log_git_version

_log_git_version() -> None

Log the git version of the codebase, including diffs if dirty.

run

run() -> CanonicalData

Run a data processing pipeline based on a configuration file.

_log_cache_stats

_log_cache_stats() -> None

Log cache hit/miss statistics after a run.

_get_available_tables

_get_available_tables() -> dict[str, list[str]]

Get all available tables across cached steps.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, list[str]]` | Dictionary mapping table names to the list of steps containing them. |

_find_step_with_table

_find_step_with_table(table_name: str) -> str | None

Find the latest step that has cached data for a table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table_name` | `str` | Name of the table to find. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str \| None` | Step name containing the table, or `None` if not found. |

_load_from_step

_load_from_step(table_name: str, step_name: str) -> Any

Load a specific table from a specific step's cache.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table_name` | `str` | Name of the table to load. | *required* |
| `step_name` | `str` | Name of the step to load from. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Any` | The loaded table data. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the step has no cache, or the table is not in the step. |

get_data

get_data(table_name: str, step: str | None = None) -> Any

Fetch a table from cached pipeline data. If no cache exists, return the latest available data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table_name` | `str` | Name of the table to fetch (e.g., `'households'`, `'trips'`). | *required* |
| `step` | `str \| None` | Optional step name to fetch from. If `None`, uses the last step that has a cache containing this table. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Any` | The requested DataFrame or data object. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the table is not found in any cached step, or if the specified step has no cache or does not contain the table. |

Example

```python
pipeline = Pipeline(config_path, steps, caching=True)

# Fetch from the latest cached step
households = pipeline.get_data("households")

# Fetch from a specific step
trips = pipeline.get_data("linked_trips", step="link_trips")
```

Caching

pipeline.cache

Pipeline caching system using Parquet for fast checkpointing.

Provides hash-based cache invalidation and parquet storage for pipeline step outputs, enabling fast debugging and iteration.

logger module-attribute

logger = logging.getLogger(__name__)

PipelineCache

Manages parquet-based caching for pipeline steps.

Cache structure

```
.cache/
  {step_name}/
    {cache_key}/
      metadata.json
      {table_name}.parquet
      ...
```

The cache key is a hash of:

  • Step name
  • Input data (schema + row count + content hash of all rows)
  • Step parameters

This ensures cache invalidation when inputs or configuration change.
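
A cache key along these lines could be computed as in this sketch (cache_key is illustrative; the library's actual hashing of schemas and row contents differs):

```python
import hashlib
import json
from typing import Any


def cache_key(step_name: str, tables: dict[str, Any], params: dict[str, Any]) -> str:
    """Hash step name, input data, and parameters into a stable cache key."""
    hasher = hashlib.sha256()
    hasher.update(step_name.encode())
    for name in sorted(tables):
        rows = tables[name]
        hasher.update(name.encode())
        hasher.update(str(len(rows)).encode())  # row count
        # Content hash: serialize rows deterministically before hashing
        hasher.update(json.dumps(rows, sort_keys=True, default=str).encode())
    hasher.update(json.dumps(params, sort_keys=True).encode())
    return hasher.hexdigest()[:16]


# Changing a parameter changes the key, so the old cache is invalidated
key1 = cache_key("link_trips", {"unlinked_trips": [{"id": 1}]}, {"gap": 30})
key2 = cache_key("link_trips", {"unlinked_trips": [{"id": 1}]}, {"gap": 45})
```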

_load_data

_load_data(cache_path: Path, name: str, data_type: str) -> Any

Load a single table from cache based on its type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `cache_path` | `Path` | Path to the cache directory. | *required* |
| `name` | `str` | Name of the table to load. | *required* |
| `data_type` | `str` | Type of table (`'polars'`, `'geopandas'`, or `'pickle'`). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Any` | The loaded object, or `None` if the file is missing. |

_save_data

_save_data(
    cache_path: Path, name: str, obj: Any
) -> tuple[str, str]

Save a single object to cache and return metadata.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `cache_path` | `Path` | Path to the cache directory. | *required* |
| `name` | `str` | Name of the table to save. | *required* |
| `obj` | `Any` | Object to save. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `tuple[str, str]` | Tuple of `(data_type, info_string)`, where `info_string` is formatted as `"name obj_type → format (shape)"`. |

Logging

pipeline.logger

Logging configuration utilities for the pipeline.

setup_logging

setup_logging(
    log_file: Path | str | None,
    console_level: int = logging.INFO,
    file_level: int = logging.DEBUG,
    log_file_mode: str = "a",
) -> logging.Logger

Configure logging to both console and file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `log_file` | `Path \| str \| None` | Path to the log file. | *required* |
| `console_level` | `int` | Logging level for console output. | `logging.INFO` |
| `file_level` | `int` | Logging level for file output. | `logging.DEBUG` |
| `log_file_mode` | `str` | File open mode for the log file. Use `"a"` to append (default) or `"w"` to overwrite at the start of each run. | `'a'` |

Returns:

| Type | Description |
| --- | --- |
| `logging.Logger` | Configured logger instance. |
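
The console/file split can be sketched with the standard library alone; make_logger below is a hypothetical simplification of setup_logging:

```python
import logging
import sys
import tempfile
from pathlib import Path


def make_logger(log_file: Path, log_file_mode: str = "a") -> logging.Logger:
    """Attach a console handler (INFO) and a file handler (DEBUG) to one logger."""
    logger = logging.getLogger("pipeline_sketch")
    logger.setLevel(logging.DEBUG)
    logger.handlers.clear()  # avoid duplicate handlers on repeated calls

    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)
    logger.addHandler(console)

    file_handler = logging.FileHandler(log_file, mode=log_file_mode)
    file_handler.setLevel(logging.DEBUG)
    logger.addHandler(file_handler)
    return logger


log_path = Path(tempfile.mkstemp(suffix=".log")[1])
logger = make_logger(log_path, log_file_mode="w")
logger.debug("debug goes to file only")
logger.info("info goes to both")
for handler in logger.handlers:
    handler.flush()
```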

Decorators

pipeline.decoration

Decorators for pipeline steps with automatic validation and caching.

logger module-attribute

logger = logging.getLogger(__name__)

CanonicalData dataclass

Canonical data structure for travel survey data with validation.

Use the validate() method to validate specific tables.

TABLE_NAMES class-attribute instance-attribute

TABLE_NAMES: list[str] = field(
    default_factory=lambda: [
        "households",
        "persons",
        "days",
        "unlinked_trips",
        "linked_trips",
        "joint_trips",
        "tours",
    ],
    repr=False,
)

step

step(
    *,
    validate_input: bool = True,
    validate_output: bool = False,
    cache: bool = False
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator for pipeline steps with automatic validation and caching.

This decorator validates canonical data inputs and/or outputs using the Pydantic models defined in data.models. Only parameters/returns that match canonical table names (households, persons, days, unlinked_trips, linked_trips, tours) are validated.

Validation is skipped for tables that have already been validated if a CanonicalData instance is passed as 'canonical_data' parameter.

When caching is enabled, the decorator will check for cached outputs before executing the step function. If valid cache exists, outputs are loaded from parquet files. Otherwise, the step executes and results are cached after successful validation.

The default is to validate only inputs, to avoid duplicate validation. It is recommended to put a final full_check step at the end of the pipeline to validate all tables after all processing is complete.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `validate_input` | `bool` | Whether to validate inputs. | `True` |
| `validate_output` | `bool` | Whether to validate outputs. | `False` |
| `cache` | `bool` | Whether to enable caching for this step. | `False` |
Example

```python
@step(validate_input=True)
def link_trips(
    unlinked_trips: pl.DataFrame,
    config: dict
) -> dict[str, pl.DataFrame]:
    # Process trips
    linked_trips = ...
    return {"linked_trips": linked_trips}


@step(validate_input=False)
def load_data(input_paths: dict) -> dict[str, pl.DataFrame]:
    return {
        "households": households_df,
        "persons": persons_df,
        ...
    }
```

Returns:

| Type | Description |
| --- | --- |
| `Callable[[Callable[..., Any]], Callable[..., Any]]` | Decorated function with validation. |

_update_canonical_data

_update_canonical_data(
    canonical_data: CanonicalData, result: dict[str, pl.DataFrame]
) -> None

Update canonical_data instance with result DataFrames.

_try_load_from_cache

_try_load_from_cache(
    func: Callable,
    pipeline_cache: Any,
    args: tuple,
    kwargs: dict,
    canonical_data: CanonicalData,
) -> dict[str, pl.DataFrame] | None

Try to load cached result for a step.

Returns cached result dict if found, None otherwise.

_save_to_cache

_save_to_cache(
    func: Callable,
    pipeline_cache: Any,
    args: tuple,
    kwargs: dict,
    result: dict[str, pl.DataFrame],
) -> None

Save step result to cache.

Only caches outputs that are canonical DataFrames.

_validates

_validates(
    func: Callable,
    args: tuple,
    kwargs: dict,
    canonical_data: CanonicalData,
) -> None

Validate input parameters that are canonical DataFrames.

_validate_dict_outputs

_validate_dict_outputs(
    result: dict, func_name: str, canonical_data: CanonicalData
) -> None

Validate outputs in dict format.