Source code for sand.copernicus_dataspace

from datetime import datetime, date, time
from requests.utils import requote_uri
from urllib.parse import urlencode
from pathlib import Path
from typing import Optional
from typing import Literal

import re
import fnmatch
import requests

from sand.tinyfunc import _parse_geometry, change_lon_convention
from sand.base import raise_api_error, BaseDownload
from sand.results import Query

from core import log
from core.files import filegen
from core.static import interface
from core.download import get_auth
from core.table import select_cell, select
from core.geo.product_name import get_pattern, get_level


[docs] class DownloadCDSE(BaseDownload): name = 'DownloadCDSE' def __init__(self, collection: str = None, level: int = 1): """ Python interface to the Copernicus Data Space (https://dataspace.copernicus.eu/) Args: collection (str): collection name ('SENTINEL-2-MSI', 'SENTINEL-3-OLCI', etc.) Example: cds = DownloadCDS('SENTINEL-2-MSI') # retrieve the list of products # using a pickle cache file to avoid reconnection ls = cache_dataframe('query-S2.pickle')(cds.query)( dtstart=datetime(2024, 1, 1), dtend=datetime(2024, 2, 1), geo=Point(119.514442, -8.411750), name_contains=['_MSIL1C_'], ) cds.download(ls.iloc[0], <dirname>, uncompress=True) """ self.api = 'OData' self.provider = 'cdse' super().__init__(collection, level) def _login(self): """ Login to copernicus dataspace with credentials storted in .netrc """ auth = get_auth("dataspace.copernicus.eu") data = { "client_id": "cdse-public", "username": auth['user'], "password": auth['password'], "grant_type": "password", } try: url = "https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token" r = requests.post(url, data=data) r.raise_for_status() except Exception: raise Exception( f"Keycloak token creation failed. Reponse from the server was: {r.json()}" ) self.tokens = r.json()["access_token"] log.info('Log to API (https://dataspace.copernicus.eu/)') @interface def change_api(self, api_name: Literal['OData', 'OpenSearch']): """ To change backend Copernicus API """ assert api_name in ['OData', 'OpenSearch'] log.debug(f'Move from {self.api} API to {api_name} API') self.api = api_name @interface def query( self, dtstart: Optional[date|datetime] = None, dtend: Optional[date|datetime] = None, geo = None, cloudcover_thres: Optional[int] = None, name_contains: Optional[list] = [], name_startswith: Optional[str] = None, name_endswith: Optional[str] = None, name_glob: Optional[str] = None, use_most_recent: bool = True, other_attrs: Optional[list] = None, **kwargs ): """ Product query on the Copernicus Data Space Args: dtstart and dtend (datetime): start and stop datetimes geo: shapely geometry. Examples: Point(lon, lat) Polygon(...) cloudcover_thres: Optional[int]=None, name_contains (list): list of substrings name_startswith (str): search for name starting with this str name_endswith (str): search for name ending with this str name_glob (str): match name with this string use_most_recent (bool): keep only the most recent processing baseline version other_attrs (list): list of other attributes to include in the output (ex: ['ContentDate', 'Footprint']) Note: This method can be decorated by cache_dataframe for storing the outputs. Example: cache_dataframe('cache_result.pickle')(cds.query)(...) """ # https://documentation.dataspace.copernicus.eu/APIs/OData.html#query-by-name dtstart, dtend, geo = self._format_input_query(dtstart, dtend, geo) if geo: geo = change_lon_convention(geo) # Add provider constraint name_contains = self._complete_name_contains(name_contains) log.debug(f'Query {self.api} API') params = (dtstart, dtend, geo, name_glob, name_contains, name_startswith, name_endswith, cloudcover_thres) if self.api == 'OpenSearch': response = self._query_opensearch(*params) elif self.api == 'OData': response = self._query_odata(*params) else: log.error(f'Invalid API, got {self.api}', e=ValueError) # test if maximum number of returns is reached if len(response) >= 1000: log.error('The request led to the maximum number of results ' f'({len(response)})', e=ValueError) else: log.info(f'{len(response)} products has been found') if use_most_recent and (self.api_collection == 'SENTINEL-2'): # remove duplicate products, take only the most recent one mp = {} # maps a single_id to a list of lines for args in response: # build a common identifier for multiple versions s = args['Name'].split('_') ident = '_'.join([s[i] for i in [0, 1, 2, 4, 5]]) if ident in mp: mp[ident].append(args) else: mp[ident] = [args] # for each identifier, sort the corresponding lines by "Name" # and select the last one json_value = [sorted(lines, key=lambda line: line['Name'])[-1] for lines in mp.values()] else: json_value = response out = [{"id": d["Id"], "name": d["Name"], **{k: d[k] for k in (other_attrs or [])}} for d in json_value if ((not name_glob) or fnmatch.fnmatch(d["Name"], name_glob)) ] log.info(f'{len(out)} products has been found') return Query(out) @interface def download(self, product: dict, dir: Path|str, if_exists: str='skip', uncompress: bool=True) -> Path: """ Download a product from copernicus data space Args: product (dict): product definition with keys 'id' and 'name' dir (Path | str): Directory where to store downloaded file. uncompress (bool, optional): If True, uncompress file if needed. Defaults to True. """ target = Path(dir)/(product['name']) url = ("https://catalogue.dataspace.copernicus.eu/odata/v1/" f"Products({product['id']})/$value") filegen(0, if_exists=if_exists, uncompress='.zip')(self._download)(target, url) log.info(f'Product has been downloaded at : {target}') return target def _download( self, target: Path, url: str, ): """ Wrapped by filegen """ status = False exp_timeout_cnt = 1 while not status: if exp_timeout_cnt >= 16: log.error("The server error bypass method failed", e=RuntimeError) try: # Initialize session for download self.session.headers.update({'Authorization': f'Bearer {self.tokens}'}) # Try to request server niter = 0 response = self.session.get(url, allow_redirects=False) log.debug(f'Requesting server for {target.name}') while response.status_code in (301, 302, 303, 307) and niter < 5: log.debug(f'Download content [Try {niter+1}/5]') if 'Location' not in response.headers: raise ValueError(f'status code : [{response.status_code}]') url = response.headers['Location'] # response = self.session.get(url, allow_redirects=False) response = self.session.get(url, verify=True, allow_redirects=True) niter += 1 raise_api_error(response) status = True except Exception as e: # exponential wait when ecountering errors log.warning(log.rgb.red, str(e)) exp_timeout_cnt *= 2 self.session.close() log.warning(log.rgb.red, f"Waiting {exp_timeout_cnt}min ...") time.sleep(60 * exp_timeout_cnt) self.session = requests.Session() # end while # Download file log.debug('Start writing on device') filesize = int(response.headers["Content-Length"]) pbar = log.pbar(log.lvl.INFO, total=filesize, unit_scale=True, unit="B", desc='writing', unit_divisor=1024, leave=False) with open(target, 'wb') as f: for chunk in response.iter_content(chunk_size=1024): if chunk: f.write(chunk) pbar.update(1024)
[docs] def download_file(self, product_id: str, dir: Path | str) -> Path: """ Download product knowing is product id (ex: S2A_MSIL1C_20190305T050701_N0207_R019_T44QLH_20190305T103028) """ p = get_pattern(product_id) self.__init__(p['Name'], get_level(product_id, p)) ls = self.query(name_contains=[product_id]) assert len(ls) == 1, 'Multiple products found' return self.download(ls.iloc[0], dir)
@interface def quicklook(self, product: dict, dir: Path|str): """ Download a quicklook to `dir` """ target = Path(dir)/(product['name'] + '.jpeg') if not target.exists(): assets = self.metadata(product)['Assets'] if not assets: raise FileNotFoundError(f'Skipping quicklook {target.name}') url = assets[0]['DownloadLink'] filegen(0, if_exists='skip')(self._download)(target, url) log.info(f'Quicklook has been downloaded at : {target}') return target @interface def metadata(self, product): """ Returns the product metadata including attributes and assets """ req = ("https://catalogue.dataspace.copernicus.eu/odata/v1/Products?$filter=Id" f" eq '{product['id']}'&$expand=Attributes&$expand=Assets") json = requests.get(req).json() assert len(json['value']) == 1 return json['value'][0] def _retrieve_collec_name(self, collection): collecs = select(self.provider_prop,('SAND_name','=',collection),['level','collec']) try: return select_cell(collecs,('level','=',self.level),'collec') except AssertionError: log.error( f'Level{self.level} products are not available for {self.collection}', e=KeyError) def _query_odata(self, dtstart, dtend, geo, name_glob, name_contains, name_startswith, name_endswith, cloudcover_thres): """Query the EOData Finder API""" query_lines = [ f"""https://catalogue.dataspace.copernicus.eu/odata/v1/Products?$filter=Collection/Name eq '{self.api_collection}' """ ] if dtstart: query_lines.append(f'ContentDate/Start gt {dtstart.isoformat()}Z') if dtend: query_lines.append(f'ContentDate/Start lt {dtend.isoformat()}Z') if geo: query_lines.append(f"OData.CSC.Intersects(area=geography'SRID=4326;{geo}')") if name_glob: assert name_startswith is None assert name_endswith is None assert name_contains is None substrings = re.split(r'\*|\?', name_glob) if substrings[0]: name_startswith = substrings[0] if substrings[-1] and (len(substrings) > 1): name_endswith = substrings[-1] if (len(substrings) > 2): name_contains = [x for x in substrings[1:-1] if x] if name_startswith: query_lines.append(f"startswith(Name, '{name_startswith}')") if name_contains: assert isinstance(name_contains, list) for cont in name_contains: query_lines.append(f"contains(Name, '{cont}')") if name_endswith: query_lines.append(f"endswith(Name, '{name_endswith}')") if cloudcover_thres: query_lines.append( "Attributes/OData.CSC.DoubleAttribute/any(att:att/Name eq 'cloudCover' " f"and att/OData.CSC.DoubleAttribute/Value le {cloudcover_thres})") top = 1000 # maximum value of number of retrieved values req = (' and '.join(query_lines))+f'&$top={top}' # urllib_req = Request(requote_uri(req)) # urllib_response = urlopen(urllib_req, timeout=5, context=self.ssl_ctx) # response = json.load(urllib_response) response = requests.get(requote_uri(req), verify=True) # assert urllib_response.status == 200 return response.json()['value'] def _query_opensearch(self, dtstart, dtend, geo, name_glob, name_contains, name_startswith, name_endswith, cloudcover_thres): """Query the OpenSearch Finder API""" def _get_next_page(links): for link in links: if link["rel"] == "next": return link["href"] return False query = f"""https://catalogue.dataspace.copernicus.eu/resto/api/collections/{self.api_collection}/search.json?maxRecords=1000""" query_params = {'status': 'ALL'} if dtstart is not None: query_params["startDate"] = dtstart.isoformat() if dtend is not None: query_params["completionDate"] = dtend.isoformat() if geo is not None: query_params["geometry"] = _parse_geometry(geo) query += f"&{urlencode(query_params)}" query_response = [] while query: response = requests.get(query, verify=True) response.raise_for_status() data = response.json() for feature in data["features"]: query_response.append(feature) query = _get_next_page(data["properties"]["links"]) return query_response