core.process package
core.process.condor
- core.process.condor.get_htcondor_client(maximum_jobs=100, **kwargs)
Get a client for an HTCondor cluster.
- core.process.condor.get_htcondor_logdir(logdir: Path | str | None) → Path
A default log directory for condor
core.process.deptree
- class core.process.deptree.Task
Bases: object
Base class for tasks implementing a dependency tree structure for execution workflow.
This class provides the foundation for creating tasks that can be organized in a dependency graph and executed in the correct order. Tasks can be generated and run using the gen function with different execution modes (sync, executor, or prefect).
- output
Optional attribute for storing the task’s output path or result.
- deps
Optional list of Task instances that this task depends on.
- status
Task execution status
- dependencies()
Returns the list of tasks that this task depends on. The default implementation returns the deps attribute, if it exists.
- done()
Checks whether the task is complete. The default implementation checks whether the output Path, if defined, exists.
- Usage:
Subclass Task and implement at minimum:
  - The run() method with your task’s execution logic (if any)
  - Optionally override dependencies() or set self.deps if your task has dependencies
  - Optionally override cleanup() if your task needs cleanup after execution
  - Set the output attribute if your task produces output files
Example
See the sample function for a complete example of Task definition and usage.
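The subclassing pattern described above can be sketched without the package itself. In the block below, Task is a stand-in that mimics only the documented defaults of dependencies() and done(); it is not the real core.process.deptree.Task. The WriteText class and its arguments are hypothetical, and returning False from done() when no output is defined is an assumption.

```python
import tempfile
from pathlib import Path


class Task:
    """Stand-in for core.process.deptree.Task (assumed behaviour only)."""

    def dependencies(self):
        # Documented default: return the `deps` attribute, if it exists.
        return getattr(self, "deps", [])

    def done(self):
        # Documented default: check whether the output Path exists, if defined.
        # Returning False when no output is defined is an assumption.
        output = getattr(self, "output", None)
        return output is not None and Path(output).exists()

    def run(self):
        pass


class WriteText(Task):
    """Hypothetical task that writes a string to its output file."""

    def __init__(self, output, text, deps=None):
        self.output = Path(output)
        self.text = text
        self.deps = deps or []

    def run(self):
        self.output.write_text(self.text)


workdir = Path(tempfile.mkdtemp())
first = WriteText(workdir / "first.txt", "hello")
second = WriteText(workdir / "second.txt", "world", deps=[first])

done_before = first.done()  # output file does not exist yet
first.run()
done_after = first.done()   # output file now exists
```

With the real package, the same subclass would inherit from core.process.deptree.Task and be passed to gen instead of calling run() by hand.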
- core.process.deptree.gen(task: Task, mode: Literal['sync', 'executor', 'prefect'] = 'sync', **kwargs)
Run a task and its dependencies
- Parameters:
mode (str) –
‘sync’: sequential
‘executor’: using executors
‘prefect’: using Prefect as a workflow manager
- core.process.deptree.gen_executor(task: Task, executors: dict | None = None, default_executor=None, graph_executor=None, verbose: bool = False, _tasks: list | None = None, log_file: Path | None = None)
Recursively generate dependencies of task, then call its run method in a custom executor.
- Parameters:
task – a Task object
executors – dictionary of executors
default_executor – the executor to use for all tasks not included in executors. Defaults to ThreadPoolExecutor
graph_executor – an instance of ThreadPoolExecutor used for traversing the dependency tree with the gen function. It is useful to override the default ThreadPoolExecutor(512) if the tree is too large.
_tasks – internal parameter to track tasks status and provide an execution summary. Also used to avoid duplicate task submissions.
Example
    from concurrent.futures import ThreadPoolExecutor
    from distributed import Client  # dask executor

    gen(task,
        executors={'Level1': ThreadPoolExecutor(2)},
        default_executor=Client(),  # dask executor
        # Note: can also use dask_jobqueue to instantiate
        # an executor backed by a job queuing system like
        # HTCondor
    )
For details on how to implement a task, see the function core.deptree.sample.
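To make the roles of executors, default_executor and graph_executor more concrete, here is a minimal sketch of one way such a traversal could work. It is not the package's implementation. The names gen_executor_sketch, Step and Level1 are hypothetical, keying executors by task class name is an assumption suggested by the 'Level1' key in the example, and skipping tasks that are already done is also an assumption.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

order = []  # records the order in which tasks actually run


class Step:
    """Hypothetical in-memory task exposing dependencies(), done() and run()."""

    def __init__(self, name, deps=()):
        self.name, self.deps, self.finished = name, list(deps), False

    def dependencies(self):
        return self.deps

    def done(self):
        return self.finished

    def run(self):
        order.append(self.name)
        self.finished = True


class Level1(Step):
    pass


def gen_executor_sketch(task, executors=None, default_executor=None,
                        graph_executor=None):
    executors = executors or {}
    owned = []  # pools created here are shut down here
    if default_executor is None:
        default_executor = ThreadPoolExecutor(4)
        owned.append(default_executor)
    if graph_executor is None:
        graph_executor = ThreadPoolExecutor(64)
        owned.append(graph_executor)
    futures = {}  # one traversal future per task, to avoid duplicate submissions
    lock = threading.Lock()

    def schedule(t):
        with lock:
            if id(t) not in futures:
                futures[id(t)] = graph_executor.submit(visit, t)
            return futures[id(t)]

    def visit(t):
        if t.done():
            return
        # Each traversal thread blocks until all dependencies have finished.
        for future in [schedule(dep) for dep in t.dependencies()]:
            future.result()
        # Assumption: executors are looked up by the task's class name.
        pool = executors.get(type(t).__name__, default_executor)
        pool.submit(t.run).result()

    try:
        schedule(task).result()
    finally:
        for pool in owned:
            pool.shutdown()


# Diamond-shaped tree: d depends on b and c, which both depend on a.
a = Level1("a")
b, c = Step("b", [a]), Step("c", [a])
d = Step("d", [b, c])
with ThreadPoolExecutor(2) as level1_pool:
    gen_executor_sketch(d, executors={"Level1": level1_pool})
```

In this sketch every task being traversed holds one graph-executor thread while it waits for its dependencies, which illustrates why the size of the traversal pool matters for large trees.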
- core.process.deptree.gen_sync(task: Task, verbose: bool = False)
Run a Task in sequential mode
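Sequential mode can be pictured as a depth-first walk of the dependency tree. The sketch below is an illustration under stated assumptions, not the package's code: gen_sync_sketch and Step are hypothetical names, and skipping a task (and its subtree) when done() is already true is an assumption.

```python
order = []  # records the order in which tasks actually run


class Step:
    """Hypothetical in-memory task exposing dependencies(), done() and run()."""

    def __init__(self, name, deps=()):
        self.name = name
        self.deps = list(deps)
        self.finished = False

    def dependencies(self):
        return self.deps

    def done(self):
        return self.finished

    def run(self):
        order.append(self.name)
        self.finished = True


def gen_sync_sketch(task, verbose=False):
    # Assumption: a task that is already done is skipped entirely.
    if task.done():
        return
    # Run every dependency first, one after the other.
    for dep in task.dependencies():
        gen_sync_sketch(dep, verbose)
    if verbose:
        print(f"running {task.name}")
    task.run()


download = Step("download")
clean = Step("clean", deps=[download])
plot = Step("plot", deps=[download, clean])
gen_sync_sketch(plot)
```

Because done() is checked before each run, the shared download step executes only once even though two tasks depend on it.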