# Source code for dandi_compute_code.queue._clean_unsubmitted_capsules
import os
import pathlib
import shutil
import subprocess
from ._read_state_entries import _read_state_entries
from ._resolve_unsubmitted_attempt_dir import _resolve_unsubmitted_attempt_dir
# TODO: refactor `_remove_empty_parents` so it no longer relies on the try/except pattern (it shouldn't be necessary)
def _remove_empty_parents(*, start: pathlib.Path, stop: pathlib.Path) -> None:
    """
    Prune empty directories, walking upward from ``start`` toward ``stop``.

    ``start`` itself is the first directory considered; ``stop`` is never
    removed. Nothing is touched when ``stop`` is not one of ``start``'s
    ancestors.

    Parameters
    ----------
    start : pathlib.Path
        Deepest directory to try to remove.
    stop : pathlib.Path
        Ancestor of ``start`` at which the upward walk ends (exclusive).
    """
    if stop not in start.parents:
        return

    # ``start`` followed by each successive parent; ``stop`` is guaranteed to
    # appear in this sequence because of the ancestry check above.
    for directory in (start, *start.parents):
        if directory == stop:
            return
        if not (directory.exists() and directory.is_dir()):
            return
        try:
            # ``rmdir`` only succeeds on an empty directory, so a failure here
            # marks the first level that must be kept.
            directory.rmdir()
        except OSError:
            return
# TODO: review whether the return value of `clean_unsubmitted_capsules` is needed at all
# TODO: make `clean_unsubmitted_capsules` more efficient in terms of --preserve-tree
# [docs]
def clean_unsubmitted_capsules(
    *,
    dandiset_directory: pathlib.Path,
    queue_directory: pathlib.Path,
) -> list[pathlib.Path]:
    """
    Delete every queued (prepared but never submitted) capsule attempt.

    An attempt counts as *queued* when its directory has a ``code/``
    subdirectory, has neither a non-empty ``logs/`` subdirectory nor a
    ``derivatives/`` subdirectory, and carries no submitted-marker file
    (``code/submitted`` or ``code/submitted_date-*``). That decision is
    delegated to ``_resolve_unsubmitted_attempt_dir``.

    The queue state is read from ``state.jsonl`` inside *queue_directory*.
    Each matching attempt directory that still exists locally is removed from
    the DANDI archive (via ``dandi delete``) and then from the local
    filesystem, after which any now-empty parent directories below
    ``<dandiset_directory>/derivatives`` are pruned. The local Dandiset copy
    is expected to be up-to-date.

    Parameters
    ----------
    dandiset_directory : pathlib.Path
        Path to a local clone of the dandiset repository used to resolve and
        delete matching attempt directories.
    queue_directory : pathlib.Path
        Path to the queue root directory.

    Returns
    -------
    list[pathlib.Path]
        Attempt directory paths that were deleted, in processing order.

    Raises
    ------
    NotADirectoryError
        If *queue_directory* does not exist or is not a directory.
    RuntimeError
        If the ``DANDI_API_KEY`` environment variable is not set or is blank.
    subprocess.CalledProcessError
        If a ``dandi delete`` invocation exits with a non-zero status.
    """
    if not queue_directory.is_dir():
        raise NotADirectoryError(f"Queue directory does not exist or is not a directory: {queue_directory}")

    api_key = os.environ.get("DANDI_API_KEY", "")
    if not api_key.strip():
        raise RuntimeError("`DANDI_API_KEY` environment variable is not set or is blank.")

    # Resolve all state entries up front so that nothing is deleted until the
    # full set of candidates is known.
    candidates: list[pathlib.Path] = []
    for entry in _read_state_entries(queue_directory / "state.jsonl"):
        resolved = _resolve_unsubmitted_attempt_dir(base_dir=dandiset_directory, entry=entry)
        if resolved is not None:
            candidates.append(resolved)

    deleted: list[pathlib.Path] = []
    for candidate in candidates:
        if not candidate.is_dir():
            continue

        # Answer the confirmation prompt of ``dandi delete`` non-interactively.
        subprocess.run(["dandi", "delete", str(candidate)], input=b"y\n", check=True)
        shutil.rmtree(candidate)
        _remove_empty_parents(start=candidate.parent, stop=dandiset_directory / "derivatives")
        deleted.append(candidate)

    return deleted