import eumdac
import requests
from pathlib import Path
from typing import Literal
from tempfile import TemporaryDirectory
from datetime import datetime
from sand.constraint import Time, Geo, Name
from sand.base import raise_api_error, RequestsError, BaseDownload
from sand.results import SandQuery, SandProduct
from sand.utils import write
from core import log
from core.table import read_xml
from core.network.auth import get_auth
from core.geo.product_name import get_pattern, get_level
from core.files import filegen, uncompress
class DownloadEumDAC(BaseDownload):
    """
    Python interface to the EUMETSAT Data Access API client (https://data.eumetsat.int/)
    """

    provider = 'eumdac'

    def __init__(self):
        super().__init__()

    def _login(self):
        """
        Login to the EUMETSAT Data Access API with credentials from .netrc.
        """
        # Check if session is already set and set it up if not
        if not hasattr(self, "session"):
            self._set_session()

        auth = get_auth('data.eumetsat.int')
        credentials = (auth['user'], auth['password'])
        self.tokens = eumdac.AccessToken(credentials)

        try:
            if self.tokens.expiration < datetime.now():
                msg = "Token has expired. Please refresh it on https://api.eumetsat.int/api-key/#"
                log.error(msg, e=RequestsError)
        except requests.exceptions.HTTPError:
            log.error("Invalid credentials", e=RequestsError)

        self.datastore = eumdac.DataStore(self.tokens)
        log.debug('Logged in to API (https://data.eumetsat.int/)')
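
    # Credentials note (a sketch, not executed): get_auth('data.eumetsat.int') is assumed to read
    # a standard ~/.netrc entry. Under that assumption, the entry would look like:
    #
    #   machine data.eumetsat.int
    #       login <consumer-key>
    #       password <consumer-secret>
    #
    # where <consumer-key> and <consumer-secret> are placeholders for the API credentials
    # shown on https://api.eumetsat.int/api-key/#.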

    def _query(
        self,
        collection_sand: str = None,
        level: Literal[1, 2, 3] = 1,
        time: Time = None,
        geo: Geo = None,
        name: Name = None,
        cloudcover_thres: int = None,
        api_collection: list[str] = None
    ):
        """
        Product query on the EUMETSAT Data Store
        """
        self._login()

        # Retrieve API collections based on SAND collections
        if api_collection is None:
            name_constraint = self._load_sand_collection_properties(collection_sand, level)
            api_collection = self.api_collection[0]
            time = self._format_time(collection_sand, time)

        # Define or complement the constraint on naming
        if name:
            name.add_contains(name_constraint)
        else:
            name = Name(contains=name_constraint)

        product = []
        for collec in self.api_collection:
            # Query the EumDAC API
            log.debug(f'Query EumDAC API for collection {collec}')
            self.selected_collection = self.datastore.get_collection(collec)
            try:
                prod = list(self.selected_collection.search(
                    geo=geo.to_wkt() if geo else None,
                    dtstart=time.start,
                    dtend=time.end
                ))
            except eumdac.collection.CollectionError:
                continue

            # Filter products by name constraint
            product += [p for p in prod if name.apply(str(p))]

        if cloudcover_thres:
            log.warning("'cloudcover_thres' is not used with eumdac")

        out = [
            SandProduct(product_id=str(d), date=d.sensing_start.isoformat(), metadata=d)
            for d in product
        ]
        log.info(f'{len(out)} products have been found')
        return SandQuery(out)
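
    # Usage sketch (comments only; illustrative): the collection name below is hypothetical and
    # the Time constructor is assumed to accept start/end datetimes, which is an assumption here.
    #
    #   dl = DownloadEumDAC()
    #   results = dl._query(
    #       collection_sand='SENTINEL-3-OLCI-FR',                      # hypothetical SAND collection
    #       level=1,
    #       time=Time(datetime(2024, 1, 1), datetime(2024, 1, 2)),     # assumed signature
    #   )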

    def _dl(self, product: SandProduct, dir: Path, if_exists='skip') -> Path:
        """
        Download a product from the EUMETSAT Data Store.
        """
        self._login()

        data = self.datastore.get_product(
            product_id=product.product_id,
            collection_id=product.metadata.collection
        )

        target = Path(dir)/product.product_id
        filegen(if_exists=if_exists)(self._download)(target, data, '.zip')
        log.info(f'Product has been downloaded at: {target}')
        return target
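
    # Note (illustrative): each SandProduct returned by _query() keeps the underlying eumdac
    # product object in .metadata, so a query result can be passed straight to _dl, e.g.
    #
    #   dl._dl(results[0], Path('downloads'))
    #
    # assuming 'results' is the SandQuery returned by _query() and that it supports indexing
    # (an assumption, not verified against sand.results).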

    def _dl_file(self, product_id: str, dir: Path | str, api_collection: list[str] = None) -> Path:
        """
        Download a specific product from EumDAC by its product identifier
        """
        self._login()

        # Retrieve API collections based on SAND collections
        if api_collection is None:
            p = get_pattern(product_id)
            collection_sand, level = p['Name'], get_level(product_id, p)
            self._load_sand_collection_properties(collection_sand, level)
        else:
            self.api_collection = api_collection
            self.name_contains = []

        for c in self.api_collection:
            collec = self.datastore.get_collection(c)
            prod = self.datastore.get_product(collec, product_id)
            target = Path(dir)/prod._id
            filegen(if_exists='skip')(self._download)(target, prod, '.zip')
            log.info(f'Product has been downloaded at: {target}')
            return target

    def _download(
        self,
        target: Path,
        data,
        compression_ext: str = None
    ) -> None:
        """
        Internal method to download a file from the EUMETSAT Data Store.
        This method is wrapped by the filegen decorator for file management.
        Downloads are done in chunks to handle large files efficiently.

        Args:
            target (Path): Path where the file should be saved
            data: EUMETSAT data object containing the file to download
            compression_ext (str, optional): Compression format of the file to download
                (e.g. '.zip'). If not None, the file will be uncompressed after downloading

        Raises:
            OSError: If file writing fails
            eumdac.collection.CollectionError: If data access fails
        """
        # Compressed archive path (download target before extraction)
        dl_target = Path(str(target) + compression_ext) if compression_ext else target

        log.debug(f"Downloading {data._id} ...")
        with data.open() as fsrc, open(dl_target, mode='wb') as fdst:
            while True:
                chunk = fsrc.read(1024)
                if not chunk:
                    break
                fdst.write(chunk)

        # Uncompress archive
        if compression_ext:
            log.debug('Uncompress archive')
            assert target == uncompress(dl_target, target.parent)
            dl_target.unlink()
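
    # Design note: the fixed 1 KiB chunked copy above keeps memory usage flat for large archives.
    # An equivalent standard-library formulation (sketch; same behaviour assumed) would be:
    #
    #   import shutil
    #   with data.open() as fsrc, open(dl_target, mode='wb') as fdst:
    #       shutil.copyfileobj(fsrc, fdst, length=1024 * 1024)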

    def _qkl(self, product: SandProduct, dir: Path | str) -> Path:
        """
        Download a quicklook preview image for a product.

        Args:
            product (SandProduct): Product from query results; its metadata must expose
                preview links under properties/links/previews
            dir (Path|str): Directory where to save the quicklook image

        Returns:
            Path: Path to the downloaded quicklook image

        Raises:
            RequestsError: If the quicklook download fails
            ValueError: If the quicklook is not available
            OSError: If file operations fail
        """
        self._login()

        quicklook_url = product.metadata.metadata['properties']['links'].get('previews')
        if quicklook_url is None:
            log.error('No download link for quicklook')
        url = quicklook_url[0]['href']
        target = Path(dir)/(url.split('/')[-2].split('.')[0] + '.jpeg')

        def _download_qkl(target, url):
            # Initialize session for download
            self.session.headers.update({'Authorization': f'Bearer {self.tokens}'})

            # Try to request the server, following up to 5 redirects
            niter = 0
            response = self.session.get(url, allow_redirects=False)
            log.debug(f'Requesting server for {target.name}')
            while response.status_code in (301, 302, 303, 307) and niter < 5:
                log.debug(f'Download content [Try {niter+1}/5]')
                if 'Location' not in response.headers:
                    raise ValueError(f'status code : [{response.status_code}]')
                url = response.headers['Location']
                response = self.session.get(url, verify=True, allow_redirects=True)
                niter += 1
            raise_api_error(response)

            # Download file
            write(response, target)

        if not target.exists():
            filegen(0)(_download_qkl)(target, url)

        log.info(f'Quicklook has been downloaded at: {target}')
        return target

    def _metadata(self, product: SandProduct) -> dict:
        """
        Retrieve detailed metadata for a product from EUMETSAT.

        Args:
            product (SandProduct): Product from query results; its metadata must expose
                an XML metadata link under properties/links/alternates

        Returns:
            dict: Parsed XML metadata containing detailed product information
        """
        self._login()

        meta_url = product.metadata.metadata['properties']['links']['alternates']
        req = meta_url[0]['href']
        meta = requests.get(req).text
        assert len(meta) > 0

        with TemporaryDirectory() as tmpdir:
            with open(Path(tmpdir)/'meta.xml', 'w') as f:
                f.write(meta)
            return read_xml(Path(tmpdir)/'meta.xml')
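

# ---------------------------------------------------------------------------
# End-to-end sketch (illustrative only; not part of the module's tested surface).
# The product identifier below is a hypothetical placeholder: replace it with a
# real EUMETSAT Data Store identifier before running.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    out_dir = Path("downloads")
    out_dir.mkdir(exist_ok=True)

    dl = DownloadEumDAC()
    # Fetch a single product by identifier (hypothetical ID used as a placeholder)
    path = dl._dl_file(
        product_id="S3A_OL_1_EFR____PLACEHOLDER_PRODUCT_ID",  # hypothetical
        dir=out_dir,
    )
    log.info(f"Product downloaded to {path}")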