Source code for dandi_compute_code.queue._prepare_queue
import collections
import gzip
import json
import logging
import pathlib
import urllib.request
from ._load_queue_config import _load_queue_config
from ._order_content_ids_for_uniform_dandiset_sampling import _order_content_ids_for_uniform_dandiset_sampling
from ..aind_ephys_pipeline import prepare_aind_ephys_job
# Module-level logger, named after this module via ``__name__``.
_log = logging.getLogger(__name__)
# TODO: rethink the passing of content_ids here (namely for testing)
# in favor of direct pipeline job capsule creation
[docs]
def prepare_queue(
    *,
    queue_directory: pathlib.Path,
    pipeline_directory: pathlib.Path | None = None,
    config_key: str = "default",
    content_ids: list[str] | None = None,
    limit: int | None = None,
) -> None:
    """
    En-masse preparation of qualifying assets based on the current queue config.

    For every pipeline/version/params combination declared in ``queue_config.json``
    this function determines which content IDs to prepare and calls
    :func:`~dandi_compute_code.aind_ephys_pipeline.prepare_aind_ephys_job` for each
    asset — generating the ``code/`` directory and its parent directories without
    submitting a job.

    The per-pipeline failure cap (``max_fail_per_dandiset`` in ``queue_config.json``)
    is enforced by reading the existing ``state.jsonl`` file inside
    *queue_directory*. Entries with ``has_code=True``, ``has_logs=True``, and
    ``has_output=False`` are counted as failures for the relevant pipeline,
    version, and source Dandiset. Run :func:`write_queue_state` beforehand to
    ensure ``state.jsonl`` is up to date.

    The cap is only enforced for a content ID that maps to exactly one Dandiset
    in ``state.jsonl``; a content ID with zero or several mapped Dandisets is
    prepared anyway and the situation is logged.

    No exception is caught here: errors from loading the queue config, fetching
    or decoding the qualifying content IDs, parsing ``state.jsonl``, or preparing
    a job propagate to the caller.

    :param queue_directory: Path to the queue root directory.
    :type queue_directory: pathlib.Path
    :param pipeline_directory: Local path to the AIND pipeline repository. Passed directly to
        :func:`~dandi_compute_code.aind_ephys_pipeline.prepare_aind_ephys_job`.
    :type pipeline_directory: pathlib.Path, optional
    :param config_key: Key for a registered job configuration. Passed directly to
        :func:`~dandi_compute_code.aind_ephys_pipeline.prepare_aind_ephys_job`.
    :type config_key: str
    :param content_ids: Explicit list of content IDs to prepare. When provided, the qualifying
        content IDs list is not fetched from the network and these IDs are used
        directly instead. Useful for targeted runs such as testing with one or
        more known content IDs.
    :type content_ids: list of str, optional
    :param limit: If provided, stop after preparing *limit* assets in total (across all
        pipeline/version/params combinations). When qualifying IDs are fetched
        automatically, they are randomized in round-robin order across source
        Dandisets before this limit is applied. Useful for testing.
    :type limit: int, optional
    """
    queue_config = _load_queue_config(queue_directory=queue_directory)

    if content_ids is None:
        qualifying_aind_content_ids_url = (
            "https://raw.githubusercontent.com/dandi-cache/qualifying-aind-content-ids/refs/heads/min/"
            "derivatives/qualifying_aind_content_ids.min.json.gz"
        )
        with urllib.request.urlopen(url=qualifying_aind_content_ids_url) as response:
            fetched_content_ids = json.loads(gzip.decompress(response.read()))
        content_ids = _order_content_ids_for_uniform_dandiset_sampling(content_ids=fetched_content_ids)

    # A missing state file simply means there is no failure history to enforce.
    state_file = queue_directory / "state.jsonl"
    state_entries = (
        [json.loads(line.strip()) for line in state_file.read_text().splitlines() if line.strip()]
        if state_file.exists()
        else []
    )

    # Build from all known state entries. A content ID is expected to map to a
    # stable source dandiset in normal operation; ambiguous mappings are handled
    # conservatively below by skipping max-failure enforcement for that asset.
    content_id_to_dandiset_ids: dict[str, set[str]] = {}
    for entry in state_entries:
        content_id = entry.get("content_id")
        dandiset_id = entry.get("dandiset_id")
        if content_id and dandiset_id:
            content_id_to_dandiset_ids.setdefault(content_id, set()).add(dandiset_id)

    # A failure is a job that produced code and logs but no output.
    failure_entries = [
        entry
        for entry in state_entries
        if entry.get("has_code") and entry.get("has_logs") and not entry.get("has_output")
    ]

    pipelines = queue_config.get("pipelines", {})

    # A non-positive limit means there is nothing to prepare at all.
    if limit is not None and limit <= 0:
        return

    prepared_count = 0
    for pipeline_name, pipeline_data in pipelines.items():
        max_fail = pipeline_data.get("max_fail_per_dandiset")
        params_priority = pipeline_data.get("params_priority", [])

        for version in pipeline_data.get("version_priority", []):
            # The failure tally depends only on the pipeline and version, so it is
            # computed once here rather than once per parameter set.
            failure_count_by_dandiset: collections.Counter[str] = collections.Counter()
            if max_fail is not None:
                failure_count_by_dandiset.update(
                    entry["dandiset_id"]
                    for entry in failure_entries
                    if entry.get("pipeline") == pipeline_name
                    and entry.get("version", "") == version
                    and entry.get("dandiset_id")
                )

            for params in params_priority:
                for content_id in content_ids:
                    if max_fail is not None:
                        dandiset_ids = content_id_to_dandiset_ids.get(content_id, set())
                        if len(dandiset_ids) == 1:
                            dandiset_id = next(iter(dandiset_ids))
                            failure_count = failure_count_by_dandiset.get(dandiset_id, 0)
                            if failure_count >= max_fail:
                                _log.info(
                                    f"Skipping preparation for {pipeline_name}/{version}/{params}/{content_id}: "
                                    f"failure count ({failure_count}) for dandiset-{dandiset_id} has reached "
                                    f"max_fail_per_dandiset ({max_fail})."
                                )
                                continue
                        else:
                            # Unknown or ambiguous source dandiset: the cap cannot be
                            # attributed, so prepare the asset and record why.
                            mapped_dandisets = ", ".join(sorted(dandiset_ids)) if dandiset_ids else "<none>"
                            _log.info(
                                f"Preparing {content_id} without max_fail_per_dandiset enforcement for "
                                f"{pipeline_name}/{version}/{params}: expected exactly 1 mapped dandiset but "
                                f"found {len(dandiset_ids)} ({mapped_dandisets})."
                            )

                    _log.info(f"Preparing content ID: {content_id}")
                    prepare_aind_ephys_job(
                        content_id=content_id,
                        parameters_key=params,
                        pipeline_version=version,
                        pipeline_directory=pipeline_directory,
                        config_key=config_key,
                        silent=True,
                    )
                    prepared_count += 1

                    # Nothing runs after the loops, so returning here is equivalent
                    # to breaking out of every nesting level once the limit is hit.
                    if limit is not None and prepared_count >= limit:
                        return