#
# Copyright (c) 2014-2023 National Technology and Engineering
# Solutions of Sandia, LLC. Under the terms of Contract DE-NA0003525
# with National Technology and Engineering Solutions of Sandia, LLC,
# the U.S. Government retains certain rights in this software.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""tracktable.rw.load: Load in trajectories or trajectory points from a file
This file will intelligently load in trajectory or trajectory point files for
supported file types (.csv, .tsv, .traj). This includes automatic trajectory assembly if
the file being loaded is a .csv or .tsv file as well as automatically utilizing the correct
domain readers so points or trajectories are correctly associated with the
Terrestrial or Cartesian domains.
"""
import logging
from datetime import timedelta
import os
from tracktable.applications.assemble_trajectories import \
AssembleTrajectoryFromPoints
from tracktable.domain import domain_module_from_name
try:
from tqdm import tqdm
tqdm_installed = True
except ImportError as e:
tqdm_installed = False
logger = logging.getLogger(__name__)
[docs]
def load_trajectories(infile,
comment_character="#",
domain='terrestrial',
field_delimiter=',',
object_id_column=0,
timestamp_column=1,
longitude_column=2,
latitude_column=3,
x_column=2,
y_column=3,
z_column=4,
real_fields = dict(), # {'altitude': 4}
string_fields = dict(), # {'vessel-name': 7}
time_fields = dict(), # {'eta': 17}
separation_distance = None, # km
separation_time = 30, # minutes
minimum_length=2, # points
return_trajectory_points = False,
return_list=True
):
"""Load trajectories or trajectory points from .csv, .tsv and .traj files
Arguments:
infile (file-like object): File path for trajectory data
Keyword arguments:
comment_character (character): Any row where this is the first
non-whitespace character will be ignored. (default: '#')
delimiter (character): What character separates columns?
(default: ',')
domain (str): Which point domain will the points belong to?
(default: 'terrestrial')
object_id_column (int): Which column contains the object ID
(default: 0)
timestamp_column (int): Which column contains the timestamp
(default: 1)
longitude_column (int): Which column contains the longitude for
each point? (default: 2, assumes terrestrial domain)
latitude_column (int): Which column contains the latitude for
each point? (default: 3, assumes terrestrial domain)
x_column (int): Which column contains the X coordinate for
each point? (default: 2, assumes cartesian2d or cartesian3d)
y_column (int): Which column contains the Y coordinate for
each point? (default: 3, assumes cartesian2d or cartesian3d)
z_column (int): Which column contains the X coordinate for
each point? (default: 4, assumes cartesian3d)
string_fields (dict, int -> string): Columns in the input that
should be attached to each point as a string. The keys
in this dictionary are column numbers. Their corresponding
values are the name of the field that will be added to
the point's properties. (default: empty)
real_fields (dict, int -> string): Columns in the input that
should be attached to each point as a real number. The keys
in this dictionary are column numbers. Their corresponding
values are the name of the field that will be added to
the point's properties. (default: empty)
time_fields (dict, int -> string): Columns in the input that
should be attached to each point as a timestamp. The keys
in this dictionary are column numbers. Their corresponding
values are the name of the field that will be added to
the point's properties. The timestamps must be in the same
format as for the point as a whole, namely `YYYY-mm-dd HH:MM:SS`.
(default: empty)
separation_distance (int): Distance in KM between points signifying the need
to generate a new trajectory. (default: 10)
separation_time (int): Time in minutes between seperated points signifying the need
to generate a new trajectory. (20)
return_trajectory_points (boolean): When loading a .csv or .tsv file return the points in the
file and don't generate trajectories. When loading a .traj return a list of lists of the points
that make up the given trajectory for all trajectories in the file. (default: False)
return_list (boolean): When returning the reader or assembler object have the loader
automatically pull all of the yielded trajectories into a list for further processing. (default: False)
Returns:
List of trajectory points or trajectories depending on input file and params.
Raises:
IOError: Unsupported filetype
ValueError: Unsupported domain or comment character
"""
if len(comment_character) != 1:
raise ValueError('Unsupported comment character `{}`, comment character must be 1 character long.'.format(comment_character))
domain_module = domain_module_from_name(domain)
if infile.endswith('.traj'):
# Read in the trajectories from the traj file
reader = domain_module.TrajectoryReader()
reader.input = open(infile, 'r')
if return_list:
if tqdm_installed:
trajectories = list(tqdm(reader, desc="Loading Trajectories", unit=" trajectory"))
else:
trajectories = list(reader)
else:
trajectories = reader
if return_trajectory_points:
trajectory_points = []
if tqdm_installed:
for trajectory in tqdm(trajectories, desc="Decomposing Trajectories Into Trajectory Points", unit=" trajectory"):
point_list = []
for point in trajectory:
point_list.append(point)
trajectory_points.append(point_list)
else:
for trajectory in trajectories:
point_list = []
for point in trajectory:
point_list.append(point)
trajectory_points.append(point_list)
return trajectory_points
else:
return trajectories
elif infile.endswith('.csv') or infile.endswith('.tsv'):
# Read in the points from the CSV file
reader = domain_module.TrajectoryPointReader()
reader.input = open(infile, 'r')
reader.comment_character = comment_character
if infile.endswith('.tsv') and field_delimiter != '\t':
field_delimiter = '\t'
reader.field_delimiter = field_delimiter
reader.object_id_column = object_id_column
reader.timestamp_column = timestamp_column
if domain == 'terrestrial':
reader.coordinates[0] = longitude_column
reader.coordinates[1] = latitude_column
elif domain in ['cartesian2d', 'cartesian3d']:
reader.coordinates[0] = x_column
reader.coordinates[1] = y_column
if domain == 'cartesian3d':
reader.coordinates[2] = z_column
else:
raise ValueError('Unsupported domain: `{}`, supported domains are terrestrial, cartesian2d and cartesian3d'.format(domain))
for name, column_num in real_fields.items():
reader.set_real_field_column(name, column_num)
for name, column_num in string_fields.items():
reader.set_string_field_column(name, column_num)
for name, column_num in time_fields.items():
reader.set_time_field_column(name, column_num)
if return_trajectory_points:
if return_list:
if tqdm_installed:
trajectory_points = list(tqdm(reader, desc="Loading Trajectory Points", unit=" point"))
else:
trajectory_points = list(reader)
else:
trajectory_points = reader
return trajectory_points
else:
# Assemble the points into trajectories
assembler = AssembleTrajectoryFromPoints()
assembler.input = reader
assembler.separation_distance = separation_distance
assembler.separation_time = timedelta(minutes=separation_time)
assembler.minimum_length = minimum_length
if return_list:
if tqdm_installed:
trajectories = list(tqdm(assembler.trajectories(), desc="Loading Trajectory Points And Assembling Points Into Trajectories", unit=" trajectory"))
else:
trajectories = list(assembler.trajectories())
else:
trajectories = assembler
return trajectories
else:
filename, file_extension = os.path.splitext(infile)
logger.error("Unsupported file type: `{}`, supported file types are .csv, .tsv and .traj.".format(file_extension))
raise IOError