Source code for dandi_compute_code.queue._queue_state

"""
QueueState — typed container for ``state.jsonl``.

``state.jsonl`` is a newline-delimited JSON file where each line is one attempt
capsule.  Previously every consumer read it into a ``list[dict]`` and accessed
fields by string key.  This module adds a thin typed layer on top:

- :class:`JobEntry` wraps an existing :class:`JobInfo` with the status fields
  (``has_code``, ``has_output``, ``has_logs``, ``content_id``, ...).
- :class:`QueueState` is the container — a list of ``JobEntry`` objects with
  convenience helpers for filtering and round-trip I/O.
"""

from __future__ import annotations

import json
import pathlib
from collections.abc import Iterator
from dataclasses import dataclass, field

import yaml

from ._write_queue_state import (
    JobInfo,
    _collect_attempts,
    _finalize_attempt_records,
    _sort_key,
    _UpstreamMetadataCache,
)
from ..dandiset._load_assets_jsonld_metadata import (
    AssetMetadata,
    AssetsJsonldMetadata,
    _build_asset_metadata,
    load_assets_jsonld_metadata,
)


@dataclass
class JobEntry:
    """
    A :class:`JobInfo` (identity) plus the status fields written by
    ``write_queue_state`` and consumed across the queue module.

    The ``is_*`` status properties are derived only from the ``has_*`` flags;
    ``has_been_submitted`` and ``job_completion_time`` are not consulted.
    The properties are not mutually exclusive: an entry with code and logs
    but no output is reported as both :attr:`is_running` and
    :attr:`is_failed`.
    """

    job: JobInfo
    content_id: str | None
    asset_size_bytes: int | None
    has_code: bool = False
    has_been_submitted: bool = False
    has_output: bool = False
    has_logs: bool = False
    created_at: str | None = None
    job_completion_time: str | None = None
    dataset_description_path: str | None = None
    output_paths: dict = field(default_factory=dict)
    log_paths: dict = field(default_factory=dict)

    @property
    def is_pending(self) -> bool:
        """
        Code prepared, with neither logs nor output yet.

        ``has_been_submitted`` is not checked, so a submitted job that has not
        yet produced logs is still reported as pending.
        """
        return self.has_code and not self.has_logs and not self.has_output

    @property
    def is_running(self) -> bool:
        """Logs present but no output yet — likely still executing."""
        return self.has_logs and not self.has_output

    @property
    def is_successful(self) -> bool:
        """Output directory present — job completed successfully."""
        return self.has_output

    @property
    def is_failed(self) -> bool:
        """
        Has code and logs but no output — the job ran but did not succeed.

        NOTE(review): every entry matching this also matches
        :attr:`is_running`; the flags alone cannot tell a still-running job
        from a failed one. Confirm how callers are expected to disambiguate.
        """
        return self.has_code and self.has_logs and not self.has_output

    @classmethod
    def from_dict(cls, data: dict, /) -> JobEntry:
        """
        Construct from a raw ``state.jsonl`` entry dict.

        Optional keys fall back to ``None``, ``False``, ``""`` (``codebase``)
        or an empty dict (``output_paths`` / ``log_paths``). The path dicts
        are copied so the entry does not alias *data*.

        :param data: One decoded ``state.jsonl`` line.
        :type data: dict
        :raises KeyError: If any of ``dandiset_id``, ``dandi_path``,
            ``pipeline``, ``version``, ``params``, ``config`` or ``attempt``
            is missing.
        """
        job = JobInfo(
            dandiset_id=data["dandiset_id"],
            dandi_path=data["dandi_path"],
            pipeline=data["pipeline"],
            version=data["version"],
            params=data["params"],
            config=data["config"],
            # Normalised to int in case the stored value is a string.
            attempt=int(data["attempt"]),
            codebase=data.get("codebase", ""),
        )
        return cls(
            job=job,
            content_id=data.get("content_id"),
            asset_size_bytes=data.get("asset_size_bytes"),
            has_code=bool(data.get("has_code", False)),
            has_been_submitted=bool(data.get("has_been_submitted", False)),
            has_output=bool(data.get("has_output", False)),
            has_logs=bool(data.get("has_logs", False)),
            created_at=data.get("created_at"),
            job_completion_time=data.get("job_completion_time"),
            dataset_description_path=data.get("dataset_description_path"),
            # ``or {}`` also covers an explicit ``null`` in the JSON.
            output_paths=dict(data.get("output_paths") or {}),
            log_paths=dict(data.get("log_paths") or {}),
        )

    def to_dict(self) -> dict:
        """
        Serialise back to the flat dict format written to ``state.jsonl``.

        Key order is fixed so that serialised lines are stable. The path
        dicts are shallow-copied so that mutating the returned dict cannot
        change this entry (mirroring the copy made in :meth:`from_dict`).
        """
        return {
            "dandiset_id": self.job.dandiset_id,
            "dandi_path": self.job.dandi_path,
            "pipeline": self.job.pipeline,
            "version": self.job.version,
            "params": self.job.params,
            "config": self.job.config,
            "attempt": self.job.attempt,
            "codebase": self.job.codebase,
            "content_id": self.content_id,
            "asset_size_bytes": self.asset_size_bytes,
            "has_code": self.has_code,
            "has_been_submitted": self.has_been_submitted,
            "has_output": self.has_output,
            "has_logs": self.has_logs,
            "dataset_description_path": self.dataset_description_path,
            "output_paths": dict(self.output_paths),
            "log_paths": dict(self.log_paths),
            "created_at": self.created_at,
            "job_completion_time": self.job_completion_time,
        }
@dataclass
class QueueState:
    """
    Container for all entries in ``state.jsonl``.

    Replaces the scattered ``list[dict]`` reads in ``_prepare_queue.py``,
    ``_aggregate_queue_statistics.py``, ``_process_queue.py``, and
    ``_clean_unsubmitted_capsules.py``.

    All files are read and written as UTF-8 explicitly so that behaviour does
    not depend on the platform's locale encoding.
    """

    entries: list[JobEntry]

    def __iter__(self) -> Iterator[JobEntry]:
        return iter(self.entries)

    def __len__(self) -> int:
        return len(self.entries)

    @property
    def pending(self) -> list[JobEntry]:
        """Entries with code prepared but neither logs nor output yet."""
        return [e for e in self.entries if e.is_pending]

    @property
    def running(self) -> list[JobEntry]:
        """Entries with logs present but no output — likely still executing."""
        return [e for e in self.entries if e.is_running]

    @property
    def successful(self) -> list[JobEntry]:
        """Entries whose output directory is present."""
        return [e for e in self.entries if e.is_successful]

    @property
    def failed(self) -> list[JobEntry]:
        """Entries with code and logs but no output."""
        return [e for e in self.entries if e.is_failed]

    @classmethod
    def from_jsonl(cls, file_path: pathlib.Path, /) -> QueueState:
        """
        Load from an existing ``state.jsonl`` file.

        Blank and whitespace-only lines are skipped. Every other line is
        decoded as JSON and passed to :meth:`JobEntry.from_dict`.

        :param file_path: Path to the ``state.jsonl`` file to read.
        :type file_path: pathlib.Path
        :raises FileNotFoundError: If *file_path* does not exist.
        :raises json.JSONDecodeError: If a non-blank line is not valid JSON.
        """
        # EAFP instead of exists()-then-read: avoids the race where the file
        # disappears between the check and the read.
        try:
            text = file_path.read_text(encoding="utf-8")
        except FileNotFoundError as error:
            message = f"State file not found: {file_path}"
            raise FileNotFoundError(message) from error

        stripped_lines = (line.strip() for line in text.splitlines())
        entries = [JobEntry.from_dict(json.loads(line)) for line in stripped_lines if line]
        return cls(entries=entries)

    @classmethod
    def from_assets_yaml(cls, file_path: pathlib.Path | None = None, /) -> QueueState:
        """
        Build from DANDI assets metadata.

        When *file_path* is ``None`` the metadata is fetched from the DANDI S3
        bucket over the network. When a path is provided, the file is read
        locally. It should be a YAML file whose content is a list of asset
        dicts with ``path``, ``contentSize``, ``dateModified``, and
        ``contentUrl`` fields (matching the ``assets.yaml`` layout from DANDI).

        In the local case, list items that are not dicts, or for which
        ``_build_asset_metadata`` raises ``ValueError``, are skipped silently.

        :param file_path: Optional path to a local assets YAML file. Pass
            ``None`` to fetch from the network.
        :type file_path: pathlib.Path, optional
        :raises ValueError: If the local YAML document is not a list.
        """
        if file_path is None:
            local_metadata = load_assets_jsonld_metadata()
        else:
            # safe_load: the file is external input, so never construct
            # arbitrary Python objects from it.
            raw = yaml.safe_load(file_path.read_text(encoding="utf-8"))
            if not isinstance(raw, list):
                raise ValueError(f"Expected a YAML list in {file_path}, got {type(raw).__name__}")

            content_id_to_asset: dict[str, dict] = {}
            path_to_asset_metadata: dict[str, AssetMetadata] = {}
            for asset in raw:
                if not isinstance(asset, dict):
                    continue
                try:
                    content_id, metadata = _build_asset_metadata(asset)
                except ValueError:
                    continue
                content_id_to_asset[content_id] = asset
                path_to_asset_metadata[metadata.path] = metadata

            local_metadata = AssetsJsonldMetadata(
                content_id_to_asset=content_id_to_asset,
                path_to_asset_metadata=path_to_asset_metadata,
            )

        collection = _collect_attempts(local_metadata)
        upstream_cache = _UpstreamMetadataCache()
        records = _finalize_attempt_records(collection=collection, upstream_cache=upstream_cache)
        records.sort(key=_sort_key)
        entries = [JobEntry.from_dict(record) for record in records]
        return cls(entries=entries)

    def to_file(self, file_path: pathlib.Path, /) -> None:
        """
        Write all entries to *file_path* as newline-delimited JSON.

        :param file_path: Destination path; the file is overwritten if it
            already exists.
        :type file_path: pathlib.Path
        """
        with file_path.open(mode="w", encoding="utf-8") as file_stream:
            for entry in self.entries:
                file_stream.write(json.dumps(entry.to_dict()) + "\n")