core package

Subpackages


core.auth

core.auth.get_auth(name: str) dict[source]

Returns a dictionary with credentials, using .netrc

name is the identifier (the machine field in .netrc); this allows for several accounts on a single host. The url is returned as account.
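
For instance, with a hypothetical .netrc entry named my_server, the credentials could be retrieved as follows (the entry and key names are illustrative; the url is returned under 'account'):

# ~/.netrc (hypothetical entry)
#   machine my_server
#   login myuser
#   password mypass
#   account https://example.com

from core.auth import get_auth

creds = get_auth('my_server')   # dict with the credentials; the url is returned as 'account'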

core.auth.get_auth_dhus(name)[source]

core.cache

core.cache.cache_dataframe(cache_file: Path | str, inputs: Literal['check', 'store', 'ignore'] = 'check')[source]

A decorator that caches the result of a function, which is a pandas DataFrame

inputs:

  • “check” [default]: store and check the function inputs

  • “store”: store but don’t check the function inputs

  • “ignore”: ignore the function inputs
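
A minimal usage sketch (the cache path and decorated function are illustrative; the storage format is handled by the decorator):

from pathlib import Path
import pandas as pd
from core.cache import cache_dataframe

@cache_dataframe(Path('/tmp/result_cache.csv'))   # hypothetical cache file
def compute(n: int) -> pd.DataFrame:
    # expensive computation returning a DataFrame
    return pd.DataFrame({'x': range(n)})

df = compute(10)   # first call computes and stores the result
df = compute(10)   # same inputs: the cached result is returned (inputs are checked by default)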

core.cache.cache_dataset(cache_file: Path | str, attrs=None, **kwargs)[source]

A decorator that caches the dataset returned by a function in a netcdf file

The attribute dictionary attrs is stored in the file, and verified upon reading.

Other kwargs (ex: chunks) are passed to xr.open_dataset

core.cache.cache_json(cache_file: Path | str, inputs: Literal['check', 'store', 'ignore'] = 'check')[source]

A decorator that caches the result of a function to a json file.

inputs:

  • “check” [default]: store and check the function inputs

  • “store”: store but don’t check the function inputs

  • “ignore”: ignore the function inputs

core.cache.cache_pickle(cache_file: Path | str, inputs: Literal['check', 'store', 'ignore'] = 'check', check_out=<function <lambda>>)[source]

A decorator that caches the result of a function to a pickle file.

inputs:

  • “check” [default]: store and check the function inputs

  • “store”: store but don’t check the function inputs

  • “ignore”: ignore the function inputs

core.cache.cachefunc(cache_file: Path | str, reader: Callable, writer: Callable, check_in: Callable | None = None, check_out: Callable | None = None, fg_kwargs=None)[source]

A decorator that caches the return of a function in a file, with customizable format

reader: a function that reads inputs/output from the cache file

reader(filename) -> {‘output’: …, ‘input’: …}

writer: a function that writes the inputs/output to the cache file

writer(filename, output, input_args, input_kwargs)

check_in: a custom function to test the equality of the inputs

checker(obj1, obj2) -> bool (defaults to None -> no checking)

check_out: a custom function to test the equality of the outputs

checker(obj1, obj2) -> bool (defaults to ==)

fg_kwargs: kwargs passed to filegen (ex: lock_timeout=-1)
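
A sketch of a custom json-based cache built on cachefunc (the reader/writer helpers are illustrative; the exact structure stored under 'input' is an assumption):

import json
from pathlib import Path
from core.cache import cachefunc

def json_reader(filename):
    # must return a dict with 'output' and 'input' entries
    with open(filename) as fp:
        return json.load(fp)

def json_writer(filename, output, input_args, input_kwargs):
    # assumed layout for the stored inputs
    with open(filename, 'w') as fp:
        json.dump({'output': output,
                   'input': [list(input_args), input_kwargs]}, fp)

@cachefunc(Path('/tmp/cache.json'), reader=json_reader, writer=json_writer)
def square(x):
    return x**2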

core.condor

core.condor.get_htcondor_client(maximum_jobs=100, **kwargs)[source]

Get a client for HTCondor cluster

core.condor.get_htcondor_logdir(logdir: Path | str | None) Path[source]

A default log directory for condor

core.condor.get_htcondor_params(cores=1, memory='1GB', disk='1GB', death_timeout='600', logdir: Path | str | None = None) Dict[source]

Parameters passed to HTCondorCluster

core.condor.get_htcondor_runner(maximum_jobs: int = 100, **kwargs)[source]

Initialize a prefect DaskTaskRunner based on HTCondorCluster

core.config

class core.config.Config(**kwargs)[source]

Bases: object

copy(**kwargs)
get(**kwargs)
get_subsection(**kwargs)
ingest(**kwargs)
new_from_dict(**kwargs)
new_from_toml(**kwargs)

core.conftest

core.dates

core.dates.closest(date, h)[source]

Round a date to the closest hour, by steps of h hours

core.dates.date_range(date_start: date, date_end: date) list[date][source]

Returns a list of days starting from date_start, up to date_end included

core.dates.round_date(date, h)[source]

Round a date to the bracketing hours, by steps of h hours

core.dates.time_range(start: datetime, end: datetime, step: timedelta) list[datetime][source]

Returns a list of datetime starting from start, up to end included
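
A usage sketch of these helpers:

from datetime import date, datetime, timedelta
from core.dates import date_range, time_range

days = date_range(date(2024, 1, 1), date(2024, 1, 3))
# -> [date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 3)]

times = time_range(datetime(2024, 1, 1, 0), datetime(2024, 1, 1, 6),
                   step=timedelta(hours=3))
# -> datetimes from 00:00 to 06:00 included, every 3 hours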

core.deptree

class core.deptree.Task[source]

Bases: object

Base class for tasks, which defines its dependencies and can be generated with the gen function.

Should typically hold an attribute output for storing the task output, if any (optional).

Check the sample function which gives an example of Task definition.

dependencies() list[source]

Returns a list of dependencies, each being a Task.

done() bool[source]

Whether the task is done

run()[source]

Implements the actual execution code
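
A minimal Task subclass sketch (illustrative; the reference example is core.deptree.sample):

from pathlib import Path
from core.deptree import Task, gen

class Level1(Task):
    def __init__(self, path: Path):
        self.output = path              # optional attribute holding the task output

    def dependencies(self) -> list:
        return []                       # no upstream tasks

    def done(self) -> bool:
        return self.output.exists()

    def run(self):
        self.output.write_text('level1 data')   # the actual execution code

gen(Level1(Path('/tmp/level1.txt')), mode='sync')   # see gen below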

core.deptree.gen(task: Task, mode: Literal['sync', 'executor', 'prefect'] = 'sync', **kwargs)[source]

Run a task and its dependencies

Parameters:

mode (str) –

  • ‘sync’: sequential

  • ’executor’: using executors

  • ‘prefect’: using prefect as a workflow manager

core.deptree.gen_executor(task: Task, executors: dict | None = None, default_executor=None, graph_executor=None, verbose: bool = False)[source]

Recursively generate dependencies of task, then call its run method in a custom executor.

Parameters:
  • task – a Task object

  • executors – dictionary of executors

  • default_executor – the executor to use for all tasks not included in executors. Defaults to ThreadPoolExecutor

  • graph_executor – an instance of ThreadPoolExecutor used for traversing the dependency tree with the gen function. It is useful to override the default ThreadPoolExecutor(512) if the tree is too large.

Example

from concurrent.futures import ThreadPoolExecutor
from distributed import Client  # dask executor

gen(task,
    executors={'Level1': ThreadPoolExecutor(2)},
    default_executor=Client(),  # dask executor
    # Note: can also use dask_jobqueue to instantiate
    # an executor backed by a job queuing system like
    # HTCondor
)

For details on how to implement task, please check function core.deptree.sample.

core.deptree.gen_sync(task: Task, verbose: bool = False)[source]

Run a Task in sequential mode

Parameters:

task (Task) – The task to generate

core.deptree.sample()[source]

Generate a sample Product for demonstration purposes

core.deptree_prefect

core.download

core.download.download_nextcloud(product_name: str, output_dir: Path | str, input_dir: Path | str = '', verbose: bool = True, if_exists: Literal['skip', 'overwrite', 'backup', 'error'] = 'skip')[source]

Function for downloading data from Nextcloud contained in the data/eoread directory

Parameters:
  • product_name (str) – Name of the product with the extension

  • output_dir (Path | str) – Directory where to store downloaded data

  • input_dir (Path | str, optional) – Subdirectory in which the product is stored. Defaults to ‘’.

Returns:

Output path of the downloaded data

Return type:

Path

core.download.download_url(url: str, dirname: Path, wget_opts='', check_function=None, verbose=True, if_exists: Literal['skip', 'overwrite', 'backup', 'error'] = 'skip', **kwargs) Path[source]

Download url to dirname with wget

Options wget_opts are added to the wget command. The download is wrapped by filegen; other kwargs are passed to filegen (lock_timeout, tmpdir, if_exists).

Returns the path to the downloaded file
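
A usage sketch (URL and target directory are illustrative):

from pathlib import Path
from core.download import download_url

f = download_url('https://example.com/data/product.nc',   # hypothetical URL
                 Path('/tmp/downloads'),
                 if_exists='skip')
print(f)   # path to the downloaded file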

core.env

core.env.getdir(envvar: str, default: Path | None = None, create: bool | None = None) Path[source]

Returns the value of environment variable envvar, assumed to represent a directory path. If this variable is not defined, returns default.

The environment variable can be defined in the user’s .bashrc, or in a file .env in the current working directory.

Parameters:
  • envvar – the input environment variable

  • default

    the default path, if the environment variable is not defined. Default values are predefined for the following variables:

    • DIR_DATA : “data” (in current working directory)

    • DIR_STATIC : DIR_DATA/”static”

    • DIR_SAMPLES : DIR_DATA/”sample_products”

    • DIR_ANCILLARY : DIR_DATA/”ancillary”

  • create – whether to silently create the directory if it does not exist. If not provided, this parameter defaults to False, except for DIR_STATIC, DIR_SAMPLES and DIR_ANCILLARY.

Returns:

the path to the directory.
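
For example (assuming DIR_SAMPLES is not set, so its predefined default applies):

from core.env import getdir

# returns the directory from the DIR_SAMPLES environment variable,
# or its predefined default DIR_DATA/"sample_products" otherwise
dir_samples = getdir('DIR_SAMPLES')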

core.env.getvar(envvar: str, default=None)[source]

Returns the value of environment variable envvar. If this variable is not defined, returns default.

The environment variable can be defined in the user’s .bashrc, or in a file .env in the current working directory.

Parameters:
  • envvar – the input environment variable

  • default – the default return, if the environment variable is not defined

Returns:

the requested environment variable or the default if the var is not defined and a default has been provided.

core.fileutils

class core.fileutils.PersistentList(filename, timeout=0, concurrent=True)[source]

Bases: list

A list that saves its content in filename on each modification. The extension must be .json.

concurrent: whether to activate concurrent mode. In this mode, the file is also read before each access.
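
A usage sketch (the file path is illustrative):

from core.fileutils import PersistentList

lst = PersistentList('/tmp/processed.json')   # extension must be .json
lst.append('product_001')                     # the file is updated on each modification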

class core.fileutils.filegen(arg: int | str = 0, tmpdir: Path | None = None, lock_timeout: int = 0, if_exists: Literal['skip', 'overwrite', 'backup', 'error'] = 'error', uncompress: str | None = None, verbose: bool = True)[source]

Bases: object

core.fileutils.get_git_commit()[source]
core.fileutils.mdir(directory: Path | str, mdir_filename: str = 'mdir.json', strict: bool = False, create: bool = True, **kwargs) Path[source]

Create or access a managed directory with path directory. Returns the directory path, so that it can be used in directory definitions:

dir_data = mdir('/path/to/data/')

The directory is tagged with a file mdir.json, containing:
  • The creation date

  • The last access date

  • The python file and module that was run during access

  • The username

  • The current git commit if available

  • Any other kwargs, such as:
    • project

    • version

    • description

    • etc

mdir_filename: default=’mdir.json’

strict: boolean

False [default]: metadata is updated
True: metadata is checked or added

(remove the file content to override)

create: whether directory is automatically created (default True)

core.fileutils.safe_move(src, dst, makedirs=True)[source]

Move src file to dst

if makedirs: create directory if necessary

core.fileutils.skip(filename: Path, if_exists: str = 'skip')[source]

Utility function to check whether to skip an existing file

if_exists:

  • ‘skip’: skip the existing file

  • ‘error’: raise an error on existing file

  • ‘overwrite’: overwrite existing file

  • ‘backup’: move existing file to a backup ‘.1’, ‘.2’…

core.fileutils.temporary_copy(src: Path, enable: bool = True, **kwargs)[source]

Context manager to copy a file/folder to a temporary directory.

Parameters:
  • src (Path) – Path to the source file/folder to copy.

  • enable (bool) – whether to enable the copy, otherwise returns the input

  • Other **kwargs are passed to TemporaryDirectory

Yields:

Path – Path to the temporary file/folder
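
A usage sketch (source path and processing function are hypothetical):

from pathlib import Path
from core.fileutils import temporary_copy

with temporary_copy(Path('/data/product.SAFE')) as tmp:
    # tmp is a copy located in a temporary directory,
    # removed when leaving the context
    process(tmp)   # hypothetical processing function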

core.ftp

core.ftp.ftp_create_dir(ftp: FTP, path_server: Path | str)[source]
core.ftp.ftp_download(ftp: FTP, file_local: Path, dir_server: str | Path, verbose=True)[source]

Downloads file_local through the FTP connection ftp, from the server directory dir_server

The file name on the server is determined by file_local.name

Refs:

https://stackoverflow.com/questions/19692739/ https://stackoverflow.com/questions/73534659/

core.ftp.ftp_file_exists(ftp: FTP, path_server: Path | str) bool[source]
core.ftp.ftp_list(ftp: FTP, dir_server: str, pattern: str = '*')[source]

Returns the list of files matching pattern on dir_server

core.ftp.ftp_upload(ftp: FTP, file_local: Path, dir_server: str, if_exists='skip', blocksize=8192, verbose=True)[source]

FTP upload function

  • Use temporary files

  • Create remote directories

  • if_exists:

    ‘skip’: skip the existing file
    ‘error’: raise an error on existing file
    ‘overwrite’: overwrite existing file

core.ftp.get_auth_ftp(name) Dict[source]

get netrc credentials for use with pyfilesystem’s FTPFS or ftplib’s FTP

Ex: FTP(**get_auth_ftp(<name>))

core.ftp.get_url_ftpfs(name)[source]

get netrc credentials for use with pyfilesystem’s fs.open_fs

Ex: fs.open_fs(get_url_ftpfs(<name>))

core.fuzzy

core.fuzzy.search(keywords: list[str] | str, list_of_string, threshold=0.71, nmax=None)[source]

core.interpolate

class core.interpolate.Index(values)[source]

Bases: object

get_indexer(coords)[source]
class core.interpolate.Linear(values: DataArray, bounds: Literal['error', 'nan', 'clip', 'cycle'] = 'error', spacing: Literal['regular', 'irregular', 'auto'] | Callable[[float], float] = 'auto', period: float | None = None)[source]

Bases: object

get_indexer(coords: DataArray)[source]
class core.interpolate.Linear_Indexer(coords: ndarray[tuple[Any, ...], dtype[_ScalarT]], bounds: str, spacing, period=None)[source]

Bases: object

class core.interpolate.Locator(coords: ndarray[tuple[Any, ...], dtype[_ScalarT]], bounds: str)[source]

Bases: object

The purpose of these classes is to locate values in coordinate axes.

handle_oob(values: ndarray)[source]

handle out of bound values

Note: when bounds == “cycle”, does nothing

locate_index()[source]
locate_index_weight(values)[source]

Find indices and dist of values for linear and spline interpolation in self.coords

Returns a list of indices, dist (float 0 to 1) and oob

class core.interpolate.Locator_Regular(coords, bounds: str, inversion_func: Callable | None = None, period=None)[source]

Bases: Locator

locate_index()[source]
locate_index_weight(values)[source]

Find indices and dist of values for linear and spline interpolation in self.coords

Returns a list of indices, dist (float 0 to 1) and oob

class core.interpolate.Nearest(values: DataArray, tolerance: float | None = None, spacing: Literal['auto'] | Callable[[float], float] = 'auto')[source]

Bases: object

get_indexer(coords: DataArray)[source]
class core.interpolate.Nearest_Indexer(coords: ndarray[tuple[Any, ...], dtype[_ScalarT]], tolerance: float | None, spacing: str | Callable = 'auto')[source]

Bases: object

class core.interpolate.Spline(values, tension=0.5, bounds: Literal['error', 'nan', 'clip'] = 'error', spacing: Literal['regular', 'irregular', 'auto'] | Callable[[float], float] = 'auto')[source]

Bases: object

get_indexer(coords: DataArray)[source]
class core.interpolate.Spline_Indexer(coords: ndarray[tuple[Any, ...], dtype[_ScalarT]], bounds: str, spacing, tension: float)[source]

Bases: object

core.interpolate.broadcast_numpy(ds: Dataset) Dict[source]

Returns all data variables in ds as numpy arrays broadcastable against each other (with new single-element dimensions)

This requires the input to be broadcasted to common dimensions.

core.interpolate.broadcast_shapes(ds: Dataset, dims) Dict[source]

For each data variable in ds, returns the shape for broadcasting in the dimensions defined by dims

core.interpolate.create_locator(coords, bounds: str, spacing, period: float | None = None) Locator[source]

Locator factory

The purpose of this method is to instantiate the appropriate “Locator” class.

The args are passed from the indexers.

core.interpolate.determine_output_dimensions(data, ds, dims_sel_interp)[source]

determine output dimensions based on numpy’s advanced indexing rules

core.interpolate.interp(da: DataArray, **kwargs)[source]

Interpolate/select a DataArray onto new coordinates.

This function is similar to xr.interp and xr.sel, but:
  • Supports dask-based coordinate inputs without triggering the immediate computation that xr.interp performs

  • Supports combinations of selection and interpolation. This is faster and more memory efficient than performing independently the selection and interpolation.

  • Supports pointwise indexing/interpolation using dask arrays (see https://docs.xarray.dev/en/latest/user-guide/indexing.html#more-advanced-indexing)

  • Supports per-dimension options (nearest neighbour selection, linear/spline interpolation, out-of-bounds behaviour, cyclic dimensions…)

Parameters:
  • da (xr.DataArray) – The input DataArray

  • **kwargs

    definition of the selection/interpolation coordinates for each dimension, using the following classes:

    • Linear: linear interpolation (like xr.DataArray.interp)

    • Nearest: nearest neighbour selection (like xr.DataArray.sel)

    • Index: integer index selection (like xr.DataArray.isel)

    These classes store the coordinate data in their .values attribute and have a .get_indexer method which returns an indexer for the passed coordinates.

Example

>>> interp(
...     data,  # input DataArray with dimensions (a, b, c)
...     a = Linear(           # perform linear interpolation along dimension `a`
...          a_values,        # `a_values` is a DataArray with dimension (x, y);
...          bounds='clip'),  # clip out-of-bounds values to the axis min/max.
...     b = Nearest(b_values), # perform nearest neighbour selection along
...                            # dimension `b`; `b_values` is a DataArray
...                            # with dimension (x, y)
... ) # returns a DataArray with dimensions (x, y, c)
No interpolation or selection is performed along dimension `c` thus it is
left as-is.
Returns:

DataArray on the new coordinates.

Return type:

xr.DataArray

core.interpolate.interp_block(ds: Dataset, da: DataArray, out_dims, indexers: Dict) DataArray[source]

This function is called by map_blocks in function interp, and performs the indexing and interpolation at the numpy level.

It relies on the indexers to perform index searching and weight calculation, and performs a linear combination of the sub-arrays.

core.interpolate.product_dict(**kwargs) Iterable[Dict][source]

Cartesian product of a dictionary of lists

core.lock

core.lock.LockFile(locked_file: Path, ext='.lock', interval=1, timeout=0, create_dir=True)[source]

Create a blocking context with a lock file

timeout: timeout in seconds, waiting for the lock to be released.

If negative, disable lock files entirely.

interval: interval in seconds

Example

with LockFile('/dir/to/file.txt'):
    # creates a file '/dir/to/file.txt.lock' including a filesystem lock
    # the context will enter once the lock is released

core.log

Module shadowed by the API function of the same name, log

usage:

from core import log
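
A usage sketch of the helpers listed below:

from core import log

log.set_lvl(log.lvl.DEBUG)       # display messages of level DEBUG and above
log.debug('detail:', 42)
log.info('processing started')
log.warning('missing attribute')
log.check(2 + 2 == 4, 'math still works')   # logs with level ERROR if the condition fails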

core.log.check(condition, *args, e: Exception = <class 'AssertionError'>)[source]

log assertion with level ERROR

class core.log.config[source]

Bases: object

show_color = True
core.log.debug(*args, **kwargs)[source]

log with default level DEBUG

core.log.disp(*args, **kwargs)[source]
core.log.error(*args, e: Exception = <class 'RuntimeError'>, **kwargs)[source]

log with default level ERROR; raises e if passed

core.log.info(*args, **kwargs)[source]

log with default level INFO

core.log.log(lvl: lvl, *args, **kwargs)[source]

log with specified level

class core.log.lvl(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

DEBUG = 1
ERROR = 4
INFO = 2
PROMPT = 5
WARNING = 3
core.log.pbar(lvl: lvl = lvl.INFO, iterable=None, desc=None, total=None, leave=True, ncols=None, mininterval=0.1, maxinterval=10.0, miniters=None, ascii=None, disable=False, unit='it', unit_scale=False, dynamic_ncols=False, smoothing=0.3, initial=0, file=None, position=None, postfix=None, unit_divisor=1000, write_bytes=False, lock_args=None, nrows=None, colour=None, delay=0, gui=False, **kwargs)[source]

Log a progress bar such as tqdm.

Args: all arguments are those of tqdm (cf. https://tqdm.github.io/docs/tqdm/)

core.log.prompt(*args, **kwargs)[source]

prompt user with log format

class core.log.rgb[source]

Bases: object

blue = <core.log._color object>
bold = <core.log._color object>
cyan = <core.log._color object>
default = <core.log._color object>
gray = <core.log._color object>
green = <core.log._color object>
orange = <core.log._color object>
purple = <core.log._color object>
red = <core.log._color object>
underline = <core.log._color object>
core.log.set_format(fmt: Literal['%level', '%icon', '%time', '%namespace', '%pid'])[source]

valid keys: ‘%level’, ‘%icon’, ‘%time’, ‘%namespace’, ‘%pid’

core.log.set_lvl(lvl: lvl)[source]
core.log.silence(module, lvl_and_below: lvl = lvl.ERROR)[source]
core.log.stderr(*args)[source]
core.log.warning(*args, w: Warning = None, **kwargs)[source]

log with default level WARNING

core.monitor

class core.monitor.Chrono(name='chrono object', unit='m')[source]

Bases: object

  • name: str

  • unit: “m” | “s” | “ms” | “us”

display(unit: Literal['m', 's', 'ms', 'us'] = None)[source]
elapsed() timedelta[source]
laps() timedelta[source]
pause()[source]
reset() timedelta[source]
restart()[source]
stop() timedelta[source]
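
A usage sketch (whether timing starts at construction is an assumption):

from core.monitor import Chrono

chrono = Chrono('processing', unit='s')   # timing is assumed to start at construction
# ... workload ...
chrono.stop()
chrono.display()                          # report the elapsed time, here in seconds
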
class core.monitor.Monitor(name: str = 'monitor object', time: Chrono = None, ram: RAM = None)[source]

Bases: object

Meta-structure to monitor some variables in a script

display()[source]
elapsed()[source]
laps()[source]
pause()[source]
reset()[source]
restart()[source]
stop()[source]
class core.monitor.RAM(name='ram object')[source]

Bases: object

  • name: str

display()[source]
elapsed()[source]
laps()[source]
pause()[source]
reset()[source]
restart()[source]
stop()[source]

core.naming

core.naming.add_var(ds, var, attrs: _name)[source]

Add a new variable to an xarray.Dataset with its attributes

Parameters:
  • ds (xr.Dataset) – Dataset to complete

  • var (xr.DataArray) – Array to add

  • attrs (_name) – Attributes to join to the new variable

class core.naming.names[source]

Bases: object

F0 = <core.naming._name object>
bands = <core.naming._name object>
bands_ir = <core.naming._name object>
bt = <core.naming._name object>
columns = <core.naming._name object>
crs = <core.naming._name object>
cwav = <core.naming._name object>
datetime = <core.naming._name object>
description = <core.naming._name object>
flags = <core.naming._name object>
input_directory = <core.naming._name object>
lat = <core.naming._name object>
lon = <core.naming._name object>
ltoa = <core.naming._name object>
ltoa_ir = <core.naming._name object>
platform = <core.naming._name object>
product_name = <core.naming._name object>
raa = <core.naming._name object>
resolution = <core.naming._name object>
rho_w = <core.naming._name object>
rows = <core.naming._name object>
rtoa = <core.naming._name object>
saa = <core.naming._name object>
sensor = <core.naming._name object>
shortname = <core.naming._name object>
sza = <core.naming._name object>
vaa = <core.naming._name object>
vza = <core.naming._name object>
wav = <core.naming._name object>
wav_ir = <core.naming._name object>

core.project

class core.project.Config(**kwargs)[source]

Bases: object

get(key: str, section: str = None, *, default=None) Any[source]

Returns the value of key if present in the config file. If not present and no default is provided, raises an error; if a default is provided (different from None), returns the default instead.

Values are automatically cast to Path objects if their key starts with ‘dir_’ and their type is str.

ingest(**kwargs)

core.pseudoinverse

core.pseudoinverse.pseudoinverse(A)[source]

Calculate the pseudoinverse of array A over the last 2 axes (broadcasting the first axes):

A* = ((A’.A)^(-1)).A’

where X’ is the transpose of X and X^-1 is the inverse of X.

Shapes: A: […, i, j], A*: […, j, i]
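
As an illustration of the formula above, a plain numpy equivalent for a single full-rank matrix (the library function broadcasts this over the leading axes):

import numpy as np

A = np.random.rand(5, 3)                        # shape [..., i, j] with i=5, j=3
A_star = np.linalg.inv(A.T @ A) @ A.T           # ((A'.A)^-1).A', shape [..., j, i]
assert np.allclose(A_star, np.linalg.pinv(A))   # matches numpy's pseudoinverse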

core.pseudoinverse.weighted_pseudoinverse(A, W)[source]

Calculate the weighted pseudoinverse of array A over the last 2 axes (broadcasting the first axes). W is the (diagonal) weight matrix:

A* = ((A’.W.A)^(-1)).A’.W

core.pytest_utils

core.pytest_utils.parametrize_dict(d) dict[source]

Allow passing args and ids to pytest.mark.parametrize with a dict syntax

@pytest.mark.parametrize('a', **parametrize_dict({
    'first case': 1,
    'second case': 2,
}))
# equivalent to parametrize('a', [1, 2], ids=['first case', 'second case'])
def test(a):
    pass

core.save

core.save.clean_attributes(obj: Dataset | DataArray)[source]

Remove attributes that can not be written to netcdf

core.save.to_netcdf(ds: Dataset, filename: Path, *, engine: str = 'h5netcdf', zlib: bool = True, complevel: int = 5, verbose: bool = True, tmpdir: Path | None = None, lock_timeout: int = 0, if_exists: Literal['skip', 'overwrite', 'backup', 'error'] = 'error', clean_attrs: bool = True, **kwargs)[source]

Write an xarray Dataset ds using .to_netcdf with several additional features:

  • Use file compression

  • Wrapped by filegen: use temporary files, detect existing output files…

Parameters:
  • ds (xr.Dataset) – Input dataset

  • filename (Path) – Output file path

  • engine (str, optional) – Engine driver to use. Defaults to ‘h5netcdf’.

  • zlib (bool, optional) – activate zlib. Defaults to True.

  • complevel (int, optional) – Compression level. Defaults to 5.

  • verbose (bool, optional) – Verbosity. Defaults to True.

  • tmpdir (Path, optional) – use a given temporary directory. Defaults to None.

  • lock_timeout (int) – timeout in case of existing lock file

  • if_exists (str, optional) – what to do if output file exists. Defaults to ‘error’.

  • clean_attrs – whether to remove attributes in the xarray object, that cannot be written to netcdf.

  • other kwargs are passed to ds.to_netcdf
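
A usage sketch (dataset and output path are illustrative):

import xarray as xr
from pathlib import Path
from core.save import to_netcdf

ds = xr.Dataset({'rho': (('x',), [1.0, 2.0, 3.0])})
to_netcdf(ds, Path('/tmp/output.nc'), if_exists='overwrite')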

core.table

core.table.read_csv(path: str | Path, **kwargs) DataFrame[source]

Read a csv file without having to take care of tabulations and whitespace

Parameters:
  • path (str | Path) – Path of csv file

  • kwargs – Keyword arguments of read_csv function from pandas

Returns:

Output table in pandas DataFrame format

Return type:

DataFrame

core.tools

Various utility functions for modifying xarray object

class core.tools.MapBlocksOutput(model: List, new_dims: Dict | None = None)[source]

Bases: object

conform(ds: Dataset, transpose: bool = False) Dataset[source]

Conform dataset ds to this model

transpose: whether to automatically transpose the variables in ds to conform to the specified dimensions.

subset(ds: Dataset) Dataset[source]
template(ds: Dataset) Dataset[source]

Return an empty template for this model, to be provided to xr.map_blocks

class core.tools.Var(name: str, dtype: str, dims: Tuple)[source]

Bases: object

conform(da: DataArray, transpose: bool = False) DataArray[source]

Conform a DataArray to the variable definition

to_dataarray(ds: Dataset, new_dims: Dict | None = None)[source]

Convert to a DataArray with dims infos provided by ds

Parameters:
  • new_dims – size of each new dimension

  • new_coords – coords of each new dimension

core.tools.chunk(ds: Dataset, **kwargs)[source]

Apply rechunking to a xr.Dataset ds along dimensions provided as kwargs

Works like ds.chunk, but also supports Datasets with repeated dimensions.

core.tools.conform(attrname: str, transpose: bool = True)[source]

A method decorator which applies MapBlocksOutput.conform to the method output.

The MapBlocksOutput should be an attribute attrname of the class.

core.tools.contains(ds: Dataset, lat: float, lon: float)[source]
core.tools.convert(A: DataArray, unit_to: str, unit_from: str = None, converter: dict = None)[source]

Unit conversion

Arguments:

A: DataArray to convert

unit_from: str or None

unit to convert from. If not provided, uses A.units

unit_to: str

unit to convert to

converter: a dictionary for unit conversion

example: converter={‘Pa’: 1, ‘hPa’: 1e-2}
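
For instance (pressure is a hypothetical DataArray; if it carries a units attribute, unit_from can be omitted):

from core.tools import convert

p_hpa = convert(pressure, unit_to='hPa', unit_from='Pa',
                converter={'Pa': 1, 'hPa': 1e-2})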

core.tools.datetime(ds: Dataset)[source]

Parse datetime (in isoformat) from ds attributes

core.tools.getflag(A: DataArray, name: str)[source]

Return the binary flag with given name as a boolean array

A: DataArray
name: str

example: getflag(flags, ‘LAND’)

core.tools.getflags(A=None, meanings=None, masks=None, sep=None)[source]

returns the flags in attributes of A as a dictionary {meaning: value}

Arguments:

provide either:

A: Dataarray

or:

meanings: flag meanings ‘FLAG1 FLAG2’
masks: flag values [1, 2]
sep: string separator

core.tools.haversine(lat1: float, lon1: float, lat2: float, lon2: float, radius: float = 6371)[source]

Calculate the great circle distance between two points (specified in decimal degrees) on a sphere of a given radius

Returns the distance in the same unit as radius (defaults to earth radius in km)

core.tools.locate(lat, lon, lat0, lon0, dist_min_km: float = None, verbose: bool = False)[source]

Locate lat0, lon0 within lat, lon

if dist_min_km is specified and if the minimal distance exceeds it, a ValueError is raised

core.tools.merge(ds: ~xarray.core.dataset.Dataset, dim: str = None, varname: str = None, pattern: str = '(.+)_(\\d+)', dtype: type = <class 'int'>)[source]

Merge DataArrays in ds along dimension dim.

ds: xr.Dataset

dim: str or None

name of the new or existing dimension. If None, use the attribute split_dimension

varname: str or None

name of the variable to create. If None, detect the variable name from the regular expression

pattern: str

Regular expression for matching variable names and coordinates.

If varname is None:

    The first group represents the new variable name, the second group the coordinate value.

    Ex: r'(.+)_(\d+)'
        First group matches any characters. Second group matches digits.

    r'(\D+)(\d+)'
        First group matches non-digits. Second group matches digits.

If varname is not None:

    Match a single group representing the coordinate value.

dtype: data type

data type of the coordinate items

core.tools.only(iterable)[source]

If iterable has only one item, return it. Otherwise raise a ValueError

core.tools.raiseflag(A: DataArray, flag_name: str, flag_value: int, condition=None)[source]

Raise a flag in DataArray A with name flag_name, value flag_value and condition. The name and value of the flag are recorded in the attributes of A.

Arguments:

A: DataArray of integers

flag_name: str

Name of the flag

flag_value: int

Value of the flag

condition: boolean array-like of same shape as A

Condition to raise the flag. If None, the flag values are unchanged; the flag is simply registered in the attributes.
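
A combined sketch of raiseflag and getflag (the flag array and the land_mask condition are illustrative):

import numpy as np
import xarray as xr
from core.tools import raiseflag, getflag

flags = xr.DataArray(np.zeros((2, 2), dtype='uint16'))   # integer flag array
raiseflag(flags, 'LAND', 1, condition=land_mask)         # land_mask: hypothetical boolean array
is_land = getflag(flags, 'LAND')                         # boolean DataArray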

core.tools.reglob(path: Path | str, regexp: str)[source]
core.tools.split(d: Dataset | DataArray, dim: str, sep: str = '_')[source]

Returns a Dataset where a given dimension is split into as many variables

d: Dataset or DataArray

core.tools.sub(ds: Dataset, cond: DataArray, drop_invalid: bool = True, int_default_value: int = 0)[source]

Creates a Dataset based on the conditions passed in parameters

cond : a DataArray of booleans that defines which pixels are kept

drop_invalid, bool

if True, invalid pixels will be replaced by NaN for floats and by int_default_value for other types

int_default_value, int

for DataArrays of type int, this value is assigned to non-valid pixels

core.tools.sub_pt(ds: Dataset, pt_lat, pt_lon, rad, drop_invalid: bool = True, int_default_value: int = 0)[source]

Creates a Dataset based on the circle specified in parameters

pt_lat, pt_lon : coordinates of the center point

rad : radius of the circle in km

drop_invalid, bool

if True, invalid pixels will be replaced by NaN for floats and by int_default_value for other types

int_default_value, int

for DataArrays of type int, this value is assigned to non-valid pixels

core.tools.sub_rect(ds: Dataset, lat_min, lon_min, lat_max, lon_max, drop_invalid: bool = True, int_default_value: int = 0)[source]

Returns a Dataset based on the coordinates of the rectangle passed in parameters

lat_min, lat_max, lon_min, lon_max : delimitations of the region of interest

drop_invalid, bool : if True, invalid pixels will be replaced by NaN for floats and by int_default_value for other types

int_default_value, int : for DataArrays of type int, this value is assigned to non-valid pixels

core.tools.trim_dims(A: Dataset)[source]

Trim the dimensions of Dataset A

Renames dimensions where possible to avoid duplicate dimensions with the same sizes, so that no DataArray has duplicate dimensions.

core.tools.wrap(ds: Dataset, dim: str, vmin: float, vmax: float)[source]

Wrap and reorder a cyclic dimension between vmin and vmax. The border value is duplicated at the edges. The period is (vmax-vmin)

Example:
  • Dimension [0, 359] -> [-180, 180]
  • Dimension [-180, 179] -> [-180, 180]
  • Dimension [0, 359] -> [0, 360]

Arguments:

ds: xarray.Dataset

dim: str

Name of the dimension to wrap

vmin, vmax: float

new values for the edges

core.tools.xr_filter(ds: Dataset, condition: DataArray, stackdim: str | None = None, transparent: bool = False) Dataset[source]

Extracts a subset of the dataset where the condition is True, stacking the condition dimensions. Equivalent to numpy’s boolean indexing, A[condition].

Parameters:
  • ds (xr.Dataset) – The input dataset.

  • condition (xr.DataArray) – A boolean DataArray indicating where the condition is True.

  • stackdim (str, optional) – The name of the new stacked dimension. If None, it will be determined automatically from the condition dimensions.

  • transparent (bool, optional) – whether to reassign the original dimension names to the Dataset (expanding with length-one dimensions).

Returns:

xr.Dataset: A new dataset with the subset of data where the condition is True.
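
A usage sketch (ds is a hypothetical Dataset with a flags variable; xr_unfilter is documented below):

from core.tools import xr_filter, xr_unfilter

sub = xr_filter(ds, ds.flags == 0)      # flat dataset of the pixels where flags == 0
# ... process sub ...
out = xr_unfilter(sub, ds.flags == 0)   # restore the original dimensions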

core.tools.xr_filter_decorator(argpos: int, condition: Callable, fill_value_float: float = nan, fill_value_int: int = 0, transparent: bool = False, stackdim: str | None = None)[source]

A decorator which applies the decorated function only where the condition is True.

Parameters:
  • argpos (int) – Position index of the input dataset in the decorated function call.

  • condition (Callable) – A callable taking the Dataset as input and returning a boolean DataArray.

  • fill_value_float (float, optional) – Fill value for floating point data types. Default is np.nan.

  • fill_value_int (int, optional) – Fill value for integer data types. Default is 0

  • transparent (bool, optional) – Whether to reassign the original dimension names to the Dataset (expanding with length-one dimensions). Default is False.

  • stackdim (str | None, optional) – The name of the new stacked dimension. If None, it will be determined automatically from the condition dimensions. Default is None.

Example

@xr_filter_decorator(0, lambda x: x.flags == 0)
def my_func(ds: xr.Dataset) -> xr.Dataset:
    # my_func is applied only where ds.flags == 0
    ...

The decorator works by:
  1. Extracting a subset of the dataset where the condition is True, using xr_filter.
  2. Applying the decorated function to the subset.
  3. Reconstructing the original dataset from the subset, using xr_unfilter.

NOTE: this decorator does not guarantee that the order of dimensions is maintained. When using this decorator with xr.apply_blocks, you may want to wrap your xr_filter_decorator decorated method with the conform decorator.

core.tools.xr_flat(ds: Dataset) Dataset[source]

Flattens an xarray.Dataset along a new dimension named ‘index’

Parameters:

ds (xr.Dataset) – Dataset to flatten

core.tools.xr_sample(ds: Dataset, nb_sample: int | float, seed: int = None) Dataset[source]

Extract a subset of samples from a flat xarray.Dataset

Parameters:
  • ds (xr.Dataset) – Input flat dataset

  • nb_sample (int|float) – Number or percentage of sample to extract

  • seed (int, optional) – Random seed to use. Defaults to None.

core.tools.xr_unfilter(sub: Dataset, condition: DataArray, stackdim: str | None = None, fill_value_float: float = nan, fill_value_int: int = 0, transparent: bool = False) DataArray[source]

Reconstructs the original dataset from a subset dataset where the condition is True, unstacking the condition dimensions.

Parameters:
  • sub (xr.Dataset) – The subset dataset where the condition is True.

  • condition (xr.DataArray) – A boolean DataArray indicating where the condition is True.

  • stackdim (str, optional) – The name of the stacked dimension. If None, it will be determined automatically from the condition dimensions.

  • fill_value_float (float, optional) – The fill value for floating point data types. Default is np.nan.

  • fill_value_int (int, optional) – The fill value for integer data types. Default is 0.

  • transparent (bool, optional) – whether to revert the transparent compatibility conversion applied in xrwhere.

Returns:

xr.DataArray: The reconstructed dataset with the specified dimensions unstacked.

core.tools.xrcrop(A: Dataset, **kwargs) Dataset[source]
core.tools.xrcrop(A: DataArray, **kwargs) DataArray

Crop a Dataset or DataArray along dimensions based on min/max values.

For each dimension provided as kwarg, the min/max values along that dimension can be provided:

  • As a min/max tuple

  • As a DataArray, for which the min/max are computed

Ex: crop dimensions latitude and longitude of gsw based on the min/max of ds.lat and ds.lon:

gsw = xrcrop(
    gsw,
    latitude=ds.lat,
    longitude=ds.lon,
)

Note: the purpose of this function is to make it possible to .compute() the cropped data, thus allowing a sel over large arrays (otherwise extremely slow with dask-based arrays).

core.uncompress

class core.uncompress.CacheDir(directory=None)[source]

Bases: object

A cache directory for uncompressing files

Example

# by default, CacheDir stores data in /tmp/uncompress_cache_<user>
uncompressed = CacheDir().uncompress(compressed_file)

find(file_compressed)[source]

Finds the directory containing file_compressed and returns the related uncompressed file (or None)

read_info(directory)[source]
uncompress(filename, purge_after='1w')[source]
write_info(directory, info)[source]
exception core.uncompress.ErrorUncompressed[source]

Bases: Exception

Raised when the input file is already uncompressed

core.uncompress.duration(s)[source]

Returns a timedelta from a string s

core.uncompress.now_isofmt()[source]

Returns now in iso format

core.uncompress.uncompress(filename, dirname, on_uncompressed='error', create_out_dir=True, verbose=False) Path[source]

Uncompress filename to dirname

Arguments:

on_uncompressed: str

determines what to do if filename is not compressed
  • ‘error’: raise an error (default)
  • ‘copy’: copy the uncompressed file
  • ‘bypass’: return the input file

create_out_dir: bool

create output directory if it does not exist

Returns the path to the uncompressed file

core.uncompress.uncompress_decorator(filename='.core_uncompress_mapping', verbose=True)[source]

A decorator that uncompresses the result of function f

Signature of f is assumed to be as follows:

f(identifier, dirname, *args, **kwargs)

The file returned by f is uncompressed to dirname

The mapping of “identifier -> uncompressed” is stored in dirname/filename

core.xrtags

core.xrtags.tag_add(da: DataArray, tag: List[str] | str)[source]

Add one or several tags to DataArray da

tag: either a single tag (ex: “level2”) or multiple tags (ex: [“level2”, “ancillary”])

core.xrtags.tag_filter(ds: Dataset, tag: List[str] | str)[source]

Returns the filtered Dataset, containing variables tagged with any of the tag(s) provided
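
A usage sketch (ds is a hypothetical Dataset):

from core.xrtags import tag_add, tag_filter

tag_add(ds['rho_w'], 'level2')                 # tag a single variable
tag_add(ds['lat'], ['level2', 'ancillary'])    # tag with multiple tags
level2 = tag_filter(ds, 'level2')              # Dataset containing only the tagged variables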