"""Source code for sand.copernicus_dataspace."""

from requests.utils import requote_uri
from urllib.parse import urlencode
from typing import Optional, Literal
from dataclasses import dataclass
from pathlib import Path
import requests

from sand.base import raise_api_error, BaseDownload, RequestsError
from sand.utils import write, check_name_glob
from sand.constraint import Time, Geo, Name
from sand.results import SandQuery, SandProduct

from core import log
from core.files import filegen
from core.network.auth import get_auth
from core.files.uncompress import uncompress
from core.geo.product_name import get_pattern, get_level


class DownloadCDSE(BaseDownload):
    """
    Python interface to the Copernicus Data Space (https://dataspace.copernicus.eu/)

    This class implements the BaseDownload interface for accessing and
    downloading satellite data from Copernicus Data Space. It supports both
    OData and OpenSearch APIs.
    """

    # Provider key identifying this backend within the SAND framework
    provider = 'cdse'

    def __init__(self):
        # Default to the modern OData API; can be switched with change_api()
        self.api = 'OData'

    def _login(self):
        """
        Login to copernicus dataspace with credentials stored in .netrc
        """
        # Check if session is already set and set it up if not
        if not hasattr(self, "session"):
            self._set_session()
        auth = get_auth("dataspace.copernicus.eu")
        self._get_tokens(auth)
        log.debug('Log to API (https://dataspace.copernicus.eu/)')

    def _get_tokens(self, auth: dict) -> None:
        """
        Get authentication tokens from Copernicus Data Space using provided
        credentials.

        Args:
            auth (dict): Credentials with 'user' and 'password' keys.

        Raises:
            Exception: If the Keycloak token request fails.
        """
        data = {
            "client_id": "cdse-public",
            "username": auth['user'],
            "password": auth['password'],
            "grant_type": "password",
        }
        url = ("https://identity.dataspace.copernicus.eu/auth/realms/CDSE"
               "/protocol/openid-connect/token")
        # BUGFIX: the original referenced `r` in the except clause even when
        # requests.post itself raised, which turned any connection error into
        # an unrelated NameError. Keep the server response when one exists.
        r = None
        try:
            r = requests.post(url, data=data)
            r.raise_for_status()
        except Exception as e:
            detail = r.json() if r is not None else str(e)
            raise Exception(
                f"Keycloak token creation failed. Response from the server was: {detail}"
            ) from e
        self.tokens = r.json()["access_token"]
[docs] def change_api(self, api_name: Literal['OData', 'OpenSearch']): """ Change the backend API used for querying Copernicus Data Space. The Copernicus Data Space provides two APIs: - OData: Modern REST API with rich querying capabilities - OpenSearch: Legacy API maintained for compatibility Args: api_name (Literal['OData', 'OpenSearch']): Name of the API to use """ assert api_name in ['OData', 'OpenSearch'] log.debug(f'Move from {self.api} API to {api_name} API') self.api = api_name
def _query( self, collection_sand: str = None, level: Literal[1,2,3] = 1, time: Time = None, geo: Geo = None, name: Name = None, cloudcover_thres: Optional[int] = None, api_collection: list[str] = None, ): """ Product query on the Copernicus Data Space """ self._login() # Retrieve api collections based on SAND collections if api_collection is None: name_constraint = self._load_sand_collection_properties(collection_sand, level) api_collection = self.api_collection[0] # Format input time and geospatial constraints time = self._format_time(collection_sand, time) if geo: geo.set_convention(0) # Define or complement constraint on naming if name: name.add_contains(name_constraint) else: name = Name(contains=name_constraint) # Concatenate every information into a single object log.debug(f'Query {self.api} API') params = _Request_params(api_collection, time, geo, name, cloudcover_thres) # Query the server if self.api == 'OpenSearch': response = _query_opensearch(params) elif self.api == 'OData': response = _query_odata(params) else: log.error(f'Invalid API, got {self.api}', e=ValueError) # Format list of product out = [ SandProduct(index=d['Id'], product_id=d['Name'], date=d['ContentDate']['Start'], metadata=d) for d in response if check_name_glob(d['Name'], name.glob) ] log.info(f'{len(out)} products has been found') return SandQuery(out) def _dl(self, product: dict, dir: Path|str, if_exists: str='skip') -> Path: """ Download a product from copernicus data space """ self._login() target = Path(dir)/product.product_id url = ("https://catalogue.dataspace.copernicus.eu/odata/v1/" f"Products({product.index})/$value") filegen(if_exists=if_exists)(self._download)(target, url, '.zip') log.info(f'Product has been downloaded at : {target}') return target def _download( self, target: Path, url: str, compression_ext: str = None ) -> None: """ Internal method to download a file from Copernicus Data Space. 
""" # Compression file path dl_target = Path(str(target)+'.zip') if compression_ext else target status = False while not status: try: # Initialize session for download self.session.headers.update({'Authorization': f'Bearer {self.tokens}'}) # Try to request server niter = 0 response = self.session.get(url, allow_redirects=False) log.debug(f'Requesting server for {target.name}') while response.status_code in (301, 302, 303, 307) and niter < 5: log.debug(f'Download content [Try {niter+1}/5]') if 'Location' not in response.headers: raise ValueError(f'status code : [{response.status_code}]') url = response.headers['Location'] response = self.session.get(url, verify=True, allow_redirects=True) niter += 1 response.raise_for_status() status = True except Exception as e: # Refresh session tokens self.session = requests.Session() auth = get_auth("dataspace.copernicus.eu") self._get_tokens(auth) # Download compressed file write(response, dl_target) # Uncompress archive if compression_ext: log.debug('Uncompress archive') assert target == uncompress(dl_target, target.parent) dl_target.unlink() def _dl_file(self, product_id: str, dir: Path | str, api_collection: str = None) -> Path: """ Download a specific product from Copernicus Data Space by its product identifier """ self._login() # Retrieve api collections based on SAND collections if api_collection is None: p = get_pattern(product_id) collection_sand, level = p['Name'], get_level(product_id, p) self._load_sand_collection_properties(collection_sand, level) else: self.api_collection = api_collection self.name_contains = [] name = Name(contains=[product_id]) ls = self.query(collection_sand=collection_sand, level=level, name=name) assert len(ls) == 1, 'Multiple products found' return self.download(ls[0], dir) def _qkl(self, product: dict, dir: Path|str): """ Download a quicklook (preview image) of the product """ self._login() target = Path(dir)/(product.product_id + '.jpeg') if not target.exists(): assets = 
self.metadata(product)['Assets'] if not assets: raise FileNotFoundError(f'Skipping quicklook {target.name}') url = assets[0]['DownloadLink'] filegen(0, if_exists='skip')(self._download)(target, url) log.info(f'Quicklook has been downloaded at : {target}') return target def _metadata(self, product): """ Extract metadata from a product's metadata field Args: product (dict): Product dictionary containing a 'metadata' field Returns: dict: Dictionary of metadata field names and their values """ self._login() req = ("https://catalogue.dataspace.copernicus.eu/odata/v1/Products?$filter=Id" f" eq '{product.index}'&$expand=Attributes&$expand=Assets") json = requests.get(req).json() assert len(json['value']) == 1 return json['value'][0]
@dataclass class _Request_params: collection: str time: Time geo: Geo name: Name cloudcover_thres: float def _query_odata(params: _Request_params): """Query the EOData Finder API""" query_lines = [ f"""https://catalogue.dataspace.copernicus.eu/odata/v1/Products?$filter=Collection/Name eq '{params.collection}' """ ] if params.time and params.time.start: query_lines.append(f'ContentDate/Start gt {params.time.start.isoformat()}Z') if params.time and params.time.end: query_lines.append(f'ContentDate/Start lt {params.time.end.isoformat()}Z') if params.geo is not None: query_lines.append(f"OData.CSC.Intersects(area=geography'SRID=4326;{params.geo.to_wkt()}')") if params.name.startswith != '': query_lines.append(f"startswith(Name, '{params.name.startswith}')") if len(params.name.contains) != 0: for cont in params.name.contains: query_lines.append(f"contains(Name, '{cont}')") if params.name.endswith != '': query_lines.append(f"endswith(Name, '{params.name.endswith}')") if params.cloudcover_thres: query_lines.append( "Attributes/OData.CSC.DoubleAttribute/any(att:att/Name eq 'cloudCover' " f"and att/OData.CSC.DoubleAttribute/Value le {params.cloudcover_thres})") top = 1000 # maximum value of number of retrieved values req = (' and '.join(query_lines))+f'&$top={top}' response = requests.get(requote_uri(req), verify=True) raise_api_error(response) if len(response.json()['value']) == top: raise NotImplementedError log.check(len(response.json()['value']) < top, "The number of matches has reached the API limit on the maximum " "number of items returned. This may mean that some hits are missing. 
" "Please refine your query.", e=RequestsError) return response.json()['value'] # SHOULD BE DEPRECATED def _query_opensearch(params: _Request_params): """Query the OpenSearch Finder API""" def _get_next_page(links): for link in links: if link["rel"] == "next": return link["href"] return False query = f"""https://catalogue.dataspace.copernicus.eu/resto/api/collections/{params.collection}/search.json?maxRecords=1000""" query_params = {'status': 'ALL'} if params.time and params.time.start: query_params["startDate"] = params.time.start.isoformat() if params.time and params.time.end: query_params["completionDate"] = params.time.end.isoformat() if params.geo is not None: query_params["geometry"] = params.geo.to_wkt() query += f"&{urlencode(query_params)}" query_response = [] while query: response = requests.get(query, verify=True) response.raise_for_status() data = response.json() for feature in data["features"]: query_response.append(feature) query = _get_next_page(data["properties"]["links"]) return query_response