import json
import os
import re
from typing import Any, Dict, Optional, Union, List
from csv import reader
from pandas.core import base
import numpy as np
import pandas as pd
from pandas import DataFrame
import geopandas as gpd
from network_wrangler import ProjectCard
from network_wrangler import RoadwayNetwork
from .transit import CubeTransit, StandardTransit
from .logger import WranglerLogger
from .parameters import Parameters
from .roadway import ModelRoadwayNetwork
from .util import column_name_to_parts
class Project(object):
    """A single or set of changes to the roadway or transit system.

    Compares a base and a build transit network or a base and build
    highway network and produces project cards.

    .. highlight:: python
    Typical usage example:
    ::
        test_project = Project.create_project(
            base_cube_transit_source=os.path.join(CUBE_DIR, "transit.LIN"),
            build_cube_transit_source=os.path.join(CUBE_DIR, "transit_route_shape_change"),
        )
        test_project.evaluate_changes()
        test_project.write_project_card(
            os.path.join(SCRATCH_DIR, "t_transit_shape_test.yml")
        )

    Attributes:
        DEFAULT_PROJECT_NAME: a class-level constant that defines what
            the project name will be if none is set.
        STATIC_VALUES: a class-level constant which defines values that
            are not evaluated when assessing changes.
        CALCULATED_VALUES: a class-level constant listing attributes that
            are derived rather than read directly from input data.
        card_data (dict): {"project": <project_name>, "changes": <list of change dicts>}
        roadway_link_changes (DataFrame): pandas dataframe of CUBE roadway link changes.
        roadway_node_changes (DataFrame): pandas dataframe of CUBE roadway node changes.
        transit_changes (CubeTransit): transit changes read from a log/build file.
        base_roadway_network (RoadwayNetwork): roadway network for the base case.
        base_cube_transit_network (CubeTransit): cube transit network for the base case.
        build_cube_transit_network (CubeTransit): cube transit network for the build case.
        project_name (str): name of the project, set to DEFAULT_PROJECT_NAME if not provided
        parameters: an instance of the Parameters class which sets a bunch of parameters
    """

    # Fallback for Project.project_name when the caller supplies none.
    DEFAULT_PROJECT_NAME = "USER TO define"

    # Link/node attributes that are never diffed when evaluating changes.
    STATIC_VALUES = [
        "model_link_id",
        "area_type",
        "county",
        # "assign_group",
        "centroidconnect",
    ]
    # Attributes that are calculated from other data rather than set directly.
    CALCULATED_VALUES = [
        "area_type",
        "county",
        "assign_group",
        "centroidconnect",
    ]
[docs] def __init__(
self,
roadway_link_changes: Optional[DataFrame] = None,
roadway_node_changes: Optional[DataFrame] = None,
transit_changes: Optional[DataFrame] = None,
base_roadway_network: Optional[RoadwayNetwork] = None,
base_transit_network: Optional[StandardTransit] = None,
base_cube_transit_network: Optional[CubeTransit] = None,
build_cube_transit_network: Optional[CubeTransit] = None,
project_name: Optional[str] = "",
evaluate: Optional[bool] = False,
parameters: Union[dict, Parameters] = {},
):
"""
ProjectCard constructor.
args:
roadway_link_changes: dataframe of roadway changes read from a log file
roadway_node_changes: dataframe of roadway changes read from a log file
transit_changes: dataframe of transit changes read from a log file
base_roadway_network: RoadwayNetwork instance for base case
base_transit_network: StandardTransit instance for base case
base_cube_transit_network: CubeTransit instance for base transit network
build_cube_transit_network: CubeTransit instance for build transit network
project_name: name of the project
evaluate: defaults to false, but if true, will create card data
parameters: dictionary of parameter settings (see Parameters class) or an instance of Parameters. If not specified, will use default parameters.
returns: instance of ProjectCard
"""
self.card_data = Dict[str, Dict[str, Any]]
self.roadway_link_changes = roadway_link_changes
self.roadway_node_changes = roadway_node_changes
self.base_roadway_network = base_roadway_network
self.base_transit_network = base_transit_network
self.base_cube_transit_network = base_cube_transit_network
self.build_cube_transit_network = build_cube_transit_network
self.transit_changes = transit_changes
self.project_name = (
project_name if project_name else Project.DEFAULT_PROJECT_NAME
)
if type(parameters) is dict:
self.parameters = Parameters(**parameters)
elif isinstance(parameters, Parameters):
self.parameters = Parameters(**parameters.__dict__)
else:
msg = "Parameters should be a dict or instance of Parameters: found {} which is of type:{}".format(
parameters, type(parameters)
)
WranglerLogger.error(msg)
raise ValueError(msg)
if base_roadway_network != None:
self.determine_roadway_network_changes_compatibility(
self.base_roadway_network,
self.roadway_link_changes,
self.roadway_node_changes,
self.parameters
)
if evaluate:
self.evaluate_changes()
[docs] def write_project_card(self, filename: str = None):
"""
Writes project cards.
Args:
filename (str): File path to output .yml
Returns:
None
"""
ProjectCard(self.card_data).write(out_filename=filename)
    @staticmethod
    def create_project(
        roadway_log_file: Union[str, List[str], None] = None,
        roadway_shp_file: Optional[str] = None,
        roadway_csv_file: Optional[str] = None,
        network_build_file: Optional[str] = None,
        emme_node_id_crosswalk_file: Optional[str] = None,
        emme_name_crosswalk_file: Optional[str] = None,
        base_roadway_dir: Optional[str] = None,
        base_transit_dir: Optional[str] = None,
        base_cube_transit_source: Optional[str] = None,
        build_cube_transit_source: Optional[str] = None,
        roadway_link_changes: Optional[DataFrame] = None,
        roadway_node_changes: Optional[DataFrame] = None,
        transit_changes: Optional[CubeTransit] = None,
        base_roadway_network: Optional[RoadwayNetwork] = None,
        base_cube_transit_network: Optional[CubeTransit] = None,
        build_cube_transit_network: Optional[CubeTransit] = None,
        project_name: Optional[str] = None,
        recalculate_calculated_variables: Optional[bool] = False,
        recalculate_distance: Optional[bool] = False,
        parameters: Optional[dict] = {},
        **kwargs,
    ):
        """
        Constructor for a Project instance.

        Args:
            roadway_log_file: File path to consuming logfile or a list of logfile paths.
            roadway_shp_file: File path to consuming shape file for roadway changes.
            roadway_csv_file: File path to consuming csv file for roadway changes.
            network_build_file: File path to consuming EMME network build for network changes.
            emme_node_id_crosswalk_file: File path to the EMME-to-wrangler node id crosswalk CSV.
            emme_name_crosswalk_file: File path to the EMME-to-wrangler attribute name crosswalk CSV.
            base_roadway_dir: Folder path to base roadway network.
            base_transit_dir: Folder path to base transit network.
            base_cube_transit_source: Folder path to base transit network or cube line file string.
            build_cube_transit_source: Folder path to build transit network or cube line file string.
            roadway_link_changes: pandas dataframe of CUBE roadway link changes.
            roadway_node_changes: pandas dataframe of CUBE roadway node changes.
            transit_changes: build transit changes.
            base_roadway_network: Base roadway network object.
            base_cube_transit_network: Base cube transit network object.
            build_cube_transit_network: Build cube transit network object.
            project_name: If not provided, will default to the roadway_log_file filename if
                provided (or the first filename if a list is provided)
            recalculate_calculated_variables: if reading in a base network, if this is true it
                will recalculate variables such as area type, etc. This only needs to be true
                if you are creating project cards that are changing the calculated variables.
            recalculate_distance: recalculate the distance variable. This only needs to be
                true if you are creating project cards that change the distance.
            parameters: dictionary of parameters
            **kwargs: forwarded to ModelRoadwayNetwork.read; common keys include
                crs, node_foreign_key, link_foreign_key, shape_foreign_key,
                unique_link_ids, unique_node_ids, modes_to_network_link_variables,
                modes_to_network_nodes_variables, managed_lanes_node_id_scalar,
                managed_lanes_link_id_scalar, managed_lanes_required_attributes,
                keep_same_attributes_ml_and_gp.

        Returns:
            A Project instance.
        """
        # --- base transit: a cube source and a pre-built network are mutually exclusive
        if base_cube_transit_source and base_cube_transit_network:
            msg = "Method takes only one of 'base_cube_transit_source' and 'base_cube_transit_network' but both given"
            WranglerLogger.error(msg)
            raise ValueError(msg)
        if base_cube_transit_source:
            base_cube_transit_network = CubeTransit.create_from_cube(base_cube_transit_source, parameters)
            WranglerLogger.debug(
                "Base network has {} lines".format(len(base_cube_transit_network.lines))
            )
            # only enumerate lines for small networks to keep the log readable
            if len(base_cube_transit_network.lines) <= 10:
                WranglerLogger.debug(
                    "Base network lines: {}".format(
                        "\n - ".join(base_cube_transit_network.lines)
                    )
                )
        elif base_cube_transit_network:
            pass
        else:
            msg = "No base cube transit network."
            WranglerLogger.info(msg)
            base_cube_transit_network = None

        # --- build transit: a cube source and explicit transit_changes are mutually exclusive
        if build_cube_transit_source and transit_changes:
            msg = "Method takes only one of 'build_cube_transit_source' and 'transit_changes' but both given"
            WranglerLogger.error(msg)
            raise ValueError(msg)
        if build_cube_transit_source:
            WranglerLogger.debug("build")
            build_cube_transit_network = CubeTransit.create_from_cube(build_cube_transit_source, parameters)
            WranglerLogger.debug(
                "Build network has {} lines".format(len(build_cube_transit_network.lines))
            )
            if len(build_cube_transit_network.lines) <= 10:
                WranglerLogger.debug(
                    "Build network lines: {}".format(
                        "\n - ".join(build_cube_transit_network.lines)
                    )
                )
        elif transit_changes:
            pass
        else:
            msg = "No cube transit changes given or processed."
            WranglerLogger.info(msg)
            transit_changes = None

        # --- roadway change inputs are mutually exclusive, pairwise.
        # NOTE(review): `roadway_link_changes or roadway_node_changes` assumes these
        # are still None at this point -- truth-testing a pandas DataFrame raises
        # ValueError. Confirm callers never pass both a file and dataframes.
        if roadway_log_file and (roadway_link_changes or roadway_node_changes):
            msg = "Method takes only one of 'roadway_log_file' and 'roadway_changes' but both given"
            WranglerLogger.error(msg)
            raise ValueError(msg)
        if roadway_shp_file and (roadway_link_changes or roadway_node_changes):
            msg = "Method takes only one of 'roadway_shp_file' and 'roadway_changes' but both given"
            WranglerLogger.error(msg)
            raise ValueError(msg)
        if roadway_csv_file and (roadway_link_changes or roadway_node_changes):
            msg = "Method takes only one of 'roadway_csv_file' and 'roadway_changes' but both given"
            WranglerLogger.error(msg)
            raise ValueError(msg)
        if roadway_log_file and roadway_csv_file:
            msg = "Method takes only one of 'roadway_log_file' and 'roadway_csv_file' but both given"
            WranglerLogger.error(msg)
            raise ValueError(msg)
        if roadway_shp_file and roadway_csv_file:
            msg = "Method takes only one of 'roadway_shp_file' and 'roadway_csv_file' but both given"
            WranglerLogger.error(msg)
            raise ValueError(msg)
        if roadway_log_file and roadway_shp_file:
            msg = "Method takes only one of 'roadway_log_file' and 'roadway_shp_file' but both given"
            WranglerLogger.error(msg)
            raise ValueError(msg)

        # --- derive a default project name from the input file(s) if none given
        if roadway_log_file and not project_name:
            if type(roadway_log_file) == list:
                project_name = os.path.splitext(os.path.basename(roadway_log_file[0]))[
                    0
                ]
                WranglerLogger.info(
                    "No Project Name - Using name of first log file in list"
                )
            else:
                project_name = os.path.splitext(os.path.basename(roadway_log_file))[0]
                WranglerLogger.info("No Project Name - Using name of log file")
        if network_build_file and not project_name:
            if type(network_build_file) == list:
                # NOTE(review): assumes the build file metadata always has
                # project_title/date/comments as strings -- a missing key would
                # raise TypeError here; confirm against the EMME export format.
                with open(network_build_file[0]) as f:
                    _content = json.load(f)
                    project_name = (
                        _content.get('metadata').get('project_title') + ' ' +
                        _content.get('metadata').get('date') + ' ' +
                        _content.get('metadata').get('comments')
                    )
                    WranglerLogger.info(
                        "No Project Name - Using metadata of first network build file in list"
                    )
            else:
                with open(network_build_file) as f:
                    _content = json.load(f)
                    project_name = (
                        _content.get('metadata').get('project_title') + ' ' +
                        _content.get('metadata').get('date') + ' ' +
                        _content.get('metadata').get('comments')
                    )
                    WranglerLogger.info("No Project Name - Using metadata of network build file")

        # --- read roadway changes from whichever single source was supplied
        if roadway_log_file:
            roadway_link_changes, roadway_node_changes = Project.read_logfile(roadway_log_file)
        elif roadway_shp_file:
            roadway_changes = gpd.read_file(roadway_shp_file)
            # the shapefile mixes links ('L') and nodes ('N') in one layer
            roadway_link_changes = roadway_changes[roadway_changes.OBJECT == 'L'].copy()
            roadway_node_changes = roadway_changes[roadway_changes.OBJECT == 'N'].copy()
            roadway_link_changes = DataFrame(roadway_link_changes.drop("geometry", axis=1))
            roadway_node_changes = DataFrame(roadway_node_changes.drop("geometry", axis=1))
            # placeholder; real model_node_id is filled in downstream
            roadway_node_changes["model_node_id"] = 0
        elif roadway_csv_file:
            roadway_changes = pd.read_csv(roadway_csv_file)
            roadway_link_changes = roadway_changes[roadway_changes.OBJECT == 'L'].copy()
            roadway_node_changes = roadway_changes[roadway_changes.OBJECT == 'N'].copy()
            roadway_node_changes["model_node_id"] = 0
        elif network_build_file:
            roadway_link_changes, roadway_node_changes, transit_changes = Project.read_network_build_file(network_build_file)
            if emme_node_id_crosswalk_file:
                # get wrangler IDs from emme element_id
                roadway_link_changes, roadway_node_changes, transit_changes = Project.emme_id_to_wrangler_id(
                    roadway_link_changes,
                    roadway_node_changes,
                    transit_changes,
                    emme_node_id_crosswalk_file
                )
            else:
                msg = "User needs to specify emme node id crosswalk file using emme_node_id_crosswalk_file = "
                WranglerLogger.error(msg)
                raise ValueError(msg)
            # rename emme attributes to wrangler attributes
            # NOTE(review): attribute access on `parameters` implies a Parameters
            # instance here, though the signature hints dict -- TODO confirm.
            if emme_name_crosswalk_file is None:
                emme_name_crosswalk_file = parameters.emme_name_crosswalk_file
            roadway_link_changes, roadway_node_changes = Project.emme_name_to_wrangler_name(
                roadway_link_changes,
                roadway_node_changes,
                emme_name_crosswalk_file
            )
        # NOTE(review): truth-testing a DataFrame raises ValueError -- these
        # branches only work when the arguments are None or non-DataFrame; verify.
        elif roadway_link_changes:
            pass
        elif roadway_node_changes:
            pass
        else:
            msg = "No roadway changes given or processed."
            WranglerLogger.info(msg)
            roadway_link_changes = pd.DataFrame({})
            roadway_node_changes = pd.DataFrame({})

        # --- base roadway network: read from disk or use the supplied object
        if base_roadway_network and base_roadway_dir:
            msg = "Method takes only one of 'base_roadway_network' and 'base_roadway_dir' but both given"
            WranglerLogger.error(msg)
            raise ValueError(msg)
        if base_roadway_dir:
            base_roadway_network = ModelRoadwayNetwork.read(
                os.path.join(base_roadway_dir, "link.json"),
                os.path.join(base_roadway_dir, "node.geojson"),
                os.path.join(base_roadway_dir, "shape.geojson"),
                fast=True,
                recalculate_calculated_variables=recalculate_calculated_variables,
                recalculate_distance=recalculate_distance,
                parameters=parameters,
                **kwargs,
            )
            base_roadway_network.split_properties_by_time_period_and_category()
        elif base_roadway_network:
            base_roadway_network.split_properties_by_time_period_and_category()
        else:
            msg = "No base roadway network."
            WranglerLogger.info(msg)
            base_roadway_network = None

        # --- base transit network from GTFS directory, if given
        if base_cube_transit_source and base_transit_dir:
            msg = "Method takes only one of 'base_cube_transit_source' and 'base_transit_dir' but both given"
            WranglerLogger.error(msg)
            raise ValueError(msg)
        if base_transit_dir:
            base_transit_network = StandardTransit.read_gtfs(
                gtfs_feed_dir=base_transit_dir,
                parameters=parameters
            )
        else:
            msg = "No base transit network."
            WranglerLogger.info(msg)
            base_transit_network = None

        # evaluate=True: the returned Project has card_data already populated
        project = Project(
            roadway_link_changes=roadway_link_changes,
            roadway_node_changes=roadway_node_changes,
            transit_changes=transit_changes,
            base_roadway_network=base_roadway_network,
            base_transit_network=base_transit_network,
            base_cube_transit_network=base_cube_transit_network,
            build_cube_transit_network=build_cube_transit_network,
            evaluate=True,
            project_name=project_name,
            parameters=parameters,
        )

        return project
[docs] @staticmethod
def read_logfile(logfilename: Union[str, List[str]]):
"""
Reads a Cube log file and returns separate dataframes of roadway_link_changes and roadway_node_changes
Args:
logfilename (str or list[str]): File path to CUBE logfile or list of logfile paths.
Returns:
A DataFrame reprsentation of the log file.
"""
if type(logfilename) == str:
logfilename = [logfilename]
link_df = pd.DataFrame()
node_df = pd.DataFrame()
for file in logfilename:
WranglerLogger.info("Reading logfile: {}".format(file))
with open(file) as f:
_content = f.readlines()
_node_lines = [
x.strip().replace(";", ",") for x in _content if x.startswith("N")
]
WranglerLogger.debug("node lines: {}".format(_node_lines))
_link_lines = [
x.strip().replace(";", ",") for x in _content if x.startswith("L")
]
WranglerLogger.debug("link lines: {}".format(_link_lines))
_nodecol = ["OBJECT", "OPERATION", "GROUP"] + _node_lines[0].split(",")[
1:
]
WranglerLogger.debug("Node Cols: {}".format(_nodecol))
_linkcol = ["OBJECT", "OPERATION", "GROUP"] + _link_lines[0].split(",")[
1:
]
WranglerLogger.debug("Link Cols: {}".format(_linkcol))
def split_log(x):
return list(reader([x], delimiter=',', quotechar='"'))[0]
_node_df = pd.DataFrame([split_log(x) for x in _node_lines[1:]],columns = _nodecol)
WranglerLogger.debug("Node DF: {}".format(_node_df))
_link_df = pd.DataFrame([split_log(x) for x in _link_lines[1:]],columns = _linkcol)
WranglerLogger.debug("Link DF: {}".format(_link_df))
node_df = pd.concat([node_df, _node_df])
link_df = pd.concat([link_df, _link_df])
# CUBE logfile headers for string fields: NAME[111] instead of NAME, need to shorten that
link_df.columns = [c.split("[")[0] for c in link_df.columns]
# CUBE logfile headers for string fields: NAME[111] instead of NAME, need to shorten that
node_df.columns = [c.split("[")[0] for c in node_df.columns]
if len(link_df) > 0:
# create operation history
action_history_df = (
link_df.groupby(['A', 'B'])["OPERATION"]
.agg(lambda x: x.tolist())
.rename("operation_history")
.reset_index()
)
action_history_df["operation_final"] = action_history_df.apply(lambda x: Project._final_op(x), axis=1)
link_df = pd.merge(link_df, action_history_df, on=['A', 'B'], how="left")
if len(node_df) > 0:
action_history_df = (
node_df.groupby('N')["OPERATION"]
.agg(lambda x: x.tolist())
.rename("operation_history")
.reset_index()
)
action_history_df["operation_final"] = action_history_df.apply(lambda x: Project._final_op(x), axis=1)
node_df = pd.merge(node_df, action_history_df, on='N', how="left")
WranglerLogger.info(
"Processed {} Node lines and {} Link lines".format(
node_df.shape[0], link_df.shape[0]
)
)
return link_df, node_df
[docs] @staticmethod
def read_network_build_file(networkbuildfilename: Union[str, List[str]]):
"""
Reads a emme network build file and returns separate dataframes of roadway_link_changes and roadway_node_changes
Args:
networkbuildfilename (str or list[str]): File path to emme nework build file or list of network build file paths.
Returns:
A DataFrame representation of the network build file
"""
if type(networkbuildfilename) == str:
networkbuildfilename = [networkbuildfilename]
_link_command_history_df = DataFrame()
_node_command_history_df = DataFrame()
_transit_command_history_df = DataFrame()
for file in networkbuildfilename:
WranglerLogger.info("Reading network build file: {}".format(file))
with open(file) as f:
_content = json.load(f)
_command_history = _content.get('command_history')
# loop through all the commands
for command in _command_history:
if command.get('command') == 'set_attribute':
element_id = command.get('parameters').get('element_ids')
object = Project.get_object_from_network_build_command(command)
operation = Project.get_operation_from_network_build_command(command)
_command_df = DataFrame(
data = {
'element_id' : element_id,
'object' : object,
'operation' : operation
}
)
_command_df[command.get('parameters').get('attribute_name')] = command.get('parameters').get('value')
if command.get('command') in ['create_link', 'create_node']:
if command.get('command') == 'create_link':
element_id = command.get('results').get('changes').get('added').get('LINK')
if command.get('command') == 'create_node':
element_id = command.get('results').get('changes').get('added').get('NODE')
object = Project.get_object_from_network_build_command(command)
operation = Project.get_operation_from_network_build_command(command)
_command_df = DataFrame(
data = {
'element_id' : element_id,
'object' : object,
'operation' : operation
}
)
for attribute_name, attribute_value in command.get('parameters').get('attributes').items():
_command_df[attribute_name] = attribute_value
if command.get('command') == 'delete_link':
element_id = command.get('results').get('changes').get('removed').get('LINK')
object = Project.get_object_from_network_build_command(command)
operation = Project.get_operation_from_network_build_command(command)
_command_df = DataFrame(
data = {
'element_id' : element_id,
'object' : object,
'operation' : operation
}
)
if command.get('command') == 'modify_transit_line':
element_id = command.get('parameters').get('line_id')
object = Project.get_object_from_network_build_command(command)
operation = Project.get_operation_from_network_build_command(command)
_command_df = DataFrame(
data = {
'element_id' : pd.Series(element_id),
'object' : pd.Series(object),
'operation' : pd.Series(operation)
}
)
_command_df['new_itinerary'] = [command.get('parameters').get('new_itinerary')]
if ('L' in _command_df['object'].unique()):
_link_command_history_df = _link_command_history_df.append(
_command_df[_command_df['object'] == 'L'],
sort = False,
ignore_index = True
)
if ('N' in _command_df['object'].unique()):
_node_command_history_df = _node_command_history_df.append(
_command_df[_command_df['object'] == 'N'],
sort = False,
ignore_index = True
)
if (
('TRANSIT_LINE' in _command_df['object'].unique()) |
('TRANSIT_STOP' in _command_df['object'].unique()) |
('TRANSIT_SHAPE' in _command_df['object'].unique())
):
_transit_command_history_df = _transit_command_history_df.append(
_command_df[_command_df['object'].isin(['TRANSIT_LINE', 'TRANSIT_STOP', 'TRANSIT_SHAPE'])],
sort = False,
ignore_index = True
)
if len(_link_command_history_df) > 0:
# create operation history
link_action_history_df = (
_link_command_history_df.groupby('element_id')["operation"]
.agg(lambda x: x.tolist())
.rename("operation_history")
.reset_index()
)
link_action_history_df["operation_final"] = link_action_history_df.apply(
lambda x: Project._final_op(x),
axis=1
)
# get the last none null value for each element
# consolidate elements to single record
def get_last_valid(series):
if len(series.dropna()) > 0:
return series.dropna().iloc[-1]
else:
return np.nan
#_command_history_df = _command_history_df.groupby(['element_id']).apply(get_last_valid).reset_index()
_link_command_history_df = _link_command_history_df.groupby(
['element_id']
).last().reset_index()
_link_command_history_df = pd.merge(_link_command_history_df, link_action_history_df, on='element_id', how="left")
if len(_node_command_history_df) > 0:
# create node operation history
node_action_history_df = (
_node_command_history_df.groupby('element_id')["operation"]
.agg(lambda x: x.tolist())
.rename("operation_history")
.reset_index()
)
node_action_history_df["operation_final"] = node_action_history_df.apply(
lambda x: Project._final_op(x),
axis=1
)
_node_command_history_df = _node_command_history_df.groupby(
['element_id']
).last().reset_index()
_node_command_history_df = pd.merge(_node_command_history_df, node_action_history_df, on='element_id', how="left")
WranglerLogger.info(
"Processed {} link element commands, {} node element commands".format(
_link_command_history_df.shape[0],
_node_command_history_df.shape[0]
)
)
return _link_command_history_df, _node_command_history_df, _transit_command_history_df
[docs] @staticmethod
def emme_id_to_wrangler_id(emme_link_change_df, emme_node_change_df, emme_transit_changes_df, emme_node_id_crosswalk_file):
"""
rewrite the emme id with wrangler id, using the emme wrangler id crosswalk located in database folder
"""
WranglerLogger.info('Reading emme node id crosswalk file from {}'.format(emme_node_id_crosswalk_file))
emme_node_id_crosswalk_df = pd.read_csv(emme_node_id_crosswalk_file)
emme_node_id_dict = dict(zip(emme_node_id_crosswalk_df['emme_node_id'], emme_node_id_crosswalk_df['model_node_id']))
# get node changes
if len(emme_node_change_df) > 0:
emme_node_change_df['emme_id'] = emme_node_change_df['element_id'].apply(lambda x: int(x.split('-')[0]))
# get new emme nodes
new_emme_node_id_list = [
n for n in emme_node_change_df['emme_id'] if n not in emme_node_id_crosswalk_df['emme_node_id']
]
WranglerLogger.info('New emme node id list {}'.format(new_emme_node_id_list))
new_wrangler_node = emme_node_id_crosswalk_df['model_node_id'].max()
# add crosswalk for new emme nodes
for new_emme_node in new_emme_node_id_list:
new_wrangler_node = new_wrangler_node + 1
emme_node_id_dict.update({new_emme_node : new_wrangler_node})
# for nodes update model_node_id
emme_node_change_df['model_node_id'] = emme_node_change_df['emme_id'].map(emme_node_id_dict).fillna(0)
if len(emme_link_change_df) > 0:
emme_link_change_df['A'] = emme_link_change_df['element_id'].apply(lambda x: int(x.split('-')[0]))
emme_link_change_df['B'] = emme_link_change_df['element_id'].apply(lambda x: int(x.split('-')[-1]))
# for links update A,B nodes
emme_link_change_df['A'] = emme_link_change_df['A'].map(emme_node_id_dict)
emme_link_change_df['B'] = emme_link_change_df['B'].map(emme_node_id_dict)
if len(emme_transit_changes_df) > 0:
emme_transit_changes_df['i_node'] = emme_transit_changes_df.apply(
lambda x: x['element_id'].split('-')[-3] if x['object'] == 'TRANSIT_STOP' else 0,
axis = 1
)
emme_transit_changes_df['j_node'] = emme_transit_changes_df.apply(
lambda x: x['element_id'].split('-')[-2] if x['object'] == 'TRANSIT_STOP' else 0,
axis = 1
)
# update i,j nodes
emme_transit_changes_df['i_node'] = emme_transit_changes_df[
'i_node'
].astype(
int
).map(
emme_node_id_dict
).fillna(0).astype(int)
emme_transit_changes_df['j_node'] = emme_transit_changes_df[
'j_node'
].astype(
int
).map(
emme_node_id_dict
).fillna(0).astype(int)
# update routing nodes
emme_transit_changes_df['new_itinerary'] = emme_transit_changes_df.apply(
lambda x: [emme_node_id_dict.get(n) for n in x['new_itinerary']] if x['object'] == 'TRANSIT_SHAPE' else 0,
axis = 1
)
return emme_link_change_df, emme_node_change_df, emme_transit_changes_df
[docs] def get_object_from_network_build_command(row):
"""
determine the network build object is node or link
Args:
row: network build command history dataframe
Returns:
'N' for node, 'L' for link
"""
if row.get('command') == 'create_link':
return 'L'
if row.get('command') == 'create_node':
return 'N'
if row.get('command') == 'delete_link':
return 'L'
if row.get('command') == 'set_attribute':
if row.get('parameters').get('element_type') == 'LINK':
return 'L'
if row.get('parameters').get('element_type') == 'NODE':
return 'N'
if row.get('parameters').get('element_type') == 'TRANSIT_LINE':
return 'TRANSIT_LINE'
if row.get('parameters').get('element_type') == 'TRANSIT_SEGMENT':
return 'TRANSIT_STOP'
if row.get('command') == 'modify_transit_line':
return 'TRANSIT_SHAPE'
[docs] def get_operation_from_network_build_command(row):
"""
determine the network build object action type
Args:
row: network build command history dataframe
Returns:
'A', 'C', 'D'
"""
if row.get('command') == 'create_link':
return 'A'
if row.get('command') == 'create_node':
return 'A'
if row.get('command') == 'delete_link':
return 'D'
if row.get('command') == 'set_attribute':
if row.get('parameters').get('element_type') == 'LINK':
return 'C'
if row.get('parameters').get('element_type') == 'NODE':
return 'C'
if row.get('parameters').get('element_type') == 'TRANSIT_LINE':
return 'C'
if row.get('parameters').get('element_type') == 'TRANSIT_SEGMENT':
return 'C'
if row.get('command') == 'modify_transit_line':
return 'C'
[docs] @staticmethod
def emme_name_to_wrangler_name(emme_link_change_df, emme_node_change_df, emme_name_crosswalk_file):
"""
rename emme names to wrangler names using crosswalk file
"""
WranglerLogger.info('Reading emme attribute name crosswalk file {}'.format(emme_name_crosswalk_file))
emme_name_crosswalk_df = pd.read_csv(emme_name_crosswalk_file)
emme_name_crosswalk_dict = dict(zip(emme_name_crosswalk_df['emme_name'], emme_name_crosswalk_df['wrangler_name']))
# drop columns we don't need from emme to avoid confusion
ignore_columns = [
c for c in emme_link_change_df.columns if c not in list(emme_name_crosswalk_dict.keys()) + ['operation_final', 'A', 'B']
]
WranglerLogger.info('Ignoring link changes in {}'.format(ignore_columns))
emme_link_change_df = emme_link_change_df.drop(ignore_columns, axis = 1)
ignore_columns = [
c for c in emme_node_change_df.columns if c not in list(emme_name_crosswalk_dict.keys()) + ['operation_final', 'model_node_id']
]
WranglerLogger.info('Ignoring node changes in {}'.format(ignore_columns))
emme_node_change_df = emme_node_change_df.drop(ignore_columns, axis = 1)
# rename emme name to wrangler name
emme_link_change_df.rename(columns = emme_name_crosswalk_dict, inplace = True)
emme_node_change_df.rename(columns = emme_name_crosswalk_dict, inplace = True)
return emme_link_change_df, emme_node_change_df
    @staticmethod
    def determine_roadway_network_changes_compatability(
        base_roadway_network: ModelRoadwayNetwork,
        roadway_link_changes: DataFrame,
        roadway_node_changes: DataFrame,
        parameters: Parameters,
    ):
        """
        Checks that any links or nodes that change exist in the base roadway network.

        Renames the change dataframes' columns IN PLACE to network naming
        conventions, then raises ValueError when a changed ("C") link (A, B)
        pair or node model_node_id is missing from the base network.
        Deletions are not evaluated.

        Args:
            base_roadway_network: base network whose links/nodes are checked against.
            roadway_link_changes: link changes from the log file (mutated in place).
            roadway_node_changes: node changes from the log file (mutated in place).
            parameters: provides the log->net and dbf->net column-name crosswalk files.

        Raises:
            ValueError: if a changed link or node does not exist in the base network.
        """
        WranglerLogger.info(
            "Evaluating compatibility between roadway network changes and base network. Not evaluating deletions."
        )

        # CUBE log file saves all variable names in upper cases, need to convert them to be same as network
        log_to_net_df = pd.read_csv(parameters.log_to_net_crosswalk)
        log_to_net_dict = dict(zip(log_to_net_df["log"], log_to_net_df["net"]))

        dbf_to_net_df = pd.read_csv(parameters.net_to_dbf_crosswalk)
        dbf_to_net_dict = dict(zip(dbf_to_net_df["dbf"], dbf_to_net_df["net"]))

        # lower-case any column the crosswalks do not know about (except A/B),
        # then apply both crosswalk renames in place
        for c in roadway_link_changes.columns:
            if (c not in log_to_net_df["log"].tolist() + log_to_net_df["net"].tolist()) & (c not in ["A", "B"]):
                roadway_link_changes.rename(columns={c: c.lower()}, inplace=True)
        roadway_link_changes.rename(columns=log_to_net_dict, inplace=True)
        roadway_link_changes.rename(columns=dbf_to_net_dict, inplace=True)

        for c in roadway_node_changes.columns:
            if (c not in log_to_net_df["log"].tolist() + log_to_net_df["net"].tolist()) & (c not in ["A", "B"]):
                roadway_node_changes.rename(columns={c: c.lower()}, inplace=True)
        roadway_node_changes.rename(columns=log_to_net_dict, inplace=True)
        roadway_node_changes.rename(columns=dbf_to_net_dict, inplace=True)

        # for links "L" that change "C",
        # find locations where there isn't a base roadway link
        if len(roadway_link_changes) > 0:
            link_changes_df = roadway_link_changes[
                roadway_link_changes["operation_final"] == "C"
            ].copy()

            # compare as strings because log-file A/B come in as text
            link_merge_df = pd.merge(
                link_changes_df[["A", "B"]].astype(str),
                base_roadway_network.links_df[["A", "B", "model_link_id"]].astype(str),
                how="left",
                on=["A", "B"],
            )

            # unmatched rows have NaN model_link_id -> link missing from base
            missing_links = link_merge_df.loc[link_merge_df["model_link_id"].isna()]

            if missing_links.shape[0]:
                msg = "Network missing the following AB links:\n{}".format(missing_links)
                WranglerLogger.error(msg)
                raise ValueError(msg)

        # for links "N" that change "C",
        # find locations where there isn't a base roadway node
        if len(roadway_node_changes) > 0:
            node_changes_df = roadway_node_changes[
                roadway_node_changes["operation_final"] == "C"
            ].copy()
            node_merge_df = pd.merge(
                node_changes_df[["model_node_id"]],
                base_roadway_network.nodes_df[["model_node_id", "geometry"]],
                how="left",
                on=["model_node_id"],
            )
            # unmatched rows have NaN geometry -> node missing from base
            missing_nodes = node_merge_df.loc[node_merge_df["geometry"].isna()]
            if missing_nodes.shape[0]:
                msg = "Network missing the following nodes:\n{}".format(missing_nodes)
                WranglerLogger.error(msg)
                raise ValueError(msg)
[docs] def evaluate_changes(self):
"""
Determines which changes should be evaluated, initiates
self.card_data to be an aggregation of transit and highway changes.
"""
highway_change_list = []
transit_change_list = []
WranglerLogger.info("Evaluating project changes.")
if (not self.roadway_link_changes.empty) | (not self.roadway_node_changes.empty):
highway_change_list = self.add_highway_changes()
if (not self.transit_changes.empty) or (
self.base_cube_transit_network is not None
and self.build_cube_transit_network is not None
):
transit_change_list = self.add_transit_changes()
self.card_data = {
"project": self.project_name,
"changes": transit_change_list + highway_change_list,
}
[docs] def add_transit_changes(self):
"""
Evaluates changes between base and build transit objects and
adds entries into the self.card_data dictionary.
"""
if self.build_cube_transit_network:
transit_change_list = self.build_cube_transit_network.evaluate_differences(
self.base_cube_transit_network
)
elif self.base_transit_network:
transit_change_list = self.base_transit_network.evaluate_differences(
self.transit_changes
)
return transit_change_list
@staticmethod
def _final_op(x):
if x["operation_history"][-1] == "D":
if "A" in x["operation_history"][:-1]:
return "N"
else:
return "D"
elif x["operation_history"][-1] == "A":
if "D" in x["operation_history"][:-1]:
return "C"
else:
return "A"
else:
if "A" in x["operation_history"][:-1]:
return "A"
else:
return "C"
[docs] def add_highway_changes(self, limit_variables_to_existing_network=False):
"""
Evaluates changes from the log file based on the base highway object and
adds entries into the self.card_data dictionary.

Args:
limit_variables_to_existing_network (bool): True if no ad-hoc variables. Default to False.
"""
# Strip leading whitespace that Cube log exports leave on string-valued
# columns, so later string comparisons against the base network are not
# tripped up by padding.
# NOTE(review): this mutates self.roadway_link_changes /
# self.roadway_node_changes in place (before the .copy() below).
for c in self.parameters.string_col:
if c in self.roadway_link_changes.columns:
self.roadway_link_changes[c] = self.roadway_link_changes[c].str.lstrip(" ")
if c in self.roadway_node_changes.columns:
self.roadway_node_changes[c] = self.roadway_node_changes[c].str.lstrip(" ")
## if worth it, could also add some functionality to network wrangler itself.
# Work on copies so the consolidation steps below do not further mutate
# the project's stored change tables.
node_changes_df = self.roadway_node_changes.copy()
link_changes_df = self.roadway_link_changes.copy()
def _process_deletions(link_changes_df):
    """Build the 'Roadway Deletion' card section from logged link deletions.

    Returns the deletion dict, or None when no links were deleted.
    """
    WranglerLogger.debug("Processing link deletions")
    deletions_df = link_changes_df[link_changes_df["operation_final"] == "D"].copy()

    # Coerce A/B to the same scalar type the base network uses so the
    # merge keys compare equal.
    base_links_df = self.base_roadway_network.links_df
    for node_col in ("A", "B"):
        deletions_df[node_col] = deletions_df[node_col].astype(
            type(base_links_df[node_col].iloc[0])
        )

    # The base network is the authority for model_link_id; drop any
    # logged value and re-attach it from the base via an A/B join.
    if "model_link_id" in deletions_df.columns:
        deletions_df.drop(["model_link_id"], axis=1, inplace=True)
    deletions_df = pd.merge(
        deletions_df,
        base_links_df[["A", "B", "model_link_id"]],
        how="left",
        on=["A", "B"],
    )

    if len(deletions_df) == 0:
        WranglerLogger.debug("No link deletions processed")
        return None

    links_to_delete = deletions_df["model_link_id"].tolist()
    WranglerLogger.debug("{} Links Deleted.".format(len(links_to_delete)))
    return {
        "category": "Roadway Deletion",
        "links": {"model_link_id": links_to_delete},
    }
def _process_link_additions(
    link_changes_df, limit_variables_to_existing_network
):
    """Build the 'Add New Roadway' card section from logged link additions.

    Args:
        link_changes_df: consolidated link change log with an
            'operation_final' column.
        limit_variables_to_existing_network (bool): when True, only
            attributes already present in the base network are written out.

    Returns:
        dict: {"category": "Add New Roadway", "links": [...]} or {} when
        there are no additions.
    """
    WranglerLogger.debug("Processing link additions")
    # Copy so the dtype coercion below does not mutate the caller's frame
    # (and does not trigger pandas' SettingWithCopyWarning on a slice).
    cube_add_df = link_changes_df[link_changes_df["operation_final"] == "A"].copy()
    if len(cube_add_df) == 0:
        WranglerLogger.debug("No link additions processed")
        return {}

    base_cols = self.base_roadway_network.links_df.columns
    if limit_variables_to_existing_network:
        add_col = [c for c in cube_add_df.columns if c in base_cols]
    else:
        # "operation_final" is internal bookkeeping and never written out.
        add_col = [c for c in cube_add_df.columns if c != "operation_final"]

    # Coerce to the base-network dtype only where the column exists there.
    # Ad-hoc columns (possible when not limiting to the existing network)
    # keep their logged dtype instead of raising a KeyError.
    for x in add_col:
        if x in base_cols:
            cube_add_df[x] = cube_add_df[x].astype(
                self.base_roadway_network.links_df[x].dtype
            )

    add_link_properties = cube_add_df[add_col].to_dict("records")
    WranglerLogger.debug("{} Links Added".format(len(add_link_properties)))
    return {"category": "Add New Roadway", "links": add_link_properties}
def _process_node_additions(node_add_df):
    """Build the list of added-node records for the 'Add New Roadway' section.

    Args:
        node_add_df: consolidated node change log rows whose
            'operation_final' is "A".

    Returns:
        list: one dict per added node; empty list when there are none.
    """
    WranglerLogger.debug("Processing node additions")
    if len(node_add_df) == 0:
        WranglerLogger.debug("No node additions processed")
        return []

    # Bookkeeping column is never written to the card; drop() returns a
    # fresh frame, so later assignment does not mutate the caller's data.
    node_add_df = node_add_df.drop(["operation_final"], axis=1)

    # Coerce to the base-network dtype only where the column exists there;
    # columns unknown to the base network keep their logged dtype instead
    # of raising a KeyError.
    base_nodes_df = self.base_roadway_network.nodes_df
    for x in node_add_df.columns:
        if x in base_nodes_df.columns:
            node_add_df[x] = node_add_df[x].astype(base_nodes_df[x].dtype)

    add_nodes_dict_list = node_add_df.to_dict("records")
    WranglerLogger.debug("{} Nodes Added".format(len(add_nodes_dict_list)))
    return add_nodes_dict_list
def _process_single_link_change(change_row, changeable_col):
"""Diff one logged link row against the base network and return a
one-row DataFrame with the structured property changes (empty when
nothing changed enough to report)."""
# 1. Find associated base year network values
base_df = self.base_roadway_network.links_df[
(self.base_roadway_network.links_df["A"] == int(change_row.A))
& (self.base_roadway_network.links_df["B"] == int(change_row.B))
]
if not base_df.shape[0]:
msg = "No match found in network for AB combination: ({},{}). Incompatible base network.".format(
change_row.A, change_row.B
)
WranglerLogger.error(msg)
raise ValueError(msg)
elif base_df.shape[0] > 1:
# AB should be unique; warn and proceed with the first match.
WranglerLogger.warning(
"Found more than one match in base network for AB combination: ({},{}). Selecting first one to operate on but AB should be unique to network.".format(
change_row.A, change_row.B
)
)
base_row = base_df.iloc[0]
# WranglerLogger.debug("Properties with changes: {}".format(changeable_col))
# 2. find columns that changed (enough)
changed_col = []
for col in changeable_col:
WranglerLogger.debug("Assessing Column: {}".format(col))
# if it is the same as before, or a static value, don't process as a change
# (string-compare with quotes stripped, so 55 == "55" == '"55"')
if str(change_row[col]).strip('"\'') == str(base_row[col]).strip('"\''):
continue
# if it is NaN or None, don't process as a change
# (x != x is the NaN self-inequality test)
if (change_row[col] != change_row[col]) | (change_row[col] is None):
continue
# roadway_class of 0 is treated as "unset", not a change
if (col == "roadway_class") & (change_row[col] == 0):
continue
# only look at distance if it has significantly changed
# (relative difference > 1%); cast to the base value's type so the
# card value matches the base network's dtype
if col == "distance":
if (
abs(
(change_row[col] - float(base_row[col]))
/ base_row[col].astype(float)
)
> 0.01
):
change_row[col] = type(base_row[col])(change_row[col])
changed_col.append(col)
else:
continue
else:
change_row[col] = type(base_row[col])(change_row[col])
changed_col.append(col)
WranglerLogger.debug(
"Properties with changes that will be processed: {}".format(changed_col)
)
if not changed_col:
return pd.DataFrame()
# 3. Iterate through columns with changed values and structure the changes as expected in project card
property_dict_list = []
processed_properties = []
# check if it's a manged lane change
for c in changed_col:
if c.startswith("ML_"):
# TODO ML project card skeleton
msg = "Detected managed lane changes, please create managed lane project card!"
WranglerLogger.error(msg)
raise ValueError(msg)
# NOTE(review): the `return` below is unreachable (after `raise`).
return
# regular roadway property change
for c in changed_col:
# WranglerLogger.debug("Processing Column: {}".format(c))
# Split e.g. "lanes_AM_sov" into base name / time period / category /
# managed-lane flag per the parameters' naming convention.
(
p_base_name,
p_time_period,
p_category,
managed_lane,
) = column_name_to_parts(c, self.parameters)
_d = {
"existing": base_row[c],
"set": change_row[c],
}
# Calculated values carry no meaningful "existing" — only write "set".
if c in Project.CALCULATED_VALUES:
_d = {
"set": change_row[c],
}
if p_time_period:
if managed_lane == 1:
_d["time"] = list(
self.parameters.time_period_to_time[p_time_period]
)
if p_category:
_d["category"] = p_category
# iterate through existing properties that have been changed and see if you should just add
# Managed-lane: append this period's entry to the existing
# property's "timeofday" list.
if (p_base_name in processed_properties) & (managed_lane == 1):
for processed_p in property_dict_list:
if processed_p["property"] == p_base_name:
processed_p["timeofday"] += [_d]
# Regular link: split-period columns must all agree on the value.
elif (p_base_name in processed_properties) & (managed_lane == 0):
for processed_p in property_dict_list:
if processed_p["property"] == p_base_name:
if processed_p["set"] != change_row[c]:
msg = "Detected different changes for split-property variables on regular roadway links: "
msg += "conflicting \"{}\" values \"{}\", \"{}\"".format(p_base_name, processed_p["set"], change_row[c])
WranglerLogger.error(msg)
raise ValueError(msg)
# First time seeing this base property name:
elif p_time_period:
if managed_lane == 1:
property_dict = {"property": p_base_name, "timeofday": [_d]}
processed_properties.append(p_base_name)
property_dict_list.append(property_dict)
else:
_d["property"] = p_base_name
processed_properties.append(_d["property"])
property_dict_list.append(_d)
else:
_d["property"] = p_base_name
processed_properties.append(_d["property"])
property_dict_list.append(_d)
card_df = pd.DataFrame(
{
"properties": pd.Series([property_dict_list]),
"model_link_id": pd.Series(base_row["model_link_id"]),
}
)
# WranglerLogger.debug('single change card_df:\n {}'.format(card_df))
return card_df
def _process_link_changes(link_changes_df, changeable_col):
    """Build 'Roadway Property Change' card entries from logged link edits.

    Args:
        link_changes_df: consolidated link change log with an
            'operation_final' column.
        changeable_col (list): columns whose values may be reported as
            property changes.

    Returns:
        list: dicts with "category", "facility" and "properties" keys;
        empty list when nothing changed.
    """
    cube_change_df = link_changes_df[link_changes_df["operation_final"] == "C"].copy()

    # Match the A/B scalar types of the base network so merge keys align.
    cube_change_df["A"] = cube_change_df["A"].astype(
        type(self.base_roadway_network.links_df["A"].iloc[0])
    )
    cube_change_df["B"] = cube_change_df["B"].astype(
        type(self.base_roadway_network.links_df["B"].iloc[0])
    )

    # The base network is the authority for model_link_id.
    if "model_link_id" in cube_change_df.columns:
        cube_change_df.drop("model_link_id", axis=1, inplace=True)
    cube_change_df = pd.merge(
        cube_change_df,
        self.base_roadway_network.links_df[["A", "B", "model_link_id"]],
        how="left",
        on=["A", "B"],
    )

    if not cube_change_df.shape[0]:
        WranglerLogger.info("No link changes processed")
        return []

    # Collect the per-link cards first and concatenate once — avoids the
    # quadratic cost of pd.concat inside the loop.
    card_frames = [
        _process_single_link_change(row, changeable_col)
        for _, row in cube_change_df.iterrows()
    ]
    change_link_dict_df = pd.concat(
        [pd.DataFrame(columns=["properties", "model_link_id"])] + card_frames,
        ignore_index=True,
        sort=False,
    )
    if not change_link_dict_df.shape[0]:
        WranglerLogger.info("No link changes processed")
        return []

    # Stringify the property dicts so identical change-sets are hashable
    # and can be grouped.
    change_link_dict_df["properties"] = change_link_dict_df["properties"].astype(str)
    # Group links that receive the same set of property changes.
    change_link_dict_df = (
        change_link_dict_df.groupby("properties")[["model_link_id"]]
        .agg(lambda x: list(x))
        .reset_index()
    )
    # Reformat model link ids into the project card "facility" format.
    change_link_dict_df["facility"] = change_link_dict_df.apply(
        lambda x: {"link": [{"model_link_id": x.model_link_id}]}, axis=1
    )
    # Parse the stringified property dicts back into objects.
    change_link_dict_df["properties"] = change_link_dict_df["properties"].apply(
        lambda x: json.loads(
            x.replace("'\"", "'").replace("\"'", "'").replace("'", '"')
        )
    )
    change_link_dict_df["category"] = "Roadway Property Change"
    # "records" is the valid orient; the abbreviated "record" was removed
    # from pandas.
    change_link_dict_list = change_link_dict_df[
        ["category", "facility", "properties"]
    ].to_dict("records")
    WranglerLogger.debug(
        "{} Changes Processed".format(len(change_link_dict_list))
    )
    return change_link_dict_list
def _consolidate_actions(log, base, key_list):
log_df = log.copy()
# will be changed if to allow new variables being added/changed that are not in base network
changeable_col = [x for x in log_df.columns if x in base.columns]
#print(log_df)
#for x in changeable_col:
# print(x)
#log_df[x] = log_df[x].astype(base[x].dtype)
if 'operation_final' not in log_df.columns:
action_history_df = (
log_df.groupby(key_list)["operation"]
.agg(lambda x: x.tolist())
.rename("operation_history")
.reset_index()
)
log_df = pd.merge(log_df, action_history_df, on=key_list, how="left")
log_df.drop_duplicates(subset=key_list, keep="last", inplace=True)
log_df["operation_final"] = log_df.apply(lambda x: Project._final_op(x), axis=1)
return log_df[changeable_col + ["operation_final"]]
# Driver sequence: consolidate the raw logs, then derive the three card
# sections (deletions, additions, property changes) from the net operations.
delete_link_dict = None
add_link_dict = None
change_link_dict_list = []
if len(link_changes_df) != 0:
# Collapse the link log to one row per (A, B) with a net operation.
link_changes_df = _consolidate_actions(
link_changes_df, self.base_roadway_network.links_df, ["A", "B"]
)
# process deletions
delete_link_dict = _process_deletions(link_changes_df)
# process additions
add_link_dict = _process_link_additions(
link_changes_df, limit_variables_to_existing_network
)
# process changes
WranglerLogger.debug("Processing changes")
WranglerLogger.debug(link_changes_df)
# Columns eligible for change-reporting: shared with the base network,
# minus the STATIC_VALUES that are never treated as changes.
changeable_col = list(
(
set(link_changes_df.columns)
& set(self.base_roadway_network.links_df.columns)
)
- set(Project.STATIC_VALUES)
)
# Warn about (but do not act on) logged attributes the base network lacks.
cols_in_changes_not_in_net = list(
set(link_changes_df.columns)
- set(self.base_roadway_network.links_df.columns)
)
if cols_in_changes_not_in_net:
WranglerLogger.warning(
"The following attributes are specified in the changes but do not exist in the base network: {}".format(
cols_in_changes_not_in_net
)
)
change_link_dict_list = _process_link_changes(link_changes_df, changeable_col)
if len(node_changes_df) != 0:
# Collapse the node log to one row per model_node_id.
node_changes_df = _consolidate_actions(
node_changes_df, self.base_roadway_network.nodes_df, ["model_node_id"]
)
# print error message for node change and node deletion
# (warn-only: the raise is deliberately commented out)
if (
len(node_changes_df[node_changes_df["operation_final"].isin(["C", "D"])])
> 0
):
msg = "NODE changes and deletions are not allowed!"
WranglerLogger.warning(msg)
#raise ValueError(msg)
# Node additions are attached to the "Add New Roadway" section, creating
# it if no link additions produced one.
node_add_df = node_changes_df[node_changes_df["operation_final"] == "A"]
if add_link_dict:
add_link_dict["nodes"] = _process_node_additions(node_add_df)
else:
add_link_dict = {"category": "Add New Roadway", "nodes": _process_node_additions(node_add_df)}
else:
# NOTE(review): bare `None` expression — a no-op placeholder branch.
None
# combine together
# filter(None, ...) drops the sections that came back None/empty.
highway_change_list = list(
filter(None, [delete_link_dict] + [add_link_dict] + change_link_dict_list)
)
return highway_change_list