from pathlib import Path
from typing import Literal
from tempfile import TemporaryDirectory
from urllib.parse import urlencode
from re import match
from core import log
from core.files import filegen
from core.table import read_xml
from core.geo.product_name import get_pattern, get_level
from sand.utils import write, drop_extension
from sand.constraint import Time, Geo, GeoType, Name
from sand.base import BaseDownload, raise_api_error
from sand.results import SandQuery, SandProduct
# BASED ON : https://github.com/yannforget/landsatxplore/tree/master/landsatxplore
[docs]
class DownloadNASA(BaseDownload):
"""
Python interface to the NASA CMR API (https://cmr.earthdata.nasa.gov/)
"""
provider = 'nasa'
def __init__(self):
super().__init__()
def _login(self):
# Check if session is already set and set it up if not
if not hasattr(self, "session"):
self._set_session()
log.debug(f'No login required for NASA API (https://cmr.earthdata.nasa.gov/)')
[docs]
def query(
self,
collection_sand: str,
level: Literal[1,2,3] = 1,
time: Time|None = None,
geo: GeoType|None = None,
name: Name|None = None,
cloudcover_thres: int|None = None,
api_collection: str|None = None
):
self._login()
# Retrieve api collections based on SAND collections
if api_collection is None:
name_constraint = self._load_sand_collection_properties(collection_sand, level)
api_collection = self.api_collection[0]
else:
name_constraint = []
# Format input time and geospatial constraints
time = self._format_time(collection_sand, time)
if isinstance(geo, Geo.Point|Geo.Polygon):
geo.set_convention(0)
# Define or complement constraint on naming
if name:
name.add_contains(name_constraint)
else:
name = Name(contains=name_constraint)
# Initialise data dictionary
data = {}
headers = {'Accept': 'application/json'}
# Configure scene constraints for request
if time and time.start:
date_range = time.start.isoformat()[:19] + 'Z,'
if time.end:
date_range += time.end.isoformat()[:19] + 'Z'
data['temporal'] = date_range
if isinstance(geo, Geo.Point|Geo.Polygon):
data['bounding_box'] = f"{geo.bounds[1]},{geo.bounds[0]},"
data['bounding_box'] += f"{geo.bounds[3]},{geo.bounds[2]}"
# Add constraint for cloud cover
if cloudcover_thres:
data['cloud_cover'] = f",{cloudcover_thres}"
out = []
for collec in self.api_collection:
# Query NASA API
log.debug(f'Query NASA API for collection {collec}')
data['concept_id'] = collec
data['page_size'] = 1000
url = 'https://cmr.earthdata.nasa.gov/search/granules'
url_encode = url + '?' + urlencode(data)
response = self.session.post(url_encode, headers=headers, verify=True)
if len(response.json()['feed']['entry']) == data['page_size']:
log.warning(
"The number of matches has reached the API limit on the maximum "
"number of items returned. This may mean that some hits are missing. "
"Please refine your query."
)
response = response.json()['feed']['entry']
# Filter products
response = [p for p in response if name.apply(p['title'])]
for d in response:
if 'producer_granule_id' in d: prod_id = d['producer_granule_id']
else: prod_id = d['title']
out.append(SandProduct(
product_id=prod_id, index=d['id'],
date=d['time_start'], metadata=d
))
log.info(f'{len(out)} products has been found')
return SandQuery(out)
[docs]
def download_file(
self,
product_id: str,
dir: Path | str,
api_collection: str|None = None
) -> Path:
self._login()
# Retrieve api collections based on SAND collections
if api_collection is None:
p = get_pattern(product_id)
collection_sand, level = p['Name'], get_level(product_id, p)
self._load_sand_collection_properties(collection_sand, level)
else:
self.api_collection = [api_collection]
self.name_contains = []
data = {'page_size': 5}
headers = {'Accept': 'application/json'}
url = 'https://cmr.earthdata.nasa.gov/search/granules'
@filegen(if_exists='skip')
def _dl(target):
for collec in self.api_collection:
data['collection_concept_id'] = collec
data['producer_granule_id'] = drop_extension(product_id)
url_encode = url + '?' + urlencode(data)
response = self.session.post(url_encode, headers=headers, verify=True)
response = response.json()['feed']['entry']
if len(response) == 0: continue
dl_url = response[0]['links'][0]['href']
assert target.name == Path(dl_url).name
self._download(target, dl_url)
return
log.error(f'No file found with name {product_id}')
filename = Path(dir)/product_id
_dl(filename)
log.info(f'Product has been downloaded at : {filename}')
return filename
[docs]
def download(
self,
product: SandProduct,
dir: Path | str,
if_exists: Literal['skip','overwrite','backup','error'] = "skip"
) -> Path:
self._login()
links = product.metadata['links']
url = self._get(links, product.product_id)
target = Path(dir)/Path(url).name
filegen(0, if_exists=if_exists)(self._download)(target, url)
log.info(f'Product has been downloaded at : {target}')
return target
def _download(
self,
target: Path,
url: str,
):
"""
Internal method to handle the actual download of files from NASA servers
"""
# Try to request server
niter = 0
response = self.session.get(url, allow_redirects=False)
log.debug(f'Requesting server for {target.name}')
while response.status_code in (301, 302, 303, 307) and niter < 5:
log.debug(f'Download content [Try {niter+1}/5]')
if 'Location' not in response.headers:
raise ValueError(f'status code : [{response.status_code}]')
url = response.headers['Location']
response = self.session.get(url, verify=True, allow_redirects=True)
niter += 1
raise_api_error(response)
# Download file
write(response, target)
[docs]
def quicklook(
self,
product: SandProduct,
dir: Path|str
) -> Path:
self._login()
links = product.metadata['links']
target = Path(dir)/(product.product_id + '.png')
url = self._get(links, target.name)
if not target.exists():
filegen(0)(self._download)(target, url)
log.info(f'Quicklook has been downloaded at : {target}')
return target
def _get(self, liste, name) -> str:
"""
Internal helper to find a value in a list of dictionaries by matching keys
"""
for col in liste:
if 'title' not in col: continue
if match(f'Download {name}', col['title']):
return col['href']
for col in liste:
if 'href' not in col: continue
if match(name, col['href']):
return col['href']
log.error(f'{name} has not been found', e=KeyError)