dandi_compute_code.queue

dandi_compute_code.queue.aggregate_queue_statistics(
*,
queue_directory,
dandiset_directory,
output_file_name='queue_stats.json',
)

Write aggregate queue statistics to a JSON file and return the written payload.

Return type:

dict

Parameters:
  • queue_directory (Path)

  • dandiset_directory (Path)

  • output_file_name (str)

dandi_compute_code.queue.clean_unsubmitted_capsules(*, dandiset_directory, queue_directory)

Remove all queued (unsubmitted) capsule directories from the dandiset tree.

A capsule is considered queued (prepared but not yet submitted) when its attempt directory has a code/ subdirectory but neither a non-empty logs/ subdirectory nor a derivatives/ subdirectory, and the attempt directory does not contain a submitted-marker file (code/submitted or code/submitted_date-*).

The function reads the queue state, then deletes each matching attempt directory tree from the DANDI archive (via dandi delete) and from the local filesystem. It assumes that the local Dandiset copy is up to date.

Parameters:
  • dandiset_directory (Path) – Path to a local clone of the dandiset repository used to resolve and delete matching attempt directories.

  • queue_directory (Path) – Path to the queue root directory.

Returns:

List of attempt directory paths that were deleted.

Return type:

list[Path]

Raises:
  • NotADirectoryError – If queue_directory does not exist or is not a directory.

  • RuntimeError – If the DANDI_API_KEY environment variable is not set or is blank.
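The queued-capsule criteria above can be expressed as a predicate over a local attempt directory. This is an illustrative sketch only: it inspects the local filesystem directly, whereas clean_unsubmitted_capsules works from the queue state, and the helper names is_unsubmitted_capsule and require_dandi_api_key are hypothetical. It also mirrors the documented DANDI_API_KEY check.

```python
import os
from pathlib import Path


def require_dandi_api_key() -> str:
    """Return DANDI_API_KEY, raising RuntimeError if it is unset or blank (hypothetical helper)."""
    key = os.environ.get("DANDI_API_KEY", "")
    if not key.strip():
        raise RuntimeError("DANDI_API_KEY is not set or is blank.")
    return key


def is_unsubmitted_capsule(attempt_dir: Path) -> bool:
    """Return True if attempt_dir looks prepared but not yet submitted (hypothetical helper)."""
    code_dir = attempt_dir / "code"
    logs_dir = attempt_dir / "logs"
    # Without a code/ subdirectory nothing has been prepared.
    if not code_dir.is_dir():
        return False
    # A non-empty logs/ subdirectory means the job has already produced logs.
    if logs_dir.is_dir() and any(logs_dir.iterdir()):
        return False
    # A derivatives/ subdirectory means the job has already produced output.
    if (attempt_dir / "derivatives").is_dir():
        return False
    # Submitted-marker files: code/submitted or code/submitted_date-*.
    if (code_dir / "submitted").exists() or any(code_dir.glob("submitted_date-*")):
        return False
    return True
```

Note that an empty logs/ directory still counts as queued, matching the "non-empty logs/" wording above.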

dandi_compute_code.queue.dump_issues(*, dandiset_directory, queue_directory, output_file_name='issues_dump.json')

Scan Nextflow/SLURM logs and write per-capsule error lines to output_file_name under queue_directory.

Return type:

list[dict]

Parameters:
  • dandiset_directory (Path)

  • queue_directory (Path)

  • output_file_name (str)
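A minimal sketch of the log-scanning idea behind dump_issues follows. What counts as an "error line", which log file suffixes are scanned, and the record keys capsule and errors are all assumptions made for illustration; dump_issues_sketch and collect_error_lines are hypothetical helpers, not the package's implementation.

```python
import json
from pathlib import Path

# Assumed heuristics; the real criteria for an "error line" are not documented here.
ERROR_MARKERS = ("ERROR", "slurmstepd: error")
LOG_SUFFIXES = {".log", ".out", ".err"}


def collect_error_lines(log_dir: Path) -> list[str]:
    """Return stripped lines containing an error marker from every log file under log_dir."""
    lines: list[str] = []
    log_files = sorted(p for p in log_dir.rglob("*") if p.is_file() and p.suffix in LOG_SUFFIXES)
    for log_file in log_files:
        for line in log_file.read_text(encoding="utf-8", errors="replace").splitlines():
            if any(marker in line for marker in ERROR_MARKERS):
                lines.append(line.strip())
    return lines


def dump_issues_sketch(capsule_log_dirs: dict[str, Path], output_path: Path) -> list[dict]:
    """Write one record per capsule with its error lines to output_path and return the records."""
    records = [
        {"capsule": name, "errors": collect_error_lines(log_dir)}
        for name, log_dir in sorted(capsule_log_dirs.items())
    ]
    output_path.write_text(json.dumps(records, indent=2), encoding="utf-8")
    return records
```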

class dandi_compute_code.queue.JobEntry(
job,
content_id,
asset_size_bytes,
has_code=False,
has_been_submitted=False,
has_output=False,
has_logs=False,
created_at=None,
job_completion_time=None,
dataset_description_path=None,
output_paths=<factory>,
log_paths=<factory>,
)

Bases: object

A JobInfo (identity) plus the status fields written by write_queue_state and consumed across the queue module.

Parameters:
  • job (JobInfo)

  • content_id (str | None)

  • asset_size_bytes (int | None)

  • has_code (bool)

  • has_been_submitted (bool)

  • has_output (bool)

  • has_logs (bool)

  • created_at (str | None)

  • job_completion_time (str | None)

  • dataset_description_path (str | None)

  • output_paths (dict)

  • log_paths (dict)

job: JobInfo
content_id: str | None
asset_size_bytes: int | None
has_code: bool = False
has_been_submitted: bool = False
has_output: bool = False
has_logs: bool = False
created_at: str | None = None
job_completion_time: str | None = None
dataset_description_path: str | None = None
output_paths: dict
log_paths: dict
property is_pending: bool

Code prepared but never submitted (no logs, no output yet).

property is_running: bool

Logs present but no output yet — likely still executing.

property is_successful: bool

Output directory present — job completed successfully.

property is_failed: bool

Has code and logs but no output — the job ran but did not succeed.

classmethod from_dict(data, /)

Construct from a raw state.jsonl entry dict.

Return type:

JobEntry

Parameters:

data (dict)

to_dict()

Serialize back to the flat dict format written to state.jsonl.

Return type:

dict
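The flat dict format can be illustrated with simplified stand-ins for the two dataclasses. This sketch assumes that to_dict merges the JobInfo identity fields into the top level alongside the status fields and that from_dict splits them back out; JobInfoSketch and JobEntrySketch are hypothetical names, and the real key layout of state.jsonl may differ.

```python
from dataclasses import asdict, dataclass, field, fields
from typing import Optional


@dataclass
class JobInfoSketch:
    dandiset_id: str
    dandi_path: str
    pipeline: str
    version: str
    params: str
    config: str
    attempt: int
    codebase: str


@dataclass
class JobEntrySketch:
    job: JobInfoSketch
    content_id: Optional[str]
    asset_size_bytes: Optional[int]
    has_code: bool = False
    has_been_submitted: bool = False
    has_output: bool = False
    has_logs: bool = False
    created_at: Optional[str] = None
    job_completion_time: Optional[str] = None
    dataset_description_path: Optional[str] = None
    output_paths: dict = field(default_factory=dict)
    log_paths: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        # Assumed flat layout: identity fields sit at the top level next to the status fields.
        status = {f.name: getattr(self, f.name) for f in fields(self) if f.name != "job"}
        return {**asdict(self.job), **status}

    @classmethod
    def from_dict(cls, data: dict, /) -> "JobEntrySketch":
        # Split the flat dict back into identity fields and status fields.
        job_keys = {f.name for f in fields(JobInfoSketch)}
        job = JobInfoSketch(**{key: data[key] for key in job_keys})
        status = {key: value for key, value in data.items() if key not in job_keys}
        return cls(job=job, **status)
```

A round trip through to_dict and from_dict should reproduce an equal entry, which is the property a flat serialization needs to preserve.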

class dandi_compute_code.queue.JobInfo(dandiset_id, dandi_path, pipeline, version, params, config, attempt, codebase)

Bases: object

Parameters:
  • dandiset_id (str)

  • dandi_path (str)

  • pipeline (str)

  • version (str)

  • params (str)

  • config (str)

  • attempt (int)

  • codebase (str)

dandiset_id: str
dandi_path: str
pipeline: str
version: str
params: str
config: str
attempt: int
codebase: str
dandi_compute_code.queue.prepare_queue(
*,
queue_directory,
pipeline_directory=None,
config_key='default',
content_ids=None,
limit=None,
)

Prepare all qualifying assets en masse, based on the current queue configuration.

For every pipeline/version/params combination declared in queue_config.json, this function determines which content IDs to prepare and calls prepare_aind_ephys_job() for each asset. Each call generates the code/ directory and its parent directories without submitting a job.

The per-pipeline failure cap (max_fail_per_dandiset in queue_config.json) is enforced by reading the existing state.jsonl file inside queue_directory. Entries with has_code=True, has_logs=True, and has_output=False are counted as failures for the relevant pipeline, version, and source Dandiset. Run write_queue_state() beforehand to ensure state.jsonl is up to date.
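The failure count described above can be sketched as follows. It assumes flat state.jsonl entries carrying pipeline, version, and dandiset_id keys, with dandiset_id standing in for the source Dandiset, and it assumes preparation is blocked once the count reaches max_fail_per_dandiset; count_failures and under_failure_cap are hypothetical helpers.

```python
from collections import Counter
from typing import Iterable


def count_failures(entries: Iterable[dict]) -> Counter:
    """Count failed attempts per (pipeline, version, dandiset_id)."""
    failures: Counter = Counter()
    for entry in entries:
        # Documented failure criterion: code and logs present, no output.
        if entry.get("has_code") and entry.get("has_logs") and not entry.get("has_output"):
            failures[(entry["pipeline"], entry["version"], entry["dandiset_id"])] += 1
    return failures


def under_failure_cap(
    failures: Counter,
    pipeline: str,
    version: str,
    dandiset_id: str,
    max_fail_per_dandiset: int,
) -> bool:
    # Assumption: preparation stops once the failure count reaches the cap.
    return failures[(pipeline, version, dandiset_id)] < max_fail_per_dandiset
```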

Parameters:
  • queue_directory (Path) – Path to the queue root directory.

  • pipeline_directory (Path | None) – Local path to the AIND pipeline repository. Passed directly to prepare_aind_ephys_job().

  • config_key (str) – Key for a registered job configuration. Passed directly to prepare_aind_ephys_job().

  • content_ids (list[str] | None) – Explicit list of content IDs to prepare. When provided, the list of qualifying content IDs is not fetched from the network; these IDs are used directly instead. Useful for targeted runs, such as testing with one or more known content IDs.

  • limit (int | None) – If provided, stop after preparing limit assets in total (across all pipeline/version/params combinations). When qualifying IDs are fetched automatically, they are put into a randomized round-robin order across source Dandisets before this limit is applied. Useful for testing.
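One way to build a randomized round-robin order across source Dandisets and then apply limit is sketched below. The exact randomization used by prepare_queue is not documented here, so shuffling within each Dandiset and shuffling the Dandiset order are assumptions; round_robin_by_dandiset is a hypothetical helper.

```python
import random
from itertools import chain, zip_longest
from typing import Optional

_MISSING = object()  # filler for Dandisets that run out of IDs


def round_robin_by_dandiset(
    ids_by_dandiset: dict[str, list[str]],
    limit: Optional[int] = None,
    seed: Optional[int] = None,
) -> list[str]:
    rng = random.Random(seed)
    queues = []
    for dandiset_id in sorted(ids_by_dandiset):
        ids = list(ids_by_dandiset[dandiset_id])
        rng.shuffle(ids)  # assumption: randomize order within each Dandiset
        queues.append(ids)
    rng.shuffle(queues)  # assumption: randomize which Dandiset goes first
    # Take one ID from each Dandiset per round until every list is exhausted.
    rounds = zip_longest(*queues, fillvalue=_MISSING)
    ordered = [cid for cid in chain.from_iterable(rounds) if cid is not _MISSING]
    return ordered if limit is None else ordered[:limit]
```

Interleaving before truncating means a small limit still samples several source Dandisets instead of draining the first one.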

Return type:

None

dandi_compute_code.queue.process_queue(*, queue_directory, processing_directory, test=False)

Submit jobs from state.jsonl until at most two AIND-Ephys-Pipeline SLURM jobs are running in total.

If state.jsonl is absent, a FileNotFoundError is raised. If state.jsonl exists but is empty, a warning is emitted and the function returns without submitting any jobs. Otherwise, squeue --me is queried for currently running AIND-Ephys-Pipeline jobs, and new jobs are submitted only to fill the remaining slots, i.e. at most two minus the number already running.
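The slot arithmetic can be sketched as follows. How process_queue actually recognizes AIND-Ephys-Pipeline jobs in squeue output is not documented; this sketch assumes a substring match on the job name, and query_squeue_job_names, count_pipeline_jobs, and available_slots are hypothetical helpers. The squeue options used (--me, -h, -o %j) are standard Slurm flags, although --me requires a reasonably recent Slurm release.

```python
import subprocess

MAX_RUNNING = 2  # documented cap on concurrent AIND-Ephys-Pipeline jobs


def query_squeue_job_names() -> str:
    """Return the current user's job names, one per line (requires Slurm)."""
    # -h suppresses the header; -o %j prints only the job name.
    result = subprocess.run(
        ["squeue", "--me", "-h", "-o", "%j"],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout


def count_pipeline_jobs(job_names: str, name_fragment: str = "AIND-Ephys-Pipeline") -> int:
    # Assumption: pipeline jobs are recognized by a substring of the job name.
    return sum(1 for line in job_names.splitlines() if name_fragment in line)


def available_slots(running: int, max_running: int = MAX_RUNNING) -> int:
    # Never negative, even if more jobs are already running than the cap allows.
    return max(0, max_running - running)
```

On a cluster, available_slots(count_pipeline_jobs(query_squeue_job_names())) would give the number of entries to submit in one invocation.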

Parameters:
  • queue_directory (Path) – Path to the queue root directory.

  • processing_directory (Path) – Path to the directory used for temporary working trees during job submission.

  • test (bool) – If True, preserve temporary processing directories on success.

Raises:

FileNotFoundError – If state.jsonl is not found in queue_directory.

Return type:

None

class dandi_compute_code.queue.QueueState(entries)

Bases: object

Container for all entries in state.jsonl.

Replaces the scattered list[dict] reads in _prepare_queue.py, _aggregate_queue_statistics.py, _process_queue.py, and _clean_unsubmitted_capsules.py.

Parameters:

entries (list[JobEntry])

entries: list[JobEntry]
property pending: list[JobEntry]

Entries with code prepared but not yet submitted.

property running: list[JobEntry]

Entries with logs present but no output — likely still executing.

property successful: list[JobEntry]

Entries whose output directory is present.

property failed: list[JobEntry]

Entries with code and logs but no output.

classmethod from_jsonl(file_path, /)

Load from an existing state.jsonl file.

Parameters:

file_path (Path) – Path to the state.jsonl file to read.

Raises:

FileNotFoundError – If file_path does not exist.

Return type:

QueueState

classmethod from_assets_yaml(file_path=None, /)

Build from DANDI assets metadata.

When file_path is None, the metadata is fetched from the DANDI S3 bucket over the network. When a path is provided, the file is read locally; it should be a YAML file containing a list of asset dicts with path, contentSize, dateModified, and contentUrl fields (matching the assets.yaml layout from DANDI).

Parameters:

file_path (Path | None) – Optional path to a local assets YAML file. Pass None to fetch from the network.

Return type:

QueueState

to_file(file_path, /)

Write all entries to file_path as newline-delimited JSON.

Parameters:

file_path (Path) – Destination path; the file is overwritten if it already exists.

Return type:

None
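Because state.jsonl is newline-delimited JSON, reading it and overwriting it needs only the standard library. The sketch below works on plain dicts rather than JobEntry objects; read_jsonl and write_jsonl are hypothetical helpers, and skipping blank lines is an assumption about how from_jsonl treats them.

```python
import json
from pathlib import Path


def read_jsonl(file_path: Path, /) -> list[dict]:
    """Parse one JSON object per non-blank line."""
    if not file_path.exists():
        raise FileNotFoundError(file_path)
    with file_path.open(encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]


def write_jsonl(file_path: Path, entries: list[dict], /) -> None:
    """Write one JSON object per line; mode "w" truncates, so an existing file is overwritten."""
    with file_path.open("w", encoding="utf-8") as handle:
        for entry in entries:
            handle.write(json.dumps(entry) + "\n")
```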

dandi_compute_code.queue.summarize_issues(
*,
dandiset_directory,
queue_directory,
dump_output_file_name='issues_dump.json',
output_file_name='issues_summary.json',
)

Write an error-frequency summary sorted by descending count, where each key is an occurrence count (as a string) and each value is the list of error strings with that count.

Return type:

dict[str, list[str]]

Parameters:
  • dandiset_directory (Path)

  • queue_directory (Path)

  • dump_output_file_name (str)

  • output_file_name (str)
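The summary shape (string count keys, descending order, lists of error strings as values) can be produced with collections.Counter. This sketch assumes the input is a flat list of error strings gathered from the dump; summarize_error_counts is a hypothetical helper, and sorting strings alphabetically within each count is an arbitrary choice made here for deterministic output.

```python
from collections import Counter


def summarize_error_counts(error_lines: list[str]) -> dict[str, list[str]]:
    counts = Counter(error_lines)
    by_count: dict[int, list[str]] = {}
    for message, count in counts.items():
        by_count.setdefault(count, []).append(message)
    # String keys (as in JSON), inserted from most to least frequent.
    return {str(count): sorted(by_count[count]) for count in sorted(by_count, reverse=True)}
```

Grouping by count rather than keying by message lets several distinct errors with the same frequency share one entry.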

dandi_compute_code.queue.write_queue_state(*, queue_directory)

Write state.jsonl from DANDI assets.jsonld metadata.

Each state entry represents one attempt capsule inferred from the derivatives/dandiset-*/.../pipeline-*/..._attempt-* path structure in assets.jsonld. The entry fields are derived as follows:
  • dandi_path – the path segment between dandiset-* and pipeline-*, with .nwb appended.

  • has_code, has_been_submitted, has_output, has_logs – inferred from the assets present under each attempt directory.

  • dataset_description_path – the root-level dataset_description.json asset path, when present.

  • output_paths – maps each output asset path to its blob ID when has_output is True; otherwise an empty dict.

  • log_paths – maps each log asset path to its blob ID when has_logs is True; otherwise an empty dict.

For meta-analysis dandisets, the content_id and asset_size_bytes of each attempt’s source NWB are resolved by fetching the upstream dandiset’s assets.jsonld. created_at comes from the local code/submit.sh modification time; job_completion_time is the latest modification time among log files.
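The path-based inference can be sketched with pathlib.PurePosixPath. The example asset path used below is hypothetical, and the sketch assumes the attempt segment is the first segment after pipeline-* that contains _attempt-; parse_attempt_path is a hypothetical helper that returns only identity-related pieces, not a full state entry.

```python
from pathlib import PurePosixPath
from typing import Optional


def parse_attempt_path(asset_path: str) -> Optional[dict]:
    """Split a derivatives asset path into Dandiset, source NWB path, and attempt directory."""
    parts = PurePosixPath(asset_path).parts
    try:
        d_idx = next(i for i, p in enumerate(parts) if p.startswith("dandiset-"))
        p_idx = next(i for i, p in enumerate(parts) if i > d_idx and p.startswith("pipeline-"))
        a_idx = next(i for i, p in enumerate(parts) if i > p_idx and "_attempt-" in p)
    except StopIteration:
        return None  # not an attempt-capsule asset
    return {
        "dandiset_id": parts[d_idx].removeprefix("dandiset-"),
        # Segments between dandiset-* and pipeline-*, with .nwb appended.
        "dandi_path": "/".join(parts[d_idx + 1 : p_idx]) + ".nwb",
        "attempt_directory": "/".join(parts[: a_idx + 1]),
        # e.g. "code/submit.sh" or a file under logs/, relative to the attempt.
        "relative_to_attempt": "/".join(parts[a_idx + 1 :]),
    }
```

For the hypothetical path derivatives/dandiset-000001/sub-ABC/sub-ABC_ses-1_ecephys/pipeline-aind+ephys/version-v1.0_params-default_attempt-1/code/submit.sh, this yields a dandi_path of sub-ABC/sub-ABC_ses-1_ecephys.nwb. Grouping assets by attempt_directory and inspecting relative_to_attempt is one plausible way to derive flags such as has_code.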

Parameters:

queue_directory (Path) – Path to the queue root directory.

Return type:

None