dandi_compute_code.queue
- dandi_compute_code.queue.aggregate_queue_statistics(*, queue_directory, dandiset_directory, output_file_name='queue_stats.json')
Write aggregate queue statistics JSON and return the written payload.
- dandi_compute_code.queue.clean_unsubmitted_capsules(*, dandiset_directory, queue_directory)
Remove all queued (unsubmitted) capsule directories from the dandiset tree.
A capsule is considered queued (prepared but not yet submitted) when its attempt directory has a code/ subdirectory but neither a non-empty logs/ subdirectory nor a derivatives/ subdirectory, and the attempt directory does not contain a submitted-marker file (code/submitted or code/submitted_date-*).
The function reads the queue state, then deletes each matching attempt directory tree from the DANDI archive (via dandi delete) and from the local filesystem. This expects the local Dandiset copy to be up-to-date.
- Parameters:
- Returns:
List of attempt directory paths that were deleted.
- Return type:
- Raises:
NotADirectoryError – If queue_directory does not exist or is not a directory.
RuntimeError – If the DANDI_API_KEY environment variable is not set or is blank.
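The "queued" criteria above can be sketched as a local filesystem check. This is only an illustration: is_queued_attempt is a hypothetical helper name, not part of the module, and the real function works from the queue state rather than necessarily probing directories this way.

```python
from pathlib import Path


def is_queued_attempt(attempt_dir: Path) -> bool:
    """Hypothetical helper: True if the attempt looks prepared but unsubmitted."""
    code_dir = attempt_dir / "code"
    logs_dir = attempt_dir / "logs"
    derivatives_dir = attempt_dir / "derivatives"

    # A queued capsule must have a code/ subdirectory.
    if not code_dir.is_dir():
        return False
    # A non-empty logs/ subdirectory means the job has already run.
    if logs_dir.is_dir() and any(logs_dir.iterdir()):
        return False
    # Any derivatives/ subdirectory means output exists.
    if derivatives_dir.is_dir():
        return False
    # Submitted markers: code/submitted or code/submitted_date-*
    if (code_dir / "submitted").exists() or any(code_dir.glob("submitted_date-*")):
        return False
    return True
```

Note that an empty logs/ directory does not disqualify an attempt under this reading; only a non-empty one does.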
- dandi_compute_code.queue.dump_issues(*, dandiset_directory, queue_directory, output_file_name='issues_dump.json')
Scan Nextflow/SLURM logs and write per-capsule error lines under queue_directory.
- class dandi_compute_code.queue.JobEntry(job, content_id, asset_size_bytes, has_code=False, has_been_submitted=False, has_output=False, has_logs=False, created_at=None, job_completion_time=None, dataset_description_path=None, output_paths=<factory>, log_paths=<factory>)
Bases: object
A JobInfo (identity) plus the status fields written by write_queue_state and consumed across the queue module.
- Parameters:
- class dandi_compute_code.queue.JobInfo(dandiset_id, dandi_path, pipeline, version, params, config, attempt, codebase)
Bases: object
- Parameters:
- dandi_compute_code.queue.prepare_queue(*, queue_directory, pipeline_directory=None, config_key='default', content_ids=None, limit=None)
En-masse preparation of qualifying assets based on the current queue config.
For every pipeline/version/params combination declared in queue_config.json this function determines which content IDs to prepare and calls prepare_aind_ephys_job() for each asset, generating the code/ directory and its parent directories without submitting a job.
The per-pipeline failure cap (max_fail_per_dandiset in queue_config.json) is enforced by reading the existing state.jsonl file inside queue_directory. Entries with has_code=True, has_logs=True, and has_output=False are counted as failures for the relevant pipeline, version, and source Dandiset. Run write_queue_state() beforehand to ensure state.jsonl is up to date.
- Parameters:
queue_directory (Path) – Path to the queue root directory.
pipeline_directory (Path | None) – Local path to the AIND pipeline repository. Passed directly to prepare_aind_ephys_job().
config_key (str) – Key for a registered job configuration. Passed directly to prepare_aind_ephys_job().
content_ids (list[str] | None) – Explicit list of content IDs to prepare. When provided, the qualifying content IDs list is not fetched from the network and these IDs are used directly instead. Useful for targeted runs such as testing with one or more known content IDs.
limit (int | None) – If provided, stop after preparing limit assets in total (across all pipeline/version/params combinations). When qualifying IDs are fetched automatically, they are randomized in round-robin order across source Dandisets before this limit is applied. Useful for testing.
- Return type:
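The failure counting described for prepare_queue can be sketched as follows. This is a minimal illustration, assuming each state.jsonl entry is a dict whose "job" key holds the JobInfo fields as a nested dict; that serialized layout and the count_failures name are assumptions, not confirmed by the module.

```python
from collections import Counter


def count_failures(entries: list[dict]) -> Counter:
    """Hypothetical helper: count failures per (pipeline, version, dandiset_id)."""
    failures: Counter = Counter()
    for entry in entries:
        # A failure is an attempt with code and logs but no output.
        is_failure = (
            entry.get("has_code", False)
            and entry.get("has_logs", False)
            and not entry.get("has_output", False)
        )
        if is_failure:
            job = entry["job"]  # assumed nested layout
            key = (job["pipeline"], job["version"], job["dandiset_id"])
            failures[key] += 1
    return failures
```

A caller could then compare failures[key] against max_fail_per_dandiset before preparing further assets for that combination.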
- dandi_compute_code.queue.process_queue(*, queue_directory, processing_directory, test=False)
Submit jobs from state.jsonl until at most two AIND-Ephys-Pipeline SLURM jobs are running in total.
If state.jsonl is absent, a FileNotFoundError is raised. If state.jsonl exists but is empty, a warning is emitted and the invocation returns without submitting jobs. Otherwise squeue --me is checked for currently running AIND-Ephys-Pipeline jobs, and only enough jobs are submitted to bring the running total up to two.
- Parameters:
- Raises:
FileNotFoundError – If state.jsonl is not found in queue_directory.
- Return type:
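The "difference from two" rule can be sketched as a small pure function. The names here are hypothetical, and the sketch assumes the running job names have already been collected, for example from the output of squeue --me --noheader --format=%j (one job name per line); how process_queue actually parses squeue is not documented here.

```python
def jobs_to_submit(
    running_job_names: list[str],
    pending_count: int,
    max_running: int = 2,
    job_name: str = "AIND-Ephys-Pipeline",
) -> int:
    """Hypothetical helper: how many new jobs may be submitted right now."""
    # Count only jobs belonging to the pipeline of interest.
    running = sum(1 for name in running_job_names if name == job_name)
    # Never go below zero if more than max_running are already running.
    free_slots = max(0, max_running - running)
    # Cannot submit more jobs than are waiting in the queue.
    return min(free_slots, pending_count)
```

With one matching job already running and five entries waiting, this returns 1; with two or more running it returns 0.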
- class dandi_compute_code.queue.QueueState(entries)
Bases: object
Container for all entries in state.jsonl.
Replaces the scattered list[dict] reads in _prepare_queue.py, _aggregate_queue_statistics.py, _process_queue.py, and _clean_unsubmitted_capsules.py.
- classmethod from_jsonl(file_path, /)
Load from an existing state.jsonl file.
- Parameters:
file_path (Path) – Path to the state.jsonl file to read.
- Raises:
FileNotFoundError – If file_path does not exist.
- Return type:
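For readers unfamiliar with the JSON Lines layout, the loading step can be sketched like this. load_state_entries is a hypothetical stand-in that returns plain dicts; the real classmethod returns a QueueState and may validate entries differently.

```python
import json
from pathlib import Path


def load_state_entries(file_path: Path) -> list[dict]:
    """Hypothetical stand-in: read one JSON object per non-blank line."""
    if not file_path.is_file():
        raise FileNotFoundError(f"No state file found at {file_path}")

    entries = []
    with file_path.open(encoding="utf-8") as stream:
        for line in stream:
            line = line.strip()
            if line:  # tolerate blank lines, e.g. a trailing newline
                entries.append(json.loads(line))
    return entries
```

An empty file yields an empty list, which corresponds to the case where process_queue emits a warning and returns.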
- classmethod from_assets_yaml(file_path=None, /)
Build from DANDI assets metadata.
When file_path is None the metadata is fetched from the DANDI S3 bucket over the network. When a path is provided the file is read locally; the file should be a YAML file whose content is a list of asset dicts with path, contentSize, dateModified, and contentUrl fields (matching the assets.yaml layout from DANDI).
- dandi_compute_code.queue.summarize_issues(*, dandiset_directory, queue_directory, dump_output_file_name='issues_dump.json', output_file_name='issues_summary.json')
Write a descending error-frequency summary where keys are counts and values are error strings.
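One way to picture such a summary is sketched below. It is an assumption that error strings sharing the same count are grouped into a list under that count; the actual output layout of issues_summary.json may differ, and summarize_error_lines is a hypothetical name.

```python
from collections import Counter


def summarize_error_lines(error_lines: list[str]) -> dict[int, list[str]]:
    """Hypothetical helper: map each count to the error strings seen that often."""
    counts = Counter(error_lines)
    summary: dict[int, list[str]] = {}
    # most_common() yields (message, count) pairs from most to least frequent,
    # so keys are inserted into the dict in descending order.
    for message, count in counts.most_common():
        summary.setdefault(count, []).append(message)
    return summary
```

Because Python dicts preserve insertion order, serializing the result with json.dumps keeps the descending order, with the integer keys written as JSON strings.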
- dandi_compute_code.queue.write_queue_state(*, queue_directory)
Write state.jsonl from DANDI assets.jsonld metadata.
Each state entry represents one attempt capsule inferred from the derivatives/dandiset-*/.../pipeline-*/..._attempt-* path structure in assets.jsonld.
dandi_path is derived from the path segment between dandiset-* and pipeline-* with .nwb appended.
has_code/has_been_submitted/has_output/has_logs are inferred from the assets present under each attempt directory.
dataset_description_path records the root-level dataset_description.json asset path when present.
output_paths maps each output asset path to its blob ID when has_output is True; otherwise it is an empty dict.
log_paths maps each log asset path to its blob ID when has_logs is True; otherwise it is an empty dict.
For meta-analysis dandisets, the content_id and asset_size_bytes of each attempt’s source NWB are resolved by fetching the upstream dandiset’s assets.jsonld.
created_at comes from the local code/submit.sh modification time; job_completion_time is the latest modification time among log files.
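The dandi_path derivation can be sketched as a small path manipulation. The example path and the derive_dandi_path name are hypothetical; only the rule itself (take the segment between dandiset-* and pipeline-*, then append .nwb) comes from the description above.

```python
from pathlib import PurePosixPath


def derive_dandi_path(attempt_asset_path: str) -> str:
    """Hypothetical helper: recover the source NWB path from an attempt path."""
    parts = PurePosixPath(attempt_asset_path).parts
    # Locate the bounding path components.
    start = next(i for i, part in enumerate(parts) if part.startswith("dandiset-"))
    stop = next(i for i, part in enumerate(parts) if part.startswith("pipeline-"))
    # Everything strictly between them identifies the source asset.
    middle = parts[start + 1 : stop]
    return "/".join(middle) + ".nwb"
```

For a made-up path such as derivatives/dandiset-000001/sub-A/sub-A_ses-1/pipeline-x/params-y_attempt-1, this yields sub-A/sub-A_ses-1.nwb.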