from datetime import datetime, date, time
from functools import reduce
from typing import Literal
from pathlib import Path
from sand.constraint import Time, Geo, Name
from sand.results import Collection, SandQuery
from sand.utils import end_of_day
from core.table import read_csv
from core import log
import requests
import ssl
[docs]
class BaseDownload:
    """
    Base class for satellite data providers API access and download functionality.

    This class provides a common interface for interacting with different satellite data
    providers. It handles authentication, querying products, downloading data and metadata
    retrieval. Child classes must implement the provider-specific hooks this class
    delegates to (e.g. ``_query``, ``_dl``, ``_qkl``, ``_dl_file``).

    Attributes:
        session (requests.Session): HTTP session for making API requests
        ssl_ctx (ssl.SSLContext): SSL context for secure connections
        available_collection (list): List of available collections from the provider
        api_collection (list[str]): Collection name(s) in the provider's API format
            (set by ``_load_sand_collection_properties`` from the space-separated
            'collec' column)
        name_contains (list): List of naming constraints for products
        sand_props: Properties row of the currently selected SAND collection
            (set by ``_get_collec_properties``)
        provider (str): Provider identifier used to locate the properties CSV --
            presumably set by subclasses; confirm
    """
# Main functions to implement for each provider
def _login(self):
    """
    Login to API server with credentials stored in .netrc file.

    NOTE(review): delegates to ``self._log()``, which is not defined in this
    base class -- presumably supplied by provider-specific subclasses; confirm.
    """
    self._log()
[docs]
def query(
    self,
    collection_sand: str | None = None,
    level: Literal[1, 2, 3] = 1,
    time: Time | None = None,
    geo: Geo | None = None,
    name: Name | None = None,
    cloudcover_thres: int | None = None,
    api_collection: list[str] | None = None
) -> SandQuery:
    """
    Query products from the API server based on temporal and spatial constraints.

    Thin wrapper that forwards every argument to the provider-specific
    ``_query`` implementation.

    Args:
        collection_sand (str, optional): SAND collection name
            ('SENTINEL-2-MSI', 'SENTINEL-3-OLCI', etc.)
        level (int): Processing level (1, 2, or 3)
        time (Time, optional): Time constraint.
            (Note: this parameter shadows ``datetime.time`` inside the method.)
        geo (Geo, optional): Spatial constraint.
        name (Name, optional): Constraints over product name.
        cloudcover_thres (int, optional): Upper bound for cloud cover in percentage.
        api_collection (list[str], optional): Name of desired collection in API standard.

    Returns:
        SandQuery: Query results containing matching products
    """
    return self._query(
        collection_sand=collection_sand,
        level=level,
        time=time,
        geo=geo,
        name=name,
        cloudcover_thres=cloudcover_thres,
        api_collection=api_collection
    )
[docs]
def download(self, product: dict, dir: Path | str, if_exists: str = 'skip') -> Path:
    """
    Download a product from the API server.

    Delegates to the provider-specific ``_dl`` implementation.

    Args:
        product (dict): Product metadata obtained from query results
        dir (Path|str): Directory where to save the downloaded product
        if_exists (str, optional): Action to take if the file already exists
            ('skip' by default; see ``download_all`` for the accepted values)

    Returns:
        Path: Path to the downloaded product file
    """
    return self._dl(product=product, dir=dir, if_exists=if_exists)
[docs]
def quicklook(self, product: dict, dir: Path | str) -> Path:
    """
    Download a quicklook preview image for a product.

    Delegates to the provider-specific ``_qkl`` implementation.

    Args:
        product (dict): Product metadata obtained from query results
        dir (Path|str): Directory where to save the quicklook image

    Returns:
        Path: Path to the downloaded quicklook image
    """
    return self._qkl(product=product, dir=dir)
[docs]
def download_file(self, product_id: str, dir: Path | str, api_collection: str | None = None) -> Path:
    """
    Download a specific product from API server by its product identifier.

    Delegates to the provider-specific ``_dl_file`` implementation.

    Args:
        product_id (str): The identifier of the product to download
            (ex: S2A_MSIL1C_20190305T050701_N0207_R019_T44QLH_20190305T103028)
        dir (Path | str): Directory where to store the downloaded file
        api_collection (str, optional): Name of the API collection to query.
            If None, will determine from product_id pattern.

    Returns:
        Path: Path to the downloaded file
    """
    return self._dl_file(product_id=product_id, dir=dir, api_collection=api_collection)
# Visible functions already implemented
[docs]
def download_all(self, products, dir: Path|str, if_exists: str='skip',
parallelized: bool = False) -> list[Path]:
"""
Download all products from API server resulting from a query.
Args:
products (list[dict]): List of product metadata from query results
dir (Path|str): Directory where to save downloaded products
if_exists (str, optional): Action to take if product exists:
- 'skip': Skip download if file exists (default)
- 'overwrite': Replace existing file
- 'raise': Raise an error if file exists
parallelized (bool, optional): If True, downloads products in parallel
using multiple threads. Default is False.
Returns:
list[Path]: List of paths to downloaded product files
"""
if parallelized:
from multiprocessing import Pool
from functools import partial
workers = min(1, len(products))
process = partial(self.download, dir=dir, if_exists=if_exists)
with Pool(workers) as pool:
tmp = pool.map(process, [p[1] for p in products.iterrows()])
# tmp = pool.map(process, products)
return tmp
out = []
for i in range(len(products)):
out.append(self.download(products.iloc[i], dir, if_exists))
return out
[docs]
def get_available_collection(self) -> dict:
    """
    Return every downloadable collection for the selected provider.

    NOTE(review): the annotation says ``dict`` but a ``Collection`` object is
    returned -- presumably Collection is dict-like; confirm.
    """
    # Lazily populate the provider's collection list on first call
    if not hasattr(self, 'available_collection'):
        self._load_provider_properties()
    # Join with the global sensor metadata shipped next to this module
    sensor_table = read_csv(Path(__file__).parent / 'sensors.csv')
    return Collection(self.available_collection, sensor_table)
# Private functions
def _load_provider_properties(self):
    """
    Load the provider's properties table (collections, levels, etc.).

    Side effect: caches the available SAND collection names on
    ``self.available_collection``.

    Returns:
        The provider properties table read from ``collections/<provider>.csv``.
    """
    path = Path(__file__).parent / 'collections' / f'{self.provider}.csv'
    log.check(path.exists(), 'Provider properties file is missing')
    table = read_csv(path)
    self.available_collection = list(table['SAND_name'])
    return table
def _load_sand_collection_properties(self, collection: str, level: int):
    """
    Retrieve properties for a specific SAND collection.

    Side effects: ``self.sand_props`` is set (via ``_get_collec_properties``)
    and ``self.api_collection`` is refreshed.

    Returns:
        list[str]: Naming constraints for products of this collection.
    """
    provider_props = self._load_provider_properties()
    self._get_collec_properties(collection, level, provider_props)
    self.api_collection = self._retrieve_api_collec()
    return self._set_name_constraint()
def _set_session(self):
    # Create the HTTP session and SSL context used for all API requests
    self.session = requests.Session()
    self.ssl_ctx = get_ssl_context()
def _get_collec_properties(self, collection, level, properties):
    """
    Select and store the properties row matching a SAND collection and level.

    Args:
        collection (str): SAND collection name; must be one of
            ``self.available_collection``.
        level (int): Processing level to select.
        properties: Provider properties table, as returned by
            ``_load_provider_properties``.

    Raises:
        ValueError: If the collection is unknown for this provider.
        KeyError: If selecting the level raises (via ``log.error``).
        ReferenceError: If no row matches the requested level.

    Side effect: sets ``self.sand_props``.
    """
    # Find SAND collection name
    log.check(collection in self.available_collection,
              f"Collection '{collection}' does not exist for this downloader,"
              " please use get_available_collection methods", e=ValueError)
    collecs = properties[properties['SAND_name']==collection]
    # Try to find specific level
    # NOTE(review): plain pandas filtering would not raise AssertionError --
    # presumably the project's table type asserts on invalid selections; confirm.
    try:
        self.sand_props = collecs[collecs['level']==level]
    except AssertionError:
        log.error(f'Level{level} products are not available for {collection}',
                  e=KeyError)
    log.check(len(self.sand_props)>0, 'It is not possible to download '
              f'level-{level} product for {collection}', e=ReferenceError)
def _retrieve_api_collec(self):
"""
Returns collection names used by API
"""
return self.sand_props['collec'].values[0].split(' ')
def _set_name_constraint(self):
"""
Function to add name constraint to list of user constraint
"""
to_add = self.sand_props['contains'].values[0]
return [] if str(to_add) == 'nan' else to_add.split(' ')
def _check_name(self, name, check_funcs) -> bool:
return all(c[0](name, c[1]) for c in check_funcs)
def _format_time(self, collection: str, t: Time) -> Time:
    """
    Check and format the time constraint of the query method.

    Missing bounds are filled in place: ``t.start`` defaults to the sensor's
    launch date and ``t.end`` to the current time; plain ``date`` objects are
    promoted to ``datetime``. The result is validated against the sensor
    lifetime found in ``sensors.csv``.

    Args:
        collection (str): SAND collection name, used to select the sensor row.
        t (Time, optional): Time constraint; mutated in place and returned.

    Returns:
        Time: The normalized time constraint (None if ``t`` is None).

    Raises:
        AssertionError: If the constraint lies outside the sensor lifetime.
            NOTE(review): asserts vanish under ``python -O``; consider
            ``log.check`` as used elsewhere in this module.
    """
    # Nothing to normalize
    if t is None:
        return t
    # Look up the sensor row for this collection in the reference file
    ref_file = Path(__file__).parent / 'sensors.csv'
    ref = read_csv(ref_file)
    ref = ref[ref['Name'] == collection]
    # Fill missing bounds and promote plain dates to datetimes.
    # Bug fix: datetime is a subclass of date, so the former bare
    # `isinstance(..., date)` checks also matched datetime values and
    # silently clobbered their time-of-day (start -> 00:00, end -> end of
    # day). Only promote genuine date objects.
    if t.start is None:
        t.start = datetime.fromisoformat(ref['launch_date'].values[0])
    if isinstance(t.start, date) and not isinstance(t.start, datetime):
        t.start = datetime.combine(t.start, time(0))
    if t.end is None:
        t.end = datetime.now()
    elif isinstance(t.end, date) and not isinstance(t.end, datetime):
        t.end = end_of_day(datetime.combine(t.end, time(0)))
    assert isinstance(t.start, datetime) and isinstance(t.end, datetime)
    # Validate against the sensor lifetime ('x' appears to mark a sensor
    # that is still operating, i.e. no end date to check)
    launch, end = ref['launch_date'].values[0], ref['end_date'].values[0]
    assert t.start.date() >= date.fromisoformat(launch)
    if end != 'x':
        assert t.end.date() < date.fromisoformat(end)
    return t
def __del__(self):
    # Ensure the HTTP session is closed when the downloader is destroyed;
    # guard with hasattr because _set_session may never have been called.
    if hasattr(self, 'session'):
        self.session.close()
[docs]
def raise_api_error(response) -> int:
    """
    Check HTTP response status code and raise appropriate error if needed.

    Args:
        response: HTTP response object with a ``status_code`` attribute
            (e.g. requests.Response)

    Returns:
        int: Status code if response is successful (status < 300)

    Raises:
        RequestsError: If the status code is 300 or above (via ``log.error``).
    """
    log.check(hasattr(response, 'status_code'), 'No status code in response', e=Exception)
    ref = read_csv(Path(__file__).parent/'html_status_code.csv')
    status = response.status_code
    # Bug fix: success is documented as status < 300, but the old test
    # `status > 300` silently accepted 300 (Multiple Choices).
    if status >= 300:
        line = ref[ref['value'] == status]
        log.error('[{}] {}'.format(line['tag'].values[0], line['explain'].values[0]),
                  e=RequestsError)
    return status
[docs]
def check_too_many_matches(response: dict,
                           returned_tag: str|list[str],
                           hit_tag: str|list[str]) -> None:
    """
    Check if an API query returned more matches than it can return in one response.

    Args:
        response (dict): API response containing result counts
        returned_tag (str|list[str]): Path to the number of returned results in response
        hit_tag (str|list[str]): Path to the total number of matches in response

    Raises:
        RequestsError: If the total number of matches differs from the number
            of returned results (via ``log.check``).
    """
    # Bug fix: a bare string tag used to be iterated character by character
    # by reduce(); wrap single keys so 'key' is one path element.
    if isinstance(returned_tag, str):
        returned_tag = [returned_tag]
    if isinstance(hit_tag, str):
        hit_tag = [hit_tag]
    returned = reduce(lambda node, key: node[key], returned_tag, response)
    matches = reduce(lambda node, key: node[key], hit_tag, response)
    log.check(returned == matches,
              f"The query returned too many matches ({matches}) "
              f"and exceeded the limit ({returned}) "
              "set by the provider.", e=RequestsError)
[docs]
def get_ssl_context() -> ssl.SSLContext:
    """
    Return a default SSL context with full certificate verification enabled.

    Returns:
        ssl.SSLContext: Context that verifies peer certificates and hostnames.
    """
    # Doc fix: the old docstring documented a nonexistent `ssl_verify`
    # parameter. check_hostname/CERT_REQUIRED are already the defaults of
    # create_default_context(); they are set explicitly to make the secure
    # configuration obvious.
    ctx = ssl.create_default_context()
    ctx.check_hostname = True
    ctx.verify_mode = ssl.CERT_REQUIRED
    return ctx
[docs]
class RequestsError(Exception):
    """Raised when an API request fails or returns an unusable response."""