Source code for eodal.metadata.sentinel1.parsing
"""
This module contains functions to extract relevant scene-specific
Sentinel-1 metadata
Copyright (C) 2022 Gregor Perich
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from __future__ import annotations
import glob
import numpy as np
import os
import pandas as pd
import time
from datetime import date, datetime
from pathlib import Path
from shapely.geometry import Polygon
from shapely.geometry.polygon import LinearRing
from typing import Dict, Optional
from xml.dom import minidom
from eodal.config import get_settings
from eodal.utils.exceptions import DataNotFoundError, InputError
Settings = get_settings()
logger = Settings.logger
[docs]
def parse_s1_metadata(in_dir: Path) -> Dict:
    """
    Parses the metadata found in the "manifest.safe" document of S1_IW_GRDH
    Level-1 products and writes them into a Python dictionary.

    Whenever a tag occurs more than once in the manifest, the value of its
    last occurrence is used.

    :param in_dir:
        path of the .SAFE directory that contains the "manifest.safe" file
    :returns:
        Dictionary containing the metadata of the passed S1 scene ready for
        DB ingestion. The storage related entries ("storage_device_ip",
        "storage_device_ip_alias", "storage_share") are set to empty strings.
    :raises ValueError:
        if an element or attribute required for the metadata is missing from
        the manifest
    """
    manifest_path = in_dir.joinpath("manifest.safe").as_posix()
    # parse Document Object Model (DOM) file from xml
    dom = minidom.parse(manifest_path)

    # the scene identifier is the .SAFE directory name without its extension
    safe_name = in_dir.name
    metadata = {
        "scene_id": safe_name.split(".")[0],
        "product_uri": safe_name,
    }

    # spacecraft_name: family name and number are direct entries of the
    # platform element. The FIRST match is taken because nested elements of
    # the platform may carry a family name of their own.
    platform = _s1_manifest_element(dom, "safe:platform")
    metadata["spacecraft_name"] = _s1_manifest_text(
        platform, "safe:familyName", index=0
    ) + _s1_manifest_text(platform, "safe:number", index=0)

    # absolute and relative orbit numbers; each pair is looked up on its own
    # so that a missing relative orbit can never be filled with the values of
    # the absolute orbit
    orbit_tags = (
        ("sensing_orbit", "safe:orbitNumber"),
        ("relative_orbit", "safe:relativeOrbitNumber"),
    )
    for key_prefix, tag_name in orbit_tags:
        for orbit_type in ("start", "stop"):
            metadata[f"{key_prefix}_{orbit_type}"] = int(
                _s1_manifest_text(dom, tag_name, type_attr=orbit_type)
            )

    # sensing_orbit_direction
    metadata["sensing_orbit_direction"] = str(_s1_manifest_text(dom, "s1:pass"))

    # sensing_time & sensing_date (the date is the calendar day of the
    # acquisition start time)
    sensing_time = datetime.strptime(
        _s1_manifest_text(dom, "safe:startTime"), "%Y-%m-%dT%H:%M:%S.%f"
    )
    metadata["sensing_time"] = sensing_time
    metadata["sensing_date"] = sensing_time.date()

    # instrument_mode, product_type and product_class
    metadata["instrument_mode"] = _s1_manifest_text(dom, "s1sarl1:mode")
    metadata["product_type"] = _s1_manifest_text(dom, "s1sarl1:productType")
    metadata["product_class"] = _s1_manifest_text(dom, "s1sarl1:productClass")

    # processing_software_name & version are attributes of the software tag
    software = _s1_manifest_element(dom, "safe:software")
    for attribute in ("name", "version"):
        if not software.hasAttribute(attribute):
            raise ValueError(
                f'Attribute "{attribute}" of <safe:software> not found in '
                f"{manifest_path}"
            )
        metadata[f"processing_software_{attribute}"] = software.getAttribute(
            attribute
        )

    # mission_data_take_id
    metadata["mission_data_take_id"] = int(
        _s1_manifest_text(dom, "s1sarl1:missionDataTakeID")
    )

    # scene footprint
    metadata["geom"] = extract_s1_footprint(in_dir=in_dir)

    # add storage information (left empty here)
    metadata["storage_device_ip"] = ""
    metadata["storage_device_ip_alias"] = ""
    metadata["storage_share"] = ""
    return metadata


def _s1_manifest_element(parent, tag_name: str, type_attr: Optional[str] = None,
                         index: int = -1):
    """
    Returns one descendant element of ``parent`` with the given tag name.

    :param parent:
        DOM document or element to search in
    :param tag_name:
        qualified tag name (including the namespace prefix) to look for
    :param type_attr:
        if given, only elements whose "type" attribute equals this value
        are considered
    :param index:
        which of the matching elements to return (default: the last one)
    :returns:
        the selected DOM element
    :raises ValueError:
        if no matching element exists
    """
    candidates = list(parent.getElementsByTagName(tag_name))
    if type_attr is not None:
        candidates = [
            node for node in candidates if node.getAttribute("type") == type_attr
        ]
    if not candidates:
        type_hint = "" if type_attr is None else f' with type="{type_attr}"'
        raise ValueError(f"No <{tag_name}> element{type_hint} found in manifest")
    return candidates[index]


def _s1_manifest_text(parent, tag_name: str, type_attr: Optional[str] = None,
                      index: int = -1) -> str:
    """
    Returns the text content of one descendant element of ``parent``.

    Takes the same selection arguments as ``_s1_manifest_element``.

    :returns:
        node value of the first child of the selected element
    :raises ValueError:
        if no matching element exists or the element is empty
    """
    element = _s1_manifest_element(
        parent, tag_name, type_attr=type_attr, index=index
    )
    if element.firstChild is None:
        raise ValueError(f"Element <{tag_name}> in manifest is empty")
    return element.firstChild.nodeValue
[docs]
def extract_s1_footprint(in_dir: Path, use_gml: Optional[bool] = True) -> str:
    """
    Extract the footprint of the S1 scene from the scene metadata.

    Only the file that is actually needed is read: the "manifest.safe"
    document if ``use_gml`` is set, the "preview/map-overlay.kml" file
    otherwise. If the coordinates tag occurs more than once, its last
    occurrence is used.

    :param in_dir:
        Filepath to the S1 raw data .SAFE folder
    :param use_gml:
        Should the GML coordinates (from the manifest.safe) be used, or the
        KML coordinates (from the ./preview/map-overlay.kml)
    :returns:
        Extended well-known-text (WKT prefixed with "SRID=4326;") of the S1
        scene footprint in geographic coordinates (WGS84, EPSG:4326).
    :raises ValueError:
        if the metadata file contains no (non-empty) coordinates element
    """
    if use_gml:
        source_path = in_dir.joinpath("manifest.safe").as_posix()
        tag_name = "gml:coordinates"
    else:
        source_path = (
            in_dir.joinpath("preview").joinpath("map-overlay.kml").as_posix()
        )
        tag_name = "coordinates"

    # parse Document Object Model (DOM) file from xml
    dom = minidom.parse(source_path)
    elements = dom.getElementsByTagName(tag_name)
    if not elements or elements[-1].firstChild is None:
        raise ValueError(f"No <{tag_name}> entry found in {source_path}")
    raw_coords = elements[-1].firstChild.nodeValue

    # the vertices are separated by whitespace, the ordinates of a vertex by
    # commas. split() without argument also copes with line breaks and
    # repeated blanks. Plain Python floats keep the full precision of the
    # coordinates given in the file.
    vertices = [
        tuple(float(value) for value in vertex.split(","))
        for vertex in raw_coords.split()
    ]
    if use_gml:
        # The GML coordinates are NOT given as lon/lat but as lat/lon ->
        # swap them to obtain the x/y (lon/lat) order
        vertices = [(vertex[1], vertex[0]) for vertex in vertices]

    footprint = Polygon(LinearRing(vertices))
    return "SRID=4326;" + footprint.wkt
[docs]
def loop_s1_archive(
    in_dir: Path,
    get_newest_datasets: Optional[bool] = False,
    last_execution_date: Optional[date] = None,
) -> pd.DataFrame:
    """
    Wrapper function to loop over an entire archive (i.e., collection) of
    Sentinel-1 scenes stored as .SAFE directories.

    The function returns a pandas dataframe for all found entries in the
    archive (i.e., directory). Each row in the dataframe denotes one scene.
    The names of scenes whose metadata could not be extracted are written
    (one per line) to the file "errored_datasets.txt" in ``in_dir``; these
    scenes are logged and skipped.

    :param in_dir:
        directory containing the Sentinel-1 scenes. Only sub-directories
        ending with ".SAFE" are considered.
    :param get_newest_datasets:
        if set to True only datasets newer than a user-defined time stamp
        will be considered for ingestion into the database. This is
        particularly useful for updating the database after new scenes have
        been downloaded or processed.
    :param last_execution_date:
        if get_newest_datasets is True this variable needs to be set. All
        datasets younger than that date will be considered for ingestion
        into the database.
    :return:
        dataframe with metadata of all scenes handled by the function call
    :raises InputError:
        if ``get_newest_datasets`` is set but no ``last_execution_date``
        is provided
    :raises DataNotFoundError:
        if no .SAFE scene is found in ``in_dir`` or if no scene passes the
        date filter
    """
    # check inputs if only latest datasets shall be considered
    if get_newest_datasets and last_execution_date is None:
        raise InputError(
            "A timestamp must be provided when the only newest datasets "
            "shall be considered"
        )

    # search for .SAFE subdirectories identifying the single scenes
    s1_scenes = glob.glob(str(in_dir.joinpath("*.SAFE")))
    if not s1_scenes:
        raise DataNotFoundError(f"No .SAFE mapper found in {in_dir}")

    # if only scenes after a specific timestamp shall be considered drop
    # those from the list which are "too old"
    if get_newest_datasets:
        # convert date to Unix timestamp to compare it with the st_ctime
        # reported by the file system
        last_execution = time.mktime(last_execution_date.timetuple())
        s1_scenes = [
            scene
            for scene in s1_scenes
            if Path(scene).stat().st_ctime >= last_execution
        ]
        if not s1_scenes:
            raise DataNotFoundError(
                f"No mapper younger than {last_execution_date:%Y-%m-%d} found"
            )

    # count AFTER filtering so that the progress shown in the log is correct
    n_scenes = len(s1_scenes)

    # loop over the scenes; the context manager guarantees that the error
    # log is closed even if something unexpected happens
    metadata_scenes = []
    with open(in_dir.joinpath("errored_datasets.txt"), "w+") as error_file:
        for idx, s1_scene in enumerate(s1_scenes, start=1):
            logger.info(
                f"Extracting metadata of {os.path.basename(s1_scene)} "
                f"({idx}/{n_scenes})"
            )
            try:
                mtd_scene = parse_s1_metadata(in_dir=Path(s1_scene))
            except Exception as e:
                # best effort: remember the failed scene and carry on. One
                # name per line keeps the entries apart.
                error_file.write(Path(s1_scene).name + "\n")
                error_file.flush()
                logger.error(f"Extraction of metadata failed {s1_scene}: {e}")
                continue
            metadata_scenes.append(mtd_scene)

    # convert to pandas dataframe and return
    return pd.DataFrame(metadata_scenes)