core.process package


core.process.condor

core.process.condor.get_htcondor_client(maximum_jobs=100, **kwargs)

Get a client for an HTCondor cluster.

core.process.condor.get_htcondor_logdir(logdir: Path | str | None) -> Path

Get a default log directory for HTCondor.

core.process.condor.get_htcondor_params(cores=1, memory='1GB', disk='1GB', death_timeout='600', logdir: Path | str | None = None) -> Dict

Return the parameters passed to HTCondorCluster.
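As a rough illustration of how such parameters could feed dask-jobqueue, the sketch below builds a keyword dictionary for dask_jobqueue.HTCondorCluster and wires it into a dask client. The helper names (htcondor_params_sketch, htcondor_client_sketch) and the default log directory (./condor_logs) are hypothetical; the real get_htcondor_params and get_htcondor_logdir may use different keys and defaults.

```python
from pathlib import Path
from typing import Optional, Union


def htcondor_params_sketch(
    cores: int = 1,
    memory: str = "1GB",
    disk: str = "1GB",
    death_timeout: str = "600",
    logdir: Optional[Union[Path, str]] = None,
) -> dict:
    """Hypothetical stand-in for get_htcondor_params."""
    # Hypothetical default: a 'condor_logs' folder in the working directory
    log_path = Path(logdir) if logdir is not None else Path.cwd() / "condor_logs"
    return {
        "cores": cores,
        "memory": memory,
        "disk": disk,                    # HTCondor-specific disk request
        "death_timeout": death_timeout,  # seconds a worker waits for the scheduler before closing
        "log_directory": str(log_path),  # where HTCondor job logs are written
    }


def htcondor_client_sketch(maximum_jobs: int = 100, **kwargs):
    """Hypothetical stand-in for get_htcondor_client (needs dask-jobqueue)."""
    # Imported lazily so this module can be loaded without dask installed
    from dask_jobqueue import HTCondorCluster
    from distributed import Client

    cluster = HTCondorCluster(**htcondor_params_sketch(**kwargs))
    cluster.adapt(maximum_jobs=maximum_jobs)  # scale adaptively, up to maximum_jobs jobs
    return Client(cluster)
```

Deferring the dask imports keeps the parameter logic usable (and testable) on machines without an HTCondor setup.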

core.process.condor.get_htcondor_runner(maximum_jobs: int = 100, **kwargs)

Initialize a Prefect DaskTaskRunner backed by an HTCondorCluster.
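A runner of this kind could be built with the prefect-dask collection, whose DaskTaskRunner accepts a cluster class (as an import path), cluster keyword arguments and keyword arguments for cluster.adapt(). This is a hedged sketch: htcondor_runner_kwargs and htcondor_runner_sketch are hypothetical names, and the cluster keywords shown simply reuse the documented get_htcondor_params defaults rather than whatever get_htcondor_runner actually passes.

```python
def htcondor_runner_kwargs(maximum_jobs: int = 100, **cluster_kwargs) -> dict:
    """Keyword arguments for prefect_dask.DaskTaskRunner (hypothetical helper)."""
    return {
        # DaskTaskRunner creates the cluster itself when the flow starts
        "cluster_class": "dask_jobqueue.HTCondorCluster",
        "cluster_kwargs": {"cores": 1, "memory": "1GB", "disk": "1GB", **cluster_kwargs},
        # Forwarded to cluster.adapt(); dask-jobqueue clusters accept maximum_jobs
        "adapt_kwargs": {"maximum_jobs": maximum_jobs},
    }


def htcondor_runner_sketch(maximum_jobs: int = 100, **cluster_kwargs):
    """Build the runner; needs prefect-dask and dask-jobqueue (hypothetical helper)."""
    from prefect_dask import DaskTaskRunner

    return DaskTaskRunner(**htcondor_runner_kwargs(maximum_jobs, **cluster_kwargs))
```

The runner would then be attached to a flow, for example with @flow(task_runner=htcondor_runner_sketch(maximum_jobs=50)).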

core.process.deptree

class core.process.deptree.Task

Bases: object

Base class for tasks that form a dependency tree describing an execution workflow.

This class provides the foundation for creating tasks that can be organized in a dependency graph and executed in the correct order. Tasks can be generated and run using the gen function with different execution modes (sync, executor, or prefect).

output

Optional attribute for storing the task’s output path or result.

deps

Optional list of Task instances that this task depends on.

status

Task execution status

dependencies()

Returns the list of tasks this task depends on. The default implementation returns the deps attribute, if it exists.

run()

Implements the actual execution code.

done()

Checks whether the task is complete. The default implementation checks whether the output path exists, if output is defined.

cleanup()

Optional cleanup method, executed once all parent tasks have finished.

Usage:

Subclass Task and implement, at minimum:

  • run(), with your task’s execution logic (if any)

Optionally:

  • override dependencies() or set self.deps if your task has dependencies

  • override cleanup() if your task needs cleanup after execution

  • set the output attribute if your task produces output files
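The sketch below mirrors the documented defaults in a small stand-in base class (dependencies() returns deps when set; done() checks that output exists when it is set) and then defines two subclasses following the usage rules above. MiniTask, Download and Convert are hypothetical names, and returning False from done() when no output is set is an assumption; in real code you would subclass core.process.deptree.Task, whose implementation may differ in detail.

```python
import tempfile
from pathlib import Path
from typing import List, Optional


class MiniTask:
    """Hypothetical stand-in mirroring the documented Task defaults."""

    output: Optional[Path] = None
    deps: Optional[List["MiniTask"]] = None

    def dependencies(self) -> list:
        # Default: return self.deps if it was set, else no dependencies
        return list(self.deps) if self.deps else []

    def done(self) -> bool:
        # Default: done when the output path exists (assumption: False if unset)
        return self.output is not None and Path(self.output).exists()

    def run(self):
        pass


class Download(MiniTask):
    def __init__(self, workdir: Path):
        self.output = workdir / "raw.txt"

    def run(self):
        self.output.write_text("raw data")


class Convert(MiniTask):
    def __init__(self, workdir: Path):
        self.deps = [Download(workdir)]  # one upstream dependency
        self.output = workdir / "converted.txt"

    def run(self):
        raw = self.deps[0].output.read_text()
        self.output.write_text(raw.upper())


# Usage: run the dependency first, then the task itself
with tempfile.TemporaryDirectory() as tmp:
    convert = Convert(Path(tmp))
    for dep in convert.dependencies():
        if not dep.done():
            dep.run()
    convert.run()
    print(convert.done(), convert.output.read_text())  # True RAW DATA
```

Because done() is driven by the output file, rerunning the workflow skips tasks whose outputs already exist.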

Example

See the sample function for a complete example of Task definition and usage.

cleanup()

Optional cleanup method executed after the task finishes running.

dependencies() -> list

Returns a list of dependencies, each being a Task.

done() -> bool

Returns whether the task is done.

run()

Implements the actual execution code.

class core.process.deptree.TaskList(deps: list)

Bases: Task

dependencies()

Returns a list of dependencies, each being a Task.
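TaskList appears to group existing tasks under a single node so they can be generated together. A minimal sketch of that idea, assuming dependencies() simply returns the list given to the constructor and the node does no work of its own (MiniTask and MiniTaskList are hypothetical stand-ins, not the library classes):

```python
from typing import List


class MiniTask:
    """Hypothetical minimal base with no dependencies and no work."""

    def dependencies(self) -> list:
        return []

    def run(self):
        pass


class MiniTaskList(MiniTask):
    """Hypothetical stand-in for TaskList: a node whose dependencies are `deps`."""

    def __init__(self, deps: List[MiniTask]):
        self.deps = list(deps)

    def dependencies(self) -> list:
        # Generating this node generates every task in the list
        return self.deps


# Usage: bundle independent tasks under one root node
group = MiniTaskList([MiniTask(), MiniTask(), MiniTask()])
print(len(group.dependencies()))  # 3
```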

class core.process.deptree.TaskProduct(cls, others={}, **kwargs)

Bases: Task

dependencies()

Returns a list of dependencies, each being a Task.
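The documentation does not spell out what TaskProduct does. One plausible reading of its signature, offered only as an assumption, is that it creates one cls instance per combination of the keyword-argument value lists (a Cartesian product), with others holding extra constructor arguments shared by every instance. The sketch below implements that reading; MiniTaskProduct, Tile and the role given to others are all hypothetical.

```python
from itertools import product


class MiniTask:
    def dependencies(self) -> list:
        return []


class Tile(MiniTask):
    """Hypothetical task parameterized by a date and a tile id."""

    def __init__(self, date, tile, resolution=None):
        self.date, self.tile, self.resolution = date, tile, resolution


class MiniTaskProduct(MiniTask):
    """Hypothetical reading of TaskProduct: one cls instance per kwargs combination."""

    def __init__(self, cls, others=None, **kwargs):
        others = others or {}  # assumed: fixed arguments shared by all instances
        names = list(kwargs)
        # Cartesian product over the value lists, e.g. 2 dates x 3 tiles = 6 tasks
        self.deps = [
            cls(**dict(zip(names, values)), **others)
            for values in product(*kwargs.values())
        ]

    def dependencies(self) -> list:
        return self.deps


tasks = MiniTaskProduct(Tile, others={"resolution": 300},
                        date=["d1", "d2"], tile=["A", "B", "C"])
print(len(tasks.dependencies()))  # 6
```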

core.process.deptree.format_time(t)

core.process.deptree.gen(task: Task, mode: Literal['sync', 'executor', 'prefect'] = 'sync', **kwargs)

Run a task and its dependencies.

Parameters:

mode (str) –

  • ‘sync’: sequential

  • ‘executor’: using executors

  • ‘prefect’: using Prefect as a workflow manager

core.process.deptree.gen_executor(task: Task, executors: dict | None = None, default_executor=None, graph_executor=None, verbose: bool = False, _tasks: list | None = None, log_file: Path | None = None)

Recursively generate dependencies of task, then call its run method in a custom executor.

Parameters:
  • task – a Task object

  • executors – dictionary of executors

  • default_executor – the executor to use for all tasks not included in executors. Defaults to ThreadPoolExecutor

  • graph_executor – an instance of ThreadPoolExecutor used for traversing the dependency tree with the gen function. It is useful to override the default ThreadPoolExecutor(512) if the tree is too large.

  • _tasks – internal parameter to track tasks status and provide an execution summary. Also used to avoid duplicate task submissions.

Example

from concurrent.futures import ThreadPoolExecutor

from distributed import Client  # dask executor

from core.process.deptree import gen_executor

# task: any Task instance, e.g. built as in core.process.deptree.sample
gen_executor(
    task,
    executors={'Level1': ThreadPoolExecutor(2)},
    default_executor=Client(),  # dask executor
    # Note: can also use dask_jobqueue to instantiate
    # an executor backed by a job queuing system like
    # HTCondor
)

For details on how to implement a task, see the function core.process.deptree.sample.
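The scheduling idea described above can be sketched with the standard concurrent.futures module: the tree is traversed inside a large graph executor, each shared task is submitted only once, and each task's run() is dispatched to an executor chosen per task. This is a simplified stand-in, not the library's code; MiniTask and gen_executor_sketch are hypothetical, and looking executors up by task class name is an assumption suggested by the 'Level1' key in the example. The sketch also shows why graph_executor needs many threads: every task waiting on its dependencies occupies one.

```python
import threading
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Dict, List, Optional


class MiniTask:
    """Hypothetical minimal task recording its run order in a shared log."""

    def __init__(self, name: str, log: List[str], deps=()):
        self.name, self.log, self.deps = name, log, list(deps)
        self.finished = False

    def dependencies(self) -> list:
        return self.deps

    def done(self) -> bool:
        return self.finished

    def run(self):
        self.log.append(self.name)
        self.finished = True


def gen_executor_sketch(
    task,
    executors: Optional[Dict[str, Executor]] = None,
    default_executor: Optional[Executor] = None,
    graph_executor: Optional[ThreadPoolExecutor] = None,
):
    """Traverse the tree in graph_executor; run each task in its own executor."""
    executors = {} if executors is None else executors
    if default_executor is None:
        default_executor = ThreadPoolExecutor()
    if graph_executor is None:
        # Each task waiting on its dependencies holds one thread, hence a large pool
        graph_executor = ThreadPoolExecutor(512)
    futures: Dict[int, Future] = {}
    lock = threading.Lock()

    def submit_once(t) -> Future:
        # A task shared by several parents is submitted only once
        with lock:
            if id(t) not in futures:
                futures[id(t)] = graph_executor.submit(process, t)
            return futures[id(t)]

    def process(t):
        # Launch all dependencies concurrently, then wait for each of them
        for fut in [submit_once(dep) for dep in t.dependencies()]:
            fut.result()
        if not t.done():
            # Hypothetical rule: executors are looked up by task class name
            executor = executors.get(type(t).__name__, default_executor)
            executor.submit(t.run).result()
        return t

    return submit_once(task).result()
```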

core.process.deptree.gen_sync(task: Task, verbose: bool = False)

Run a Task in sequential mode

Parameters:
  • task (Task) – The task to generate

  • verbose (bool) – Whether to print verbose output
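Sequential mode amounts to a depth-first traversal: generate every dependency, then run the task itself unless it is already done. A minimal sketch under that reading, with hypothetical names (MiniTask, gen_sync_sketch) and an assumed "skip if done()" rule:

```python
from typing import List, Optional, Set


class MiniTask:
    """Hypothetical minimal task recording its run order in a shared log."""

    def __init__(self, name: str, log: List[str], deps=()):
        self.name, self.log, self.deps = name, log, list(deps)
        self.finished = False

    def dependencies(self) -> list:
        return self.deps

    def done(self) -> bool:
        return self.finished

    def run(self):
        self.log.append(self.name)
        self.finished = True


def gen_sync_sketch(task, verbose: bool = False, _visited: Optional[Set[int]] = None):
    """Depth-first: generate all dependencies, then run the task if not done."""
    visited = set() if _visited is None else _visited
    if id(task) in visited:  # shared dependency already handled
        return task
    visited.add(id(task))
    for dep in task.dependencies():
        gen_sync_sketch(dep, verbose, visited)
    if not task.done():
        if verbose:
            print(f"running {task.name}")
        task.run()
    return task
```

For a diamond root → (a, b) → shared, this runs shared, a, b, root in that order.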

core.process.deptree.print_execution_summary(tasks: list, start_time: float, log_file: Path | None = None, finished: bool = False)

Print a summary table showing count of tasks by status and class name.

Parameters:

tasks – List of Task instances to summarize
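A summary of this kind boils down to counting tasks per (class name, status) pair. The sketch below does that with collections.Counter and returns the table as a string instead of printing it; execution_summary_sketch is a hypothetical name, the log_file and finished parameters are omitted, and the status values are whatever the tasks carry in their status attribute (falling back to "unknown" is an assumption).

```python
import time
from collections import Counter


def execution_summary_sketch(tasks, start_time: float) -> str:
    """Hypothetical: count tasks per (class name, status) and format a table."""
    counts = Counter((type(t).__name__, getattr(t, "status", "unknown")) for t in tasks)
    statuses = sorted({status for _, status in counts})
    classes = sorted({name for name, _ in counts})
    header = ["class"] + statuses
    # Counter returns 0 for combinations that never occurred
    rows = [[name] + [str(counts[(name, s)]) for s in statuses] for name in classes]
    table = [header] + rows
    widths = [max(len(row[i]) for row in table) for i in range(len(header))]
    lines = ["  ".join(cell.ljust(w) for cell, w in zip(row, widths)) for row in table]
    lines.append(f"elapsed: {time.time() - start_time:.1f} s")
    return "\n".join(lines)
```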

core.process.deptree.sample()

Generate a sample Product for demonstration purposes

core.process.deptree_prefect