Source code for dandi_compute_code.queue._process_queue
import logging
import pathlib
from ._count_running_aind_ephys_pipeline_jobs import _count_running_aind_ephys_pipeline_jobs
from ._submit_next import _submit_next
_log = logging.getLogger(__name__)
[docs]
def process_queue(
*,
queue_directory: pathlib.Path,
processing_directory: pathlib.Path,
test: bool = False,
) -> None:
"""
Submit jobs from ``state.jsonl`` up to two total
running ``AIND-Ephys-Pipeline`` SLURM jobs.
If ``state.jsonl`` is absent, a :class:`FileNotFoundError` is raised.
If ``state.jsonl`` exists but is empty, a warning is emitted and the
invocation returns without submitting jobs. Otherwise ``squeue --me`` is
checked for currently running ``AIND-Ephys-Pipeline`` jobs, and up to the
difference from two jobs are submitted.
:param queue_directory: Path to the queue root directory.
:type queue_directory: pathlib.Path
:param processing_directory: Path to the directory used for temporary working trees
during job submission.
:type processing_directory: pathlib.Path
:param test: If ``True``, preserve temporary processing directories on success.
:type test: bool
:raises FileNotFoundError: If ``state.jsonl`` is not found in *queue_directory*.
"""
state_file = queue_directory / "state.jsonl"
if not state_file.exists():
message = f"State file not found: {state_file}"
raise FileNotFoundError(message)
if not state_file.read_text().strip():
_log.info(f"No entries in {state_file}")
return
running_count = _count_running_aind_ephys_pipeline_jobs()
available_slots = max(0, 2 - running_count)
if available_slots > 0:
_submit_next(
processing_directory=processing_directory,
max_submissions=available_slots,
test=test,
)