Source code for tracktable.render.map_processing.common_processing

#
# Copyright (c) 2014-2023 National Technology and Engineering
# Solutions of Sandia, LLC. Under the terms of Contract DE-NA0003525
# with National Technology and Engineering Solutions of Sandia, LLC,
# the U.S. Government retains certain rights in this software.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""
tracktable.render.common_processing - Collection of functions that are commonly used across all of the rendering backends
"""

import hashlib
import logging
from math import ceil

import cartopy.mpl.geoaxes
import cartopy.crs
import folium as fol
import matplotlib
import matplotlib.colors
import matplotlib.pyplot
import numpy
import tracktable.domain.terrestrial as domain
from tracktable.core.geomath import distance, length, point_at_length_fraction
from tracktable.render.map_decoration import coloring

logger = logging.getLogger(__name__)

[docs] def common_processing(trajectories, obj_ids, line_color, color_map, gradient_hue): """Common processing functionality Args: trajectories (list): List of Trajectories obj_ids (list): List of IDs line_color (name of standard color as string, hex color string or matplotlib color object, or list of any of these): The single color to use for all the segments in each trajectory. Overrides color_map and gradient_hue values. Can be a list of matplotlib color name strings, hex color strings or matplotlib color objects the same length as the length of the list of trajectories. color_map (name of standard colormap as string or matplotlib color_map object or list of either): The color map to use in rendering the segments of each trajectory. gradient_hue (float or list of floats): hue or list of hues (one per trajectory) to be used in definig the gradient color map (dark to light) for the trajectories. Only used if line_color and color_map are not used (set to ''). If line_color, color_map and gradient_hue are all unset the default behavior is to set the gradient_hue based on a hash of the object_id Returns: trajectories, line_color, color_map and gradient_hue """ # handle a single traj as input if type(trajectories) is not list: trajectories = [trajectories] # filter trajectories list by obj_ids if specified if obj_ids != []: if type(obj_ids) is not list: trajectories = [traj for traj in trajectories \ if traj[0].object_id == obj_ids] else: filtered_trajs = [] for obj_id in obj_ids: matched = [traj for traj in trajectories \ if traj[0].object_id == obj_id] filtered_trajs+=matched trajectories = filtered_trajs # now handle some color processing # translate strings into colormaps if type(color_map) is str and color_map != '': color_map = matplotlib.cm.get_cmap(color_map) elif type(color_map) is list: for i, cm in enumerate(color_map): if type(cm) is str: color_map[i] = matplotlib.cm.get_cmap(cm) # TODO make this into a function called 3 times # Handle too few colors if type(line_color) is list and len(trajectories) > len(line_color): times_to_repeat = ceil(len(trajectories)/len(line_color)) line_color = line_color * times_to_repeat # Handle too few color maps if type(color_map) is list and len(trajectories) > len(color_map): times_to_repeat = ceil(len(trajectories)/len(color_map)) color_map = color_map * times_to_repeat # Handle too few hues (say that 5 times fast) ;) if type(gradient_hue) is list and len(trajectories) > len(gradient_hue): times_to_repeat = ceil(len(trajectories)/len(gradient_hue)) gradient_hue = gradient_hue * times_to_repeat return trajectories, line_color, color_map, gradient_hue
# ----------------------------------------------------------------------
[docs] def hash_short_md5(string): """Given any string, returns a number between 0 and 1. The same number is always returned given the same string. Internally uses hashlib.md5, but only uses the first quarter of the full hash Args: string (str): String to be hashed Returns: 0 or 1 """ return int(hashlib.md5(string.encode('utf-8')).hexdigest()[:8], base=16)/((2**32)-1)
# perceived brightness (may be useful later) # sqrt(R*R*.241 + G*G*.691 + B*B*.068) # ----------------------------------------------------------------------
[docs] def path_length_fraction_generator(trajectory): """Generator to produce path length fraction scalars A genertor that given a trajectory will generate a scalar for each point such that each scalar represents the fraction of the total length along the path at the associated point. Arguments: trajectory (Trajectory): The trajectory to use for generating scaler values Returns: Fraction length scalar values for each point in the trajectory """ dist_fractions = [] prev_point = trajectory[0] cumulative_distance = 0 for point in trajectory[1:]: cumulative_distance += distance(prev_point,point) dist_fractions.append(cumulative_distance) prev_point = point if cumulative_distance != 0: dist_fractions = [d / cumulative_distance for d in dist_fractions] return dist_fractions
# ----------------------------------------------------------------------
[docs] def progress_linewidth_generator(trajectory): """Generator to produce progress linewidth scalars A generator that given a trajectory will generate a scalar for each point such that each scalar represents a good width value for the fraction of points that come before that point in the trajectory. Arguments: trajectory (Trajectory): The trajectory to use for generating scaler values Returns: Linewidth scalar values for each point in the trajectory """ widths = [] tlen = len(trajectory) for i, point in enumerate(trajectory[1:]): widths.append((((i+1)/tlen)*5.0)+0.37) return widths
# another way: # annotator = tracktable.feature.annotations.retrieve_feature_function('progress') # annotator(trajectory) # ----------------------------------------------------------------------
[docs] def point_tooltip(current_point): """Formats the tooltip string for a point Args: current_point (point): Current point of the trajectory Returns: Current points timestamp """ return current_point.timestamp.strftime("%H:%M:%S")
# ----------------------------------------------------------------------
[docs] def point_popup(current_point, point_popup_properties): """Formats the popup string for a point Args: current_point (point): Current point of the trajectory point_popup_properties (list): Point properties Returns: String of point properties """ popup_str_point = str(current_point.object_id)+'<br>'+ \ current_point.timestamp.strftime("%H:%M:%S")+'<br>Lat='+ \ str(current_point[1])+'<br>Lon='+str(current_point[0]) if point_popup_properties and point_popup_properties[0] == '*': for (name, value) in current_point.properties.items(): popup_str_point += '<br>'+name+': '+str(value) else: for prop_str in point_popup_properties: if prop_str in current_point.properties: popup_str_point += '<br>'+prop_str+': '+ \ str(current_point.properties[prop_str]) return popup_str_point
# ----------------------------------------------------------------------
[docs] def in_notebook(): """Returns True if run within a Jupyter notebook, and false otherwise """ try: from IPython import get_ipython ip = get_ipython() if ip == None: return False if 'IPKernelApp' not in ip.config: return False except ImportError: return False return True
# ----------------------------------------------------------------------
[docs] def render_line(backend, map_canvas, line_coords, control_color, weight, tooltip): """Renders a line onto a cartopy or folium map Args: backend (str): Backend to use rendering into. Only supports 'cartopy' and 'folium' map_canvas (GeoAxes): The canvas where the line will be rendered line_coords (tuple): Lon lat coordinates where to render the line control_color (float): Color of the rendered line weight (int): Linewidth tooltip (str): Value to display for the line such as ``object_id`` Returns: No return value """ coords = list(zip(*line_coords)) if backend=='cartopy': map_canvas.plot(coords[1], coords[0], color=control_color, linewidth=weight, marker='o', ms=1, fillstyle='none', transform=cartopy.crs.Geodetic()) elif backend=="folium": fol.PolyLine(line_coords, color=control_color, weight=weight, tooltip=tooltip).add_to(map_canvas) else: logger.error("Unsupported backend unable to render line.")
# ----------------------------------------------------------------------
[docs] def render_distance_geometry(backend, distance_geometry_depth, traj, map_canvas): """Renders the distance geometry calculations to the folium map Args: backend (str): Backend to render with, either ``cartopy`` or ``folium`` distance_geometry_depth (int): The depth of the distance geometry calculation traj (Trajectory): The trajectory map_canvas (GeoAxes): The canvas where distance geometry will be rendered Returns: No return value """ #cp=control_point cp_colors = ['red', 'blue', 'yellow', 'purple']+ \ [coloring.random_color() for i in range(4, distance_geometry_depth)] traj_length = length(traj) for num_cps in range(2,distance_geometry_depth+2): cp_increment = 1.0/(num_cps-1) cp_fractions = [cp_increment * i for i in range(num_cps)] cps = [point_at_length_fraction(traj, t) for t in cp_fractions] cp_coords = [(round(point[1],7), round(point[0],7)) for point in cps] for i, cp_coord in enumerate(cp_coords): normalization_term = traj_length*cp_increment control_color = cp_colors[num_cps-2] for j in range(len(cps)-1): line_coords = [(round(cps[j][1],7), round(cps[j][0],7)), (round(cps[j+1][1],7), round(cps[j+1][0],7))] val = round(distance(cps[j], cps[j+1]) / normalization_term, 4) tooltip = str(j+1)+'/'+str(len(cps)-1)+' = '+str(val) if backend == 'cartopy': render_line('cartopy', map_canvas, line_coords, control_color, .5, tooltip) else: render_line('folium', map_canvas, line_coords, control_color, 1, tooltip) popup=str(traj[0].object_id)+'<br>'+ \ traj[i].timestamp.strftime("%H:%M:%S")+'<br>Latitude='+ \ str(round(traj[i][1],7))+'<br>Longitude='+str(round(traj[i][0],7)) if backend != 'cartopy': #cartopy renders markers with lines fol.CircleMarker(cp_coord, radius=4, fill=True, color=control_color, tooltip=round(cp_fractions[i], 7), popup=popup).add_to(map_canvas)
# ----------------------------------------------------------------------
[docs] def sub_trajs_from_frac(trajectories, zoom_frac): """Create sub-trajectories from a given zoom fraction Args: trajectories (Trajectory): Trajectories to create sub-trajectories from zoom_frac (list): Fraction list to segment trajectories Returns: sub_trajs """ # Eventually replace with common method to do this, for now manually create sub-traj from given fraction sub_trajs = [] for traj in trajectories: # Make start and end points seg_start = point_at_length_fraction(traj, zoom_frac[0]) seg_end = point_at_length_fraction(traj, zoom_frac[1]) # Must be a better way to do this! first_in_mid = 0 for j, point in enumerate(traj): if point.timestamp >= seg_start.timestamp: first_in_mid = j break last_in_mid = len(traj) for j, point in reversed(list(enumerate(traj))): if point.timestamp <= seg_end.timestamp: last_in_mid = j break points = [seg_start] for point in traj[first_in_mid:last_in_mid+1]: points.append(point) points.append(seg_end) traj_mid = domain.Trajectory.from_position_list(points) #traj_mid.set_property("id", sub_trajs.append(traj_mid) return sub_trajs
# ----------------------------------------------------------------------
[docs] def save_density_array(density, outfile): """Save and output the density array to a file. Args: density (tuple): Density to be saved to file outfile (str): Filename to save output Returns: No return value. """ outfile.write('{} {}\n'.format(density.shape[0], density.shape[1])) rows = density.shape[0] columns = density.shape[1] for row in range(rows): for col in range(columns): outfile.write("{} ".format(density[row, col])) outfile.write("\n")
# ----------------------------------------------------------------------
[docs] def load_density_array(infile): """Load the density array from a file. Args: infile (str): Filename for input Returns: The density array in the file. """ first_line = infile.readline() words = first_line.strip().split(' ') dims = [ int(word) for word in words ] density = numpy.zeros(shape=(dims[0], dims[1]), dtype=numpy.int32) rows = dims[0] columns = dims[1] for row in range(rows): line = infile.readline() words = line.strip().split(' ') nums = [ int(word) for word in words ] for col in range(columns): density[row, col] = nums[col] return density
# ----------------------------------------------------------------------
[docs] def draw_density_array(density, x_bin_boundaries: numpy.ndarray, y_bin_boundaries: numpy.ndarray, map_projection, bounding_box, colormap=None, colorscale=None, zorder=10, axes=None): """Render a histogram for the given map projection. Args: density (iterable): Density array that will be drawn onto the map x_bin_boundaries (NumPy array, 1xN): Boundaries of bins in X/longitude y_bin_boundaries (NumPy array, 1xM): Boundaries of bins in Y/latitude map_projection (Basemap): Map to render onto bounding_box (point2d): Bounding box of area to gdraw the Keyword Args: colormap (str or Colormap): Colors to use for histogram (Default: None) colorscale (matplotlib.colors.Normalize or subclass): Mapping from bin counts to color. Useful values are matplotlib.colors.Normalize() and matplotlib.colors.LogNorm(). (Default: None) zorder (int): Image priority for rendering. Higher values will be rendered on top of actors with lower z-order. (Default: 10) axes (matplotlib.axes.Axes): Axes to render into. Defaults to "current axes" as defined by Matplotlib. (Default:None) Returns: The density rendered onto the map. """ # Yes, it looks like we've got the indices backwards on # density.shape[]. Recall that the X coordinate refers to # columns, typically dimension 1, while the Y coordinate refers to # rows, typically dimension 0. x_bins_mesh, y_bins_mesh = numpy.meshgrid(x_bin_boundaries, y_bin_boundaries) # And finally render it onto the map. if axes is None: axes = matplotlib.pyplot.gca() # Are we in a GeoAxes instance? If so, we need to tell pcolormesh # how to transform the density map to whatever map projection # we're using. if isinstance(axes, cartopy.mpl.geoaxes.GeoAxes): projection_kwargs = {"transform": cartopy.crs.PlateCarree()} else: projection_kwargs = {} mesh = axes.pcolormesh( x_bins_mesh, y_bins_mesh, density, cmap=colormap, norm=colorscale, zorder=zorder, **projection_kwargs ) return [mesh]