API Reference for shared

shared

Shared models, utilities, and canonical prompt templates.

Classes

GPUInfo dataclass

GPUInfo(name: str, vram_mb: int)

Information about a single GPU device.

Attributes:

Name Type Description
name str

Human-readable GPU name (e.g. 'NVIDIA RTX 4090').

vram_mb int

Total VRAM in megabytes.

HardwareBudget dataclass

HardwareBudget(
    max_agents: int,
    max_concurrent_loras: int,
    training_slots: int,
    vram_per_gpu_mb: int,
    single_gpu_mode: bool,
)

Concurrency limits derived from detected hardware.

Attributes:

Name Type Description
max_agents int

Maximum concurrent swarm agents.

max_concurrent_loras int

Maximum LoRA adapters loaded at once.

training_slots int

Number of concurrent training jobs (0 if no GPU).

vram_per_gpu_mb int

VRAM available per GPU (0 if no GPU).

single_gpu_mode bool

True when only 0 or 1 GPUs are available.

HardwareProbe dataclass

HardwareProbe(
    cpu_count: int,
    ram_total_mb: int,
    gpus: list[GPUInfo] = list(),
)

Detected hardware resources on the current machine.

Attributes:

Name Type Description
cpu_count int

Number of logical CPU cores.

ram_total_mb int

Total system RAM in megabytes.

gpus list[GPUInfo]

List of detected GPU devices.

Functions
detect classmethod
detect() -> HardwareProbe

Detect hardware resources on the current machine.

Uses psutil for CPU/RAM detection (always available). Attempts to import pynvml for GPU detection; falls back to empty GPU list if pynvml is not installed or NVML initialization fails.

Returns:

Type Description
HardwareProbe

A HardwareProbe with detected resource information.

Source code in libs/shared/src/shared/hardware.py
@classmethod
def detect(cls) -> HardwareProbe:
    """Detect hardware resources on the current machine.

    Uses psutil for CPU/RAM detection (always available). Attempts to
    import pynvml for GPU detection; falls back to empty GPU list if
    pynvml is not installed or NVML initialization fails.

    Returns:
        A HardwareProbe with detected resource information.
    """
    cpu_count = psutil.cpu_count(logical=True) or 1
    ram_total_mb = psutil.virtual_memory().total // (1024 * 1024)

    gpus: list[GPUInfo] = []
    try:
        import pynvml

        pynvml.nvmlInit()
        device_count = pynvml.nvmlDeviceGetCount()
        for i in range(device_count):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            name = pynvml.nvmlDeviceGetName(handle)
            if isinstance(name, bytes):
                name = name.decode("utf-8")
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            vram_mb = mem_info.total // (1024 * 1024)
            gpus.append(GPUInfo(name=name, vram_mb=vram_mb))
        pynvml.nvmlShutdown()
    except ImportError:
        pass  # pynvml not installed; CPU-only mode
    except Exception as e:
        logger.debug("GPU detection failed (will run CPU-only): %s", e)

    return cls(cpu_count=cpu_count, ram_total_mb=ram_total_mb, gpus=gpus)
compute_budget
compute_budget(
    base_model_vram_mb: int = 0,
) -> HardwareBudget

Compute concurrency limits based on detected hardware.

Parameters:

Name Type Description Default
base_model_vram_mb int

VRAM consumed by the base inference model. Used to calculate how much VRAM is left for LoRA adapters and training.

0

Returns:

Type Description
HardwareBudget

A HardwareBudget with computed concurrency limits.

Source code in libs/shared/src/shared/hardware.py
def compute_budget(self, base_model_vram_mb: int = 0) -> HardwareBudget:
    """Compute concurrency limits based on detected hardware.

    Args:
        base_model_vram_mb: VRAM consumed by the base inference model.
            Used to calculate how much VRAM is left for LoRA adapters
            and training.

    Returns:
        A HardwareBudget with computed concurrency limits.
    """
    num_gpus = len(self.gpus)
    single_gpu_mode = num_gpus <= 1

    if num_gpus == 0:
        return HardwareBudget(
            max_agents=max(1, self.cpu_count // 2),
            max_concurrent_loras=0,
            training_slots=0,
            vram_per_gpu_mb=0,
            single_gpu_mode=True,
        )

    vram_per_gpu = self.gpus[0].vram_mb
    free_vram = max(0, vram_per_gpu - base_model_vram_mb)

    max_loras = max(1, free_vram // VRAM_PER_LORA_MB)
    training_slots = 1 if free_vram >= MIN_TRAINING_VRAM_MB else 0

    # Agents limited by CPU cores and available VRAM
    max_agents = max(1, min(self.cpu_count // 2, num_gpus * 4))

    return HardwareBudget(
        max_agents=max_agents,
        max_concurrent_loras=max_loras,
        training_slots=training_slots,
        vram_per_gpu_mb=vram_per_gpu,
        single_gpu_mode=single_gpu_mode,
    )

AdapterRef

Bases: BaseModel

Reference to a stored LoRA adapter.

Used within CodingSession to track which adapters were loaded during an agent coding session.

Attributes:

Name Type Description
adapter_id str

UUID of the adapter in the registry.

task_type str

Task category this adapter was trained on (e.g. 'bug-fix').

fitness_score Optional[float]

Evolutionary fitness score, if evaluated.

Example

>>> ref = AdapterRef(adapter_id="abc-123", task_type="bug-fix")
>>> ref.adapter_id
'abc-123'
>>> ref.fitness_score is None
True

CodingSession

Bases: BaseModel

A complete agent coding session record.

Tracks the full lifecycle of a Rune agent session including which adapters were used, how many generate-execute-reflect cycles occurred, and the final outcome.

Attributes:

Name Type Description
session_id str

Unique session identifier.

task_description str

Human-readable description of the coding task.

task_type str

Task category (e.g. 'bug-fix', 'feature-impl').

adapter_refs list[AdapterRef]

List of adapters loaded during this session.

attempt_count int

Number of generate-execute-reflect cycles completed.

outcome Optional[str]

Final session result ('success', 'exhausted', or None if in progress).

Example

>>> session = CodingSession(
...     session_id="sess-001",
...     task_description="Fix import error",
...     task_type="bug-fix",
... )
>>> session.adapter_refs
[]
>>> session.attempt_count
0
>>> session.outcome is None
True

EvolMetrics

Bases: BaseModel

Evolution evaluation metrics for an adapter.

Captures the performance and fitness measurements used by the evolution service to decide which adapters to keep, mutate, or retire.

Attributes:

Name Type Description
adapter_id str

UUID of the adapter being evaluated.

pass_rate float

Pass rate on benchmark tasks (0.0 to 1.0).

fitness_score float

Overall evolutionary fitness score.

generalization_delta Optional[float]

Difference between in-distribution and OOD performance.

Example

>>> metrics = EvolMetrics(
...     adapter_id="adapter-001",
...     pass_rate=0.85,
...     fitness_score=0.9,
... )
>>> metrics.generalization_delta is None
True

PipelinePhase

Bases: str, Enum

Pipeline phases for the multi-phase swarm pipeline.

Each phase uses a dedicated Jinja2 trajectory template and prompt template.

SwarmCheckpoint

Bases: BaseModel

Status record for a single swarm task execution.

Attributes:

Name Type Description
run_id str

Unique identifier for the swarm run.

task_hash str

Hash of the task being executed.

agent_id str

Identifier of the agent executing the task.

status str

Current status (pending, running, completed, failed).

outcome Optional[str]

Result description when completed.

started_at Optional[str]

ISO 8601 timestamp when execution began.

completed_at Optional[str]

ISO 8601 timestamp when execution finished.

SwarmConfig

Bases: BaseModel

Configuration for a swarm execution run.

Attributes:

Name Type Description
db_url str

SQLite database URL for the adapter registry.

task_source str

Path to task definitions file or inline task list.

population_size int

Number of concurrent swarm agents.

max_generations int

Maximum evolutionary generations.

evolution_interval int

Seconds between evolution sweeps.

sandbox_backend str

Execution backend ('subprocess' or 'nsjail').

base_model_id str

HuggingFace model identifier for inference.

hypernetwork_checkpoint str | None

Path to pretrained hypernetwork checkpoint.

run_id str | None

Unique identifier for this swarm run (for checkpoint isolation). Auto-generated UUID if not provided.

TaskStatus

Bases: str, Enum

Canonical status strings used across services and swarm checkpoints.

Inherits from str so values are JSON-serialisable and can be compared directly to string literals in existing code (e.g. record.status == "running").

Example

>>> TaskStatus.RUNNING == "running"
True
>>> "running" == TaskStatus.RUNNING
True
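The str+Enum combination can be re-created standalone to see why it is useful. The member set below is illustrative (only RUNNING is confirmed by this reference), and the class is named Status to avoid implying it is the real TaskStatus.

```python
import json
from enum import Enum

class Status(str, Enum):
    # Illustrative member set; the real TaskStatus may differ.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

# str inheritance gives free string comparison...
assert Status.RUNNING == "running"

# ...and transparent JSON serialisation, since members ARE strings.
as_json = json.dumps({"status": Status.RUNNING})
print(as_json)  # {"status": "running"}
```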

NsjailBackend

NsjailBackend()

Bases: SandboxBackend

Execute code inside an nsjail sandbox (Linux-only).

Falls back to SubprocessBackend if nsjail is not found on PATH. Applies resource limits via nsjail flags: --time_limit, --rlimit_as, --rlimit_cpu.

Initialize NsjailBackend, checking nsjail availability.

Source code in libs/shared/src/shared/sandbox.py
def __init__(self) -> None:
    """Initialize NsjailBackend, checking nsjail availability."""
    self._nsjail_path = shutil.which("nsjail")
    self._fallback = SubprocessBackend()
    if self._nsjail_path is None:
        logger.warning(
            "nsjail not found on PATH; NsjailBackend will fall back to subprocess"
        )
Functions
run
run(code: str, timeout: int = 30) -> SandboxResult

Execute Python code inside nsjail, or fall back to subprocess.

Parameters:

Name Type Description Default
code str

Python source code to execute.

required
timeout int

Maximum execution time in seconds.

30

Returns:

Type Description
SandboxResult

A SandboxResult with captured output and exit status.

Source code in libs/shared/src/shared/sandbox.py
def run(self, code: str, timeout: int = 30) -> SandboxResult:
    """Execute Python code inside nsjail, or fall back to subprocess.

    Args:
        code: Python source code to execute.
        timeout: Maximum execution time in seconds.

    Returns:
        A SandboxResult with captured output and exit status.
    """
    if self._nsjail_path is None:
        return self._fallback.run(code, timeout)

    with tempfile.TemporaryDirectory() as tmpdir:
        script_path = os.path.join(tmpdir, "solution.py")
        with open(script_path, "w") as f:
            f.write(code)

        cmd = [
            self._nsjail_path,
            "--mode",
            "o",
            "--time_limit",
            str(timeout),
            "--rlimit_as",
            "512",
            "--rlimit_cpu",
            str(timeout),
            "--",
            sys.executable,
            script_path,
        ]

        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout + 5,
                cwd=tmpdir,
            )
            return SandboxResult(
                stdout=proc.stdout,
                stderr=proc.stderr,
                exit_code=proc.returncode,
                is_timed_out=False,
            )
        except subprocess.TimeoutExpired:
            return SandboxResult(
                stdout="",
                stderr=f"Execution timed out after {timeout}s",
                exit_code=1,
                is_timed_out=True,
            )

SandboxBackend

Bases: ABC

Abstract base class for sandbox execution backends.

Functions
run abstractmethod
run(code: str, timeout: int = 30) -> SandboxResult

Execute Python code and return the result.

Parameters:

Name Type Description Default
code str

Python source code to execute.

required
timeout int

Maximum execution time in seconds.

30

Returns:

Type Description
SandboxResult

A SandboxResult with captured output and exit status.

Source code in libs/shared/src/shared/sandbox.py
@abstractmethod
def run(self, code: str, timeout: int = 30) -> SandboxResult:
    """Execute Python code and return the result.

    Args:
        code: Python source code to execute.
        timeout: Maximum execution time in seconds.

    Returns:
        A SandboxResult with captured output and exit status.
    """

SandboxResult dataclass

SandboxResult(
    stdout: str,
    stderr: str,
    exit_code: int,
    is_timed_out: bool,
)

Result of executing code in a sandbox.

Attributes:

Name Type Description
stdout str

Captured standard output.

stderr str

Captured standard error.

exit_code int

Process exit code (0 = success).

is_timed_out bool

Whether execution was terminated due to timeout.

SubprocessBackend

Bases: SandboxBackend

Execute code via a standard subprocess.

Writes code to a temporary file and runs it with the current Python interpreter. This is the default backend, used on all platforms.

Functions
run
run(code: str, timeout: int = 30) -> SandboxResult

Execute Python code in a subprocess.

Parameters:

Name Type Description Default
code str

Python source code to execute.

required
timeout int

Maximum execution time in seconds.

30

Returns:

Type Description
SandboxResult

A SandboxResult with captured output and exit status.

Source code in libs/shared/src/shared/sandbox.py
def run(self, code: str, timeout: int = 30) -> SandboxResult:
    """Execute Python code in a subprocess.

    Args:
        code: Python source code to execute.
        timeout: Maximum execution time in seconds.

    Returns:
        A SandboxResult with captured output and exit status.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        script_path = os.path.join(tmpdir, "solution.py")
        with open(script_path, "w") as f:
            f.write(code)

        try:
            proc = subprocess.run(
                [sys.executable, script_path],
                capture_output=True,
                text=True,
                timeout=timeout,
                cwd=tmpdir,
            )
            return SandboxResult(
                stdout=proc.stdout,
                stderr=proc.stderr,
                exit_code=proc.returncode,
                is_timed_out=False,
            )
        except subprocess.TimeoutExpired:
            return SandboxResult(
                stdout="",
                stderr=f"Execution timed out after {timeout}s",
                exit_code=1,
                is_timed_out=True,
            )

Functions

get_best_device

get_best_device() -> str

Return the best available compute device: cuda > mps > cpu.

Imports torch inside the function body per INFRA-05 pattern so this module is importable in CPU-only CI without torch installed.

Returns:

Type Description
str

"cuda" if an NVIDIA GPU is available,

str

"mps" if an Apple Silicon GPU is available,

str

"cpu" otherwise.

Example

>>> device = get_best_device()
>>> device in ("cuda", "mps", "cpu")
True

Source code in libs/shared/src/shared/hardware.py
def get_best_device() -> str:
    """Return the best available compute device: ``cuda`` > ``mps`` > ``cpu``.

    Imports torch inside the function body per INFRA-05 pattern so this module
    is importable in CPU-only CI without torch installed.

    Returns:
        ``"cuda"`` if an NVIDIA GPU is available,
        ``"mps"`` if an Apple Silicon GPU is available,
        ``"cpu"`` otherwise.

    Example:
        >>> device = get_best_device()
        >>> device in ("cuda", "mps", "cpu")
        True
    """
    try:
        import torch  # noqa: PLC0415

        if torch.cuda.is_available():
            return "cuda"
        if torch.backends.mps.is_available():
            return "mps"
    except ImportError:
        pass
    return "cpu"

get_sandbox_backend

get_sandbox_backend() -> SandboxBackend

Return the appropriate sandbox backend based on environment.

Reads RUNE_EXEC_BACKEND env var:
- "nsjail" → NsjailBackend (falls back to subprocess if nsjail not found)
- anything else or unset → SubprocessBackend

Returns:

Type Description
SandboxBackend

A SandboxBackend instance.

Source code in libs/shared/src/shared/sandbox.py
def get_sandbox_backend() -> SandboxBackend:
    """Return the appropriate sandbox backend based on environment.

    Reads RUNE_EXEC_BACKEND env var:
    - "nsjail" → NsjailBackend (falls back to subprocess if nsjail not found)
    - anything else or unset → SubprocessBackend

    Returns:
        A SandboxBackend instance.
    """
    backend = os.environ.get("RUNE_EXEC_BACKEND", "subprocess")
    if backend == "nsjail":
        return NsjailBackend()
    return SubprocessBackend()

create_service_engine

create_service_engine(default_url: str) -> Engine

Create a SQLAlchemy Engine from DATABASE_URL env var or a default URL.

All three Rune services (api-service, training-svc, evolution-svc) use the same pattern: read DATABASE_URL from the environment, falling back to a service-specific SQLite file. This helper centralises that pattern.

Parameters:

Name Type Description Default
default_url str

Fallback DB URL when DATABASE_URL env var is not set. Typically a SQLite path, e.g. "sqlite:///training_svc.db".

required

Returns:

Type Description
Engine

A configured SQLAlchemy Engine.

Example

>>> engine = create_service_engine("sqlite:///training_svc.db")

Source code in libs/shared/src/shared/storage_utils.py
def create_service_engine(default_url: str) -> Engine:
    """Create a SQLAlchemy Engine from DATABASE_URL env var or a default URL.

    All three Rune services (api-service, training-svc, evolution-svc) use the
    same pattern: read DATABASE_URL from the environment, falling back to a
    service-specific SQLite file. This helper centralises that pattern.

    Args:
        default_url: Fallback DB URL when DATABASE_URL env var is not set.
            Typically a SQLite path, e.g. ``"sqlite:///training_svc.db"``.

    Returns:
        A configured SQLAlchemy Engine.

    Example:
        >>> engine = create_service_engine("sqlite:///training_svc.db")
    """
    db_url = os.getenv("DATABASE_URL", default_url)
    connect_args = {"check_same_thread": False} if db_url.startswith("sqlite") else {}
    return create_engine(db_url, connect_args=connect_args, echo=False)
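The DATABASE_URL precedence and the SQLite-only connect_args tweak can be seen in isolation, without SQLAlchemy. resolve_db_url is a hypothetical helper written for this sketch.

```python
import os

def resolve_db_url(default_url: str) -> str:
    # Same precedence as create_service_engine: env var wins, else the default.
    return os.getenv("DATABASE_URL", default_url)

os.environ.pop("DATABASE_URL", None)  # force the fallback path for this demo
url = resolve_db_url("sqlite:///training_svc.db")

# SQLite connections need check_same_thread=False when the engine is
# shared across threads (as it is in the services).
connect_args = {"check_same_thread": False} if url.startswith("sqlite") else {}
print(url, connect_args)
```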

set_wal_mode

set_wal_mode(engine: Engine) -> None

Register a SQLAlchemy connect event that enables WAL journal mode.

WAL mode allows concurrent readers while a writer is active, which is important for swarm workloads that read checkpoints while writing. Safe to call multiple times on different engines; each call registers a separate listener on its own engine instance.

Parameters:

Name Type Description Default
engine Engine

SQLAlchemy Engine to configure.

required
Source code in libs/shared/src/shared/storage_utils.py
def set_wal_mode(engine: Engine) -> None:
    """Register a SQLAlchemy connect event that enables WAL journal mode.

    WAL mode allows concurrent readers while a writer is active, which is
    important for swarm workloads that read checkpoints while writing.
    Safe to call multiple times on different engines; each call registers
    a separate listener on its own engine instance.

    Args:
        engine: SQLAlchemy Engine to configure.
    """
    if engine.dialect.name != "sqlite":
        return

    @event.listens_for(engine, "connect")
    def _set_wal(dbapi_conn: object, _record: object) -> None:  # type: ignore[type-arg]
        assert hasattr(dbapi_conn, "execute")
        dbapi_conn.execute("PRAGMA journal_mode=WAL")  # type: ignore[union-attr]

render_prompt

render_prompt(phase: str, **kwargs: object) -> str

Render a phase prompt template with task-specific variables.

Source code in libs/shared/src/shared/template_loader.py
def render_prompt(phase: str, **kwargs: object) -> str:
    """Render a phase prompt template with task-specific variables."""
    template = _env.get_template(f"prompt_{phase}.j2")
    return template.render(**kwargs)

render_trajectory

render_trajectory(phase: str, **kwargs: object) -> str

Render a phase trajectory template with task-specific variables.

Source code in libs/shared/src/shared/template_loader.py
def render_trajectory(phase: str, **kwargs: object) -> str:
    """Render a phase trajectory template with task-specific variables."""
    template = _env.get_template(f"{phase}.j2")
    return template.render(**kwargs)

get_prompts_dir

get_prompts_dir() -> Path

Return the path to the shared prompt templates directory (Jinja2 .j2 files).

Use this as prompts_dir when calling inference.factory helpers so that all agents use the same templates with different prompt_vars (DRY); this avoids duplicating template files in individual services.

Returns:

Type Description
Path

Path to libs/shared/src/shared/templates/ (package data).

Source code in libs/shared/src/shared/__init__.py
def get_prompts_dir() -> Path:
    """Return the path to the shared prompt templates directory (Jinja2 .j2 files).

    Use this as prompts_dir when calling inference.factory helpers so agents
    use the same templates with different prompt_vars (DRY). No duplicate
    template files in services.

    Returns:
        Path to libs/shared/src/shared/templates/ (package data).
    """
    return Path(__file__).resolve().parent / "templates"

Modules

blackboard

Typed blackboard for inter-subtask context passing.

Subtasks publish their code and extracted interface signatures after completion. Dependent subtasks read predecessor interfaces from the blackboard, which then flow through adapter trajectories.

Classes
SubtaskArtifact dataclass
SubtaskArtifact(
    name: str,
    code: str,
    interfaces: str,
    tests_passed: bool,
    dependencies: list[str] = list(),
)

A completed subtask's publishable artifacts.

Blackboard
Blackboard()

Shared artifact store for inter-subtask communication.

Initialize empty blackboard.

Source code in libs/shared/src/shared/blackboard.py
def __init__(self) -> None:
    """Initialize empty blackboard."""
    self._artifacts: dict[str, SubtaskArtifact] = {}
Functions
publish
publish(artifact: SubtaskArtifact) -> None

Publish a subtask's artifacts.

Source code in libs/shared/src/shared/blackboard.py
def publish(self, artifact: SubtaskArtifact) -> None:
    """Publish a subtask's artifacts."""
    self._artifacts[artifact.name] = artifact
get
get(name: str) -> SubtaskArtifact | None

Retrieve a subtask's artifacts by name.

Source code in libs/shared/src/shared/blackboard.py
def get(self, name: str) -> SubtaskArtifact | None:
    """Retrieve a subtask's artifacts by name."""
    return self._artifacts.get(name)
get_dependency_interfaces
get_dependency_interfaces(
    subtask: dict[str, object],
) -> str

Get concatenated interfaces for a subtask's declared dependencies.

Source code in libs/shared/src/shared/blackboard.py
def get_dependency_interfaces(self, subtask: dict[str, object]) -> str:
    """Get concatenated interfaces for a subtask's declared dependencies."""
    raw_deps = subtask.get("depends_on", [])
    deps = raw_deps if isinstance(raw_deps, list) else []
    if not deps:
        return ""
    parts: list[str] = []
    for dep_name in deps:
        artifact = self._artifacts.get(str(dep_name))
        if artifact and artifact.interfaces:
            parts.append(f"--- {artifact.name} ---\n{artifact.interfaces}")
    return "\n".join(parts)
all_interfaces
all_interfaces() -> str

Get all published interfaces.

Source code in libs/shared/src/shared/blackboard.py
def all_interfaces(self) -> str:
    """Get all published interfaces."""
    parts: list[str] = []
    for artifact in self._artifacts.values():
        if artifact.interfaces:
            parts.append(f"--- {artifact.name} ---\n{artifact.interfaces}")
    return "\n".join(parts)
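The publish-then-read flow can be sketched end to end with minimal stand-ins for SubtaskArtifact and the Blackboard's internal dict; the class and variable names below are illustrative, not the real API.

```python
from dataclasses import dataclass, field

@dataclass
class Artifact:
    # Stand-in mirroring SubtaskArtifact's fields.
    name: str
    code: str
    interfaces: str
    tests_passed: bool
    dependencies: list[str] = field(default_factory=list)

store: dict[str, Artifact] = {}

# A predecessor subtask publishes its artifact after completing.
store["models"] = Artifact("models", "class User: ...", "class User", True)

# A dependent subtask reads its predecessors' interfaces, using the same
# "--- name ---" framing as get_dependency_interfaces.
subtask = {"name": "api", "depends_on": ["models"]}
context = "\n".join(
    f"--- {store[d].name} ---\n{store[d].interfaces}"
    for d in subtask["depends_on"]
    if d in store
)
print(context)
```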
Functions
extract_interfaces
extract_interfaces(code: str, max_lines: int = 60) -> str

Extract class/function signatures, imports, and decorators from code.

Returns a compact structural summary suitable for adapter trajectories.

Source code in libs/shared/src/shared/blackboard.py
def extract_interfaces(code: str, max_lines: int = 60) -> str:
    """Extract class/function signatures, imports, and decorators from code.

    Returns a compact structural summary suitable for adapter trajectories.
    """
    if not code:
        return ""
    result: list[str] = []
    for line in code.splitlines():
        stripped = line.strip()
        if stripped.startswith(
            (
                "import ",
                "from ",
                "class ",
                "def ",
                "@",
            )
        ):
            result.append(line)
        elif stripped.startswith(('"""', "'''")):
            result.append(line)
    return "\n".join(result[:max_lines])
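Applying the same prefix filter to a small sample shows what the "compact structural summary" looks like; the filter is restated inline here rather than importing the function.

```python
sample = '''import os

class Store:
    """In-memory key-value store."""

    def get(self, key: str) -> str | None:
        return self._data.get(key)
'''

# Keep only imports, class/def signatures, decorators, and docstring
# openers, mirroring extract_interfaces' startswith checks.
prefixes = ("import ", "from ", "class ", "def ", "@", '"""', "'''")
summary = "\n".join(
    line for line in sample.splitlines() if line.strip().startswith(prefixes)
)
print(summary)
```

Function bodies (the `return` line here) are dropped, leaving just the interface surface.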
build_execution_layers
build_execution_layers(
    subtasks: list[dict[str, object]],
) -> list[list[dict[str, object]]]

Group subtasks into dependency layers for ordered execution.

Layer 0 contains subtasks with no dependencies. Layer N contains subtasks whose dependencies are all in layers 0..N-1.

Parameters:

Name Type Description Default
subtasks list[dict[str, object]]

List of subtask dicts with optional depends_on key.

required

Returns:

Type Description
list[list[dict[str, object]]]

List of layers, each a list of subtask dicts.

Source code in libs/shared/src/shared/blackboard.py
def build_execution_layers(
    subtasks: list[dict[str, object]],
) -> list[list[dict[str, object]]]:
    """Group subtasks into dependency layers for ordered execution.

    Layer 0 contains subtasks with no dependencies. Layer N contains
    subtasks whose dependencies are all in layers 0..N-1.

    Args:
        subtasks: List of subtask dicts with optional ``depends_on`` key.

    Returns:
        List of layers, each a list of subtask dicts.
    """
    from graphlib import TopologicalSorter

    name_to_subtask = {str(st["name"]): st for st in subtasks}
    known_names = set(name_to_subtask.keys())

    # Build graph: {node: set_of_dependencies}
    graph: dict[str, set[str]] = {}
    for st in subtasks:
        name = str(st["name"])
        raw_deps = st.get("depends_on", [])
        deps = raw_deps if isinstance(raw_deps, list) else []
        # Filter to known subtask names only
        valid_deps = {str(d) for d in deps if str(d) in known_names}
        graph[name] = valid_deps

    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError if dependencies form a cycle

    layers: list[list[dict[str, object]]] = []
    while sorter.is_active():
        ready = list(sorter.get_ready())
        layer = [name_to_subtask[n] for n in ready if n in name_to_subtask]
        layers.append(layer)
        for n in ready:
            sorter.done(n)

    return layers
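The layering loop maps directly onto graphlib; a toy dependency graph makes the layer boundaries visible.

```python
from graphlib import TopologicalSorter

# {node: set of dependencies}, the same shape build_execution_layers constructs.
graph = {"models": set(), "db": {"models"}, "api": {"models", "db"}}

sorter = TopologicalSorter(graph)
sorter.prepare()  # would raise CycleError on a dependency cycle

layers: list[list[str]] = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # all nodes whose deps are already done
    layers.append(ready)
    for node in ready:
        sorter.done(node)

print(layers)  # [['models'], ['db'], ['api']]
```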
parse_dependencies
parse_dependencies(
    line: str, all_subtask_names: list[str]
) -> list[str]

Extract dependency names from a decompose output line.

Supports formats:
  • [depends: none] → empty list
  • [depends: 1, 3] → resolved to subtask names by index
  • [depends: Data Model] → matched by name substring

Parameters:

Name Type Description Default
line str

A single line from decompose output.

required
all_subtask_names list[str]

Ordered list of all subtask names for index resolution.

required

Returns:

Type Description
list[str]

List of dependency subtask names.

Source code in libs/shared/src/shared/blackboard.py
def parse_dependencies(
    line: str,
    all_subtask_names: list[str],
) -> list[str]:
    """Extract dependency names from a decompose output line.

    Supports formats:
      - ``[depends: none]`` → empty list
      - ``[depends: 1, 3]`` → resolved to subtask names by index
      - ``[depends: Data Model]`` → matched by name substring

    Args:
        line: A single line from decompose output.
        all_subtask_names: Ordered list of all subtask names for index resolution.

    Returns:
        List of dependency subtask names.
    """
    match = _DEPENDS_RE.search(line)
    if not match:
        return []

    raw = match.group(1).strip()
    if not raw or raw.lower() == "none":
        return []

    deps: list[str] = []
    for part in raw.split(","):
        part = part.strip()
        if not part:
            continue

        # Try numeric index (1-based)
        try:
            idx = int(part) - 1
            if 0 <= idx < len(all_subtask_names):
                deps.append(all_subtask_names[idx])
                continue
        except ValueError:
            pass

        # Try name match
        part_lower = part.lower()
        for name in all_subtask_names:
            if part_lower in name.lower() or name.lower() in part_lower:
                deps.append(name)
                break

    return deps
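The three supported formats can be exercised with a condensed sketch of the resolution rules. _DEPENDS_RE is private and not shown in this reference, so an equivalent pattern is assumed below.

```python
import re

# Assumed stand-in for the private _DEPENDS_RE pattern.
DEPENDS_RE = re.compile(r"\[depends:\s*([^\]]*)\]")

names = ["Data Model", "Storage Layer", "API Routes"]

def resolve(line: str) -> list[str]:
    # Condensed rules: none → [], numeric → 1-based index lookup,
    # otherwise case-insensitive name match.
    m = DEPENDS_RE.search(line)
    if not m or m.group(1).strip().lower() in ("", "none"):
        return []
    deps: list[str] = []
    for part in (p.strip() for p in m.group(1).split(",")):
        if part.isdigit() and 1 <= int(part) <= len(names):
            deps.append(names[int(part) - 1])
        else:
            deps.extend(n for n in names if part.lower() in n.lower())
    return deps

print(resolve("3. API Routes [depends: 1, 2]"))   # ['Data Model', 'Storage Layer']
print(resolve("1. Data Model [depends: none]"))   # []
```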

checkpoint_db

SQLite-backed checkpoint database for swarm task tracking.

Provides SwarmCheckpointDB for recording task execution state across swarm agents, enabling crash recovery and deduplication.

Classes
CheckpointRecord

Bases: SQLModel

Persistent checkpoint record for a swarm task.

Attributes:

Name Type Description
id Optional[int]

Auto-incremented primary key.

run_id str

Swarm run identifier.

task_hash str

Hash of the task being tracked.

agent_id str

Agent executing the task.

status str

Current status (pending, running, completed, failed).

outcome Optional[str]

Result description when completed.

started_at Optional[str]

ISO 8601 timestamp when execution began.

completed_at Optional[str]

ISO 8601 timestamp when execution finished.

SwarmCheckpointDB
SwarmCheckpointDB(engine: object, run_id: str = 'default')

SQLite-backed checkpoint database for swarm task tracking.

Tracks task execution state (pending → running → completed/failed) to enable crash recovery and prevent duplicate work.

Parameters:

Name Type Description Default
engine object

SQLAlchemy Engine connected to the target SQLite database.

required
run_id str

Identifier for this swarm run. Stored on every record created by this instance. Defaults to "default" for backward compatibility, but callers should supply a unique run ID so that multiple swarm runs are distinguishable in the database.

'default'
Example

>>> from sqlmodel import create_engine
>>> engine = create_engine("sqlite:///checkpoints.db")
>>> db = SwarmCheckpointDB(engine, run_id="run-2026-03-01")
>>> db.mark_running("task-hash-1", "agent-01")
>>> db.mark_completed("task-hash-1", "success: pass_rate=0.95")

Initialize checkpoint database and create tables.

Parameters:

Name Type Description Default
engine object

SQLAlchemy Engine connected to the target SQLite database.

required
run_id str

Identifier for this swarm run. Defaults to "default".

'default'
Source code in libs/shared/src/shared/checkpoint_db.py
def __init__(self, engine: object, run_id: str = "default") -> None:
    """Initialize checkpoint database and create tables.

    Args:
        engine: SQLAlchemy Engine connected to the target SQLite database.
        run_id: Identifier for this swarm run. Defaults to ``"default"``.
    """
    from sqlalchemy.engine import Engine  # noqa: PLC0415

    assert isinstance(engine, Engine)
    self._engine = engine
    self._run_id = run_id

    set_wal_mode(engine)
    SQLModel.metadata.create_all(engine)
Functions
mark_running
mark_running(task_hash: str, agent_id: str) -> None

Record that an agent has started working on a task.

Parameters:

Name Type Description Default
task_hash str

Hash of the task being started.

required
agent_id str

Identifier of the agent executing the task.

required
Source code in libs/shared/src/shared/checkpoint_db.py
def mark_running(self, task_hash: str, agent_id: str) -> None:
    """Record that an agent has started working on a task.

    Args:
        task_hash: Hash of the task being started.
        agent_id: Identifier of the agent executing the task.
    """
    now = datetime.now(timezone.utc).isoformat()
    record = CheckpointRecord(
        run_id=self._run_id,
        task_hash=task_hash,
        agent_id=agent_id,
        status="running",
        started_at=now,
    )
    with Session(self._engine, expire_on_commit=False) as session:
        session.add(record)
        session.commit()
mark_completed
mark_completed(task_hash: str, outcome: str) -> None

Record that a task has been completed.

Parameters:

Name Type Description Default
task_hash str

Hash of the completed task.

required
outcome str

Description of the result.

required
Source code in libs/shared/src/shared/checkpoint_db.py
def mark_completed(self, task_hash: str, outcome: str) -> None:
    """Record that a task has been completed.

    Args:
        task_hash: Hash of the completed task.
        outcome: Description of the result.
    """
    now = datetime.now(timezone.utc).isoformat()
    with Session(self._engine, expire_on_commit=False) as session:
        stmt = (
            select(CheckpointRecord)
            .where(
                CheckpointRecord.task_hash == task_hash,
                CheckpointRecord.run_id == self._run_id,
                CheckpointRecord.status == "running",
            )
            .limit(1)
        )
        record = session.exec(stmt).first()
        if record is not None:
            record.status = "completed"
            record.outcome = outcome
            record.completed_at = now
            session.add(record)
            session.commit()
        else:
            # No running record found — this means mark_running() was never
            # called for this task. Log a warning so the bug is visible,
            # then create a completed record to keep the DB consistent.
            logger.warning(
                "mark_completed called for task_hash=%r but no running record "
                "was found (run_id=%r). mark_running() may not have been called. "
                "Creating orphaned completed record with agent_id='unknown'.",
                task_hash,
                self._run_id,
            )
            new_record = CheckpointRecord(
                run_id=self._run_id,
                task_hash=task_hash,
                agent_id="unknown",
                status="completed",
                outcome=outcome,
                completed_at=now,
            )
            session.add(new_record)
            session.commit()
mark_failed
mark_failed(task_hash: str, agent_id: str) -> None

Record that a task execution has failed.

Parameters:

Name Type Description Default
task_hash str

Hash of the failed task.

required
agent_id str

Identifier of the agent that failed.

required
Source code in libs/shared/src/shared/checkpoint_db.py
def mark_failed(self, task_hash: str, agent_id: str) -> None:
    """Record that a task execution has failed.

    Args:
        task_hash: Hash of the failed task.
        agent_id: Identifier of the agent that failed.
    """
    now = datetime.now(timezone.utc).isoformat()
    with Session(self._engine, expire_on_commit=False) as session:
        stmt = (
            select(CheckpointRecord)
            .where(
                CheckpointRecord.task_hash == task_hash,
                CheckpointRecord.run_id == self._run_id,
                CheckpointRecord.agent_id == agent_id,
                CheckpointRecord.status == "running",
            )
            .limit(1)
        )
        record = session.exec(stmt).first()
        if record is not None:
            record.status = "failed"
            record.completed_at = now
            session.add(record)
            session.commit()
        else:
            # No running record found — log a warning and still create the
            # failed record so the failure is visible in the DB.
            logger.warning(
                "mark_failed called for task_hash=%r agent_id=%r but no running "
                "record was found (run_id=%r). mark_running() may not have been "
                "called. Creating orphaned failed record.",
                task_hash,
                agent_id,
                self._run_id,
            )
            new_record = CheckpointRecord(
                run_id=self._run_id,
                task_hash=task_hash,
                agent_id=agent_id,
                status="failed",
                completed_at=now,
            )
            session.add(new_record)
            session.commit()
is_completed
is_completed(task_hash: str) -> bool

Check if a task has been completed.

Parameters:

Name Type Description Default
task_hash str

Hash of the task to check.

required

Returns:

Type Description
bool

True if the task has a "completed" checkpoint record.

Source code in libs/shared/src/shared/checkpoint_db.py
def is_completed(self, task_hash: str) -> bool:
    """Check if a task has been completed.

    Args:
        task_hash: Hash of the task to check.

    Returns:
        True if the task has a "completed" checkpoint record.
    """
    with Session(self._engine, expire_on_commit=False) as session:
        stmt = (
            select(CheckpointRecord)
            .where(
                CheckpointRecord.task_hash == task_hash,
                CheckpointRecord.run_id == self._run_id,
                CheckpointRecord.status == "completed",
            )
            .limit(1)
        )
        return session.exec(stmt).first() is not None
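The completed-check above is what enables crash recovery: on restart, a coordinator skips any task hash that already has a "completed" record for the current run. A minimal stdlib sketch of that dedup loop, using sqlite3 directly with a hypothetical single-table schema in place of SQLModel:

```python
import sqlite3

# Hypothetical minimal schema mirroring CheckpointRecord.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE checkpoints (run_id TEXT, task_hash TEXT, status TEXT)"
)

def mark_completed(run_id: str, task_hash: str) -> None:
    conn.execute(
        "INSERT INTO checkpoints VALUES (?, ?, 'completed')",
        (run_id, task_hash),
    )

def is_completed(run_id: str, task_hash: str) -> bool:
    row = conn.execute(
        "SELECT 1 FROM checkpoints WHERE run_id = ? AND task_hash = ? "
        "AND status = 'completed' LIMIT 1",
        (run_id, task_hash),
    ).fetchone()
    return row is not None

mark_completed("run-1", "task-a")

# On restart: only task-b still needs work; task-a is deduplicated.
pending = [t for t in ("task-a", "task-b") if not is_completed("run-1", t)]
print(pending)  # ['task-b']
```

Because every query is scoped by run_id, a second run ("run-2") would see neither record and execute both tasks, which is why unique run IDs matter.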
Functions

hardware

Hardware detection and resource budgeting for swarm execution.

Provides HardwareProbe for detecting CPU, RAM, and GPU resources, and HardwareBudget for computing concurrency limits based on available hardware.

Classes
GPUInfo dataclass
GPUInfo(name: str, vram_mb: int)

Information about a single GPU device.

Attributes:

Name Type Description
name str

Human-readable GPU name (e.g. 'NVIDIA RTX 4090').

vram_mb int

Total VRAM in megabytes.

HardwareBudget dataclass
HardwareBudget(
    max_agents: int,
    max_concurrent_loras: int,
    training_slots: int,
    vram_per_gpu_mb: int,
    single_gpu_mode: bool,
)

Concurrency limits derived from detected hardware.

Attributes:

Name Type Description
max_agents int

Maximum concurrent swarm agents.

max_concurrent_loras int

Maximum LoRA adapters loaded at once.

training_slots int

Number of concurrent training jobs (0 if no GPU).

vram_per_gpu_mb int

VRAM available per GPU (0 if no GPU).

single_gpu_mode bool

True when only 0 or 1 GPUs are available.

HardwareProbe dataclass
HardwareProbe(
    cpu_count: int,
    ram_total_mb: int,
    gpus: list[GPUInfo] = list(),
)

Detected hardware resources on the current machine.

Attributes:

Name Type Description
cpu_count int

Number of logical CPU cores.

ram_total_mb int

Total system RAM in megabytes.

gpus list[GPUInfo]

List of detected GPU devices.

Functions
detect classmethod
detect() -> HardwareProbe

Detect hardware resources on the current machine.

Uses psutil for CPU/RAM detection (always available). Attempts to import pynvml for GPU detection; falls back to empty GPU list if pynvml is not installed or NVML initialization fails.

Returns:

Type Description
HardwareProbe

A HardwareProbe with detected resource information.

Source code in libs/shared/src/shared/hardware.py
@classmethod
def detect(cls) -> HardwareProbe:
    """Detect hardware resources on the current machine.

    Uses psutil for CPU/RAM detection (always available). Attempts to
    import pynvml for GPU detection; falls back to empty GPU list if
    pynvml is not installed or NVML initialization fails.

    Returns:
        A HardwareProbe with detected resource information.
    """
    cpu_count = psutil.cpu_count(logical=True) or 1
    ram_total_mb = psutil.virtual_memory().total // (1024 * 1024)

    gpus: list[GPUInfo] = []
    try:
        import pynvml

        pynvml.nvmlInit()
        device_count = pynvml.nvmlDeviceGetCount()
        for i in range(device_count):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            name = pynvml.nvmlDeviceGetName(handle)
            if isinstance(name, bytes):
                name = name.decode("utf-8")
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            vram_mb = mem_info.total // (1024 * 1024)
            gpus.append(GPUInfo(name=name, vram_mb=vram_mb))
        pynvml.nvmlShutdown()
    except ImportError:
        pass  # pynvml not installed; CPU-only mode
    except Exception as e:
        logger.debug("GPU detection failed (will run CPU-only): %s", e)

    return cls(cpu_count=cpu_count, ram_total_mb=ram_total_mb, gpus=gpus)
compute_budget
compute_budget(
    base_model_vram_mb: int = 0,
) -> HardwareBudget

Compute concurrency limits based on detected hardware.

Parameters:

Name Type Description Default
base_model_vram_mb int

VRAM consumed by the base inference model. Used to calculate how much VRAM is left for LoRA adapters and training.

0

Returns:

Type Description
HardwareBudget

A HardwareBudget with computed concurrency limits.

Source code in libs/shared/src/shared/hardware.py
def compute_budget(self, base_model_vram_mb: int = 0) -> HardwareBudget:
    """Compute concurrency limits based on detected hardware.

    Args:
        base_model_vram_mb: VRAM consumed by the base inference model.
            Used to calculate how much VRAM is left for LoRA adapters
            and training.

    Returns:
        A HardwareBudget with computed concurrency limits.
    """
    num_gpus = len(self.gpus)
    single_gpu_mode = num_gpus <= 1

    if num_gpus == 0:
        return HardwareBudget(
            max_agents=max(1, self.cpu_count // 2),
            max_concurrent_loras=0,
            training_slots=0,
            vram_per_gpu_mb=0,
            single_gpu_mode=True,
        )

    vram_per_gpu = self.gpus[0].vram_mb
    free_vram = max(0, vram_per_gpu - base_model_vram_mb)

    max_loras = max(1, free_vram // VRAM_PER_LORA_MB)
    training_slots = 1 if free_vram >= MIN_TRAINING_VRAM_MB else 0

    # Agents limited by CPU cores and available VRAM
    max_agents = max(1, min(self.cpu_count // 2, num_gpus * 4))

    return HardwareBudget(
        max_agents=max_agents,
        max_concurrent_loras=max_loras,
        training_slots=training_slots,
        vram_per_gpu_mb=vram_per_gpu,
        single_gpu_mode=single_gpu_mode,
    )
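The budget arithmetic is easy to check by hand. A standalone sketch of the GPU branch follows; the constant values here are illustrative assumptions, since VRAM_PER_LORA_MB and MIN_TRAINING_VRAM_MB are module constants not shown in this reference:

```python
# Assumed values for illustration only; the real module defines
# VRAM_PER_LORA_MB and MIN_TRAINING_VRAM_MB itself.
VRAM_PER_LORA_MB = 500
MIN_TRAINING_VRAM_MB = 8000

def budget(cpu_count: int, num_gpus: int, vram_per_gpu: int,
           base_model_vram_mb: int) -> dict[str, int]:
    # VRAM left after the base inference model is loaded.
    free_vram = max(0, vram_per_gpu - base_model_vram_mb)
    return {
        "max_concurrent_loras": max(1, free_vram // VRAM_PER_LORA_MB),
        "training_slots": 1 if free_vram >= MIN_TRAINING_VRAM_MB else 0,
        # Agents limited by CPU cores and GPU count, as in compute_budget.
        "max_agents": max(1, min(cpu_count // 2, num_gpus * 4)),
    }

# 24 GB card with a 14 GB base model -> 10 GB free:
# 20 LoRA slots, 1 training slot, 4 agents.
print(budget(cpu_count=16, num_gpus=1, vram_per_gpu=24_000,
             base_model_vram_mb=14_000))
```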
Functions
resolve_model_dtype
resolve_model_dtype(
    param_count: int,
    device: str = "cuda",
    available_vram_bytes: int = 0,
    overhead_bytes: int = 0,
    dtype_override: str | None = None,
) -> object

Pick the highest precision dtype that fits in available VRAM.

Resolution order
  1. dtype_override parameter (explicit caller override)
  2. RUNE_DTYPE_OVERRIDE env var (global user override)
  3. Auto-detect: pick fp32 if it fits, otherwise bf16

On CPU, always returns float32 (no VRAM constraint).

Parameters:

Name Type Description Default
param_count int

Number of model parameters.

required
device str

Target device ("cuda", "cpu", "mps").

'cuda'
available_vram_bytes int

Total VRAM available on the device in bytes. If 0 and device is "cuda", queries the GPU automatically.

0
overhead_bytes int

Additional VRAM already in use or reserved by other models (e.g. an inference model already loaded).

0
dtype_override str | None

Manual override. One of "float32", "bfloat16", "float16".

None

Returns:

Type Description
object

A torch.dtype suitable for loading the model.

Source code in libs/shared/src/shared/hardware.py
def resolve_model_dtype(
    param_count: int,
    device: str = "cuda",
    available_vram_bytes: int = 0,
    overhead_bytes: int = 0,
    dtype_override: str | None = None,
) -> object:
    """Pick the highest precision dtype that fits in available VRAM.

    Resolution order:
      1. ``dtype_override`` parameter (explicit caller override)
      2. ``RUNE_DTYPE_OVERRIDE`` env var (global user override)
      3. Auto-detect: pick fp32 if it fits, otherwise bf16

    On CPU, always returns float32 (no VRAM constraint).

    Args:
        param_count: Number of model parameters.
        device: Target device (``"cuda"``, ``"cpu"``, ``"mps"``).
        available_vram_bytes: Total VRAM available on the device in bytes.
            If 0 and device is ``"cuda"``, queries the GPU automatically.
        overhead_bytes: Additional VRAM already in use or reserved by other
            models (e.g. an inference model already loaded).
        dtype_override: Manual override. One of ``"float32"``,
            ``"bfloat16"``, ``"float16"``.

    Returns:
        A ``torch.dtype`` suitable for loading the model.
    """
    import os  # noqa: PLC0415

    import torch  # noqa: PLC0415

    dtype_map = {
        "float32": torch.float32,
        "fp32": torch.float32,
        "bfloat16": torch.bfloat16,
        "bf16": torch.bfloat16,
        "float16": torch.float16,
        "fp16": torch.float16,
    }

    # 1. Check explicit override
    override = dtype_override or os.environ.get("RUNE_DTYPE_OVERRIDE")
    if override and override in dtype_map:
        return dtype_map[override]

    # 2. CPU — always fp32, no VRAM constraint
    if device == "cpu":
        return torch.float32

    # 3. Auto-detect VRAM if not provided
    if available_vram_bytes <= 0 and device == "cuda" and torch.cuda.is_available():
        props = torch.cuda.get_device_properties(0)
        allocated = torch.cuda.memory_allocated(0)
        available_vram_bytes = props.total_memory - allocated

    usable_vram = int((available_vram_bytes - overhead_bytes) * _VRAM_MARGIN)
    fp32_bytes = param_count * _BYTES_PER_PARAM["float32"]

    if fp32_bytes <= usable_vram:
        return torch.float32

    return torch.bfloat16
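The fp32-fits check reduces to byte arithmetic: parameter count times four bytes, compared against VRAM after overhead and a safety margin. A torch-free sketch of that decision; the margin value here is an assumption, since _VRAM_MARGIN is a private module constant not shown:

```python
_VRAM_MARGIN = 0.9  # assumed safety margin; the real value lives in the module
BYTES_PER_PARAM = {"float32": 4, "bfloat16": 2}

def pick_dtype(param_count: int, available_vram_bytes: int,
               overhead_bytes: int = 0) -> str:
    # Usable VRAM after subtracting overhead and applying the margin.
    usable = int((available_vram_bytes - overhead_bytes) * _VRAM_MARGIN)
    if param_count * BYTES_PER_PARAM["float32"] <= usable:
        return "float32"
    return "bfloat16"

# A 7B-parameter model needs ~28 GB in fp32, so a 24 GB card falls back.
print(pick_dtype(7_000_000_000, 24 * 1024**3))  # bfloat16
print(pick_dtype(1_000_000_000, 24 * 1024**3))  # float32
```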
get_best_device
get_best_device() -> str

Return the best available compute device: cuda > mps > cpu.

Imports torch inside the function body per INFRA-05 pattern so this module is importable in CPU-only CI without torch installed.

Returns:

Type Description
str

"cuda" if an NVIDIA GPU is available, "mps" if an Apple Silicon GPU is available, "cpu" otherwise.

Example

>>> device = get_best_device()
>>> device in ("cuda", "mps", "cpu")
True

Source code in libs/shared/src/shared/hardware.py
def get_best_device() -> str:
    """Return the best available compute device: ``cuda`` > ``mps`` > ``cpu``.

    Imports torch inside the function body per INFRA-05 pattern so this module
    is importable in CPU-only CI without torch installed.

    Returns:
        ``"cuda"`` if an NVIDIA GPU is available,
        ``"mps"`` if an Apple Silicon GPU is available,
        ``"cpu"`` otherwise.

    Example:
        >>> device = get_best_device()
        >>> device in ("cuda", "mps", "cpu")
        True
    """
    try:
        import torch  # noqa: PLC0415

        if torch.cuda.is_available():
            return "cuda"
        if torch.backends.mps.is_available():
            return "mps"
    except ImportError:
        pass
    return "cpu"

lazy_cache

Lazy singleton decorator for expensive initializations.

Functions
lazy_singleton
lazy_singleton(func: Callable[[], T]) -> Callable[[], T]

Decorator to lazily evaluate and cache a function result (singleton).

Thread-safe implementation using a lock to prevent race conditions in concurrent environments.

Source code in libs/shared/src/shared/lazy_cache.py
def lazy_singleton(func: Callable[[], T]) -> Callable[[], T]:
    """Decorator to lazily evaluate and cache a function result (singleton).

    Thread-safe implementation using a lock to prevent race conditions
    in concurrent environments.
    """
    _instance: Any = None
    _lock = threading.Lock()

    def reset() -> None:
        nonlocal _instance
        with _lock:
            _instance = None

    def wrapper() -> T:
        nonlocal _instance
        if _instance is None:
            with _lock:
                # Double-checked locking pattern
                if _instance is None:
                    _instance = func()
        return _instance

    # Register this singleton's reset function
    _singleton_registry.append(reset)

    return wrapper
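Usage looks like an argument-less functools.cache, but with an explicit lock for thread safety. Reproducing the decorator body above in compact form (without the reset registry) and exercising it:

```python
import threading
from typing import Any, Callable, TypeVar

T = TypeVar("T")
calls = 0

def lazy_singleton(func: Callable[[], T]) -> Callable[[], T]:
    _instance: Any = None
    _lock = threading.Lock()

    def wrapper() -> T:
        nonlocal _instance
        if _instance is None:
            with _lock:  # double-checked locking: re-test under the lock
                if _instance is None:
                    _instance = func()
        return _instance

    return wrapper

@lazy_singleton
def expensive_client() -> dict:
    global calls
    calls += 1  # count how many times the factory actually runs
    return {"connected": True}

expensive_client()
expensive_client()
print(calls)  # 1 -- the factory ran exactly once
```

Both calls return the same cached object; the lock only matters on the first call, when concurrent threads could otherwise race to initialize.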

models

Shared data models for the application.

Classes
Entity

Bases: SQLModel

Generic entity processed by agents.

Task

Bases: SQLModel

Generic task flowing through the pipeline.

pipeline_config

Pipeline configuration for adapter scaling, generation, and prompt style.

Provides a frozen dataclass config with load/save to JSON, factory defaults, and per-field override from environment variables.

Classes
AdapterConfig dataclass
AdapterConfig(
    scaling: float = 0.075,
    use_bias: bool = True,
    max_length: int = 2048,
)

Adapter weight application settings.

GenerationConfig dataclass
GenerationConfig(
    temperature: float = 0.3,
    max_tokens: int = 1024,
    repetition_penalty: float = 1.1,
    top_p: float = 0.9,
)

LLM generation settings.

PromptConfig dataclass
PromptConfig(style: str = 'must_include')

Prompt template selection.

TrajectoryConfig dataclass
TrajectoryConfig(style: str = 'full_context')

Trajectory template selection.

CalibrationConfig dataclass
CalibrationConfig(
    enabled: bool = True,
    n_trials: int = 5,
    scaling_range: tuple[float, float] = (0.5, 1.5),
)

Per-task calibration settings.

PipelineConfig dataclass
PipelineConfig(
    adapter: AdapterConfig = AdapterConfig(),
    generation: GenerationConfig = GenerationConfig(),
    prompt: PromptConfig = PromptConfig(),
    trajectory: TrajectoryConfig = TrajectoryConfig(),
    calibration: CalibrationConfig = CalibrationConfig(),
)

Top-level pipeline configuration.

Functions
to_dict
to_dict() -> dict[str, Any]

Serialize to a plain dict.

Source code in libs/shared/src/shared/pipeline_config.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a plain dict."""
    d = asdict(self)
    # Convert tuple back for JSON compatibility
    d["calibration"]["scaling_range"] = list(d["calibration"]["scaling_range"])
    return d
save
save(path: Path | None = None) -> Path

Write config to JSON file.

Source code in libs/shared/src/shared/pipeline_config.py
def save(self, path: Path | None = None) -> Path:
    """Write config to JSON file."""
    path = path or (_DEFAULT_CONFIG_DIR / _CONFIG_FILENAME)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(self.to_dict(), indent=2))
    return path
override
override(**kwargs: Any) -> PipelineConfig

Return a new config with selected fields replaced.

Accepts dotted keys like adapter.scaling=0.1 or flat section dicts like adapter={"scaling": 0.1}.

Source code in libs/shared/src/shared/pipeline_config.py
def override(self, **kwargs: Any) -> PipelineConfig:
    """Return a new config with selected fields replaced.

    Accepts dotted keys like ``adapter.scaling=0.1`` or flat
    section dicts like ``adapter={"scaling": 0.1}``.
    """
    d = self.to_dict()
    for key, value in kwargs.items():
        if "." in key:
            section, field_name = key.split(".", 1)
            d.setdefault(section, {})[field_name] = value
        elif isinstance(value, dict):
            d.setdefault(key, {}).update(value)
        else:
            d[key] = value
    return _from_dict(d)
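The dotted-key handling in override is a two-level dict merge. A standalone sketch of that merge logic, operating on plain dicts rather than the frozen dataclasses:

```python
from typing import Any

def merge_overrides(d: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
    for key, value in kwargs.items():
        if "." in key:  # dotted key: "adapter.scaling"
            section, field_name = key.split(".", 1)
            d.setdefault(section, {})[field_name] = value
        elif isinstance(value, dict):  # flat section dict
            d.setdefault(key, {}).update(value)
        else:  # top-level scalar
            d[key] = value
    return d

cfg = {"adapter": {"scaling": 0.075}, "generation": {"temperature": 0.3}}
# Dotted keys must be passed via ** since "adapter.scaling" is not a
# valid Python identifier.
cfg = merge_overrides(cfg, **{"adapter.scaling": 0.1},
                      generation={"temperature": 0.7})
print(cfg["adapter"]["scaling"], cfg["generation"]["temperature"])  # 0.1 0.7
```

The same `**{"adapter.scaling": 0.1}` calling trick applies to the real override method.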
Functions
load_config
load_config(path: Path | None = None) -> PipelineConfig

Load config from JSON, falling back to defaults.

Also checks RUNE_PIPELINE_CONFIG env var for the path.

Source code in libs/shared/src/shared/pipeline_config.py
def load_config(path: Path | None = None) -> PipelineConfig:
    """Load config from JSON, falling back to defaults.

    Also checks ``RUNE_PIPELINE_CONFIG`` env var for the path.
    """
    if path is None:
        env_path = os.environ.get("RUNE_PIPELINE_CONFIG")
        path = Path(env_path) if env_path else _DEFAULT_CONFIG_DIR / _CONFIG_FILENAME

    if path.exists():
        d = json.loads(path.read_text())
        return _from_dict(d)
    return PipelineConfig()
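The env-var-then-default path resolution in load_config is a common config pattern. A standalone sketch of just the path step; RUNE_PIPELINE_CONFIG is the real variable name, but the default path below is a placeholder, since the module's own config directory is not shown:

```python
import os
from pathlib import Path

def resolve_config_path(env_var: str = "RUNE_PIPELINE_CONFIG") -> Path:
    env_path = os.environ.get(env_var)
    # Placeholder default; the real module uses its own config directory.
    return Path(env_path) if env_path else Path("config") / "pipeline.json"

os.environ["RUNE_PIPELINE_CONFIG"] = "/tmp/custom.json"
print(resolve_config_path())
```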
default_config
default_config() -> PipelineConfig

Return the default config without reading any files.

Source code in libs/shared/src/shared/pipeline_config.py
def default_config() -> PipelineConfig:
    """Return the default config without reading any files."""
    return PipelineConfig()

rune_models

Rune-specific Pydantic models for cross-service data contracts.

Defines the shared data shapes for coding sessions, adapter references, and evolutionary fitness metrics used across all Rune services.

Classes
TaskStatus

Bases: str, Enum

Canonical status strings used across services and swarm checkpoints.

Inherits from str so values are JSON-serialisable and can be compared directly to string literals in existing code (e.g. record.status == "running").

Example

>>> TaskStatus.RUNNING
<TaskStatus.RUNNING: 'running'>
>>> TaskStatus.RUNNING == "running"
True
>>> "running" == TaskStatus.RUNNING
True

PipelinePhase

Bases: str, Enum

Pipeline phases for the multi-phase swarm pipeline.

Each phase uses a dedicated Jinja2 trajectory template and prompt template.

AdapterRef

Bases: BaseModel

Reference to a stored LoRA adapter.

Used within CodingSession to track which adapters were loaded during an agent coding session.

Attributes:

Name Type Description
adapter_id str

UUID of the adapter in the registry.

task_type str

Task category this adapter was trained on (e.g. 'bug-fix').

fitness_score Optional[float]

Evolutionary fitness score, if evaluated.

Example

>>> ref = AdapterRef(adapter_id="abc-123", task_type="bug-fix")
>>> ref.adapter_id
'abc-123'
>>> ref.fitness_score is None
True

CodingSession

Bases: BaseModel

A complete agent coding session record.

Tracks the full lifecycle of a Rune agent session including which adapters were used, how many generate-execute-reflect cycles occurred, and the final outcome.

Attributes:

Name Type Description
session_id str

Unique session identifier.

task_description str

Human-readable description of the coding task.

task_type str

Task category (e.g. 'bug-fix', 'feature-impl').

adapter_refs list[AdapterRef]

List of adapters loaded during this session.

attempt_count int

Number of generate-execute-reflect cycles completed.

outcome Optional[str]

Final session result ('success', 'exhausted', or None if in progress).

Example

>>> session = CodingSession(
...     session_id="sess-001",
...     task_description="Fix import error",
...     task_type="bug-fix",
... )
>>> session.adapter_refs
[]
>>> session.attempt_count
0
>>> session.outcome is None
True

EvolMetrics

Bases: BaseModel

Evolution evaluation metrics for an adapter.

Captures the performance and fitness measurements used by the evolution service to decide which adapters to keep, mutate, or retire.

Attributes:

Name Type Description
adapter_id str

UUID of the adapter being evaluated.

pass_rate float

Pass rate on benchmark tasks (0.0 to 1.0).

fitness_score float

Overall evolutionary fitness score.

generalization_delta Optional[float]

Difference between in-distribution and OOD performance.

Example

>>> metrics = EvolMetrics(
...     adapter_id="adapter-001",
...     pass_rate=0.85,
...     fitness_score=0.9,
... )
>>> metrics.generalization_delta is None
True

SwarmConfig

Bases: BaseModel

Configuration for a swarm execution run.

Attributes:

Name Type Description
db_url str

SQLite database URL for the adapter registry.

task_source str

Path to task definitions file or inline task list.

population_size int

Number of concurrent swarm agents.

max_generations int

Maximum evolutionary generations.

evolution_interval int

Seconds between evolution sweeps.

sandbox_backend str

Execution backend ('subprocess' or 'nsjail').

base_model_id str

HuggingFace model identifier for inference.

hypernetwork_checkpoint str | None

Path to pretrained hypernetwork checkpoint.

run_id str | None

Unique identifier for this swarm run (for checkpoint isolation). Auto-generated UUID if not provided.

SwarmCheckpoint

Bases: BaseModel

Status record for a single swarm task execution.

Attributes:

Name Type Description
run_id str

Unique identifier for the swarm run.

task_hash str

Hash of the task being executed.

agent_id str

Identifier of the agent executing the task.

status str

Current status (pending, running, completed, failed).

outcome Optional[str]

Result description when completed.

started_at Optional[str]

ISO 8601 timestamp when execution began.

completed_at Optional[str]

ISO 8601 timestamp when execution finished.

sandbox

Sandbox execution backends for running untrusted code.

Provides SubprocessBackend (default) and NsjailBackend (Linux-only) for executing Python code in isolated environments with resource limits.

Classes
SandboxResult dataclass
SandboxResult(
    stdout: str,
    stderr: str,
    exit_code: int,
    is_timed_out: bool,
)

Result of executing code in a sandbox.

Attributes:

Name Type Description
stdout str

Captured standard output.

stderr str

Captured standard error.

exit_code int

Process exit code (0 = success).

is_timed_out bool

Whether execution was terminated due to timeout.

SandboxBackend

Bases: ABC

Abstract base class for sandbox execution backends.

Functions
run abstractmethod
run(code: str, timeout: int = 30) -> SandboxResult

Execute Python code and return the result.

Parameters:

Name Type Description Default
code str

Python source code to execute.

required
timeout int

Maximum execution time in seconds.

30

Returns:

Type Description
SandboxResult

A SandboxResult with captured output and exit status.

Source code in libs/shared/src/shared/sandbox.py
@abstractmethod
def run(self, code: str, timeout: int = 30) -> SandboxResult:
    """Execute Python code and return the result.

    Args:
        code: Python source code to execute.
        timeout: Maximum execution time in seconds.

    Returns:
        A SandboxResult with captured output and exit status.
    """
SubprocessBackend

Bases: SandboxBackend

Execute code via a standard subprocess.

Writes code to a temporary file and runs it with the current Python interpreter. This is the default backend, used on all platforms.

Functions
run
run(code: str, timeout: int = 30) -> SandboxResult

Execute Python code in a subprocess.

Parameters:

Name Type Description Default
code str

Python source code to execute.

required
timeout int

Maximum execution time in seconds.

30

Returns:

Type Description
SandboxResult

A SandboxResult with captured output and exit status.

Source code in libs/shared/src/shared/sandbox.py
def run(self, code: str, timeout: int = 30) -> SandboxResult:
    """Execute Python code in a subprocess.

    Args:
        code: Python source code to execute.
        timeout: Maximum execution time in seconds.

    Returns:
        A SandboxResult with captured output and exit status.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        script_path = os.path.join(tmpdir, "solution.py")
        with open(script_path, "w") as f:
            f.write(code)

        try:
            proc = subprocess.run(
                [sys.executable, script_path],
                capture_output=True,
                text=True,
                timeout=timeout,
                cwd=tmpdir,
            )
            return SandboxResult(
                stdout=proc.stdout,
                stderr=proc.stderr,
                exit_code=proc.returncode,
                is_timed_out=False,
            )
        except subprocess.TimeoutExpired:
            return SandboxResult(
                stdout="",
                stderr=f"Execution timed out after {timeout}s",
                exit_code=1,
                is_timed_out=True,
            )
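The subprocess pattern above is easy to exercise end to end. A trimmed, self-contained version of the same write-then-run flow, omitting the SandboxResult dataclass and the timeout branch for brevity:

```python
import os
import subprocess
import sys
import tempfile

def run_sandboxed(code: str, timeout: int = 30) -> tuple[str, int]:
    """Write code to a temp file and run it with the current interpreter."""
    with tempfile.TemporaryDirectory() as tmpdir:
        script_path = os.path.join(tmpdir, "solution.py")
        with open(script_path, "w") as f:
            f.write(code)
        proc = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=tmpdir,  # run inside the temp dir so file writes stay contained
        )
        return proc.stdout, proc.returncode

out, code = run_sandboxed("print(2 + 2)")
print(out.strip(), code)  # 4 0
```

Note that cwd=tmpdir only contains relative file writes; a plain subprocess offers no real isolation, which is why the nsjail backend exists.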
NsjailBackend
NsjailBackend()

Bases: SandboxBackend

Execute code inside an nsjail sandbox (Linux-only).

Falls back to SubprocessBackend if nsjail is not found on PATH. Applies resource limits via nsjail flags: --time_limit, --rlimit_as, --rlimit_cpu.

Initialize NsjailBackend, checking nsjail availability.

Source code in libs/shared/src/shared/sandbox.py
def __init__(self) -> None:
    """Initialize NsjailBackend, checking nsjail availability."""
    self._nsjail_path = shutil.which("nsjail")
    self._fallback = SubprocessBackend()
    if self._nsjail_path is None:
        logger.warning(
            "nsjail not found on PATH; NsjailBackend will fall back to subprocess"
        )
Functions
run
run(code: str, timeout: int = 30) -> SandboxResult

Execute Python code inside nsjail, or fall back to subprocess.

Parameters:

Name Type Description Default
code str

Python source code to execute.

required
timeout int

Maximum execution time in seconds.

30

Returns:

Type Description
SandboxResult

A SandboxResult with captured output and exit status.

Source code in libs/shared/src/shared/sandbox.py
def run(self, code: str, timeout: int = 30) -> SandboxResult:
    """Execute Python code inside nsjail, or fall back to subprocess.

    Args:
        code: Python source code to execute.
        timeout: Maximum execution time in seconds.

    Returns:
        A SandboxResult with captured output and exit status.
    """
    if self._nsjail_path is None:
        return self._fallback.run(code, timeout)

    with tempfile.TemporaryDirectory() as tmpdir:
        script_path = os.path.join(tmpdir, "solution.py")
        with open(script_path, "w") as f:
            f.write(code)

        cmd = [
            self._nsjail_path,
            "--mode",
            "o",
            "--time_limit",
            str(timeout),
            "--rlimit_as",
            "512",
            "--rlimit_cpu",
            str(timeout),
            "--",
            sys.executable,
            script_path,
        ]

        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout + 5,
                cwd=tmpdir,
            )
            return SandboxResult(
                stdout=proc.stdout,
                stderr=proc.stderr,
                exit_code=proc.returncode,
                is_timed_out=False,
            )
        except subprocess.TimeoutExpired:
            return SandboxResult(
                stdout="",
                stderr=f"Execution timed out after {timeout}s",
                exit_code=1,
                is_timed_out=True,
            )
Functions
count_test_results
count_test_results(
    stdout: str, stderr: str
) -> tuple[int, int]

Parse unittest output to count (passed, total) tests.

Parameters:

Name Type Description Default
stdout str

Captured standard output from test execution.

required
stderr str

Captured standard error from test execution.

required

Returns:

Type Description
tuple[int, int]

tuple[int, int]: (passed_count, total_count). Returns (0, 0) if no unittest output is detected.

Source code in libs/shared/src/shared/sandbox.py
def count_test_results(stdout: str, stderr: str) -> tuple[int, int]:
    """Parse unittest output to count (passed, total) tests.

    Args:
        stdout (str): Captured standard output from test execution.
        stderr (str): Captured standard error from test execution.

    Returns:
        tuple[int, int]: (passed_count, total_count). Returns (0, 0)
            if no unittest output is detected.
    """
    import re as _re

    total = 0
    failed = 0

    ran_match = _re.search(r"Ran (\d+) test", stderr or "")
    if ran_match:
        total = int(ran_match.group(1))

    fail_match = _re.search(r"failures=(\d+)", stderr or "")
    err_match = _re.search(r"errors=(\d+)", stderr or "")
    if fail_match:
        failed += int(fail_match.group(1))
    if err_match:
        failed += int(err_match.group(1))

    passed = max(0, total - failed)
    return passed, total
extract_failed_tests
extract_failed_tests(stderr: str) -> str

Extract names of failing tests from unittest stderr.

Parameters:

Name Type Description Default
stderr str

Captured standard error from test execution.

required

Returns:

Name Type Description
str str

Semicolon-separated string of up to 5 failed test names. Empty string if no failures found.

Source code in libs/shared/src/shared/sandbox.py
def extract_failed_tests(stderr: str) -> str:
    """Extract names of failing tests from unittest stderr.

    Args:
        stderr (str): Captured standard error from test execution.

    Returns:
        str: Semicolon-separated string of up to 5 failed test names.
            Empty string if no failures found.
    """
    if not stderr:
        return ""
    lines = stderr.strip().splitlines()
    failed = [
        ln.strip()
        for ln in lines
        if ln.strip().startswith("FAIL:") or ln.strip().startswith("ERROR:")
    ]
    return "; ".join(failed[:5])
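The same line-filtering logic can be sketched stand-alone against a fabricated stderr (test names are hypothetical):

```python
# Only lines starting with "FAIL:" or "ERROR:" name a failing test.
stderr = (
    "FAIL: test_sorting (TestSolution)\n"
    "Traceback (most recent call last):\n"
    "ERROR: test_parsing (TestSolution)\n"
)

failed = [
    ln.strip()
    for ln in stderr.strip().splitlines()
    if ln.strip().startswith(("FAIL:", "ERROR:"))
]
summary = "; ".join(failed[:5])
print(summary)  # FAIL: test_sorting (TestSolution); ERROR: test_parsing (TestSolution)
```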
has_unittest_classes
has_unittest_classes(code: str) -> bool

Check whether code contains unittest.TestCase subclasses.

Parameters:

Name Type Description Default
code str

Python source code string.

required

Returns:

Name Type Description
bool bool

True if the code contains at least one TestCase class definition.

Source code in libs/shared/src/shared/sandbox.py
def has_unittest_classes(code: str) -> bool:
    """Check whether code contains unittest.TestCase subclasses.

    Args:
        code (str): Python source code string.

    Returns:
        bool: True if the code contains at least one TestCase class
            definition.
    """
    import re as _re

    return bool(_re.search(r"class\s+\w+\s*\(.*\bTestCase\b.*\)", code))
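The detection regex matches both direct and module-qualified `TestCase` bases, as this quick check shows:

```python
import re

PATTERN = r"class\s+\w+\s*\(.*\bTestCase\b.*\)"

with_tests = "import unittest\nclass TestFoo(unittest.TestCase):\n    pass\n"
without_tests = "def add(a, b):\n    return a + b\n"

print(bool(re.search(PATTERN, with_tests)))     # True
print(bool(re.search(PATTERN, without_tests)))  # False
```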
get_sandbox_backend
get_sandbox_backend() -> SandboxBackend

Return the appropriate sandbox backend based on environment.

Reads the RUNE_EXEC_BACKEND env var:

- "nsjail" → NsjailBackend (falls back to subprocess if nsjail is not found)
- anything else, or unset → SubprocessBackend

Returns:

Type Description
SandboxBackend

A SandboxBackend instance.

Source code in libs/shared/src/shared/sandbox.py
def get_sandbox_backend() -> SandboxBackend:
    """Return the appropriate sandbox backend based on environment.

    Reads RUNE_EXEC_BACKEND env var:
    - "nsjail" → NsjailBackend (falls back to subprocess if nsjail not found)
    - anything else or unset → SubprocessBackend

    Returns:
        A SandboxBackend instance.
    """
    backend = os.environ.get("RUNE_EXEC_BACKEND", "subprocess")
    if backend == "nsjail":
        return NsjailBackend()
    return SubprocessBackend()
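The selection rule is easy to state in isolation. This is a sketch mirroring the function's logic, not the function itself (it returns backend names rather than instances, and takes the environment as a parameter for illustration):

```python
def backend_name(env: dict) -> str:
    # Mirror of get_sandbox_backend's selection: anything other than
    # "nsjail" (including unset) means the subprocess backend.
    if env.get("RUNE_EXEC_BACKEND", "subprocess") == "nsjail":
        return "nsjail"
    return "subprocess"

print(backend_name({}))                               # subprocess
print(backend_name({"RUNE_EXEC_BACKEND": "nsjail"}))  # nsjail
```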

storage_utils

Shared SQLAlchemy/SQLite utilities for all Rune services.

Provides:

- set_wal_mode: register a WAL-mode PRAGMA on a SQLAlchemy Engine (DRY for all DBs)
- create_service_engine: build an Engine from the DATABASE_URL env var or a default URL

Functions
set_wal_mode
set_wal_mode(engine: Engine) -> None

Register a SQLAlchemy connect event that enables WAL journal mode.

WAL mode allows concurrent readers while a writer is active, which is important for swarm workloads that read checkpoints while writing. Safe to call multiple times on different engines; each call registers a separate listener on its own engine instance.

Parameters:

Name Type Description Default
engine Engine

SQLAlchemy Engine to configure.

required
Source code in libs/shared/src/shared/storage_utils.py
def set_wal_mode(engine: Engine) -> None:
    """Register a SQLAlchemy connect event that enables WAL journal mode.

    WAL mode allows concurrent readers while a writer is active, which is
    important for swarm workloads that read checkpoints while writing.
    Safe to call multiple times on different engines; each call registers
    a separate listener on its own engine instance.

    Args:
        engine: SQLAlchemy Engine to configure.
    """
    if engine.dialect.name != "sqlite":
        return

    @event.listens_for(engine, "connect")
    def _set_wal(dbapi_conn: object, _record: object) -> None:  # type: ignore[type-arg]
        assert hasattr(dbapi_conn, "execute")
        dbapi_conn.execute("PRAGMA journal_mode=WAL")  # type: ignore[union-attr]
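The effect of the registered listener can be seen by issuing the same PRAGMA directly through the sqlite3 DB-API (SQLAlchemy omitted here to keep the sketch self-contained; WAL mode only sticks for file-backed databases, not `:memory:`):

```python
import os
import sqlite3
import tempfile

# The PRAGMA that set_wal_mode registers on every new connection,
# applied here directly for illustration.
db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
conn = sqlite3.connect(db_path)
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
conn.close()
print(mode)  # wal
```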
create_service_engine
create_service_engine(default_url: str) -> Engine

Create a SQLAlchemy Engine from DATABASE_URL env var or a default URL.

All three Rune services (api-service, training-svc, evolution-svc) use the same pattern: read DATABASE_URL from the environment, falling back to a service-specific SQLite file. This helper centralises that pattern.

Parameters:

Name Type Description Default
default_url str

Fallback DB URL when DATABASE_URL env var is not set. Typically a SQLite path, e.g. "sqlite:///training_svc.db".

required

Returns:

Type Description
Engine

A configured SQLAlchemy Engine.

Example

engine = create_service_engine("sqlite:///training_svc.db")

Source code in libs/shared/src/shared/storage_utils.py
def create_service_engine(default_url: str) -> Engine:
    """Create a SQLAlchemy Engine from DATABASE_URL env var or a default URL.

    All three Rune services (api-service, training-svc, evolution-svc) use the
    same pattern: read DATABASE_URL from the environment, falling back to a
    service-specific SQLite file. This helper centralises that pattern.

    Args:
        default_url: Fallback DB URL when DATABASE_URL env var is not set.
            Typically a SQLite path, e.g. ``"sqlite:///training_svc.db"``.

    Returns:
        A configured SQLAlchemy Engine.

    Example:
        >>> engine = create_service_engine("sqlite:///training_svc.db")
    """
    db_url = os.getenv("DATABASE_URL", default_url)
    connect_args = {"check_same_thread": False} if db_url.startswith("sqlite") else {}
    return create_engine(db_url, connect_args=connect_args, echo=False)

template_loader

Jinja2 template loader for phase trajectory and prompt rendering.

Functions
render_trajectory
render_trajectory(phase: str, **kwargs: object) -> str

Render a phase trajectory template with task-specific variables.

Source code in libs/shared/src/shared/template_loader.py
def render_trajectory(phase: str, **kwargs: object) -> str:
    """Render a phase trajectory template with task-specific variables."""
    template = _env.get_template(f"{phase}.j2")
    return template.render(**kwargs)
render_prompt
render_prompt(phase: str, **kwargs: object) -> str

Render a phase prompt template with task-specific variables.

Source code in libs/shared/src/shared/template_loader.py
def render_prompt(phase: str, **kwargs: object) -> str:
    """Render a phase prompt template with task-specific variables."""
    template = _env.get_template(f"prompt_{phase}.j2")
    return template.render(**kwargs)
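Both helpers resolve templates by the naming convention `{phase}.j2` and `prompt_{phase}.j2`. A hedged sketch of that convention using an in-memory Jinja2 loader (the phase name and template bodies below are hypothetical stand-ins for the package's actual .j2 files):

```python
from jinja2 import DictLoader, Environment

# Inline stand-ins for the on-disk templates the module's _env resolves.
env = Environment(loader=DictLoader({
    "plan.j2": "Trajectory for: {{ task }}",
    "prompt_plan.j2": "Plan the task: {{ task }}",
}))

# render_prompt("plan", task=...) would resolve "prompt_plan.j2" like this:
prompt = env.get_template("prompt_plan.j2").render(task="reverse a string")
print(prompt)  # Plan the task: reverse a string
```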