Source code for eodal.downloader.utils.creodias

"""
Utility functions for downloading data from CREODIAS

Copyright (C) 2022 Lukas Valentin Graf

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
from __future__ import annotations

import os
import pandas as pd
import pyotp
import requests
import shutil
import time

from pathlib import Path
from typing import Optional, Union
from eodal.config import get_settings

Settings = get_settings()
logger = Settings.logger

# fixed chunk size for downloading data
CHUNK_SIZE = 2096


[docs] def get_totp() -> str: """ Get one-time access code using the TOTP algorithm to login into CREODIAS. The TOTP secret must have been set in the EOdal configurations using the `CREODIAS_TOTP_SECRET` variable. :return: one-time access key valid for a maximum of 30 seconds. """ return pyotp.TOTP(Settings.CREODIAS_TOTP_SECRET).now()
def _get_keycloak(username: str, password: str) -> str: """ Gets the keycloak token required by CREODIAS to sign requests. It uses the CREODIAS username, password and one-time access code (2FA) to authenticate at the Cloudferro Openid connector. :param username: CREODIAS username :param password: CREODIAS password :return: keycloak access token """ # get one-time access code totp = get_totp() # prepare data for HTTP POST request data = { "client_id": "CLOUDFERRO_PUBLIC", "username": username, "password": password, "totp": int(totp), "grant_type": "password", } # request the token r = requests.post( "https://identity.cloudferro.com/auth/realms/Creodias-new/protocol/openid-connect/token", data=data, ) r.raise_for_status() return r.json()["access_token"]
[docs] def get_keycloak() -> str: """ Gets the keycloak token required by CREODIAS to sign requests. It uses the CREODIAS username, password and one-time access code (2FA) to authenticate at the Cloudferro Openid connector. NOTE: A one-time access code is valid for 30 seconds. It can be only used once. This means, if one makes multiple requests for a keycloak token within <= 30 seconds, there will be a server error as the one-time access code has been consumed already. To make this behavior more stable, the application waits 31 seconds on error to request a new one-time access code (recursive call). :return: keycloak access token """ # set user name and password from EOdal configuration username = Settings.CREODIAS_USER password = Settings.CREODIAS_PASSWORD try: return _get_keycloak(username, password) except Exception: # on error, sleep 31 seconds and then try again time.sleep(31) return get_keycloak()
[docs] def download_datasets( datasets: pd.DataFrame, download_dir: Union[Path, str], overwrite_existing_zips: Optional[bool] = False, ) -> None: """ Function for actual dataset download from CREODIAS. Requires valid CREODIAS username and password (to be specified in the BaseSettings) :param datasets: dataframe with results of CREODIAS Finder API request made by `query_creodias` function :param download_dir: directory where to store the downloaded files :param overwrite_existing_zips: if set to False (default), existing zip files in the ``download_dir`` are not overwritten. This feature can be useful to restart the downloader after a network connection timeout or similar. NOTE: Thhe function does not check if the existing zips are complete! """ # change into download directory os.chdir(str(download_dir)) # loop over datasets to download them sequentially scene_counter = 1 for _, dataset in datasets.iterrows(): try: # get API token from CREODIAS (only valid for a limited time) keycloak_token = get_keycloak() dataset_temp_url = dataset.properties["services"]["download"]["url"].split('/')[-1] dataset_url = f'{Settings.CREODIAS_ZIPPER_URL}/{dataset_temp_url}' response = requests.get( dataset_url, headers={"Authorization": f"Bearer {keycloak_token}"}, stream=True, ) response.raise_for_status() except Exception as e: logger.error(f"Could not download {dataset_url}: {e}") continue # download the data using the iter_content method (writes chunks to disk) # check if the dataset exists already and overwrite it only if # defined by the user fname = dataset.dataset_name.replace("SAFE", "zip") if not fname.endswith("zip"): fname = fname + ".zip" if Path(fname).exists(): if not overwrite_existing_zips: logger.info( f"{dataset.dataset_name} already downloaded - " + "continue with next dataset" ) scene_counter += 1 continue else: logger.warning(f"Overwriting {dataset.dataset_name}") logger.info( f"Starting downloading {fname} ({scene_counter}/{datasets.shape[0]})" ) try: with open(fname, "wb") as fd: for chunk in response.iter_content(chunk_size=CHUNK_SIZE): fd.write(chunk) logger.info( f"Finished downloading {fname} ({scene_counter}/{datasets.shape[0]})" ) except Exception as e: # on error (e.g., broken network connection) delete the incomplete # zip archive and continue. shutil.rmtree(fname) logger.error( f'Downloading {fname} ({scene_counter}/{datasets.shape[0]}) ' + f'was interrupted.\n{e}\nRemoved broken zip.\n' + 'Consider re-running the download.') scene_counter += 1
# # unit test (requires credentials for CREODIAS) # if __name__ == '__main__': # # from datetime import date # from eodal.downloader.sentinel2.creodias import query_creodias # from eodal.utils.constants.sentinel2 import ProcessingLevels # from shapely.geometry import box # # bbox = box(*[8, 49, 9, 50]) # start_date = date(2017, 10, 5) # end_date = date(2017, 10, 31) # max_records = 100 # processing_level = ProcessingLevels.L2A # datasets = query_creodias( # start_date=start_date, # end_date=end_date, # max_records=max_records, # processing_level=processing_level, # bounding_box=bbox # ) # # download_dir = Path('/mnt/ides/Lukas/03_Debug/SAT/') # download_datasets(datasets=datasets, download_dir=download_dir, overwrite_existing_zips=False) # # from eodal.downloader.utils.unzip_datasets import unzip_datasets # # unzip_datasets(download_dir=download_dir, platform='S2', remove_zips=True)