Pipeline
The pipeline module provides the execution framework for running sequential data processing steps with configuration, caching, and logging support.
The usage pattern for the pipeline differs from the typical numbered scripts you might see elsewhere: there is no monolithic integrated script. Instead, there is a standardized data processing pipeline that is configurable via YAML files and executed via a runner script.
There are three main components:

- Setup
    - The point of entry, defined in `projects/run.py`
    - Pipeline configuration, defined in `projects/config.yaml`
- Pipeline Execution
    - The central `Pipeline` class, defined in `src/processing/pipeline.py`
    - A set of modular processing steps, defined in `src/processing/steps/`
- Data Models and Validation
    - Canonical data models and validation logic, defined in `src/data_canon/`
    - This can be used standalone, but it is also integrated into the pipeline steps for automatic validation
    - The data models define the expected schema and content for each data table (households, persons, days, trips, tours, vehicles, etc.). Each field is tagged with a set of constraints and the step to which it pertains.
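As a rough illustration of how step-tagged field constraints might work, the sketch below checks one row against a list of field specs. All names here (`FieldSpec`, `check_row`, the person fields) are assumptions for illustration, not the actual `src/data_canon/` definitions.

```python
from dataclasses import dataclass

# Illustrative sketch only -- field names, constraint flags, and the
# "step" tag are assumptions, not the actual src/data_canon definitions.
@dataclass
class FieldSpec:
    name: str
    dtype: type
    nullable: bool = False
    step: str = "load_data"  # pipeline step at which the constraint applies

PERSON_FIELDS = [
    FieldSpec("person_id", int),
    FieldSpec("household_id", int),
    FieldSpec("age", int, nullable=True, step="imputation"),
]

def check_row(row: dict, specs: list, current_step: str) -> list:
    """Return violation messages for one row at a given pipeline step."""
    errors = []
    for spec in specs:
        if spec.step != current_step:
            continue  # constraint does not apply at this step
        value = row.get(spec.name)
        if value is None:
            if not spec.nullable:
                errors.append(f"{spec.name}: missing required value")
        elif not isinstance(value, spec.dtype):
            errors.append(f"{spec.name}: expected {spec.dtype.__name__}")
    return errors
```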
Conceptual Diagram
```mermaid
%%{init: {
  "flowchart": {
    "rankSpacing": 15,
    "nodeSpacing": 20
  }
}}%%
flowchart LR
    subgraph Setup[**Setup**]
        direction TB
        RunPy([projects/run.py])
        ConfigYAML([projects/config.yaml])
    end
    RunPy ----> Pipeline
    ConfigYAML ----> Pipeline
    subgraph Pipeline[**Pipeline**]
        direction TB
        LoadData([Load Data]) --> CustomPre([Custom Pre-Processing])
        CustomPre --> Imputation([Imputation])
        Imputation --> JointTrips([Joint Trips])
        JointTrips --> LinkTrips([Link Trips])
        LinkTrips --> ExtractTours([Extract Tours])
        ExtractTours --> Format{Format Output}
        Format --> PUMS
        Format --> ActivitySim --> CnV1 --> ActivitySim
        Format --> DaySim --> CnV2 --> DaySim
        Format --> CTRAMP --> CnV3 --> CTRAMP
        Format --> Standard --> CnV4
        subgraph Output[" "]
            direction TB
            WeightingPipeline --> Modeling
        end
        subgraph WeightingPipeline[**Weighting**]
            direction TB
            PUMS --> Weighting([Weighting]) --> Weights
            Weights[/"Expansion<br>Weights"\]
        end
        subgraph Modeling["Modeling"]
            direction TB
            ActivitySim{{ActivitySim<br>Format}}
            CnV1("Calibration &<br>Validation")
            DaySim{{DaySim<br>Format}}
            CnV2("Calibration &<br>Validation")
            CTRAMP{{CT-RAMP<br>Format}}
            CnV3("Calibration &<br>Validation")
            Standard{{Standard<br>Format}}
            CnV4("Analysis &<br>Validation")
        end
    end
    Pipeline <----> DataModels
    subgraph DataModels[**Data Models**]
        direction LR
        Households[["households"]]
        Persons[["persons"]]
        Days[["days"]]
        UnlinkedTrips[["unlinked_trips"]]
        LinkedTrips[["linked_trips"]]
        Tours[["tours"]]
    end
    style Setup fill:#f9f9f9,stroke:#333,stroke-width:1px,color:#000
    style RunPy fill:#e1f5e1,color:#000
    style ConfigYAML fill:#fff4e1,color:#000
    style Pipeline fill:#e3f2fd,stroke:#333,stroke-width:1px,color:#000
    style DataModels fill:#f3e5f5,stroke:#333,stroke-width:1px,color:#000
    style Format fill:#fff4e1,color:#000
    style Output fill:none,stroke:#333,stroke-width:0px,color:#000
    style LoadData color:#000
    style CustomPre fill:#f0f0f0,color:#000
    style Imputation color:#000
    style JointTrips color:#000
    style LinkTrips color:#000
    style ExtractTours color:#000
    style Weighting color:#000
    style Households fill:#d1c4e9,color:#000,stroke:#000
    style Persons fill:#d1c4e9,color:#000,stroke:#000
    style Days fill:#d1c4e9,color:#000,stroke:#000
    style UnlinkedTrips fill:#d1c4e9,color:#000,stroke:#000
    style LinkedTrips fill:#d1c4e9,color:#000,stroke:#000
    style Tours fill:#d1c4e9,color:#000,stroke:#000
    style ActivitySim fill:#ffe0b2,color:#000
    style DaySim fill:#ffe0b2,color:#000
    style CTRAMP fill:#ffe0b2,color:#000
    style Standard fill:#ffe0b2,color:#000
    style PUMS fill:#ffe0b2,color:#000
```
The pipeline system allows you to:
- Define processing steps as Python functions
- Configure pipeline execution via YAML files
- Cache intermediate results for faster re-runs
- Track and log processing progress
- Validate data between steps
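Conceptually, the runner executes the configured steps in order, threading a dictionary of tables through them. The following is a toy sketch of that control flow, not the real `Pipeline` API; all names and the config shape are illustrative.

```python
# Conceptual sketch of the runner pattern, not the real Pipeline class:
# steps are plain functions executed in the order listed in the config,
# threading a dict of tables through each one. All names are illustrative.
def run_pipeline(config: dict, steps: dict) -> dict:
    data: dict = {}
    for step_name in config["steps"]:
        step_fn = steps[step_name]
        data = step_fn(data, config.get(step_name, {}))
    return data

def load_data(data: dict, params: dict) -> dict:
    # A stand-in "load" step: pull rows from the step's parameters.
    return {**data, "households": params.get("rows", [])}

def impute(data: dict, params: dict) -> dict:
    # A stand-in transformation step.
    return {**data, "imputed": True}

config = {"steps": ["load_data", "impute"], "load_data": {"rows": [1, 2]}}
result = run_pipeline(config, {"load_data": load_data, "impute": impute})
```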
Pipeline Class
pipeline.pipeline.Pipeline
Class to run a data processing pipeline based on a configuration file.
`config_path` *(instance attribute)*

```python
config_path = config_path
```

`config` *(instance attribute)*

```python
config = self._load_config()
```

`steps` *(instance attribute)*

```python
steps: dict[str, Callable] = {func.__name__: func for func in (steps or [])}
```

`_step_status` *(instance attribute)*

```python
_step_status: dict[str, dict[str, Any]] = {}
```
__init__
```python
__init__(
    config_path: str | Path,
    steps: list[Callable] | None = None,
    caching: bool | Path | str = False,
    data_models: dict[str, Any] | None = None,
    log_file_mode: str = "a",
) -> None
```
Initialize the Pipeline with configuration and custom steps.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config_path` | `str \| Path` | Path to the YAML configuration. | *required* |
| `steps` | `list[Callable] \| None` | Optional list of processing step functions. | `None` |
| `caching` | `bool \| Path \| str` | If `False`, disable caching. If `True`, use the default cache directory `.cache`. If `str` or `Path`, use the specified directory for caching. | `False` |
| `data_models` | `dict[str, Any] \| None` | Optional dictionary of extra data models for validation. These will be added to the default data models in the `CanonicalData` object. | `None` |
| `log_file_mode` | `str` | File open mode for the log file. Use `"a"` to append (default) or `"w"` to overwrite at the start of each run. | `'a'` |
_load_config
```python
_load_config() -> dict[str, Any]
```

Load the pipeline configuration from a YAML file.

Replaces template variables in the format `{{ variable_name }}` with their corresponding values defined in the config.

Returns:

| Type | Description |
|---|---|
| `dict[str, Any]` | The configuration dictionary. |
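For example, a config using template substitution might look like the following. The keys and step names here are assumptions for illustration; only the `{{ variable_name }}` mechanism itself is documented.

```yaml
# Illustrative config fragment -- keys and step names are assumptions.
project_dir: /data/survey_2023

steps:
  load_data:
    parameters:
      input_path: "{{ project_dir }}/raw"   # expands to /data/survey_2023/raw
```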
_scan_cache
```python
_scan_cache() -> None
```
Scan cache directory to determine which steps have cached data.
For each step in the config, checks if cache exists and reads metadata from the newest cache key directory.
report_status
```python
report_status() -> None
```

Report the current pipeline status with an ASCII flow diagram.

Shows which steps have cached data available:

- ✓ CACHED: step has a valid cache
- ✗ NO CACHE: step caching is enabled but no cache exists
- ∅ NO CACHE (disabled): step caching is disabled
parse_step_args
```python
parse_step_args(step_name: str, step_obj: Callable) -> dict[str, Any]
```

Separate the canonical data and parameters.

If an argument name matches a canonical table, it is passed from `self.data`; otherwise, it is taken from the step configuration's `parameters`.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `step_name` | `str` | Name of the step. | *required* |
| `step_obj` | `Callable` | The step function or class. | *required* |
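The dispatch rule above can be sketched with `inspect.signature`: arguments named after canonical tables come from pipeline data, everything else from the step's configured parameters. This is illustrative only; the real `parse_step_args` may differ in detail, and `split_args` and `link_trips` below are hypothetical names.

```python
import inspect

CANONICAL_TABLES = {"households", "persons", "days", "unlinked_trips",
                    "linked_trips", "tours"}

def split_args(step_obj, data: dict, parameters: dict) -> dict:
    """Sketch of the dispatch rule: arguments named after canonical
    tables come from pipeline data, everything else from the step's
    configured parameters. Illustrative only."""
    kwargs = {}
    for name in inspect.signature(step_obj).parameters:
        if name in CANONICAL_TABLES:
            kwargs[name] = data[name]
        elif name in parameters:
            kwargs[name] = parameters[name]
    return kwargs

# Hypothetical step: one canonical table plus one plain parameter.
def link_trips(unlinked_trips, max_gap_minutes):
    return unlinked_trips

kwargs = split_args(link_trips,
                    data={"unlinked_trips": ["t1"]},
                    parameters={"max_gap_minutes": 30})
```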
_log_git_version
```python
_log_git_version() -> None
```
Log the git version of the codebase, including diffs if dirty.
_log_cache_stats
```python
_log_cache_stats() -> None
```
Log cache hit/miss statistics after a run.
_get_available_tables
```python
_get_available_tables() -> dict[str, list[str]]
```

Get all available tables across cached steps.

Returns:

| Type | Description |
|---|---|
| `dict[str, list[str]]` | Dictionary mapping table names to the list of steps containing them. |
_find_step_with_table
```python
_find_step_with_table(table_name: str) -> str | None
```

Find the latest step that has cached data for a table.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `table_name` | `str` | Name of the table to find. | *required* |

Returns:

| Type | Description |
|---|---|
| `str \| None` | Step name containing the table, or `None` if not found. |
_load_from_step
```python
_load_from_step(table_name: str, step_name: str) -> Any
```

Load a specific table from a specific step's cache.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `table_name` | `str` | Name of the table to load. | *required* |
| `step_name` | `str` | Name of the step to load from. | *required* |

Returns:

| Type | Description |
|---|---|
| `Any` | The loaded table data. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If the step has no cache or the table is not in the step. |
get_data
```python
get_data(table_name: str, step: str | None = None) -> Any
```

Fetch a table from cached pipeline data. If no cache exists, the latest data is returned.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `table_name` | `str` | Name of the table to fetch (e.g., `'households'`, `'trips'`). | *required* |
| `step` | `str \| None` | Optional step name to fetch from. If `None`, uses the last step that has a cache containing this table. | `None` |

Returns:

| Type | Description |
|---|---|
| `Any` | The requested DataFrame or data object. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If the table is not found in any cached steps, or if the specified step doesn't have a cache or doesn't contain the table. |
Example

```python
pipeline = Pipeline(config_path, steps, caching=True)

# Fetch from the latest cached step
households = pipeline.get_data("households")

# Fetch from a specific step
trips = pipeline.get_data("linked_trips", step="link_trips")
```
Caching
pipeline.cache
Pipeline caching system using Parquet for fast checkpointing.
Provides hash-based cache invalidation and parquet storage for pipeline step outputs, enabling fast debugging and iteration.
PipelineCache
Manages parquet-based caching for pipeline steps.
Cache structure:

```
.cache/
    {step_name}/
        {cache_key}/
            metadata.json
            {table_name}.parquet
            ...
```

The cache key is a hash of:

- Step name
- Input data (schema + row count + content hash of all rows)
- Step parameters
This ensures cache invalidation when inputs or configuration change.
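The invalidation idea can be sketched as follows: hash the step name, inputs, and parameters together, so any change produces a different key. This is illustrative only; the real implementation hashes Parquet schemas and row contents rather than JSON.

```python
import hashlib
import json

def cache_key(step_name: str, tables: dict, params: dict) -> str:
    """Sketch of hash-based cache invalidation: any change to the step
    name, input data, or parameters yields a different key.
    Illustrative only -- not the actual PipelineCache hashing."""
    payload = json.dumps(
        {"step": step_name, "tables": tables, "params": params},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()[:16]

# Same step and data, different parameters -> different cache keys.
k1 = cache_key("link_trips", {"trips": [[1, 2]]}, {"max_gap": 30})
k2 = cache_key("link_trips", {"trips": [[1, 2]]}, {"max_gap": 45})
```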
_load_data
```python
_load_data(cache_path: Path, name: str, data_type: str) -> Any
```

Load a single table from cache based on its type.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `cache_path` | `Path` | Path to the cache directory. | *required* |
| `name` | `str` | Name of the table to load. | *required* |
| `data_type` | `str` | Type of table (`'polars'`, `'geopandas'`, or `'pickle'`). | *required* |

Returns:

| Type | Description |
|---|---|
| `Any` | Loaded object, or `None` if the file is missing. |
_save_data
```python
_save_data(cache_path: Path, name: str, obj: Any) -> tuple[str, str]
```

Save a single object to cache and return metadata.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `cache_path` | `Path` | Path to the cache directory. | *required* |
| `name` | `str` | Name of the table to save. | *required* |
| `obj` | `Any` | Object to save. | *required* |

Returns:

| Type | Description |
|---|---|
| `tuple[str, str]` | Tuple of `(data_type, info_string)`, where `info_string` is formatted as `"name obj_type → format (shape)"`. |
Logging
pipeline.logger
Logging configuration utilities for the pipeline.
setup_logging
```python
setup_logging(
    log_file: Path | str | None,
    console_level: int = logging.INFO,
    file_level: int = logging.DEBUG,
    log_file_mode: str = "a",
) -> logging.Logger
```

Configure logging to both console and file.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `log_file` | `Path \| str \| None` | Path to the log file. | *required* |
| `console_level` | `int` | Logging level for console output. | `logging.INFO` |
| `file_level` | `int` | Logging level for file output. | `logging.DEBUG` |
| `log_file_mode` | `str` | File open mode for the log file. Use `"a"` to append (default) or `"w"` to overwrite at the start of each run. | `'a'` |

Returns:

| Type | Description |
|---|---|
| `logging.Logger` | Configured logger instance. |
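The documented behavior (separate console and file handlers with independent levels) can be sketched with the standard library alone. This is an illustrative re-implementation, not the actual `pipeline.logger` code; `setup_logging_sketch` is a hypothetical name.

```python
import logging

def setup_logging_sketch(log_file, console_level=logging.INFO,
                         file_level=logging.DEBUG, log_file_mode="a"):
    """Illustrative re-implementation of the documented behavior:
    one console handler and one file handler, each with its own level."""
    logger = logging.getLogger("pipeline")
    # Logger level must admit the most verbose handler.
    logger.setLevel(min(console_level, file_level))
    logger.handlers.clear()  # avoid duplicate handlers on repeated setup
    console = logging.StreamHandler()
    console.setLevel(console_level)
    logger.addHandler(console)
    if log_file is not None:
        file_handler = logging.FileHandler(log_file, mode=log_file_mode)
        file_handler.setLevel(file_level)
        logger.addHandler(file_handler)
    return logger
```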
Decorators
pipeline.decoration
Decorators for pipeline steps with automatic validation and caching.
CanonicalData
dataclass
Canonical data structure for travel survey data with validation.
Use the validate() method to validate specific tables.
`TABLE_NAMES` *(class attribute, instance attribute)*

```python
TABLE_NAMES: list[str] = field(
    default_factory=lambda: [
        "households",
        "persons",
        "days",
        "unlinked_trips",
        "linked_trips",
        "joint_trips",
        "tours",
    ],
    repr=False,
)
```
step
```python
step(
    *,
    validate_input: bool = True,
    validate_output: bool = False,
    cache: bool = False,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]
```
Decorator for pipeline steps with automatic validation and caching.
This decorator validates canonical data inputs and/or outputs using the Pydantic models defined in data.models. Only parameters/returns that match canonical table names (households, persons, days, unlinked_trips, linked_trips, tours) are validated.
Validation is skipped for tables that have already been validated if a CanonicalData instance is passed as 'canonical_data' parameter.
When caching is enabled, the decorator will check for cached outputs before executing the step function. If valid cache exists, outputs are loaded from parquet files. Otherwise, the step executes and results are cached after successful validation.
By default, only inputs are validated, to avoid duplicate validation. It is recommended to put a final full_check step at the end of the pipeline to validate all tables after all processing is complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `validate_input` | `bool` | Whether to validate inputs. | `True` |
| `validate_output` | `bool` | Whether to validate outputs. | `False` |
| `cache` | `bool` | Whether to enable caching for this step. | `False` |
Example
```python
@step(validate_input=True)
def link_trips(
    unlinked_trips: pl.DataFrame,
    config: dict,
) -> dict[str, pl.DataFrame]:
    # Process trips
    linked_trips = ...
    return {"linked_trips": linked_trips}
```

```python
@step(validate_input=False)
def load_data(input_paths: dict) -> dict[str, pl.DataFrame]:
    return {
        "households": households_df,
        "persons": persons_df,
        # ...
    }
```
Returns:
| Type | Description |
|---|---|
| `Callable[[Callable[..., Any]], Callable[..., Any]]` | Decorated function with validation. |
_update_canonical_data
```python
_update_canonical_data(
    canonical_data: CanonicalData, result: dict[str, pl.DataFrame]
) -> None
```
Update canonical_data instance with result DataFrames.
_try_load_from_cache
```python
_try_load_from_cache(
    func: Callable,
    pipeline_cache: Any,
    args: tuple,
    kwargs: dict,
    canonical_data: CanonicalData,
) -> dict[str, pl.DataFrame] | None
```
Try to load cached result for a step.
Returns cached result dict if found, None otherwise.
_save_to_cache
```python
_save_to_cache(
    func: Callable,
    pipeline_cache: Any,
    args: tuple,
    kwargs: dict,
    result: dict[str, pl.DataFrame],
) -> None
```
Save step result to cache.
Only caches outputs that are canonical DataFrames.
_validates
```python
_validates(
    func: Callable,
    args: tuple,
    kwargs: dict,
    canonical_data: CanonicalData,
) -> None
```
Validate input parameters that are canonical DataFrames.
_validate_dict_outputs
```python
_validate_dict_outputs(
    result: dict, func_name: str, canonical_data: CanonicalData
) -> None
```
Validate outputs in dict format.