import json
import logging
import pathlib
import re
import urllib.error
import urllib.request
from dataclasses import dataclass
from ._load_queue_config import _load_queue_config
from ..dandiset._load_assets_jsonld_metadata import (
AssetMetadata,
AssetsJsonldMetadata,
_build_asset_metadata,
load_assets_jsonld_metadata,
)
_FLAT_ATTEMPT_RE = re.compile(
r"^version-(?P<version>.+?)"
r"_codebase-(?P<codebase>[^_]+)"
r"_params-(?P<params>[^_]+)"
r"_config-(?P<config>[^_]+)"
r"_attempt-(?P<attempt>\d+)$"
)
_NESTED_ATTEMPT_RE = re.compile(r"^params-(?P<params>[^_]+)_config-(?P<config>[^_]+)_attempt-(?P<attempt>\d+)$")
_UPSTREAM_JSONLD_URL_TEMPLATE = "https://dandiarchive.s3.amazonaws.com/dandisets/{dandiset_id}/draft/assets.jsonld"
_log = logging.getLogger(__name__)
[docs]
@dataclass(frozen=True)
class JobInfo:
dandiset_id: str
dandi_path: str
pipeline: str
version: str
params: str
config: str
attempt: int
codebase: str
# --- path parsing -----------------------------------------------------------
def _find_segment_index(parts: tuple[str, ...], prefix: str, start: int = 0) -> int | None:
"""Return the index of the first part starting with ``prefix`` at or after ``start``."""
for index in range(start, len(parts)):
if parts[index].startswith(prefix):
return index
return None
def _parse_flat_attempt(parts: tuple[str, ...], index: int) -> tuple[dict[str, str], int] | None:
"""Parse a single-segment ``version-..._params-..._config-..._attempt-N`` directory."""
match = _FLAT_ATTEMPT_RE.fullmatch(parts[index])
if match is None:
return None
return match.groupdict(), index
def _parse_nested_attempt(parts: tuple[str, ...], index: int) -> tuple[dict[str, str], int] | None:
"""Parse a ``version-X / params-..._config-..._attempt-N`` directory pair."""
if not parts[index].startswith("version-") or index + 1 >= len(parts):
return None
match = _NESTED_ATTEMPT_RE.fullmatch(parts[index + 1])
if match is None:
return None
fields = match.groupdict()
fields["version"] = parts[index][len("version-") :]
return fields, index + 1
def _parse_attempt_identity(asset_path: str) -> tuple[JobInfo, str] | None:
"""
Parse an asset path of the form
``derivatives/dandiset-XXX/.../pipeline-NAME/<attempt-dir>/<subpath>``
into a :class:`JobInfo` and the subpath beneath the attempt directory.
Returns ``None`` if the path does not match either supported layout.
"""
parts = pathlib.PurePosixPath(asset_path).parts
dandiset_index = _find_segment_index(parts, "dandiset-")
if dandiset_index is None:
return None
pipeline_index = _find_segment_index(parts, "pipeline-", start=dandiset_index + 1)
if pipeline_index is None or pipeline_index <= dandiset_index + 1:
return None
pipeline = parts[pipeline_index][len("pipeline-") :]
if not pipeline:
return None
attempt_dir_index = pipeline_index + 1
if attempt_dir_index >= len(parts):
return None
parsed = _parse_flat_attempt(parts, attempt_dir_index) or _parse_nested_attempt(parts, attempt_dir_index)
if parsed is None:
return None
fields, attempt_directory_index = parsed
dandi_path = "/".join(parts[dandiset_index + 1 : pipeline_index]) + ".nwb"
subpath = "/".join(parts[attempt_directory_index + 1 :])
job_info = JobInfo(
dandiset_id=parts[dandiset_index][len("dandiset-") :],
dandi_path=dandi_path,
pipeline=pipeline,
version=fields["version"],
params=fields["params"],
config=fields["config"],
attempt=int(fields["attempt"]),
codebase=fields["codebase"],
)
return job_info, subpath
def _subpath_is_under(subpath: str, directory: str) -> bool:
"""True if ``subpath`` equals ``directory`` or lives inside it."""
return subpath == directory or subpath.startswith(f"{directory}/")
# --- upstream metadata lookup ----------------------------------------------
def _load_upstream_assets_jsonld_metadata(dandiset_id: str) -> AssetsJsonldMetadata:
"""
Fetch and index ``assets.jsonld`` for another dandiset by id.
Mirrors :func:`load_assets_jsonld_metadata` but parameterized by URL so
derivatives in a meta-analysis dandiset can resolve their source assets.
"""
url = _UPSTREAM_JSONLD_URL_TEMPLATE.format(dandiset_id=dandiset_id)
content_id_to_asset: dict[str, dict[str, object]] = {}
path_to_asset_metadata: dict[str, AssetMetadata] = {}
try:
with urllib.request.urlopen(url, timeout=30) as response:
assets = json.load(response)
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exception:
_log.warning("Unable to load upstream metadata from %s: %s", url, exception)
return AssetsJsonldMetadata(
content_id_to_asset=content_id_to_asset,
path_to_asset_metadata=path_to_asset_metadata,
)
if not isinstance(assets, list):
_log.warning("Expected a JSON array from %s, got %s", url, type(assets).__name__)
return AssetsJsonldMetadata(
content_id_to_asset=content_id_to_asset,
path_to_asset_metadata=path_to_asset_metadata,
)
for asset in assets:
if not isinstance(asset, dict):
continue
try:
content_id, metadata = _build_asset_metadata(asset)
except ValueError as exception:
_log.debug("Skipping malformed upstream asset in %s: %s", url, exception)
continue
content_id_to_asset[content_id] = asset
path_to_asset_metadata[metadata.path] = metadata
return AssetsJsonldMetadata(
content_id_to_asset=content_id_to_asset,
path_to_asset_metadata=path_to_asset_metadata,
)
class _UpstreamMetadataCache:
"""Per-call cache of upstream ``assets.jsonld`` lookups, keyed by dandiset id."""
def __init__(self) -> None:
self._cache: dict[str, AssetsJsonldMetadata] = {}
def get(self, dandiset_id: str) -> AssetsJsonldMetadata:
if dandiset_id not in self._cache:
self._cache[dandiset_id] = _load_upstream_assets_jsonld_metadata(dandiset_id)
return self._cache[dandiset_id]
# --- record construction ---------------------------------------------------
def _new_attempt_record(job: JobInfo) -> dict[str, object]:
return {
"dandiset_id": job.dandiset_id,
"dandi_path": job.dandi_path,
"pipeline": job.pipeline,
"version": job.version,
"params": job.params,
"config": job.config,
"attempt": job.attempt,
"has_code": False,
"has_been_submitted": False,
"has_output": False,
"has_logs": False,
"dataset_description_path": None,
"output_paths": {},
"log_paths": {},
"codebase": job.codebase,
}
# --- attempt collection ----------------------------------------------------
@dataclass
class _AttemptCollection:
"""Bookkeeping accumulated while walking ``assets.jsonld`` once."""
records_by_attempt: dict[JobInfo, dict[str, object]]
log_timestamps_by_attempt: dict[JobInfo, list[str]]
submit_sh_timestamps_by_attempt: dict[JobInfo, str]
def _collect_attempts(local_metadata: AssetsJsonldMetadata) -> _AttemptCollection:
"""
Walk every asset path, group by attempt identity, record presence flags,
and capture the ``code/submit.sh`` timestamp per attempt (used for
``created_at``).
"""
records_by_attempt: dict[JobInfo, dict[str, object]] = {}
log_timestamps_by_attempt: dict[JobInfo, list[str]] = {}
submit_sh_timestamps_by_attempt: dict[JobInfo, str] = {}
for asset_path, asset_metadata in local_metadata.path_to_asset_metadata.items():
parsed = _parse_attempt_identity(asset_path)
if parsed is None:
continue
job_info, subpath = parsed
record = records_by_attempt.setdefault(job_info, _new_attempt_record(job_info))
if _subpath_is_under(subpath, "code"):
record["has_code"] = True
if subpath.startswith("code/submitted"):
record["has_been_submitted"] = True
if subpath == "dataset_description.json":
record["dataset_description_path"] = asset_path
elif _subpath_is_under(subpath, "derivatives"):
record["has_output"] = True
record["output_paths"][asset_path] = asset_metadata.content_id
elif subpath.startswith("logs/"):
log_relative_path = subpath.removeprefix("logs/")
if log_relative_path and log_relative_path != "dataset_description.json":
record["has_logs"] = True
record["log_paths"][asset_path] = asset_metadata.content_id
log_timestamps_by_attempt.setdefault(job_info, []).append(asset_metadata.date_modified)
if subpath == "code/submit.sh":
submit_sh_timestamps_by_attempt[job_info] = asset_metadata.date_modified
return _AttemptCollection(
records_by_attempt=records_by_attempt,
log_timestamps_by_attempt=log_timestamps_by_attempt,
submit_sh_timestamps_by_attempt=submit_sh_timestamps_by_attempt,
)
def _finalize_attempt_records(
*,
collection: _AttemptCollection,
upstream_cache: _UpstreamMetadataCache,
) -> list[dict[str, object]]:
"""
Attach source-asset fields (``content_id``, ``asset_size_bytes``) from the
upstream dandiset's ``assets.jsonld``, and ``created_at`` /
``job_completion_time`` from local timestamps.
Records are emitted even if the upstream lookup fails; the source fields
become ``None`` and a warning is logged.
"""
finalized: list[dict[str, object]] = []
for job_info, record in collection.records_by_attempt.items():
upstream_metadata = upstream_cache.get(job_info.dandiset_id)
source_metadata = upstream_metadata.path_to_asset_metadata.get(job_info.dandi_path)
if source_metadata is None:
_log.warning(
"Source asset not found in upstream dandiset %s for dandi_path=%s; "
"emitting record with null content_id/asset_size_bytes",
job_info.dandiset_id,
job_info.dandi_path,
)
content_id: str | None = None
asset_size_bytes: int | None = None
else:
content_id = source_metadata.content_id
asset_size_bytes = source_metadata.content_size
completion_times = collection.log_timestamps_by_attempt.get(job_info, [])
record.update(
{
"content_id": content_id,
"asset_size_bytes": asset_size_bytes,
"created_at": collection.submit_sh_timestamps_by_attempt.get(job_info),
"job_completion_time": max(completion_times) if completion_times else None,
}
)
finalized.append(record)
return finalized
def _sort_key(record: dict[str, object]) -> tuple[str, str, str]:
# content_id may be None for attempts whose upstream source wasn't resolvable;
# coerce to "" so sorting stays total.
return (
str(record["dandiset_id"]),
str(record["dandi_path"]),
str(record["content_id"]) if record["content_id"] is not None else "",
)
[docs]
def write_queue_state(*, queue_directory: pathlib.Path) -> None:
"""
Write ``state.jsonl`` from DANDI ``assets.jsonld`` metadata.
Each state entry represents one attempt capsule inferred from the
``derivatives/dandiset-*/.../pipeline-*/..._attempt-*`` path structure in
``assets.jsonld``. ``dandi_path`` is derived from the path segment between
``dandiset-*`` and ``pipeline-*`` with ``.nwb`` appended.
``has_code``/``has_been_submitted``/``has_output``/``has_logs`` are
inferred from the assets present under each attempt directory.
``dataset_description_path`` records the root-level ``dataset_description.json``
asset path when present.
``output_paths`` maps each output asset path to its blob ID when
``has_output`` is ``True``; otherwise it is an empty dict.
``log_paths`` maps each log asset path to its blob ID when ``has_logs`` is
``True``; otherwise it is an empty dict.
For meta-analysis dandisets, the ``content_id`` and ``asset_size_bytes``
of each attempt's source NWB are resolved by fetching the upstream
dandiset's ``assets.jsonld``. ``created_at`` comes from the local
``code/submit.sh`` modification time; ``job_completion_time`` is the
latest modification time among log files.
:param queue_directory: Path to the queue root directory.
:type queue_directory: pathlib.Path
"""
_load_queue_config(queue_directory=queue_directory)
local_metadata = load_assets_jsonld_metadata()
collection = _collect_attempts(local_metadata)
upstream_cache = _UpstreamMetadataCache()
records = _finalize_attempt_records(
collection=collection,
upstream_cache=upstream_cache,
)
records.sort(key=_sort_key)
state_file = queue_directory / "state.jsonl"
with state_file.open(mode="w") as file_stream:
for record in records:
file_stream.write(json.dumps(record) + "\n")