#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This module contains functions to extract relevant scene-specific
Sentinel-2 metadata supporting L1C and L2A (sen2core-derived) processing levels
Copyright (C) 2022 Lukas Valentin Graf
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from __future__ import annotations
import os
import glob
import time
import numpy as np
from datetime import datetime
from xml.dom import minidom
from pyproj import Transformer
from pathlib import Path
import pandas as pd
from typing import Any, Dict, Optional, Tuple
from datetime import date
from eodal.config import get_settings
from eodal.utils.constants.sentinel2 import s2_band_mapping
from eodal.utils.exceptions import UnknownProcessingLevel
from eodal.utils.exceptions import InputError
from eodal.utils.warnings import NothingToDo
logger = get_settings().logger
[docs]
def parse_MTD_DS(in_file: Path) -> Dict[str, Any]:
"""
Parses the MTD_DS.xml located in tghe /DATASTRIP folder
in each .SAFE dataset. The xml contains the noise model parameters
alpha and beta as well as physical gain factors
required to calculate the radiometric uncertainty
of the Level-1C data. The extraction of this data is therefore
optional.
:param in_file:
filepath of the scene metadata xml (MTD_DS.xml)
:return metadata:
dictionary with extracted noise model parameters, alpha
and beta, per spectral band of MSI
"""
# parse the xml file into a minidom object
xmldoc = minidom.parse(str(in_file))
# now, the values of some relevant tags can be extracted:
metadata = dict()
band_names = list(s2_band_mapping.keys())
datatakeIdentifier_xml = xmldoc.getElementsByTagName("Datatake_Info")
element = datatakeIdentifier_xml[0]
datatakeIdentifier = element.getAttribute("datatakeIdentifier")
metadata["datatakeidentifier"] = datatakeIdentifier
# extract noise model parameters alpha and beta for all bands
alpha_values = xmldoc.getElementsByTagName("ALPHA")
beta_values = xmldoc.getElementsByTagName("BETA")
# loop over bands and store values of alpha and beta
for idx, elem in enumerate(zip(alpha_values, beta_values)):
alpha = float(elem[0].firstChild.nodeValue)
beta = float(elem[1].firstChild.nodeValue)
metadata[f"alpha_{band_names[idx]}"] = alpha
metadata[f"beta_{band_names[idx]}"] = beta
# extract physical gans of the single spectral bands
physical_gains = xmldoc.getElementsByTagName("PHYSICAL_GAINS")
for idx, elem in enumerate(physical_gains):
physical_gain = float(elem.firstChild.nodeValue)
metadata[f"physical_gain_{band_names[idx]}"] = physical_gain
return metadata
[docs]
def parse_MTD_TL(in_file: Path) -> Dict[str, Any]:
"""
Parses the MTD_TL.xml metadata file provided by ESA.This metadata
XML is usually placed in the GRANULE subfolder of a ESA-derived
S2 product and named 'MTD_TL.xml'.
The 'MTD_TL.xml' is available for both processing levels (i.e.,
L1C and L2A). The function is able to handle both processing
sources and returns some entries available in L2A processing level,
only, as None type objects.
The function extracts the most important metadata from the XML and
returns a dict with those extracted entries.
:param in_file:
filepath of the scene metadata xml 8MTD_TL.xml)
:return metadata:
dict with extracted metadata entries
"""
# parse the xml file into a minidom object
xmldoc = minidom.parse(in_file)
# now, the values of some relevant tags can be extracted:
metadata = dict()
# get tile ID of L2A product and its corresponding L1C counterpart
tile_id_xml = xmldoc.getElementsByTagName("TILE_ID")
# adaption to older Sen2Cor version
check_l1c = True
if len(tile_id_xml) == 0:
tile_id_xml = xmldoc.getElementsByTagName("TILE_ID_2A")
check_l1c = False
tile_id = tile_id_xml[0].firstChild.nodeValue
scene_id = tile_id.split(".")[0]
metadata["SCENE_ID"] = scene_id
# check if the scene is L1C or L2A
is_l1c = False
if check_l1c:
try:
l1c_tile_id_xml = xmldoc.getElementsByTagName("L1C_TILE_ID")
l1c_tile_id = l1c_tile_id_xml[0].firstChild.nodeValue
l1c_tile_id = l1c_tile_id.split(".")[0]
metadata["L1C_TILE_ID"] = l1c_tile_id
except Exception:
logger.info(f"{scene_id} is L1C processing level")
is_l1c = True
# sensing time (acquisition time)
sensing_time_xml = xmldoc.getElementsByTagName("SENSING_TIME")
sensing_time = sensing_time_xml[0].firstChild.nodeValue
metadata["SENSING_TIME"] = sensing_time
metadata["SENSING_DATE"] = datetime.strptime(
sensing_time.split("T")[0], "%Y-%m-%d"
).date()
# number of rows and columns for each resolution -> 10, 20, 60 meters
nrows_xml = xmldoc.getElementsByTagName("NROWS")
ncols_xml = xmldoc.getElementsByTagName("NCOLS")
resolutions = ["_10m", "_20m", "_60m"]
# order: 10, 20, 60 meters spatial resolution
for ii in range(3):
nrows = nrows_xml[ii].firstChild.nodeValue
ncols = ncols_xml[ii].firstChild.nodeValue
metadata["NROWS" + resolutions[ii]] = int(nrows)
metadata["NCOLS" + resolutions[ii]] = int(ncols)
# EPSG-code
epsg_xml = xmldoc.getElementsByTagName("HORIZONTAL_CS_CODE")
epsg = epsg_xml[0].firstChild.nodeValue
metadata["EPSG"] = int(epsg.split(":")[1])
# Upper Left Corner coordinates -> is the same for all three resolutions
ulx_xml = xmldoc.getElementsByTagName("ULX")
uly_xml = xmldoc.getElementsByTagName("ULY")
ulx = ulx_xml[0].firstChild.nodeValue
uly = uly_xml[0].firstChild.nodeValue
metadata["ULX"] = float(ulx)
metadata["ULY"] = float(uly)
# endfor
# extract the mean zenith and azimuth angles
# the sun angles come first followed by the mean angles per band
zenith_angles = xmldoc.getElementsByTagName("ZENITH_ANGLE")
metadata["SUN_ZENITH_ANGLE"] = float(zenith_angles[0].firstChild.nodeValue)
azimuth_angles = xmldoc.getElementsByTagName("AZIMUTH_ANGLE")
metadata["SUN_AZIMUTH_ANGLE"] = float(azimuth_angles[0].firstChild.nodeValue)
# get the mean zenith and azimuth angle over all bands
sensor_zenith_angles = [float(x.firstChild.nodeValue) for x in zenith_angles[1::]]
metadata["SENSOR_ZENITH_ANGLE"] = np.mean(np.asarray(sensor_zenith_angles))
sensor_azimuth_angles = [float(x.firstChild.nodeValue) for x in azimuth_angles[1::]]
metadata["SENSOR_AZIMUTH_ANGLE"] = np.mean(np.asarray(sensor_azimuth_angles))
# extract scene relevant data about nodata values, cloud coverage, etc.
cloudy_xml = xmldoc.getElementsByTagName("CLOUDY_PIXEL_PERCENTAGE")
cloudy = cloudy_xml[0].firstChild.nodeValue
metadata["CLOUDY_PIXEL_PERCENTAGE"] = float(cloudy)
degraded_xml = xmldoc.getElementsByTagName("DEGRADED_MSI_DATA_PERCENTAGE")
degraded = degraded_xml[0].firstChild.nodeValue
metadata["DEGRADED_MSI_DATA_PERCENTAGE"] = float(degraded)
# the other tags are available in L2A processing level, only
if not is_l1c:
nodata_xml = xmldoc.getElementsByTagName("NODATA_PIXEL_PERCENTAGE")
nodata = nodata_xml[0].firstChild.nodeValue
metadata["NODATA_PIXEL_PERCENTAGE"] = float(nodata)
darkfeatures_xml = xmldoc.getElementsByTagName("DARK_FEATURES_PERCENTAGE")
darkfeatures = darkfeatures_xml[0].firstChild.nodeValue
metadata["DARK_FEATURES_PERCENTAGE"] = float(darkfeatures)
cs_xml = xmldoc.getElementsByTagName("CLOUD_SHADOW_PERCENTAGE")
cs = cs_xml[0].firstChild.nodeValue
metadata["CLOUD_SHADOW_PERCENTAGE"] = float(cs)
veg_xml = xmldoc.getElementsByTagName("VEGETATION_PERCENTAGE")
veg = veg_xml[0].firstChild.nodeValue
metadata["VEGETATION_PERCENTAGE"] = float(veg)
noveg_xml = xmldoc.getElementsByTagName("NOT_VEGETATED_PERCENTAGE")
noveg = noveg_xml[0].firstChild.nodeValue
metadata["NOT_VEGETATED_PERCENTAGE"] = float(noveg)
water_xml = xmldoc.getElementsByTagName("WATER_PERCENTAGE")
water = water_xml[0].firstChild.nodeValue
metadata["WATER_PERCENTAGE"] = float(water)
unclass_xml = xmldoc.getElementsByTagName("UNCLASSIFIED_PERCENTAGE")
unclass = unclass_xml[0].firstChild.nodeValue
metadata["UNCLASSIFIED_PERCENTAGE"] = float(unclass)
cproba_xml = xmldoc.getElementsByTagName("MEDIUM_PROBA_CLOUDS_PERCENTAGE")
cproba = cproba_xml[0].firstChild.nodeValue
metadata["MEDIUM_PROBA_CLOUDS_PERCENTAGE"] = float(cproba)
hcproba_xml = xmldoc.getElementsByTagName("HIGH_PROBA_CLOUDS_PERCENTAGE")
hcproba = hcproba_xml[0].firstChild.nodeValue
metadata["HIGH_PROBA_CLOUDS_PERCENTAGE"] = float(hcproba)
thcirrus_xml = xmldoc.getElementsByTagName("THIN_CIRRUS_PERCENTAGE")
thcirrus = thcirrus_xml[0].firstChild.nodeValue
metadata["THIN_CIRRUS_PERCENTAGE"] = float(thcirrus)
snowice_xml = xmldoc.getElementsByTagName("SNOW_ICE_PERCENTAGE")
snowice = snowice_xml[0].firstChild.nodeValue
metadata["SNOW_ICE_PERCENTAGE"] = float(snowice)
# calculate the scene footprint in geographic coordinates
metadata["geom"] = get_scene_footprint(sensor_data=metadata)
return metadata
[docs]
def parse_MTD_MSI(in_file: str) -> Dict[str, Any]:
"""
parses the MTD_MSIL1C or MTD_MSIL2A metadata file that is delivered with
ESA Sentinel-2 L1C and L2A products, respectively.
The file is usually placed directly in the .SAFE root folder of an
unzipped Sentinel-2 L1C or L2A scene.
The extracted metadata is returned as a dict.
:param in_file:
filepath of the scene metadata xml (MTD_MSI2A.xml or MTD_MSIL1C.xml)
"""
# parse the xml file into a minidom object
xmldoc = minidom.parse(in_file)
# check the version of the xml. Unfortunately, different sen2cor version
# also produced slightly different metadata xmls
if xmldoc.getElementsByTagName("L2A_Product_Info"):
tag_list = ["PRODUCT_URI_2A"]
else:
tag_list = ["PRODUCT_URI"]
# datatake identifier
datatakeIdentifier_xml = xmldoc.getElementsByTagName("Datatake")
element = datatakeIdentifier_xml[0]
datatakeIdentifier = element.getAttribute("datatakeIdentifier")
# define further tags to extract
tag_list.extend(
[
"PROCESSING_LEVEL",
"SENSING_ORBIT_NUMBER",
"SPACECRAFT_NAME",
"SENSING_ORBIT_DIRECTION",
]
)
metadata = dict.fromkeys(tag_list)
for tag in tag_list:
xml_elem = xmldoc.getElementsByTagName(tag)
if tag == "PRODUCT_URI_2A":
metadata["PRODUCT_URI"] = xml_elem[0].firstChild.data
metadata.pop("PRODUCT_URI_2A")
else:
metadata[tag] = xml_elem[0].firstChild.data
metadata["datatakeIdentifier"] = datatakeIdentifier
# extract PDGS baseline
metadata["pdgs_baseline"] = metadata["PRODUCT_URI"].split("_")[3]
# stupid Sen2Cor is not consistent here ...
if metadata["PROCESSING_LEVEL"] == "Level-2Ap":
metadata["PROCESSING_LEVEL"] = "Level-2A"
# reflectance conversion factor (U)
reflectance_conversion_xml = xmldoc.getElementsByTagName("U")
reflectance_conversion = float(reflectance_conversion_xml[0].firstChild.nodeValue)
metadata["reflectance_conversion"] = reflectance_conversion
# extract solar irradiance for the single bands
bands = [
"B01",
"B02",
"B03",
"B04",
"B05",
"B06",
"B07",
"B08",
"B8A",
"B09",
"B10",
"B11",
"B12",
]
sol_irrad_xml = xmldoc.getElementsByTagName("SOLAR_IRRADIANCE")
for idx, band in enumerate(bands):
metadata[f"SOLAR_IRRADIANCE_{band}"] = float(
sol_irrad_xml[idx].firstChild.nodeValue
)
# S2 tile
metadata["TILE_ID"] = metadata["PRODUCT_URI"].split("_")[5]
return metadata
[docs]
def loop_s2_archive(
in_dir: Path,
extract_datastrip: Optional[bool] = False,
get_newest_datasets: Optional[bool] = False,
last_execution_date: Optional[date] = None,
) -> Tuple[pd.DataFrame]:
"""
wrapper function to loop over an entire archive (i.e., collection) of
Sentinel-2 mapper in either L1C or L2A processing level or a mixture
thereof.
The function returns a pandas dataframe for all found entries in the
archive (i.e., directory). Each row in the dataframe denotes one scene.
:param in_dir:
directory containing the Sentinel-2 data (L1C and/or L2A
processing level). Sentinel-2 mapper are assumed to follow ESA's
.SAFE naming convention and structure
:param extract_datastrip:
If True reads also metadata from the datastrip xml file
(MTD_DS.xml)
:param get_newest_datasets:
if set to True only datasets newer than a user-defined time stamp
will be considered for ingestion into the database. This is particularly
useful for updating the database after new mapper have been downloaded
or processed.
:param last_execution_date:
if get_newest_datasets is True this variable needs to be set. All
datasets younger than that date will be considered for ingestion
into the database.
:return:
dataframe with metadata of all mapper handled by the function
call
"""
# check inputs if only latest datasets shall be considered
if get_newest_datasets:
if last_execution_date is None:
raise InputError(
"A timestamp must be provided when the only newest " +
"datasets shall be considered"
)
# search for .SAFE subdirectories identifying the single mapper
# some data providers, however, do not name their products following the
# ESA convention (.SAFE is missing)
s2_scenes = glob.glob(str(in_dir.joinpath("*.SAFE")))
n_scenes = len(s2_scenes)
if n_scenes == 0:
s2_scenes = [f for f in in_dir.iterdir() if f.is_dir()]
n_scenes = len(s2_scenes)
if n_scenes == 0:
raise UnknownProcessingLevel("No Sentinel-2 mapper were found")
# if only mapper after a specific timestamp shall be considered drop
# those from the list which are "too old"
if get_newest_datasets:
filtered_scenes = []
# convert date to Unix timestamp
last_execution = time.mktime(last_execution_date.timetuple())
for s2_scene in s2_scenes:
s2_scene_path = Path(s2_scene)
if s2_scene_path.stat().st_ctime >= last_execution:
filtered_scenes.append(s2_scene)
s2_scenes = filtered_scenes
if len(s2_scenes) == 0:
raise NothingToDo(
'No mapper younger than ' +
f'{datetime.strftime(last_execution_date, "%Y-%m-%d")} found'
)
# loop over the mapper
metadata_scenes = []
ql_ds_scenes = []
error_file = open(in_dir.joinpath("errored_datasets.txt"), "w+")
for idx, s2_scene in enumerate(s2_scenes):
logger.info(
f"Extracting metadata of {os.path.basename(s2_scene)} ({idx+1}/{n_scenes})"
)
try:
mtd_scene, mtd_ds_scene = parse_s2_scene_metadata(
in_dir=Path(s2_scene), extract_datastrip=extract_datastrip
)
except Exception as e:
error_file.write(Path(s2_scene).name)
error_file.flush()
logger.error(f"Extraction of metadata failed {s2_scene}: {e}")
continue
metadata_scenes.append(mtd_scene)
ql_ds_scenes.append(mtd_ds_scene)
# convert to pandas dataframe and return
return (
pd.DataFrame.from_dict(metadata_scenes),
pd.DataFrame.from_dict(ql_ds_scenes),
)