Source code for lasso.util

from functools import partial
import pyproj
from shapely.ops import transform
from shapely.geometry import Point, Polygon
import re
from unidecode import unidecode

def get_shared_streets_intersection_hash(lat, long, osm_node_id=None):
    """
    Compute the SharedStreets intersection hash for a point.

    Calculated per:
        https://github.com/sharedstreets/sharedstreets-js/blob/0e6d7de0aee2e9ae3b007d1e45284b06cc241d02/src/index.ts#L553-L565

    The hashed message is ``"Intersection <long> <lat>"`` with both
    coordinates formatted to five decimal places, followed by
    ``" <osm_node_id>"`` when a truthy node id is supplied.

    Reference example carried over from the original docstring (not
    re-verified against this implementation):
        -93.0965985, 44.952112199999995 osm_node_id = 954734870
        69f13f881649cb21ee3b359730790bb9

    Args:
        lat: latitude of the intersection.
        long: longitude of the intersection.
        osm_node_id: optional OSM node id appended to the message.

    Returns:
        str: hexadecimal MD5 digest of the UTF-8 encoded message.
    """
    import hashlib

    # The two fields must use distinct positional indexes: "{0} {0}" would
    # print the longitude twice and leave the latitude out of the hash, so
    # every point on the same meridian would collide.
    message = "Intersection {0:.5f} {1:.5f}".format(long, lat)
    if osm_node_id:
        message += " {}".format(osm_node_id)

    return hashlib.md5(message.encode("utf-8")).hexdigest()
def hhmmss_to_datetime(hhmmss_str: str):
    """
    Build a ``datetime.time`` from a colon-separated clock string.

    Args:
        hhmmss_str: string of hh:mm:ss; each colon-separated field is
            converted with ``int`` and passed positionally to
            ``datetime.time``.

    Returns:
        dt: datetime.time object representing time
    """
    from datetime import time

    fields = map(int, hhmmss_str.split(":"))
    return time(*fields)
def secs_to_datetime(secs: int):
    """
    Build a ``datetime.time`` from a count of seconds past midnight.

    The offset is added to ``datetime.datetime.min`` and only the
    time-of-day part of the result is kept.

    Args:
        secs: seconds from midnight

    Returns:
        dt: datetime.time object representing time
    """
    from datetime import datetime, timedelta

    offset = timedelta(seconds=secs)
    moment = datetime.min + offset
    return moment.time()
def geodesic_point_buffer(lat, lon, meters):
    """
    Create a circular buffer polygon around a node.

    A circle of radius ``meters`` is drawn around the origin of an azimuthal
    equidistant projection centred on the node, then re-projected to WGS84
    longitude/latitude.

    Args:
        lat: node lat
        lon: node lon
        meters: buffer distance, radius of circle

    Returns:
        Polygon
    """
    wgs84 = pyproj.Proj('+proj=longlat +datum=WGS84')

    # Azimuthal equidistant projection centred on the node itself.
    aeqd_template = '+proj=aeqd +lat_0={lat} +lon_0={lon} +x_0=0 +y_0=0'
    local_aeqd = pyproj.Proj(aeqd_template.format(lat=lat, lon=lon))

    # NOTE(review): pyproj.transform is reportedly deprecated in newer pyproj
    # releases; confirm the pinned version before upgrading.
    def to_wgs84(*coords):
        return pyproj.transform(local_aeqd, wgs84, *coords)

    circle = Point(0, 0).buffer(meters)  # distance in meters
    ring = transform(to_wgs84, circle).exterior
    return Polygon(list(ring.coords))
def create_locationreference(node, link):
    """
    Add a two-entry ``locationReferences`` list to every link, in place.

    Side effects on ``node``: adds ``X``, ``Y`` and ``point`` columns taken
    from each geometry's ``x`` / ``y`` attributes.
    Side effects on ``link``: adds ``A_point``, ``B_point`` and
    ``locationReferences`` columns.

    Args:
        node: table with ``geometry`` and ``model_node_id`` columns.
        link: table with ``A``, ``B``, ``length``, ``fromIntersectionId``
            and ``toIntersectionId`` columns.

    Returns:
        None
    """
    # Pull raw coordinates out of the geometry objects.
    node['X'] = node['geometry'].apply(lambda geom: geom.x)
    node['Y'] = node['geometry'].apply(lambda geom: geom.y)
    node['point'] = [[x, y] for x, y in zip(node['X'], node['Y'])]

    # Look up each link's end points by node id.
    point_by_node_id = dict(zip(node['model_node_id'], node['point']))
    link['A_point'] = link['A'].map(point_by_node_id)
    link['B_point'] = link['B'].map(point_by_node_id)

    def _references(row):
        start = {
            'sequence': 1,
            'point': row['A_point'],
            'distanceToNextRef': row['length'],
            'bearing': 0,
            'intersectionId': row['fromIntersectionId'],
        }
        end = {
            'sequence': 2,
            'point': row['B_point'],
            'intersectionId': row['toIntersectionId'],
        }
        return [start, end]

    link['locationReferences'] = link.apply(_references, axis=1)
def column_name_to_parts(c, parameters=None):
    """
    Split a column name into base name, time period, category and managed flag.

    A name is only split when its first underscore-delimited token is a key of
    ``parameters.properties_to_split``; otherwise it is returned whole.
    Recognised layouts are ``<base>_<time_period>``,
    ``<base>_<category>_<time_period>`` and ``<base>_<category>``.

    Args:
        c: column name to parse.
        parameters: object exposing ``properties_to_split``,
            ``time_period_to_time`` and ``categories`` mappings. When falsy,
            a default ``Parameters()`` instance is created.

    Returns:
        tuple: ``(base_name, time_period, category, managed)``.
        ``time_period`` and ``category`` are ``None`` when absent, and
        ``managed`` is ``True`` when the name starts with ``"ML"``. If the
        name cannot be split, an error is logged and the full name is
        returned as ``base_name``.
    """
    if not parameters:
        from .parameters import Parameters

        parameters = Parameters()

    managed = c[0:2] == "ML"
    parts = c.split("_")

    # NOTE(review): only the first token is tested, so a key that itself
    # contains an underscore can never match here -- confirm this is intended.
    if parts[0] not in parameters.properties_to_split:
        return c, None, None, managed

    tps = parameters.time_period_to_time
    cats = parameters.categories

    time_period = None
    category = None

    # Work on the underscore-delimited tokens rather than splitting the raw
    # string on the suffix text: a substring split breaks when the suffix
    # also occurs inside the base name (e.g. "RAMP_AM" split on "AM").
    if parts[-1] in tps:
        time_period = parts[-1]
        base_parts = parts[:-1]
        if base_parts and base_parts[-1] in cats:
            category = base_parts[-1]
            base_parts = base_parts[:-1]
        base_name = "_".join(base_parts)
    elif parts[-1] in cats:
        category = parts[-1]
        base_name = "_".join(parts[:-1])
    else:
        import logging

        msg = "Can't split property correctly: {}".format(c)
        # WranglerLogger is not imported in this module, so log through the
        # stdlib logger of the same name (presumably the one the wrangler
        # package configures -- confirm).
        logging.getLogger("WranglerLogger").error(msg)
        # Fall back to the unsplit name instead of returning an unbound local.
        base_name = c

    return base_name, time_period, category, managed
def shorten_name(name):
    """
    Collapse one or more raw names into a single de-duplicated ASCII string.

    Each entry has runs of non-word characters replaced by a single space and
    stand-alone ``nan`` tokens removed. Duplicate and empty entries are then
    dropped, the remainder is joined with spaces in first-seen order, and the
    result is transliterated with ``unidecode``.

    Args:
        name: either a comma-separated string of names or an iterable of
            name strings.

    Returns:
        str: the shortened, transliterated name.
    """
    if isinstance(name, str):
        name_list = name.split(",")
    else:
        name_list = name

    cleaned = []
    for raw in name_list:
        words = re.sub(r"\W+", " ", raw).split()
        # Drop only whole "nan" tokens; a plain substring replace would also
        # eat the "nan" inside real words such as "Fernando".
        cleaned.append(" ".join(w for w in words if w != "nan"))

    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # result no longer depends on per-process string hash randomization.
    # Empty entries are skipped so they cannot leave stray interior spaces.
    unique = [entry for entry in dict.fromkeys(cleaned) if entry]

    # convert non english character to english
    return unidecode(" ".join(unique))