"""
A SceneCollection is a collection of scenes. A Scene is a RasterCollections with an
acquisition date, an unique identifier and a (remote sensing) platform that acquired
the raster data.
Copyright (C) 2022 Lukas Valentin Graf
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from __future__ import annotations
import datetime
import dateutil.parser
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pickle
import xarray as xr
from collections.abc import MutableMapping
from copy import deepcopy
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from eodal.core.raster import RasterCollection
from eodal.utils.exceptions import SceneNotFoundError
[docs]
class SceneCollection(MutableMapping):
"""
Collection of 0:N scenes where each scene is a RasterCollection with
**non-empty** `SceneProperties` as each scene is indexed by its
acquistion time.
"""
[docs]
def __init__(
self,
scene_constructor: Optional[Callable[..., RasterCollection]] = None,
indexed_by_timestamps: Optional[bool] = True,
*args,
**kwargs,
):
"""
Initializes a SceneCollection object with 0 to N scenes.
:param scene_constructor:
optional callable returning an `~eodal.core.raster.RasterCollection`
instance.
:param indexed_by_timestamps:
if True, all scene indices are interpreted as timestamps
(`datetime.datetime`).
Set to False if scene indices should be treated as different data types
:param args:
arguments to pass to `scene_constructor` or one of RasterCollection's
class methods (e.g., `RasterCollection.from_multi_band_raster`)
:param kwargs:
key-word arguments to pass to `scene_constructor` or one of
RasterCollection's class methods (e.g.,
`RasterCollection.from_multi_band_raster`)
"""
# mapper are stored in a dictionary like collection
self._frozen = False
self.collection = dict()
self._frozen = True
self._is_sorted = True
self._indexed_by_timestamps = indexed_by_timestamps
self._identifiers = []
if scene_constructor is not None:
scene = scene_constructor.__call__(*args, **kwargs)
self.__setitem__(scene)
def __getitem__(self, key: str | slice) -> RasterCollection:
def _get_scene_from_key(key: str | Any) -> RasterCollection:
if self.indexed_by_timestamps:
if str(key) in self.timestamps:
# most likely time stamps are passed as strings
# ^^ not true for scene collection loaded from pickle file
# we infer the format using dateutil
if isinstance(key, str):
key = dateutil.parser.parse(key)
return self.collection[key]
else:
if key in self.timestamps:
return self.collection[key]
if key in self.identifiers:
scene_idx = self.identifiers.index(key)
return self.__getitem__(self.timestamps[scene_idx])
# has a single key or slice been passed?
if not isinstance(key, slice):
try:
return _get_scene_from_key(key=key)
except IndexError:
raise SceneNotFoundError(
f"Could not find a scene for key {key} in collection"
)
else:
if not self.is_sorted:
raise ValueError(
"Slices are not permitted on unsorted SceneCollections"
)
# find the index of the start and the end of the slice
slice_start = key.start
slice_end = key.stop
# return an empty SceneCollection if start and stop is the same
# (numpy array behavior)
if slice_start is None and slice_end is None:
return SceneCollection()
# if start is None use the first scene
if slice_start is None:
if isinstance(slice_end, datetime.date):
if not self.indexed_by_timestamps:
raise ValueError(
"Cannot slice on timestamps when "
"`indexed_by_timestamps` is False"
)
slice_start = list(self.collection.keys())[0].date()
else:
if slice_end in self.identifiers:
slice_start = self.identifiers[0]
else:
slice_start = self.timestamps[0]
# if end is None use the last scene
end_increment = 0
if slice_end is None:
if isinstance(slice_start, datetime.date):
if not self.indexed_by_timestamps:
raise ValueError(
"Cannot slice on timestamps when "
"`indexed_by_timestamps` is False"
)
slice_end = list(self.collection.keys())[-1].date()
else:
if slice_start in self.identifiers:
slice_end = self.identifiers[-1]
else:
slice_end = self.timestamps[-1]
# to ensure that the :: operator works, we need to make
# sure the last band is also included in the slice
end_increment = 1
if set([slice_start, slice_end]).issubset(set(self.timestamps)):
idx_start = self.timestamps.index(slice_start)
idx_end = self.timestamps.index(slice_end) + end_increment
scenes = self.timestamps
elif set([slice_start, slice_end]).issubset(set(self.identifiers)):
idx_start = self.identifiers.index(slice_start)
idx_end = self.identifiers.index(slice_end) + end_increment
scenes = self.identifiers
# allow selection by date range
elif isinstance(slice_start, datetime.date) and isinstance(
slice_end, datetime.date
):
if not self.indexed_by_timestamps:
raise ValueError(
"Cannot slice on timestamps when "
"`indexed_by_timestamps` is False"
)
out_scoll = SceneCollection()
for timestamp, scene in self:
if end_increment == 0:
if slice_start <= timestamp.date() < slice_end:
out_scoll.add_scene(scene.copy())
else:
if slice_start <= timestamp.date() <= slice_end:
out_scoll.add_scene(scene.copy())
return out_scoll
else:
raise SceneNotFoundError(f"Could not find scenes in {key}")
slice_step = key.step
if slice_step is None:
slice_step = 1
# get an empty SceneCollection for returning the slide
out_scoll = SceneCollection()
for idx in range(idx_start, idx_end, slice_step):
out_scoll.add_scene(_get_scene_from_key(key=scenes[idx]))
return out_scoll
def __getstate__(self):
return self.__dict__.copy()
def __setitem__(self, item: RasterCollection):
if not isinstance(item, RasterCollection):
raise TypeError("Only RasterCollection objects can be passed")
if not item.is_scene:
raise ValueError(
"Only RasterCollection with timestamps in "
"their scene_properties can be passed"
)
# scenes are index by their acquisition time
key = item.scene_properties.acquisition_time
if key in self.collection.keys():
raise KeyError("Duplicate scene names are not permitted")
if key is None:
raise ValueError(
"RasterCollection passed must have an acquisition time stamp"
)
# it's important to make a copy of the scene before adding it
# to the collection
value = deepcopy(item)
self.collection[key] = value
# last, use the scene uri as an alias if available
if hasattr(item.scene_properties, "product_uri"):
self._identifiers.append(item.scene_properties.product_uri)
def __setstate__(self, d):
self.collection = d
def __delitem__(self, key: str | datetime.datetime):
# get index of the scene to be deleted to also delete its identifier
idx = self.timestamps.index(str(key))
# casts strings back to datetime objects
if isinstance(key, str):
key = dateutil.parser.parse(key)
del self.collection[key]
_ = self.identifiers.pop(idx)
def __iter__(self):
for k, v in self.collection.items():
yield k, v
def __len__(self) -> int:
return len(self.collection)
def __repr__(self) -> str:
if self.empty:
return "Empty EOdal SceneCollection"
else:
if self.indexed_by_timestamps:
timestamps = ", ".join(self.timestamps)
else:
timestamps = ", ".join([str(x) for x in self.timestamps])
return (
"EOdal SceneCollection\n----------------------\n"
+ f"# Scenes: {len(self)}\nTimestamps: {timestamps}\n"
+ f'Scene Identifiers: {", ".join(self.identifiers)}'
)
@property
def indexed_by_timestamps(self) -> bool:
return self._indexed_by_timestamps
@staticmethod
def _sort_keys(
sort_direction: str,
raster_collections: List[RasterCollection] | Tuple[RasterCollection],
) -> np.ndarray:
"""
Returns sorted indices from a list/ tuple of RasterCollections.
"""
# check sort_direction passed
if sort_direction not in ["asc", "desc"]:
raise ValueError("Sort direction must be one of: `asc`, `desc`")
# get timestamps of the scenes and use np.argsort to bring them
# into the desired order
timestamps = [x.scene_properties.acquisition_time for x in raster_collections]
if sort_direction == "asc":
sort_idx = np.argsort(timestamps)
elif sort_direction == "desc":
sort_idx = np.argsort(timestamps)[::-1]
return sort_idx
@property
def empty(self) -> bool:
"""Scene Collection is empty"""
return len(self) == 0
@property
def timestamps(self) -> List[str | Any]:
"""acquisition timestamps of scenes in collection"""
if self.indexed_by_timestamps:
return [str(x) for x in list(self.collection.keys())]
else:
return list(self.collection.keys())
@property
def identifiers(self) -> List[str]:
"""list of scene identifiers"""
return self._identifiers
@property
def is_sorted(self) -> bool:
"""are the scenes sorted by their timstamps?"""
return self._is_sorted
@is_sorted.setter
def is_sorted(self, value: bool) -> None:
"""are the scenes sorted by their timestamps?"""
if not type(value) == bool:
raise TypeError("Only boolean types are accepted")
self._is_sorted = value
[docs]
@classmethod
def from_pickle(cls, stream: bytes | Path):
"""
Load SceneCollection from pickled binary stream.
:param stream:
pickled binary stream to load into a SceneCollection or
file-path to pickled binary on disk.
:returns:
`SceneCollection` instance.
"""
if isinstance(stream, Path):
with open(stream, "rb") as f:
reloaded = pickle.load(f)
elif isinstance(stream, bytes):
reloaded = pickle.loads(stream)
else:
raise TypeError(f"{type(stream)} is not a supported data type")
# open empty scene collection and add scenes one by one
scoll_out = cls()
for _, scene in reloaded["collection"].items():
scoll_out.add_scene(scene)
return scoll_out
[docs]
@classmethod
def from_raster_collections(
cls,
raster_collections: List[RasterCollection] | Tuple[RasterCollection],
sort_scenes: Optional[bool] = True,
sort_direction: Optional[str] = "asc",
**kwargs,
):
"""
Create a SceneCollection from a list/tuple of N RasterCollection objects.
:param raster_collections:
list or tuple of RasterCollections from which to create a new scene
collection.
:param sort_scenes:
if True (default) scenes are order in chronological order by their
acquisition time.
:param sort_direction:
direction of sorting. Must be either 'asc' (ascending) or 'desc'
(descending). Ignored if `sort_scenes` is False.
:param kwargs:
key word arguments to pass to `SceneCollection` constructor call.
:returns:
SceneCollection instance
"""
# check inputs
if not isinstance(raster_collections, list) and not isinstance(
raster_collections, tuple
):
raise TypeError("Can only handle lists or tuples of RasterCollections")
if not np.array(
[isinstance(x, RasterCollection) for x in raster_collections]
).all():
raise TypeError("All items passed must be RasterCollection instances")
if not np.array([x.is_scene for x in raster_collections]).all():
raise TypeError("All items passed must have an acquisition timestamp")
# check if scenes shall be sorted
if sort_scenes:
sort_idx = cls._sort_keys(sort_direction, raster_collections)
is_sorted = True
else:
sort_idx = np.array([x for x in range(len(raster_collections))])
is_sorted = False
# open a SceneCollection instance and add the scenes
scoll = cls(**kwargs)
scoll.is_sorted = is_sorted
for idx in sort_idx:
scoll.add_scene(scene_constructor=raster_collections[idx].copy())
return scoll
[docs]
def add_scene(
self,
scene_constructor: Callable[..., RasterCollection] | RasterCollection,
*args,
**kwargs,
) -> None:
"""
Adds a Scene to the collection of scenes.
Raises an error if a scene with the same timestamp already exists (unique
timestamp constraint)
:param scene_constructor:
callable returning a `~eodal.core.raster.RasterCollection` instance or
existing `RasterCollection` instance
:param args:
positional arguments to pass to `scene_constructor`
:param kwargs:
keyword arguments to pass to `scene_constructor`
"""
# if a RasterCollection is passed no constructor call is required
try:
if isinstance(scene_constructor, RasterCollection):
scene = scene_constructor
else:
scene = scene_constructor.__call__(*args, **kwargs)
except Exception as e:
raise ValueError(f"Cannot initialize new Scene instance: {e}")
# try to add the scene to the SceneCollection
try:
self.__setitem__(scene)
except Exception as e:
raise KeyError(f"Cannot add scene: {e}")
[docs]
def apply(self, func: Callable, *args, **kwargs) -> Any:
"""
Apply a custom function to a SceneCollection.
:param func:
custom callable taking the ``SceneCollection`` as first
argument
:param args:
optional arguments to pass to `func`
:param kwargs:
optional keyword arguments to pass to `func`
:returns:
results of `func`
"""
try:
return func.__call__(self, *args, **kwargs)
except Exception as e:
raise ValueError from e
[docs]
def clip_scenes(self, inplace: Optional[bool] = False, **kwargs):
"""
Clip scenes in a SceneCollection to a user-defined spatial bounds.
:param inplace:
if False (default) returns a copy of the ``SceneCollection`` instance
with the changes applied. If True overwrites the values
in the current instance.
:param **kwargs:
key-word arguments to pass to `RasterCollection.clip_bands` method.
"""
# loop over Scenes and try to subset them spatially
# initialize a new SceneCollection if inplace is False
scoll_new = None
if inplace:
kwargs.update({"inplace": True})
if not inplace:
scoll_new = SceneCollection(
indexed_by_timestamps=self.indexed_by_timestamps
)
# loop over band reproject the selected ones
for timestamp, scene in self:
if inplace:
self[timestamp].clip_bands(**kwargs)
else:
scene = self[timestamp].copy()
scoll_new.add_scene(scene_constructor=scene.clip_bands, **kwargs)
if not inplace:
return scoll_new
[docs]
def copy(self):
"""returns a true copy of the SceneCollection"""
return deepcopy(self)
[docs]
def get_feature_timeseries(
self,
vector_features: Path | gpd.GeoDataFrame | str,
reindex_dataframe: Optional[bool] = False,
**kwargs,
) -> gpd.GeoDataFrame:
"""
Get a time series for 1:N vector features from SceneCollection.
:param vector_features:
vector features for which to extract time series data. If `Point` geometries
are provided calls `~RasterCollection.get_pixels()` on all scenes in the
collection. If `Polygon` (or `MultiPolygons`) are provided, calls
`~RasterCollection.band_summaries()` on all scenes in the collection.
:param reindex_dataframe:
boolean flag whether to reindex the resulting GeoDataFrame after extracting
data from all scenes. Set to `True` to ensure that the returned GeoDataFrame
has a unique index. `False` by default.
:param kwargs:
key word arguments to pass to `~RasterCollection.get_pixels()` or
`~RasterCollection.band_summaries()` depending on the type of the
input geometries.
:returns:
``GeoDataFrame`` with extracted raster values per feature and time stamp
"""
# check spatial datatypes
if not isinstance(vector_features, str):
if isinstance(vector_features, Path):
gdf = gpd.read_file(vector_features)
elif isinstance(vector_features, gpd.GeoDataFrame):
gdf = vector_features.copy()
else:
raise ValueError(
'Can only handle pathlibo objects, GeoDataFrames or "self"'
)
if set(gdf.geometry.geom_type.unique()).issubset(
set(["Point", "MulitPoint"])
):
pixels = True
elif set(gdf.geometry.geom_type.unique()).issubset(
set(["Polygon", "MultiPolygon"])
):
pixels = False
else:
raise ValueError(
"Can only handle (Multi)Point or (Multi)Polygon geometries"
)
else:
if vector_features == "self":
gdf = "self"
pixels = False
else:
raise ValueError('When passing a string only "self" is permitted')
# loop over scenes in collection and get the feature values
gdf_list = []
for timestamp, scene in self:
if pixels:
_gdf = scene.get_pixels(vector_features=gdf, **kwargs)
else:
_gdf = scene.band_summaries(by=gdf, **kwargs)
_gdf["acquisition_time"] = timestamp
gdf_list.append(_gdf)
# reindex the resulting GeoDataFrame if required
if reindex_dataframe:
gdf = pd.concat(gdf_list)
# reindexing is done by counting the features starting from zero
gdf.index = [x for x in range(gdf.shape[0])]
return gdf
else:
return pd.concat(gdf_list)
[docs]
def plot(
self,
band_selection: str | List[str],
max_scenes_in_row: Optional[int] = 6,
eodal_plot_kwargs: Optional[Dict] = {},
**kwargs,
) -> plt.Figure:
"""
Plots scenes in a `SceneCollection`
:param band_selection:
selection of band(s) to use for plotting. Must be either a single
band or a set of three bands
:param max_scenes_in_row:
number of scenes in a row. Set to 6 by default.
:param eodal_plot_kwargs:
optional keyword arguments to pass on to `eodal.core.band.Band.plot()`
:param kwargs:
optional keyword arguments to pass to `matplotlib.subplots()`.
:returns:
`Figure` object
"""
# check number of passed bands
if isinstance(band_selection, str):
band_selection = [band_selection]
if not len(band_selection) == 1 and not len(band_selection) == 3:
raise ValueError("You must pass a single band name or three band names")
plot_multiple_bands = True
if len(band_selection) == 1:
plot_multiple_bands = False
# check number of mapper in feature_scenes and determine figure size
n_scenes = len(self)
nrows = 1
ncols = 1
if self.empty:
raise ValueError("No scenes available for plotting")
elif n_scenes == 1:
f, ax = plt.subplots(**kwargs)
# cast to array to allow indexing
ax = np.array([ax]).reshape(1, 1)
else:
if n_scenes <= max_scenes_in_row:
ncols = n_scenes
f, ax = plt.subplots(ncols=ncols, nrows=nrows, **kwargs)
# reshape to match the shape of ax array with >1 rows
ax = ax.reshape(1, ax.size)
else:
nrows = int(np.ceil(n_scenes / max_scenes_in_row))
ncols = max_scenes_in_row
f, ax = plt.subplots(ncols=ncols, nrows=nrows, **kwargs)
# get scene labels
scene_labels = list(self.collection.keys())
row_idx, col_idx = 0, 0
idx = 0
for idx, _scene in enumerate(self):
scene = _scene[1]
if plot_multiple_bands:
scene.plot_multiple_bands(
band_selection=band_selection,
ax=ax[row_idx, col_idx],
**eodal_plot_kwargs,
)
else:
scene[band_selection[0]].plot(
ax=ax[row_idx, col_idx], **eodal_plot_kwargs
)
ax[row_idx, col_idx].set_title(scene_labels[idx])
idx += 1
# switch off axes labels if sharex == True and sharey=True
if (
kwargs.get("sharex", False)
and kwargs.get("sharey", False)
and n_scenes > 1
):
if nrows > 1:
if row_idx < (nrows - 1):
ax[row_idx, col_idx].set_xlabel("")
if nrows > 1:
if col_idx > 0:
ax[row_idx, col_idx].set_ylabel("")
# increase column (and row) counter accordingly
col_idx += 1
# begin a new row when all columns are filled
if col_idx == max_scenes_in_row:
col_idx = 0
row_idx += 1
# make sure sub-plot labels do not overlap
f.tight_layout()
return f
[docs]
def sort(self, sort_direction: Optional[str] = "asc"):
"""
Returns a sorted copy of the SceneCollection.
:param sort_direction:
direction of sorting. Must be either 'asc' (ascending) or 'desc'
(descending). Ignored if `sort_scenes` is False.
:returns:
sorted SceneCollection.
"""
# empty SceneCollections cannot be sorted
if self.empty:
return self.copy()
# get a list of all scenes in the collection and sort them
scenes = [v for _, v in self]
sort_idx = self._sort_keys(sort_direction, raster_collections=scenes)
scoll = SceneCollection()
for idx in sort_idx:
scoll.add_scene(scenes[idx].copy())
return scoll
[docs]
def to_pickle(self) -> bytes:
"""
Dumps a scene collection as pickled object
:returns:
pickled binary object
"""
return pickle.dumps(self.__dict__.copy())
[docs]
def to_xarray(self, **kwargs) -> xr.DataArray:
"""
Converts all scenes in a SceneCollection to a single `xarray.DataArray`.
:param kwargs:
key word arguments to pass to `~RasterCollection.to_xarray`
:returns:
SceneCollection as `xarray.DataArray`
"""
# loop over scenes in Collection and convert them to xarray.DataArray
xarray_list = []
for timestamp, scene in self:
_xr = scene.to_xarray(**kwargs)
# _xr = _xr.to_dataset()
_xr = _xr.expand_dims(time=[timestamp])
xarray_list.append(_xr)
# concatenate into a single xarray along the time dimension
return xr.concat(xarray_list, dim="time")