Source code for eodal.utils.geometry
"""
Vector geometry operations.
Copyright (C) 2022/2023 Lukas Valentin Graf
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from __future__ import annotations
import geopandas as gpd
import json
import pandas as pd
import rasterio as rio
from copy import deepcopy
from pathlib import Path
from rasterio.mask import raster_geometry_mask
from shapely.geometry import box, Point, Polygon
from eodal.core.utils.geometry import convert_3D_2D
[docs]
def box_to_geojson(gdf: gpd.GeoDataFrame | Polygon) -> str:
"""
Casts the bounding box of a `GeoDataFrame` to GeoJson
:param gdf:
Non-empty `GeoDataFrame` or shapely rectangular Polygon
in geographic coordinates
:returns:
`total_bounds` of gpd in GeoJson notation
"""
# GeoJSON should be in geographic coordinates
if isinstance(gdf, gpd.GeoDataFrame):
_gdf = deepcopy(gdf)
gdf_wgs84 = _gdf.to_crs(epsg=4326)
bbox = gdf_wgs84.total_bounds
bbox_poly = box(*bbox)
elif isinstance(gdf, Polygon) or isinstance(gdf, Point):
bbox_poly = deepcopy(gdf)
bbox_json = gpd.GeoSeries([bbox_poly]).to_json()
return json.loads(bbox_json)["features"][0]["geometry"]
[docs]
def prepare_gdf(
metadata_list: list[dict]
) -> gpd.GeoDataFrame:
"""
Convert list of metadata dictionaries to a GeoDataFrame.
..versionadd:: 0.3.0
:param metadata_list:
list of metadata entries as dictionaries.
:returns:
resulting GeoDataFrame sorted by sensing time.
"""
# return an empty GeoDataFrame if there are no entries
if len(metadata_list) == 0:
return gpd.GeoDataFrame()
df = pd.DataFrame(metadata_list)
df.sort_values(by="sensing_time", inplace=True)
return gpd.GeoDataFrame(df, geometry="geom", crs=df.epsg.unique()[0])
[docs]
def adopt_vector_features_to_mask(
band_df: pd.DataFrame,
vector_features: gpd.GeoDataFrame | gpd.GeoSeries | Path
) -> tuple[gpd.GeoDataFrame, tuple[int, int], int | float]:
"""
Adopt the vector features used for clipping and/or masking data
to the spatial resolution of the band with the coarsest spatial
resolution. This is necessary to avoid artifacts (namely different
spatial extents) in the data caused by the spatial subsetting with
different pixel sizes.
:param band_df:
DataFrame containing the band metadata.
:param vector_features:
vector features to be used for masking.
:returns:
Updated vector features, shape of the resulting mask and
spatial resolution of the band with the coarsest spatial
resolution.
"""
# get lowest spatial resolution
lowest_resolution = band_df["band_resolution"].max()
# get band with lowest spatial resolution
low_res_band = band_df[band_df["band_resolution"] == lowest_resolution].iloc[0]
# get vector feature(s) for spatial subsetting
if isinstance(vector_features, Path):
vector_features_df = gpd.read_file(vector_features)
elif isinstance(vector_features, gpd.GeoDataFrame):
vector_features_df = vector_features.copy()
elif isinstance(vector_features, gpd.GeoSeries):
vector_features_df = gpd.GeoDataFrame(geometry=vector_features.copy())
else:
raise TypeError(
"Geometry must be vector file, GeoSeries or GeoDataFrame"
)
# drop Nones in geometry column
none_idx = vector_features_df[
vector_features_df.geometry == None].index # noqa: E711
vector_features_df.drop(index=none_idx, inplace=True)
with rio.open(low_res_band.band_path, "r") as src:
# convert to raster CRS
raster_crs = src.crs
vector_features_df.to_crs(crs=raster_crs, inplace=True)
# check if the geometry contains the z (3rd) dimension. If yes
# convert it to 2d to avoid an error poping up from rasterio
vector_features_geom = convert_3D_2D(vector_features_df.geometry)
shape_mask, transform, window = raster_geometry_mask(
dataset=src,
shapes=vector_features_geom,
all_touched=True,
crop=True
)
# get upper left coordinates rasterio takes for the band
# with the coarsest spatial resolution
ulx_low_res, uly_low_res = transform.c, transform.f
# reconstruct the lower right corner
llx_low_res = ulx_low_res + window.width * transform.a
lly_low_res = uly_low_res + window.height * transform.e
# overwrite original vector features' bounds in the S2 scene
# geometry of the lowest spatial resolution
low_res_feature_bounds_s2_grid = box(
minx=ulx_low_res,
miny=lly_low_res,
maxx=llx_low_res,
maxy=uly_low_res,
)
# update bounds and pass them on to the kwargs
bounds_df = gpd.GeoDataFrame(
geometry=[low_res_feature_bounds_s2_grid],
)
bounds_df.set_crs(crs=raster_crs, inplace=True)
return bounds_df, shape_mask, lowest_resolution