Source code for dandi_compute_code.queue._aggregate_queue_statistics
import collections
import datetime
import json
import pathlib
from ._duration_string_to_seconds import _duration_string_to_seconds
from ._extract_nextflow_timeline_data import _extract_nextflow_timeline_data
from ._resolve_attempt_dir import _resolve_attempt_dir
[docs]
def aggregate_queue_statistics(
    *,
    queue_directory: pathlib.Path,
    dandiset_directory: pathlib.Path,
    output_file_name: str = "queue_stats.json",
) -> dict:
    """Write aggregate queue statistics JSON and return the written payload.

    Reads ``state.jsonl`` from *queue_directory* (one JSON value per non-blank
    line; a missing file is treated as empty), then for every state entry looks
    for ``logs/timeline.html`` under the attempt directory resolved by
    ``_resolve_attempt_dir`` and sums the step durations found in it.

    Parameters
    ----------
    queue_directory : pathlib.Path
        Directory holding ``state.jsonl``; the statistics file is written here.
    dandiset_directory : pathlib.Path
        Passed to ``_resolve_attempt_dir`` as ``base_dir``.
    output_file_name : str
        Name of the JSON file created inside *queue_directory*.

    Returns
    -------
    dict
        The payload that was written, with the keys ``generated_at`` (UTC
        ISO-8601 timestamp), ``state_entry_count``,
        ``successful_asset_bytes_total``, ``timeline_files_processed`` and
        ``job_step_wall_time_seconds`` (step name -> summed seconds, ordered by
        step name).

    Raises
    ------
    json.JSONDecodeError
        If a non-blank line of ``state.jsonl`` is not valid JSON.
    """
    state_entries = _load_state_entries(state_file=queue_directory / "state.jsonl")
    # Only JSON objects can be queried with ``.get``. Any other value (null, list,
    # string, ...) is still counted as a state entry below but contributes nothing
    # to the aggregates instead of raising AttributeError.
    record_entries = [entry for entry in state_entries if isinstance(entry, dict)]

    successful_asset_bytes_total = sum(
        entry["asset_size_bytes"]
        for entry in record_entries
        if entry.get("has_output")
        and isinstance(entry.get("asset_size_bytes"), int)
        # bool is a subclass of int, so True/False must be rejected explicitly.
        and not isinstance(entry.get("asset_size_bytes"), bool)
    )

    job_step_wall_time_seconds: collections.defaultdict[str, float] = collections.defaultdict(float)
    timeline_files_processed = 0
    for entry in record_entries:
        attempt_dir = _resolve_attempt_dir(base_dir=dandiset_directory, entry=entry)
        timeline_file = attempt_dir / "logs" / "timeline.html"
        if not timeline_file.is_file():
            continue
        timeline_data = _extract_nextflow_timeline_data(
            timeline_html=timeline_file.read_text(encoding="utf-8")
        )
        if timeline_data is None:
            continue
        processes = timeline_data.get("processes")
        if not isinstance(processes, list):
            continue
        # A timeline counts as processed once it yields a process list, even if
        # none of its steps ends up contributing a duration.
        timeline_files_processed += 1
        _accumulate_step_durations(processes=processes, totals=job_step_wall_time_seconds)

    statistics = {
        "generated_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "state_entry_count": len(state_entries),
        "successful_asset_bytes_total": successful_asset_bytes_total,
        "timeline_files_processed": timeline_files_processed,
        # Keys are unique, so sorting the (key, value) pairs orders by key only.
        "job_step_wall_time_seconds": dict(sorted(job_step_wall_time_seconds.items())),
    }
    output_file = queue_directory / output_file_name
    output_file.write_text(json.dumps(statistics, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    return statistics


def _load_state_entries(*, state_file: pathlib.Path) -> list:
    """Return the JSON value parsed from each non-blank line of *state_file*.

    A missing file yields an empty list. Invalid JSON is not swallowed:
    ``json.JSONDecodeError`` propagates to the caller.
    """
    try:
        state_text = state_file.read_text(encoding="utf-8")
    except FileNotFoundError:
        return []
    stripped_lines = (line.strip() for line in state_text.splitlines())
    return [json.loads(line) for line in stripped_lines if line]


def _accumulate_step_durations(*, processes: list, totals: collections.defaultdict) -> None:
    """Add every positive step duration found in *processes* to *totals* in place.

    Items that do not have the expected shape (non-dict process or step,
    non-string label, non-list ``times``) are skipped silently.
    """
    for process in processes:
        if not isinstance(process, dict):
            continue
        process_label = process.get("label")
        times = process.get("times")
        if not isinstance(process_label, str) or not isinstance(times, list):
            continue
        # Durations are grouped under the label text preceding the first " (".
        step_name = process_label.split(" (", 1)[0]
        for step in times:
            if not isinstance(step, dict):
                continue
            duration_label = step.get("label")
            if not isinstance(duration_label, str):
                continue
            # Only the text before the first "/" is parsed as a duration.
            duration_string = duration_label.split("/", 1)[0].strip()
            duration_seconds = _duration_string_to_seconds(duration_string)
            if duration_seconds > 0:
                totals[step_name] += duration_seconds