#
# Copyright (c) 2014-2021 National Technology and Engineering
# Solutions of Sandia, LLC. Under the terms of Contract DE-NA0003525
# with National Technology and Engineering Solutions of Sandia, LLC,
# the U.S. Government retains certain rights in this software.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
tracktable.applications.trajectory_splitter - Split trajectories at the places
where the tracked object sits idle, i.e. stays within a given radius of one
location for at least a given amount of time. This is typically used to avoid
analyzing trajectories of boats that are docked for long periods of time.
split_when_idle() is the main driver function for splitting trajectories.
"""
import logging
from math import floor
from tracktable.core.geomath import distance
logger = logging.getLogger(__name__)
def elapsed_seconds(point1, point2):
    """
    Calculate the time (in seconds) between two trajectory points. Assumes
    that point1 is the earlier point; if it is actually the later one the
    result is simply negative.

    Arguments:
        point1 (Tracktable point): The earlier occurring trajectory point.
        point2 (Tracktable point): The later occurring trajectory point.

    Returns:
        The time that has elapsed from the first point to the second point,
        in seconds (the ``total_seconds()`` of the timestamp difference).
    """
    # Subtracting the two timestamps yields a timedelta-like object whose
    # total_seconds() includes the fractional part.
    time_difference = point2.timestamp - point1.timestamp
    return time_difference.total_seconds()
# TODO: Does split_when_idle need to be domain-specific?
# Could it be easily generalized to handle trajectories from different domains?
def split_when_idle(trajectory,
                    idle_time_threshold=3600,
                    collocation_radius_threshold=0.2525,
                    min_points=10):
    """
    Split a trajectory around the places where it sits idle.

    If over any subset of points the trajectory stays within an area of

    .. math::

        PI * collocation_radius_threshold^2,

    for at least idle_time_threshold seconds, call this an idle area. Attempt
    to create new trajectories before and after the idle area, effectively
    deleting the idle area.

    Arguments:
        trajectory (Tracktable trajectory): The trajectory to split.

    Keyword Arguments:
        idle_time_threshold (int): Consider a trajectory area idle if it
            spends at least idle_time_threshold seconds in the same place.
            (Default: 3600 (one hour))
        collocation_radius_threshold (float):
            Consider a trajectory area idle if it stays within an area of
            PI * collocation_radius_threshold^2 in terms of km.
            (Default: 0.2525 (creating an area threshold of approx. 0.2 km^2))
        min_points (int):
            The minimum number of points for a non-idle subinterval of the
            trajectory to become a new trajectory. (Default: 10)

    Returns:
        A list of trajectories that are each subintervals of the input
        trajectory, and do not contain any idle regions (as defined by the
        input thresholds). If the whole trajectory is shorter in time than
        idle_time_threshold, the list holds the input trajectory itself
        (or is empty when it has fewer than min_points points). An empty
        input trajectory yields an empty list.
    """
    # An empty trajectory has no first/last point to measure a duration
    # from, and it cannot produce any sub-trajectory either.
    if len(trajectory) == 0:
        return []

    def _create_interval_list():
        """
        Divide the trajectory into adjacent intervals, necessarily overlapping
        only at the interval endpoints. We will efficiently try to make each
        interval as large as possible, subject to the constraint that the
        duration of each interval must be <= half of the idle time threshold.
        There is one exception: The shortest interval in terms of points is
        exactly two points, meaning we might have some two point intervals
        that exceed half of the idle time threshold, but this does not affect
        the success of our algorithm later on.

        Each interval will be labeled as idle or not, using only the spatial
        threshold for now. An interval will be considered idle if all points
        are within the collocation_radius_threshold of the first point in the
        interval.

        Ex: A trajectory with 101 points (indices 0-100) with
            idle_time_threshold = 3600 seconds (1 hour) might be subdivided
            as follows:
                [0, 11]    20 minutes
                [11, 30]   11 minutes
                [30, 31]   45 minutes
                [31, 80]   29 minutes
                [80, 98]   2 minutes
                [98, 100]  3 minutes
            Note that all intervals containing more than two points are <=
            half of the time interval threshold = 3600/2 seconds = 30 minutes.
            The interval [30, 31] contains only two points, so is allowed to
            exceed 30 minutes.

        Inherited parameters:
            split_when_idle
                - trajectory
                - idle_time_threshold
                - collocation_radius_threshold
                - trajectory_duration

        Returns:
            The list of interval dictionaries, in trajectory order (see
            _create_interval for the keys of each dictionary).
        """

        def _create_interval(start, end):
            """
            Creates and returns a dictionary representing the subinterval
            [start, end] of the trajectory, labeled idle or mobile.

            Inherited parameters:
                split_when_idle
                    - trajectory
                    - collocation_radius_threshold

            Arguments:
                start (int): Trajectory point index where the interval starts.
                end (int): Trajectory point index where the interval ends
                    (inclusive).

            Returns:
                A dictionary with the following key/value pairs:
                    'start':  The trajectory point index of the start of the
                              interval.
                    'end':    The trajectory point index of the end of the
                              interval.
                    'center': Only present when the interval is idle. The
                              trajectory point index of the "center" point
                              for the interval, always initialized to the
                              'start' point index.
                    'idle':   boolean indicating whether the interval's
                              points are all within
                              collocation_radius_threshold of the start
                              point.
            """
            anchor = trajectory[start]
            # Determine if the interval is idle in space (ignore the time
            # threshold for now). One point that is too far from the anchor
            # is enough to label the entire interval as non-idle, so stop
            # at the first such point.
            moved = any(
                distance(anchor, trajectory[index])
                >= collocation_radius_threshold
                for index in range(start + 1, end + 1))

            interval = {'start': start, 'end': end}
            if not moved:
                interval['center'] = start
            interval['idle'] = not moved
            return interval

        # We do not want our intervals to exceed this value in time.
        interval_duration_max = idle_time_threshold / 2

        # Calculate the proportion of the total duration that we want one
        # interval to occupy. The caller only gets here when
        # trajectory_duration >= idle_time_threshold, so the divisor is
        # non-zero for any positive threshold.
        interval_fraction_of_duration = (interval_duration_max
                                         / trajectory_duration)

        last_index = len(trajectory) - 1

        # This is an estimate of how far to jump to get from the start of an
        # interval to the end (one less than the interval width). Our guess
        # would be exactly correct if all points are sampled at exactly the
        # same difference apart in time. In essence, we want to move
        # interval_fraction_of_duration along the trajectory in time, but
        # we're going to settle for moving that fraction in terms of total
        # points, because that's all we can do without another loop.
        estimated_interval_jump = max(
            floor(last_index * interval_fraction_of_duration), 1)

        interval_list = []

        # Make a guess at what the first acceptable interval might be.
        interval_start = 0
        interval_end = estimated_interval_jump

        while interval_start < last_index:
            logger.debug('Trying interval from %s to %s.',
                         interval_start, interval_end)
            interval_duration = elapsed_seconds(trajectory[interval_start],
                                                trajectory[interval_end])
            short_enough = interval_duration <= interval_duration_max

            if short_enough or interval_end == interval_start + 1:
                # A two point interval is acceptable even when it is over
                # the maximum interval duration: it cannot be made smaller.
                if short_enough:
                    logger.debug(
                        '\tGood interval (shorter than max duration).')
                else:
                    logger.debug('\tGood interval (just two points).')

                # We have a good interval, label it as idle or not.
                interval_list.append(
                    _create_interval(interval_start, interval_end))

                # Move to the next possible interval.
                interval_start = interval_end
                interval_end = min(interval_start + estimated_interval_jump,
                                   last_index)
            else:
                # Our interval was not short enough in time, so let's make
                # another educated guess for how much smaller to make the
                # end of the interval. The ratio is < 1 here, so the width
                # strictly shrinks (down to a minimum of one step).
                interval_end = interval_start + max(
                    floor((interval_end - interval_start)
                          * interval_duration_max
                          / interval_duration),
                    1)

        return interval_list

    def _add_interval_following_mobile_region(interval):
        """
        The last interval was a mobile interval, meaning there is no currently
        defined idle region.

        If the current interval is mobile:
            Because the intervals are HALF of the idle time threshold, we know
            there cannot be an idle subinterval long enough in time to matter
            among the two mobile intervals, so we can just add mobile
            intervals onto the mobile region.
            NOTE: In the special case where the interval is exactly two points
            whose duration is more than half of the idle time threshold, we
            know the two points are farther than collocation_radius_threshold
            apart, so it is still impossible for an idle subinterval to form.

        If the current interval is idle:
            The interval becomes the idle region, and points are shifted from
            the end of the mobile region into the start of the idle region
            for as long as they are close enough to the idle region's center.

        Inherited parameters:
            split_when_idle
                - idle_region
                - mobile_region

        Arguments:
            interval (dict): The interval currently being processed.

        Returns:
            No return value.
        """
        if not interval['idle']:
            # The current interval is mobile, so append it to the mobile
            # region.
            mobile_region['end'] = interval['end']
            return

        # The current interval is idle, and there is no current idle
        # region, so make this interval the idle region.
        idle_region['start'] = interval['start']
        idle_region['end'] = interval['end']
        idle_region['center'] = interval['center']

        # Walk backwards through the mobile region (its first point is never
        # tested) and pull points into the start of the idle region until
        # one is too far from the idle center.
        for index in range(mobile_region['end'] - 1,
                           mobile_region['start'], -1):
            if not _update_idle_region_with_point(index, 'start'):
                break

        # The two regions share exactly one endpoint. Setting this
        # unconditionally also covers the case where every scanned point was
        # absorbed by the idle region; previously the mobile region was only
        # trimmed when the scan stopped early, which left idle points inside
        # the mobile region.
        mobile_region['end'] = idle_region['start']

    def _add_interval_following_idle_region(interval):
        """
        The last interval was an idle interval, so there is a currently
        defined idle region and possibly a mobile region.

        Whether the current interval is idle or mobile, its points are added
        one at a time to the end of the idle region until one is too far from
        the idle region's center. If every point fits, the idle region simply
        grows and nothing else happens.

        Otherwise the idle region is finished: its duration is checked and it
        is either deleted (the mobile region before it then becomes a new
        trajectory) or merged into the mobile region. The remaining points of
        the current interval, starting at the last point that was accepted,
        then become

            - the new idle region, if the current interval is idle, or
            - the new mobile region / the tail of the existing mobile
              region, if the current interval is mobile.

        Inherited parameters:
            split_when_idle
                - idle_region
                - mobile_region

        Arguments:
            interval (dict): The interval currently being processed.

        Returns:
            No return value.
        """
        # Add points from the start of the current interval to the end of
        # the idle region until they get too far from the center of the
        # idle region.
        for index in range(interval['start'] + 1, interval['end'] + 1):
            if not _update_idle_region_with_point(index, 'end'):
                break

        # The whole interval was absorbed by the idle region.
        if idle_region['end'] == interval['end']:
            return

        # Check if the duration of the idle region is over the idle time
        # threshold and either delete the idle region or add it to the
        # mobile region. Either way the idle region is empty afterwards.
        _check_idle_duration()

        # 'index' is the first rejected point; the remainder starts at the
        # last accepted point so that adjacent regions share an endpoint.
        remainder_start = index - 1

        if interval['idle']:
            # What is left of the current idle interval becomes the new
            # idle region.
            idle_region['start'] = remainder_start
            idle_region['end'] = interval['end']
            idle_region['center'] = remainder_start
        else:
            # What is left of the current mobile interval gets added to
            # the mobile region (starting a new one if none exists).
            if len(mobile_region) == 0:
                mobile_region['start'] = remainder_start
            mobile_region['end'] = interval['end']

    def _update_idle_region_with_point(index, location):
        """
        Add a point to the idle region, but only if it is close enough to the
        center of the idle region.

        Inherited parameters:
            split_when_idle
                - trajectory
                - collocation_radius_threshold
                - idle_region

        Arguments:
            index (int): The trajectory point index that we would like to
                attempt to add to the idle region.
            location (string): Either 'start' or 'end' to indicate if we are
                adding this point to the start or end of the idle region,
                respectively.

        Returns:
            True - point was successfully added to the idle region
            False - point was not close enough to the center of the idle
                    region, and therefore was not added
        """
        # Check to see if the point is close enough to the center of the idle
        # region.
        if (distance(trajectory[idle_region['center']], trajectory[index])
                >= collocation_radius_threshold):
            return False

        # Update the idle region at either the beginning or end with the new
        # point.
        idle_region[location] = index
        return True

    def _cut_mobile_region():
        """
        Check to see if there is a mobile region and if it has enough points.
        If so, create a new trajectory from the mobile region and append it to
        our list of new trajectories. The mobile region is cleared in every
        case.

        Inherited parameters:
            split_when_idle
                - trajectory
                - min_points
                - mobile_region
                - new_trajectories

        Returns:
            No return value.
        """
        # Check that the mobile region exists and has enough points to be
        # considered its own trajectory.
        if (len(mobile_region) != 0 and
                mobile_region['end'] - mobile_region['start'] + 1
                >= min_points):
            logger.debug("\tCreating new trajectory for [%s, %s]",
                         mobile_region['start'], mobile_region['end'])
            # Create a trajectory from the mobile region (end is inclusive).
            new_trajectories.append(
                trajectory[mobile_region['start']:mobile_region['end'] + 1])

        # Reset the mobile region, even when it was too short to keep, so
        # that it is never continued across a deleted idle region.
        mobile_region.clear()

    def _check_idle_duration():
        """
        If the idle region duration is at or above the idle time threshold,
        convert the mobile region (if it exists) to a trajectory and clear the
        idle region.

        If the idle region duration is below the idle time threshold, move all
        points from the idle region to the mobile region and clear the idle
        region.

        Inherited parameters:
            split_when_idle
                - trajectory
                - idle_time_threshold
                - idle_region
                - mobile_region

        Returns:
            No return value.
        """
        idle_duration = elapsed_seconds(trajectory[idle_region['start']],
                                        trajectory[idle_region['end']])
        if idle_duration >= idle_time_threshold:
            # The idle region is too long (in time). Let all the mobile
            # region points from before the idle region form a new trajectory.
            _cut_mobile_region()
        else:
            # The idle region is not long enough (in time) to be considered
            # truly idle. Move those points to the mobile region.
            if len(mobile_region) == 0:
                mobile_region['start'] = idle_region['start']
            mobile_region['end'] = idle_region['end']

        # Reset the idle region.
        idle_region.clear()

    logger.debug('Commencing trajectory splitting...')

    trajectory_duration = elapsed_seconds(trajectory[0], trajectory[-1])

    # If the entire trajectory is below the idle time threshold, there can be
    # no part of it that is idle for the duration of the idle time threshold,
    # so skip straight to keeping or deleting it depending on if it has
    # enough data points.
    if trajectory_duration < idle_time_threshold:
        return [trajectory] if len(trajectory) >= min_points else []

    logger.debug('Creating Interval Lists...')

    # Create a list that will contain the trajectory broken up into intervals.
    # Each interval will be represented as a dictionary containing the
    # following keys: 'start' (trajectory point index of the start of the
    # interval), 'end' (trajectory point index of the end of the interval),
    # 'idle' (boolean flag), 'center' (this key only exists if 'idle' is true,
    # and contains the trajectory point index of the "center" of the interval,
    # where all idle radii are calculated from).
    interval_list = _create_interval_list()

    # We will store all non-idle trajectories that we create here (if any).
    new_trajectories = []

    # The idle region will be a dictionary similar to the interval
    # dictionaries, containing the 'start', 'end' and 'center' trajectory
    # point indices for the idle region. The mobile region is similar but
    # need only contain 'start' and 'end' keys. An empty dictionary means
    # "no such region at the moment".
    idle_region = {}
    mobile_region = {}

    logger.debug('Initializing mobile/idle regions using first interval...')

    # Initialize mobile/idle regions using (a copy of) the first interval.
    first_interval = interval_list[0]
    if first_interval['idle']:
        idle_region.update(first_interval)
    else:
        mobile_region.update(first_interval)

    for interval in interval_list[1:]:
        logger.debug("\tmobile region: %s", mobile_region)
        logger.debug("\tidle region: %s", idle_region)
        logger.debug("\nInterval: [%s, %s]\t Idle: %s",
                     interval['start'], interval['end'], interval['idle'])

        if len(idle_region) == 0:
            _add_interval_following_mobile_region(interval)
        else:
            _add_interval_following_idle_region(interval)

    logger.debug("\tmobile region: %s", mobile_region)
    logger.debug("\tidle region: %s", idle_region)
    logger.debug("\nNo intervals left. Checking idle region then cutting "
                 "mobile region...")

    if len(idle_region) != 0:
        # There may be some idle points that should be moved to the mobile
        # region.
        _check_idle_duration()

    # We're done adding points, so any points left in the mobile region
    # should be checked to see if they qualify to be a trajectory.
    _cut_mobile_region()

    logger.debug("\tmobile region: %s", mobile_region)
    logger.debug("\tidle region: %s", idle_region)

    return new_trajectories