#
# Copyright (c) 2014-2021 National Technology and Engineering
# Solutions of Sandia, LLC. Under the terms of Contract DE-NA0003525
# with National Technology and Engineering Solutions of Sandia, LLC,
# the U.S. Government retains certain rights in this software.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
tracktable.render.paths - Functions to render trajectories as curves on a map
If you're trying to figure out how to render a bunch of trajectories,
take a look at draw_traffic and at
tracktable/examples/render_trajectories_from_csv.py. Those will get you
pointed in the right direction.
"""
from __future__ import print_function, absolute_import, division
import logging
import math
import matplotlib
import matplotlib.collections
import matplotlib.colors
from matplotlib.collections import LineCollection
from matplotlib.colors import ListedColormap
from six.moves import range
import numpy
import cartopy.crs
# ----------------------------------------------------------------------
[docs]def remove_duplicate_points(trajectory):
"""Create a new trajectory with no adjacent duplicate points
Duplicate positions in a trajectory lead to degenerate line
segments. This, in turn, gives some of the back-end renderers
fits. The cleanest thing to do is to use the input trajectory to
compute a new one s.t. no segments are degenerate.
There's still one problem case: if the entire trajectory is a
single position, you will get back a trajectory where the only two
points (beginning and end) are at the same position.
Args:
trajectory (tracktable.core.Trajectory): trajectory to de-duplicate
Returns:
New trajectory with some of the points from the input
"""
new_points = []
last_point = None
for point in trajectory:
if ((not last_point)
or (point[1] != last_point[1])
or (point[0] != last_point[0])):
new_points.append(point)
last_point = point
if len(new_points) == 1:
new_points.append(trajectory[-1])
logger = logging.getLogger(__name__)
logger.debug(("remove_duplicate_points: old trajectory has length {}, "
"new trajectory has length {}").format(
len(trajectory), len(new_points)))
return type(trajectory).from_position_list(new_points)
# ----------------------------------------------------------------------
[docs]def points_to_segments(point_list, maximum_distance=None):
"""points_to_segments(point_list, maximum_distance=None) -> segment_list
Given a list of N points, create a list of the N-1 line segments
that connect them. Each segment is a list containing two points.
If a value is supplied for maximum_distance, any segment longer than
that distance will be ignored.
In English: We discard outliers.
Args:
point_list (list): List of points to be connected
Keyword Args:
maximum_distance (float): Maximum length that a segment can be
Returns:
Segements that connect the points
"""
def cart2_distance(point1, point2):
"""Cartesian distance between points defined as tuples"""
return math.sqrt(
(point2[0]-point1[0])*(point2[0]-point1[0]) +
(point2[1]-point1[1])*(point2[1]-point1[1])
)
# Now that we know the thresholds, we can go through and build the
# actual segments.
segments = []
logger = None
# Python 3: An object of type zip does not have a len() so we need
# to turn this into an actual list
point_list = list(point_list)
for i in range(len(point_list) - 1):
point1 = point_list[i]
point2 = point_list[i+1]
segment_length = cart2_distance(point_list[i], point_list[i+1])
if maximum_distance and segment_length > maximum_distance:
if logger is None:
logger = logging.getLogger(__name__)
logger.debug(
("WARNING: Discarding outlier line segment with length {}, {} "
"times the maximum length of {}.").format(
segment_length,
segment_length/maximum_distance,
maximum_distance))
segments.append((point1, point1))
else:
segments.append((point1, point2))
return segments
# ----------------------------------------------------------------------
[docs]def concat_color_maps(color_maps, scalars_list, color_scale):
"""Concatenate a list of color maps into a single color map with adjusted scalars and color_scale
Lists are assumed to be the same length
Args:
color_maps (list): a list of color_maps to concatenate
scalars_list (list[numpy arrays]): a list of numpy arrays where each array contains the scalars associated with the
respective color_map
color_scale (matplotlib.colors.Normalize() or LogNorm()): Linear or logarithmic color scale
Returns:
Concatenated, scaled and adjusted single color map
"""
all_color_maps = []
N = 256 # number of colors per color_map once combined
for color_map in color_maps:
cmap = color_map
if isinstance(color_map, str):
cmap = matplotlib.cm.get_cmap(color_map, N)
all_color_maps.append(cmap(numpy.linspace(0,1,N)))
stacked_colors = numpy.vstack(all_color_maps)
new_color_map = ListedColormap(stacked_colors)
num_colors = N*len(color_maps)
new_scalars = []
for i,scalars in enumerate(scalars_list):
# To deal with potential boundary issues when colors are combined into a single map the where statment was
# added below. The last color in a map has an inclusive range (including 1.0 or the max value), but when
# combinging 2 colors, for example, the first may have a range up to but not including 0.5, but a value of 1.0
# on the old color mpa may be mapped to 0.5 on the new color map (which now maps to an adjascent color map).
# If a maximum scalar value of N is used we change it to N-1 in the where statement below.
vals = (numpy.array(scalars)*N).astype(int)
new_scalars.append(((N*i)+numpy.where(vals==N, N-1, vals))/num_colors)
new_color_scale = color_scale #TODO support multiple scales later?
return new_color_map, new_scalars, new_color_scale
# ----------------------------------------------------------------------
[docs]def draw_traffic(traffic_map,
trajectory_iterable,
color_map='BrBG',
color_scale=matplotlib.colors.Normalize(),
trajectory_scalar_generator=None,
trajectory_linewidth_generator=None,
linewidth=1,
dot_size=2,
dot_color='white',
label_objects=False,
label_generator=None,
label_kwargs=dict(),
axes=None,
zorder=8,
transform=cartopy.crs.Geodetic(),
show_points=False,
point_size=12,
point_color='',
show_lines=True):
"""Draw a set of (possibly decorated trajectories.
Args:
traffic_map: Map projection (Basemap or Cartesian)
trajectory_iterable:
Keyword Args:
color_map (str or Matplotlib colormap): The name of a registered color map (Default: 'BrBG')
color_scale (matplotlib.colors.Normalize() or LogNorm()): Linear or logarithmic scale (Default: matplotlib.colors.Normalize())
trajectory_scalar_generator (Trajectory function): Function to generate scalars for a trajectory (Default: None)
trajectory_linewidth_generator (Trajectory function): Function to generate path widths for a trajectory (Default: None)
linewidth (float): Constant linewidth if no generator is supplied (default 0.1, measured in points) (Default: 1)
dot_size (float): Radius (in points) of a dot dawn at the latest point of each trajectory (Default: 2)
dot_color (str): Color of spot that will be drawn at the latest point of each trajectory (Default: 'white')
label_objects (bool): Whether to draw object_id at latest point of each trajectory (Default: False)
label_generator (TrajectoryPoint function): Function to generate label for a trajectory (Default: None)
label_kwargs (dict): Dictionary of arguments to be passed to labeler (FIXME) (Default: dict())
axes (Matplotlib axes): The axis frame that will hold the Matplotlib artists that will render the trajectories (Default: None)
zorder (int): Layer into which trajectories will be drawn (Default: 8)
transform (cartopy.crs.CRS): The input projection (Default: cartopy.crs.Geodetic())
show_points (bool): Whether or not to show the points along the trajectory (Default: False)
point_size (float): Radius of the points along the path (in points default=12) (Default: 12)
point_color (str): Color of the points along the path (Default: '')
show_lines (bool): Whether or not to show the trajectory lines (Default: True)
Returns:
List of Matplotlib artists
Parameters in more detail:
``traffic_map``: Map instance (no default)
Cartopy GeoAxes instance of the space in which trajectories will be
rendered. We don't actually render into this object. Instead we
use it to project points from longitude/latitude space down into
the map's local coordinate system. Take a look at
tracktable.render.maps for several ways to create this map including
a few convenience functions for common regions.
``trajectory_iterable``: iterable(Trajectory) (no default)
Sequence of Trajectory objects. We will traverse this exactly
once. It can be any Python iterable.
``color_map``: Matplotlib color map or string (default 'BrBG')
Either a Matplotlib color map object or a string denoting the name
of a registered color map. See the Matplotlib documentation for
the names of the built-ins and the tracktable.render.colormaps
module for several more examples and a way to create your own.
``color_scale``: Matplotlib color normalizer (default Normalize())
Object that maps your scalar range onto a colormap. This will
usually be either matplotlib.colors.Normalize() or
matplotlib.colors.LogNorm() for linear and logarithmic mapping
respectively.
``trajectory_scalar_generator``: function(Trajectory) -> list(float)
You can color each line segment in a trajectory with your choice
of scalars. This argument must be a function that computes those
scalars for a trajectory or else None if you don't care. The
scalar function should take a Trajectory as its input and return a
list of len(trajectory) - 1 scalars, one for each line segment to
be drawn.
``trajectory_linewidth_generator``: function(Trajectory) -> list(float)
Just as you can generate a scalar (and thus a color) for each line
segment, you can also generate a width for that segment. If you
supply a value for this argument then it should take a Trajectory
as its input and return a list of len(trajectory)-1 scalars
specifying the width for each segment. This value is measured in
points. If you need a single linewidth all the way through use
the 'linewidth' argument.
``linewidth``: float (default 0.1)
This is the stroke width measured in points that will be used to
draw the line segments in each trajectory. If you need different
per-segment widths then use trajectory_linewidth_generator.
``dot_size``: float (default 2)
If this value is non-zero then a dot will be drawn at the point on
each trajectory that has the largest timestamp. It will be dot_size
points in radius and will be colored with whatever scalar is present
at that point of the trajectory.
TODO: Add a point_size_generator argument to allow programmatic
control of this argument.
``label_objects``: boolean (default False)
You can optionally label the point with the largest timestamp in
each trajectory. To do so you must supply 'label_objects=True' and
also a function for the label_generator argument.
``label_generator``: function(TrajectoryPoint) -> string
Construct a label for the specified trajectory. The result must
be a string. This argument is ignored if label_objects is False.
TODO: Test whether Unicode strings will work here.
``label_text_kwargs``: dict (default empty)
We ultimately render labels using matplotlib.axes.Text(). If you
want to pass in arguments to change the font, font size, angle or
other parameters, specify them here.
``axes``: matplotlib.axes.Axes (default pyplot.gca())
This is the axis frame that will hold the Matplotlib artists that
will render the trajectories. By default it will be whatever
pyplot thinks the current axis set is.
``zorder``: int (default 8)
Height level where the trajectories will be drawn. If you want
them to be on top of the map and anything else you draw on it then
make this value large. It has no range limit.
``transform``: cartopy.crs.CRS (default cartopy.crs.Geodetic())
The input projection. Needed to get nearly any projection but PlateCarree
to work correctly.
``show_points``: boolean (default False)
True if points along the trajectory should be rendered
``point_size``: int (default 12)
If show_poitns is true, the size of the markers rendered at each point
in the trajectory.
``point_color``: string (default '')
If show_points is true, the color of the markers rendered at each point
in the trajectory
``show_lines``: boolean (default True)
True if the trajectory lines (piecewise-linear path) should be rendered
"""
if transform is None:
transform = cartopy.crs.Geodetic()
if axes is None:
axes = matplotlib.pyplot.gca()
logger = logging.getLogger(__name__)
all_artists = []
lead_point_scalars = []
lead_point_x = []
lead_point_y = []
lead_point_labels = []
# If the user hasn't specified a custom linewidth function then we
# use the value of the linewidth argument throughout.
if trajectory_linewidth_generator is None:
trajectory_linewidth_generator = lambda trajectory: [ linewidth ] * (len(trajectory)-1)
# If there is no scalar generator then we will automatically
# generate scalars for each trajectory that begin at 0 and end at
# 1.
if trajectory_scalar_generator is None:
trajectory_scalar_generator = lambda trajectory: numpy.linspace(0, 1, len(trajectory)-1)
if label_generator is None:
if label_objects:
logger.warning(("Object labels requested in draw_traffic but no "
"label formatter is present. Labels will look "
"weird."))
label_generator = lambda thing: thing
trajectory_number = 0
total_points = 0
max_batch_size = 1000
current_batch_paths = []
current_batch_scalars = []
current_batch_linewidths = []
current_batch_color_maps = []
if show_points:
current_batch_points_x = []
current_batch_points_y = []
# We want to ignore individual segments that span most of the way across
# the map. These are almost always errors in the data, especially where
# segments cross the limb of the map.
if hasattr(traffic_map, 'get_extent'):
map_extent = traffic_map.get_extent()
x_span = map_extent[2] - map_extent[0]
y_span = map_extent[3] - map_extent[1]
max_segment_length = 0.5 * max(x_span, y_span)
else:
# The above kluge is really only there for terrestrial maps so
# that we can detect and ignore points that cross the map
# discontinuity. If we're in Cartesian-land then it's not a
# problem.
max_segment_length = None
num_trajectories_rendered = 0
num_trajectories_examined = 0
single_color_map = True
if isinstance(color_map, list):
single_color_map = False
t_ind = 0
for trajectory in trajectory_iterable:
num_trajectories_examined += 1
if len(trajectory) < 2:
continue
trajectory = remove_duplicate_points(trajectory)
local_x = numpy.zeros(len(trajectory))
local_y = numpy.zeros(len(trajectory))
trajectory_number += 1
num_trajectories_rendered += 1
total_points += len(trajectory)
# First we convert the longitude/latitude points in the
# trajectory to coordinates in the map projection's native
# space
for i in range(len(trajectory)):
local_x[i] = trajectory[i][0]
local_y[i] = trajectory[i][1]
if show_points:
current_batch_points_x.append(local_x[:-1])
current_batch_points_y.append(local_y[:-1]) #all but last
# Now we turn that list of n points into a list of n-1 line
# segments. We shouldn't have any degenerate segments because
# of the call to remove_duplicate_points() earlier.
local_segments = points_to_segments(
zip(local_x, local_y),
maximum_distance=max_segment_length)
if len(local_segments) > 0:
# Save the line segments, linewidths, scalars for the
# whole path -- we'll render a bunch of them together in a
# batch
local_scalars = trajectory_scalar_generator(trajectory)
current_batch_linewidths.append(
trajectory_linewidth_generator(trajectory))
current_batch_paths.append(local_segments)
current_batch_scalars.append(local_scalars)
if not single_color_map:
if len(color_map) > t_ind: #in case the lengths of trajs and color_maps lists differ
current_batch_color_maps.append(color_map[t_ind])
else:
current_batch_color_maps.append('BrBG') #fallback
# The lead point is the last point in the trajectory
if label_generator is not None:
lead_point_labels.append(label_generator(trajectory[-1]))
lead_point_x.append(local_x[-1])
lead_point_y.append(local_y[-1])
lead_point_scalars.append(local_scalars[-1])
if len(current_batch_paths) >= max_batch_size:
# Now we've processed some traffic and made it into line
# segments. Time to create the line segment collection that we
# can plot.
if not single_color_map:
new_color_map, new_scalars, new_color_scale = concat_color_maps(current_batch_color_maps,
current_batch_scalars,
color_scale)
else:
new_color_map = color_map
new_scalars = current_batch_scalars
new_color_scale = color_scale
stacked_scalars = numpy.hstack(new_scalars)
if show_lines:
segment_collection = LineCollection(numpy.vstack(
current_batch_paths),
norm=new_color_scale,
cmap=new_color_map,
linewidth=numpy.hstack(
current_batch_linewidths),
zorder=zorder,
transform=transform)
segment_collection.set_array(stacked_scalars)
all_artists.append(segment_collection)
axes.add_collection(segment_collection)
if show_points:
colors = stacked_scalars
if point_color != '':
colors = point_color
point_collection = traffic_map.scatter(numpy.hstack(current_batch_points_x),
numpy.hstack(current_batch_points_y),
s=point_size,
linewidth=0,
marker='o',
zorder=zorder+1,
cmap = new_color_map,
c = colors,
transform=transform)
all_artists.append(point_collection)
current_batch_scalars = []
current_batch_linewidths = []
current_batch_paths = []
current_batch_color_maps = []
if show_points:
current_batch_points_x = []
current_batch_points_y = []
t_ind+=1
# one more batch now that we're done
if len(current_batch_paths) > 0:
if not single_color_map:
new_color_map, new_scalars, new_color_scale = concat_color_maps(current_batch_color_maps,
current_batch_scalars,
color_scale)
else:
new_color_map = color_map
new_scalars = current_batch_scalars
new_color_scale = color_scale
stacked_scalars = numpy.hstack(new_scalars)
if show_lines:
segment_collection = LineCollection(numpy.vstack(current_batch_paths),
norm=new_color_scale,
cmap=new_color_map,
linewidth=numpy.hstack(
current_batch_linewidths),
zorder=zorder,
transform=transform)
segment_collection.set_array(stacked_scalars)
all_artists.append(segment_collection)
axes.add_collection(segment_collection)
if show_points:
colors = stacked_scalars
if point_color != '':
colors = point_color
point_collection = traffic_map.scatter(numpy.hstack(current_batch_points_x),
numpy.hstack(current_batch_points_y),
s=point_size,
linewidth=0,
marker='o',
zorder=zorder+1,
cmap = new_color_map,
c = colors,
transform=transform)
all_artists.append(point_collection)
current_batch_scalars = []
current_batch_linewidths = []
current_batch_paths = []
current_batch_color_maps = []
if show_points:
current_batch_points_x = []
current_batch_points_y = []
if len(lead_point_x) > 0:
if dot_size:
if zorder:
dot_zorder = zorder+1
else:
dot_zorder = None
if dot_color == 'body':
if not single_color_map:
curr_color_map = color_map[0]
else:
curr_color_map = color_map
dot_color_kwargs = {'cmap': curr_color_map,
'c': lead_point_scalars}
else:
dot_color_kwargs = {'c': dot_color}
if axes is None:
axes = matplotlib.pyplot.gca()
dot_collection = axes.scatter(lead_point_x, lead_point_y,
s=dot_size,
linewidth=0,
marker='o',
zorder=dot_zorder,
**dot_color_kwargs,
transform=transform)
all_artists.append(dot_collection)
if label_objects:
for (x, y, label) in zip(
lead_point_x, lead_point_y, lead_point_labels):
all_artists.append(axes.text(x, y, label, **label_kwargs))
return all_artists
[docs]def unwrap_path(locs):
"""Inspects a list of [lat, lon] coordinates. If the trajectory crosses
the antimeridian (180 deg. Long), 'unwrap' the trajectory by projecting
longitudinal values onto >+180 or <-180. This will prevent horizontal
lines from streaking across a mercator projection, when plotting the
trajectory in mapping packages like Folium. Operates by comparing pairs
of elements (coord. point tuples) for the entire list.
Args:
locs (list): A list of (lat,lon) tuples
Returns:
No return value, modifies the locs[] list in-place.
"""
# 't' is an arbitrary threshold for comparing suqsequent points. If they are 'far enough' apart,
# then we assume they must be on opposite sides of the antimeridian.
t = 320
for i in range(1,len(locs)):
if abs(locs[i][1] - locs[i-1][1]) > t:
if locs[i-1][1] < 0:
# i-1 point in Western Hemisphere
locs[i] = (locs[i][0],locs[i][1]-360)
else:
# i-1 point in Eastern Hemisphere
locs[i] = (locs[i][0],locs[i][1]+360)
return