API Reference for adapter-registry

adapter_registry

LoRA adapter metadata registry backed by SQLite via SQLModel.

Classes

AdapterAlreadyExistsError

Bases: AdapterRegistryError

Raised when attempting to store an adapter with a duplicate ID.

AdapterNotFoundError

Bases: AdapterRegistryError

Raised when an adapter matching the query criteria does not exist.
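
Because both exceptions derive from AdapterRegistryError, a caller can handle each case separately or fall back to the base class. The following is a minimal sketch of that pattern. It uses local stand-in classes that mirror the documented hierarchy rather than importing the real ones, and `describe_failure` is a hypothetical helper, not part of the package.

```python
class AdapterRegistryError(Exception):
    """Stand-in for the documented base exception."""


class AdapterAlreadyExistsError(AdapterRegistryError):
    """Stand-in: an adapter with the same ID is already stored."""


class AdapterNotFoundError(AdapterRegistryError):
    """Stand-in: no adapter matches the query."""


def describe_failure(exc: Exception) -> str:
    """Hypothetical helper: map a registry exception to a short label."""
    try:
        raise exc
    except AdapterAlreadyExistsError:
        return "duplicate"
    except AdapterNotFoundError:
        return "missing"
    except AdapterRegistryError:
        # Any other registry failure is caught by the shared base class.
        return "registry-error"
```

Exceptions outside this hierarchy are not caught and propagate to the caller.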

AdapterRecord

Bases: SQLModel

A stored LoRA adapter metadata record.

Tracks metadata for a single LoRA adapter including its lineage, storage location, and evaluation metrics. Backed by SQLite via SQLModel.

Attributes:

id (str): Unique adapter identifier (UUID string).
version (int): Adapter version number for lineage tracking.
task_type (str): Task category this adapter was trained on (e.g. 'bug-fix').
base_model_id (str): Identifier of the base model this adapter was trained from.
rank (int): LoRA rank used during training.
created_at (str): ISO 8601 timestamp of adapter creation.
file_path (str): Filesystem path to the adapter weights file.
file_hash (str): SHA-256 hash of the adapter weights file for integrity checks.
file_size_bytes (int): Size of the adapter weights file in bytes.
pass_rate (float | None): Pass rate on benchmark tasks (0.0 to 1.0), if evaluated.
fitness_score (float | None): Overall evolutionary fitness score, if evaluated.
source (str): How the adapter was created ('distillation', 'evolution', 'manual').
session_id (str): ID of the coding session that produced this adapter.
is_archived (bool): Whether this adapter has been archived (soft delete).
parent_ids (str | None): JSON-encoded list of parent adapter IDs for lineage tracking.
generation (int): Evolutionary generation number (0 for initial adapters).
training_task_hash (str | None): Deduplication key for the training task.
agent_id (str | None): Identifier of the swarm agent that produced this adapter.

Example

>>> record = AdapterRecord(
...     id="adapter-001", version=1, task_type="bug-fix",
...     base_model_id="Qwen/Qwen2.5-Coder-7B", rank=16,
...     created_at="2026-01-01T00:00:00Z",
...     file_path="/adapters/adapter-001.safetensors",
...     file_hash="abc123", file_size_bytes=1024,
...     source="distillation", session_id="sess-001",
... )
>>> record.task_type
'bug-fix'
>>> record.is_archived
False
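
The file_hash, file_size_bytes, and created_at fields have to be computed by the caller before a record is built. One way to do that with the standard library is sketched below. `describe_weights_file` is a hypothetical helper, not part of adapter_registry, and the timestamp format is an assumption chosen to match the example value "2026-01-01T00:00:00Z".

```python
import hashlib
import os
from datetime import datetime, timezone


def describe_weights_file(path: str, chunk_size: int = 1 << 20) -> dict:
    """Hypothetical helper: compute file metadata for an AdapterRecord."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        # Read in chunks so a large weights file is never fully in memory.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return {
        "file_path": path,
        "file_hash": digest.hexdigest(),
        "file_size_bytes": os.path.getsize(path),
        # ISO 8601 timestamp in UTC with a trailing "Z".
        "created_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
```

The returned keys match the AdapterRecord field names, so the dict could be unpacked into the constructor alongside the remaining required fields.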

AdapterRegistry

AdapterRegistry(engine: Engine)

Registry for storing and querying LoRA adapter metadata.

Provides CRUD operations backed by SQLite via SQLModel. The registry is initialized with a SQLAlchemy Engine and creates tables idempotently on construction. WAL mode is activated automatically on every new SQLite connection via an event hook registered before table creation.

Raises:

AdapterAlreadyExistsError: When storing a duplicate adapter ID.
AdapterNotFoundError: When querying for a non-existent adapter.

Example

>>> from sqlalchemy import create_engine
>>> engine = create_engine("sqlite:///adapters.db")
>>> registry = AdapterRegistry(engine=engine)
>>> registry.store(record)

Initialize the registry with a SQLAlchemy Engine.

Registers a WAL-mode hook on the engine before creating tables to ensure every connection (including those opened by create_all) uses Write-Ahead Logging. Table creation is idempotent — safe to call against an existing database.

Parameters:

engine (Engine, required): A SQLAlchemy Engine connected to the target SQLite database.

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def __init__(self, engine: Engine) -> None:
    """Initialize the registry with a SQLAlchemy Engine.

    Registers a WAL-mode hook on the engine before creating tables to
    ensure every connection (including those opened by create_all) uses
    Write-Ahead Logging. Table creation is idempotent — safe to call
    against an existing database.

    Args:
        engine: A SQLAlchemy Engine connected to the target SQLite database.
    """
    self._engine = engine
    set_wal_mode(engine)
    SQLModel.metadata.create_all(engine)
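
The body of set_wal_mode is not shown on this page. At the SQLite level, enabling Write-Ahead Logging comes down to issuing one pragma on each new connection; in SQLAlchemy this is usually done from a listener on the engine's "connect" event. The sketch below shows only the pragma itself using the standard-library sqlite3 module. `open_wal_connection` is a hypothetical helper and is not how the registry is implemented.

```python
import sqlite3


def open_wal_connection(db_path: str) -> sqlite3.Connection:
    """Hypothetical helper: open a connection and switch it to WAL mode."""
    conn = sqlite3.connect(db_path)
    # The pragma returns the journal mode that is now in effect.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    if mode.lower() != "wal":
        # In-memory databases, for example, cannot use WAL.
        conn.close()
        raise RuntimeError(f"WAL not available, journal mode is {mode!r}")
    return conn
```

WAL mode is stored in the database file, so later connections to the same file also see it; issuing the pragma on every connection simply makes the setting independent of how the file was created.
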
Functions
store
store(record: AdapterRecord) -> None

Store a new adapter record in the registry.

Parameters:

record (AdapterRecord, required): The AdapterRecord to persist.

Raises:

AdapterAlreadyExistsError: If an adapter with the same ID exists.

Example

>>> engine = create_engine("sqlite:///adapters.db")
>>> registry = AdapterRegistry(engine=engine)
>>> record = AdapterRecord(id="abc-1", version=1, task_type="bug-fix",
...     base_model_id="qwen2.5-coder-7b", rank=16,
...     created_at="2026-01-01T00:00:00Z",
...     file_path="/adapters/abc-1.safetensors",
...     file_hash="sha256hash", file_size_bytes=4096,
...     source="distillation", session_id="sess-001")
>>> registry.store(record)

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def store(self, record: AdapterRecord) -> None:
    """Store a new adapter record in the registry.

    Args:
        record: The AdapterRecord to persist.

    Raises:
        AdapterAlreadyExistsError: If an adapter with the same ID exists.

    Example:
        >>> engine = create_engine("sqlite:///adapters.db")
        >>> registry = AdapterRegistry(engine=engine)
        >>> record = AdapterRecord(id="abc-1", version=1, task_type="bug-fix",
        ...     base_model_id="qwen2.5-coder-7b", rank=16,
        ...     created_at="2026-01-01T00:00:00Z",
        ...     file_path="/adapters/abc-1.safetensors",
        ...     file_hash="sha256hash", file_size_bytes=4096,
        ...     source="distillation", session_id="sess-001")
        >>> registry.store(record)
    """
    with Session(self._engine, expire_on_commit=False) as session:
        if session.get(AdapterRecord, record.id) is not None:
            raise AdapterAlreadyExistsError(
                f"Adapter '{record.id}' already exists."
            )
        session.add(record)
        session.commit()
        session.expunge(record)
archive
archive(adapter_id: str) -> None

Archive an adapter by setting is_archived=True.

Parameters:

adapter_id (str, required): ID of the adapter to archive.

Raises:

AdapterNotFoundError: If no adapter with the given ID exists.

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def archive(self, adapter_id: str) -> None:
    """Archive an adapter by setting is_archived=True.

    Args:
        adapter_id: ID of the adapter to archive.

    Raises:
        AdapterNotFoundError: If no adapter with the given ID exists.
    """
    with Session(self._engine, expire_on_commit=False) as session:
        record = session.get(AdapterRecord, adapter_id)
        if record is None:
            raise AdapterNotFoundError(f"No adapter with id '{adapter_id}'.")
        record.is_archived = True
        session.add(record)
        session.commit()
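
Archiving is a soft delete: the row stays in the table and is only flagged, so queries that filter on is_archived stop returning it. The effect can be sketched in plain SQL with the standard-library sqlite3 module. The table name `adapter`, its reduced column set, and the use of KeyError in place of AdapterNotFoundError are assumptions made for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE adapter (id TEXT PRIMARY KEY, task_type TEXT, "
    "is_archived INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO adapter (id, task_type) VALUES (?, ?)",
    [("a-1", "bug-fix"), ("a-2", "bug-fix")],
)


def archive(conn: sqlite3.Connection, adapter_id: str) -> None:
    """Soft delete: flag the row instead of removing it."""
    cur = conn.execute(
        "UPDATE adapter SET is_archived = 1 WHERE id = ?", (adapter_id,)
    )
    if cur.rowcount == 0:
        # Stand-in for the documented AdapterNotFoundError.
        raise KeyError(f"No adapter with id '{adapter_id}'.")
    conn.commit()


archive(conn, "a-1")
# Only non-archived rows are listed, but the archived row still exists.
active = [row[0] for row in conn.execute(
    "SELECT id FROM adapter WHERE is_archived = 0 ORDER BY id")]
total = conn.execute("SELECT COUNT(*) FROM adapter").fetchone()[0]
```
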
update_fitness
update_fitness(
    adapter_id: str, pass_rate: float, fitness_score: float
) -> None

Update evaluation metrics for an adapter.

Parameters:

adapter_id (str, required): ID of the adapter to update.
pass_rate (float, required): New pass rate value. AdapterRecord documents the range as 0.0 to 1.0; this method does not validate it.
fitness_score (float, required): New fitness score value.

Raises:

AdapterNotFoundError: If no adapter with the given ID exists.

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def update_fitness(
    self,
    adapter_id: str,
    pass_rate: float,
    fitness_score: float,
) -> None:
    """Update evaluation metrics for an adapter.

    Args:
        adapter_id: ID of the adapter to update.
        pass_rate: New pass rate value.
        fitness_score: New fitness score value.

    Raises:
        AdapterNotFoundError: If no adapter with the given ID exists.
    """
    with Session(self._engine, expire_on_commit=False) as session:
        record = session.get(AdapterRecord, adapter_id)
        if record is None:
            raise AdapterNotFoundError(f"No adapter with id '{adapter_id}'.")
        record.pass_rate = pass_rate
        record.fitness_score = fitness_score
        session.add(record)
        session.commit()
retrieve_by_id
retrieve_by_id(adapter_id: str) -> AdapterRecord

Retrieve a single adapter record by its unique ID.

Parameters:

adapter_id (str, required): The unique identifier of the adapter to retrieve.

Returns:

AdapterRecord: The matching AdapterRecord, detached (expunged) from the session that loaded it.

Raises:

AdapterNotFoundError: If no adapter with the given ID exists.

Example

>>> engine = create_engine("sqlite:///adapters.db")
>>> registry = AdapterRegistry(engine=engine)
>>> record = registry.retrieve_by_id("abc-1")

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def retrieve_by_id(self, adapter_id: str) -> AdapterRecord:
    """Retrieve a single adapter record by its unique ID.

    Args:
        adapter_id: The unique identifier of the adapter to retrieve.

    Returns:
        The matching AdapterRecord.

    Raises:
        AdapterNotFoundError: If no adapter with the given ID exists.

    Example:
        >>> engine = create_engine("sqlite:///adapters.db")
        >>> registry = AdapterRegistry(engine=engine)
        >>> record = registry.retrieve_by_id("abc-1")
    """
    with Session(self._engine, expire_on_commit=False) as session:
        record = session.get(AdapterRecord, adapter_id)
        if record is None:
            raise AdapterNotFoundError(f"No adapter with id '{adapter_id}'.")
        session.expunge(record)
        return record
query_by_task_type
query_by_task_type(task_type: str) -> list[AdapterRecord]

Query all adapters matching a given task type. The statement built by this method does not filter on is_archived, so archived adapters are not excluded by it.

Parameters:

task_type (str, required): The task category to filter by (e.g. 'bug-fix').

Returns:

list[AdapterRecord]: List of matching AdapterRecord instances. Empty list if none match.

Example

>>> engine = create_engine("sqlite:///adapters.db")
>>> registry = AdapterRegistry(engine=engine)
>>> results = registry.query_by_task_type("bug-fix")

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def query_by_task_type(self, task_type: str) -> list[AdapterRecord]:
    """Query all adapters matching a given task type.

    Args:
        task_type: The task category to filter by (e.g. 'bug-fix').

    Returns:
        List of matching AdapterRecord instances. Empty list if none match.

    Example:
        >>> engine = create_engine("sqlite:///adapters.db")
        >>> registry = AdapterRegistry(engine=engine)
        >>> results = registry.query_by_task_type("bug-fix")
    """
    stmt = select(AdapterRecord).where(AdapterRecord.task_type == task_type)
    return self._execute_query(stmt)
list_all
list_all() -> list[AdapterRecord]

List all non-archived adapter records.

Returns:

list[AdapterRecord]: List of all AdapterRecord instances where is_archived=False.

Example

>>> engine = create_engine("sqlite:///adapters.db")
>>> registry = AdapterRegistry(engine=engine)
>>> all_adapters = registry.list_all()

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def list_all(self) -> list[AdapterRecord]:
    """List all non-archived adapter records.

    Returns:
        List of all AdapterRecord instances where is_archived=False.

    Example:
        >>> engine = create_engine("sqlite:///adapters.db")
        >>> registry = AdapterRegistry(engine=engine)
        >>> all_adapters = registry.list_all()
    """
    stmt = select(AdapterRecord).where(
        AdapterRecord.is_archived == False  # noqa: E712
    )
    return self._execute_query(stmt)
query_best_for_task
query_best_for_task(
    task_type: str, top_k: int = 3
) -> list[AdapterRecord]

Return the top-k highest-fitness adapters for a task type. Archived adapters and adapters without a fitness_score are excluded.

Parameters:

task_type (str, required): Task category to filter by.
top_k (int, default 3): Maximum number of results to return.

Returns:

list[AdapterRecord]: List of AdapterRecord instances ordered by fitness_score, highest first.

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def query_best_for_task(
    self, task_type: str, top_k: int = 3
) -> list[AdapterRecord]:
    """Return the top-k highest-fitness adapters for a task type.

    Args:
        task_type: Task category to filter by.
        top_k: Maximum number of results to return.

    Returns:
        List of AdapterRecord instances ordered by fitness_score DESC.
    """
    stmt = (
        select(AdapterRecord)
        .where(
            AdapterRecord.task_type == task_type,
            AdapterRecord.is_archived == False,  # noqa: E712
            col(AdapterRecord.fitness_score).isnot(None),
        )
        .order_by(col(AdapterRecord.fitness_score).desc())
        .limit(top_k)
    )
    return self._execute_query(stmt)
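
The statement above corresponds roughly to a filtered, ordered, limited SELECT. The sketch below reproduces that shape with the standard-library sqlite3 module so the three exclusion rules can be seen on sample rows. The table name `adapter`, the reduced column set, and the `best_for_task` function are assumptions for illustration; the SQL that SQLModel actually emits is not shown in this reference.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE adapter (id TEXT PRIMARY KEY, task_type TEXT, "
    "fitness_score REAL, is_archived INTEGER NOT NULL DEFAULT 0)"
)
rows = [
    ("a-1", "bug-fix", 0.70, 0),
    ("a-2", "bug-fix", 0.90, 0),
    ("a-3", "bug-fix", None, 0),   # unevaluated: excluded
    ("a-4", "bug-fix", 0.99, 1),   # archived: excluded
    ("a-5", "refactor", 0.95, 0),  # other task type: excluded
    ("a-6", "bug-fix", 0.80, 0),
]
conn.executemany("INSERT INTO adapter VALUES (?, ?, ?, ?)", rows)


def best_for_task(conn, task_type: str, top_k: int = 3) -> list[str]:
    """Return the IDs of the top_k fittest active adapters for a task type."""
    cur = conn.execute(
        "SELECT id FROM adapter "
        "WHERE task_type = ? AND is_archived = 0 "
        "AND fitness_score IS NOT NULL "
        "ORDER BY fitness_score DESC LIMIT ?",
        (task_type, top_k),
    )
    return [row[0] for row in cur]


top_two = best_for_task(conn, "bug-fix", top_k=2)
```
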
is_task_solved
is_task_solved(
    task_hash: str, threshold: float = 0.95
) -> bool

Check whether any non-archived adapter for the given task hash has a pass_rate at or above the threshold. Adapters that have not been evaluated (pass_rate is None) never qualify.

Parameters:

task_hash (str, required): Training task hash to check.
threshold (float, default 0.95): Minimum pass_rate to consider "solved".

Returns:

bool: True if a qualifying adapter exists.

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def is_task_solved(self, task_hash: str, threshold: float = 0.95) -> bool:
    """Check if any adapter for the given task hash meets the threshold.

    Args:
        task_hash: Training task hash to check.
        threshold: Minimum pass_rate to consider "solved".

    Returns:
        True if a qualifying adapter exists.
    """
    stmt = (
        select(AdapterRecord)
        .where(
            AdapterRecord.training_task_hash == task_hash,
            AdapterRecord.is_archived == False,  # noqa: E712
            col(AdapterRecord.pass_rate) >= threshold,
        )
        .limit(1)
    )
    return bool(self._execute_query(stmt))
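
The check reduces to an existence query: fetch at most one row that satisfies all three conditions. A sketch of the equivalent logic with the standard-library sqlite3 module follows. The table name `adapter` and the reduced schema are assumptions for illustration. Note that a NULL pass_rate never satisfies the comparison, which is why unevaluated adapters do not count.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE adapter (id TEXT PRIMARY KEY, training_task_hash TEXT, "
    "pass_rate REAL, is_archived INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO adapter VALUES (?, ?, ?, ?)",
    [
        ("a-1", "task-x", 0.97, 0),  # qualifies at the default threshold
        ("a-2", "task-y", None, 0),  # unevaluated: NULL >= ? is not true
        ("a-3", "task-z", 0.99, 1),  # archived: ignored
    ],
)


def is_task_solved(conn, task_hash: str, threshold: float = 0.95) -> bool:
    """True if an active adapter for task_hash meets the threshold."""
    row = conn.execute(
        "SELECT 1 FROM adapter "
        "WHERE training_task_hash = ? AND is_archived = 0 "
        "AND pass_rate >= ? LIMIT 1",
        (task_hash, threshold),
    ).fetchone()
    return row is not None
```
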
get_lineage
get_lineage(adapter_id: str) -> list[str]

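Walk the parent_ids chain for an adapter, following only the first parent at each step. The walk stops when an ID is not in the registry, when parent_ids is missing or empty, or when the next ID has already been visited (cycle guard).

Parameters:

adapter_id (str, required): Starting adapter ID.

Returns:

list[str]: List of adapter IDs from the given adapter back to the root along the first-parent path. The starting ID is always the first element, even if it is not in the registry.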

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def get_lineage(self, adapter_id: str) -> list[str]:
    """Walk the parent_ids chain for an adapter.

    Args:
        adapter_id: Starting adapter ID.

    Returns:
        List of adapter IDs from the given adapter back to the root.
    """
    chain: list[str] = [adapter_id]
    current_id = adapter_id
    visited: set[str] = {adapter_id}

    while True:
        with Session(self._engine, expire_on_commit=False) as session:
            record = session.get(AdapterRecord, current_id)
            if record is None or record.parent_ids is None:
                break
            parent_list: list[str] = json.loads(record.parent_ids)
            if not parent_list:
                break
            next_id = parent_list[0]
            if next_id in visited:
                break
            visited.add(next_id)
            chain.append(next_id)
            current_id = next_id

    return chain
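
The same walk can be tried without a database. In the sketch below a plain dict stands in for the table, mapping each adapter ID to its JSON-encoded parent_ids value; `PARENTS` and `first_parent_lineage` are hypothetical names used only for this illustration.

```python
import json

# Hypothetical stand-in for the table: adapter ID -> parent_ids column value.
PARENTS = {
    "gen2": json.dumps(["gen1-a", "gen1-b"]),  # merge of two parents
    "gen1-a": json.dumps(["root"]),
    "gen1-b": json.dumps(["root"]),
    "root": None,
}


def first_parent_lineage(adapter_id: str, parents: dict) -> list[str]:
    """Follow the first parent at each step until the chain ends."""
    chain = [adapter_id]
    visited = {adapter_id}
    current = adapter_id
    while True:
        encoded = parents.get(current)
        if encoded is None:       # unknown ID or no recorded parents
            break
        parent_list = json.loads(encoded)
        if not parent_list:       # empty list: nothing further to follow
            break
        next_id = parent_list[0]  # only the first parent is followed
        if next_id in visited:    # cycle guard
            break
        visited.add(next_id)
        chain.append(next_id)
        current = next_id
    return chain
```

For "gen2" the second parent "gen1-b" never appears in the result, which is the main difference from a full DAG traversal.
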
get_lineage_dag
get_lineage_dag(adapter_id: str) -> dict[str, list[str]]

Return the full multi-parent lineage DAG as an adjacency dict.

Unlike get_lineage which follows only the first parent, this method does BFS over all parents to capture the complete merge tree.

Parameters:

adapter_id (str, required): Starting adapter ID.

Returns:

dict[str, list[str]]: Dict mapping each visited adapter ID (the starting adapter and all of its ancestors) to its list of parent IDs. Unknown IDs encountered during traversal are recorded with an empty parent list (no exception raised).

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def get_lineage_dag(self, adapter_id: str) -> dict[str, list[str]]:
    """Return the full multi-parent lineage DAG as an adjacency dict.

    Unlike ``get_lineage`` which follows only the first parent, this
    method does BFS over all parents to capture the complete merge tree.

    Args:
        adapter_id: Starting adapter ID.

    Returns:
        Dict mapping each ancestor adapter ID to its list of parent IDs.
        Unknown IDs encountered during traversal are recorded with an
        empty parent list (no exception raised).

    Raises:
        None. Missing adapter IDs are silently treated as leaf nodes.
    """
    from collections import deque

    adjacency: dict[str, list[str]] = {}
    queue: deque[str] = deque([adapter_id])
    visited: set[str] = {adapter_id}

    while queue:
        current_id = queue.popleft()
        with Session(self._engine, expire_on_commit=False) as session:
            record = session.get(AdapterRecord, current_id)
            if record is None or record.parent_ids is None:
                adjacency[current_id] = []
                continue
            parent_list: list[str] = json.loads(record.parent_ids)
            adjacency[current_id] = parent_list
            for pid in parent_list:
                if pid not in visited:
                    visited.add(pid)
                    queue.append(pid)

    return adjacency
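
A database-free version of the breadth-first traversal makes the shape of the returned adjacency dict easier to see. As before, a dict of JSON-encoded parent_ids values stands in for the table; `PARENTS` and `lineage_dag` are hypothetical names used only for this sketch.

```python
import json
from collections import deque

# Hypothetical stand-in for the table: adapter ID -> parent_ids column value.
PARENTS = {
    "gen2": json.dumps(["gen1-a", "gen1-b"]),
    "gen1-a": json.dumps(["root"]),
    "gen1-b": json.dumps(["root", "ghost"]),  # "ghost" is not in the table
    "root": None,
}


def lineage_dag(adapter_id: str, parents: dict) -> dict[str, list[str]]:
    """Breadth-first traversal over all parents, not just the first."""
    adjacency: dict[str, list[str]] = {}
    queue = deque([adapter_id])
    visited = {adapter_id}
    while queue:
        current = queue.popleft()
        encoded = parents.get(current)
        # Unknown IDs and rows without parents become leaf nodes.
        parent_list = json.loads(encoded) if encoded is not None else []
        adjacency[current] = parent_list
        for pid in parent_list:
            if pid not in visited:  # visit each shared ancestor once
                visited.add(pid)
                queue.append(pid)
    return adjacency


dag = lineage_dag("gen2", PARENTS)
```

Here "root" is reachable through both parents but appears once as a key, and the unknown "ghost" is recorded with an empty parent list.
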
query_unevaluated
query_unevaluated(
    task_type: str | None = None,
) -> list[AdapterRecord]

Return non-archived adapters that have not been evaluated (pass_rate IS NULL).

Parameters:

task_type (str | None, default None): Optional task type filter. When None, unevaluated adapters of every task type are returned.

Returns:

list[AdapterRecord]: List of unevaluated AdapterRecord instances.

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def query_unevaluated(self, task_type: str | None = None) -> list[AdapterRecord]:
    """Return adapters that have not been evaluated (pass_rate IS NULL).

    Args:
        task_type: Optional task type filter.

    Returns:
        List of unevaluated AdapterRecord instances.
    """
    stmt = select(AdapterRecord).where(
        AdapterRecord.is_archived == False,  # noqa: E712
        col(AdapterRecord.pass_rate).is_(None),  # type: ignore[arg-type]
    )
    if task_type is not None:
        stmt = stmt.where(AdapterRecord.task_type == task_type)
    return self._execute_query(stmt)
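
The optional filter is applied by extending the base statement only when a task type is given. The same pattern in plain SQL, using the standard-library sqlite3 module, looks roughly like the sketch below. The table name `adapter`, the reduced schema, and the ORDER BY clause (added only to make the output deterministic) are assumptions for this illustration.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE adapter (id TEXT PRIMARY KEY, task_type TEXT, "
    "pass_rate REAL, is_archived INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO adapter VALUES (?, ?, ?, ?)",
    [
        ("a-1", "bug-fix", None, 0),   # unevaluated
        ("a-2", "refactor", None, 0),  # unevaluated, other task type
        ("a-3", "bug-fix", 0.80, 0),   # already evaluated
        ("a-4", "bug-fix", None, 1),   # archived
    ],
)


def unevaluated(conn, task_type: Optional[str] = None) -> list[str]:
    """IDs of active adapters whose pass_rate is still NULL."""
    sql = "SELECT id FROM adapter WHERE is_archived = 0 AND pass_rate IS NULL"
    params: list = []
    if task_type is not None:
        # Extend the base query only when a filter was requested.
        sql += " AND task_type = ?"
        params.append(task_type)
    sql += " ORDER BY id"
    return [row[0] for row in conn.execute(sql, params)]
```
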
get_task_types
get_task_types() -> list[str]

Return all distinct task types in the registry. No is_archived filter is applied, so task types that appear only on archived adapters are included.

Returns:

list[str]: Sorted list of unique task_type values.

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def get_task_types(self) -> list[str]:
    """Return all distinct task types in the registry.

    Returns:
        Sorted list of unique task_type values.
    """
    with Session(self._engine, expire_on_commit=False) as session:
        stmt = select(AdapterRecord.task_type).distinct()
        results = list(session.exec(stmt).all())
        return sorted(results)

Modules

exceptions

Custom exceptions for adapter registry operations.

Classes
AdapterRegistryError

Bases: Exception

Base exception for adapter registry operations.

AdapterAlreadyExistsError

Bases: AdapterRegistryError

Raised when attempting to store an adapter with a duplicate ID.

AdapterNotFoundError

Bases: AdapterRegistryError

Raised when an adapter matching the query criteria does not exist.

models

SQLModel table model for LoRA adapter metadata records.

Classes
AdapterRecord

Bases: SQLModel

A stored LoRA adapter metadata record.

Tracks metadata for a single LoRA adapter including its lineage, storage location, and evaluation metrics. Backed by SQLite via SQLModel.


registry

AdapterRegistry class providing CRUD operations for adapter metadata.

Classes
AdapterRegistry
AdapterRegistry(engine: Engine)

Registry for storing and querying LoRA adapter metadata.

Provides CRUD operations backed by SQLite via SQLModel. The registry is initialized with a SQLAlchemy Engine and creates tables idempotently on construction. WAL mode is activated automatically on every new SQLite connection via an event hook registered before table creation.

Raises:

Type Description
AdapterAlreadyExistsError

When storing a duplicate adapter ID.

AdapterNotFoundError

When querying for a non-existent adapter.

Example

from sqlalchemy import create_engine engine = create_engine("sqlite:///adapters.db") registry = AdapterRegistry(engine=engine) registry.store(record)

Initialize the registry with a SQLAlchemy Engine.

Registers a WAL-mode hook on the engine before creating tables to ensure every connection (including those opened by create_all) uses Write-Ahead Logging. Table creation is idempotent — safe to call against an existing database.

Parameters:

Name Type Description Default
engine Engine

A SQLAlchemy Engine connected to the target SQLite database.

required
Source code in libs/adapter-registry/src/adapter_registry/registry.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(self, engine: Engine) -> None:
    """Initialize the registry with a SQLAlchemy Engine.

    Registers a WAL-mode hook on the engine before creating tables to
    ensure every connection (including those opened by create_all) uses
    Write-Ahead Logging. Table creation is idempotent — safe to call
    against an existing database.

    Args:
        engine: A SQLAlchemy Engine connected to the target SQLite database.
    """
    self._engine = engine
    set_wal_mode(engine)
    SQLModel.metadata.create_all(engine)
Functions
store
store(record: AdapterRecord) -> None

Store a new adapter record in the registry.

Parameters:

Name Type Description Default
record AdapterRecord

The AdapterRecord to persist.

required

Raises:

Type Description
AdapterAlreadyExistsError

If an adapter with the same ID exists.

Example

engine = create_engine("sqlite:///adapters.db") registry = AdapterRegistry(engine=engine) record = AdapterRecord(id="abc-1", version=1, task_type="bug-fix", ... base_model_id="qwen2.5-coder-7b", rank=16, ... created_at="2026-01-01T00:00:00Z", ... file_path="/adapters/abc-1.safetensors", ... file_hash="sha256hash", file_size_bytes=4096, ... source="distillation", session_id="sess-001") registry.store(record)

Source code in libs/adapter-registry/src/adapter_registry/registry.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def store(self, record: AdapterRecord) -> None:
    """Store a new adapter record in the registry.

    Args:
        record: The AdapterRecord to persist.

    Raises:
        AdapterAlreadyExistsError: If an adapter with the same ID exists.

    Example:
        >>> engine = create_engine("sqlite:///adapters.db")
        >>> registry = AdapterRegistry(engine=engine)
        >>> record = AdapterRecord(id="abc-1", version=1, task_type="bug-fix",
        ...     base_model_id="qwen2.5-coder-7b", rank=16,
        ...     created_at="2026-01-01T00:00:00Z",
        ...     file_path="/adapters/abc-1.safetensors",
        ...     file_hash="sha256hash", file_size_bytes=4096,
        ...     source="distillation", session_id="sess-001")
        >>> registry.store(record)
    """
    with Session(self._engine, expire_on_commit=False) as session:
        if session.get(AdapterRecord, record.id) is not None:
            raise AdapterAlreadyExistsError(
                f"Adapter '{record.id}' already exists."
            )
        session.add(record)
        session.commit()
        session.expunge(record)
archive
archive(adapter_id: str) -> None

Archive an adapter by setting is_archived=True.

Parameters:

Name Type Description Default
adapter_id str

ID of the adapter to archive.

required

Raises:

Type Description
AdapterNotFoundError

If no adapter with the given ID exists.

Source code in libs/adapter-registry/src/adapter_registry/registry.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def archive(self, adapter_id: str) -> None:
    """Archive an adapter by setting is_archived=True.

    Args:
        adapter_id: ID of the adapter to archive.

    Raises:
        AdapterNotFoundError: If no adapter with the given ID exists.
    """
    with Session(self._engine, expire_on_commit=False) as session:
        record = session.get(AdapterRecord, adapter_id)
        if record is None:
            raise AdapterNotFoundError(f"No adapter with id '{adapter_id}'.")
        record.is_archived = True
        session.add(record)
        session.commit()
update_fitness
update_fitness(
    adapter_id: str, pass_rate: float, fitness_score: float
) -> None

Update evaluation metrics for an adapter.

Parameters:

Name Type Description Default
adapter_id str

ID of the adapter to update.

required
pass_rate float

New pass rate value.

required
fitness_score float

New fitness score value.

required

Raises:

Type Description
AdapterNotFoundError

If no adapter with the given ID exists.

Source code in libs/adapter-registry/src/adapter_registry/registry.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def update_fitness(
    self,
    adapter_id: str,
    pass_rate: float,
    fitness_score: float,
) -> None:
    """Update evaluation metrics for an adapter.

    Args:
        adapter_id: ID of the adapter to update.
        pass_rate: New pass rate value.
        fitness_score: New fitness score value.

    Raises:
        AdapterNotFoundError: If no adapter with the given ID exists.
    """
    with Session(self._engine, expire_on_commit=False) as session:
        record = session.get(AdapterRecord, adapter_id)
        if record is None:
            raise AdapterNotFoundError(f"No adapter with id '{adapter_id}'.")
        record.pass_rate = pass_rate
        record.fitness_score = fitness_score
        session.add(record)
        session.commit()
retrieve_by_id
retrieve_by_id(adapter_id: str) -> AdapterRecord

Retrieve a single adapter record by its unique ID.

Parameters:

Name Type Description Default
adapter_id str

The unique identifier of the adapter to retrieve.

required

Returns:

Type Description
AdapterRecord

The matching AdapterRecord.

Raises:

Type Description
AdapterNotFoundError

If no adapter with the given ID exists.

Example

engine = create_engine("sqlite:///adapters.db") registry = AdapterRegistry(engine=engine) record = registry.retrieve_by_id("abc-1")

Source code in libs/adapter-registry/src/adapter_registry/registry.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def retrieve_by_id(self, adapter_id: str) -> AdapterRecord:
    """Retrieve a single adapter record by its unique ID.

    Args:
        adapter_id: The unique identifier of the adapter to retrieve.

    Returns:
        The matching AdapterRecord.

    Raises:
        AdapterNotFoundError: If no adapter with the given ID exists.

    Example:
        >>> engine = create_engine("sqlite:///adapters.db")
        >>> registry = AdapterRegistry(engine=engine)
        >>> record = registry.retrieve_by_id("abc-1")
    """
    with Session(self._engine, expire_on_commit=False) as session:
        record = session.get(AdapterRecord, adapter_id)
        if record is None:
            raise AdapterNotFoundError(f"No adapter with id '{adapter_id}'.")
        session.expunge(record)
        return record
query_by_task_type
query_by_task_type(task_type: str) -> list[AdapterRecord]

Query all adapters matching a given task type.

Parameters:

Name Type Description Default
task_type str

The task category to filter by (e.g. 'bug-fix').

required

Returns:

Type Description
list[AdapterRecord]

List of matching AdapterRecord instances. Empty list if none match.

Example

engine = create_engine("sqlite:///adapters.db") registry = AdapterRegistry(engine=engine) results = registry.query_by_task_type("bug-fix")

Source code in libs/adapter-registry/src/adapter_registry/registry.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def query_by_task_type(self, task_type: str) -> list[AdapterRecord]:
    """Query all adapters matching a given task type.

    Args:
        task_type: The task category to filter by (e.g. 'bug-fix').

    Returns:
        List of matching AdapterRecord instances. Empty list if none match.

    Example:
        >>> engine = create_engine("sqlite:///adapters.db")
        >>> registry = AdapterRegistry(engine=engine)
        >>> results = registry.query_by_task_type("bug-fix")
    """
    stmt = select(AdapterRecord).where(AdapterRecord.task_type == task_type)
    return self._execute_query(stmt)
list_all
list_all() -> list[AdapterRecord]

List all non-archived adapter records.

Returns:

Type Description
list[AdapterRecord]

List of all AdapterRecord instances where is_archived=False.

Example

engine = create_engine("sqlite:///adapters.db") registry = AdapterRegistry(engine=engine) all_adapters = registry.list_all()

Source code in libs/adapter-registry/src/adapter_registry/registry.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def list_all(self) -> list[AdapterRecord]:
    """List all non-archived adapter records.

    Returns:
        List of all AdapterRecord instances where is_archived=False.

    Example:
        >>> engine = create_engine("sqlite:///adapters.db")
        >>> registry = AdapterRegistry(engine=engine)
        >>> all_adapters = registry.list_all()
    """
    stmt = select(AdapterRecord).where(
        AdapterRecord.is_archived == False  # noqa: E712
    )
    return self._execute_query(stmt)
query_best_for_task
query_best_for_task(
    task_type: str, top_k: int = 3
) -> list[AdapterRecord]

Return the top-k highest-fitness adapters for a task type.

Parameters:

Name Type Description Default
task_type str

Task category to filter by.

required
top_k int

Maximum number of results to return.

3

Returns:

Type Description
list[AdapterRecord]

List of at most top_k non-archived AdapterRecord instances with a non-null fitness_score, ordered by fitness_score descending.
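
Read against the source, the query keeps only non-archived rows whose fitness_score is not NULL, sorts them by score, and caps the result at top_k. The sketch below reproduces that filter in plain SQL with the standard-library sqlite3 module. The `adapters` table name, its reduced column set, and the `best_for_task` helper are assumptions made for illustration; the real table is defined by AdapterRecord.

```python
import sqlite3

# Hypothetical, trimmed-down table: only the columns the query touches.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE adapters ("
    "id TEXT PRIMARY KEY, task_type TEXT, is_archived INTEGER, fitness_score REAL)"
)
conn.executemany(
    "INSERT INTO adapters VALUES (?, ?, ?, ?)",
    [
        ("a1", "bug-fix", 0, 0.70),
        ("a2", "bug-fix", 0, 0.90),
        ("a3", "bug-fix", 1, 0.99),   # archived: excluded
        ("a4", "bug-fix", 0, None),   # never scored: excluded
        ("a5", "refactor", 0, 0.95),  # other task type: excluded
    ],
)


def best_for_task(task_type: str, top_k: int = 3) -> list[str]:
    """Return IDs of the top_k scored, non-archived adapters for task_type."""
    rows = conn.execute(
        "SELECT id FROM adapters "
        "WHERE task_type = ? AND is_archived = 0 AND fitness_score IS NOT NULL "
        "ORDER BY fitness_score DESC LIMIT ?",
        (task_type, top_k),
    ).fetchall()
    return [row[0] for row in rows]


best = best_for_task("bug-fix")
```

Fewer than top_k rows come back when fewer adapters qualify, and an unknown task type yields an empty list.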

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def query_best_for_task(
    self, task_type: str, top_k: int = 3
) -> list[AdapterRecord]:
    """Return the top-k highest-fitness adapters for a task type.

    Args:
        task_type: Task category to filter by.
        top_k: Maximum number of results to return.

    Returns:
        List of AdapterRecord instances ordered by fitness_score DESC.
    """
    stmt = (
        select(AdapterRecord)
        .where(
            AdapterRecord.task_type == task_type,
            AdapterRecord.is_archived == False,  # noqa: E712
            col(AdapterRecord.fitness_score).isnot(None),
        )
        .order_by(col(AdapterRecord.fitness_score).desc())
        .limit(top_k)
    )
    return self._execute_query(stmt)
is_task_solved
is_task_solved(
    task_hash: str, threshold: float = 0.95
) -> bool

Check whether any non-archived adapter for the given task hash has a pass_rate at or above the threshold.

Parameters:

Name Type Description Default
task_hash str

Training task hash to check.

required
threshold float

Minimum pass_rate to consider "solved".

0.95

Returns:

Type Description
bool

True if a qualifying adapter exists.
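
One consequence of the `pass_rate >= threshold` comparison is that unevaluated adapters (pass_rate IS NULL) never count as solving a task, since a comparison against NULL is not true in SQL. The sketch below shows this with the standard-library sqlite3 module. The `adapters` table and the `task_solved` helper are hypothetical and only mirror the columns the query uses.

```python
import sqlite3

# Hypothetical, trimmed-down table: only the columns the check touches.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE adapters ("
    "id TEXT PRIMARY KEY, training_task_hash TEXT, is_archived INTEGER, pass_rate REAL)"
)
conn.executemany(
    "INSERT INTO adapters VALUES (?, ?, ?, ?)",
    [
        ("a1", "hash-A", 0, 0.97),
        ("a2", "hash-B", 0, 0.80),
        ("a3", "hash-C", 0, None),  # unevaluated: NULL >= threshold is not true
        ("a4", "hash-D", 1, 1.00),  # archived: ignored even with a perfect score
    ],
)


def task_solved(task_hash: str, threshold: float = 0.95) -> bool:
    """Return True if any non-archived adapter reaches the threshold."""
    row = conn.execute(
        "SELECT 1 FROM adapters "
        "WHERE training_task_hash = ? AND is_archived = 0 AND pass_rate >= ? "
        "LIMIT 1",
        (task_hash, threshold),
    ).fetchone()
    return row is not None


solved = {h: task_solved(h) for h in ("hash-A", "hash-B", "hash-C", "hash-D")}
```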

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def is_task_solved(self, task_hash: str, threshold: float = 0.95) -> bool:
    """Check if any adapter for the given task hash meets the threshold.

    Args:
        task_hash: Training task hash to check.
        threshold: Minimum pass_rate to consider "solved".

    Returns:
        True if a qualifying adapter exists.
    """
    stmt = (
        select(AdapterRecord)
        .where(
            AdapterRecord.training_task_hash == task_hash,
            AdapterRecord.is_archived == False,  # noqa: E712
            col(AdapterRecord.pass_rate) >= threshold,
        )
        .limit(1)
    )
    return bool(self._execute_query(stmt))
get_lineage
get_lineage(adapter_id: str) -> list[str]

Walk the parent_ids chain for an adapter, following only the first parent at each step.

Parameters:

Name Type Description Default
adapter_id str

Starting adapter ID.

required

Returns:

Type Description
list[str]

List of adapter IDs from the given adapter back to the root. An unknown starting ID yields a single-element list containing only that ID.
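
The walk can be sketched without a database. In the example below, a plain dict stands in for the table, mapping each adapter ID to its JSON-encoded parent_ids value. The dict, the IDs, and the `first_parent_chain` helper are all hypothetical; only the traversal rules (first parent only, stop on a missing record, stop on a repeated ID) follow the source.

```python
import json

# Hypothetical stand-in for the table: adapter ID -> JSON-encoded parent_ids (or None).
PARENT_IDS = {
    "gen2": json.dumps(["gen1-a", "gen1-b"]),
    "gen1-a": json.dumps(["root"]),
    "gen1-b": json.dumps(["root"]),
    "root": None,
    "loop-x": json.dumps(["loop-y"]),
    "loop-y": json.dumps(["loop-x"]),
}


def first_parent_chain(adapter_id: str) -> list[str]:
    """Follow the first parent of each adapter until the chain ends or repeats."""
    chain = [adapter_id]
    visited = {adapter_id}
    current = adapter_id
    while True:
        raw = PARENT_IDS.get(current)
        if raw is None:  # unknown ID, or a record with no parents
            break
        parents = json.loads(raw)
        if not parents or parents[0] in visited:  # empty list or cycle
            break
        current = parents[0]
        visited.add(current)
        chain.append(current)
    return chain


chain = first_parent_chain("gen2")
```

Note that "gen1-b" never appears in the chain for "gen2": second and later parents are ignored by this walk.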

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def get_lineage(self, adapter_id: str) -> list[str]:
    """Walk the parent_ids chain for an adapter.

    Args:
        adapter_id: Starting adapter ID.

    Returns:
        List of adapter IDs from the given adapter back to the root.
    """
    chain: list[str] = [adapter_id]
    current_id = adapter_id
    visited: set[str] = {adapter_id}

    while True:
        with Session(self._engine, expire_on_commit=False) as session:
            record = session.get(AdapterRecord, current_id)
            if record is None or record.parent_ids is None:
                break
            parent_list: list[str] = json.loads(record.parent_ids)
            if not parent_list:
                break
            next_id = parent_list[0]
            if next_id in visited:
                break
            visited.add(next_id)
            chain.append(next_id)
            current_id = next_id

    return chain
get_lineage_dag
get_lineage_dag(adapter_id: str) -> dict[str, list[str]]

Return the full multi-parent lineage DAG as an adjacency dict.

Unlike get_lineage which follows only the first parent, this method does BFS over all parents to capture the complete merge tree.

Parameters:

Name Type Description Default
adapter_id str

Starting adapter ID.

required

Returns:

Type Description
dict[str, list[str]]

Dict mapping each ancestor adapter ID to its list of parent IDs. Unknown IDs encountered during traversal are recorded with an empty parent list (no exception raised).
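
A breadth-first traversal of this kind can be sketched over an in-memory mapping. The dict below is a hypothetical stand-in for the table (adapter ID to JSON-encoded parent_ids), and `lineage_dag` is an illustrative helper rather than the registry method itself.

```python
import json
from collections import deque

# Hypothetical stand-in for the table: adapter ID -> JSON-encoded parent_ids (or None).
PARENT_IDS = {
    "merged": json.dumps(["left", "right"]),
    "left": json.dumps(["root"]),
    "right": json.dumps(["root", "ghost"]),  # "ghost" has no record
    "root": None,
}


def lineage_dag(adapter_id: str) -> dict[str, list[str]]:
    """Build an adjacency dict of every reachable adapter and its parents."""
    adjacency: dict[str, list[str]] = {}
    queue = deque([adapter_id])
    visited = {adapter_id}
    while queue:
        current = queue.popleft()
        raw = PARENT_IDS.get(current)
        # Missing records and records without parents both become leaf nodes.
        parents = json.loads(raw) if raw is not None else []
        adjacency[current] = parents
        for pid in parents:
            if pid not in visited:  # enqueue each ancestor once
                visited.add(pid)
                queue.append(pid)
    return adjacency


dag = lineage_dag("merged")
```

The starting adapter appears as a key alongside its ancestors, and the shared ancestor "root" is visited once even though two parents point to it.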

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def get_lineage_dag(self, adapter_id: str) -> dict[str, list[str]]:
    """Return the full multi-parent lineage DAG as an adjacency dict.

    Unlike ``get_lineage`` which follows only the first parent, this
    method does BFS over all parents to capture the complete merge tree.

    Args:
        adapter_id: Starting adapter ID.

    Returns:
        Dict mapping each ancestor adapter ID to its list of parent IDs.
        Unknown IDs encountered during traversal are recorded with an
        empty parent list (no exception raised).

    Raises:
        None. Missing adapter IDs are silently treated as leaf nodes.
    """
    from collections import deque

    adjacency: dict[str, list[str]] = {}
    queue: deque[str] = deque([adapter_id])
    visited: set[str] = {adapter_id}

    while queue:
        current_id = queue.popleft()
        with Session(self._engine, expire_on_commit=False) as session:
            record = session.get(AdapterRecord, current_id)
            if record is None or record.parent_ids is None:
                adjacency[current_id] = []
                continue
            parent_list: list[str] = json.loads(record.parent_ids)
            adjacency[current_id] = parent_list
            for pid in parent_list:
                if pid not in visited:
                    visited.add(pid)
                    queue.append(pid)

    return adjacency
query_unevaluated
query_unevaluated(
    task_type: str | None = None,
) -> list[AdapterRecord]

Return non-archived adapters that have not been evaluated (pass_rate IS NULL).

Parameters:

Name Type Description Default
task_type str | None

Optional task type filter.

None

Returns:

Type Description
list[AdapterRecord]

List of unevaluated AdapterRecord instances.

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def query_unevaluated(self, task_type: str | None = None) -> list[AdapterRecord]:
    """Return adapters that have not been evaluated (pass_rate IS NULL).

    Args:
        task_type: Optional task type filter.

    Returns:
        List of unevaluated AdapterRecord instances.
    """
    stmt = select(AdapterRecord).where(
        AdapterRecord.is_archived == False,  # noqa: E712
        col(AdapterRecord.pass_rate).is_(None),  # type: ignore[arg-type]
    )
    if task_type is not None:
        stmt = stmt.where(AdapterRecord.task_type == task_type)
    return self._execute_query(stmt)
get_task_types
get_task_types() -> list[str]

Return all distinct task types in the registry.

Returns:

Type Description
list[str]

Sorted list of unique task_type values.
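
The source applies no is_archived filter here, so a task type that exists only on archived adapters is still reported, and the sorting happens in Python rather than in SQL. A minimal sketch of that behaviour with the standard-library sqlite3 module follows; the `adapters` table and the `task_types` helper are hypothetical.

```python
import sqlite3

# Hypothetical, trimmed-down table: only the columns the query touches.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE adapters (id TEXT PRIMARY KEY, task_type TEXT, is_archived INTEGER)"
)
conn.executemany(
    "INSERT INTO adapters VALUES (?, ?, ?)",
    [
        ("a1", "refactor", 0),
        ("a2", "bug-fix", 0),
        ("a3", "bug-fix", 0),
        ("a4", "docs", 1),  # archived, but its task type is still listed
    ],
)


def task_types() -> list[str]:
    """Return the distinct task types, sorted alphabetically in Python."""
    rows = conn.execute("SELECT DISTINCT task_type FROM adapters").fetchall()
    return sorted(row[0] for row in rows)


types = task_types()
```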

Source code in libs/adapter-registry/src/adapter_registry/registry.py
def get_task_types(self) -> list[str]:
    """Return all distinct task types in the registry.

    Returns:
        Sorted list of unique task_type values.
    """
    with Session(self._engine, expire_on_commit=False) as session:
        stmt = select(AdapterRecord.task_type).distinct()
        results = list(session.exec(stmt).all())
        return sorted(results)