Source code for sand.usgs

import requests
import json

from pathlib import Path
from typing import Optional
from shapely import Point, Polygon
from datetime import datetime, date

from core import log
from core.files import filegen
from core.static import interface
from core.download import get_auth
from core.table import select, select_cell
from core.geo.product_name import get_pattern, get_level

from sand.base import raise_api_error, BaseDownload
from sand.results import Query
from sand.tinyfunc import *

# BASED ON : https://github.com/yannforget/landsatxplore/tree/master/landsatxplore



[docs] class DownloadUSGS(BaseDownload): name = 'DownloadUSGS' _DATA_PRODUCTS = { # Level 1 datasets "landsat_tm_c2_l1": ["5e81f14f92acf9ef", "5e83d0a0f94d7d8d", "63231219fdd8c4e5"], "landsat_etm_c2_l1":[ "5e83d0d0d2aaa488", "5e83d0d08fec8a66"], "landsat_ot_c2_l1": ["632211e26883b1f7", "5e81f14ff4f9941c", "5e81f14f92acf9ef"], # Level 2 datasets "landsat_tm_c2_l2": ["5e83d11933473426", "5e83d11933473426", "632312ba6c0988ef"], "landsat_etm_c2_l2": ["5e83d12aada2e3c5", "5e83d12aed0efa58", "632311068b0935a8"], "landsat_ot_c2_l2": ["5e83d14f30ea90a9", "5e83d14fec7cae84", "632210d4770592cf"] } def __init__(self, collection: str = None, level: int = 1): """ Python interface to the USGS API (https://data.usgs.gov/) Args: collection (str): collection name ('LANDSAT-5-TM', 'LANDSAT-7-ET', etc.) Example: usgs = DownloadUSGS('LANDSAT-5-TM') # retrieve the list of products # using a pickle cache file to avoid reconnection ls = cache_dataframe('query-S2.pickle')(cds.query)( dtstart=datetime(2024, 1, 1), dtend=datetime(2024, 2, 1), geo=Point(119.514442, -8.411750), ) cds.download(ls.iloc[0], <dirname>, uncompress=True) """ self.provider = 'usgs' super().__init__(collection, level) def _login(self): """ Login to USGS with credentials storted in .netrc """ auth = get_auth("usgs.gov") data = { "username": auth['user'], # "password": auth['password'], "token": auth['password'], } try: url = "https://m2m.cr.usgs.gov/api/api/json/stable/login-token" r = self.session.post(url, json.dumps(data)) r.raise_for_status() assert r.json()['errorCode'] == None self.API_key = {'X-Auth-Token': r.json()['data']} except Exception: raise Exception( f"Keycloak token creation failed. Reponse from the server was: {r.json()}" ) log.info(f'Log to API (https://m2m.cr.usgs.gov/)') @interface def query( self, dtstart: Optional[date|datetime] = None, dtend: Optional[date|datetime] = None, geo = None, cloudcover_thres: Optional[int] = None, name_contains: Optional[list] = [], name_startswith: Optional[str] = None, name_endswith: Optional[str] = None, name_glob: Optional[str] = None, other_attrs: Optional[list] = None, **kwargs ): """ Product query on the USGS Args: dtstart and dtend (datetime): start and stop datetimes geo: shapely geometry. Examples: Point(lon, lat) Polygon(...) cloudcover_thres: Optional[int]=None, name_contains (list): list of substrings name_startswith (str): search for name starting with this str name_endswith (str): search for name ending with this str name_glob (str): match name with this string use_most_recent (bool): keep only the most recent processing baseline version other_attrs (list): list of other attributes to include in the output (ex: ['ContentDate', 'Footprint']) Note: This method can be decorated by cache_dataframe for storing the outputs. Example: cache_dataframe('cache_result.pickle')(cds.query)(...) """ dtstart, dtend, geo = self._format_input_query(dtstart, dtend, geo) # Add provider constraint name_contains = self._complete_name_contains(name_contains) # Define check functions checker = [] if name_contains: checker.append((check_name_contains, name_contains)) if name_startswith: checker.append((check_name_startswith, name_startswith)) if name_endswith: checker.append((check_name_endswith, name_endswith)) if name_glob: checker.append((check_name_glob, name_glob)) # Configure scene constraints for request spatial_filter = {} spatial_filter["filterType"] = "mbr" if isinstance(geo, Point): spatial_filter["lowerLeft"] = {"latitude":geo.y, "longitude":geo.x} spatial_filter["upperRight"] = spatial_filter["lowerLeft"] elif isinstance(geo, Polygon): bounds = geo.bounds spatial_filter["lowerLeft"] = {"latitude":bounds[1], "longitude":bounds[0]} spatial_filter["upperRight"] = {"latitude":bounds[3], "longitude":bounds[2]} acquisition_filter = {"start": dtstart.isoformat(), "end" : dtend.isoformat()} cloud_cover_filter = {"min" : cloudcover_thres, "max" : 100, "includeUnknown" : False} scene_filter = {"acquisitionFilter": acquisition_filter, "spatialFilter" : spatial_filter, "cloudCoverFilter" : cloud_cover_filter, "metadataFilter" : None, "seasonalFilter" : None} params = { "datasetName": self.api_collection[0], "sceneFilter": scene_filter, "maxResults": 1000, "metadataType": "full", } # Request API for each dataset url = "https://m2m.cr.usgs.gov/api/api/json/stable/scene-search" response = self.session.get(url, data=json.dumps(params), headers=self.API_key) raise_api_error(response) r = response.json() if r['data'] is None: log.error(r['errorMessage'], e=Exception) r = r['data']['results'] # Filter products response = [p for p in r if self.check_name(p['displayId'], checker)] # test if maximum number of returns is reached if len(response) >= 1000: log.error('The request led to the maximum number of results ' f'({len(response)})', e=ValueError) else: log.info(f'{len(response)} products has been found') out = [{"id": d["entityId"], "name": d["displayId"], **{k: d[k] for k in (other_attrs or ['metadata','publishDate','browse'])}} for d in response] log.info(f'{len(out)} products has been found') return Query(out)
[docs] def download_file(self, product_id, dir): p = get_pattern(product_id) self.__init__(p['Name'], get_level(product_id, p)) scene_filter = { "MetadataValue": { "filterType": 'value', "filterId": 'displayId', "value": product_id, "operand": "=", } } params = { "datasetName": self.api_collection[0], # "sceneFilter": scene_filter, "MetadataFilter": scene_filter, "maxResults": 10, "metadataType": "full", } # Request API for each dataset url = "https://m2m.cr.usgs.gov/api/api/json/stable/scene-search" response = self.session.get(url, data=json.dumps(params), headers=self.API_key) raise_api_error(response) r = response.json() target = Path(dir)/prod._id self._download(target, prod.url) return target
@interface def download(self, product: dict, dir: Path|str, if_exists='skip', uncompress: bool=True) -> Path: """ Download a product from USGS Args: product (dict): product definition with keys 'id' and 'name' dir (Path | str): Directory where to store downloaded file. uncompress (bool, optional): If True, uncompress file if needed. Defaults to True. """ target = Path(dir)/(product['name']) # Find product in dataset url = "https://m2m.cr.usgs.gov/api/api/json/stable/download-options" params = {'entityIds': product['id'], "datasetName": self.api_collection[0]} dl_opt = self.session.get(url, data=json.dumps(params), headers=self.API_key) dl_opt = dl_opt.json()['data'] # Find available acquisitions for product in dl_opt: if not product['available']: continue # Find one available product url = "https://m2m.cr.usgs.gov/api/api/json/stable/download-request" label = datetime.now().strftime("%Y%m%d_%H%M%S") # Customized label using date time downloads = [{'entityId':product['entityId'], 'productId':product['id']}] params = {'label': label, 'downloads' : downloads} dl = self.session.get(url, data=json.dumps(params), headers=self.API_key) dl = dl.json()['data'] # Collect url for download if dl['numInvalidScenes'] != 0: continue url = dl['availableDownloads'][0]['url'] filegen(0, if_exists=if_exists)(self._download)(target, url) log.info(f'Product has been downloaded at : {target}') return target log.error('No product immediately available') def _download( self, target: Path, url: str, ): """ Wrapped by filegen """ # Initialize session for download self.session.headers.update(self.API_key) # Try to request server niter = 0 response = self.session.get(url, allow_redirects=False) log.debug(f'Requesting server for {target.name}') while response.status_code in (301, 302, 303, 307) and niter < 5: if 'Location' not in response.headers: raise ValueError(f'status code : [{response.status_code}]') url = response.headers['Location'] # response = self.session.get(url, allow_redirects=False) response = self.session.get(url, verify=True, allow_redirects=True) niter += 1 # Download file log.debug('Start writing on device') filesize = int(response.headers["Content-Length"]) pbar = log.pbar(log.lvl.INFO, total=filesize, unit_scale=True, unit="B", desc='writing', unit_divisor=1024, leave=False) with open(target, 'wb') as f: for chunk in response.iter_content(chunk_size=1024): if chunk: f.write(chunk) pbar.update(1024) @interface def quicklook(self, product: dict, dir: Path|str): """ Download a quicklook to `dir` """ target = Path(dir)/(product['name'] + '.png') if not target.exists(): assets = self.metadata(product)['Landsat Product Identifier L1'] log.check(assets, f'Skipping quicklook {target.name}', e=FileNotFoundError) for b in product['browse']: url = b['browsePath'] if 'type=refl' in url: break filegen(0)(self._download)(target, url) log.info(f'Quicklook has been downloaded at : {target}') return target @interface def metadata(self, product): """ Returns the product metadata including attributes and assets """ meta = {} for m in product['metadata']: meta[m['fieldName']] = m['value'] return meta def _retrieve_collec_name(self, collection): collecs = select(self.provider_prop,('SAND_name','=',collection),['level','collec']) try: collecs = select_cell(collecs,('level','=',self.level),'collec') except AssertionError: log.error( f'Level{self.level} products are not available for {self.collection}', e=KeyError) return collecs.split(' ')