Source code for eodal.mapper.mapper

"""
The EOdal `Mapper` class allows to extract and handle EO data in space and time
and bring the data into Analysis-Ready-Format (ARD).

.. versionadded:: 0.2.0

Copyright (C) 2022 Lukas Valentin Graf

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from __future__ import annotations

import cv2
import eodal
import geopandas as gpd
import getpass
import numpy as np
import pandas as pd
import warnings
import uuid
import yaml

from copy import deepcopy
from datetime import datetime
from pathlib import Path
from sqlalchemy.exc import DatabaseError
from rasterio import Affine
from shapely.geometry import box
from typing import Any, Callable, Dict, List, Optional

from eodal.config import get_settings
from eodal.core.algorithms import merge_datasets
from eodal.core.raster import RasterCollection
from eodal.core.scene import SceneCollection
from eodal.mapper.feature import Feature
from eodal.mapper.filter import Filter
from eodal.metadata.database.querying import find_raw_data_by_bbox
from eodal.metadata.utils import reconstruct_path
from eodal.utils.exceptions import STACError

settings = get_settings()
logger = settings.logger


[docs] class MapperConfigs: """ global configurations of the Mapper class defining metadata search criteria (space and time), data collections to search and behavior for bringing data into analysis-ready-format (ARD) :attrib collection: name of the collection (<platform>-<sensor>-<processing level>) to use. E.g., "Sentinel2-MSI-L2A :attrib feature: geographic feature(s) for which to extract data from collection :atrrib time_start: time stamp from which onwards to extract data from collection :atrrib time_end: time stamp till which to extract data from collection :attrib metadata_filters: list of custom metadata filters to shrink collection to. Examples include cloud cover filters in the case of optical data or filter by incidence angle in the case of SAR observations. :attrib data_source: denotes from which resource (local archive, STAC archive) the data was read from including the STAC API end point URL. """
[docs] def __init__( self, collection: str, feature: Feature, time_start: datetime, time_end: datetime, metadata_filters: Optional[List[Filter]] = None, ): """ default class constructor :param collection: name of the collection (<platform>-<sensor>) to use. E.g., "sentinel2-msi". <sensor> is optional and can be omitted if a platform does not carry more than a single sensor. I.e., one could also pass "sentinel2" instead. :param feature: geographic feature(s) for which to extract data from collection :param time_start: time stamp from which onwards to extract data from collection :param time_end: time stamp till which to extract data from collection :param metadata_filters: .. versionadd:: 0.2.1 list of custom metadata filters to shrink collection to. Examples include cloud cover filters in the case of optical data or filter by incidence angle in the case of SAR observations. """ # check inputs if not isinstance(collection, str): raise TypeError("Collection must be a string") if len(collection) < 3: raise ValueError("Collections must have at least 3 characters") if collection.count("-") > 2: raise ValueError( "Collections must obey the format <platform>-<sensor> where " + "<sensor> is optional" ) if not isinstance(feature, Feature): raise TypeError("Expected a Feature object") if not isinstance(time_start, datetime) and not isinstance(time_end, datetime): raise TypeError("Expected datetime objects") if metadata_filters is not None: if not np.array([isinstance(x, Filter) for x in metadata_filters]).all(): raise TypeError("All filters must be instances of the Filter class") self._collection = collection self._feature = feature self._time_start = time_start self._time_end = time_end self._metadata_filters = metadata_filters # set data source (..versionadd::0.2.1) self._data_source = f'STAC ({settings.STAC_BACKEND.URL})' if \ settings.USE_STAC else 'local'
def __repr__(self) -> str: return ( f"EOdal MapperConfig\n------------------\nCollection: {self.collection}" f"\nTime Range: {self.time_start} - {self.time_end}\nFeature:\n" f"{self.feature.__repr__()}\nMetadata Filters: " f"{str(self.metadata_filters)}\n" f"Data Source: {self.data_source}" ) @property def collection(self) -> str: return self._collection @property def platform(self) -> str: return self.collection.split("-")[0] @property def sensor(self) -> str: try: return self._collection.split("-")[1] except IndexError: return "" @property def feature(self) -> Feature: return self._feature @property def time_start(self) -> datetime: return self._time_start @property def time_end(self) -> datetime: return self._time_end @property def metadata_filters(self) -> List[Filter] | None: return self._metadata_filters @property def data_source(self) -> str: return self._data_source @data_source.setter def data_source(self, value: str): if not isinstance(value, str): raise TypeError('Expected a string') self._data_source = value
[docs] @classmethod def from_yaml(cls, fpath: str | Path): """ Load mapping configurations from YAML file :param fpath: file-path to yaml with Mapper configurations :returns: new `MapperConfigs` instance """ with open(fpath, "r") as f: try: yaml_content = yaml.safe_load(f) except yaml.YAMLError as exc: print(exc) # reconstruct the Featue object first if "feature" not in yaml_content.keys(): raise ValueError('"feature" attribute is required"') feature_yaml = yaml_content["feature"] # reconstruct the filter objects filter_list = [] if "metadata_filters" in yaml_content.keys(): filters_yaml = yaml_content["metadata_filters"] for filter_yaml in filters_yaml: filter_list.append(Filter(*filter_yaml.split())) try: feature = Feature.from_dict(feature_yaml) configs = cls( collection=yaml_content["collection"], feature=feature, time_start=yaml_content["time_start"], time_end=yaml_content["time_end"], metadata_filters=filter_list, ) # overwrite data_source attribute with content from # file in case it is available configs.data_source = yaml_content.get('data_source', 'UNKNOWN') return configs except KeyError as e: raise ValueError(f"Missing keys in yaml file: {e}")
[docs] def to_yaml(self, fpath: str | Path) -> None: """ save MapperConfig and some meta-information to YAML file (*.yml) :param fpath: file-path where saving the Feature instance to """ mapper_configs_dict = {} mapper_configs_dict["collection"] = self.collection mapper_configs_dict["feature"] = self.feature.to_dict() mapper_configs_dict["time_start"] = self.time_start mapper_configs_dict["time_end"] = self.time_end mapper_configs_dict["data_source"] = self.data_source mapper_configs_dict["metadata_filters"] = [ x.__repr__() for x in self.metadata_filters ] # add some meta-information about creation time and current user mapper_configs_dict["created_at"] = datetime.now() mapper_configs_dict["created_by"] = getpass.getuser() mapper_configs_dict["eodal_version"] = eodal.__version__ with open(fpath, "w+") as f: yaml.dump(mapper_configs_dict, f, allow_unicode=True)
[docs] class Mapper: """ Generic class for mapping Earth Observation Data across space and time and bring them into Analysis-Readay-Format (ARD). The mapper class takes over searching for EO scenes, merging them and filling eventually occurring black-fill (no-data regions). :attrib mapper_configs: `MapperConfig` instance defining search criteria :attrib data: loaded scene data as EOdal `SceneCollection` :attrib metadata: corresponding scene metadata as `GeoDataFrame`. """
[docs] def __init__( self, mapper_configs: MapperConfigs, time_column: Optional[str] = "sensing_time" ): """ Class constructor :param mapper_configs: `MapperConfig` instance defining search criteria :param time_column: name of the metadata column denoting the time stamps of the scenes. `sensing_time` by default. """ if not isinstance(mapper_configs, MapperConfigs): raise TypeError("Expected a MapperConfigs instance") self._mapper_configs = mapper_configs self._time_column = time_column self._metadata = None self._sensor = self.mapper_configs.collection.split("-")[0] self._data = None self._geoms_are_points = False
def __repr__(self) -> str: return f"EOdal Mapper\n============\n{self.mapper_configs.__repr__()}" @property def data(self) -> None | SceneCollection | gpd.GeoDataFrame: """ SceneCollection with scenes found or GeoDataFrame in case single pixel values are extracted """ return self._data @data.setter def data(self, scoll: Optional[SceneCollection] = None): if scoll is not None: if not isinstance(scoll, SceneCollection) and not isinstance( scoll, gpd.GeoDataFrame ): raise TypeError("Expected a EOdal SceneCollection or GeoDataFrame") self._data = scoll @property def mapper_configs(self) -> MapperConfigs: """mapper configurations""" return self._mapper_configs @property def metadata(self) -> None | gpd.GeoDataFrame: """scene metadata found""" return self._metadata @metadata.setter def metadata(self, values: Optional[gpd.GeoDataFrame] = None): """set scene metadata""" if values is not None: if not isinstance(values, gpd.GeoDataFrame): raise TypeError("Expected a GeoDataFrame") self._metadata = deepcopy(values) @property def sensor(self) -> str | None: return self._sensor @property def time_column(self) -> str: return self._time_column @time_column.setter def time_column(self, value: str): """..versionadd:: 0.2.2""" if not isinstance(value, str): raise TypeError('time_column must be a string') if len(value) <= 0: raise ValueError('String must not be empty') self._time_column = value
[docs] def query_scenes(self) -> None: """ Query available scenes for the current `MapperConfigs` and loads into the `observations` attribute. Depending on the settings of EOdal, this method either makes a query to a STAC resource or to a PostgreSQL/PostGIS database. NOTE: This method only queries a metadata catalog without reading data. """ # check for point geometries self._geoms_are_points = self.mapper_configs.feature.geometry.geom_type in [ "Point", "MultiPoint", ] # determine bounding box of the feature using # its representation in geographic coordinates (WGS84, EPSG: 4326) feature_wgs84 = self.mapper_configs.feature.to_epsg(4326) bbox = box(*feature_wgs84.geometry.bounds) # determine platform and collection platform = self.mapper_configs.platform collection = self.mapper_configs.collection # put kwargs together kwargs = { "collection": collection, "time_start": self.mapper_configs.time_start, "time_end": self.mapper_configs.time_end, "bounding_box": bbox, "metadata_filters": self.mapper_configs.metadata_filters, } # query the metadata catalog (STAC or database depending on settings) if settings.USE_STAC: try: exec(f"from eodal.metadata.stac import {platform}") scenes_df = eval(f"{platform}(**kwargs)") except Exception as e: raise STACError(f"Querying STAC catalog failed: {e}") else: try: scenes_df = find_raw_data_by_bbox(**kwargs) except Exception as e: raise DatabaseError(f"Querying metadata DB failed: {e}") # make sure the time column is handled as pandas datetime # objects if not scenes_df.empty: scenes_df[self.time_column] = pd.to_datetime( scenes_df[self.time_column], utc=True) # populate the metadata attribute self.metadata = scenes_df
def _process_scene( self, item: pd.Series, scene_constructor: Callable[..., RasterCollection], scene_constructor_kwargs: Dict[str, Any], scene_modifier: Callable[..., RasterCollection], scene_modifier_kwargs: Dict[str, Any], reprojection_method: int, ) -> RasterCollection: """ Pre-process a scene so it can be added to a SceneCollection. This includes applying user-defined reading and modification functions. In addition, the `Scene` is projected into the target spatial reference system by using the reference system the majority of the scenes in a collection has in common. IMPORTANT: The reprojection step into the target spatial reference system (if scene is not projected in it already) is **always** done! :param item: metadata item of the scene including its file-path or URL :param scene_constructor: Callable used to read the scenes found into `RasterCollection` fulfilling the `is_scene` criterion (i.e., a time stamp is available). :param scene_constructor_kwargs: keyword-arguments to pass to `scene_constructor`. `fpath_raster` and `vector_features` are filled in by the `Mapper` instance automatically, :param scene_modifier: optional Callable modifying a `RasterCollection` or returning a new `RasterCollection`. :param scene_modifier_kwargs: keyword arguments for `scene_modifier` :returns: `Scene` with all pre-processing steps applied. """ scene_constructor_kwargs.update( {"vector_features": self.mapper_configs.feature.to_geoseries()} ) try: # call scene constructor. The file-path (or URL) goes first scene = scene_constructor.__call__( item.real_path, **scene_constructor_kwargs ) scene.scene_properties.sensing_time = item[self.time_column] except Exception as e: raise ValueError(f"Could not load scene: {e}") # apply scene modifier callable if available if scene_modifier is not None: scene = scene_modifier.__call__(scene, **scene_modifier_kwargs) # reproject scene if necessary # ..versionadd 0.2.3 check for the EPSG to make sure no unnecessary # operations are undertaken to save runtime and avoid floating # point inaccuracies epsg_scene = [b.geo_info.epsg for _, b in scene] intersection = set(epsg_scene).intersection(set([item.target_epsg])) # we need to reproject only if the intersection returns an empty set. if len(intersection) == 0: scene.reproject( target_crs=item.target_epsg, interpolation_method=reprojection_method, inplace=True ) return scene def _load_scenes_collection( self, reprojection_method: Optional[int] = cv2.INTER_NEAREST_EXACT, scene_constructor: Optional[ Callable[..., RasterCollection] ] = RasterCollection.from_multi_band_raster, scene_constructor_kwargs: Optional[Dict[str, Any]] = {}, scene_modifier: Optional[Callable[..., RasterCollection]] = None, scene_modifier_kwargs: Optional[Dict[str, Any]] = {}, round_time_stamps_to_freq: Optional[str] = None ) -> None: """ Auxiliary method to handle EOdal scenes and store them into a SceneCollection. This method is called when the geometries used for calling the `Mapper` instance are of type `Polygon` or `MultiPolygon`. Mosaicing operations are handled on the fly so calling programs will always receive analysis-ready data. IMPORTANT: When mosaicing is necessary, it **must** be ensured that all bands have the spatial resolution. Thus, either read only bands with the same spatial resolution or use the `scene_modifier` function to resample the bands into a common spatial resolution. If this requirement is not fulfilled the mosaicing will fail! ..versionadd:: 0.2.1 To make sure all scenes in the collection share the same extent and grid, a post-processing step is carried out re-projecting all scenes into a common reference grid using nearest neighbor interpolation. This ensures not only all scenes having the same spatial extent which is convenient for stacking scenes in time, but also makes sure there are no pixel shifts due to re-projections from different spatial reference systems such as different UTM zones. ..versionadd:: 0.2.2 When mosaicing (i.e., a scene is split into multiple tiles) we not only have to look for spatial and temporal overlap (old behavior) but we must also allow a certain tolerance in the timestamps in the range of typically some seconds to minutes to merge imagery that was acquired slightly one-after-another. To do so, a new keyword argument `round_time_stamps_to_freq` was added that defaults to None (old behavior) and can be set to any string value accepted by `~pandas.Timestamp.round`. :param scene_constructor: Callable used to read the scenes found into `RasterCollection` fulfilling the `is_scene` criterion (i.e., a time stamp is available). The callable is applied to all scenes found in the metadata query call. By default the standard class-method call `~RasterCollection.from_multi_band_raster` is used. It can be replaced, however, with a custom-written callable that can be of any design except that it **MUST** accept a keyword argument `fpath_raster` used for reading the Scene data and `vector_features` for cropping the data to the spatial extent of the Mapper instance. :param scene_constructor_kwargs: optional keyword-arguments to pass to `scene_constructor`. `fpath_raster` and `vector_features` are filled in by the `Mapper` instance automatically, i.e., any custom values passed will be overwritten. :param scene_modifier: optional Callable modifying a `RasterCollection` or returning a new `RasterCollection`. The Callable is applied to all scenes in the `SceneCollection` when loaded by the `Mapper`. Can be used, e.g., to calculate spectral indices on the fly or for applying masks. :param scene_modifier_kwargs: optional keyword arguments for `scene_modifier` (if any). :param round_time_stamps_to_freq: ..versionadd:: 0.2.2 optionally round scene time stamps to a custom temporal frequency accepted by `~pandas.Timestamp.round` to allow moscaicing of scenes with slightly different timestamps as it might be necessary for some EO platforms. """ # open a SceneCollection for storing the data scoll = SceneCollection() logger.info(f"Starting extraction of {self.sensor} scenes") # filter out datasets for which mosaicing is necessary (time stamp is the same) # ..versionadd:: 0.2.2 # Allow a user-defined temporal tolerance (background: some # platforms such as Landsat provide the "scene-center" time, i.e., two # scenes that were acquired one-after-another differ only be a few minutes) if round_time_stamps_to_freq is not None: # new optional behavior eodal >= 0.2.2 rounded_time_column = \ f'{self.time_column}_rounded_{round_time_stamps_to_freq}' self.metadata[rounded_time_column] = \ self.metadata[self.time_column].dt.round( freq=round_time_stamps_to_freq) # set time column to rounded time stamps but keep # the name of the "old", i.e., original time column self.time_column = rounded_time_column self.metadata["_duplicated"] = self.metadata[self.time_column].duplicated( keep=False ) # datasets where the 'duplicated' entry is False are truely unqiue _metadata_unique = self.metadata[~self.metadata._duplicated].copy() _metadata_nonunique = self.metadata[self.metadata._duplicated].copy() # mosaic the non-unique datasets first if not _metadata_nonunique.empty: _metadata_nonunique.sort_values(by=self.time_column, inplace=True) unique_time_stamps = _metadata_nonunique[self.time_column].unique() # loop over unique time stamps. In the end there should be a single # scene per time stamp update_scene_properties_list = [] for unique_time_stamp in unique_time_stamps: scenes = _metadata_nonunique[ _metadata_nonunique[self.time_column].dt.strftime("%Y-%m-%d %H:%M") == pd.to_datetime(unique_time_stamp).strftime("%Y-%m-%d %H:%M")[ 0:16 ] ] # read the datasets one by one, save them into a temporary directory # and merge them using rasterio dataset_list = [] scene_properties_list = [] for _, item in scenes.iterrows(): _scene = self._process_scene( item=item, scene_constructor=scene_constructor, scene_constructor_kwargs=scene_constructor_kwargs, reprojection_method=reprojection_method, scene_modifier=scene_modifier, scene_modifier_kwargs=scene_modifier_kwargs, ) fname_scene = settings.TEMP_WORKING_DIR.joinpath( f"{uuid.uuid4()}.tif" ) _scene.to_rasterio(fname_scene) dataset_list.append(fname_scene) scene_properties_list.append(_scene.scene_properties) # merge datasets using rasterio and read results back into a scene band_options = { "band_names": _scene.band_names, "band_aliases": _scene.band_aliases } scene = merge_datasets( datasets=dataset_list, target_crs=self.metadata.target_epsg.unique()[0], vector_features=self.mapper_configs.feature.to_geoseries(), sensor=self.sensor, band_options=band_options ) # handle scene properties. They need to be merged as well merged_scene_properties = scene_properties_list[0] for other_scene_properties in scene_properties_list[1::]: scene_props_keys = list(merged_scene_properties.__dict__.keys()) for scene_prop in scene_props_keys: first_val = eval(f"merged_scene_properties.{scene_prop}") this_val = eval(f"other_scene_properties.{scene_prop}") if first_val != this_val: # only string values can be merged (connected by '&&') if isinstance(first_val, str): new_val = first_val + "&&" + this_val # noqa: F841 exec(f"merged_scene_properties.{scene_prop} = new_val") scene.scene_properties = merged_scene_properties # because ESA has some mess with the naming of their file names and # metadata, there might be duplicated seems not detected by the mapper # since the time stamps slightly differ (few seconds in some cases) but # their IDs are the same try: scoll.add_scene(scene) except KeyError: logger.warn( f"Scene with ID {merged_scene_properties.product_uri} " "already added to SceneCollection - continue" ) continue update_scene_properties_list.append(merged_scene_properties) # update the metadata entries to avoid mis-matches in the number of # scenes and their URIs self.metadata.drop_duplicates( subset=[self.time_column], keep="first", inplace=True ) for updated_scene_properties in update_scene_properties_list: # use the time stamp for finding the correct metadata # records. There might be some disagreement in the milliseconds # because of different precision levels therefore, an offset # of less than 1 second is tolerated idx = self.metadata[ abs( self.metadata[self.time_column] - pd.to_datetime( updated_scene_properties.acquisition_time, utc=True) ) < pd.Timedelta(60, unit="minutes") ].index for scene_property in updated_scene_properties.__dict__: self.metadata.loc[idx, scene_property] = eval( f"updated_scene_properties.{scene_property}" ) # then add those scenes that are unique, i.e., no mosaicing is required if not _metadata_unique.empty: for _, item in _metadata_unique.iterrows(): scene = self._process_scene( item=item, scene_constructor=scene_constructor, scene_constructor_kwargs=scene_constructor_kwargs, scene_modifier=scene_modifier, scene_modifier_kwargs=scene_modifier_kwargs, reprojection_method=reprojection_method, ) # because ESA has some mess with the naming of their file names and # metadata, there might be duplicated seems not detected by the mapper # since the time stamps slightly differ (few seconds in some cases) but # their IDs are the same try: scoll.add_scene(scene) except KeyError: logger.warn( f"Scene with ID {scene.scene_properties.product_uri} " "already added to SceneCollection - continue" ) continue # sort scenes by their timestamps and save as data attribute # to mapper instance scoll = scoll.sort() # ..versionadd:: 0.2.1 # we have to make sure that the scenes in self.data all share the same # extent and are aligned onto the same grid. # We use the boundary of all scenes to update the upper left corner bounds_list = [] ncols = [] nrows = [] for _, scene in scoll: bounds = scene[scene.band_names[0]].bounds ncols.append(scene[scene.band_names[0]].ncols) nrows.append(scene[scene.band_names[0]].nrows) crs = scene[scene.band_names[0]].crs bounds_list.append(gpd.GeoDataFrame( geometry=[bounds], crs=crs )) bounds_gdf = pd.concat(bounds_list) total_bounds = bounds_gdf.total_bounds # ..versionadd 0.2.3 (https://github.com/EOA-team/eodal/issues/90) # we need to ensure that all scenes have not only the same extent but # also the same pixel size and, hence, number of rows and columns # The heuristic is: # 1) is there a since that was not reprojected (_duplicated is False) # 2) If no: use the first scene as reference scene # 3) If yes: use the first scene that was not reprojected # is there at a least a single scene that was not reprojected if not self.metadata._duplicated.all(): reference_time = self.metadata[ ~self.metadata._duplicated][self.time_column].values[0] timestamps = scoll.timestamps reference_timestamp_idx = np.argmin( [pd.to_datetime(x) - reference_time for x in timestamps]) reference_scene = scoll[ timestamps[reference_timestamp_idx]] else: reference_scene = scoll[scoll.timestamps[0]] # loop over scenes and project them onto the total bounds for _, scene in scoll: # loop over the bands and reproject them # onto the target grid for band_name, band in scene: # get the reference band reference_band = reference_scene[band_name] geo_info = reference_band.geo_info # get the transform of the destination extent minx, maxy = total_bounds[0], total_bounds[-1] dst_transform = Affine( a=geo_info.pixres_x, b=0, c=minx, d=0, e=geo_info.pixres_y, f=maxy) dst_shape = (max(nrows), max(ncols)) destination = np.zeros( dst_shape, dtype=band.values.dtype ) # determine nodata if not np.isnan(band.nodata): dst_nodata = band.nodata else: # if band nodata is NaN we set # no-data to None (rasterio default) dst_nodata = None band.reproject( inplace=True, target_crs=reference_band.crs, dst_transform=dst_transform, destination=destination, dst_nodata=dst_nodata) self.data = scoll logger.info(f"Finished extraction of {self.sensor} scenes") def _load_pixels( self, pixel_reader: Optional[ Callable[..., gpd.GeoDataFrame] ] = RasterCollection.read_pixels, pixel_reader_kwargs: Optional[Dict[str, Any]] = {}, ) -> None: """ Load pixel values from 0:N scenes into a `GeoDataFrame` :param pixel_reader: Callable to read the pixels from scenes. `RasterCollection.read_pixels` by default. Any other callable can be provided as long as it returns a GeoDataFrame and accepts `vector_features` and `fpath_raster` as input keyword arguments. :param pixel_reader_kwargs: optional keyword arguments to pass on to `pixel_reader`. """ # loop over scenes and read the pixel values. Carry out reprojection where # necessary pixel_scene_list = [] for _, item in self.metadata.iterrows(): pixel_reader_kwargs.update( { "vector_features": self.mapper_configs.feature.to_geoseries(), } ) try: pixels = pixel_reader.__call__(item.real_path, **pixel_reader_kwargs) pixels[self.time_column] = item[self.time_column] except Exception as e: raise ValueError(f"Could not read pixel data: {e}") # reproject pixels if necessary pixels.to_crs(epsg=item.target_epsg, inplace=True) pixel_scene_list.append(pixels) self.data = pd.concat(pixel_scene_list)
[docs] def load_scenes( self, scene_kwargs: Optional[Dict[str, Any]] = None, pixel_kwargs: Optional[Dict[str, Any]] = None, round_time_stamps_to_freq: Optional[str] = None ) -> None: """ Load scenes from `~Mapper.query_scenes` result into a `SceneCollection` (Polygon and MultiPolygon geometries) or into a `GeoDataFrame` (Point and MultiPoint geometries). Mosaicing operations are handled on the fly so calling programs will always receive analysis-ready data (e.g., when working across image tiles). :param scene_kwargs: key-word arguments to pass to `~Mapper._load_scenes_collection` for handling EOdal scenes. These arguments *MUST* be provided when using `Polygon` or `MulitPolygon` geometries in the `Mapper` call. :param pixel_kwargs: key-word arguments to pass to `~Mapper._load_pixels` for handling single pixel values. These arguments *MUST* be provided when using `Point` or `MulitPoint` geometries in the `Mapper` call. :param round_time_stamps_to_freq: ..versionadd:: 0.2.2 optionally round scene time stamps to a custom temporal frequency accepted by `~pandas.Timestamp.round` to allow moscaicing of scenes with slightly different timestamps as it might be necessary for some EO platforms. """ # check if the correct keyword arguments have been passed if not self._geoms_are_points: if scene_kwargs is None: raise ValueError( "Since Polygon/MultiPolygon geometries are provided " + "`pixel_kwargs` must not be empty" ) # check if scenes have been queried and found if self.metadata is None: warnings.warn( "No scenes are available - have you already executed " "Mapper.query_scenes()?" ) return if self.metadata.empty: warnings.warn( "No scenes were found - consider modifying your search criteria" ) return # check the spatial reference system of the scenes found. The mapper class # will ensure that all of them will be available in the same CRS. The CRS # is selected which most of the scenes already have to keep the reprojection # costs as small as possible (and also to avoid resampling induced errors) try: self.metadata["target_epsg"] = self.metadata.epsg.mode().values[0] except KeyError as e: raise ValueError(f"Could not determine CRS of scenes: {e}") # provide paths to raster data. Depending on th settings, this is a path on the # file system or a URL self.metadata["real_path"] = "" if settings.USE_STAC: self.metadata["real_path"] = self.metadata["assets"] else: self.metadata["real_path"] = self.metadata.apply( lambda x: reconstruct_path(record=x), axis=1 ) # load the data depending on the geometry type of the feature(s) if self._geoms_are_points: self._load_pixels(**pixel_kwargs) else: self._load_scenes_collection( round_time_stamps_to_freq=round_time_stamps_to_freq, **scene_kwargs )