# Source code for sand.nasa

from pathlib import Path
from typing import Optional, Literal
from tempfile import TemporaryDirectory
from urllib.parse import urlencode

from core import log
from core.files import filegen
from core.table import read_xml
from core.geo.product_name import get_pattern, get_level

from sand.utils import write
from sand.constraint import Time, Geo, Name
from sand.base import BaseDownload, raise_api_error, RequestsError
from sand.results import SandQuery, SandProduct

# BASED ON : https://github.com/yannforget/landsatxplore/tree/master/landsatxplore


class DownloadNASA(BaseDownload):
    """
    Python interface to the NASA CMR API (https://cmr.earthdata.nasa.gov/)
    """

    provider = 'nasa'

    def __init__(self):
        super().__init__()

    def _log(self):
        """
        Login to NASA with credentials stored in .netrc

        NOTE(review): every method in this class calls ``self._login()`` while
        this method is named ``_log`` — presumably ``BaseDownload._login``
        dispatches to it; confirm against the base class.
        """
        # Lazily create the HTTP session on first use; CMR search itself
        # needs no authentication.
        if not hasattr(self, "session"):
            self._set_session()
        log.debug('No login required for NASA API (https://cmr.earthdata.nasa.gov/)')

    def _query(
        self,
        collection_sand: str = None,
        level: Literal[1, 2, 3] = 1,
        time: Time = None,
        geo: Geo = None,
        name: Name = None,
        cloudcover_thres: Optional[int] = None,
        api_collection: list[str] = None,
    ):
        """
        Product query on the CMR NASA

        Args:
            collection_sand: SAND collection name, used to resolve the CMR
                concept ids when ``api_collection`` is not given
            level: product processing level (1, 2 or 3)
            time: temporal constraint
            geo: geospatial constraint (point or polygon)
            name: constraint on the product name
            cloudcover_thres: upper bound on cloud cover (percent)
            api_collection: explicit list of CMR concept ids, bypassing the
                SAND collection lookup

        Returns:
            SandQuery wrapping the matching SandProduct entries

        Raises:
            RequestsError: when a collection hits the API page-size limit
        """
        self._login()

        # Retrieve api collections based on SAND collections
        if api_collection is None:
            name_constraint = self._load_sand_collection_properties(collection_sand, level)
        else:
            # Fix: the original branch left `name_constraint` unbound
            # (UnboundLocalError below) and never stored the caller-supplied
            # collections; mirror the handling in `_dl_file`.
            self.api_collection = api_collection
            name_constraint = []

        # Format input time and geospatial constraints
        time = self._format_time(collection_sand, time)
        if geo:
            geo.set_convention(0)

        # Define or complement constraint on naming
        if name:
            name.add_contains(name_constraint)
        else:
            name = Name(contains=name_constraint)

        # Initialise data dictionary
        data = {}
        headers = {'Accept': 'application/json'}

        # Configure scene constraints for request (CMR expects
        # "start,end" ISO-8601 datetimes in the `temporal` parameter)
        date_range = time.start.isoformat() + 'Z,'
        date_range += time.end.isoformat() + 'Z'
        data['temporal'] = date_range

        if isinstance(geo, Geo.Point | Geo.Polygon):
            # CMR bounding_box order is west,south,east,north — assumes
            # geo.bounds is (lat_min, lon_min, lat_max, lon_max); TODO confirm
            data['bounding_box'] = f"{geo.bounds[1]},{geo.bounds[0]},"
            data['bounding_box'] += f"{geo.bounds[3]},{geo.bounds[2]}"

        # Add constraint for cloud cover ("min,max" range with empty min)
        if cloudcover_thres:
            data['cloud_cover'] = f",{cloudcover_thres}"

        out = []
        for collec in self.api_collection:
            # Query NASA API, one request per concept id
            log.debug(f'Query NASA API for collection {collec}')
            data['concept_id'] = collec
            data['page_size'] = 1000
            url = 'https://cmr.earthdata.nasa.gov/search/granules'
            url_encode = url + '?' + urlencode(data)
            response = self.session.post(url_encode, headers=headers, verify=True)

            # A full page means results were probably truncated
            log.check(len(response.json()['feed']['entry']) < data['page_size'],
                      "The number of matches has reached the API limit on the maximum "
                      "number of items returned. This may mean that some hits are missing. "
                      "Please refine your query.", e=RequestsError)
            response = response.json()['feed']['entry']

            # Filter products on their title with the name constraint
            response = [p for p in response if name.apply(p['title'])]

            for d in response:
                out.append(SandProduct(
                    product_id=d['producer_granule_id'],
                    index=d['id'],
                    date=d['time_start'],
                    metadata=d
                ))

        log.info(f'{len(out)} products has been found')
        return SandQuery(out)

    def _dl_file(self, product_id, dir, api_collection: list[str] = None):
        """
        Download a specific product from NASA by its product identifier

        Args:
            product_id: producer granule id of the product
            dir: destination directory
            api_collection: explicit list of CMR concept ids, bypassing the
                SAND collection lookup

        Returns:
            Path of the downloaded file, or None when no match was found
            (an error is logged in that case)
        """
        self._login()

        # Retrieve api collections based on SAND collections
        if api_collection is None:
            p = get_pattern(product_id)
            collection_sand, level = p['Name'], get_level(product_id, p)
            self._load_sand_collection_properties(collection_sand, level)
        else:
            self.api_collection = api_collection
            self.name_contains = []

        data = {'page_size': 5}
        headers = {'Accept': 'application/json'}
        url = 'https://cmr.earthdata.nasa.gov/search/granules'

        for collec in self.api_collection:
            data['collection_concept_id'] = collec
            data['producer_granule_id'] = product_id
            url_encode = url + '?' + urlencode(data)
            response = self.session.post(url_encode, headers=headers, verify=True)
            response = response.json()['feed']['entry']
            if len(response) == 0:
                continue
            # First matching granule, first link — assumed to be the data file
            dl_url = response[0]['links'][0]['href']
            target = Path(dir) / Path(dl_url).name
            filegen(if_exists='skip')(self._download)(target, dl_url)
            log.info(f'Product has been downloaded at : {target}')
            return target

        log.error(f'No file found with name {product_id}')

    def _dl(self, product: dict, dir: Path | str, if_exists='skip') -> Path:
        """
        Download a product from NASA data space

        Args:
            product: SandProduct as returned by `_query`
            dir: destination directory
            if_exists: behaviour when the target file already exists
                (forwarded to `filegen`)

        Returns:
            Path of the downloaded file
        """
        self._login()
        links = product.metadata['links']
        # The download link is identified by its "Download <id>" title
        title = f"Download {product.product_id}"
        url = self._get(links, title, 'title', 'href')
        target = Path(dir) / Path(url).name
        filegen(0, if_exists=if_exists)(self._download)(target, url)
        log.info(f'Product has been downloaded at : {target}')
        return target

    def _download(
        self,
        target: Path,
        url: str,
    ):
        """
        Internal method to handle the actual download of files from NASA servers

        Follows up to 5 HTTP redirects manually (Earthdata auth redirects),
        then streams the final response to `target`.
        """
        # Try to request server
        niter = 0
        response = self.session.get(url, allow_redirects=False)
        log.debug(f'Requesting server for {target.name}')
        while response.status_code in (301, 302, 303, 307) and niter < 5:
            log.debug(f'Download content [Try {niter+1}/5]')
            if 'Location' not in response.headers:
                raise ValueError(f'status code : [{response.status_code}]')
            url = response.headers['Location']
            response = self.session.get(url, verify=True, allow_redirects=True)
            niter += 1
        raise_api_error(response)

        # Download file
        write(response, target)

    def _qkl(self, product: dict, dir: Path | str):
        """
        Download a quicklook (preview image) of the product

        Returns:
            Path of the quicklook file
        """
        self._login()
        links = product.metadata['links']
        # NOTE(review): target is saved as .jpeg but the link is looked up by
        # '.png' in its title — confirm against actual CMR link metadata
        target = Path(dir) / (product.product_id + '.jpeg')
        url = self._get(links, '.png', 'title', 'href')
        if not target.exists():
            filegen(0)(self._download)(target, url)
        log.info(f'Quicklook has been downloaded at : {target}')
        return target

    def _metadata(self, product):
        """
        Extract metadata from a product's metadata field

        Fetches the product's XML metadata link and parses it with `read_xml`.
        """
        self._login()
        links = product.metadata['links']
        req = self._get(links, '.xml', 'title', 'href')
        meta = self.session.get(req).text
        # Explicit raise instead of `assert` (asserts vanish under -O)
        if not meta:
            raise ValueError(f'Empty metadata response from {req}')

        with TemporaryDirectory() as tmpdir:
            tmpfile = Path(tmpdir) / 'meta.xml'
            # Fix: the original used writelines(meta.split('\n')), which drops
            # every newline and glues adjacent XML lines together
            with open(tmpfile, 'w') as f:
                f.write(meta)
            return read_xml(tmpfile)

    def _get(self, liste, name, in_key, out_key):
        """
        Internal helper to find a value in a list of dictionaries by matching keys

        Returns `d[out_key]` for the first dict `d` in `liste` whose
        `d[in_key]` contains `name`; logs an error (KeyError) otherwise.
        """
        for col in liste:
            if in_key not in col:
                continue
            if name in col[in_key]:
                return col[out_key]
        log.error(f'{name} has not been found', e=KeyError)