Source code for dandi_compute_code.queue._dump_issues
import datetime
import json
import pathlib
from ._extract_error_lines import _extract_error_lines
from ._list_capsule_log_directories import _list_capsule_log_directories
[docs]
def dump_issues(
*,
dandiset_directory: pathlib.Path,
queue_directory: pathlib.Path,
output_file_name: str = "issues_dump.json",
) -> list[dict]:
"""Scan nextflow/slurm logs and write per-capsule error lines under *queue_directory*."""
records: list[dict] = []
for logs_dir in _list_capsule_log_directories(dandiset_directory=dandiset_directory):
nextflow_log = logs_dir / "nextflow.log"
slurm_logs = sorted(path for path in logs_dir.glob("*slurm.log") if path.is_file())
nextflow_errors = _extract_error_lines(log_file=nextflow_log)
slurm_errors = {log_file.name: _extract_error_lines(log_file=log_file) for log_file in slurm_logs}
slurm_errors = {key: value for key, value in slurm_errors.items() if value}
if not nextflow_errors and not slurm_errors:
continue
records.append(
{
"capsule_path": logs_dir.parent.relative_to(dandiset_directory).as_posix(),
"nextflow_log": (
nextflow_log.relative_to(dandiset_directory).as_posix() if nextflow_log.is_file() else None
),
"nextflow_errors": nextflow_errors,
"slurm_errors": {
log_name: errors for log_name, errors in sorted(slurm_errors.items(), key=lambda item: item[0])
},
}
)
payload = {
"generated_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"capsule_count": len(records),
"records": records,
}
(queue_directory / output_file_name).write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n")
return records