from pathlib import Path
from typing import Optional, Literal
from tempfile import TemporaryDirectory
from urllib.parse import urlencode
from core import log
from core.files import filegen
from core.table import read_xml
from core.geo.product_name import get_pattern, get_level
from sand.utils import write
from sand.constraint import Time, Geo, Name
from sand.base import BaseDownload, raise_api_error, RequestsError
from sand.results import SandQuery, SandProduct
# Based on: https://github.com/yannforget/landsatxplore/tree/master/landsatxplore
class DownloadNASA(BaseDownload):
    """
    Python interface to the NASA CMR API (https://cmr.earthdata.nasa.gov/)

    Products (granules) are searched through the CMR ``search/granules``
    endpoint and files are fetched with the HTTP session stored in
    ``self.session``.
    """

    provider = 'nasa'

    def __init__(self):
        super().__init__()

    def _log(self):
        """
        Make sure an HTTP session is available for the NASA services

        The session is created with ``self._set_session()`` when the instance
        does not have one yet.

        NOTE(review): credentials are presumably read from .netrc by
        ``_set_session`` (not visible here) -- confirm.
        NOTE(review): the other methods call ``self._login()``, which is not
        defined in this class; it is presumably provided by ``BaseDownload``
        and expected to delegate to this method -- confirm.
        """
        # Check if session is already set and set it up if not
        if not hasattr(self, "session"):
            self._set_session()
        log.debug('No login required for NASA API (https://cmr.earthdata.nasa.gov/)')

    def _query(
        self,
        collection_sand: Optional[str] = None,
        level: Literal[1, 2, 3] = 1,
        time: Optional[Time] = None,
        geo: Optional[Geo] = None,
        name: Optional[Name] = None,
        cloudcover_thres: Optional[int] = None,
        api_collection: Optional[list[str]] = None,
    ):
        """
        Product query on the CMR NASA

        Args:
            collection_sand: SAND collection name, used to resolve the API
                collections when `api_collection` is not provided
            level: processing level, used together with `collection_sand`
            time: time constraint, passed through `self._format_time`
            geo: spatial constraint; only `Geo.Point` and `Geo.Polygon` are
                translated into a `bounding_box` request parameter
            name: constraint applied to the `title` of each returned granule.
                When provided, it is complemented in place with the naming
                constraint of the collection
            cloudcover_thres: upper bound sent as the `cloud_cover` request
                parameter (a value of 0 is a valid threshold)
            api_collection: identifiers to query directly instead of resolving
                them from `collection_sand`

        Returns:
            A `SandQuery` wrapping one `SandProduct` per matching granule
        """
        self._login()

        # Retrieve api collections based on SAND collections
        if api_collection is None:
            name_constraint = self._load_sand_collection_properties(
                collection_sand, level)
        else:
            # Same convention as in _dl_file: explicit collections replace the
            # ones derived from the SAND collection, without any additional
            # naming constraint.
            # NOTE(review): assumes an empty list is accepted by
            # Name(contains=...) and Name.add_contains -- confirm.
            self.api_collection = api_collection
            self.name_contains = []
            name_constraint = []

        # Format input time and geospatial constraints
        time = self._format_time(collection_sand, time)
        if geo:
            geo.set_convention(0)

        # Define or complement constraint on naming
        if name:
            name.add_contains(name_constraint)
        else:
            name = Name(contains=name_constraint)

        url = 'https://cmr.earthdata.nasa.gov/search/granules'
        headers = {'Accept': 'application/json'}
        page_size = 1000

        # Configure scene constraints shared by all the requests
        data = {
            'temporal': f'{time.start.isoformat()}Z,{time.end.isoformat()}Z',
        }
        if isinstance(geo, (Geo.Point, Geo.Polygon)):
            # Bounds are sent in the order (1, 0, 3, 2)
            # NOTE(review): presumably swaps lat/lon pairs into the lon/lat
            # order expected by CMR -- confirm against Geo.bounds.
            bounds = geo.bounds
            data['bounding_box'] = (
                f"{bounds[1]},{bounds[0]},{bounds[3]},{bounds[2]}")

        # Add constraint for cloud cover (open lower bound).
        # Compare to None so that a threshold of 0 is not silently ignored.
        if cloudcover_thres is not None:
            data['cloud_cover'] = f",{cloudcover_thres}"

        out = []
        for collec in self.api_collection:
            # Query NASA API
            log.debug(f'Query NASA API for collection {collec}')
            params = {**data, 'concept_id': collec, 'page_size': page_size}
            url_encode = url + '?' + urlencode(params)
            response = self.session.post(url_encode, headers=headers, verify=True)

            # Fail on HTTP errors before trying to read the JSON feed
            raise_api_error(response)
            entries = response.json()['feed']['entry']

            # A full page means that some hits may have been truncated
            log.check(len(entries) < page_size,
                      "The number of matches has reached the API limit on the maximum "
                      "number of items returned. This may mean that some hits are missing. "
                      "Please refine your query.", e=RequestsError)

            # Filter products on their title and wrap the remaining ones
            out.extend(
                SandProduct(
                    product_id=d['producer_granule_id'], index=d['id'],
                    date=d['time_start'], metadata=d
                )
                for d in entries if name.apply(d['title'])
            )

        log.info(f'{len(out)} products has been found')
        return SandQuery(out)

    def _dl_file(self, product_id, dir, api_collection: Optional[list[str]] = None):
        """
        Download a specific product from NASA by its product identifier

        Args:
            product_id: identifier sent as `producer_granule_id` to the API
            dir: directory in which the file is stored
            api_collection: identifiers of the collections to search in. When
                omitted, they are resolved from the pattern of `product_id`

        Returns:
            Path of the downloaded file. Collections are tried in order and
            the first one returning a match is used.
        """
        self._login()

        # Retrieve api collections based on SAND collections
        if api_collection is None:
            p = get_pattern(product_id)
            collection_sand, level = p['Name'], get_level(product_id, p)
            self._load_sand_collection_properties(collection_sand, level)
        else:
            self.api_collection = api_collection
            self.name_contains = []

        headers = {'Accept': 'application/json'}
        url = 'https://cmr.earthdata.nasa.gov/search/granules'

        for collec in self.api_collection:
            data = {
                'page_size': 5,
                'collection_concept_id': collec,
                'producer_granule_id': product_id,
            }
            url_encode = url + '?' + urlencode(data)
            response = self.session.post(url_encode, headers=headers, verify=True)

            # Fail on HTTP errors before trying to read the JSON feed
            raise_api_error(response)
            entries = response.json()['feed']['entry']
            if not entries:
                continue

            # The first link of the first match is used as download url
            dl_url = entries[0]['links'][0]['href']
            target = Path(dir)/Path(dl_url).name
            filegen(if_exists='skip')(self._download)(target, dl_url)
            log.info(f'Product has been downloaded at : {target}')
            return target

        # No collection returned a match
        log.error(f'No file found with name {product_id}')

    def _dl(self, product: SandProduct, dir: Path | str, if_exists='skip') -> Path:
        """
        Download a product from NASA data space

        The download url is the `href` of the link whose `title` contains
        "Download <product_id>" in the product metadata.

        Args:
            product: product to download, as returned by a query
            dir: directory in which the file is stored
            if_exists: forwarded to `filegen`

        Returns:
            Path of the downloaded file
        """
        self._login()
        links = product.metadata['links']
        title = f"Download {product.product_id}"
        url = self._get(links, title, 'title', 'href')
        target = Path(dir)/Path(url).name
        filegen(0, if_exists=if_exists)(self._download)(target, url)
        log.info(f'Product has been downloaded at : {target}')
        return target

    def _download(
        self,
        target: Path,
        url: str,
    ):
        """
        Internal method to handle the actual download of files from NASA servers

        The first request is sent without following redirections. As long as
        the server answers with a redirection status (at most 5 times), the
        `Location` header is requested again, this time letting the session
        follow further redirections.

        Args:
            target: path of the file to write
            url: url of the file to download

        Raises:
            ValueError: if a redirection response has no `Location` header
        """
        # Try to request server
        niter = 0
        response = self.session.get(url, allow_redirects=False)
        log.debug(f'Requesting server for {target.name}')
        while response.status_code in (301, 302, 303, 307) and niter < 5:
            log.debug(f'Download content [Try {niter+1}/5]')
            if 'Location' not in response.headers:
                raise ValueError(f'status code : [{response.status_code}]')
            url = response.headers['Location']
            response = self.session.get(url, verify=True, allow_redirects=True)
            niter += 1
        raise_api_error(response)

        # Download file
        write(response, target)

    def _qkl(self, product: SandProduct, dir: Path | str):
        """
        Download a quicklook (preview image) of the product

        The quicklook url is the `href` of the link whose `title` contains
        ".png". Nothing is downloaded when the target file already exists.

        NOTE(review): the link is selected on ".png" but the file is stored
        with a ".jpeg" suffix -- confirm that this is intended.

        Args:
            product: product for which the quicklook is requested
            dir: directory in which the quicklook is stored

        Returns:
            Path of the quicklook file
        """
        self._login()
        links = product.metadata['links']
        target = Path(dir)/(product.product_id + '.jpeg')
        url = self._get(links, '.png', 'title', 'href')
        if not target.exists():
            filegen(0)(self._download)(target, url)
            log.info(f'Quicklook has been downloaded at : {target}')
        return target

    def _metadata(self, product):
        """
        Extract metadata from a product's metadata field

        The XML document referenced by the link whose `title` contains ".xml"
        is downloaded to a temporary file and parsed with `read_xml`.

        Args:
            product: product for which the metadata are requested

        Returns:
            The output of `read_xml` for the downloaded document

        Raises:
            AssertionError: if the server returns an empty document
        """
        self._login()
        links = product.metadata['links']
        req = self._get(links, '.xml', 'title', 'href')
        response = self.session.get(req)
        raise_api_error(response)

        # Explicit raise instead of `assert`, which is removed under `-O`
        content = response.content
        if not content:
            raise AssertionError(f'Empty metadata document returned by {req}')

        with TemporaryDirectory() as tmpdir:
            xml_path = Path(tmpdir)/'meta.xml'
            # Store the document exactly as received: line breaks are kept and
            # the bytes stay consistent with the encoding declared in the XML
            xml_path.write_bytes(content)
            return read_xml(xml_path)

    def _get(self, liste, name, in_key, out_key):
        """
        Internal helper to find a value in a list of dictionaries by matching keys

        Args:
            liste: list of dictionaries to search in
            name: value searched in the `in_key` entry (containment test)
            in_key: key of the entry tested against `name`; dictionaries
                without this key are skipped
            out_key: key of the entry returned for the first match

        Returns:
            The `out_key` entry of the first matching dictionary. When nothing
            matches, the failure is reported through `log.error` with
            `e=KeyError`.
        """
        for col in liste:
            if in_key in col and name in col[in_key]:
                return col[out_key]
        log.error(f'{name} has not been found', e=KeyError)