core package
Subpackages
core.auth
core.cache
- core.cache.cache_dataframe(cache_file: Path | str, inputs: Literal['check', 'store', 'ignore'] = 'check')[source]
A decorator that caches the result of a function returning a pandas DataFrame (see the usage sketch below).
- inputs:
  - 'check' [default]: store and check the function inputs
  - 'store': store but don't check the function inputs
  - 'ignore': ignore the function inputs
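A minimal usage sketch, assuming a hypothetical cache file path and decorated function (the on-disk format is whatever cache_dataframe uses internally):

    import pandas as pd
    from core.cache import cache_dataframe

    @cache_dataframe('/tmp/stats_cache.csv', inputs='check')  # hypothetical cache file
    def compute_stats(n: int) -> pd.DataFrame:
        # expensive computation, executed only when no valid cache exists
        return pd.DataFrame({'x': range(n)})

    df = compute_stats(10)   # computed and written to the cache file
    df = compute_stats(10)   # read back from the cache; inputs are checked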
- core.cache.cache_dataset(cache_file: Path | str, attrs=None, **kwargs)[source]
A decorator that caches the dataset returned by a function in a netcdf file
The attribute dictionary attrs is stored in the file, and verified upon reading.
Other kwargs (ex: chunks) are passed to xr.open_dataset
- core.cache.cache_json(cache_file: Path | str, inputs: Literal['check', 'store', 'ignore'] = 'check')[source]
A decorator that caches the result of a function to a json file.
- inputs:
  - 'check' [default]: store and check the function inputs
  - 'store': store but don't check the function inputs
  - 'ignore': ignore the function inputs
- core.cache.cache_pickle(cache_file: Path | str, inputs: Literal['check', 'store', 'ignore'] = 'check', check_out=<function <lambda>>)[source]
A decorator that caches the result of a function to a pickle file.
- inputs:
  - 'check' [default]: store and check the function inputs
  - 'store': store but don't check the function inputs
  - 'ignore': ignore the function inputs
- core.cache.cachefunc(cache_file: Path | str, reader: Callable, writer: Callable, check_in: Callable | None = None, check_out: Callable | None = None, fg_kwargs=None)[source]
A decorator that caches the return of a function in a file, with customizable format
- reader: a function that reads inputs/output from the cache file
reader(filename) -> {‘output’: …, ‘input’: …}
- writer: a function that writes the inputs/output to the cache file
writer(filename, output, input_args, input_kwargs)
- check_in: a custom function to test the equality of the inputs
checker(obj1, obj2) -> bool (defaults to None -> no checking)
- check_out: a custom function to test the equality of the outputs
checker(obj1, obj2) -> bool (defaults to ==)
- fg_kwargs: kwargs passed to filegen (ex: lock_timeout=-1)
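A sketch of a json-backed cache using the reader/writer contract described above; the file layout used by writer (and the function slow_sum) are illustrative assumptions:

    import json
    from core.cache import cachefunc

    def reader(filename):
        # must return a dict with 'output' and 'input' entries
        with open(filename) as fp:
            return json.load(fp)

    def writer(filename, output, input_args, input_kwargs):
        # assumed layout: store output and inputs side by side
        with open(filename, 'w') as fp:
            json.dump({'output': output,
                       'input': [list(input_args), input_kwargs]}, fp)

    @cachefunc('/tmp/slow_sum.json', reader=reader, writer=writer)
    def slow_sum(a, b):
        return a + b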
core.condor
- core.condor.get_htcondor_client(maximum_jobs=100, **kwargs)[source]
Get a client for an HTCondor cluster
- core.condor.get_htcondor_logdir(logdir: Path | str | None) Path [source]
A default log directory for condor
core.config
core.conftest
core.dates
core.deptree
- class core.deptree.Task[source]
Bases:
object
Base class for tasks; a task defines its dependencies and can be generated with the gen function.
Should typically hold an attribute output for storing the task output, if any (optional).
Check the sample function which gives an example of Task definition.
- core.deptree.gen(task: Task, mode: Literal['sync', 'executor', 'prefect'] = 'sync', **kwargs)[source]
Run a task and its dependencies
- Parameters:
mode (str) – the generation mode (see the sketch below):
- 'sync': sequential
- 'executor': using executors
- 'prefect': using prefect as a workflow manager
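A minimal sketch of a Task subclass run with gen; the way dependencies are declared here is an assumption, refer to core.deptree.sample for the actual convention:

    from core.deptree import Task, gen

    class MyTask(Task):
        def __init__(self):
            self.output = '/tmp/result.txt'   # optional output attribute

        def dependencies(self):
            # hypothetical: list of Task instances this task depends on
            return []

        def run(self):
            # produce self.output here
            ...

    gen(MyTask(), mode='sync')   # run the task and its dependencies sequentially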
- core.deptree.gen_executor(task: Task, executors: dict | None = None, default_executor=None, graph_executor=None, verbose: bool = False)[source]
Recursively generate dependencies of task, then call its run method in a custom executor.
- Parameters:
task – a Task object
executors – dictionary of executors
default_executor – the executor to use for all tasks not included in executors. Defaults to ThreadPoolExecutor
graph_executor – an instance of ThreadPoolExecutor used for traversing the dependency tree with the gen function. It is useful to override the default ThreadPoolExecutor(512) if the tree is too large.
Example
    from concurrent.futures import ThreadPoolExecutor
    from distributed import Client  # dask executor

    gen(task,
        executors={'Level1': ThreadPoolExecutor(2)},
        default_executor=Client(),  # dask executor
        # Note: can also use dask_jobqueue to instantiate
        # an executor backed by a job queuing system like
        # HTCondor
        )
For details on how to implement task, please check function core.deptree.sample.
core.deptree_prefect
core.download
- core.download.download_nextcloud(product_name: str, output_dir: Path | str, input_dir: Path | str = '', verbose: bool = True, if_exists: Literal['skip', 'overwrite', 'backup', 'error'] = 'skip')[source]
Function for downloading data from Nextcloud contained in the data/eoread directory
- Parameters:
product_name (str) – Name of the product with the extension
output_dir (Path | str) – Directory where to store downloaded data
input_dir (Path | str, optional) – Subdirectory in which the product is stored. Defaults to ''.
- Returns:
Output path of the downloaded data
- Return type:
Path
- core.download.download_url(url: str, dirname: Path, wget_opts='', check_function=None, verbose=True, if_exists: Literal['skip', 'overwrite', 'backup', 'error'] = 'skip', **kwargs) Path [source]
Download url to dirname with wget
Options wget_opts are added to wget. Uses a filegen wrapper. Other kwargs are passed to filegen (lock_timeout, tmpdir, if_exists).
Returns the path to the downloaded file
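A usage sketch with a hypothetical URL and output directory:

    from pathlib import Path
    from core.download import download_url

    # skip the download if the target file already exists
    f = download_url('https://example.com/data/sample.nc', Path('/tmp'),
                     if_exists='skip')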
core.env
- core.env.getdir(envvar: str, default: Path | None = None, create: bool | None = None) Path [source]
Returns the value of environment variable envvar, assumed to represent a directory path. If this variable is not defined, returns default.
The environment variable can be defined in the user's .bashrc, or in a file .env in the current working directory.
- Parameters:
envvar – the input environment variable
default –
the default path, if the environment variable is not defined. Default values are predefined for the following variables:
- DIR_DATA: "data" (in current working directory)
- DIR_STATIC: DIR_DATA/"static"
- DIR_SAMPLES: DIR_DATA/"sample_products"
- DIR_ANCILLARY: DIR_DATA/"ancillary"
create – whether to silently create the directory if it does not exist. If not provided this parameter defaults to False except for DIR_STATIC, DIR_SAMPLES and DIR_ANCILLARY.
- Returns:
the path to the directory.
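A usage sketch; DIR_SAMPLES relies on the predefined default listed above, and MY_PROJECT_DIR is a hypothetical variable:

    from pathlib import Path
    from core.env import getdir

    dir_samples = getdir('DIR_SAMPLES')   # falls back to ./data/sample_products if unset
    dir_project = getdir('MY_PROJECT_DIR', default=Path('/tmp/project'), create=True)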
- core.env.getvar(envvar: str, default=None)[source]
Returns the value of environment variable envvar. If this variable is not defined, returns default.
The environment variable can be defined in the user's .bashrc, or in a file .env in the current working directory.
- Parameters:
envvar – the input environment variable
default – the default return, if the environment variable is not defined
- Returns:
the requested environment variable or the default if the var is not defined and a default has been provided.
core.fileutils
- class core.fileutils.PersistentList(filename, timeout=0, concurrent=True)[source]
Bases:
list
A list that saves its content in filename on each modification. The extension must be .json.
- concurrent: whether to activate concurrent mode. In this mode, the file is also read before each access.
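A usage sketch with a hypothetical file path (the extension must be .json):

    from core.fileutils import PersistentList

    lst = PersistentList('/tmp/state.json')
    lst.append({'step': 1})   # the content is written to /tmp/state.json on each modification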
- class core.fileutils.filegen(arg: int | str = 0, tmpdir: Path | None = None, lock_timeout: int = 0, if_exists: Literal['skip', 'overwrite', 'backup', 'error'] = 'error', uncompress: str | None = None, verbose: bool = True)[source]
Bases:
object
- core.fileutils.mdir(directory: Path | str, mdir_filename: str = 'mdir.json', strict: bool = False, create: bool = True, **kwargs) Path [source]
Create or access a managed directory with path directory. Returns the directory path, so that it can be used in directory definitions:

    dir_data = mdir('/path/to/data/')
- tag it with a file mdir.json, containing:
The creation date
The last access date
The python file and module that was run during access
The username
The current git commit if available
- Any other kwargs, such as:
project
version
description
etc
mdir_filename: default=’mdir.json’
- strict: boolean
  False: metadata is updated (default)
  True: metadata is checked or added (remove file content to override)
create: whether directory is automatically created (default True)
- core.fileutils.safe_move(src, dst, makedirs=True)[source]
Move src file to dst
if makedirs: create directory if necessary
- core.fileutils.skip(filename: Path, if_exists: str = 'skip')[source]
Utility function to check whether to skip an existing file
- if_exists:
  - 'skip': skip the existing file
  - 'error': raise an error on existing file
  - 'overwrite': overwrite existing file
  - 'backup': move existing file to a backup ('.1', '.2', ...)
- core.fileutils.temporary_copy(src: Path, enable: bool = True, **kwargs)[source]
Context manager to copy a file/folder to a temporary directory.
- Parameters:
src (Path) – Path to the source file/folder to copy.
enable (bool) – whether to enable the copy, otherwise returns the input
**kwargs – Other kwargs are passed to TemporaryDirectory
- Yields:
Path – Path to the temporary file/folder
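A usage sketch with a hypothetical source path:

    from pathlib import Path
    from core.fileutils import temporary_copy

    with temporary_copy(Path('/data/product.nc')) as tmp:
        # tmp is a copy located in a temporary directory, removed on exit
        print(tmp)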
core.ftp
- core.ftp.ftp_download(ftp: FTP, file_local: Path, dir_server: str | Path, verbose=True)[source]
Download file_local through the ftp connection, from the server directory dir_server
The file name on the server is determined by file_local.name
- core.ftp.ftp_list(ftp: FTP, dir_server: str, pattern: str = '*')[source]
Returns the list of files matching pattern on dir_server
- core.ftp.ftp_upload(ftp: FTP, file_local: Path, dir_server: str, if_exists='skip', blocksize=8192, verbose=True)[source]
FTP upload function
Use temporary files
Create remote directories
- if_exists:
  - 'skip': skip the existing file
  - 'error': raise an error on existing file
  - 'overwrite': overwrite existing file
core.fuzzy
core.interpolate
- class core.interpolate.Linear(values: DataArray, bounds: Literal['error', 'nan', 'clip', 'cycle'] = 'error', spacing: Literal['regular', 'irregular', 'auto'] | Callable[[float], float] = 'auto', period: float | None = None)[source]
Bases:
object
- class core.interpolate.Linear_Indexer(coords: ndarray[tuple[Any, ...], dtype[_ScalarT]], bounds: str, spacing, period=None)[source]
Bases:
object
- class core.interpolate.Locator(coords: ndarray[tuple[Any, ...], dtype[_ScalarT]], bounds: str)[source]
Bases:
object
The purpose of these classes is to locate values in coordinate axes.
- class core.interpolate.Locator_Regular(coords, bounds: str, inversion_func: Callable | None = None, period=None)[source]
Bases:
Locator
- class core.interpolate.Nearest(values: DataArray, tolerance: float | None = None, spacing: Literal['auto'] | Callable[[float], float] = 'auto')[source]
Bases:
object
- class core.interpolate.Nearest_Indexer(coords: ndarray[tuple[Any, ...], dtype[_ScalarT]], tolerance: float | None, spacing: str | Callable = 'auto')[source]
Bases:
object
- class core.interpolate.Spline(values, tension=0.5, bounds: Literal['error', 'nan', 'clip'] = 'error', spacing: Literal['regular', 'irregular', 'auto'] | Callable[[float], float] = 'auto')[source]
Bases:
object
- class core.interpolate.Spline_Indexer(coords: ndarray[tuple[Any, ...], dtype[_ScalarT]], bounds: str, spacing, tension: float)[source]
Bases:
object
- core.interpolate.broadcast_numpy(ds: Dataset) Dict [source]
Returns all data variables in ds as numpy arrays broadcastable against each other (with new single-element dimensions)
This requires the input to be broadcasted to common dimensions.
- core.interpolate.broadcast_shapes(ds: Dataset, dims) Dict [source]
For each data variable in ds, returns the shape for broadcasting in the dimensions defined by dims
- core.interpolate.create_locator(coords, bounds: str, spacing, period: float | None = None) Locator [source]
Locator factory
The purpose of this method is to instantiate the appropriate “Locator” class.
The args are passed from the indexers.
- core.interpolate.determine_output_dimensions(data, ds, dims_sel_interp)[source]
determine output dimensions based on numpy’s advanced indexing rules
- core.interpolate.interp(da: DataArray, **kwargs)[source]
Interpolate/select a DataArray onto new coordinates.
- This function is similar to xr.interp and xr.sel, but:
Supports dask-based coordinate inputs without triggering the immediate computation performed by xr.interp
Supports combinations of selection and interpolation. This is faster and more memory efficient than performing independently the selection and interpolation.
Supports pointwise indexing/interpolation using dask arrays (see https://docs.xarray.dev/en/latest/user-guide/indexing.html#more-advanced-indexing)
Supports per-dimension options (nearest neighbour selection, linear/spline interpolation, out-of-bounds behaviour, cyclic dimensions…)
- Parameters:
da (xr.DataArray) – The input DataArray
**kwargs –
definition of the selection/interpolation coordinates for each dimension, using the following classes:
Linear: linear interpolation (like xr.DataArray.interp)
Nearest: nearest neighbour selection (like xr.DataArray.sel)
Index: integer index selection (like xr.DataArray.isel)
These classes store the coordinate data in their .values attribute and have a .get_indexer method which returns an indexer for the passed coordinates.
Example
>>> interp(
...     data,               # input DataArray with dimensions (a, b, c)
...     a = Linear(         # perform linear interpolation along dimension `a`
...         a_values,       # `a_values` is a DataArray with dimension (x, y);
...         bounds='clip'), # clip out-of-bounds values to the axis min/max.
...     b = Nearest(b_values),  # perform nearest neighbour selection along
...                             # dimension `b`; `b_values` is a DataArray
...                             # with dimension (x, y)
... )  # returns a DataArray with dimensions (x, y, c)

No interpolation or selection is performed along dimension `c`, thus it is left as-is.
- Returns:
DataArray on the new coordinates.
- Return type:
xr.DataArray
- core.interpolate.interp_block(ds: Dataset, da: DataArray, out_dims, indexers: Dict) DataArray [source]
This function is called by map_blocks in function interp, and performs the indexing and interpolation at the numpy level.
It relies on the indexers to perform index searching and weight calculation, and performs a linear combination of the sub-arrays.
core.lock
- core.lock.LockFile(locked_file: Path, ext='.lock', interval=1, timeout=0, create_dir=True)[source]
Create a blocking context with a lock file
- timeout: timeout in seconds, waiting for the lock to be released.
If negative, disable lock files entirely.
interval: interval in seconds
Example
    with LockFile('/dir/to/file.txt'):
        # creates a file '/dir/to/file.txt.lock' including a filesystem lock
        # the context will enter once the lock is released
        ...
core.log
Module shadowed by an API function of the same name (log)
usage:
from core import log
- core.log.check(condition, *args, e: Exception = <class 'AssertionError'>)[source]
log assertion with level ERROR
- core.log.error(*args, e: Exception = <class 'RuntimeError'>, **kwargs)[source]
log with default level ERROR; will raise e if passed
- class core.log.lvl(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
Enum
- DEBUG = 1
- ERROR = 4
- INFO = 2
- PROMPT = 5
- WARNING = 3
- core.log.pbar(lvl: lvl = lvl.INFO, iterable=None, desc=None, total=None, leave=True, ncols=None, mininterval=0.1, maxinterval=10.0, miniters=None, ascii=None, disable=False, unit='it', unit_scale=False, dynamic_ncols=False, smoothing=0.3, initial=0, file=None, position=None, postfix=None, unit_divisor=1000, write_bytes=False, lock_args=None, nrows=None, colour=None, delay=0, gui=False, **kwargs)[source]
Log a progress bar such as tqdm. Args: all arguments are those of tqdm (cf. https://tqdm.github.io/docs/tqdm/)
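A usage sketch following the tqdm-like signature above:

    from core.log import lvl, pbar

    for item in pbar(lvl.INFO, iterable=range(100), desc='processing'):
        pass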
- class core.log.rgb[source]
Bases:
object
- blue = <core.log._color object>
- bold = <core.log._color object>
- cyan = <core.log._color object>
- default = <core.log._color object>
- gray = <core.log._color object>
- green = <core.log._color object>
- orange = <core.log._color object>
- purple = <core.log._color object>
- red = <core.log._color object>
- underline = <core.log._color object>
core.monitor
- class core.monitor.Chrono(name='chrono object', unit='m')[source]
Bases:
object
- name: str
- unit: 'm' | 's' | 'ms' | 'us'
core.naming
- core.naming.add_var(ds, var, attrs: _name)[source]
Add a new variable to an xarray.Dataset with its attributes
- Parameters:
ds (xr.Dataset) – Dataset to complete
var (xr.DataArray) – Array to add
attrs (_name) – Attributes to join to the new variable
- class core.naming.names[source]
Bases:
object
- F0 = <core.naming._name object>
- bands = <core.naming._name object>
- bands_ir = <core.naming._name object>
- bt = <core.naming._name object>
- columns = <core.naming._name object>
- crs = <core.naming._name object>
- cwav = <core.naming._name object>
- datetime = <core.naming._name object>
- description = <core.naming._name object>
- flags = <core.naming._name object>
- input_directory = <core.naming._name object>
- lat = <core.naming._name object>
- lon = <core.naming._name object>
- ltoa = <core.naming._name object>
- ltoa_ir = <core.naming._name object>
- platform = <core.naming._name object>
- product_name = <core.naming._name object>
- raa = <core.naming._name object>
- resolution = <core.naming._name object>
- rho_w = <core.naming._name object>
- rows = <core.naming._name object>
- rtoa = <core.naming._name object>
- saa = <core.naming._name object>
- sensor = <core.naming._name object>
- shortname = <core.naming._name object>
- sza = <core.naming._name object>
- vaa = <core.naming._name object>
- vza = <core.naming._name object>
- wav = <core.naming._name object>
- wav_ir = <core.naming._name object>
core.project
- class core.project.Config(**kwargs)[source]
Bases:
object
- get(key: str, section: str = None, *, default=None) Any [source]
Returns the value of key if present in the config file. If not present and no default is provided, raises an error; if default is provided (different from None), returns default instead.
Values are automatically cast to Path objects if their key starts with 'dir_' and their type is str
- ingest(**kwargs)
core.pseudoinverse
core.pytest_utils
- core.pytest_utils.parametrize_dict(d) dict [source]
Allow passing args and ids to pytest.mark.parametrize with a dict syntax
    @pytest.mark.parametrize('a', **parametrize_dict({
        'first case': 1,
        'second case': 2,
    }))  # equivalent to parametrize('a', [1, 2], ids=['first case', 'second case'])
    def test(a):
        pass
core.save
- core.save.clean_attributes(obj: Dataset | DataArray)[source]
Remove attributes that cannot be written to netcdf
- core.save.to_netcdf(ds: Dataset, filename: Path, *, engine: str = 'h5netcdf', zlib: bool = True, complevel: int = 5, verbose: bool = True, tmpdir: Path | None = None, lock_timeout: int = 0, if_exists: Literal['skip', 'overwrite', 'backup', 'error'] = 'error', clean_attrs: bool = True, **kwargs)[source]
Write an xarray Dataset ds using .to_netcdf with several additional features:
Use file compression
Wrapped by filegen: use temporary files, detect existing output files…
- Parameters:
ds (xr.Dataset) – Input dataset
filename (Path) – Output file path
engine (str, optional) – Engine driver to use. Defaults to ‘h5netcdf’.
zlib (bool, optional) – activate zlib. Defaults to True.
complevel (int, optional) – Compression level. Defaults to 5.
verbose (bool, optional) – Verbosity. Defaults to True.
tmpdir (Path, optional) – use a given temporary directory. Defaults to None.
lock_timeout (int) – timeout in case of existing lock file
if_exists (str, optional) – what to do if output file exists. Defaults to ‘error’.
clean_attrs – whether to remove attributes in the xarray object, that cannot be written to netcdf.
**kwargs – other kwargs are passed to ds.to_netcdf
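A usage sketch with a hypothetical output path:

    from pathlib import Path
    import xarray as xr
    from core.save import to_netcdf

    ds = xr.Dataset({'x': ('a', [1.0, 2.0, 3.0])})
    # compressed write through a temporary file; overwrite any existing output
    to_netcdf(ds, Path('/tmp/out.nc'), if_exists='overwrite')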
core.table
- core.table.read_csv(path: str | Path, **kwargs) DataFrame [source]
Function to read a csv file, ignoring tabulations and extra whitespace
- Parameters:
path (str | Path) – Path of csv file
kwargs – Keyword arguments of read_csv function from pandas
- Returns:
Output table in pandas DataFrame format
- Return type:
DataFrame
core.tools
Various utility functions for modifying xarray objects
- class core.tools.MapBlocksOutput(model: List, new_dims: Dict | None = None)[source]
Bases:
object
- class core.tools.Var(name: str, dtype: str, dims: Tuple)[source]
Bases:
object
- core.tools.chunk(ds: Dataset, **kwargs)[source]
Apply rechunking to a xr.Dataset ds along dimensions provided as kwargs
Works like ds.chunk, but also supports Datasets with repeated dimensions.
- core.tools.conform(attrname: str, transpose: bool = True)[source]
A method decorator which applies MapBlocksOutput.conform to the method output.
The MapBlocksOutput should be an attribute attrname of the class.
- core.tools.convert(A: DataArray, unit_to: str, unit_from: str = None, converter: dict = None)[source]
Unit conversion
Arguments:
- A: DataArray to convert
- unit_from: str or None
  unit to convert from. If not provided, uses A.units
- unit_to: str
  unit to convert to
- converter: a dictionary for unit conversion
  example: converter={'Pa': 1, 'hPa': 1e-2}
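A sketch assuming the converter maps each unit to its scale factor relative to a common reference, as in the example above:

    import xarray as xr
    from core.tools import convert

    pressure = xr.DataArray([101325.0], attrs={'units': 'Pa'})
    # unit_from is taken from the 'units' attribute; expected result around 1013.25 hPa
    p_hpa = convert(pressure, unit_to='hPa', converter={'Pa': 1, 'hPa': 1e-2})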
- core.tools.getflag(A: DataArray, name: str)[source]
Return the binary flag with given name as a boolean array
- A: DataArray
- name: str
example: getflag(flags, 'LAND')
- core.tools.getflags(A=None, meanings=None, masks=None, sep=None)[source]
returns the flags in attributes of A as a dictionary {meaning: value}
Arguments:
- provide either:
A: DataArray
- or:
  meanings: flag meanings 'FLAG1 FLAG2'
  masks: flag values [1, 2]
  sep: string separator
- core.tools.haversine(lat1: float, lon1: float, lat2: float, lon2: float, radius: float = 6371)[source]
Calculate the great circle distance between two points (specified in decimal degrees) on a sphere of a given radius
Returns the distance in the same unit as radius (defaults to earth radius in km)
- core.tools.locate(lat, lon, lat0, lon0, dist_min_km: float = None, verbose: bool = False)[source]
Locate lat0, lon0 within lat, lon
if dist_min_km is specified and if the minimal distance exceeds it, a ValueError is raised
- core.tools.merge(ds: ~xarray.core.dataset.Dataset, dim: str = None, varname: str = None, pattern: str = '(.+)_(\\d+)', dtype: type = <class 'int'>)[source]
Merge DataArrays in ds along dimension dim.
ds: xr.Dataset
- dim: str or None
  name of the new or existing dimension. If None, use the attribute split_dimension
- varname: str or None
  name of the variable to create. If None, detect the variable name from the regular expression
- pattern: str
  Regular expression for matching variable names and coordinates.
  If varname is None:
    the first group represents the new variable name, and the second group represents the coordinate value.
    Ex: r'(.+)_(\d+)'
      First group matches all characters. Second group matches digits.
    r'(\D+)(\d+)'
      First group matches non-digits. Second group matches digits.
- if varname is not None:
Match a single group representing the coordinate value
- dtype: data type
data type of the coordinate items
- core.tools.only(iterable)[source]
If iterable has only one item, return it. Otherwise raise a ValueError
- core.tools.raiseflag(A: DataArray, flag_name: str, flag_value: int, condition=None)[source]
Raise a flag in DataArray A with name flag_name, value flag_value and condition. The name and value of the flag are recorded in the attributes of A.
Arguments:
A: DataArray of integers
- flag_name: str
Name of the flag
- flag_value: int
Value of the flag
- condition: boolean array-like of same shape as A
  Condition to raise the flag. If None, the flag values are unchanged; the flag is simply registered in the attributes.
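A sketch combining raiseflag and getflag; the flag name, value and condition are arbitrary:

    import numpy as np
    import xarray as xr
    from core.tools import getflag, raiseflag

    flags = xr.DataArray(np.zeros((2, 2), dtype='uint16'))
    land = xr.DataArray(np.array([[True, False], [False, True]]))
    raiseflag(flags, 'LAND', 1, condition=land)   # records LAND=1 in the attributes of flags
    is_land = getflag(flags, 'LAND')              # boolean array, True where the LAND flag is raised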
- core.tools.split(d: Dataset | DataArray, dim: str, sep: str = '_')[source]
Returns a Dataset where the given dimension is split into as many variables (one per item along dim)
d: Dataset or DataArray
- core.tools.sub(ds: Dataset, cond: DataArray, drop_invalid: bool = True, int_default_value: int = 0)[source]
Creates a Dataset based on the conditions passed in parameters
cond : a DataArray of booleans that defines which pixels are kept
- drop_invalid, bool
  if True, invalid pixels will be replaced by nan for floats and by int_default_value for other types
- int_default_value, int
  for DataArrays of type int, this value is assigned to non-valid pixels
- core.tools.sub_pt(ds: Dataset, pt_lat, pt_lon, rad, drop_invalid: bool = True, int_default_value: int = 0)[source]
Creates a Dataset based on the circle specified in parameters
pt_lat, pt_lon : Coordinates of the center point
rad : radius of the circle in km
- drop_invalid, bool
  if True, invalid pixels will be replaced by nan for floats and by int_default_value for other types
- int_default_value, int
  for DataArrays of type int, this value is assigned to non-valid pixels
- core.tools.sub_rect(ds: Dataset, lat_min, lon_min, lat_max, lon_max, drop_invalid: bool = True, int_default_value: int = 0)[source]
Returns a Dataset based on the coordinates of the rectangle passed in parameters
lat_min, lat_max, lon_min, lon_max : boundaries of the region of interest
drop_invalid, bool : if True, invalid pixels will be replaced by nan for floats and by int_default_value for other types
int_default_value, int : for DataArrays of type int, this value is assigned to non-valid pixels
- core.tools.trim_dims(A: Dataset)[source]
Trim the dimensions of Dataset A
Rename all possible dimensions to avoid duplicate dimensions with the same sizes, so that no DataArray has duplicate dimensions.
- core.tools.wrap(ds: Dataset, dim: str, vmin: float, vmax: float)[source]
Wrap and reorder a cyclic dimension between vmin and vmax. The border value is duplicated at the edges. The period is (vmax-vmin)
Example:
* Dimension [0, 359] -> [-180, 180]
* Dimension [-180, 179] -> [-180, 180]
* Dimension [0, 359] -> [0, 360]
Arguments:
ds: xarray.Dataset
- dim: str
  Name of the dimension to wrap
- vmin, vmax: float
new values for the edges
- core.tools.xr_filter(ds: Dataset, condition: DataArray, stackdim: str | None = None, transparent: bool = False) Dataset [source]
Extracts a subset of the dataset where the condition is True, stacking the condition dimensions. Equivalent to numpy’s boolean indexing, A[condition].
- Parameters:
  ds (xr.Dataset): The input dataset.
  condition (xr.DataArray): A boolean DataArray indicating where the condition is True.
  stackdim (str, optional): The name of the new stacked dimension. If None, it will be determined automatically from the condition dimensions.
  transparent (bool, optional): whether to reassign the original dimension names to the Dataset (expanding with length-one dimensions).
- Returns:
  xr.Dataset: A new dataset with the subset of data where the condition is True.
- core.tools.xr_filter_decorator(argpos: int, condition: Callable, fill_value_float: float = nan, fill_value_int: int = 0, transparent: bool = False, stackdim: str | None = None)[source]
A decorator which applies the decorated function only where the condition is True.
- Parameters:
argpos (int) – Position index of the input dataset in the decorated function call.
condition (Callable) – A callable taking the Dataset as input and returning a boolean DataArray.
fill_value_float (float, optional) – Fill value for floating point data types. Default is np.nan.
fill_value_int (int, optional) – Fill value for integer data types. Default is 0
transparent (bool, optional) – Whether to reassign the original dimension names to the Dataset (expanding with length-one dimensions). Default is False.
stackdim (str | None, optional) – The name of the new stacked dimension. If None, it will be determined automatically from the condition dimensions. Default is None.
Example
    @xr_filter_decorator(0, lambda x: x.flags == 0)
    def my_func(ds: xr.Dataset) -> xr.Dataset:
        # my_func is applied only where ds.flags == 0
        ...
The decorator works by:
1. Extracting a subset of the dataset where the condition is True using xr_filter.
2. Applying the decorated function to the subset.
3. Reconstructing the original dataset from the subset using xr_unfilter.
NOTE: this decorator does not guarantee that the order of dimensions is maintained. When using this decorator with xr.apply_blocks, you may want to wrap your xr_filter_decorator decorated method with the conform decorator.
- core.tools.xr_flat(ds: Dataset) Dataset [source]
A method which flattens an xarray.Dataset onto a new dimension named 'index'
- Parameters:
ds (xr.Dataset) – Dataset to flatten
- core.tools.xr_sample(ds: Dataset, nb_sample: int | float, seed: int = None) Dataset [source]
A method to extract a subset of samples from a flat xarray.Dataset
- core.tools.xr_unfilter(sub: Dataset, condition: DataArray, stackdim: str | None = None, fill_value_float: float = nan, fill_value_int: int = 0, transparent: bool = False) DataArray [source]
Reconstructs the original dataset from a subset dataset where the condition is True, unstacking the condition dimensions.
- Parameters:
  sub (xr.Dataset): The subset dataset where the condition is True.
  condition (xr.DataArray): A boolean DataArray indicating where the condition is True.
  stackdim (str, optional): The name of the stacked dimension. If None, it will be determined automatically from the condition dimensions.
  fill_value_float (float, optional): The fill value for floating point data types. Default is np.nan.
  fill_value_int (int, optional): The fill value for integer data types. Default is 0.
  transparent (bool, optional): whether to revert the transparent compatibility conversion applied in xr_filter.
- Returns:
  xr.DataArray: The reconstructed dataset with the specified dimensions unstacked.
- core.tools.xrcrop(A: Dataset, **kwargs) Dataset [source]
- core.tools.xrcrop(A: DataArray, **kwargs) DataArray
Crop a Dataset or DataArray along dimensions based on min/max values.
For each dimension provided as kwarg, the min/max values along that dimension can be provided:
As a min/max tuple
As a DataArray, for which the min/max are computed
- Ex: crop dimensions latitude and longitude of gsw based on the min/max of ds.lat and ds.lon:

    gsw = xrcrop(
        gsw,
        latitude=ds.lat,
        longitude=ds.lon,
    )
Note: the purpose of this function is to make it possible to .compute() the result of the cropped data, thus allowing to perform a sel over large arrays (otherwise extremely slow with dask based arrays).
core.uncompress
- class core.uncompress.CacheDir(directory=None)[source]
Bases:
object
A cache directory for uncompressing files
Example
    # by default, CacheDir stores data in /tmp/uncompress_cache_<user>
    uncompressed = CacheDir().uncompress(compressed_file)
- exception core.uncompress.ErrorUncompressed[source]
Bases:
Exception
Raised when the input file is already uncompressed
- core.uncompress.uncompress(filename, dirname, on_uncompressed='error', create_out_dir=True, verbose=False) Path [source]
Uncompress filename to dirname
Arguments:
- on_uncompressed: str
determines what to do if filename is not compressed
  - 'error': raise an error (default)
  - 'copy': copy the uncompressed file
  - 'bypass': return the input file
- create_out_dir: bool
create output directory if it does not exist
Returns the path to the uncompressed file
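A usage sketch with hypothetical paths:

    from core.uncompress import uncompress

    # 'bypass' returns the input path unchanged if the file is not compressed
    path = uncompress('/data/product.zip', '/tmp', on_uncompressed='bypass')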