Skip to content

Utilities

Utility functions and helper classes used throughout Network Wrangler.

Core Utilities

General utility functions used throughout package.

network_wrangler.utils.utils.DictionaryMergeError

Bases: Exception

Error raised when there is a conflict in merging two dictionaries.

Source code in network_wrangler/utils/utils.py
class DictionaryMergeError(Exception):
    """Error raised when there is a conflict in merging two dictionaries."""

network_wrangler.utils.utils.check_one_or_one_superset_present

check_one_or_one_superset_present(mixed_list, all_fields_present)

Checks that exactly one of the fields in mixed_list is in fields_present or one superset.

Source code in network_wrangler/utils/utils.py
def check_one_or_one_superset_present(
    mixed_list: list[Union[str, list[str]]], all_fields_present: list[str]
) -> bool:
    """Checks that exactly one of the fields in mixed_list is in fields_present or one superset."""
    normalized_list = normalize_to_lists(mixed_list)

    list_items_present = [i for i in normalized_list if set(i).issubset(all_fields_present)]

    if len(list_items_present) == 1:
        return True

    return list_elements_subset_of_single_element(list_items_present)

network_wrangler.utils.utils.combine_unique_unhashable_list

combine_unique_unhashable_list(list1, list2)

Combines lists preserving order of first and removing duplicates.

Parameters:

  • list1 (list) –

    The first list.

  • list2 (list) –

    The second list.

Returns:

  • list

    A new list containing the elements from list1 followed by the

  • unique elements from list2.

Example

list1 = [1, 2, 3] list2 = [2, 3, 4, 5] combine_unique_unhashable_list(list1, list2) [1, 2, 3, 4, 5]

Source code in network_wrangler/utils/utils.py
def combine_unique_unhashable_list(list1: list, list2: list):
    """Combines lists preserving order of first and removing duplicates.

    Args:
        list1 (list): The first list.
        list2 (list): The second list.

    Returns:
        list: A new list containing the elements from list1 followed by the
        unique elements from list2.

    Example:
        >>> list1 = [1, 2, 3]
        >>> list2 = [2, 3, 4, 5]
        >>> combine_unique_unhashable_list(list1, list2)
        [1, 2, 3, 4, 5]
    """
    return [item for item in list1 if item not in list2] + list2

network_wrangler.utils.utils.delete_keys_from_dict

delete_keys_from_dict(dictionary, keys)

Removes list of keys from potentially nested dictionary.

SOURCE: https://stackoverflow.com/questions/3405715/ User: @mseifert

Parameters:

  • dictionary (dict) –

    dictionary to remove keys from

  • keys (list) –

    list of keys to remove

Source code in network_wrangler/utils/utils.py
def delete_keys_from_dict(dictionary: dict, keys: list) -> dict:
    """Removes list of keys from potentially nested dictionary.

    SOURCE: https://stackoverflow.com/questions/3405715/
    User: @mseifert

    Args:
        dictionary: dictionary to remove keys from
        keys: list of keys to remove

    """
    keys_set = list(set(keys))  # Just an optimization for the "if key in keys" lookup.

    modified_dict = {}
    for key, value in dictionary.items():
        if key not in keys_set:
            if isinstance(value, dict):
                modified_dict[key] = delete_keys_from_dict(value, keys_set)
            else:
                modified_dict[key] = (
                    value  # or copy.deepcopy(value) if a copy is desired for non-dicts.
                )
    return modified_dict

network_wrangler.utils.utils.dict_to_hexkey

dict_to_hexkey(d)

Converts a dictionary to a hexdigest of the sha1 hash of the dictionary.

Parameters:

  • d (dict) –

    dictionary to convert to string

Returns:

  • str ( str ) –

    hexdigest of the sha1 hash of dictionary

Source code in network_wrangler/utils/utils.py
def dict_to_hexkey(d: dict) -> str:
    """Converts a dictionary to a hexdigest of the sha1 hash of the dictionary.

    Args:
        d (dict): dictionary to convert to string

    Returns:
        str: hexdigest of the sha1 hash of dictionary
    """
    return hashlib.sha1(str(d).encode()).hexdigest()

network_wrangler.utils.utils.findkeys

findkeys(node, kv)

Returns values of all keys in various objects.

Adapted from arainchi on Stack Overflow: https://stackoverflow.com/questions/9807634/find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists

Source code in network_wrangler/utils/utils.py
def findkeys(node, kv):
    """Returns values of all keys in various objects.

    Adapted from arainchi on Stack Overflow:
    https://stackoverflow.com/questions/9807634/find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists
    """
    if isinstance(node, list):
        for i in node:
            for x in findkeys(i, kv):
                yield x
    elif isinstance(node, dict):
        if kv in node:
            yield node[kv]
        for j in node.values():
            for x in findkeys(j, kv):
                yield x

network_wrangler.utils.utils.get_overlapping_range

get_overlapping_range(ranges)

Returns the overlapping range for a list of ranges or tuples defining ranges.

Parameters:

  • ranges (list[Union[tuple[int], range]]) –

    A list of ranges or tuples defining ranges.

Returns:

  • Union[None, range]

    Union[None, range]: The overlapping range if found, otherwise None.

Example

ranges = [(1, 5), (3, 7), (6, 10)] get_overlapping_range(ranges) range(3, 5)

Source code in network_wrangler/utils/utils.py
def get_overlapping_range(ranges: list[Union[tuple[int, int], range]]) -> Union[None, range]:
    """Returns the overlapping range for a list of ranges or tuples defining ranges.

    Args:
        ranges (list[Union[tuple[int], range]]): A list of ranges or tuples defining ranges.

    Returns:
        Union[None, range]: The overlapping range if found, otherwise None.

    Example:
        >>> ranges = [(1, 5), (3, 7), (6, 10)]
        >>> get_overlapping_range(ranges)
        range(3, 5)

    """
    # check that any tuples have two values
    if any(isinstance(r, tuple) and len(r) != 2 for r in ranges):  # noqa: PLR2004
        msg = "Tuple ranges must have two values."
        WranglerLogger.error(msg)
        raise ValueError(msg)

    _ranges = [r if isinstance(r, range) else range(r[0], r[1]) for r in ranges]

    _overlap_start = max(r.start for r in _ranges)
    _overlap_end = min(r.stop for r in _ranges)

    if _overlap_start < _overlap_end:
        return range(_overlap_start, _overlap_end)
    return None

network_wrangler.utils.utils.list_elements_subset_of_single_element

list_elements_subset_of_single_element(mixed_list)

Find the first list in the mixed_list.

Source code in network_wrangler/utils/utils.py
@validate_call
def list_elements_subset_of_single_element(mixed_list: list[Union[str, list[str]]]) -> bool:
    """Find the first list in the mixed_list."""
    potential_supersets = []
    for item in mixed_list:
        if isinstance(item, list) and len(item) > 0:
            potential_supersets.append(set(item))

    # If no list is found, return False
    if not potential_supersets:
        return False

    normalized_list = normalize_to_lists(mixed_list)

    valid_supersets = []
    for ss in potential_supersets:
        if all(ss.issuperset(i) for i in normalized_list):
            valid_supersets.append(ss)

    return len(valid_supersets) == 1

network_wrangler.utils.utils.make_slug

make_slug(text, delimiter='_')

Makes a slug from text.

Source code in network_wrangler/utils/utils.py
def make_slug(text: str, delimiter: str = "_") -> str:
    """Makes a slug from text."""
    text = re.sub("[,.;@#?!&$']+", "", text.lower())
    return re.sub("[\ ]+", delimiter, text)

network_wrangler.utils.utils.merge_dicts

merge_dicts(right, left, path=None)

Merges the contents of nested dict left into nested dict right.

Raises errors in case of namespace conflicts.

Parameters:

  • right

    dict, modified in place

  • left

    dict to be merged into right

  • path

    default None, sequence of keys to be reported in case of error in merging nested dictionaries

Source code in network_wrangler/utils/utils.py
def merge_dicts(right, left, path=None):
    """Merges the contents of nested dict left into nested dict right.

    Raises errors in case of namespace conflicts.

    Args:
        right: dict, modified in place
        left: dict to be merged into right
        path: default None, sequence of keys to be reported in case of
            error in merging nested dictionaries
    """
    if path is None:
        path = []
    for key in left:
        if key in right:
            if isinstance(right[key], dict) and isinstance(left[key], dict):
                merge_dicts(right[key], left[key], [*path, str(key)])
            else:
                path = ".".join([*path, str(key)])
                msg = f"duplicate keys in source dict files: {path}"
                WranglerLogger.error(msg)
                raise DictionaryMergeError(msg)
        else:
            right[key] = left[key]

network_wrangler.utils.utils.normalize_to_lists

normalize_to_lists(mixed_list)

Turn a mixed list of scalars and lists into a list of lists.

Source code in network_wrangler/utils/utils.py
def normalize_to_lists(mixed_list: list[Union[str, list]]) -> list[list]:
    """Turn a mixed list of scalars and lists into a list of lists."""
    normalized_list = []
    for item in mixed_list:
        if isinstance(item, str):
            normalized_list.append([item])
        else:
            normalized_list.append(item)
    return normalized_list

network_wrangler.utils.utils.split_string_prefix_suffix_from_num

split_string_prefix_suffix_from_num(input_string)

Split a string prefix and suffix from last number.

Parameters:

  • input_string (str) –

    The input string to be processed.

Returns:

  • tuple

    A tuple containing the prefix (including preceding numbers), the last numeric part as an integer, and the suffix.

Notes

This function uses regular expressions to split a string into three parts: the prefix, the last numeric part, and the suffix. The prefix includes any preceding numbers, the last numeric part is converted to an integer, and the suffix includes any non-digit characters after the last numeric part.

Examples:

>>> split_string_prefix_suffix_from_num("abc123def456")
('abc', 123, 'def456')
>>> split_string_prefix_suffix_from_num("hello")
('hello', 0, '')
>>> split_string_prefix_suffix_from_num("123")
('', 123, '')
Source code in network_wrangler/utils/utils.py
def split_string_prefix_suffix_from_num(input_string: str):
    """Split a string prefix and suffix from *last* number.

    Args:
        input_string (str): The input string to be processed.

    Returns:
        tuple: A tuple containing the prefix (including preceding numbers),
               the last numeric part as an integer, and the suffix.

    Notes:
        This function uses regular expressions to split a string into three parts:
        the prefix, the last numeric part, and the suffix. The prefix includes any
        preceding numbers, the last numeric part is converted to an integer, and
        the suffix includes any non-digit characters after the last numeric part.

    Examples:
        >>> split_string_prefix_suffix_from_num("abc123def456")
        ('abc', 123, 'def456')

        >>> split_string_prefix_suffix_from_num("hello")
        ('hello', 0, '')

        >>> split_string_prefix_suffix_from_num("123")
        ('', 123, '')

    """
    input_string = str(input_string)
    pattern = re.compile(r"(.*?)(\d+)(\D*)$")
    match = pattern.match(input_string)

    if match:
        # Extract the groups: prefix (including preceding numbers), last numeric part, suffix
        prefix, numeric_part, suffix = match.groups()
        # Convert the numeric part to an integer
        num_variable = int(numeric_part)
        return prefix, num_variable, suffix
    return input_string, 0, ""

network_wrangler.utils.utils.topological_sort

topological_sort(adjacency_list, visited_list)

Topological sorting for Acyclic Directed Graph.

Parameters: - adjacency_list (dict): A dictionary representing the adjacency list of the graph. - visited_list (list): A list representing the visited status of each vertex in the graph.

Returns: - output_stack (list): A list containing the vertices in topological order.

This function performs a topological sort on an acyclic directed graph. It takes an adjacency list and a visited list as input. The adjacency list represents the connections between vertices in the graph, and the visited list keeps track of the visited status of each vertex.

The function uses a recursive helper function to perform the topological sort. It starts by iterating over each vertex in the visited list. For each unvisited vertex, it calls the helper function, which recursively visits all the neighbors of the vertex and adds them to the output stack in reverse order. Finally, it returns the output stack, which contains the vertices in topological order.

Source code in network_wrangler/utils/utils.py
def topological_sort(adjacency_list, visited_list):
    """Topological sorting for Acyclic Directed Graph.

    Parameters:
    - adjacency_list (dict): A dictionary representing the adjacency list of the graph.
    - visited_list (list): A list representing the visited status of each vertex in the graph.

    Returns:
    - output_stack (list): A list containing the vertices in topological order.

    This function performs a topological sort on an acyclic directed graph. It takes an adjacency
    list and a visited list as input. The adjacency list represents the connections between
    vertices in the graph, and the visited list keeps track of the visited status of each vertex.

    The function uses a recursive helper function to perform the topological sort. It starts by
    iterating over each vertex in the visited list. For each unvisited vertex, it calls the helper
    function, which recursively visits all the neighbors of the vertex and adds them to the output
    stack in reverse order. Finally, it returns the output stack, which contains the vertices in
    topological order.
    """
    output_stack = []

    def _topology_sort_util(vertex):
        if not visited_list[vertex]:
            visited_list[vertex] = True
            for neighbor in adjacency_list[vertex]:
                _topology_sort_util(neighbor)
            output_stack.insert(0, vertex)

    for vertex in visited_list:
        _topology_sort_util(vertex)

    return output_stack

Package Constants

Parameters for Network Wrangler which should not be changed by the user.

Parameters that are here are used throughout the codebase and are stated here for easy reference. Additional parameters that are more narrowly scoped are defined in the appropriate modules.

Changing these parameters may have unintended consequences and should only be done by developers who understand the codebase.

network_wrangler.params.SMALL_RECS module-attribute

SMALL_RECS = 5

Number of records to display in a dataframe summary.

I/O Utilities

Helper functions for reading and writing files to reduce boilerplate.

network_wrangler.utils.io_table.FileReadError

Bases: Exception

Raised when there is an error reading a file.

Source code in network_wrangler/utils/io_table.py
class FileReadError(Exception):
    """Raised when there is an error reading a file."""

network_wrangler.utils.io_table.FileWriteError

Bases: Exception

Raised when there is an error writing a file.

Source code in network_wrangler/utils/io_table.py
class FileWriteError(Exception):
    """Raised when there is an error writing a file."""

network_wrangler.utils.io_table.convert_file_serialization

convert_file_serialization(input_file, output_file, overwrite=True, boundary_gdf=None, boundary_geocode=None, boundary_file=None, node_filter_s=None, chunk_size=None)

Convert a file serialization format to another and optionally filter to a boundary.

If the input file is a JSON file that is larger than a reasonable portion of available memory, and the output file is a Parquet file the JSON file will be read in chunks.

If the input file is a Geographic data type (shp, geojon, geoparquet) and a boundary is provided, the data will be filtered to the boundary.

Parameters:

  • input_file (Path) –

    Path to the input JSON or GEOJSON file.

  • output_file (Path) –

    Path to the output Parquet file.

  • overwrite (bool, default: True ) –

    If True, overwrite the output file if it exists.

  • boundary_gdf (Optional[GeoDataFrame], default: None ) –

    GeoDataFrame to filter the input data to. Only used for geographic data. Defaults to None.

  • boundary_geocode (Optional[str], default: None ) –

    Geocode to filter the input data to. Only used for geographic data. Defaults to None.

  • boundary_file (Optional[Path], default: None ) –

    File to load as a boundary to filter the input data to. Only used for geographic data. Defaults to None.

  • node_filter_s (Optional[Series], default: None ) –

    If provided, will filter links in .json file to only those that connect to nodes. Defaults to None.

  • chunk_size (Optional[int], default: None ) –

    Number of JSON objects to process in each chunk. Only works for JSON to Parquet. If None, will determine if chunking needed and what size.

Source code in network_wrangler/utils/io_table.py
def convert_file_serialization(
    input_file: Path,
    output_file: Path,
    overwrite: bool = True,
    boundary_gdf: Optional[gpd.GeoDataFrame] = None,
    boundary_geocode: Optional[str] = None,
    boundary_file: Optional[Path] = None,
    node_filter_s: Optional[pd.Series] = None,
    chunk_size: Optional[int] = None,
):
    """Convert a file serialization format to another and optionally filter to a boundary.

    If the input file is a JSON file that is larger than a reasonable portion of available
    memory, *and* the output file is a Parquet file the JSON file will be read in chunks.

    If the input file is a Geographic data type (shp, geojon, geoparquet) and a boundary is
    provided, the data will be filtered to the boundary.

    Args:
        input_file: Path to the input JSON or GEOJSON file.
        output_file: Path to the output Parquet file.
        overwrite: If True, overwrite the output file if it exists.
        boundary_gdf: GeoDataFrame to filter the input data to. Only used for geographic data.
            Defaults to None.
        boundary_geocode: Geocode to filter the input data to. Only used for geographic data.
            Defaults to None.
        boundary_file: File to load as a boundary to filter the input data to. Only used for
            geographic data. Defaults to None.
        node_filter_s: If provided, will filter links in .json file to only those that connect to
            nodes. Defaults to None.
        chunk_size: Number of JSON objects to process in each chunk. Only works for
            JSON to Parquet. If None, will determine if chunking needed and what size.
    """
    WranglerLogger.debug(f"Converting {input_file} to {output_file}.")

    if output_file.exists() and not overwrite:
        msg = f"File {output_file} already exists and overwrite is False."
        raise FileExistsError(msg)

    if Path(input_file).suffix == ".json" and Path(output_file).suffix == ".parquet":
        if chunk_size is None:
            chunk_size = _suggest_json_chunk_size(input_file)
        if chunk_size is None:
            df = read_table(input_file)
            if node_filter_s is not None and "A" in df.columns and "B" in df.columns:
                df = df[df["A"].isin(node_filter_s) | df["B"].isin(node_filter_s)]
            write_table(df, output_file, overwrite=overwrite)
        else:
            _json_to_parquet_in_chunks(input_file, output_file, chunk_size)

    df = read_table(
        input_file,
        boundary_gdf=boundary_gdf,
        boundary_geocode=boundary_geocode,
        boundary_file=boundary_file,
    )
    if node_filter_s is not None and "A" in df.columns and "B" in df.columns:
        df = df[df["A"].isin(node_filter_s) | df["B"].isin(node_filter_s)]
    write_table(df, output_file, overwrite=overwrite)

network_wrangler.utils.io_table.prep_dir

prep_dir(outdir, overwrite=True)

Prepare a directory for writing files.

Source code in network_wrangler/utils/io_table.py
def prep_dir(outdir: Path, overwrite: bool = True):
    """Prepare a directory for writing files."""
    if not overwrite and outdir.exists() and len(list(outdir.iterdir())) > 0:
        msg = f"Directory {outdir} is not empty and overwrite is False."
        raise FileExistsError(msg)
    outdir.mkdir(parents=True, exist_ok=True)

    # clean out existing files
    for f in outdir.iterdir():
        if f.is_file():
            f.unlink()

network_wrangler.utils.io_table.read_table

read_table(filename, sub_filename=None, boundary_gdf=None, boundary_geocode=None, boundary_file=None, read_speed=DefaultConfig.CPU.EST_PD_READ_SPEED)

Read file and return a dataframe or geodataframe.

If filename is a zip file, will unzip to a temporary directory.

If filename is a geojson or shapefile, will filter the data to the boundary_gdf, boundary_geocode, or boundary_file if provided. Note that you can only provide one of these boundary filters.

If filename is a geoparquet file, will filter the data to the bounding box of the boundary_gdf, boundary_geocode, or boundary_file if provided. Note that you can only provide one of these boundary filters.

NOTE: if you are accessing multiple files from this zip file you will want to unzip it first and THEN access the table files so you don’t create multiple duplicate unzipped tmp dirs.

Parameters:

  • filename (Path) –

    filename to load.

  • sub_filename (Optional[str], default: None ) –

    if the file is a zip, the sub_filename to load.

  • boundary_gdf (Optional[GeoDataFrame], default: None ) –

    GeoDataFrame to filter the input data to. Only used for geographic data. Defaults to None.

  • boundary_geocode (Optional[str], default: None ) –

    Geocode to filter the input data to. Only used for geographic data. Defaults to None.

  • boundary_file (Optional[Path], default: None ) –

    File to load as a boundary to filter the input data to. Only used for geographic data. Defaults to None.

  • read_speed (dict, default: EST_PD_READ_SPEED ) –

    dictionary of read speeds for different file types. Defaults to DefaultConfig.CPU.EST_PD_READ_SPEED.

Source code in network_wrangler/utils/io_table.py
def read_table(
    filename: Path,
    sub_filename: Optional[str] = None,
    boundary_gdf: Optional[gpd.GeoDataFrame] = None,
    boundary_geocode: Optional[str] = None,
    boundary_file: Optional[Path] = None,
    read_speed: dict = DefaultConfig.CPU.EST_PD_READ_SPEED,
) -> Union[pd.DataFrame, gpd.GeoDataFrame]:
    """Read file and return a dataframe or geodataframe.

    If filename is a zip file, will unzip to a temporary directory.

    If filename is a geojson or shapefile, will filter the data
    to the boundary_gdf, boundary_geocode, or boundary_file if provided. Note that you can only
    provide one of these boundary filters.

    If filename is a geoparquet file, will filter the data to the *bounding box* of the
    boundary_gdf, boundary_geocode, or boundary_file if provided. Note that you can only
    provide one of these boundary filters.

    NOTE:  if you are accessing multiple files from this zip file you will want to unzip it first
    and THEN access the table files so you don't create multiple duplicate unzipped tmp dirs.

    Args:
        filename (Path): filename to load.
        sub_filename: if the file is a zip, the sub_filename to load.
        boundary_gdf: GeoDataFrame to filter the input data to. Only used for geographic data.
            Defaults to None.
        boundary_geocode: Geocode to filter the input data to. Only used for geographic data.
            Defaults to None.
        boundary_file: File to load as a boundary to filter the input data to. Only used for
            geographic data. Defaults to None.
        read_speed: dictionary of read speeds for different file types. Defaults to
            DefaultConfig.CPU.EST_PD_READ_SPEED.
    """
    filename = Path(filename)
    if not filename.exists():
        msg = f"Input file {filename} does not exist."
        raise FileNotFoundError(msg)
    if filename.stat().st_size == 0:
        msg = f"File {filename} is empty."
        raise FileExistsError(msg)
    if filename.suffix == ".zip":
        if not sub_filename:
            msg = "sub_filename must be provided for zip files."
            raise ValueError(msg)
        filename = unzip_file(filename) / sub_filename
    WranglerLogger.debug(
        f"Estimated read time: {_estimate_read_time_of_file(filename, read_speed)}."
    )

    # will result in None if no boundary is provided
    mask_gdf = get_bounding_polygon(
        boundary_gdf=boundary_gdf,
        boundary_geocode=boundary_geocode,
        boundary_file=boundary_file,
    )

    if any(x in filename.suffix for x in ["geojson", "shp", "csv"]):
        try:
            # masking only supported by fiona engine, which is slower.
            if mask_gdf is None:
                return gpd.read_file(filename, engine="pyogrio")
            return gpd.read_file(filename, mask=mask_gdf, engine="fiona")
        except Exception as err:
            if "csv" in filename.suffix:
                return pd.read_csv(filename)
            raise FileReadError from err
    elif "parquet" in filename.suffix:
        return _read_parquet_table(filename, mask_gdf)
    elif "json" in filename.suffix:
        with filename.open() as f:
            return pd.read_json(f, orient="records")
    msg = f"Filetype {filename.suffix} not implemented."
    raise NotImplementedError(msg)

network_wrangler.utils.io_table.unzip_file

unzip_file(path)

Unzips a file to a temporary directory and returns the directory path.

Source code in network_wrangler/utils/io_table.py
def unzip_file(path: Path) -> Path:
    """Unzips a file to a temporary directory and returns the directory path."""
    tmpdir = tempfile.mkdtemp()
    shutil.unpack_archive(path, tmpdir)

    def finalize() -> None:
        shutil.rmtree(tmpdir)

    # Lazy cleanup
    weakref.finalize(tmpdir, finalize)

    return Path(tmpdir)

network_wrangler.utils.io_table.write_table

write_table(df, filename, overwrite=False, **kwargs)

Write a dataframe or geodataframe to a file.

Parameters:

  • df (DataFrame) –

    dataframe to write.

  • filename (Path) –

    filename to write to.

  • overwrite (bool, default: False ) –

    whether to overwrite the file if it exists. Defaults to False.

  • kwargs

    additional arguments to pass to the writer.

Source code in network_wrangler/utils/io_table.py
def write_table(
    df: Union[pd.DataFrame, gpd.GeoDataFrame],
    filename: Path,
    overwrite: bool = False,
    **kwargs,
) -> None:
    """Write a dataframe or geodataframe to a file.

    Args:
        df (pd.DataFrame): dataframe to write.
        filename (Path): filename to write to.
        overwrite (bool): whether to overwrite the file if it exists. Defaults to False.
        kwargs: additional arguments to pass to the writer.

    """
    filename = Path(filename)
    if filename.exists() and not overwrite:
        msg = f"File {filename} already exists and overwrite is False."
        raise FileExistsError(msg)

    if filename.parent.is_dir() and not filename.parent.exists():
        filename.parent.mkdir(parents=True)

    WranglerLogger.debug(f"Writing to {filename}.")

    if "shp" in filename.suffix:
        df.to_file(filename, index=False, **kwargs)
    elif "parquet" in filename.suffix:
        df.to_parquet(filename, index=False, **kwargs)
    elif "csv" in filename.suffix or "txt" in filename.suffix:
        df.to_csv(filename, index=False, date_format="%H:%M:%S", **kwargs)
    elif "geojson" in filename.suffix:
        # required due to issues with list-like columns
        if isinstance(df, gpd.GeoDataFrame):
            data = df.to_json(drop_id=True)
        else:
            data = df.to_json(orient="records", index=False)
        with filename.open("w", encoding="utf-8") as file:
            file.write(data)
    elif "json" in filename.suffix:
        with filename.open("w") as f:
            f.write(df.to_json(orient="records"))
    else:
        msg = f"Filetype {filename.suffix} not implemented."
        raise NotImplementedError(msg)

Utility functions for loading dictionaries from files.

network_wrangler.utils.io_dict.load_dict

load_dict(path)

Load a dictionary from a file.

Source code in network_wrangler/utils/io_dict.py
def load_dict(path: Path) -> dict:
    """Load a dictionary from a file."""
    path = Path(path)
    if not path.is_file():
        msg = f"Specified dict file {path} not found."
        raise FileNotFoundError(msg)

    if path.suffix.lower() == ".toml":
        return _load_toml(path)
    if path.suffix.lower() == ".json":
        return _load_json(path)
    if path.suffix.lower() == ".yaml" or path.suffix.lower() == ".yml":
        return _load_yaml(path)
    msg = f"Filetype {path.suffix} not implemented."
    raise NotImplementedError(msg)

network_wrangler.utils.io_dict.load_merge_dict

load_merge_dict(path)

Load and merge multiple dictionaries from files.

Source code in network_wrangler/utils/io_dict.py
def load_merge_dict(path: Union[Path, list[Path]]) -> dict:
    """Load and merge multiple dictionaries from files."""
    if not isinstance(path, list):
        path = [path]
    data = load_dict(path[0])
    for path_item in path[1:]:
        merge_dicts(data, load_dict(path_item))
    return data

Data Manipulation

Helper functions for data models.

network_wrangler.utils.models.DatamodelDataframeIncompatableError

Bases: Exception

Raised when a data model and a dataframe are not compatable.

Source code in network_wrangler/utils/models.py
class DatamodelDataframeIncompatableError(Exception):
    """Raised when a data model and a dataframe are not compatable."""

network_wrangler.utils.models.TableValidationError

Bases: Exception

Raised when a table validation fails.

Source code in network_wrangler/utils/models.py
class TableValidationError(Exception):
    """Raised when a table validation fails."""

network_wrangler.utils.models.coerce_extra_fields_to_type_in_df

coerce_extra_fields_to_type_in_df(data, model, df)

Coerce extra fields in data that aren’t specified in Pydantic model to the type in the df.

Note: will not coerce lists of submodels, etc.

Parameters:

  • data (dict) –

    The data to coerce.

  • model (BaseModel) –

    The Pydantic model to validate the data against.

  • df (DataFrame) –

    The DataFrame to coerce the data to.

Source code in network_wrangler/utils/models.py
def coerce_extra_fields_to_type_in_df(
    data: BaseModel, model: BaseModel, df: pd.DataFrame
) -> BaseModel:
    """Coerce extra fields in data that aren't specified in Pydantic model to the type in the df.

    Note: will not coerce lists of submodels, etc.

    Args:
        data (dict): The data to coerce.
        model (BaseModel): The Pydantic model to validate the data against.
        df (pd.DataFrame): The DataFrame to coerce the data to.
    """
    out_data = copy.deepcopy(data)

    # Coerce submodels
    for field in submodel_fields_in_model(model, data):
        out_data.__dict__[field] = coerce_extra_fields_to_type_in_df(
            data.__dict__[field], model.__annotations__[field], df
        )

    for field in extra_attributes_undefined_in_model(data, model):
        try:
            v = coerce_val_to_df_types(field, data.model_extra[field], df)
        except ValueError as err:
            raise DatamodelDataframeIncompatableError() from err
        out_data.model_extra[field] = v
    return out_data

network_wrangler.utils.models.default_from_datamodel

default_from_datamodel(data_model, field)

Returns default value from pandera data model for a given field name.

Source code in network_wrangler/utils/models.py
def default_from_datamodel(data_model: pa.DataFrameModel, field: str):
    """Returns default value from pandera data model for a given field name."""
    if field in data_model.__fields__ and hasattr(data_model.__fields__[field][1], "default"):
        return data_model.__fields__[field][1].default
    return None

network_wrangler.utils.models.empty_df_from_datamodel

empty_df_from_datamodel(model, crs=LAT_LON_CRS)

Create an empty DataFrame or GeoDataFrame with the specified columns.

Parameters:

  • model (BaseModel) –

    A pandera data model to create empty [Geo]DataFrame from.

  • crs (int, default: LAT_LON_CRS ) –

    if schema has geometry, will use this as the geometry’s crs. Defaults to LAT_LONG_CRS

Source code in network_wrangler/utils/models.py
def empty_df_from_datamodel(
    model: DataFrameModel, crs: int = LAT_LON_CRS
) -> Union[gpd.GeoDataFrame, pd.DataFrame]:
    """Create an empty DataFrame or GeoDataFrame with the specified columns.

    Args:
        model (BaseModel): A pandera data model to create empty [Geo]DataFrame from.
        crs: if schema has geometry, will use this as the geometry's crs. Defaults to LAT_LONG_CRS
    Returns:
        An empty [Geo]DataFrame that validates to the specified model.
    """
    schema = model.to_schema()
    data: dict[str, list] = {col: [] for col in schema.columns}

    if "geometry" in data:
        return model(gpd.GeoDataFrame(data, crs=crs))

    return model(pd.DataFrame(data))

network_wrangler.utils.models.extra_attributes_undefined_in_model

extra_attributes_undefined_in_model(instance, model)

Find the extra attributes in a pydantic model that are not defined in the model.

Source code in network_wrangler/utils/models.py
def extra_attributes_undefined_in_model(instance: BaseModel, model: BaseModel) -> list:
    """Find the extra attributes in a pydantic model that are not defined in the model."""
    defined_fields = model.model_fields
    all_attributes = list(instance.model_dump(exclude_none=True, by_alias=True).keys())
    extra_attributes = [a for a in all_attributes if a not in defined_fields]
    return extra_attributes

network_wrangler.utils.models.fill_df_with_defaults_from_model

fill_df_with_defaults_from_model(df, model)

Fill a DataFrame with default values from a Pandera DataFrameModel.

Parameters:

  • df

    DataFrame to fill with default values.

  • model

    Pandera DataFrameModel to get default values from.

Source code in network_wrangler/utils/models.py
def fill_df_with_defaults_from_model(df, model):
    """Fill a DataFrame with default values from a Pandera DataFrameModel.

    Args:
        df: DataFrame to fill with default values.
        model: Pandera DataFrameModel to get default values from.
    """
    for c in df.columns:
        default_value = default_from_datamodel(model, c)
        if default_value is None:
            df[c] = df[c].where(pd.notna(df[c]), None)
        else:
            df[c] = df[c].fillna(default_value)
    return df

network_wrangler.utils.models.identify_model

identify_model(data, models)

Identify the model that the input data conforms to.

Parameters:

  • data (Union[DataFrame, dict]) –

    The input data to identify.

  • models (list[DataFrameModel, BaseModel]) –

    A list of models to validate the input data against.

Source code in network_wrangler/utils/models.py
def identify_model(
    data: Union[pd.DataFrame, dict], models: list
) -> Union[DataFrameModel, BaseModel]:
    """Identify the model that the input data conforms to.

    Args:
        data (Union[pd.DataFrame, dict]): The input data to identify.
        models (list[DataFrameModel,BaseModel]): A list of models to validate the input
          data against.
    """
    for m in models:
        try:
            if isinstance(data, pd.DataFrame):
                validate_df_to_model(data, m)
            else:
                m(**data)
            return m
        except ValidationError:
            continue
        except SchemaError:
            continue

    WranglerLogger.error(
        f"The input data isn't consistant with any provided data model.\
                         \nInput data: {data}\
                         \nData Models: {models}"
    )
    msg = "The input data isn't consistant with any provided data model."
    raise TableValidationError(msg)

network_wrangler.utils.models.order_fields_from_data_model

order_fields_from_data_model(df, model)

Order the fields in a DataFrame to match the order in a Pandera DataFrameModel.

Will add any fields that are not in the model to the end of the DataFrame. Will not add any fields that are in the model but not in the DataFrame.

Parameters:

  • df (DataFrame) –

    DataFrame to order.

  • model (DataFrameModel) –

    Pandera DataFrameModel to order the DataFrame to.

Source code in network_wrangler/utils/models.py
def order_fields_from_data_model(df: pd.DataFrame, model: DataFrameModel) -> pd.DataFrame:
    """Order the fields in a DataFrame to match the order in a Pandera DataFrameModel.

    Will add any fields that are not in the model to the end of the DataFrame.
    Will not add any fields that are in the model but not in the DataFrame.

    Args:
        df: DataFrame to order.
        model: Pandera DataFrameModel to order the DataFrame to.
    """
    model_fields = list(model.__fields__.keys())
    df_model_fields = [f for f in model_fields if f in df.columns]
    df_additional_fields = [f for f in df.columns if f not in model_fields]
    return df[df_model_fields + df_additional_fields]

network_wrangler.utils.models.submodel_fields_in_model

submodel_fields_in_model(model, instance=None)

Find the fields in a pydantic model that are submodels.

Source code in network_wrangler/utils/models.py
def submodel_fields_in_model(model: type, instance: Optional[BaseModel] = None) -> list:
    """Find the fields in a pydantic model that are submodels."""
    types = get_type_hints(model)
    model_type = (ModelMetaclass, BaseModel)
    submodels = [f for f in model.model_fields if isinstance(types.get(f), model_type)]
    if instance is not None:
        defined = list(instance.model_dump(exclude_none=True, by_alias=True).keys())
        return [f for f in submodels if f in defined]
    return submodels

network_wrangler.utils.models.validate_call_pyd

validate_call_pyd(func)

Decorator to validate the function i/o using Pydantic models without Pandera.

Source code in network_wrangler/utils/models.py
def validate_call_pyd(func):
    """Decorator to validate the function i/o using Pydantic models without Pandera."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        type_hints = get_type_hints(func)
        # Modify the type hints to replace pandera DataFrame models with pandas DataFrames
        modified_type_hints = {
            key: value
            for key, value in type_hints.items()
            if not _is_type_from_type_hint(value, PanderaDataFrame)
        }

        new_func = func
        new_func.__annotations__ = modified_type_hints
        validated_func = validate_call(new_func, config={"arbitrary_types_allowed": True})

        return validated_func(*args, **kwargs)

    return wrapper

network_wrangler.utils.models.validate_df_to_model

validate_df_to_model(df, model, output_file=Path('validation_failure_cases.csv'))

Wrapper to validate a DataFrame against a Pandera DataFrameModel with better logging.

Also copies the attrs from the input DataFrame to the validated DataFrame.

Parameters:

  • df (DataFrame) –

    DataFrame to validate.

  • model (type) –

    Pandera DataFrameModel to validate against.

  • output_file (Path, default: Path('validation_failure_cases.csv') ) –

    Optional file to write validation errors to. Defaults to validation_failure_cases.csv.

Source code in network_wrangler/utils/models.py
@validate_call(config={"arbitrary_types_allowed": True})
def validate_df_to_model(
    df: DataFrame, model: type, output_file: Path = Path("validation_failure_cases.csv")
) -> DataFrame:
    """Wrapper to validate a DataFrame against a Pandera DataFrameModel with better logging.

    Also copies the attrs from the input DataFrame to the validated DataFrame.

    Args:
        df: DataFrame to validate.
        model: Pandera DataFrameModel to validate against.
        output_file: Optional file to write validation errors to. Defaults to
            validation_failure_cases.csv.
    """
    attrs = copy.deepcopy(df.attrs)
    err_msg = f"Validation to {model.__name__} failed."
    try:
        model_df = model.validate(df, lazy=True)
        model_df = fill_df_with_defaults_from_model(model_df, model)
        model_df.attrs = attrs
        return model_df
    except (TypeError, ValueError) as e:
        WranglerLogger.error(f"Validation to {model.__name__} failed.\n{e}")
        raise TableValidationError(err_msg) from e
    except SchemaErrors as e:
        # Log the summary of errors
        WranglerLogger.error(
            f"Validation to {model.__name__} failed with {len(e.failure_cases)} \
            errors: \n{e.failure_cases}"
        )

        # If there are many errors, save them to a file
        if len(e.failure_cases) > SMALL_RECS:
            error_file = output_file
            e.failure_cases.to_csv(error_file)
            WranglerLogger.info(f"Detailed error cases written to {error_file}")
        else:
            # Otherwise log the errors directly
            WranglerLogger.error("Detailed failure cases:\n%s", e.failure_cases)
        raise TableValidationError(err_msg) from e
    except SchemaError as e:
        WranglerLogger.error(f"Validation to {model.__name__} failed with error: {e}")
        WranglerLogger.error(f"Failure Cases:\n{e.failure_cases}")
        raise TableValidationError(err_msg) from e

Utility functions for pandas data manipulation.

network_wrangler.utils.data.DataSegmentationError

Bases: Exception

Raised when there is an error segmenting data.

Source code in network_wrangler/utils/data.py
class DataSegmentationError(Exception):
    """Raised when there is an error segmenting data."""

network_wrangler.utils.data.InvalidJoinFieldError

Bases: Exception

Raised when the join field is not unique.

Source code in network_wrangler/utils/data.py
class InvalidJoinFieldError(Exception):
    """Raised when the join field is not unique."""

network_wrangler.utils.data.MissingPropertiesError

Bases: Exception

Raised when properties are missing from the dataframe.

Source code in network_wrangler/utils/data.py
class MissingPropertiesError(Exception):
    """Raised when properties are missing from the dataframe."""

network_wrangler.utils.data.coerce_dict_to_df_types

coerce_dict_to_df_types(d, df, skip_keys=None, return_skipped=False)

Coerce dictionary values to match the type of a dataframe columns matching dict keys.

Will also coerce a list of values.

Parameters:

  • d (dict) –

    dictionary to coerce with singleton or list values

  • df (DataFrame) –

    dataframe to get types from

  • skip_keys (Optional[list], default: None ) –

    list of dict keys to skip. Defaults to []/

  • return_skipped (bool, default: False ) –

    keep the uncoerced, skipped keys/vals in the resulting dict. Defaults to False.

Returns:

  • dict ( dict[str, CoerceTypes] ) –

    dict with coerced types

Source code in network_wrangler/utils/data.py
def coerce_dict_to_df_types(
    d: dict[str, CoerceTypes],
    df: pd.DataFrame,
    skip_keys: Optional[list] = None,
    return_skipped: bool = False,
) -> dict[str, CoerceTypes]:
    """Coerce dictionary values to match the type of a dataframe columns matching dict keys.

    Will also coerce a list of values.

    Args:
        d (dict): dictionary to coerce with singleton or list values
        df (pd.DataFrame): dataframe to get types from
        skip_keys: list of dict keys to skip. Defaults to []/
        return_skipped: keep the uncoerced, skipped keys/vals in the resulting dict.
            Defaults to False.

    Returns:
        dict: dict with coerced types
    """
    if skip_keys is None:
        skip_keys = []
    coerced_dict: dict[str, CoerceTypes] = {}
    for k, vals in d.items():
        if k in skip_keys:
            if return_skipped:
                coerced_dict[k] = vals
            continue
        if k not in df.columns:
            msg = f"Key {k} not in dataframe columns."
            raise ValueError(msg)
        if pd.api.types.infer_dtype(df[k]) == "integer":
            if isinstance(vals, list):
                coerced_v: CoerceTypes = [int(float(v)) for v in vals]
            else:
                coerced_v = int(float(vals))
        elif pd.api.types.infer_dtype(df[k]) == "floating":
            coerced_v = [float(v) for v in vals] if isinstance(vals, list) else float(vals)
        elif pd.api.types.infer_dtype(df[k]) == "boolean":
            coerced_v = [bool(v) for v in vals] if isinstance(vals, list) else bool(vals)
        elif isinstance(vals, list):
            coerced_v = [str(v) for v in vals]
        else:
            coerced_v = str(vals)
        coerced_dict[k] = coerced_v
    return coerced_dict

network_wrangler.utils.data.coerce_gdf

coerce_gdf(df, geometry=None, in_crs=LAT_LON_CRS)

Coerce a DataFrame to a GeoDataFrame, optionally with a new geometry.

Source code in network_wrangler/utils/data.py
def coerce_gdf(
    df: pd.DataFrame, geometry: GeoSeries = None, in_crs: int = LAT_LON_CRS
) -> GeoDataFrame:
    """Coerce a DataFrame to a GeoDataFrame, optionally with a new geometry."""
    if isinstance(df, GeoDataFrame):
        if df.crs is None:
            df.crs = in_crs
        return df
    p = None

    if "geometry" not in df and geometry is None:
        msg = "Must give geometry argument if don't have Geometry in dataframe"
        raise ValueError(msg)

    geometry = geometry if geometry is not None else df["geometry"]
    if not isinstance(geometry, GeoSeries):
        try:
            geometry = GeoSeries(geometry)
        except Exception:
            geometry = geometry.apply(wkt.loads)
    df = GeoDataFrame(df, geometry=geometry, crs=in_crs)

    return df

network_wrangler.utils.data.coerce_val_to_df_types

coerce_val_to_df_types(field, val, df)

Coerce field value to match the type of a matching dataframe columns.

Parameters:

  • field (str) –

    field to lookup

  • val (CoerceTypes) –

    value or list of values to coerce

  • df (DataFrame) –

    dataframe to get types from

Source code in network_wrangler/utils/data.py
def coerce_val_to_df_types(  # noqa: PLR0911
    field: str,
    val: CoerceTypes,
    df: pd.DataFrame,
) -> CoerceTypes:
    """Coerce field value to match the type of a matching dataframe columns.

    Args:
        field: field to lookup
        val: value or list of values to coerce
        df (pd.DataFrame): dataframe to get types from

    Returns: coerced value or list of values
    """
    if field not in df.columns:
        msg = f"Field {field} not in dataframe columns."
        raise ValueError(msg)
    if pd.api.types.infer_dtype(df[field]) == "integer":
        if isinstance(val, list):
            return [int(float(v)) for v in val]
        return int(float(val))
    if pd.api.types.infer_dtype(df[field]) == "floating":
        if isinstance(val, list):
            return [float(v) for v in val]
        return float(val)
    if pd.api.types.infer_dtype(df[field]) == "boolean":
        if isinstance(val, list):
            return [bool(v) for v in val]
        return bool(val)
    if isinstance(val, list):
        return [str(v) for v in val]
    return str(val)

network_wrangler.utils.data.coerce_val_to_series_type

coerce_val_to_series_type(val, s)

Coerces a value to match type of pandas series.

Will try not to fail so if you give it a value that can’t convert to a number, it will return a string.

Parameters:

  • val

    Any type of singleton value

  • s (Series) –

    series to match the type to

Source code in network_wrangler/utils/data.py
def coerce_val_to_series_type(val, s: pd.Series) -> Union[float, str, bool]:
    """Coerces a value to match type of pandas series.

    Will try not to fail so if you give it a value that can't convert to a number, it will
    return a string.

    Args:
        val: Any type of singleton value
        s (pd.Series): series to match the type to
    """
    # WranglerLogger.debug(f"Input val: {val} of type {type(val)} to match with series type \
    #    {pd.api.types.infer_dtype(s)}.")
    if pd.api.types.infer_dtype(s) in ["integer", "floating"]:
        try:
            v: Union[float, str, bool] = float(val)
        except:
            v = str(val)
    elif pd.api.types.infer_dtype(s) == "boolean":
        v = bool(val)
    else:
        v = str(val)
    # WranglerLogger.debug(f"Return value: {v}")
    return v

network_wrangler.utils.data.compare_df_values

compare_df_values(df1, df2, join_col=None, ignore=None, atol=1e-05)

Compare overlapping part of dataframes and returns where there are differences.

Source code in network_wrangler/utils/data.py
def compare_df_values(
    df1, df2, join_col: Optional[str] = None, ignore: Optional[list[str]] = None, atol=1e-5
):
    """Compare overlapping part of dataframes and returns where there are differences."""
    if ignore is None:
        ignore = []
    comp_c = [
        c
        for c in df1.columns
        if c in df2.columns and c not in ignore and not isinstance(df1[c], GeoSeries)
    ]
    if join_col is None:
        comp_df = df1[comp_c].merge(
            df2[comp_c],
            how="inner",
            right_index=True,
            left_index=True,
            suffixes=["_a", "_b"],
        )
    else:
        comp_df = df1[comp_c].merge(df2[comp_c], how="inner", on=join_col, suffixes=["_a", "_b"])

    # Filter columns by data type
    numeric_cols = [col for col in comp_c if np.issubdtype(df1[col].dtype, np.number)]
    ll_cols = list(set(list_like_columns(df1) + list_like_columns(df2)))
    other_cols = [col for col in comp_c if col not in numeric_cols and col not in ll_cols]

    # For numeric columns, use np.isclose
    if numeric_cols:
        numeric_a = comp_df[[f"{col}_a" for col in numeric_cols]]
        numeric_b = comp_df[[f"{col}_b" for col in numeric_cols]]
        is_close = np.isclose(numeric_a, numeric_b, atol=atol, equal_nan=True)
        comp_df[numeric_cols] = ~is_close

    if ll_cols:
        for ll_c in ll_cols:
            comp_df[ll_c] = diff_list_like_series(comp_df[ll_c + "_a"], comp_df[ll_c + "_b"])

    # For non-numeric columns, use direct comparison
    if other_cols:
        for col in other_cols:
            comp_df[col] = (comp_df[f"{col}_a"] != comp_df[f"{col}_b"]) & ~(
                comp_df[f"{col}_a"].isna() & comp_df[f"{col}_b"].isna()
            )

    # Filter columns and rows where no differences
    cols_w_diffs = [col for col in comp_c if comp_df[col].any()]
    out_cols = [col for subcol in cols_w_diffs for col in (f"{subcol}_a", f"{subcol}_b", subcol)]
    comp_df = comp_df[out_cols]
    comp_df = comp_df.loc[comp_df[cols_w_diffs].any(axis=1)]

    return comp_df

network_wrangler.utils.data.compare_lists

compare_lists(list1, list2)

Compare two lists.

Source code in network_wrangler/utils/data.py
def compare_lists(list1, list2) -> bool:
    """Compare two lists."""
    list1 = convert_numpy_to_list(list1)
    list2 = convert_numpy_to_list(list1)
    return list1 != list2

network_wrangler.utils.data.concat_with_attr

concat_with_attr(dfs, **kwargs)

Concatenate a list of dataframes and retain the attributes of the first dataframe.

Source code in network_wrangler/utils/data.py
def concat_with_attr(dfs: list[pd.DataFrame], **kwargs) -> pd.DataFrame:
    """Concatenate a list of dataframes and retain the attributes of the first dataframe."""
    import copy  # noqa: PLC0415

    if not dfs:
        msg = "No dataframes to concatenate."
        raise ValueError(msg)
    attrs = copy.deepcopy(dfs[0].attrs)
    df = pd.concat(dfs, **kwargs)
    df.attrs = attrs
    return df

network_wrangler.utils.data.convert_numpy_to_list

convert_numpy_to_list(item)

Function to recursively convert numpy arrays to lists.

Source code in network_wrangler/utils/data.py
def convert_numpy_to_list(item):
    """Function to recursively convert numpy arrays to lists."""
    if isinstance(item, np.ndarray):
        return item.tolist()
    if isinstance(item, list):
        return [convert_numpy_to_list(sub_item) for sub_item in item]
    if isinstance(item, dict):
        return {key: convert_numpy_to_list(value) for key, value in item.items()}
    return item

network_wrangler.utils.data.dict_fields_in_df

dict_fields_in_df(d, df)

Check if all fields in dict are in dataframe.

Source code in network_wrangler/utils/data.py
def dict_fields_in_df(d: dict, df: pd.DataFrame) -> bool:
    """Check if all fields in dict are in dataframe."""
    missing_fields = [f for f in d if f not in df.columns]
    if missing_fields:
        msg = f"Fields in dictionary missing from dataframe: {missing_fields}."
        WranglerLogger.error(msg)
        raise ValueError(msg)
    return True

network_wrangler.utils.data.dict_to_query

dict_to_query(selection_dict)

Generates the query of from selection_dict.

Parameters:

  • selection_dict (Mapping[str, Any]) –

    selection dictionary

Returns:

  • _type_ ( str ) –

    Query value

Source code in network_wrangler/utils/data.py
def dict_to_query(
    selection_dict: Mapping[str, Any],
) -> str:
    """Generates the query of from selection_dict.

    Args:
        selection_dict: selection dictionary

    Returns:
        _type_: Query value
    """
    WranglerLogger.debug("Building selection query")

    def _kv_to_query_part(k, v, _q_part=""):
        if isinstance(v, list):
            _q_part += "(" + " or ".join([_kv_to_query_part(k, i) for i in v]) + ")"
            return _q_part
        if isinstance(v, str):
            return k + '.str.contains("' + v + '")'
        return k + "==" + str(v)

    query = "(" + " and ".join([_kv_to_query_part(k, v) for k, v in selection_dict.items()]) + ")"
    WranglerLogger.debug(f"Selection query: \n{query}")
    return query

network_wrangler.utils.data.diff_dfs

diff_dfs(df1, df2, ignore=None)

Returns True if two dataframes are different and log differences.

Source code in network_wrangler/utils/data.py
def diff_dfs(df1, df2, ignore: Optional[list[str]] = None) -> bool:
    """Returns True if two dataframes are different and log differences."""
    if ignore is None:
        ignore = []
    diff = False
    if set(df1.columns) != set(df2.columns):
        WranglerLogger.warning(
            f" Columns are different 1vs2 \n    {set(df1.columns) ^ set(df2.columns)}"
        )
        common_cols = [col for col in df1.columns if col in df2.columns]
        df1 = df1[common_cols]
        df2 = df2[common_cols]
        diff = True

    cols_to_compare = [col for col in df1.columns if col not in ignore]
    df1 = df1[cols_to_compare]
    df2 = df2[cols_to_compare]

    if len(df1) != len(df2):
        WranglerLogger.warning(f" Length is different /DF1: {len(df1)} vs /DF2: {len(df2)}\n /")
        diff = True

    diff_df = compare_df_values(df1, df2)

    if not diff_df.empty:
        WranglerLogger.error(f"!!! Differences dfs: \n{diff_df}")
        return True

    if not diff:
        WranglerLogger.info("...no differences in df found.")
    return diff

network_wrangler.utils.data.diff_list_like_series

diff_list_like_series(s1, s2)

Compare two series that contain list-like items as strings.

Source code in network_wrangler/utils/data.py
def diff_list_like_series(s1, s2) -> bool:
    """Compare two series that contain list-like items as strings."""
    diff_df = concat_with_attr([s1, s2], axis=1, keys=["s1", "s2"])
    # diff_df["diff"] = diff_df.apply(lambda x: str(x["s1"]) != str(x["s2"]), axis=1)
    diff_df["diff"] = diff_df.apply(lambda x: compare_lists(x["s1"], x["s2"]), axis=1)
    if diff_df["diff"].any():
        WranglerLogger.info("List-Like differences:")
        WranglerLogger.info(diff_df)
        return True
    return False

network_wrangler.utils.data.fk_in_pk

fk_in_pk(pk, fk, ignore_nan=True)

Check if all foreign keys are in the primary keys, optionally ignoring NaN.

Source code in network_wrangler/utils/data.py
def fk_in_pk(
    pk: Union[pd.Series, list], fk: Union[pd.Series, list], ignore_nan: bool = True
) -> tuple[bool, list]:
    """Check if all foreign keys are in the primary keys, optionally ignoring NaN."""
    if isinstance(fk, list):
        fk = pd.Series(fk)

    if ignore_nan:
        fk = fk.dropna()

    missing_flag = ~fk.isin(pk)

    if missing_flag.any():
        WranglerLogger.warning(
            f"Following keys referenced in {fk.name} but missing in\
            primary key table: \n{fk[missing_flag]} "
        )
        return False, fk[missing_flag].tolist()

    return True, []

network_wrangler.utils.data.isin_dict

isin_dict(df, d, ignore_missing=True, strict_str=False)

Filter the dataframe using a dictionary - faster than using isin.

Uses merge to filter the dataframe by the dictionary keys and values.

Parameters:

  • df (DataFrame) –

    dataframe to filter

  • d (dict) –

    dictionary with keys as column names and values as values to filter by

  • ignore_missing (bool, default: True ) –

    if True, will ignore missing values in the selection dict.

  • strict_str (bool, default: False ) –

    if True, will not allow partial string matches and will force case-matching. Defaults to False. If False, will be overridden if key is in STRICT_MATCH_FIELDS or if ignore_missing is False.

Source code in network_wrangler/utils/data.py
def isin_dict(
    df: pd.DataFrame, d: dict, ignore_missing: bool = True, strict_str: bool = False
) -> pd.DataFrame:
    """Filter the dataframe using a dictionary - faster than using isin.

    Uses merge to filter the dataframe by the dictionary keys and values.

    Args:
        df: dataframe to filter
        d: dictionary with keys as column names and values as values to filter by
        ignore_missing: if True, will ignore missing values in the selection dict.
        strict_str: if True, will not allow partial string matches and will force case-matching.
            Defaults to False. If False, will be overridden if key is in STRICT_MATCH_FIELDS or if
            ignore_missing is False.
    """
    sel_links_mask = np.zeros(len(df), dtype=bool)
    missing = {}
    for col, vals in d.items():
        if vals is None:
            continue
        if col not in df.columns:
            msg = f"Key {col} not in dataframe columns."
            raise DataframeSelectionError(msg)
        _strict_str = strict_str or col in STRICT_MATCH_FIELDS or not ignore_missing
        vals_list = [vals] if not isinstance(vals, list) else vals

        index_name = df.index.name if df.index.name is not None else "index"
        _df = df[[col]].reset_index(names=index_name)

        if isinstance(vals_list[0], str) and not _strict_str:
            vals_list = [val.lower() for val in vals_list]
            _df[col] = _df[col].str.lower()

            # Use str.contains for partial matching
            mask = np.zeros(len(_df), dtype=bool)
            for val in vals_list:
                mask |= _df[col].str.contains(val, case=False, na=False)
            selected = _df[mask].set_index(index_name)
        else:
            vals_df = pd.DataFrame({col: vals_list}, index=range(len(vals_list)))
            merged_df = _df.merge(vals_df, on=col, how="outer", indicator=True)
            selected = merged_df[merged_df["_merge"] == "both"].set_index(index_name)
            _missing_vals = merged_df[merged_df["_merge"] == "right_only"][col].tolist()
            if _missing_vals:
                missing[col] = _missing_vals
                WranglerLogger.warning(f"Missing values in selection dict for {col}: {missing}")

        sel_links_mask |= df.index.isin(selected.index)

    if not ignore_missing and any(missing):
        msg = "Missing values in selection dict."
        raise DataframeSelectionError(msg)

    return df.loc[sel_links_mask]

network_wrangler.utils.data.list_like_columns

list_like_columns(df, item_type=None)

Find columns in a dataframe that contain list-like items that can’t be json-serialized.

Parameters:

  • df

    dataframe to check

  • item_type (Optional[type], default: None ) –

    if not None, will only return columns where all items are of this type by checking only the first item in the column. Defaults to None.

Source code in network_wrangler/utils/data.py
def list_like_columns(df, item_type: Optional[type] = None) -> list[str]:
    """Find columns in a dataframe that contain list-like items that can't be json-serialized.

    Args:
        df: dataframe to check
        item_type: if not None, will only return columns where all items are of this type by
            checking **only** the first item in the column.  Defaults to None.
    """
    list_like_columns = []

    for column in df.columns:
        if df[column].apply(lambda x: isinstance(x, (list, ndarray))).any():
            if item_type is not None and not isinstance(df[column].iloc[0], item_type):
                continue
            list_like_columns.append(column)
    return list_like_columns

network_wrangler.utils.data.segment_data_by_selection

segment_data_by_selection(item_list, data, field=None, end_val=0)

Segment a dataframe or series into before, middle, and end segments based on item_list.

selected segment = everything from the first to last item in item_list inclusive of the first and last items. Before segment = everything before After segment = everything after

Parameters:

  • item_list (list) –

    List of items to segment data by. If longer than two, will only use the first and last items.

  • data (Union[Series, DataFrame]) –

    Data to segment into before, middle, and after.

  • field (str, default: None ) –

    If a dataframe, specifies which field to reference. Defaults to None.

  • end_val (int, default: 0 ) –

    Notation for util the end or from the begining. Defaults to 0.

Raises:

Returns:

  • tuple ( tuple[Union[Series, list, DataFrame], Union[Series, list, DataFrame], Union[Series, list, DataFrame]] ) –

    data broken out by beofore, selected segment, and after.

Source code in network_wrangler/utils/data.py
def segment_data_by_selection(
    item_list: list,
    data: Union[list, pd.DataFrame, pd.Series],
    field: Optional[str] = None,
    end_val=0,
) -> tuple[
    Union[pd.Series, list, pd.DataFrame],
    Union[pd.Series, list, pd.DataFrame],
    Union[pd.Series, list, pd.DataFrame],
]:
    """Segment a dataframe or series into before, middle, and end segments based on item_list.

    selected segment = everything from the first to last item in item_list inclusive of the first
        and last items.
    Before segment = everything before
    After segment = everything after

    Args:
        item_list (list): List of items to segment data by. If longer than two, will only
            use the first and last items.
        data (Union[pd.Series, pd.DataFrame]): Data to segment into before, middle, and after.
        field (str, optional): If a dataframe, specifies which field to reference.
            Defaults to None.
        end_val (int, optional): Notation for util the end or from the begining. Defaults to 0.

    Raises:
        DataSegmentationError: If item list isn't found in data in correct order.

    Returns:
        tuple: data broken out by beofore, selected segment, and after.
    """
    ref_data = data
    if isinstance(data, pd.DataFrame):
        ref_data = data[field].tolist()
    elif isinstance(data, pd.Series):
        ref_data = data.tolist()

    # ------- Replace "to the end" indicators with first or last value --------
    start_item, end_item = item_list[0], item_list[-1]
    if start_item == end_val:
        start_item = ref_data[0]
    if end_item == end_val:
        end_item = ref_data[-1]

    # --------Find the start and end indices -----------------------------------
    start_idxs = list({i for i, item in enumerate(ref_data) if item == start_item})
    if not start_idxs:
        msg = f"Segment start item: {start_item} not in data."
        raise DataSegmentationError(msg)
    if len(start_idxs) > 1:
        WranglerLogger.warning(
            f"Found multiple starting locations for data segment: {start_item}.\
                                Choosing first ... largest segment being selected."
        )
    start_idx = min(start_idxs)

    # find the end node starting from the start index.
    end_idxs = [i + start_idx for i, item in enumerate(ref_data[start_idx:]) if item == end_item]
    # WranglerLogger.debug(f"End indexes: {end_idxs}")
    if not end_idxs:
        msg = f"Segment end item: {end_item} not in data after starting idx."
        raise DataSegmentationError(msg)
    if len(end_idxs) > 1:
        WranglerLogger.warning(
            f"Found multiple ending locations for data segment: {end_item}.\
                                Choosing last ... largest segment being selected."
        )
    end_idx = max(end_idxs) + 1
    # WranglerLogger.debug(
    # f"Segmenting data fr {start_item} idx:{start_idx} to {end_item} idx:{end_idx}.\n{ref_data}")
    # -------- Extract the segments --------------------------------------------
    if isinstance(data, pd.DataFrame):
        before_segment = data.iloc[:start_idx]
        selected_segment = data.iloc[start_idx:end_idx]
        after_segment = data.iloc[end_idx:]
    else:
        before_segment = data[:start_idx]
        selected_segment = data[start_idx:end_idx]
        after_segment = data[end_idx:]

    if isinstance(data, (pd.DataFrame, pd.Series)):
        before_segment = before_segment.reset_index(drop=True)
        selected_segment = selected_segment.reset_index(drop=True)
        after_segment = after_segment.reset_index(drop=True)

    # WranglerLogger.debug(f"Segmented data into before, selected, and after.\n \
    #    Before:\n{before_segment}\nSelected:\n{selected_segment}\nAfter:\n{after_segment}")

    return before_segment, selected_segment, after_segment

network_wrangler.utils.data.segment_data_by_selection_min_overlap

segment_data_by_selection_min_overlap(selection_list, data, field, replacements_list, end_val=0)

Segments data based on item_list reducing overlap with replacement list.

selected segment: everything from the first to last item in item_list inclusive of the first and last items but not if first and last items overlap with replacement list. Before segment = everything before After segment = everything after

Example: selection_list = [2,5] data = pd.DataFrame({“i”:[1,2,3,4,5,6]}) field = “i” replacements_list = [2,22,33]

Returns:

  • list

    [22,33]

  • tuple[Union[Series, DataFrame], Union[Series, DataFrame], Union[Series, DataFrame]]

    [1], [2,3,4,5], [6]

Parameters:

  • selection_list (list) –

    List of items to segment data by. If longer than two, will only use the first and last items.

  • data (Union[Series, DataFrame]) –

    Data to segment into before, middle, and after.

  • field (str) –

    Specifies which field to reference.

  • replacements_list (list) –

    List of items to eventually replace the selected segment with.

  • end_val (int, default: 0 ) –

    Notation for util the end or from the begining. Defaults to 0.

tuple containing:

  • list
    • updated replacement_list
  • tuple[Union[Series, DataFrame], Union[Series, DataFrame], Union[Series, DataFrame]]
    • tuple of before, selected segment, and after data
Source code in network_wrangler/utils/data.py
def segment_data_by_selection_min_overlap(
    selection_list: list,
    data: pd.DataFrame,
    field: str,
    replacements_list: list,
    end_val=0,
) -> tuple[
    list,
    tuple[
        Union[pd.Series, pd.DataFrame],
        Union[pd.Series, pd.DataFrame],
        Union[pd.Series, pd.DataFrame],
    ],
]:
    """Segments data based on item_list reducing overlap with replacement list.

    *selected segment*: everything from the first to last item in item_list inclusive of the first
        and last items but not if first and last items overlap with replacement list.
    Before segment = everything before
    After segment = everything after

    Example:
    selection_list = [2,5]
    data = pd.DataFrame({"i":[1,2,3,4,5,6]})
    field = "i"
    replacements_list = [2,22,33]

    Returns:
        [22,33]
        [1], [2,3,4,5], [6]

    Args:
        selection_list (list): List of items to segment data by. If longer than two, will only
            use the first and last items.
        data (Union[pd.Series, pd.DataFrame]): Data to segment into before, middle, and after.
        field (str): Specifies which field to reference.
        replacements_list (list): List of items to eventually replace the selected segment with.
        end_val (int, optional): Notation for util the end or from the begining. Defaults to 0.

    Returns: tuple containing:
        - updated replacement_list
        - tuple of before, selected segment, and after data
    """
    before_segment, segment_df, after_segment = segment_data_by_selection(
        selection_list, data, field=field, end_val=end_val
    )
    if not isinstance(segment_df, pd.DataFrame):
        msg = "segment_df should be a DataFrame - something is wrong."
        raise ValueError(msg)

    if replacements_list and replacements_list[0] == segment_df[field].iat[0]:
        # move first item from selected segment to the before_segment df
        replacements_list = replacements_list[1:]
        before_segment = concat_with_attr(
            [before_segment, segment_df.iloc[:1]], ignore_index=True, sort=False
        )
        segment_df = segment_df.iloc[1:]
        # WranglerLogger.debug(f"item start overlaps with replacement. Repl: {replacements_list}")
    if replacements_list and replacements_list[-1] == data[field].iat[-1]:
        # move last item from selected segment to the after_segment df
        replacements_list = replacements_list[:-1]
        after_segment = concat_with_attr(
            [data.iloc[-1:], after_segment], ignore_index=True, sort=False
        )
        segment_df = segment_df.iloc[:-1]
        # WranglerLogger.debug(f"item end overlaps with replacement. Repl: {replacements_list}")

    return replacements_list, (before_segment, segment_df, after_segment)

network_wrangler.utils.data.update_df_by_col_value

update_df_by_col_value(destination_df, source_df, join_col, properties=None, fail_if_missing=True)

Updates destination_df with ALL values in source_df for specified props with same join_col.

Source_df can contain a subset of IDs of destination_df. If fail_if_missing is true, destination_df must have all the IDS in source DF - ensuring all source_df values are contained in resulting df.

>> destination_df
trip_id  property1  property2
1         10      100
2         20      200
3         30      300
4         40      400

>> source_df
trip_id  property1  property2
2         25      250
3         35      350

>> updated_df
trip_id  property1  property2
0        1       10      100
1        2       25      250
2        3       35      350
3        4       40      400

Parameters:

  • destination_df (DataFrame) –

    Dataframe to modify.

  • source_df (DataFrame) –

    Dataframe with updated columns

  • join_col (str) –

    column to join on

  • properties (list[str], default: None ) –

    List of properties to use. If None, will default to all in source_df.

  • fail_if_missing (bool, default: True ) –

    If True, will raise an error if there are missing IDs in destination_df that exist in source_df.

Source code in network_wrangler/utils/data.py
def update_df_by_col_value(
    destination_df: pd.DataFrame,
    source_df: pd.DataFrame,
    join_col: str,
    properties: Optional[list[str]] = None,
    fail_if_missing: bool = True,
) -> pd.DataFrame:
    """Updates destination_df with ALL values in source_df for specified props with same join_col.

    Source_df can contain a subset of IDs of destination_df.
    If fail_if_missing is true, destination_df must have all
    the IDS in source DF - ensuring all source_df values are contained in resulting df.

    ```
    >> destination_df
    trip_id  property1  property2
    1         10      100
    2         20      200
    3         30      300
    4         40      400

    >> source_df
    trip_id  property1  property2
    2         25      250
    3         35      350

    >> updated_df
    trip_id  property1  property2
    0        1       10      100
    1        2       25      250
    2        3       35      350
    3        4       40      400
    ```

    Args:
        destination_df (pd.DataFrame): Dataframe to modify.
        source_df (pd.DataFrame): Dataframe with updated columns
        join_col (str): column to join on
        properties (list[str]): List of properties to use. If None, will default to all
            in source_df.
        fail_if_missing (bool): If True, will raise an error if there are missing IDs in
            destination_df that exist in source_df.
    """
    # 1. Identify which properties should be updated; and if they exist in both DFs.
    if properties is None:
        properties = [
            c for c in source_df.columns if c in destination_df.columns and c != join_col
        ]
    else:
        _dest_miss = _df_missing_cols(destination_df, [*properties, join_col])
        if _dest_miss:
            msg = f"Properties missing from destination_df: {_dest_miss}"
            raise MissingPropertiesError(msg)
        _source_miss = _df_missing_cols(source_df, [*properties, join_col])
        if _source_miss:
            msg = f"Properties missing from source_df: {_source_miss}"
            raise MissingPropertiesError(msg)

    # 2. Identify if there are IDs missing from destination_df that exist in source_df
    if fail_if_missing:
        missing_ids = set(source_df[join_col]) - set(destination_df[join_col])
        if missing_ids:
            msg = f"IDs missing from source_df: \n{missing_ids}"
            raise InvalidJoinFieldError(msg)

    WranglerLogger.debug(f"Updating properties for {len(source_df)} records: {properties}.")

    if not source_df[join_col].is_unique:
        msg = f"Can't join from source_df when join_col: {join_col} is not unique."
        raise InvalidJoinFieldError(msg)

    if not destination_df[join_col].is_unique:
        return _update_props_from_one_to_many(destination_df, source_df, join_col, properties)

    return _update_props_for_common_idx(destination_df, source_df, join_col, properties)

network_wrangler.utils.data.validate_existing_value_in_df

validate_existing_value_in_df(df, idx, field, expected_value)

Validate if df[field]==expected_value for all indices in idx.

Source code in network_wrangler/utils/data.py
def validate_existing_value_in_df(df: pd.DataFrame, idx: list[int], field: str, expected_value):
    """Validate if df[field]==expected_value for all indices in idx."""
    if field not in df.columns:
        WranglerLogger.warning(f"!! {field} Not an existing field.")
        return False
    if not df.loc[idx, field].eq(expected_value).all():
        WranglerLogger.warning(
            f"Existing value defined for {field} in project card \
            does not match the value in the selection links. \n\
            Specified Existing: {expected_value}\n\
            Actual Existing: \n {df.loc[idx, field]}."
        )
        return False
    return True

Dataframe accessors that allow functions to be called directly on the dataframe.

network_wrangler.utils.df_accessors.DictQueryAccessor

Query link, node and shape dataframes using project selection dictionary.

Will overlook any keys which are not columns in the dataframe.

Usage:

selection_dict = {
    "lanes": [1, 2, 3],
    "name": ["6th", "Sixth", "sixth"],
    "drive_access": 1,
}
selected_links_df = links_df.dict_query(selection_dict)
Source code in network_wrangler/utils/df_accessors.py
@pd.api.extensions.register_dataframe_accessor("dict_query")
class DictQueryAccessor:
    """Query link, node and shape dataframes using project selection dictionary.

    Will overlook any keys which are not columns in the dataframe.

    Usage:

    ```
    selection_dict = {
        "lanes": [1, 2, 3],
        "name": ["6th", "Sixth", "sixth"],
        "drive_access": 1,
    }
    selected_links_df = links_df.dict_query(selection_dict)
    ```

    """

    def __init__(self, pandas_obj):
        """Initialization function for the dictionary query accessor."""
        self._obj = pandas_obj

    def __call__(self, selection_dict: dict, return_all_if_none: bool = False):
        """Queries the dataframe using the selection dictionary.

        Args:
            selection_dict (dict): _description_
            return_all_if_none (bool, optional): If True, will return entire df if dict has
                 no values. Defaults to False.
        """
        _not_selection_keys = ["modes", "all", "ignore_missing"]
        _selection_dict = {
            k: v
            for k, v in selection_dict.items()
            if k not in _not_selection_keys and v is not None
        }
        missing_columns = [k for k in _selection_dict if k not in self._obj.columns]
        if missing_columns:
            msg = f"Selection fields not found in dataframe: {missing_columns}"
            raise SelectionError(msg)

        if not _selection_dict:
            if return_all_if_none:
                return self._obj
            msg = f"Relevant part of selection dictionary is empty: {selection_dict}"
            raise SelectionError(msg)

        _sel_query = dict_to_query(_selection_dict)
        # WranglerLogger.debug(f"_sel_query: \n   {_sel_query}")
        _df = self._obj.query(_sel_query, engine="python")

        if len(_df) == 0:
            WranglerLogger.warning(
                f"No records found in df \
                  using selection: {selection_dict}"
            )
        return _df

network_wrangler.utils.df_accessors.DictQueryAccessor.__call__

__call__(selection_dict, return_all_if_none=False)

Queries the dataframe using the selection dictionary.

Parameters:

  • selection_dict (dict) –

    description

  • return_all_if_none (bool, default: False ) –

    If True, will return entire df if dict has no values. Defaults to False.

Source code in network_wrangler/utils/df_accessors.py
def __call__(self, selection_dict: dict, return_all_if_none: bool = False):
    """Queries the dataframe using the selection dictionary.

    Args:
        selection_dict (dict): _description_
        return_all_if_none (bool, optional): If True, will return entire df if dict has
             no values. Defaults to False.
    """
    _not_selection_keys = ["modes", "all", "ignore_missing"]
    _selection_dict = {
        k: v
        for k, v in selection_dict.items()
        if k not in _not_selection_keys and v is not None
    }
    missing_columns = [k for k in _selection_dict if k not in self._obj.columns]
    if missing_columns:
        msg = f"Selection fields not found in dataframe: {missing_columns}"
        raise SelectionError(msg)

    if not _selection_dict:
        if return_all_if_none:
            return self._obj
        msg = f"Relevant part of selection dictionary is empty: {selection_dict}"
        raise SelectionError(msg)

    _sel_query = dict_to_query(_selection_dict)
    # WranglerLogger.debug(f"_sel_query: \n   {_sel_query}")
    _df = self._obj.query(_sel_query, engine="python")

    if len(_df) == 0:
        WranglerLogger.warning(
            f"No records found in df \
              using selection: {selection_dict}"
        )
    return _df

network_wrangler.utils.df_accessors.DictQueryAccessor.__init__

__init__(pandas_obj)

Initialization function for the dictionary query accessor.

Source code in network_wrangler/utils/df_accessors.py
def __init__(self, pandas_obj):
    """Initialization function for the dictionary query accessor."""
    self._obj = pandas_obj

network_wrangler.utils.df_accessors.Isin_dict

Faster implimentation of isin for querying dataframes with dictionary.

Source code in network_wrangler/utils/df_accessors.py
@pd.api.extensions.register_dataframe_accessor("isin_dict")
class Isin_dict:
    """Faster implimentation of isin for querying dataframes with dictionary."""

    def __init__(self, pandas_obj):
        """Initialization function for the dataframe hash."""
        self._obj = pandas_obj

    def __call__(self, d: dict, **kwargs) -> pd.DataFrame:
        """Function to perform the faster dictionary isin()."""
        return isin_dict(self._obj, d, **kwargs)

network_wrangler.utils.df_accessors.Isin_dict.__call__

__call__(d, **kwargs)

Function to perform the faster dictionary isin().

Source code in network_wrangler/utils/df_accessors.py
def __call__(self, d: dict, **kwargs) -> pd.DataFrame:
    """Function to perform the faster dictionary isin()."""
    return isin_dict(self._obj, d, **kwargs)

network_wrangler.utils.df_accessors.Isin_dict.__init__

__init__(pandas_obj)

Initialization function for the dataframe hash.

Source code in network_wrangler/utils/df_accessors.py
def __init__(self, pandas_obj):
    """Initialization function for the dataframe hash."""
    self._obj = pandas_obj

network_wrangler.utils.df_accessors.dfHash

Creates a dataframe hash that is compatable with geopandas and various metadata.

Definitely not the fastest, but she seems to work where others have failed.

Source code in network_wrangler/utils/df_accessors.py
@pd.api.extensions.register_dataframe_accessor("df_hash")
class dfHash:
    """Creates a dataframe hash that is compatable with geopandas and various metadata.

    Definitely not the fastest, but she seems to work where others have failed.
    """

    def __init__(self, pandas_obj):
        """Initialization function for the dataframe hash."""
        self._obj = pandas_obj

    def __call__(self):
        """Function to hash the dataframe with version-robust computation."""
        # Convert to a more stable representation that's less sensitive to version differences
        # Sort the dataframe to ensure consistent ordering regardless of how it was loaded
        df_sorted = self._obj.sort_index(axis=0).sort_index(axis=1)

        # Use a more stable string representation that's less sensitive to version differences
        # Convert to numpy array and then to string, which is more consistent across versions
        _value = str(df_sorted.values.tolist()).encode()
        hash = hashlib.sha1(_value).hexdigest()
        return hash

network_wrangler.utils.df_accessors.dfHash.__call__

__call__()

Function to hash the dataframe with version-robust computation.

Source code in network_wrangler/utils/df_accessors.py
def __call__(self):
    """Function to hash the dataframe with version-robust computation."""
    # Convert to a more stable representation that's less sensitive to version differences
    # Sort the dataframe to ensure consistent ordering regardless of how it was loaded
    df_sorted = self._obj.sort_index(axis=0).sort_index(axis=1)

    # Use a more stable string representation that's less sensitive to version differences
    # Convert to numpy array and then to string, which is more consistent across versions
    _value = str(df_sorted.values.tolist()).encode()
    hash = hashlib.sha1(_value).hexdigest()
    return hash

network_wrangler.utils.df_accessors.dfHash.__init__

__init__(pandas_obj)

Initialization function for the dataframe hash.

Source code in network_wrangler/utils/df_accessors.py
def __init__(self, pandas_obj):
    """Initialization function for the dataframe hash."""
    self._obj = pandas_obj

Network and Geographic Utilities

Functions to help with network manipulations in dataframes.

point_seq_to_links(point_seq_df, id_field, seq_field, node_id_field, from_field='A', to_field='B')

Translates a df with tidy data representing a sequence of points into links.

Parameters:

  • point_seq_df (DataFrame) –

    Dataframe with source breadcrumbs

  • id_field (str) –

    Trace ID

  • seq_field (str) –

    Order of breadcrumbs within ID_field

  • node_id_field (str) –

    field denoting the node ID

  • from_field (str, default: 'A' ) –

    Field to export from_field to. Defaults to “A”.

  • to_field (str, default: 'B' ) –

    Field to export to_field to. Defaults to “B”.

Returns:

  • DataFrame

    pd.DataFrame: Link records with id_field, from_field, to_field

Source code in network_wrangler/utils/net.py
def point_seq_to_links(
    point_seq_df: DataFrame,
    id_field: str,
    seq_field: str,
    node_id_field: str,
    from_field: str = "A",
    to_field: str = "B",
) -> DataFrame:
    """Translates a df with tidy data representing a sequence of points into links.

    Args:
        point_seq_df (pd.DataFrame): Dataframe with source breadcrumbs
        id_field (str): Trace ID
        seq_field (str): Order of breadcrumbs within ID_field
        node_id_field (str): field denoting the node ID
        from_field (str, optional): Field to export from_field to. Defaults to "A".
        to_field (str, optional): Field to export to_field to. Defaults to "B".

    Returns:
        pd.DataFrame: Link records with id_field, from_field, to_field
    """
    point_seq_df = point_seq_df.sort_values(by=[id_field, seq_field])

    links = point_seq_df.add_suffix(f"_{from_field}").join(
        point_seq_df.shift(-1).add_suffix(f"_{to_field}")
    )

    links = links[links[f"{id_field}_{to_field}"] == links[f"{id_field}_{from_field}"]]

    links = links.drop(columns=[f"{id_field}_{to_field}"])
    links = links.rename(
        columns={
            f"{id_field}_{from_field}": id_field,
            f"{node_id_field}_{from_field}": from_field,
            f"{node_id_field}_{to_field}": to_field,
        }
    )

    links = links.dropna(subset=[from_field, to_field])
    # Since join with a shift() has some NAs, we need to recast the columns to int
    _int_cols = [to_field, f"{seq_field}_{to_field}"]
    links[_int_cols] = links[_int_cols].astype(int)
    return links

Helper geographic manipulation functions.

network_wrangler.utils.geo.InvalidCRSError

Bases: Exception

Raised when a point is not valid for a given coordinate reference system.

Source code in network_wrangler/utils/geo.py
class InvalidCRSError(Exception):
    """Raised when a point is not valid for a given coordinate reference system."""

network_wrangler.utils.geo.check_point_valid_for_crs

check_point_valid_for_crs(point, crs)

Check if a point is valid for a given coordinate reference system.

Parameters:

  • point (Point) –

    Shapely Point

  • crs (int) –

    coordinate reference system in ESPG code

Source code in network_wrangler/utils/geo.py
def check_point_valid_for_crs(point: Point, crs: int):
    """Check if a point is valid for a given coordinate reference system.

    Args:
        point: Shapely Point
        crs: coordinate reference system in ESPG code

    raises: InvalidCRSError if point is not valid for the given crs
    """
    crs = CRS.from_user_input(crs)
    minx, miny, maxx, maxy = crs.area_of_use.bounds
    ok_bounds = True
    if not minx <= point.x <= maxx:
        WranglerLogger.error(f"Invalid X coordinate for CRS {crs}: {point.x}")
        ok_bounds = False
    if not miny <= point.y <= maxy:
        WranglerLogger.error(f"Invalid Y coordinate for CRS {crs}: {point.y}")
        ok_bounds = False

    if not ok_bounds:
        msg = f"Invalid coordinate for CRS {crs}: {point.x}, {point.y}"
        raise InvalidCRSError(msg)

network_wrangler.utils.geo.get_bearing

get_bearing(lat1, lon1, lat2, lon2)

Calculate the bearing (forward azimuth) b/w the two points.

returns: bearing in radians

Source code in network_wrangler/utils/geo.py
def get_bearing(lat1, lon1, lat2, lon2):
    """Calculate the bearing (forward azimuth) b/w the two points.

    returns: bearing in radians
    """
    # bearing in degrees
    brng = Geodesic.WGS84.Inverse(lat1, lon1, lat2, lon2)["azi1"]

    # convert bearing to radians
    brng = math.radians(brng)

    return brng

network_wrangler.utils.geo.get_bounding_polygon

get_bounding_polygon(boundary_geocode=None, boundary_file=None, boundary_gdf=None, crs=LAT_LON_CRS)

Get the bounding polygon for a given boundary.

Will return None if no arguments given. Will raise a ValueError if more than one given.

This function retrieves the bounding polygon for a given boundary. The boundary can be provided as a GeoDataFrame, a geocode string or dictionary, or a boundary file. The resulting polygon geometry is returned as a GeoSeries.

Parameters:

  • boundary_geocode (Union[str, dict], default: None ) –

    A geocode string or dictionary representing the boundary. Defaults to None.

  • boundary_file (Union[str, Path], default: None ) –

    A path to the boundary file. Only used if boundary_geocode is None. Defaults to None.

  • boundary_gdf (GeoDataFrame, default: None ) –

    A GeoDataFrame representing the boundary. Only used if boundary_geocode and boundary_file are None. Defaults to None.

  • crs (int, default: LAT_LON_CRS ) –

    The coordinate reference system (CRS) code. Defaults to 4326 (WGS84).

Returns:

  • GeoSeries

    gpd.GeoSeries: The polygon geometry representing the bounding polygon.

Source code in network_wrangler/utils/geo.py
def get_bounding_polygon(
    boundary_geocode: Optional[Union[str, dict]] = None,
    boundary_file: Optional[Union[str, Path]] = None,
    boundary_gdf: Optional[gpd.GeoDataFrame] = None,
    crs: int = LAT_LON_CRS,  # WGS84
) -> gpd.GeoSeries:
    """Get the bounding polygon for a given boundary.

    Will return None if no arguments given. Will raise a ValueError if more than one given.

    This function retrieves the bounding polygon for a given boundary. The boundary can be provided
    as a GeoDataFrame, a geocode string or dictionary, or a boundary file. The resulting polygon
    geometry is returned as a GeoSeries.

    Args:
        boundary_geocode (Union[str, dict], optional): A geocode string or dictionary
            representing the boundary. Defaults to None.
        boundary_file (Union[str, Path], optional): A path to the boundary file. Only used if
            boundary_geocode is None. Defaults to None.
        boundary_gdf (gpd.GeoDataFrame, optional): A GeoDataFrame representing the boundary.
            Only used if boundary_geocode and boundary_file are None. Defaults to None.
        crs (int, optional): The coordinate reference system (CRS) code. Defaults to 4326 (WGS84).

    Returns:
        gpd.GeoSeries: The polygon geometry representing the bounding polygon.
    """
    import osmnx as ox  # noqa: PLC0415

    nargs = sum(x is not None for x in [boundary_gdf, boundary_geocode, boundary_file])
    if nargs == 0:
        return None
    if nargs != 1:
        msg = "Exactly one of boundary_gdf, boundary_geocode, or boundary_file must be provided."
        raise ValueError(msg)

    OK_BOUNDARY_SUFF = [".shp", ".geojson", ".parquet"]

    if boundary_geocode is not None:
        boundary_gdf = ox.geocode_to_gdf(boundary_geocode)
    elif boundary_file is not None:
        boundary_file = Path(boundary_file)
        if boundary_file.suffix not in OK_BOUNDARY_SUFF:
            msg = "Boundary file must have one of the following suffixes: {OK_BOUNDARY_SUFF}"
            raise ValueError(msg)
        if not boundary_file.exists():
            msg = f"Boundary file {boundary_file} does not exist"
            raise FileNotFoundError(msg)
        if boundary_file.suffix == ".parquet":
            boundary_gdf = gpd.read_parquet(boundary_file)
        else:
            boundary_gdf = gpd.read_file(boundary_file)
            if boundary_file.suffix == ".geojson":  # geojson standard is WGS84
                boundary_gdf.crs = crs

    if boundary_gdf is None:
        msg = "One of boundary_gdf, boundary_geocode or boundary_file must be provided."
        raise ValueError(msg)

    if boundary_gdf.crs is not None:
        boundary_gdf = boundary_gdf.to_crs(crs)
    # make sure boundary_gdf is a polygon
    if len(boundary_gdf.geom_type[boundary_gdf.geom_type != "Polygon"]) > 0:
        msg = "boundary_gdf must all be Polygons"
        raise ValueError(msg)
    # get the boundary as a single polygon
    boundary_gs = gpd.GeoSeries([boundary_gdf.geometry.union_all()], crs=crs)

    return boundary_gs

network_wrangler.utils.geo.get_point_geometry_from_linestring

get_point_geometry_from_linestring(polyline_geometry, pos=0)

Get a point geometry from a linestring geometry.

Parameters:

  • polyline_geometry

    shapely LineString instance

  • pos (int, default: 0 ) –

    position in the linestring to get the point from. Defaults to 0.

Source code in network_wrangler/utils/geo.py
def get_point_geometry_from_linestring(polyline_geometry, pos: int = 0):
    """Get a point geometry from a linestring geometry.

    Args:
        polyline_geometry: shapely LineString instance
        pos: position in the linestring to get the point from. Defaults to 0.
    """
    # WranglerLogger.debug(
    #    f"get_point_geometry_from_linestring.polyline_geometry.coords[0]: \
    #    {polyline_geometry.coords[0]}."
    # )

    # Note: when upgrading to shapely 2.0, will need to use following command
    # _point_coords = get_coordinates(polyline_geometry).tolist()[pos]
    return point_from_xy(*polyline_geometry.coords[pos])

network_wrangler.utils.geo.length_of_linestring_miles

length_of_linestring_miles(gdf)

Returns a Series with the linestring length in miles.

Parameters:

  • gdf (Union[GeoSeries, GeoDataFrame]) –

    GeoDataFrame with linestring geometry. If given a GeoSeries will attempt to convert to a GeoDataFrame.

Source code in network_wrangler/utils/geo.py
def length_of_linestring_miles(gdf: Union[gpd.GeoSeries, gpd.GeoDataFrame]) -> pd.Series:
    """Returns a Series with the linestring length in miles.

    Args:
        gdf: GeoDataFrame with linestring geometry.  If given a GeoSeries will attempt to convert
            to a GeoDataFrame.
    """
    # WranglerLogger.debug(f"length_of_linestring_miles.gdf input:\n{gdf}.")
    if isinstance(gdf, gpd.GeoSeries):
        gdf = gpd.GeoDataFrame(geometry=gdf)

    p_crs = gdf.estimate_utm_crs()
    gdf = gdf.to_crs(p_crs)
    METERS_IN_MILES = 1609.34
    length_miles = gdf.geometry.length / METERS_IN_MILES
    length_s = pd.Series(length_miles, index=gdf.index)

    return length_s

network_wrangler.utils.geo.linestring_from_lats_lons

linestring_from_lats_lons(df, lat_fields, lon_fields)

Create a LineString geometry from a DataFrame with lon/lat fields.

Parameters:

  • df

    DataFrame with columns for lon/lat fields.

  • lat_fields

    list of column names for the lat fields.

  • lon_fields

    list of column names for the lon fields.

Source code in network_wrangler/utils/geo.py
def linestring_from_lats_lons(df, lat_fields, lon_fields) -> gpd.GeoSeries:
    """Create a LineString geometry from a DataFrame with lon/lat fields.

    Args:
        df: DataFrame with columns for lon/lat fields.
        lat_fields: list of column names for the lat fields.
        lon_fields: list of column names for the lon fields.
    """
    if len(lon_fields) != len(lat_fields):
        msg = "lon_fields and lat_fields lists must have the same length"
        raise ValueError(msg)

    line_geometries = gpd.GeoSeries(
        [
            LineString([(row[lon], row[lat]) for lon, lat in zip(lon_fields, lat_fields)])
            for _, row in df.iterrows()
        ]
    )

    return gpd.GeoSeries(line_geometries)

network_wrangler.utils.geo.linestring_from_nodes

linestring_from_nodes(links_df, nodes_df, from_node='A', to_node='B', node_pk='model_node_id')

Creates a LineString geometry GeoSeries from a DataFrame of links and a DataFrame of nodes.

Parameters:

  • links_df (DataFrame) –

    DataFrame with columns for from_node and to_node.

  • nodes_df (GeoDataFrame) –

    GeoDataFrame with geometry column.

  • from_node (str, default: 'A' ) –

    column name in links_df for the from node. Defaults to “A”.

  • to_node (str, default: 'B' ) –

    column name in links_df for the to node. Defaults to “B”.

  • node_pk (str, default: 'model_node_id' ) –

    primary key column name in nodes_df. Defaults to “model_node_id”.

Source code in network_wrangler/utils/geo.py
def linestring_from_nodes(
    links_df: pd.DataFrame,
    nodes_df: gpd.GeoDataFrame,
    from_node: str = "A",
    to_node: str = "B",
    node_pk: str = "model_node_id",
) -> gpd.GeoSeries:
    """Creates a LineString geometry GeoSeries from a DataFrame of links and a DataFrame of nodes.

    Args:
        links_df: DataFrame with columns for from_node and to_node.
        nodes_df: GeoDataFrame with geometry column.
        from_node: column name in links_df for the from node. Defaults to "A".
        to_node: column name in links_df for the to node. Defaults to "B".
        node_pk: primary key column name in nodes_df. Defaults to "model_node_id".
    """
    assert "geometry" in nodes_df.columns, "nodes_df must have a 'geometry' column"

    idx_name = "index" if links_df.index.name is None else links_df.index.name
    # WranglerLogger.debug(f"Index name: {idx_name}")
    required_link_cols = [from_node, to_node]

    if not all(col in links_df.columns for col in required_link_cols):
        WranglerLogger.error(
            f"links_df.columns missing required columns.\n\
                            links_df.columns: {links_df.columns}\n\
                            required_link_cols: {required_link_cols}"
        )
        msg = "links_df must have columns {required_link_cols} to create linestring from nodes"
        raise ValueError(msg)

    links_geo_df = copy.deepcopy(links_df[required_link_cols])
    # need to continuously reset the index to make sure the index is the same as the link index
    links_geo_df = (
        links_geo_df.reset_index()
        .merge(
            nodes_df[[node_pk, "geometry"]],
            left_on=from_node,
            right_on=node_pk,
            how="left",
        )
        .set_index(idx_name)
    )

    links_geo_df = links_geo_df.rename(columns={"geometry": "geometry_A"})

    links_geo_df = (
        links_geo_df.reset_index()
        .merge(
            nodes_df[[node_pk, "geometry"]],
            left_on=to_node,
            right_on=node_pk,
            how="left",
        )
        .set_index(idx_name)
    )

    links_geo_df = links_geo_df.rename(columns={"geometry": "geometry_B"})

    # makes sure all nodes exist
    _missing_geo_links_df = links_geo_df[
        links_geo_df["geometry_A"].isnull() | links_geo_df["geometry_B"].isnull()
    ]
    if not _missing_geo_links_df.empty:
        missing_nodes = _missing_geo_links_df[[from_node, to_node]].values
        WranglerLogger.error(
            f"Cannot create link geometry from nodes because the nodes are\
                             missing from the network. Missing nodes: {missing_nodes}"
        )
        msg = "Cannot create link geometry from nodes because the nodes are missing from the network."
        raise MissingNodesError(msg)

    # create geometry from points
    links_geo_df["geometry"] = links_geo_df.apply(
        lambda row: LineString([row["geometry_A"], row["geometry_B"]]), axis=1
    )

    # convert to GeoDataFrame
    links_gdf = gpd.GeoDataFrame(links_geo_df["geometry"], geometry=links_geo_df["geometry"])
    return links_gdf["geometry"]

network_wrangler.utils.geo.location_ref_from_point

location_ref_from_point(geometry, sequence=1, bearing=None, distance_to_next_ref=None)

Generates a shared street point location reference.

Parameters:

  • geometry (Point) –

    Point shapely geometry

  • sequence (int, default: 1 ) –

    Sequence if part of polyline. Defaults to None.

  • bearing (float, default: None ) –

    Direction of line if part of polyline. Defaults to None.

  • distance_to_next_ref (float, default: None ) –

    Distnce to next point if part of polyline. Defaults to None.

Returns:

Source code in network_wrangler/utils/geo.py
def location_ref_from_point(
    geometry: Point,
    sequence: int = 1,
    bearing: Optional[float] = None,
    distance_to_next_ref: Optional[float] = None,
) -> LocationReference:
    """Generates a shared street point location reference.

    Args:
        geometry (Point): Point shapely geometry
        sequence (int, optional): Sequence if part of polyline. Defaults to None.
        bearing (float, optional): Direction of line if part of polyline. Defaults to None.
        distance_to_next_ref (float, optional): Distnce to next point if part of polyline.
            Defaults to None.

    Returns:
        LocationReference: As defined by sharedStreets.io schema
    """
    lr = {
        "point": LatLongCoordinates(geometry.coords[0]),
    }

    for arg in ["sequence", "bearing", "distance_to_next_ref"]:
        if locals()[arg] is not None:
            lr[arg] = locals()[arg]

    return LocationReference(**lr)

network_wrangler.utils.geo.location_refs_from_linestring

location_refs_from_linestring(geometry)

Generates a shared street location reference from linestring.

Parameters:

  • geometry (LineString) –

    Shapely LineString instance

Returns:

  • LocationReferences ( list[LocationReference] ) –

    As defined by sharedStreets.io schema

Source code in network_wrangler/utils/geo.py
def location_refs_from_linestring(geometry: LineString) -> list[LocationReference]:
    """Generates a shared street location reference from linestring.

    Args:
        geometry (LineString): Shapely LineString instance

    Returns:
        LocationReferences: As defined by sharedStreets.io schema
    """
    return [
        location_ref_from_point(
            point,
            sequence=i + 1,
            distance_to_next_ref=point.distance(geometry.coords[i + 1]),
            bearing=get_bearing(*point.coords[0], *geometry.coords[i + 1]),
        )
        for i, point in enumerate(geometry.coords[:-1])
    ]

network_wrangler.utils.geo.offset_geometry_meters

offset_geometry_meters(geo_s, offset_distance_meters)

Offset a GeoSeries of LineStrings by a given distance in meters.

Parameters:

  • geo_s (GeoSeries) –

    GeoSeries of LineStrings to offset.

  • offset_distance_meters (float) –

    distance in meters to offset the LineStrings.

Source code in network_wrangler/utils/geo.py
def offset_geometry_meters(geo_s: gpd.GeoSeries, offset_distance_meters: float) -> gpd.GeoSeries:
    """Offset a GeoSeries of LineStrings by a given distance in meters.

    Args:
        geo_s: GeoSeries of LineStrings to offset.
        offset_distance_meters: distance in meters to offset the LineStrings.
    """
    if geo_s.empty:
        return geo_s
    og_crs = geo_s.crs
    meters_crs = _id_utm_crs(geo_s)
    geo_s = geo_s.to_crs(meters_crs)
    offset_geo = geo_s.apply(lambda x: x.offset_curve(offset_distance_meters))
    offset_geo = gpd.GeoSeries(offset_geo)
    return offset_geo.to_crs(og_crs)

network_wrangler.utils.geo.offset_point_with_distance_and_bearing

offset_point_with_distance_and_bearing(lon, lat, distance, bearing)

Get the new lon-lat (in degrees) given current point (lon-lat), distance and bearing.

Parameters:

  • lon (float) –

    longitude of original point

  • lat (float) –

    latitude of original point

  • distance (float) –

    distance in meters to offset point by

  • bearing (float) –

    direction to offset point to in radians

Source code in network_wrangler/utils/geo.py
def offset_point_with_distance_and_bearing(
    lon: float, lat: float, distance: float, bearing: float
) -> list[float]:
    """Get the new lon-lat (in degrees) given current point (lon-lat), distance and bearing.

    Args:
        lon: longitude of original point
        lat: latitude of original point
        distance: distance in meters to offset point by
        bearing: direction to offset point to in radians

    returns: list of new offset lon-lat
    """
    # Earth's radius in meters
    radius = 6378137

    # convert the lat long from degree to radians
    lat_radians = math.radians(lat)
    lon_radians = math.radians(lon)

    # calculate the new lat long in radians
    out_lat_radians = math.asin(
        math.sin(lat_radians) * math.cos(distance / radius)
        + math.cos(lat_radians) * math.sin(distance / radius) * math.cos(bearing)
    )

    out_lon_radians = lon_radians + math.atan2(
        math.sin(bearing) * math.sin(distance / radius) * math.cos(lat_radians),
        math.cos(distance / radius) - math.sin(lat_radians) * math.sin(lat_radians),
    )
    # convert the new lat long back to degree
    out_lat = math.degrees(out_lat_radians)
    out_lon = math.degrees(out_lon_radians)

    return [out_lon, out_lat]

network_wrangler.utils.geo.point_from_xy

point_from_xy(x, y, xy_crs=LAT_LON_CRS, point_crs=LAT_LON_CRS)

Creates point geometry from x and y coordinates.

Parameters:

  • x

    x coordinate, in xy_crs

  • y

    y coordinate, in xy_crs

  • xy_crs (int, default: LAT_LON_CRS ) –

    coordinate reference system in ESPG code for x/y inputs. Defaults to 4326 (WGS84)

  • point_crs (int, default: LAT_LON_CRS ) –

    coordinate reference system in ESPG code for point output. Defaults to 4326 (WGS84)

Source code in network_wrangler/utils/geo.py
def point_from_xy(x, y, xy_crs: int = LAT_LON_CRS, point_crs: int = LAT_LON_CRS):
    """Creates point geometry from x and y coordinates.

    Args:
        x: x coordinate, in xy_crs
        y: y coordinate, in xy_crs
        xy_crs: coordinate reference system in ESPG code for x/y inputs. Defaults to 4326 (WGS84)
        point_crs: coordinate reference system in ESPG code for point output.
            Defaults to 4326 (WGS84)

    Returns: Shapely Point in point_crs
    """
    point = Point(x, y)

    if xy_crs == point_crs:
        check_point_valid_for_crs(point, point_crs)
        return point

    if (xy_crs, point_crs) not in transformers:
        # store transformers in dictionary because they are an "expensive" operation
        transformers[(xy_crs, point_crs)] = Transformer.from_proj(
            Proj(init="epsg:" + str(xy_crs)),
            Proj(init="epsg:" + str(point_crs)),
            always_xy=True,  # required b/c Proj v6+ uses lon/lat instead of x/y
        )

    return transform(transformers[(xy_crs, point_crs)].transform, point)

network_wrangler.utils.geo.to_points_gdf

to_points_gdf(table, ref_nodes_df=None, ref_road_net=None)

Convert a table to a GeoDataFrame.

If the table is already a GeoDataFrame, return it as is. Otherwise, attempt to convert the table to a GeoDataFrame using the following methods: 1. If the table has a ‘geometry’ column, return a GeoDataFrame using that column. 2. If the table has ‘lat’ and ‘lon’ columns, return a GeoDataFrame using those columns. 3. If the table has a ‘*model_node_id’ or ‘stop_id’ column, return a GeoDataFrame using that column and the nodes_df provided. If none of the above, raise a ValueError.

Parameters:

  • table (DataFrame) –

    DataFrame to convert to GeoDataFrame.

  • ref_nodes_df (Optional[GeoDataFrame], default: None ) –

    GeoDataFrame of nodes to use to convert model_node_id to geometry.

  • ref_road_net (Optional[RoadwayNetwork], default: None ) –

    RoadwayNetwork object to use to convert model_node_id to geometry.

Returns:

  • GeoDataFrame ( GeoDataFrame ) –

    GeoDataFrame representation of the table.

Source code in network_wrangler/utils/geo.py
def to_points_gdf(
    table: pd.DataFrame,
    ref_nodes_df: Optional[gpd.GeoDataFrame] = None,
    ref_road_net: Optional[RoadwayNetwork] = None,
) -> gpd.GeoDataFrame:
    """Convert a table to a GeoDataFrame.

    If the table is already a GeoDataFrame, return it as is. Otherwise, attempt to convert the
    table to a GeoDataFrame using the following methods:
    1. If the table has a 'geometry' column, return a GeoDataFrame using that column.
    2. If the table has 'lat' and 'lon' columns, return a GeoDataFrame using those columns.
    3. If the table has a '*model_node_id' or 'stop_id' column, return a GeoDataFrame using that column and the
         nodes_df provided.
    If none of the above, raise a ValueError.

    Args:
        table: DataFrame to convert to GeoDataFrame.
        ref_nodes_df: GeoDataFrame of nodes to use to convert model_node_id to geometry.
        ref_road_net: RoadwayNetwork object to use to convert model_node_id to geometry.

    Returns:
        GeoDataFrame: GeoDataFrame representation of the table.
    """
    if table is gpd.GeoDataFrame:
        return table

    WranglerLogger.debug("Converting GTFS table to GeoDataFrame")
    if "geometry" in table.columns:
        return gpd.GeoDataFrame(table, geometry="geometry")

    lat_cols = list(filter(lambda col: "lat" in col, table.columns))
    lon_cols = list(filter(lambda col: "lon" in col, table.columns))
    model_node_id_cols = [
        c for c in ["model_node_id", "stop_id", "shape_model_node_id"] if c in table.columns
    ]

    if not (lat_cols and lon_cols) or not model_node_id_cols:
        WranglerLogger.error(
            "Needed either lat/long or *model_node_id columns to convert \
            to GeoDataFrame. Columns found: {table.columns}"
        )
        if not (lat_cols and lon_cols):
            WranglerLogger.error("No lat/long cols found.")
        if not model_node_id_cols:
            WranglerLogger.error("No *model_node_id cols found.")
        msg = "Could not find lat/long, geometry columns or *model_node_id column in \
                         table necessary to convert to GeoDataFrame"
        raise ValueError(msg)

    if lat_cols and lon_cols:
        # using first found lat and lon columns
        return gpd.GeoDataFrame(
            table,
            geometry=gpd.points_from_xy(table[lon_cols[0]], table[lat_cols[0]]),
            crs="EPSG:4326",
        )

    if model_node_id_cols:
        node_id_col = model_node_id_cols[0]

        if ref_nodes_df is None:
            if ref_road_net is None:
                msg = "Must provide either nodes_df or road_net to convert \
                                 model_node_id to geometry"
                raise ValueError(msg)
            ref_nodes_df = ref_road_net.nodes_df

        WranglerLogger.debug("Converting table to GeoDataFrame using model_node_id")

        _table = table.merge(
            ref_nodes_df[["model_node_id", "geometry"]],
            left_on=node_id_col,
            right_on="model_node_id",
        )
        return gpd.GeoDataFrame(_table, geometry="geometry")
    msg = "Could not find lat/long, geometry columns or *model_node_id column in table \
                        necessary to convert to GeoDataFrame"
    raise ValueError(msg)

network_wrangler.utils.geo.update_nodes_in_linestring_geometry

update_nodes_in_linestring_geometry(original_df, updated_nodes_df, position)

Updates the nodes in a linestring geometry and returns updated geometry.

Parameters:

  • original_df (GeoDataFrame) –

    GeoDataFrame with the model_node_id and linestring geometry

  • updated_nodes_df (GeoDataFrame) –

    GeoDataFrame with updated node geometries.

  • position (int) –

    position in the linestring to update with the node.

Source code in network_wrangler/utils/geo.py
def update_nodes_in_linestring_geometry(
    original_df: gpd.GeoDataFrame,
    updated_nodes_df: gpd.GeoDataFrame,
    position: int,
) -> gpd.GeoSeries:
    """Updates the nodes in a linestring geometry and returns updated geometry.

    Args:
        original_df: GeoDataFrame with the `model_node_id` and linestring geometry
        updated_nodes_df: GeoDataFrame with updated node geometries.
        position: position in the linestring to update with the node.
    """
    LINK_FK_NODE = ["A", "B"]
    original_index = original_df.index

    updated_df = original_df.reset_index().merge(
        updated_nodes_df[["model_node_id", "geometry"]],
        left_on=LINK_FK_NODE[position],
        right_on="model_node_id",
        suffixes=("", "_node"),
    )

    updated_df["geometry"] = updated_df.apply(
        lambda row: update_points_in_linestring(
            row["geometry"], row["geometry_node"].coords[0], position
        ),
        axis=1,
    )

    updated_df = updated_df.reset_index().set_index(original_index.names)

    WranglerLogger.debug(f"updated_df - AFTER: \n {updated_df.geometry}")
    return updated_df["geometry"]

network_wrangler.utils.geo.update_point_geometry

update_point_geometry(df, ref_point_df, lon_field='X', lat_field='Y', id_field='model_node_id', ref_lon_field='X', ref_lat_field='Y', ref_id_field='model_node_id')

Returns copy of df with lat and long fields updated with geometry from ref_point_df.

NOTE: does not update “geometry” field if it exists.

Source code in network_wrangler/utils/geo.py
def update_point_geometry(
    df: pd.DataFrame,
    ref_point_df: pd.DataFrame,
    lon_field: str = "X",
    lat_field: str = "Y",
    id_field: str = "model_node_id",
    ref_lon_field: str = "X",
    ref_lat_field: str = "Y",
    ref_id_field: str = "model_node_id",
) -> pd.DataFrame:
    """Returns copy of df with lat and long fields updated with geometry from ref_point_df.

    NOTE: does not update "geometry" field if it exists.
    """
    df = copy.deepcopy(df)

    ref_df = ref_point_df.rename(
        columns={
            ref_lon_field: lon_field,
            ref_lat_field: lat_field,
            ref_id_field: id_field,
        }
    )

    updated_df = update_df_by_col_value(
        df,
        ref_df[[id_field, lon_field, lat_field]],
        id_field,
        properties=[lat_field, lon_field],
        fail_if_missing=False,
    )
    return updated_df

network_wrangler.utils.geo.update_points_in_linestring

update_points_in_linestring(linestring, updated_coords, position)

Replaces a point in a linestring with a new point.

Parameters:

  • linestring (LineString) –

    original_linestring

  • updated_coords (List[float]) –

    updated poimt coordinates

  • position (int) –

    position in the linestring to update

Source code in network_wrangler/utils/geo.py
def update_points_in_linestring(
    linestring: LineString, updated_coords: list[float], position: int
):
    """Replaces a point in a linestring with a new point.

    Args:
        linestring (LineString): original_linestring
        updated_coords (List[float]): updated poimt coordinates
        position (int): position in the linestring to update
    """
    coords = [c for c in linestring.coords]
    coords[position] = updated_coords
    return LineString(coords)

Time Utilities

Functions related to parsing and comparing time objects and series.

Internal function terminology for timespan scopes:

  • matching: a scope that could be applied for a given timespan combination. This includes the default timespan as well as scopes wholely contained within.
  • overlapping: a timespan that fully or partially overlaps a given timespan. This includes the default timespan, all matching timespans and all timespans where at least one minute overlap.
  • conflicting: a timespan that is overlapping but not matching. By definition default scope values are not conflicting.
  • independent a timespan that is not overlapping.

network_wrangler.utils.time.TimespanDfQueryError

Bases: Exception

Error for timespan query errors.

Source code in network_wrangler/utils/time.py
class TimespanDfQueryError(Exception):
    """Error for timespan query errors."""

network_wrangler.utils.time.calc_overlap_duration_with_query

calc_overlap_duration_with_query(start_time_s, end_time_s, start_time_q, end_time_q)

Calculate the overlap series of start and end times and a query start and end times.

Parameters:

  • start_time_s (Series[datetime]) –

    Series of start times to calculate overlap with.

  • end_time_s (Series[datetime]) –

    Series of end times to calculate overlap with.

  • start_time_q (datetime) –

    Query start time to calculate overlap with.

  • end_time_q (datetime) –

    Query end time to calculate overlap with.

Source code in network_wrangler/utils/time.py
def calc_overlap_duration_with_query(
    start_time_s: pd.Series[datetime],
    end_time_s: pd.Series[datetime],
    start_time_q: datetime,
    end_time_q: datetime,
) -> pd.Series[timedelta]:
    """Calculate the overlap series of start and end times and a query start and end times.

    Args:
        start_time_s: Series of start times to calculate overlap with.
        end_time_s: Series of end times to calculate overlap with.
        start_time_q: Query start time to calculate overlap with.
        end_time_q: Query end time to calculate overlap with.
    """
    overlap_start = start_time_s.combine(start_time_q, max)
    overlap_end = end_time_s.combine(end_time_q, min)
    overlap_duration_s = (overlap_end - overlap_start).dt.total_seconds() / 60

    return overlap_duration_s

network_wrangler.utils.time.convert_timespan_to_start_end_dt

convert_timespan_to_start_end_dt(timespan_s)

Convert a timespan string [‘12:00’,‘14:00] to start_time & end_time datetime cols in df.

Source code in network_wrangler/utils/time.py
def convert_timespan_to_start_end_dt(timespan_s: pd.Serie[str]) -> pd.DataFrame:
    """Convert a timespan string ['12:00','14:00] to start_time & end_time datetime cols in df."""
    start_time = timespan_s.apply(lambda x: str_to_time(x[0]))
    end_time = timespan_s.apply(lambda x: str_to_time(x[1]))
    return pd.DataFrame({"start_time": start_time, "end_time": end_time})

network_wrangler.utils.time.dt_contains

dt_contains(timespan1, timespan2)

Check timespan1 inclusively contains timespan2.

If the end time is less than the start time, it is assumed to be the next day.

Parameters:

  • timespan1 (list[time]) –

    The first timespan represented as a list containing the start time and end time.

  • timespan2 (list[time]) –

    The second timespan represented as a list containing the start time and end time.

Returns:

  • bool ( bool ) –

    True if the first timespan contains the second timespan, False otherwise.

Source code in network_wrangler/utils/time.py
@validate_call
def dt_contains(timespan1: list[datetime], timespan2: list[datetime]) -> bool:
    """Check timespan1 inclusively contains timespan2.

    If the end time is less than the start time, it is assumed to be the next day.

    Args:
        timespan1 (list[time]): The first timespan represented as a list containing the start
            time and end time.
        timespan2 (list[time]): The second timespan represented as a list containing the start
            time and end time.

    Returns:
        bool: True if the first timespan contains the second timespan, False otherwise.
    """
    start_time_dt, end_time_dt = timespan1

    if end_time_dt < start_time_dt:
        end_time_dt = end_time_dt + timedelta(days=1)

    start_time_dt2, end_time_dt2 = timespan2

    if end_time_dt2 < start_time_dt2:
        end_time_dt2 = end_time_dt2 + timedelta(days=1)

    return (start_time_dt <= start_time_dt2) and (end_time_dt >= end_time_dt2)

network_wrangler.utils.time.dt_list_overlaps

dt_list_overlaps(timespans)

Check if any of the timespans overlap.

overlapping: a timespan that fully or partially overlaps a given timespan. This includes and all timespans where at least one minute overlap.

Source code in network_wrangler/utils/time.py
def dt_list_overlaps(timespans: list[list[datetime]]) -> bool:
    """Check if any of the timespans overlap.

    `overlapping`: a timespan that fully or partially overlaps a given timespan.
    This includes and all timespans where at least one minute overlap.
    """
    return bool(filter_dt_list_to_overlaps(timespans))

network_wrangler.utils.time.dt_overlap_duration

dt_overlap_duration(timedelta1, timedelta2)

Check if two timespans overlap and return the amount of overlap.

If the end time is less than the start time, it is assumed to be the next day.

Source code in network_wrangler/utils/time.py
@validate_call
def dt_overlap_duration(timedelta1: timedelta, timedelta2: timedelta) -> timedelta:
    """Check if two timespans overlap and return the amount of overlap.

    If the end time is less than the start time, it is assumed to be the next day.
    """
    if timedelta1.end_time < timedelta1.start_time:
        timedelta1 = timedelta1 + timedelta(days=1)
    if timedelta2.end_time < timedelta2.start_time:
        timedelta2 = timedelta2 + timedelta(days=1)
    overlap_start = max(timedelta1.start_time, timedelta2.start_time)
    overlap_end = min(timedelta1.end_time, timedelta2.end_time)
    overlap_duration = max(overlap_end - overlap_start, timedelta(0))
    return overlap_duration

network_wrangler.utils.time.dt_overlaps

dt_overlaps(timespan1, timespan2)

Check if two timespans overlap.

If the end time is less than the start time, it is assumed to be the next day.

overlapping: a timespan that fully or partially overlaps a given timespan. This includes and all timespans where at least one minute overlap.

Source code in network_wrangler/utils/time.py
@validate_call(config={"arbitrary_types_allowed": True})
def dt_overlaps(timespan1: list[datetime], timespan2: list[datetime]) -> bool:
    """Check if two timespans overlap.

    If the end time is less than the start time, it is assumed to be the next day.

    `overlapping`: a timespan that fully or partially overlaps a given timespan.
    This includes and all timespans where at least one minute overlap.
    """
    time1_start, time1_end = timespan1
    time2_start, time2_end = timespan2

    if time1_end < time1_start:
        time1_end += timedelta(days=1)
    if time2_end < time2_start:
        time2_end += timedelta(days=1)

    return (time1_start < time2_end) and (time2_start < time1_end)

network_wrangler.utils.time.dt_to_seconds_from_midnight

dt_to_seconds_from_midnight(dt)

Convert a datetime object to the number of seconds since midnight.

Source code in network_wrangler/utils/time.py
@validate_call(config={"arbitrary_types_allowed": True})
def dt_to_seconds_from_midnight(dt: datetime) -> int:
    """Convert a datetime object to the number of seconds since midnight."""
    return round((dt - dt.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds())

network_wrangler.utils.time.duration_dt

duration_dt(start_time_dt, end_time_dt)

Returns a datetime.timedelta object representing the duration of the timespan.

If end_time is less than start_time, the duration will assume that it crosses over midnight.

Source code in network_wrangler/utils/time.py
def duration_dt(start_time_dt: datetime, end_time_dt: datetime) -> timedelta:
    """Returns a datetime.timedelta object representing the duration of the timespan.

    If end_time is less than start_time, the duration will assume that it crosses over
    midnight.
    """
    if end_time_dt < start_time_dt:
        return timedelta(
            hours=24 - start_time_dt.hour + end_time_dt.hour,
            minutes=end_time_dt.minute - start_time_dt.minute,
            seconds=end_time_dt.second - start_time_dt.second,
        )
    return end_time_dt - start_time_dt

network_wrangler.utils.time.filter_df_to_max_overlapping_timespans

filter_df_to_max_overlapping_timespans(orig_df, query_timespan, strict_match=False, min_overlap_minutes=1, keep_max_of_cols=None)

Filters dataframe for entries that have maximum overlap with the given query timespan.

If the end time is less than the start time, it is assumed to be the next day.

Parameters:

  • orig_df (DataFrame) –

    dataframe to query timespans for with start_time and end_time fields.

  • query_timespan (list[TimeString]) –

    TimespanString of format [‘HH:MM’,’HH:MM’] to query orig_df for overlapping records.

  • strict_match (bool, default: False ) –

    boolean indicating if the returned df should only contain records that fully contain the query timespan. If set to True, min_overlap_minutes does not apply. Defaults to False.

  • min_overlap_minutes (int, default: 1 ) –

    minimum number of minutes the timespans need to overlap to keep. Defaults to 1.

  • keep_max_of_cols (Optional[list[str]], default: None ) –

    list of fields to return the maximum value of overlap for. If None, will return all overlapping time periods. Defaults to ['model_link_id']

Source code in network_wrangler/utils/time.py
@validate_call(config={"arbitrary_types_allowed": True})
def filter_df_to_max_overlapping_timespans(
    orig_df: pd.DataFrame,
    query_timespan: list[TimeString],
    strict_match: bool = False,
    min_overlap_minutes: int = 1,
    keep_max_of_cols: Optional[list[str]] = None,
) -> pd.DataFrame:
    """Filters dataframe for entries that have maximum overlap with the given query timespan.

    If the end time is less than the start time, it is assumed to be the next day.

    Args:
        orig_df: dataframe to query timespans for with `start_time` and `end_time` fields.
        query_timespan: TimespanString of format ['HH:MM','HH:MM'] to query orig_df for overlapping
            records.
        strict_match: boolean indicating if the returned df should only contain
            records that fully contain the query timespan. If set to True, min_overlap_minutes
            does not apply. Defaults to False.
        min_overlap_minutes: minimum number of minutes the timespans need to overlap to keep.
            Defaults to 1.
        keep_max_of_cols: list of fields to return the maximum value of overlap for.  If None,
            will return all overlapping time periods. Defaults to `['model_link_id']`
    """
    if keep_max_of_cols is None:
        keep_max_of_cols = ["model_link_id"]
    if "start_time" not in orig_df.columns or "end_time" not in orig_df.columns:
        msg = "DataFrame must have 'start_time' and 'end_time' columns"
        WranglerLogger.error(msg)
        raise TimespanDfQueryError(msg)
    q_start, q_end = str_to_time_list(query_timespan)

    real_end = orig_df["end_time"]
    real_end.loc[orig_df["end_time"] < orig_df["start_time"]] += pd.Timedelta(days=1)

    orig_df["overlap_duration"] = calc_overlap_duration_with_query(
        orig_df["start_time"],
        real_end,
        q_start,
        q_end,
    )
    if strict_match:
        overlap_df = orig_df.loc[(orig_df.start_time <= q_start) & (real_end >= q_end)]
    else:
        overlap_df = orig_df.loc[orig_df.overlap_duration > min_overlap_minutes]
    WranglerLogger.debug(f"overlap_df: \n{overlap_df}")
    if keep_max_of_cols:
        # keep only the maximum overlap
        idx = overlap_df.groupby(keep_max_of_cols)["overlap_duration"].idxmax()
        overlap_df = overlap_df.loc[idx]
    return overlap_df

network_wrangler.utils.time.filter_df_to_overlapping_timespans

filter_df_to_overlapping_timespans(orig_df, query_timespans)

Filters dataframe for entries that have any overlap with ANY of the given query timespans.

If the end time is less than the start time, it is assumed to be the next day.

Parameters:

  • orig_df (DataFrame) –

    dataframe to query timespans for with start_time and end_time fields.

  • query_timespans (list[TimespanString]) –

    List of a list of TimespanStr of format [‘HH:MM’,’HH:MM’] to query orig_df for overlapping records.

Source code in network_wrangler/utils/time.py
@validate_call(config={"arbitrary_types_allowed": True})
def filter_df_to_overlapping_timespans(
    orig_df: pd.DataFrame,
    query_timespans: list[TimespanString],
) -> pd.DataFrame:
    """Filters dataframe for entries that have any overlap with ANY of the given query timespans.

    If the end time is less than the start time, it is assumed to be the next day.

    Args:
        orig_df: dataframe to query timespans for with `start_time` and `end_time` fields.
        query_timespans: List of a list of TimespanStr of format ['HH:MM','HH:MM'] to query orig_df
            for overlapping records.
    """
    if "start_time" not in orig_df.columns or "end_time" not in orig_df.columns:
        msg = "DataFrame must have 'start_time' and 'end_time' columns"
        WranglerLogger.error(msg)
        raise TimespanDfQueryError(msg)

    mask = pd.Series([False] * len(orig_df), index=orig_df.index)
    for query_timespan in query_timespans:
        q_start_time, q_end_time = str_to_time_list(query_timespan)
        end_time_s = orig_df["end_time"]
        end_time_s.loc[orig_df["end_time"] < orig_df["start_time"]] += pd.Timedelta(days=1)
        this_ts_mask = (orig_df["start_time"] < q_end_time) & (q_start_time < end_time_s)
        mask |= this_ts_mask
    return orig_df.loc[mask]

network_wrangler.utils.time.filter_dt_list_to_overlaps

filter_dt_list_to_overlaps(timespans)

Filter a list of timespans to only include those that overlap.

overlapping: a timespan that fully or partially overlaps a given timespan. This includes and all timespans where at least one minute overlap.

Source code in network_wrangler/utils/time.py
@validate_call
def filter_dt_list_to_overlaps(timespans: list[list[datetime]]) -> list[list[datetime]]:
    """Filter a list of timespans to only include those that overlap.

    `overlapping`: a timespan that fully or partially overlaps a given timespan.
    This includes and all timespans where at least one minute overlap.
    """
    overlaps = []
    for i in range(len(timespans)):
        for j in range(i + 1, len(timespans)):
            if dt_overlaps(timespans[i], timespans[j]):
                overlaps += [timespans[i], timespans[j]]

    # remove dupes
    return list(map(list, set(map(tuple, overlaps))))

network_wrangler.utils.time.format_seconds_to_legible_str

format_seconds_to_legible_str(seconds)

Formats seconds into a human-friendly string for log files.

Source code in network_wrangler/utils/time.py
def format_seconds_to_legible_str(seconds: int) -> str:
    """Formats seconds into a human-friendly string for log files."""
    if seconds < 60:  # noqa: PLR2004
        return f"{int(seconds)} seconds"
    if seconds < 3600:  # noqa: PLR2004
        return f"{int(seconds // 60)} minutes"
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    return f"{hours} hours and {minutes} minutes"

network_wrangler.utils.time.is_increasing

is_increasing(datetimes)

Check if a list of datetime objects is increasing in time.

Source code in network_wrangler/utils/time.py
def is_increasing(datetimes: list[datetime]) -> bool:
    """Check if a list of datetime objects is increasing in time."""
    return all(datetimes[i] <= datetimes[i + 1] for i in range(len(datetimes) - 1))

network_wrangler.utils.time.seconds_from_midnight_to_str

seconds_from_midnight_to_str(seconds)

Convert the number of seconds since midnight to a TimeString (HH:MM).

Source code in network_wrangler/utils/time.py
@validate_call(config={"arbitrary_types_allowed": True})
def seconds_from_midnight_to_str(seconds: int) -> TimeString:
    """Convert the number of seconds since midnight to a TimeString (HH:MM)."""
    return str(timedelta(seconds=seconds))

network_wrangler.utils.time.str_to_seconds_from_midnight

str_to_seconds_from_midnight(time_str)

Convert a TimeString (HH:MM<:SS>) to the number of seconds since midnight.

Source code in network_wrangler/utils/time.py
@validate_call(config={"arbitrary_types_allowed": True})
def str_to_seconds_from_midnight(time_str: TimeString) -> int:
    """Convert a TimeString (HH:MM<:SS>) to the number of seconds since midnight."""
    dt = str_to_time(time_str)
    return dt_to_seconds_from_midnight(dt)

network_wrangler.utils.time.str_to_time

str_to_time(time_str, base_date=None)

Convert TimeString (HH:MM<:SS>) to datetime object.

If HH > 24, will subtract 24 to be within 24 hours. Timespans will be treated as the next day.

Parameters:

  • time_str (TimeString) –

    TimeString in HH:MM:SS or HH:MM format.

  • base_date (Optional[date], default: None ) –

    optional date to base the datetime on. Defaults to None. If not provided, will use today.

Source code in network_wrangler/utils/time.py
@validate_call(config={"arbitrary_types_allowed": True})
def str_to_time(time_str: TimeString, base_date: Optional[date] = None) -> datetime:
    """Convert TimeString (HH:MM<:SS>) to datetime object.

    If HH > 24, will subtract 24 to be within 24 hours. Timespans will be treated as the next day.

    Args:
        time_str: TimeString in HH:MM:SS or HH:MM format.
        base_date: optional date to base the datetime on. Defaults to None.
            If not provided, will use today.
    """
    # Set the base date to today if not provided
    if base_date is None:
        base_date = date.today()

    # Split the time string to extract hours, minutes, and seconds
    parts = time_str.split(":")
    hours = int(parts[0])
    minutes = int(parts[1])
    seconds = int(parts[2]) if len(parts) == 3 else 0  # noqa: PLR2004

    if hours >= 24:  # noqa: PLR2004
        add_days = hours // 24
        base_date += timedelta(days=add_days)
        hours -= 24 * add_days

    # Create a time object with the adjusted hours, minutes, and seconds
    adjusted_time = datetime.strptime(f"{hours:02}:{minutes:02}:{seconds:02}", "%H:%M:%S").time()

    # Combine the base date with the adjusted time and add the extra days if needed
    combined_datetime = datetime.combine(base_date, adjusted_time)

    return combined_datetime

network_wrangler.utils.time.str_to_time_list

str_to_time_list(timespan)

Convert list of TimeStrings (HH:MM<:SS>) to list of datetime.time objects.

Source code in network_wrangler/utils/time.py
@validate_call(config={"arbitrary_types_allowed": True})
def str_to_time_list(timespan: list[TimeString]) -> list[datetime]:
    """Convert list of TimeStrings (HH:MM<:SS>) to list of datetime.time objects."""
    timespan_dt: list[datetime] = list(map(str_to_time, timespan))
    if not is_increasing(timespan_dt):
        timespan_dt = [timespan_dt[0], timespan_dt[1] + timedelta(days=1)]
        WranglerLogger.warning(
            f"Timespan is not in increasing order: {timespan}.\
            End time will be treated as next day."
        )
    return timespan_dt

network_wrangler.utils.time.str_to_time_series

str_to_time_series(time_str_s, base_date=None)

Convert mixed panda series datetime and TimeString (HH:MM<:SS>) to datetime object.

If HH > 24, will subtract 24 to be within 24 hours. Timespans will be treated as the next day.

Parameters:

  • time_str_s (Series) –

    Pandas Series of TimeStrings in HH:MM:SS or HH:MM format.

  • base_date (Optional[Union[Series, date]], default: None ) –

    optional date to base the datetime on. Defaults to None. If not provided, will use today. Can be either a single instance or a series of same length as time_str_s

Source code in network_wrangler/utils/time.py
def str_to_time_series(
    time_str_s: pd.Series, base_date: Optional[Union[pd.Series, date]] = None
) -> pd.Series:
    """Convert mixed panda series datetime and TimeString (HH:MM<:SS>) to datetime object.

    If HH > 24, will subtract 24 to be within 24 hours. Timespans will be treated as the next day.

    Args:
        time_str_s: Pandas Series of TimeStrings in HH:MM:SS or HH:MM format.
        base_date: optional date to base the datetime on. Defaults to None.
            If not provided, will use today. Can be either a single instance or a series of
            same length as time_str_s
    """
    # check strings are in the correct format, leave existing date times alone
    is_string = time_str_s.apply(lambda x: isinstance(x, str))
    time_strings = time_str_s[is_string]
    result = time_str_s.copy()
    if is_string.any():
        result[is_string] = _all_str_to_time_series(time_strings, base_date)
    result = result.astype("datetime64[ns]")
    return result

network_wrangler.utils.time.timespan_str_list_to_dt

timespan_str_list_to_dt(timespans)

Convert list of TimespanStrings to list of datetime.time objects.

Source code in network_wrangler/utils/time.py
@validate_call(config={"arbitrary_types_allowed": True})
def timespan_str_list_to_dt(timespans: list[TimespanString]) -> list[list[datetime]]:
    """Convert list of TimespanStrings to list of datetime.time objects."""
    return [str_to_time_list(ts) for ts in timespans]

network_wrangler.utils.time.timespans_overlap

timespans_overlap(timespan1, timespan2)

Check if two timespan strings overlap.

overlapping: a timespan that fully or partially overlaps a given timespan. This includes and all timespans where at least one minute overlap.

Source code in network_wrangler/utils/time.py
def timespans_overlap(timespan1: list[TimespanString], timespan2: list[TimespanString]) -> bool:
    """Check if two timespan strings overlap.

    `overlapping`: a timespan that fully or partially overlaps a given timespan.
    This includes and all timespans where at least one minute overlap.
    """
    timespan1 = str_to_time_list(timespan1)
    timespan2 = str_to_time_list(timespan2)
    return dt_overlaps(timespan1, timespan2)

Module for time and timespan objects.

network_wrangler.time.Time

Represents a time object.

This class provides methods to initialize and manipulate time objects.

Attributes:

  • datetime (datetime) –

    The underlying datetime object representing the time.

  • time_str (str) –

    The time string representation in HH:MM:SS format.

  • time_sec (int) –

    The time in seconds since midnight.

  • _raw_time_in (TimeType) –

    The raw input value used to initialize the Time object.

Source code in network_wrangler/time.py
class Time:
    """Represents a time object.

    This class provides methods to initialize and manipulate time objects.

    Attributes:
        datetime (datetime): The underlying datetime object representing the time.
        time_str (str): The time string representation in HH:MM:SS format.
        time_sec (int): The time in seconds since midnight.

        _raw_time_in (TimeType): The raw input value used to initialize the Time object.

    """

    def __init__(self, value: TimeType):
        """Initializes a Time object.

        Args:
            value (TimeType): A time object, string in HH:MM[:SS] format, or seconds since
                midnight.

        Raises:
            TimeFormatError: If the value is not a valid time format.

        """
        if isinstance(value, datetime):
            self.datetime: datetime = value
        elif isinstance(value, time):
            self.datetime = datetime.combine(datetime.today(), value)
        elif isinstance(value, str):
            self.datetime = str_to_time(value)
        elif isinstance(value, int):
            self.datetime = datetime.datetime.fromtimestamp(value).time()
        else:
            msg = "time must be a string, int, or time object"
            raise TimeFormatError(msg)

        self._raw_time_in = value

    def __getitem__(self, item: Any) -> str:
        """Get the time string representation.

        Args:
            item (Any): Not used.

        Returns:
            str: The time string representation in HH:MM:SS format.
        """
        return self.time_str

    @property
    def time_str(self):
        """Get the time string representation.

        Returns:
            str: The time string representation in HH:MM:SS format.
        """
        return self.datetime.strftime("%H:%M:%S")

    @property
    def time_sec(self):
        """Get the time in seconds since midnight.

        Returns:
            int: The time in seconds since midnight.
        """
        return self.datetime.hour * 3600 + self.datetime.minute * 60 + self.datetime.second

    def __str__(self) -> str:
        """Get the string representation of the Time object.

        Returns:
            str: The time string representation in HH:MM:SS format.
        """
        return self.time_str

    def __hash__(self) -> int:
        """Get the hash value of the Time object.

        Returns:
            int: The hash value of the Time object.
        """
        return hash(str(self))

network_wrangler.time.Time.time_sec property

time_sec

Get the time in seconds since midnight.

Returns:

  • int

    The time in seconds since midnight.

network_wrangler.time.Time.time_str property

time_str

Get the time string representation.

Returns:

  • str

    The time string representation in HH:MM:SS format.

network_wrangler.time.Time.__getitem__

__getitem__(item)

Get the time string representation.

Parameters:

  • item (Any) –

    Not used.

Returns:

  • str ( str ) –

    The time string representation in HH:MM:SS format.

Source code in network_wrangler/time.py
def __getitem__(self, item: Any) -> str:
    """Get the time string representation.

    Args:
        item (Any): Not used.

    Returns:
        str: The time string representation in HH:MM:SS format.
    """
    return self.time_str

network_wrangler.time.Time.__hash__

__hash__()

Get the hash value of the Time object.

Returns:

  • int ( int ) –

    The hash value of the Time object.

Source code in network_wrangler/time.py
def __hash__(self) -> int:
    """Get the hash value of the Time object.

    Returns:
        int: The hash value of the Time object.
    """
    return hash(str(self))

network_wrangler.time.Time.__init__

__init__(value)

Initializes a Time object.

Parameters:

  • value (TimeType) –

    A time object, string in HH:MM[:SS] format, or seconds since midnight.

Raises:

Source code in network_wrangler/time.py
def __init__(self, value: TimeType):
    """Initializes a Time object.

    Args:
        value (TimeType): A time object, string in HH:MM[:SS] format, or seconds since
            midnight.

    Raises:
        TimeFormatError: If the value is not a valid time format.

    """
    if isinstance(value, datetime):
        self.datetime: datetime = value
    elif isinstance(value, time):
        self.datetime = datetime.combine(datetime.today(), value)
    elif isinstance(value, str):
        self.datetime = str_to_time(value)
    elif isinstance(value, int):
        self.datetime = datetime.datetime.fromtimestamp(value).time()
    else:
        msg = "time must be a string, int, or time object"
        raise TimeFormatError(msg)

    self._raw_time_in = value

network_wrangler.time.Time.__str__

__str__()

Get the string representation of the Time object.

Returns:

  • str ( str ) –

    The time string representation in HH:MM:SS format.

Source code in network_wrangler/time.py
def __str__(self) -> str:
    """Get the string representation of the Time object.

    Returns:
        str: The time string representation in HH:MM:SS format.
    """
    return self.time_str

network_wrangler.time.Timespan

Timespan object.

This class provides methods to initialize and manipulate time objects.

If the end_time is less than the start_time, the duration will assume that it crosses over midnight.

Attributes:

  • start_time (time) –

    The start time of the timespan.

  • end_time (time) –

    The end time of the timespan.

  • timespan_str_list (str) –

    A list of start time and end time in HH:MM:SS format.

  • start_time_sec (int) –

    The start time in seconds since midnight.

  • end_time_sec (int) –

    The end time in seconds since midnight.

  • duration (timedelta) –

    The duration of the timespan.

  • duration_sec (int) –

    The duration of the timespan in seconds.

  • _raw_timespan_in (Any) –

    The raw input value used to initialize the Timespan object.

Source code in network_wrangler/time.py
class Timespan:
    """Timespan object.

    This class provides methods to initialize and manipulate time objects.

    If the end_time is less than the start_time, the duration will assume that it crosses
        over midnight.

    Attributes:
        start_time (datetime.time): The start time of the timespan.
        end_time (datetime.time): The end time of the timespan.
        timespan_str_list (str): A list of start time and end time in HH:MM:SS format.
        start_time_sec (int): The start time in seconds since midnight.
        end_time_sec (int): The end time in seconds since midnight.
        duration (datetime.timedelta): The duration of the timespan.
        duration_sec (int): The duration of the timespan in seconds.

        _raw_timespan_in (Any): The raw input value used to initialize the Timespan object.

    """

    def __init__(self, value: list[TimeType]):
        """Constructor for the Timespan object.

        If the value is a list of two time strings, datetime objects, Time, or seconds from
        midnight, the start_time and end_time attributes will be set accordingly.

        Args:
            value (time): a list of two time strings, datetime objects, Time, or seconds from
              midnight.
        """
        if len(value) != 2:  # noqa: PLR2004
            msg = "timespan must be a list of 2 time strings, datetime objs, Time, or sec from midnight."
            raise TimespanFormatError(msg)

        self.start_time, self.end_time = (Time(t) for t in value)
        self._raw_timespan_in = value

    @property
    def timespan_str_list(self):
        """Get the timespan string representation."""
        return [self.start_time.time_str, self.end_time.time_str]

    @property
    def start_time_sec(self):
        """Start time in seconds since midnight."""
        return self.start_time.time_sec

    @property
    def end_time_sec(self):
        """End time in seconds since midnight."""
        return self.end_time.time_sec

    @property
    def duration(self):
        """Duration of timespan as a timedelta object."""
        return duration_dt(self.start_time, self.end_time)

    @property
    def duration_sec(self):
        """Duration of timespan in seconds.

        If end_time is less than start_time, the duration will assume that it crosses over
        midnight.
        """
        if self.end_time_sec < self.start_time_sec:
            return (24 * 3600) - self.start_time_sec + self.end_time_sec
        return self.end_time_sec - self.start_time_sec

    def __str__(self) -> str:
        """String representation of the Timespan object."""
        return str(self.timespan_str)

    def __hash__(self) -> int:
        """Hash value of the Timespan object."""
        return hash(str(self))

    def overlaps(self, other: Timespan) -> bool:
        """Check if two timespans overlap.

        If the start time is greater than the end time, the timespan is assumed to cross over
        midnight.

        Args:
            other (Timespan): The other timespan to compare.

        Returns:
            bool: True if the two timespans overlap, False otherwise.
        """
        real_end_time = self.end_time.datetime
        if self.end_time.datetime > self.start_time.datetime:
            real_end_time = self.end_time.datetime + datetime.timedelta(days=1)

        real_other_end_time = other.end_time.datetime
        if other.end_time.datetime > other.start_time.datetime:
            real_other_end_time = other.end_time.datetime + datetime.timedelta(days=1)
        return (
            self.start_time.datetime <= real_other_end_time
            and real_end_time >= other.start_time.datetime
        )

network_wrangler.time.Timespan.duration property

duration

Duration of timespan as a timedelta object.

network_wrangler.time.Timespan.duration_sec property

duration_sec

Duration of timespan in seconds.

If end_time is less than start_time, the duration will assume that it crosses over midnight.

network_wrangler.time.Timespan.end_time_sec property

end_time_sec

End time in seconds since midnight.

network_wrangler.time.Timespan.start_time_sec property

start_time_sec

Start time in seconds since midnight.

network_wrangler.time.Timespan.timespan_str_list property

timespan_str_list

Get the timespan string representation.

network_wrangler.time.Timespan.__hash__

__hash__()

Hash value of the Timespan object.

Source code in network_wrangler/time.py
def __hash__(self) -> int:
    """Hash value of the Timespan object."""
    return hash(str(self))

network_wrangler.time.Timespan.__init__

__init__(value)

Constructor for the Timespan object.

If the value is a list of two time strings, datetime objects, Time, or seconds from midnight, the start_time and end_time attributes will be set accordingly.

Parameters:

  • value (time) –

    a list of two time strings, datetime objects, Time, or seconds from midnight.

Source code in network_wrangler/time.py
def __init__(self, value: list[TimeType]):
    """Constructor for the Timespan object.

    If the value is a list of two time strings, datetime objects, Time, or seconds from
    midnight, the start_time and end_time attributes will be set accordingly.

    Args:
        value (time): a list of two time strings, datetime objects, Time, or seconds from
          midnight.
    """
    if len(value) != 2:  # noqa: PLR2004
        msg = "timespan must be a list of 2 time strings, datetime objs, Time, or sec from midnight."
        raise TimespanFormatError(msg)

    self.start_time, self.end_time = (Time(t) for t in value)
    self._raw_timespan_in = value

network_wrangler.time.Timespan.__str__

__str__()

String representation of the Timespan object.

Source code in network_wrangler/time.py
def __str__(self) -> str:
    """String representation of the Timespan object."""
    return str(self.timespan_str)

network_wrangler.time.Timespan.overlaps

overlaps(other)

Check if two timespans overlap.

If the start time is greater than the end time, the timespan is assumed to cross over midnight.

Parameters:

  • other (Timespan) –

    The other timespan to compare.

Returns:

  • bool ( bool ) –

    True if the two timespans overlap, False otherwise.

Source code in network_wrangler/time.py
def overlaps(self, other: Timespan) -> bool:
    """Check if two timespans overlap.

    If the start time is greater than the end time, the timespan is assumed to cross over
    midnight.

    Args:
        other (Timespan): The other timespan to compare.

    Returns:
        bool: True if the two timespans overlap, False otherwise.
    """
    real_end_time = self.end_time.datetime
    if self.end_time.datetime > self.start_time.datetime:
        real_end_time = self.end_time.datetime + datetime.timedelta(days=1)

    real_other_end_time = other.end_time.datetime
    if other.end_time.datetime > other.start_time.datetime:
        real_other_end_time = other.end_time.datetime + datetime.timedelta(days=1)
    return (
        self.start_time.datetime <= real_other_end_time
        and real_end_time >= other.start_time.datetime
    )

Logging and Visualization

Logging utilities for Network Wrangler.

network_wrangler.logger.setup_logging

setup_logging(info_log_filename=None, debug_log_filename=None, std_out_level='info', file_mode='a')

Sets up the WranglerLogger w.r.t. the debug file location and if logging to console.

Called by the test_logging fixture in conftest.py and can be called by the user to setup logging for their session. If called multiple times, the logger will be reset.

Parameters:

  • info_log_filename (Optional[Path], default: None ) –

    the location of the log file that will get created to add the INFO log. The INFO Log is terse, just gives the bare minimum of details. Defaults to file in cwd() wrangler_[datetime].log. To turn off logging to a file, use log_filename = None.

  • debug_log_filename (Optional[Path], default: None ) –

    the location of the log file that will get created to add the DEBUG log The DEBUG log is very noisy, for debugging. Defaults to file in cwd() wrangler_[datetime].log. To turn off logging to a file, use log_filename = None.

  • std_out_level (str, default: 'info' ) –

    the level of logging to the console. One of “info”, “warning”, “debug”. Defaults to “info” but will be set to ERROR if nothing provided matches.

  • file_mode (str, default: 'a' ) –

    use ‘a’ to append, ‘w’ to write without appending

Source code in network_wrangler/logger.py
def setup_logging(
    info_log_filename: Optional[Path] = None,
    debug_log_filename: Optional[Path] = None,
    std_out_level: str = "info",
    file_mode: str = 'a'
):
    """Sets up the WranglerLogger w.r.t. the debug file location and if logging to console.

    Called by the test_logging fixture in conftest.py and can be called by the user to setup
    logging for their session. If called multiple times, the logger will be reset.

    Args:
        info_log_filename: the location of the log file that will get created to add the INFO log.
            The INFO Log is terse, just gives the bare minimum of details.
            Defaults to file in cwd() `wrangler_[datetime].log`. To turn off logging to a file,
            use log_filename = None.
        debug_log_filename: the location of the log file that will get created to add the DEBUG log
            The DEBUG log is very noisy, for debugging. Defaults to file in cwd()
            `wrangler_[datetime].log`. To turn off logging to a file, use log_filename = None.
        std_out_level: the level of logging to the console. One of "info", "warning", "debug".
            Defaults to "info" but will be set to ERROR if nothing provided matches.
        file_mode: use 'a' to append, 'w' to write without appending
    """
    # add function variable so that we know if logging has been called
    setup_logging.called = True

    DEFAULT_LOG_PATH = Path(f"wrangler_{datetime.now().strftime('%Y_%m_%d__%H_%M_%S')}.debug.log")
    debug_log_filename = debug_log_filename if debug_log_filename else DEFAULT_LOG_PATH

    # Clear handles if any exist already
    WranglerLogger.handlers = []

    WranglerLogger.setLevel(logging.DEBUG)

    FORMAT = logging.Formatter(
        "%(asctime)-15s %(levelname)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S,"
    )
    default_info_f = f"network_wrangler_{datetime.now().strftime('%Y_%m_%d__%H_%M_%S')}.info.log"
    info_log_filename = info_log_filename or Path.cwd() / default_info_f

    info_file_handler = logging.FileHandler(Path(info_log_filename), mode=file_mode)
    info_file_handler.setLevel(logging.INFO)
    info_file_handler.setFormatter(FORMAT)
    WranglerLogger.addHandler(info_file_handler)

    # create debug file only when debug_log_filename is provided
    if debug_log_filename:
        debug_log_handler = logging.FileHandler(Path(debug_log_filename), mode=file_mode)
        debug_log_handler.setLevel(logging.DEBUG)
        debug_log_handler.setFormatter(FORMAT)
        WranglerLogger.addHandler(debug_log_handler)

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(FORMAT)
    WranglerLogger.addHandler(console_handler)
    if std_out_level == "debug":
        console_handler.setLevel(logging.DEBUG)
    elif std_out_level == "info":
        console_handler.setLevel(logging.INFO)
    elif std_out_level == "warning":
        console_handler.setLevel(logging.WARNING)
    else:
        console_handler.setLevel(logging.ERROR)

Module for visualizing roadway and transit networks using Mapbox tiles.

This module provides a function net_to_mapbox that creates and serves Mapbox tiles on a local web server based on roadway and transit networks.

Example usage

net_to_mapbox(roadway, transit)

network_wrangler.viz.MissingMapboxTokenError

Bases: Exception

Raised when MAPBOX_ACCESS_TOKEN is not found in environment variables.

Source code in network_wrangler/viz.py
class MissingMapboxTokenError(Exception):
    """Raised when MAPBOX_ACCESS_TOKEN is not found in environment variables."""

network_wrangler.viz.net_to_mapbox

net_to_mapbox(roadway=None, transit=None, roadway_geojson_out=Path('roadway_shapes.geojson'), transit_geojson_out=Path('transit_shapes.geojson'), mbtiles_out=Path('network.mbtiles'), overwrite=True, port='9000')

Creates and serves mapbox tiles on local web server based on roadway and transit networks.

Parameters:

  • roadway (Optional[Union[RoadwayNetwork, GeoDataFrame, str, Path]], default: None ) –

    a RoadwayNetwork instance, geodataframe with roadway linetrings, or path to a geojson file. Defaults to empty GeoDataFrame.

  • transit (Optional[Union[TransitNetwork, GeoDataFrame]], default: None ) –

    a TransitNetwork instance or a geodataframe with roadway linetrings, or path to a geojson file. Defaults to empty GeoDataFrame.

  • roadway_geojson_out (Path, default: Path('roadway_shapes.geojson') ) –

    file path for roadway geojson which gets created if roadway is not a path to a geojson file. Defaults to roadway_shapes.geojson.

  • transit_geojson_out (Path, default: Path('transit_shapes.geojson') ) –

    file path for transit geojson which gets created if transit is not a path to a geojson file. Defaults to transit_shapes.geojson.

  • mbtiles_out (Path, default: Path('network.mbtiles') ) –

    path to output mapbox tiles. Defaults to network.mbtiles

  • overwrite (bool, default: True ) –

    boolean indicating if can overwrite mbtiles_out and roadway_geojson_out and transit_geojson_out. Defaults to True.

  • port (str, default: '9000' ) –

    port to serve resulting tiles on. Defaults to 9000.

Source code in network_wrangler/viz.py
def net_to_mapbox(
    roadway: Optional[Union[RoadwayNetwork, gpd.GeoDataFrame, str, Path]] = None,
    transit: Optional[Union[TransitNetwork, gpd.GeoDataFrame]] = None,
    roadway_geojson_out: Path = Path("roadway_shapes.geojson"),
    transit_geojson_out: Path = Path("transit_shapes.geojson"),
    mbtiles_out: Path = Path("network.mbtiles"),
    overwrite: bool = True,
    port: str = "9000",
):
    """Creates and serves mapbox tiles on local web server based on roadway and transit networks.

    Args:
        roadway: a RoadwayNetwork instance, geodataframe with roadway linetrings, or path to a
            geojson file. Defaults to empty GeoDataFrame.
        transit: a TransitNetwork instance or a geodataframe with roadway linetrings, or path to a
            geojson file. Defaults to empty GeoDataFrame.
        roadway_geojson_out: file path for roadway geojson which gets created if roadway is not
            a path to a geojson file. Defaults to roadway_shapes.geojson.
        transit_geojson_out: file path for transit geojson which gets created if transit is not
            a path to a geojson file. Defaults to transit_shapes.geojson.
        mbtiles_out: path to output mapbox tiles. Defaults to network.mbtiles
        overwrite: boolean indicating if can overwrite mbtiles_out and roadway_geojson_out and
            transit_geojson_out. Defaults to True.
        port: port to serve resulting tiles on. Defaults to 9000.
    """
    if roadway is None:
        roadway = gpd.GeoDataFrame()
    if transit is None:
        transit = gpd.GeoDataFrame()
    # test for mapbox token
    try:
        os.getenv("MAPBOX_ACCESS_TOKEN")
    except Exception as err:
        WranglerLogger.error(
            "NEED TO SET MAPBOX ACCESS TOKEN IN ENVIRONMENT VARIABLES/n \
                In command line: >>export MAPBOX_ACCESS_TOKEN='pk.0000.1111' # \
                replace value with your mapbox public access token"
        )
        raise MissingMapboxTokenError() from err

    if isinstance(transit, TransitNetwork):
        transit = transit.shape_links_gdf
        transit.to_file(transit_geojson_out, driver="GeoJSON")
    elif Path(transit).exists():
        transit_geojson_out = transit
    else:
        msg = f"Don't understand transit input: {transit}"
        raise ValueError(msg)

    if isinstance(roadway, RoadwayNetwork):
        roadway = roadway.link_shapes_df
        roadway.to_file(roadway_geojson_out, driver="GeoJSON")
    elif Path(roadway).exists():
        roadway_geojson_out = Path(roadway)
    else:
        msg = "Don't understand roadway input: {roadway}"
        raise ValueError(msg)

    tippe_options_list: list[str] = ["-zg", "-o", str(mbtiles_out)]
    if overwrite:
        tippe_options_list.append("--force")
    # tippe_options_list.append("--drop-densest-as-needed")
    tippe_options_list.append(str(roadway_geojson_out))
    tippe_options_list.append(str(transit_geojson_out))

    try:
        WranglerLogger.info(
            f"Running tippecanoe with following options: {' '.join(tippe_options_list)}"
        )
        subprocess.run(["tippecanoe", *tippe_options_list], check=False)
    except Exception as err:
        WranglerLogger.error(
            "If tippecanoe isn't installed, try `brew install tippecanoe` or \
                visit https://github.com/mapbox/tippecanoe"
        )
        raise ImportError() from err

    try:
        WranglerLogger.info(
            "Running mbview with following options: {}".format(" ".join(tippe_options_list))
        )
        subprocess.run(["mbview", "--port", port, f", /{mbtiles_out}"], check=False)
    except Exception as err:
        WranglerLogger.error(
            "If mbview isn't installed, try `npm install -g @mapbox/mbview` or \
                visit https://github.com/mapbox/mbview"
        )
        raise ImportError(msg) from err

Error Handling

All network wrangler errors.

network_wrangler.errors.DataframeSelectionError

Bases: Exception

Raised when there is an issue with a selection from a dataframe.

Source code in network_wrangler/errors.py
class DataframeSelectionError(Exception):
    """Raised when there is an issue with a selection from a dataframe."""

network_wrangler.errors.FeedReadError

Bases: Exception

Raised when there is an error reading a transit feed.

Source code in network_wrangler/errors.py
class FeedReadError(Exception):
    """Raised when there is an error reading a transit feed."""

network_wrangler.errors.FeedValidationError

Bases: Exception

Raised when there is an issue with the validation of the GTFS data.

Source code in network_wrangler/errors.py
class FeedValidationError(Exception):
    """Raised when there is an issue with the validation of the GTFS data."""

network_wrangler.errors.InvalidScopedLinkValue

Bases: Exception

Raised when there is an issue with a scoped link value.

Source code in network_wrangler/errors.py
class InvalidScopedLinkValue(Exception):
    """Raised when there is an issue with a scoped link value."""

network_wrangler.errors.LinkAddError

Bases: Exception

Raised when there is an issue with adding links.

Source code in network_wrangler/errors.py
class LinkAddError(Exception):
    """Raised when there is an issue with adding links."""

network_wrangler.errors.LinkChangeError

Bases: Exception

Raised when there is an error in changing a link property.

Source code in network_wrangler/errors.py
class LinkChangeError(Exception):
    """Raised when there is an error in changing a link property."""

network_wrangler.errors.LinkCreationError

Bases: Exception

Raised when there is an issue with creating links.

Source code in network_wrangler/errors.py
class LinkCreationError(Exception):
    """Raised when there is an issue with creating links."""

network_wrangler.errors.LinkDeletionError

Bases: Exception

Raised when there is an issue with deleting links.

Source code in network_wrangler/errors.py
class LinkDeletionError(Exception):
    """Raised when there is an issue with deleting links."""

network_wrangler.errors.LinkNotFoundError

Bases: Exception

Raised when a link is not found in the links table.

Source code in network_wrangler/errors.py
class LinkNotFoundError(Exception):
    """Raised when a link is not found in the links table."""

network_wrangler.errors.ManagedLaneAccessEgressError

Bases: Exception

Raised when there is an issue with access/egress points to managed lanes.

Source code in network_wrangler/errors.py
class ManagedLaneAccessEgressError(Exception):
    """Raised when there is an issue with access/egress points to managed lanes."""

network_wrangler.errors.MissingNodesError

Bases: Exception

Raised when referenced nodes are missing from the network.

Source code in network_wrangler/errors.py
class MissingNodesError(Exception):
    """Raised when referenced nodes are missing from the network."""

network_wrangler.errors.NewRoadwayError

Bases: Exception

Raised when there is an issue with applying a new roadway.

Source code in network_wrangler/errors.py
class NewRoadwayError(Exception):
    """Raised when there is an issue with applying a new roadway."""

network_wrangler.errors.NodeAddError

Bases: Exception

Raised when there is an issue with adding nodes.

Source code in network_wrangler/errors.py
class NodeAddError(Exception):
    """Raised when there is an issue with adding nodes."""

network_wrangler.errors.NodeChangeError

Bases: Exception

Raised when there is an issue with applying a node change.

Source code in network_wrangler/errors.py
class NodeChangeError(Exception):
    """Raised when there is an issue with applying a node change."""

network_wrangler.errors.NodeDeletionError

Bases: Exception

Raised when there is an issue with deleting nodes.

Source code in network_wrangler/errors.py
class NodeDeletionError(Exception):
    """Raised when there is an issue with deleting nodes."""

network_wrangler.errors.NodeNotFoundError

Bases: Exception

Raised when a node is not found in the nodes table.

Source code in network_wrangler/errors.py
class NodeNotFoundError(Exception):
    """Raised when a node is not found in the nodes table."""

network_wrangler.errors.NodesInLinksMissingError

Bases: Exception

Raised when there is an issue with validating links and nodes.

Source code in network_wrangler/errors.py
class NodesInLinksMissingError(Exception):
    """Raised when there is an issue with validating links and nodes."""

network_wrangler.errors.NotLinksError

Bases: Exception

Raised when a dataframe is not a RoadLinksTable.

Source code in network_wrangler/errors.py
class NotLinksError(Exception):
    """Raised when a dataframe is not a RoadLinksTable."""

network_wrangler.errors.NotNodesError

Bases: Exception

Raised when a dataframe is not a RoadNodesTable.

Source code in network_wrangler/errors.py
class NotNodesError(Exception):
    """Raised when a dataframe is not a RoadNodesTable."""

network_wrangler.errors.ProjectCardError

Bases: Exception

Raised when a project card is not valid.

Source code in network_wrangler/errors.py
class ProjectCardError(Exception):
    """Raised when a project card is not valid."""

network_wrangler.errors.RoadwayDeletionError

Bases: Exception

Raised when there is an issue with applying a roadway deletion.

Source code in network_wrangler/errors.py
class RoadwayDeletionError(Exception):
    """Raised when there is an issue with applying a roadway deletion."""

network_wrangler.errors.RoadwayPropertyChangeError

Bases: Exception

Raised when there is an issue with applying a roadway property change.

Source code in network_wrangler/errors.py
class RoadwayPropertyChangeError(Exception):
    """Raised when there is an issue with applying a roadway property change."""

network_wrangler.errors.ScenarioConflictError

Bases: Exception

Raised when a conflict is detected.

Source code in network_wrangler/errors.py
class ScenarioConflictError(Exception):
    """Raised when a conflict is detected."""

network_wrangler.errors.ScenarioCorequisiteError

Bases: Exception

Raised when a co-requisite is not satisfied.

Source code in network_wrangler/errors.py
class ScenarioCorequisiteError(Exception):
    """Raised when a co-requisite is not satisfied."""

network_wrangler.errors.ScenarioPrerequisiteError

Bases: Exception

Raised when a pre-requisite is not satisfied.

Source code in network_wrangler/errors.py
class ScenarioPrerequisiteError(Exception):
    """Raised when a pre-requisite is not satisfied."""

network_wrangler.errors.ScopeConflictError

Bases: Exception

Raised when there is a scope conflict in a list of ScopedPropertySetItems.

Source code in network_wrangler/errors.py
class ScopeConflictError(Exception):
    """Raised when there is a scope conflict in a list of ScopedPropertySetItems."""

network_wrangler.errors.ScopeLinkValueError

Bases: Exception

Raised when there is an issue with ScopedLinkValueList.

Source code in network_wrangler/errors.py
class ScopeLinkValueError(Exception):
    """Raised when there is an issue with ScopedLinkValueList."""

network_wrangler.errors.SegmentFormatError

Bases: Exception

Error in segment format.

Source code in network_wrangler/errors.py
class SegmentFormatError(Exception):
    """Error in segment format."""

network_wrangler.errors.SegmentSelectionError

Bases: Exception

Error in segment selection.

Source code in network_wrangler/errors.py
class SegmentSelectionError(Exception):
    """Error in segment selection."""

network_wrangler.errors.SelectionError

Bases: Exception

Raised when there is an issue with a selection.

Source code in network_wrangler/errors.py
class SelectionError(Exception):
    """Raised when there is an issue with a selection."""

network_wrangler.errors.ShapeAddError

Bases: Exception

Raised when there is an issue with adding shapes.

Source code in network_wrangler/errors.py
class ShapeAddError(Exception):
    """Raised when there is an issue with adding shapes."""

network_wrangler.errors.ShapeDeletionError

Bases: Exception

Raised when there is an issue with deleting shapes from a network.

Source code in network_wrangler/errors.py
class ShapeDeletionError(Exception):
    """Raised when there is an issue with deleting shapes from a network."""

network_wrangler.errors.SubnetCreationError

Bases: Exception

Raised when a subnet can’t be created.

Source code in network_wrangler/errors.py
class SubnetCreationError(Exception):
    """Raised when a subnet can't be created."""

network_wrangler.errors.SubnetExpansionError

Bases: Exception

Raised when a subnet can’t be expanded to include a node or set of nodes.

Source code in network_wrangler/errors.py
class SubnetExpansionError(Exception):
    """Raised when a subnet can't be expanded to include a node or set of nodes."""

network_wrangler.errors.TimeFormatError

Bases: Exception

Time format error exception.

Source code in network_wrangler/errors.py
class TimeFormatError(Exception):
    """Time format error exception."""

network_wrangler.errors.TimespanFormatError

Bases: Exception

Timespan format error exception.

Source code in network_wrangler/errors.py
class TimespanFormatError(Exception):
    """Timespan format error exception."""

network_wrangler.errors.TransitPropertyChangeError

Bases: Exception

Error raised when applying transit property changes.

Source code in network_wrangler/errors.py
class TransitPropertyChangeError(Exception):
    """Error raised when applying transit property changes."""

network_wrangler.errors.TransitRoadwayConsistencyError

Bases: Exception

Error raised when transit network is inconsistent with roadway network.

Source code in network_wrangler/errors.py
class TransitRoadwayConsistencyError(Exception):
    """Error raised when transit network is inconsistent with roadway network."""

network_wrangler.errors.TransitRouteAddError

Bases: Exception

Error raised when applying add transit route.

Source code in network_wrangler/errors.py
class TransitRouteAddError(Exception):
    """Error raised when applying add transit route."""

network_wrangler.errors.TransitRoutingChangeError

Bases: Exception

Raised when there is an error in the transit routing change.

Source code in network_wrangler/errors.py
class TransitRoutingChangeError(Exception):
    """Raised when there is an error in the transit routing change."""

network_wrangler.errors.TransitSelectionEmptyError

Bases: Exception

Error for when no transit trips are selected.

Source code in network_wrangler/errors.py
class TransitSelectionEmptyError(Exception):
    """Error for when no transit trips are selected."""

network_wrangler.errors.TransitSelectionError

Bases: Exception

Base error for transit selection errors.

Source code in network_wrangler/errors.py
class TransitSelectionError(Exception):
    """Base error for transit selection errors."""

network_wrangler.errors.TransitSelectionNetworkConsistencyError

Bases: TransitSelectionError

Error for when transit selection dictionary is not consistent with transit network.

Source code in network_wrangler/errors.py
class TransitSelectionNetworkConsistencyError(TransitSelectionError):
    """Error for when transit selection dictionary is not consistent with transit network."""

network_wrangler.errors.TransitValidationError

Bases: Exception

Error raised when transit network doesn’t have expected values.

Source code in network_wrangler/errors.py
class TransitValidationError(Exception):
    """Error raised when transit network doesn't have expected values."""