Source code for sand.geodes

from datetime import datetime, date
from pathlib import Path
from typing import Optional

from sand.base import BaseDownload, raise_api_error
from sand.results import Query
from sand.tinyfunc import *
from core import log
from core.download import get_auth
from core.static import interface
from core.files import filegen
from core.table import select_cell, select


# [SOURCE] https://github.com/olivierhagolle/theia_download/tree/master
# https://geodes.cnes.fr/support/api/
class DownloadCNES(BaseDownload):
    """Download interface to the CNES Geodes Data Center."""

    name = 'DownloadCNES'

    def __init__(self, collection: Optional[str] = None, level: int = 1):
        """
        Python interface to the CNES Geodes Data Center (https://geodes-portal.cnes.fr/)

        Args:
            collection (str): collection name ('LANDSAT-5-TM', 'VENUS', etc.)
            level (int): product level, forwarded to BaseDownload together
                with `collection`

        Example:
            cds = DownloadCNES('VENUS')
            # retrieve the list of products
            # using a pickle cache file to avoid reconnection
            ls = cache_dataframe('query-S2.pickle')(cds.query)(
                dtstart=datetime(2024, 1, 1),
                dtend=datetime(2024, 2, 1),
                geo=Point(119.514442, -8.411750),
            )
            cds.download(ls.iloc[0], <dirname>, uncompress=True)
        """
        # The provider must be set before the base class initialisation.
        self.provider = 'geodes'
        super().__init__(collection, level)

    def _login(self):
        """
        Load the Geodes API key from the credentials stored in .netrc

        The 'password' field of the "geodes.cnes.fr" entry is used as the
        value of the `X-API-Key` header sent with each request.
        """
        auth = get_auth("geodes.cnes.fr")
        self.tokens = auth['password']
        log.info('Log to API (https://geodes-portal.cnes.fr/)')

    def _set_api_headers(self, json_content: bool = False):
        """
        Add the API key (and optionally the JSON content type) to the session headers
        """
        self.session.headers.update({"X-API-Key": self.tokens})
        if json_content:
            self.session.headers.update({"Content-type": "application/json"})

    @staticmethod
    def _features_to_records(features: list) -> list:
        """
        Convert STAC features into the product dictionaries stored in a Query
        """
        return [
            {
                "id": d["id"],
                "name": d["properties"]["identifier"],
                'links': d['assets'],
                'time': d['properties']['datetime'],
            }
            for d in features
        ]

    @interface
    def query(
        self,
        dtstart: Optional[date|datetime] = None,
        dtend: Optional[date|datetime] = None,
        geo=None,
        cloudcover_thres: Optional[int] = None,
        name_contains: Optional[list] = None,
        name_startswith: Optional[str] = None,
        name_endswith: Optional[str] = None,
        name_glob: Optional[str] = None,
        tile_number: Optional[str] = None,
        venus_site: Optional[str] = None,
        other_attrs: Optional[list] = None,
        **kwargs
    ):
        """
        Product query on the Geodes Data Center

        Args:
            dtstart and dtend (datetime): start and stop datetimes
            geo: shapely geometry. Examples:
                Point(lon, lat)
                Polygon(...)
            cloudcover_thres (int): maximum cloud cover (a value of 0 is
                a valid threshold)
            name_contains (list): list of substrings
            name_startswith (str): search for name starting with this str
            name_endswith (str): search for name ending with this str
            name_glob (str): match name with this string
            tile_number (str): Tile number (ex: T31TCJ), Sentinel2 only
            venus_site (str): Venµs Site name, Venµs only
            other_attrs (list): list of other attributes to include in the output
                (ex: ['ContentDate', 'Footprint'])

        Returns:
            Query: one entry per product with keys 'id', 'name', 'links'
            and 'time'.

        Note:
            This method can be decorated by cache_dataframe for storing the outputs.
            Example:
                cache_dataframe('cache_result.pickle')(cds.query)(...)
        """
        dtstart, dtend, geo = self._format_input_query(dtstart, dtend, geo)

        # Add provider constraint
        name_contains = self._complete_name_contains(name_contains)

        # Define check functions (applied client-side on the product names)
        checker = []
        if name_contains:
            checker.append((check_name_contains, name_contains))
        if name_startswith:
            checker.append((check_name_startswith, name_startswith))
        if name_endswith:
            checker.append((check_name_endswith, name_endswith))
        if name_glob:
            checker.append((check_name_glob, name_glob))

        server_url = f'https://geodes-portal.cnes.fr/api/stac/collections/{self.api_collection}/items'
        limit = 500
        data = {'page': 1, 'limit': limit}
        query = {}

        # Products are kept when dtstart <= datetime <= dtend. Each bound is
        # optional, so the filter is built independently of the other one.
        datetime_filter = {}
        if dtstart:
            datetime_filter['gte'] = dtstart.isoformat()
        if dtend:
            datetime_filter['lte'] = dtend.isoformat()
        if datetime_filter:
            query['datetime'] = datetime_filter

        n_location = sum(v is not None for v in [geo, tile_number, venus_site])
        assert n_location != 0, \
            "Please fill in at least geo or tile number or venus site"
        assert n_location == 1, \
            "Please fill in only one of geo, tile number or venus site"

        if geo:
            data['bbox'] = geo
        # NOTE(review): tile_number and venus_site are accepted but not yet
        # sent to the server, hence no spatial constraint is applied for them.
        # TODO: add the corresponding "location" constraint.

        # `is not None` so that a threshold of 0 is not silently dropped
        if cloudcover_thres is not None:
            query['eo:cloud_cover'] = {"lte": cloudcover_thres}

        data['query'] = query
        self._set_api_headers(json_content=True)
        # NOTE(review): TLS verification is disabled (verify=False), and the
        # parameters are passed as the body of a GET request - confirm both
        # against the Geodes API documentation.
        response = self.session.get(server_url, data=data, verify=False)
        raise_api_error(response)

        features = response.json()['features']

        # test if maximum number of returns is reached
        # (on the raw server answer: a truncated page must be detected even
        # when the name filters below discard most of its content)
        if len(features) >= limit:
            log.error('The request led to the maximum number of results '
                      f'({len(features)})', e=ValueError)

        # Filter products
        selected = [p for p in features
                    if self.check_name(p["properties"]['identifier'], checker)]

        out = self._features_to_records(selected)

        log.info(f'{len(out)} products has been found')

        return Query(out)

    @interface
    def download(self, product: dict, dir: Path|str, if_exists='skip', uncompress: bool=True) -> Path:
        """
        Download a product from Geodes Datahub

        Args:
            product (dict): product definition with keys 'id', 'name' and 'links'
            dir (Path | str): Directory where to store downloaded file.
            if_exists (str): forwarded to filegen, applies when the target
                already exists. Defaults to 'skip'.
            uncompress (bool, optional): If True, uncompress file if needed.
                Defaults to True. Currently unused by this method.

        Returns:
            Path: path to the downloaded file
        """
        # The product archive is the asset named "<product name>.<extension>"
        search = [key for key in product['links'] if product['name']+'.' in key]
        assert len(search) == 1

        target = Path(dir)/search[0]
        dl_data = product['links'][search[0]]

        filegen(0, if_exists=if_exists)(self._download)(target, dl_data)
        log.info(f'Product has been downloaded at : {target}')

        return target

    def _download(
        self,
        target: Path,
        url: dict,
    ):
        """
        Stream an asset to `target`. Wrapped by filegen

        Args:
            target (Path): file to write
            url (dict): asset definition with keys 'href' (download link) and
                'description' (text containing "<size> bytes")
        """
        # The file size is the word preceding "bytes" in the description
        desc = url['description'].split()
        filesize = int(desc[desc.index('bytes')-1])

        self._set_api_headers()
        # stream=True: the content is fetched chunk by chunk instead of being
        # entirely loaded in memory before writing.
        # NOTE(review): TLS verification is disabled (verify=False).
        with self.session.get(url['href'], stream=True, verify=False) as response:
            # Do not write the body of an error response as if it was the product
            raise_api_error(response)

            pbar = log.pbar(log.lvl.INFO, total=filesize, unit_scale=True, unit="B",
                            desc='writing', unit_divisor=1024, leave=False)

            with open(target, 'wb') as f:
                for chunk in response.iter_content(chunk_size=64*1024):
                    if chunk:
                        f.write(chunk)
                        # the last chunk is usually shorter than chunk_size
                        pbar.update(len(chunk))

    @interface
    def download_file(self, product_id: str, dir, if_exists='skip'):
        """
        Download the first product whose identifier contains `product_id`

        Args:
            product_id (str): product identifier
            dir (Path | str): Directory where to store downloaded file.
            if_exists (str): forwarded to `download`. Defaults to 'skip'.

        Returns:
            Path: path to the downloaded file
        """
        # Query and check if product exists
        server_url = 'https://geodes-portal.cnes.fr/api/stac/search'
        data = {
            'page': 1,
            'limit': 1,
            'query': {'identifier': {'contains': [product_id]}},
        }

        self._set_api_headers(json_content=True)
        # NOTE(review): same remarks as in `query` (verify=False, GET with body)
        response = self.session.get(server_url, data=data, verify=False)
        raise_api_error(response)
        features = response.json()['features']
        log.check(len(features) > 0, f'No product named {product_id}')

        # Download the product
        out = self._features_to_records(features)
        return self.download(Query(out).iloc[0], dir, if_exists)

    @interface
    def quicklook(self, product: dict, dir: Path|str):
        """
        Download a quicklook to `dir`

        Args:
            product (dict): product definition with key 'links'
            dir (Path | str): Directory where to store the quicklook

        Returns:
            Path: path to the quicklook file
        """
        search = [key for key in product['links'] if 'quicklook' in key]
        assert len(search) == 1

        target = Path(dir)/search[0].split('/')[-1]
        url = product['links'][search[0]]

        if not target.exists():
            filegen(0)(self._download)(target, url)

        log.info(f'Quicklook has been downloaded at : {target}')
        return target

    @interface
    def metadata(self, product: dict):
        """
        Returns the product metadata including attributes and assets

        Not implemented for this provider.
        """
        raise NotImplementedError

    def _retrieve_collec_name(self, collection):
        """
        Return the provider collection name for `collection` at `self.level`

        Only the first (space separated) name of the 'collec' cell is returned.
        """
        collecs = select(self.provider_prop, ('SAND_name', '=', collection), ['level', 'collec'])
        try:
            collecs = select_cell(collecs, ('level', '=', self.level), 'collec')
        except AssertionError:
            log.error(
                f'Level{self.level} products are not available for {collection}',
                e=KeyError)
        return collecs.split(' ')[0]

    def _get(self, liste, name, in_key, out_key):
        """
        Return `out_key` of the first item of `liste` whose `in_key` contains `name`

        Items without `in_key` are skipped. An error (KeyError) is reported
        through log.error when no item matches.
        """
        for col in liste:
            if in_key not in col:
                continue
            if name in col[in_key]:
                return col[out_key]
        log.error(f'{name} has not been found', e=KeyError)