Source code for eodal.metadata.stac.client

"""
Querying datasets from a Spatio-Temporal Asset Catalog (STAC).
"""

from __future__ import annotations

import geopandas as gpd
import pandas as pd
import warnings

from datetime import datetime
from pystac_client import Client
from pystac_client.stac_api_io import StacApiIO
from requests.adapters import HTTPAdapter, Retry
from shapely.geometry import box, Polygon
from typing import Any, Dict, List

from eodal.config import get_settings, STAC_Providers
from eodal.mapper.filter import Filter
from eodal.utils.decorators import prepare_bbox
from eodal.utils.geometry import box_from_transform, prepare_gdf
from eodal.utils.reprojection import infer_utm_zone
from eodal.utils.timestamps import datetime_to_date

Settings = get_settings()


[docs] def query_stac( time_start: datetime, time_end: datetime, collection: str, bounding_box: Polygon, ) -> List[Dict[str, Any]]: """ Queries a STAC (Spatio-Temporal Asset Catalog) by bounding box and time period to get items from a user-defined collection. This is a sensor-agnostic function called by sensor-specific ones. :param time_start: start of the time period :param time_end: end of the time period :param collection: name of the collection (e.g., sentinel-2-l2a for Sentinel-2 L2A data) :param bounding_box: bounding box either as extended well-known text in geographic coordinates or as shapely ``Polygon`` in geographic coordinates (WGS84) :returns: list of dictionary items returned from the STAC query """ # open connection to STAC server (specify custom CA_BUNDLE if required) stac_api_io = StacApiIO() # ..versionadd:: 0.2.1 # add retries in case of HTTP 502, 503 and 504 errors # TODO pass `Retry` object to `StacApiIO` with python >= 3.8 # and `pystac-client` >= 0.7.0. retries = Retry( total=Settings.NUMBER_HTTPS_RETRIES, backoff_factor=1, status_forcelist=[502, 503, 504]) stac_api_io.session.mount("http://", HTTPAdapter(max_retries=retries)) stac_api_io.session.mount("https://", HTTPAdapter(max_retries=retries)) # handle certificate bundle stac_api_io.session.verify = Settings.STAC_API_IO_CA_BUNDLE # setup the client cat = Client.from_file(Settings.STAC_BACKEND.URL, stac_io=stac_api_io) # transform dates into the required format (%Y-%m-%d) datestr = f'{time_start.strftime("%Y-%m-%d")}/{time_end.strftime("%Y-%m-%d")}' # search datasets on catalog search = cat.search( collections=collection, intersects=bounding_box, datetime=datestr, max_items=Settings.MAX_ITEMS, limit=Settings.LIMIT_ITEMS, ) # fetch items and convert them to GeoDataFrame, drop all records with # too many clouds items = search.item_collection() item_json = items.to_dict() scenes = item_json["features"] return scenes
def _filter_criteria_fulfilled( metadata_dict: Dict[str, Any], metadata_filters: List[Filter] ) -> bool: """ Check if a scene fulfills the metadata filter criteria :param metadata_dict: scene metadata returned from STAC item :param metadata_filters: scene metadata filters to apply on metadata_dict :returns: `True` if all criteria passed, `False` if a single criterion was not met. """ criteria_fulfilled = [] for _filter in metadata_filters: if _filter.entity in ["processing_level", "product_type"]: continue if _filter.entity not in metadata_dict.keys(): warnings.warn( f"{_filter.entity} could not be retrieved from STAC -> skipping filter" ) # check if the filter condition is met # check if the metadata item to filter is a list or single value if not isinstance(metadata_dict[_filter.entity], list): if isinstance(metadata_dict[_filter.entity], str): eval_str = \ f'metadata_dict["{_filter.entity}"] ' + \ f'{_filter.operator} "{_filter.value}"' else: eval_str = \ f'metadata_dict["{_filter.entity}"] ' + \ f'{_filter.operator} {_filter.value}' condition_met = eval(eval_str) else: # TODO: this is not really elegant and might not deliver always the # results we are looking for ... # convert _filter.value to list if it is not yet value = _filter.value if not isinstance(value, list): value = [value] # redefine the operators to work with sets if _filter.operator == '==': condition_met = set(value).issubset(metadata_dict[_filter.entity]) elif _filter.operator == '!=': condition_met = not set(value).issubset(metadata_dict[_filter.entity]) criteria_fulfilled.append(condition_met) return all(criteria_fulfilled)
[docs] @prepare_bbox def sentinel2(metadata_filters: List[Filter], **kwargs) -> gpd.GeoDataFrame: """ Sentinel-2 specific STAC query allows filtering by scene-wide cloudy pixel percentage. :param metadata_filters: custom filters for filtering scenes in catalog by some metadata attributes :param kwargs: keyword arguments to pass to `query_stac` function :returns: dataframe with references to found Sentinel-2 scenes """ # check for processing level of the data and set the collection accordingly filter_entities = [x.entity for x in metadata_filters] if "processing_level" in filter_entities: processing_level = [ x.value for x in metadata_filters if x.entity == "processing_level" ][0] processing_level = processing_level.replace("-", "_") processing_level_stac = eval(f"Settings.STAC_BACKEND.S2{processing_level}") kwargs.update({"collection": processing_level_stac}) # query STAC catalog stac_kwargs = kwargs.copy() scenes = query_stac(**stac_kwargs) # get STAC provider specific naming conventions s2 = Settings.STAC_BACKEND.Sentinel2 # loop over scenes found and apply the Filters provided metadata_list = [] for scene in scenes: # extract scene metadata required for Sentinel-2 # map the STAC keys to eodal's naming convention props = scene["properties"] # tile-id requires some string handling in case of AWS if isinstance(s2.tile_id, list): tile_id = "".join( [str(props[x]) for x in Settings.STAC_BACKEND.Sentinel2.tile_id] ) else: tile_id = props[Settings.STAC_BACKEND.Sentinel2.tile_id] # the product_uri is also not handled the same way by the different # STAC providers try: product_uri = scene[s2.product_uri] except KeyError: product_uri = props[s2.product_uri] # same for the scene_id try: scene_id = props[s2.scene_id] except KeyError: scene_id = scene[s2.scene_id] # TODO: think about a more generic way to do this. The problem is: # we need to map the different STAC provider settings into EOdals # metadata model to avoid having the user to think about it meta_dict = { "product_uri": product_uri, "scene_id": scene_id, "spacecraft_name": props[s2.platform], "tile_id": tile_id, "sensing_date": datetime_to_date(props[s2.sensing_time]), "cloudy_pixel_percentage": props[s2.cloud_cover], "epsg": props[s2.epsg], "sensing_time": datetime.strptime( props[s2.sensing_time], s2.sensing_time_fmt ), "sun_azimuth_angle": props[s2.sun_azimuth_angle], "sun_zenith_angle": props[s2.sun_zenith_angle], "geom": Polygon(scene["geometry"]["coordinates"][0]), } # get links to actual Sentinel-2 bands meta_dict["assets"] = scene["assets"] # apply filters append_scene = _filter_criteria_fulfilled(meta_dict, metadata_filters) if append_scene: metadata_list.append(meta_dict) # create geppandas GeoDataFrame out of scene metadata records return prepare_gdf(metadata_list)
[docs] @prepare_bbox def sentinel1(metadata_filters: List[Filter], **kwargs) -> gpd.GeoDataFrame: """ Sentinel-1 specific STAC query function to retrieve mapper from MSPC. IMPORTANT: Returns the RTC product by default if not stated otherwise (using `EOdal.mapper.filter` on `product_type` :param metadata_filters: custom filters for filtering scenes in catalog by some metadata attributes :param kwargs: :param kwargs: keyword arguments to pass to `query_stac` function :returns: dataframe with references to found Sentinel-1 scenes """ if Settings.STAC_BACKEND != STAC_Providers.MSPC: raise ValueError("This method currently requires Microsoft Planetary Computer") # construct collection string (defaults to S1RTC if not stated otherwise in the # metadata filters) collection = "S1" if len([x.entity for x in metadata_filters if x.entity == "product_type"]) == 1: collection += [x.value for x in metadata_filters if x.entity == "product_type"][ 0 ] else: collection += "RTC" # query the catalog stac_kwargs = kwargs.copy() stac_kwargs.update({"collection": eval(f"Settings.STAC_BACKEND.{collection}")}) scenes = query_stac(**stac_kwargs) metadata_list = [] for scene in scenes: metadata_dict = scene["properties"] metadata_dict["assets"] = scene["assets"] metadata_dict["sensing_time"] = metadata_dict["datetime"] metadata_dict["sensing_date"] = datetime_to_date(metadata_dict['sensing_time']) del metadata_dict["datetime"] # infer EPSG code of the scene in UTM coordinates from its bounding box bbox = box(*scene["bbox"]) metadata_dict["epsg"] = infer_utm_zone(bbox) metadata_dict["geom"] = bbox # apply filters append_scene = _filter_criteria_fulfilled(metadata_dict, metadata_filters) if append_scene: metadata_list.append(metadata_dict) # create geppandas GeoDataFrame out of scene metadata records return prepare_gdf(metadata_list)
[docs] @prepare_bbox def landsat(metadata_filters: List[Filter], **kwargs) -> gpd.GeoDataFrame: """ Landsat specific STAC query function to retrieve mapper from MSPC. :param metadata_filters: custom filters for filtering scenes in catalog by some metadata attributes :param kwargs: :param kwargs: keyword arguments to pass to `query_stac` function :returns: dataframe with references to found Landsat scenes """ if Settings.STAC_BACKEND != STAC_Providers.MSPC: raise ValueError("This method currently requires Microsoft Planetary Computer") # query the catalog stac_kwargs = kwargs.copy() scenes = query_stac(**stac_kwargs) if len(scenes) == 0: raise ValueError(f'STAC query returned no results! {stac_kwargs}') metadata_list = [] for scene in scenes: metadata_dict = scene['properties'] metadata_dict['assets'] = scene['assets'] metadata_dict["sensing_time"] = metadata_dict["datetime"] metadata_dict["sensing_date"] = datetime_to_date(metadata_dict['sensing_time']) del metadata_dict["datetime"] # apply filters append_scene = _filter_criteria_fulfilled(metadata_dict, metadata_filters) if append_scene: metadata_list.append(metadata_dict) # reconstruct the scene footprint transform = metadata_dict['proj:transform'] shape = metadata_dict['proj:shape'] metadata_dict['geom'] = box_from_transform(transform, shape) metadata_dict['epsg'] = metadata_dict['proj:epsg'] # extract angles metadata_dict['sun_azimuth_angle'] = metadata_dict['view:sun_azimuth'] del metadata_dict['view:sun_azimuth'] metadata_dict['sun_zenith_angle'] = 90. - metadata_dict['view:sun_elevation'] del metadata_dict['view:sun_elevation'] metadata_dict['sensor_zenith_angle'] = metadata_dict['view:off_nadir'] del metadata_dict['view:off_nadir'] if len(metadata_list) == 0: raise ValueError( f'No scenes fulfilling filter criteria: {metadata_filters}') return prepare_gdf(metadata_list)