Source code for core.process.condor

import os
from datetime import datetime
from pathlib import Path
from typing import Dict


[docs] def get_htcondor_logdir(logdir: Path | str | None) -> Path: """ A default log directory for condor """ if logdir is None: username = os.getlogin() return Path(f"/tmp/condor_logs_{username}/{datetime.now().isoformat()}") else: return Path(logdir)
def get_htcondor_params(
    cores=1,
    memory="1GB",
    disk="1GB",
    death_timeout="600",
    logdir: Path | str | None = None,
) -> Dict:
    """
    Build the keyword arguments handed to HTCondorCluster.

    Args:
        cores: Value stored under the ``"cores"`` key.
        memory: Value stored under the ``"memory"`` key.
        disk: Value stored under the ``"disk"`` key.
        death_timeout: Value stored under the ``"death_timeout"`` key.
        logdir: Log directory; ``None`` selects the default chosen by
            ``get_htcondor_logdir``.

    Returns:
        A dict with the keys ``cores``, ``memory``, ``disk``,
        ``death_timeout`` and ``log_directory``.
    """
    resolved_logdir = get_htcondor_logdir(logdir)
    # Report the location so the user knows where to look for job logs.
    print("HTCondor log directory is", resolved_logdir)

    # NOTE: a 'job_extra_directives' entry (log/output/error paths, file
    # transfer, initialdir, TMP environment) was sketched here previously
    # but is intentionally not part of the returned parameters.
    return dict(
        cores=cores,
        memory=memory,
        disk=disk,
        death_timeout=death_timeout,
        log_directory=resolved_logdir,
    )
def get_htcondor_runner(maximum_jobs: int = 100, **kwargs):
    """
    Create a prefect DaskTaskRunner backed by an HTCondorCluster.

    Args:
        maximum_jobs: Passed as ``maximum_jobs`` in the runner's
            ``adapt_kwargs``.
        **kwargs: Forwarded to ``get_htcondor_params`` to build the
            cluster keyword arguments.

    Returns:
        The configured ``DaskTaskRunner`` instance.
    """
    # Imported lazily so the module can be loaded without these packages.
    from prefect_dask import DaskTaskRunner
    from dask_jobqueue.htcondor import HTCondorCluster

    cluster_options = get_htcondor_params(**kwargs)
    adaptive_options = {"maximum_jobs": maximum_jobs}
    return DaskTaskRunner(
        cluster_class=HTCondorCluster,
        cluster_kwargs=cluster_options,
        adapt_kwargs=adaptive_options,
    )
def get_htcondor_client(maximum_jobs=100, **kwargs):
    """
    Start an HTCondorCluster and return a dask client connected to it.

    Args:
        maximum_jobs: Passed as ``maximum_jobs`` to ``cluster.adapt``.
        **kwargs: Forwarded to ``get_htcondor_params`` to build the
            cluster keyword arguments.

    Returns:
        A ``dask.distributed.Client`` attached to the new cluster.
    """
    # Imported lazily so the module can be loaded without these packages.
    from dask.distributed import Client
    from dask_jobqueue.htcondor import HTCondorCluster

    cluster_options = get_htcondor_params(**kwargs)
    htcondor_cluster = HTCondorCluster(**cluster_options)
    htcondor_cluster.adapt(maximum_jobs=maximum_jobs)

    dask_client = Client(htcondor_cluster)
    print("Dashboard address is", dask_client.dashboard_link)
    return dask_client