Source code for sand.nasa

import requests
import json

from pathlib import Path
from typing import Optional
from shapely import Point, Polygon
from requests.utils import requote_uri
from tempfile import TemporaryDirectory
from urllib.request import urlopen, Request
from urllib.parse import urlencode
from datetime import datetime, date

from core import log
from core.table import *
from core.files import filegen
from core.static import interface
from core.geo.product_name import get_pattern, get_level

from sand.base import BaseDownload, raise_api_error
from sand.results import Query
from sand.tinyfunc import *

# BASED ON : https://github.com/yannforget/landsatxplore/tree/master/landsatxplore



[docs] class DownloadNASA(BaseDownload): name = 'DownloadNASA' def __init__(self, collection: str = None, level: int = 1): """ Python interface to the NASA CMR API (https://cmr.earthdata.nasa.gov/) Args: collection (str): collection name ('ECOSTRESS', 'VIIRS', etc.) Example: usgs = DownloadNASA('ECOSTRESS') # retrieve the list of products # using a pickle cache file to avoid reconnection ls = cache_dataframe('query-S2.pickle')(cds.query)( dtstart=datetime(2024, 1, 1), dtend=datetime(2024, 2, 1), geo=Point(119.514442, -8.411750) ) cds.download(ls.iloc[0], <dirname>, uncompress=True) """ self.provider = 'nasa' super().__init__(collection, level) def _login(self): """ Login to NASA with credentials storted in .netrc """ log.info(f'No log required for NASA API (https://cmr.earthdata.nasa.gov/)') @interface def query( self, dtstart: Optional[date|datetime]=None, dtend: Optional[date|datetime]=None, geo=None, cloudcover_thres: Optional[int]=None, name_contains: Optional[list] = [], name_startswith: Optional[str] = None, name_endswith: Optional[str] = None, name_glob: Optional[str] = None, other_attrs: Optional[list] = None, **kwargs ): """ Product query on the CMR NASA Args: dtstart and dtend (datetime): start and stop datetimes geo: shapely geometry. Examples: Point(lon, lat) Polygon(...) cloudcover_thres: Optional[int]=None, name_contains (list): list of substrings name_startswith (str): search for name starting with this str name_endswith (str): search for name ending with this str name_glob (str): match name with this string use_most_recent (bool): keep only the most recent processing baseline version other_attrs (list): list of other attributes to include in the output (ex: ['ContentDate', 'Footprint']) Note: This method can be decorated by cache_dataframe for storing the outputs. Example: cache_dataframe('cache_result.pickle')(cds.query)(...) """ dtstart, dtend, geo = self._format_input_query(dtstart, dtend, geo) if geo: geo = flip_coords(change_lon_convention(geo)) # Add provider constraint name_contains = self._complete_name_contains(name_contains) data = {} headers = {'Accept': 'application/json'} # Configure scene constraints for request date_range = dtstart.isoformat() + 'Z,' date_range += dtend.isoformat() + 'Z' data['temporal'] = date_range if isinstance(geo, Point): bbox = f"{geo.x},{geo.y},{geo.x},{geo.y}" elif isinstance(geo, Polygon): bounds = geo.bounds bbox = f"{bounds[1]},{bounds[0]},{bounds[3]},{bounds[2]}" if geo: data['bounding_box'] = bbox # Define check functions checker = [] if name_contains: checker.append((check_name_contains, name_contains)) if name_startswith: checker.append((check_name_startswith, name_startswith)) if name_endswith: checker.append((check_name_endswith, name_endswith)) if name_glob: checker.append((check_name_glob, name_glob)) out = [] for collec in self.api_collection: # Query NASA API log.debug(f'Query NASA API for collection {collec}') data['concept_id'] = collec data['page_size'] = 1000 url = 'https://cmr.earthdata.nasa.gov/search/granules' url_encode = url + '?' + urlencode(data) urllib_req = Request(requote_uri(url_encode), headers=headers) urllib_response = urlopen(urllib_req, timeout=5, context=self.ssl_ctx) response = json.load(urllib_response)['feed']['entry'] # Filter products response = [p for p in response if self.check_name(p['title'], checker)] # test if maximum number of returns is reached top = 1000 if len(response) + len(out) >= top: log.error('The request led to the maximum number of results ' f'({len(response) + len(out)})', e=ValueError) for d in response: out.append({"id": d["id"], "name": d["producer_granule_id"], **{k: d[k] for k in ['links','collection_concept_id']}}) log.info(f'{len(out)} products has been found') return Query(out)
[docs] def download_file(self, product_id, dir): p = get_pattern(product_id) self.__init__(p['Name'], get_level(product_id, p)) data = {'page_size': 10} headers = {'Accept': 'application/json'} url = 'https://cmr.earthdata.nasa.gov/search/granules' for collec in self.api_collection: data['concept_id'] = collec data['granule_ur'] = product_id url_encode = url + '?' + urlencode(data) urllib_req = Request(requote_uri(url_encode), headers=headers) urllib_response = urlopen(urllib_req, timeout=5, context=self.ssl_ctx) response = json.load(urllib_response)['feed']['entry'] if len(response) == 0: continue dl_url = self._get(response[0]['links'], '.h5', 'title', 'href') target = Path(dir)/Path(dl_url).name try: filegen(0, if_exists='skip')(self._download)(target, dl_url) except: continue return target
@interface def download(self, product: dict, dir: Path|str, if_exists='skip', uncompress: bool=False) -> Path: """ Download a product from NASA data space Args: product (dict): product definition with keys 'id' and 'name' dir (Path | str): Directory where to store downloaded file. uncompress (bool, optional): If True, uncompress file if needed. Defaults to True. """ url = self._get(product['links'], '.h5', 'title', 'href') target = Path(dir)/Path(url).name filegen(0, if_exists=if_exists)(self._download)(target, url) log.info(f'Product has been downloaded at : {target}') return target def _download( self, target: Path, url: str, ): """ Wrapped by filegen """ # Try to request server niter = 0 response = self.session.get(url, allow_redirects=False) log.debug(f'Requesting server for {target.name}') while response.status_code in (301, 302, 303, 307) and niter < 5: log.debug(f'Download content [Try {niter+1}/5]') if 'Location' not in response.headers: raise ValueError(f'status code : [{response.status_code}]') url = response.headers['Location'] # response = self.session.get(url, allow_redirects=False) response = self.session.get(url, verify=True, allow_redirects=True) niter += 1 raise_api_error(response) # Download file log.debug('Start writing on device') filesize = int(response.headers["Content-Length"]) pbar = log.pbar(log.lvl.INFO, total=filesize, unit_scale=True, unit="B", desc='writing', unit_divisor=1024, leave=False) with open(target, 'wb') as f: for chunk in response.iter_content(chunk_size=1024): if chunk: f.write(chunk) pbar.update(1024) @interface def quicklook(self, product: dict, dir: Path|str): """ Download a quicklook to `dir` """ target = Path(dir)/(product['name'] + '.jpeg') url = self._get(product['links'], '.png', 'title', 'href') if not target.exists(): filegen(0)(self._download)(target, url) log.info(f'Quicklook has been downloaded at : {target}') return target @interface def metadata(self, product): """ Returns the product metadata including attributes and assets """ req = self._get(product['links'], '.xml', 'title', 'href') meta = requests.get(req).text assert len(meta) > 0 with TemporaryDirectory() as tmpdir: with open(Path(tmpdir)/'meta.xml', 'w') as f: f.writelines(meta.split('\n')) return read_xml(Path(tmpdir)/'meta.xml') def _retrieve_collec_name(self, collection): collecs = select(self.provider_prop,('SAND_name','=',collection),['level','collec']) try: collecs = select_cell(collecs,('level','=',self.level),'collec') except AssertionError: log.error( f'Level{self.level} products are not available for {self.collection}', e=KeyError) return collecs.split(' ') def _get(self, liste, name, in_key, out_key): for col in liste: if in_key not in col: continue if name in col[in_key]: return col[out_key] log.error(f'{name} has not been found', e=KeyError)