API Reference for training-svc

training_svc

Training service for LoRA and hypernetwork training job orchestration.

Modules

dependencies

Dependency injection for FastAPI endpoints.

Functions
get_db
get_db() -> Generator[Session, None, None]

Provide a database session for dependency injection.

Yields:

Type Description
Session

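Database session that is closed when the request finishes. The context manager does not commit, so callers must commit explicitly; uncommitted changes are rolled back on close.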

Source code in services/training-svc/src/training_svc/dependencies.py
def get_db() -> Generator[Session, None, None]:
    """Provide a database session for dependency injection.

    Yields:
        Database session that is closed when the request finishes. Callers
        must commit explicitly; uncommitted changes are rolled back on close.
    """
    with Session(engine) as session:
        yield session

jobs

In-memory training job status tracking.

Module-level JOB_STORE dict shared across all request handlers. State is lost on service restart, which is acceptable for a single-user local MVP.

All mutations to JOB_STORE must be made while holding _JOB_STORE_LOCK to prevent race conditions when background threads update job status while FastAPI request handlers are reading it.

Classes
JobStatus dataclass
JobStatus(
    job_id: str,
    status: str,
    adapter_id: Optional[str] = None,
    error: Optional[str] = None,
)

Training job status tracker.
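The locking rule described for this module can be sketched with the standard library alone. This is a minimal illustration, not the module's actual code: the dataclass fields match the signature above, but `update_job`, `snapshot_job`, and the status value "completed" are hypothetical names and values chosen for the example.

```python
import threading
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class JobStatus:
    """Stand-in with the same fields as the documented dataclass."""
    job_id: str
    status: str
    adapter_id: Optional[str] = None
    error: Optional[str] = None


JOB_STORE: Dict[str, JobStatus] = {}
_JOB_STORE_LOCK = threading.Lock()


def update_job(job_id: str, status: str, error: Optional[str] = None) -> None:
    """Hypothetical helper: mutate a job only while holding the lock."""
    with _JOB_STORE_LOCK:
        job = JOB_STORE[job_id]
        job.status = status
        job.error = error


def snapshot_job(job_id: str) -> Optional[dict]:
    """Hypothetical helper: copy the fields out while holding the lock."""
    with _JOB_STORE_LOCK:
        job = JOB_STORE.get(job_id)
        if job is None:
            return None
        return {
            "job_id": job.job_id,
            "status": job.status,
            "adapter_id": job.adapter_id,
            "error": job.error,
        }


# A request handler registers the job ...
with _JOB_STORE_LOCK:
    JOB_STORE["job-1"] = JobStatus(job_id="job-1", status="queued", adapter_id="a-1")

# ... and a background thread later updates it.
# "completed" is an assumed status value; the source only shows "queued".
worker = threading.Thread(target=update_job, args=("job-1", "completed"))
worker.start()
worker.join()

print(snapshot_job("job-1"))
```

Returning a plain dict from `snapshot_job` means the caller never touches the shared `JobStatus` object after the lock is released.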

main

FastAPI application for the training service.

Functions
lifespan async
lifespan(app: FastAPI)

Manage the lifecycle of the FastAPI application.

Source code in services/training-svc/src/training_svc/main.py
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the lifecycle of the FastAPI application."""
    logger.info("Starting up training service...")
    create_db_and_tables()
    logger.info("Database initialized successfully")
    yield
    logger.info("Shutdown complete")
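
The ordering that an async context manager like `lifespan` gives can be checked without FastAPI. The sketch below is a stand-in using only `contextlib` and `asyncio`; the `events` list and the `serve` function are hypothetical, and the "startup" step stands in for the `create_db_and_tables()` call.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def lifespan(app):
    # Code before the yield runs once, before any request is served.
    events.append("startup")
    yield
    # Code after the yield runs once, when the application shuts down.
    events.append("shutdown")


async def serve(app):
    # Hypothetical stand-in for the framework entering the lifespan context.
    async with lifespan(app):
        events.append("serving")


asyncio.run(serve(app=None))
print(events)  # ['startup', 'serving', 'shutdown']
```
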
health_check async
health_check()

Liveness probe - is the service running?

Source code in services/training-svc/src/training_svc/main.py
@app.get("/health")
async def health_check():
    """Liveness probe - is the service running?"""
    return {"status": "healthy", "service": "training-svc"}

models

SQLModel tables for training job tracking.

Classes
TrainingJob

Bases: SQLModel

Persistent record of a training job.

routers

Router modules for training service endpoints.

Modules
training

Training router: POST /train/lora, POST /train/hypernetwork, GET /jobs/{job_id}.

Functions
train_lora async
train_lora(
    request: LoraTrainingRequest,
    background_tasks: BackgroundTasks,
) -> JSONResponse

Dispatch a QLoRA training job as a background task.

Parameters:

Name Type Description Default
request LoraTrainingRequest

LoRA training parameters; session_id is required.

required
background_tasks BackgroundTasks

FastAPI background task runner.

required

Returns:

Type Description
JSONResponse

JSONResponse with job_id and status="queued".

Example

>>> body = {"session_id": "s-1", "task_type": "code-gen", "epochs": 3}
>>> response = client.post("/train/lora", json=body)
>>> response.status_code
200
>>> response.json()["status"]
'queued'

Source code in services/training-svc/src/training_svc/routers/training.py
@router.post("/train/lora")
async def train_lora(
    request: LoraTrainingRequest,
    background_tasks: BackgroundTasks,
) -> JSONResponse:
    """Dispatch a QLoRA training job as a background task.

    Args:
        request: LoRA training parameters — session_id is required.
        background_tasks: FastAPI background task runner.

    Returns:
        JSONResponse with job_id and status="queued".

    Example:
        >>> body = {"session_id": "s-1", "task_type": "code-gen", "epochs": 3}
        >>> response = client.post("/train/lora", json=body)
        >>> response.status_code
        200
        >>> response.json()["status"]
        'queued'
    """
    job_id = str(uuid.uuid4())
    adapter_id = request.adapter_id or str(uuid.uuid4())
    _validate_adapter_id(adapter_id)

    with _JOB_STORE_LOCK:
        JOB_STORE[job_id] = JobStatus(
            job_id=job_id,
            status="queued",
            adapter_id=adapter_id,
        )

    background_tasks.add_task(
        _run_training_job,
        job_id,
        request.session_id,
        adapter_id,
        request.task_type,
        request.rank,
        request.epochs,
        request.learning_rate,
    )

    return JSONResponse(content={"job_id": job_id, "status": "queued"}, status_code=200)
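
A client without the test `client` fixture could prepare the same call using only the standard library. This is a sketch under assumptions: `BASE_URL` and `build_lora_request` are hypothetical, and the body uses only the fields shown in the docstring example (the request model also carries `adapter_id`, `rank`, and `learning_rate`, whose defaults are not documented here).

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: where the service is listening


def build_lora_request(session_id, task_type, epochs=3, base_url=BASE_URL):
    """Hypothetical helper: prepare a POST /train/lora request without sending it."""
    body = {"session_id": session_id, "task_type": task_type, "epochs": epochs}
    return urllib.request.Request(
        url=f"{base_url}/train/lora",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_lora_request("s-1", "code-gen")
print(req.get_method(), req.full_url)

# Sending it needs a running service:
# with urllib.request.urlopen(req) as resp:
#     job = json.load(resp)  # expected keys: "job_id", "status"
```
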
train_hypernetwork async
train_hypernetwork(
    request: HypernetworkTrainingRequest,
    background_tasks: BackgroundTasks,
) -> JSONResponse

Dispatch a hypernetwork adapter generation job as a background task.

Queues a background job that runs a trajectory through the pre-trained hypernetwork in a single forward pass and saves the resulting adapter in PEFT format. The endpoint itself returns immediately with a job_id for status polling via GET /jobs/{job_id}.

Parameters:

Name Type Description Default
request HypernetworkTrainingRequest

Hypernetwork training parameters, including task_type and trajectory_ids. Only the first entry of trajectory_ids is used; any further entries are ignored.

required
background_tasks BackgroundTasks

FastAPI background task runner.

required

Returns:

Type Description
JSONResponse

JSONResponse with job_id and status="queued".

Raises:

Type Description
HTTPException

422 if trajectory_ids is empty.

Example

>>> body = {"task_type": "gen", "trajectory_ids": ["t-1"]}
>>> response = client.post("/train/hypernetwork", json=body)
>>> response.status_code
200
>>> response.json()["status"]
'queued'

Source code in services/training-svc/src/training_svc/routers/training.py
@router.post("/train/hypernetwork")
async def train_hypernetwork(
    request: HypernetworkTrainingRequest,
    background_tasks: BackgroundTasks,
) -> JSONResponse:
    """Dispatch a hypernetwork adapter generation job as a background task.

    Accepts a trajectory, runs it through the pre-trained hypernetwork in a
    single forward pass, saves the adapter in PEFT format, and returns a
    job_id for status polling via GET /jobs/{job_id}.

    Args:
        request: Hypernetwork training parameters including task_type and
            trajectory_ids (uses first trajectory_id).
        background_tasks: FastAPI background task runner.

    Returns:
        JSONResponse with job_id and status="queued".

    Raises:
        HTTPException: 422 if trajectory_ids is empty.

    Example:
        >>> body = {"task_type": "gen", "trajectory_ids": ["t-1"]}
        >>> response = client.post("/train/hypernetwork", json=body)
        >>> response.status_code
        200
        >>> response.json()["status"]
        'queued'
    """
    if not request.trajectory_ids:
        raise HTTPException(
            status_code=422,
            detail="trajectory_ids must not be empty.",
        )

    job_id = str(uuid.uuid4())
    # Assign adapter_id at creation time so GET /jobs/{id} returns it immediately
    adapter_id = str(uuid.uuid4())

    with _JOB_STORE_LOCK:
        JOB_STORE[job_id] = JobStatus(
            job_id=job_id,
            status="queued",
            adapter_id=adapter_id,
        )

    background_tasks.add_task(
        _run_hypernetwork_job,
        job_id,
        adapter_id,
        request.trajectory_ids[0],
        request.task_type,
    )

    return JSONResponse(content={"job_id": job_id, "status": "queued"}, status_code=200)
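
Because the handler rejects an empty list and passes only `trajectory_ids[0]` to the background job, a caller may want to apply the same rule before sending. The helper below is a hypothetical client-side mirror of that behavior, not part of the service.

```python
from typing import List


def pick_trajectory_id(trajectory_ids: List[str]) -> str:
    """Hypothetical pre-flight check mirroring the handler's rules."""
    if not trajectory_ids:
        # The service answers an empty list with HTTP 422.
        raise ValueError("trajectory_ids must not be empty.")
    # The service uses only the first entry.
    return trajectory_ids[0]


print(pick_trajectory_id(["t-1", "t-2"]))  # t-1
```
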
get_job_status async
get_job_status(job_id: str) -> JSONResponse

Get training job status.

Parameters:

Name Type Description Default
job_id str

Unique identifier for the training job.

required

Returns:

Type Description
JSONResponse

JSONResponse with job_id, status, adapter_id, and optional error.

Raises:

Type Description
HTTPException

404 if job_id not found.

Example

>>> response = client.get("/jobs/job-123")
>>> response.status_code
404

Source code in services/training-svc/src/training_svc/routers/training.py
@router.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JSONResponse:
    """Get training job status.

    Args:
        job_id: Unique identifier for the training job.

    Returns:
        JSONResponse with job_id, status, adapter_id, and optional error.

    Raises:
        HTTPException: 404 if job_id not found.

    Example:
        >>> response = client.get("/jobs/job-123")
        >>> response.status_code
        404
    """
    with _JOB_STORE_LOCK:
        job = JOB_STORE.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")

    return JSONResponse(
        content={
            "job_id": job.job_id,
            "status": job.status,
            "adapter_id": job.adapter_id,
            "error": job.error,
        },
        status_code=200,
    )
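
Since both training endpoints return immediately with status "queued", callers typically poll this endpoint until the job finishes. The loop below is a sketch under assumptions: `wait_for_job` and `fake_fetch` are hypothetical, and the status values "running", "completed", and "failed" are assumed, since the source only shows "queued". The fetch function is injected so the loop can be exercised without a running service.

```python
import time
from typing import Callable, Dict

# Assumption: the statuses that mean the job will not change any more.
TERMINAL_STATUSES = {"completed", "failed"}


def wait_for_job(
    fetch_status: Callable[[str], Dict],
    job_id: str,
    interval: float = 1.0,
    max_polls: int = 60,
) -> Dict:
    """Hypothetical helper: poll until the job reaches a terminal status."""
    for _ in range(max_polls):
        payload = fetch_status(job_id)
        if payload["status"] in TERMINAL_STATUSES:
            return payload
        time.sleep(interval)
    raise TimeoutError(f"Job {job_id} did not finish after {max_polls} polls")


# Fake fetcher standing in for GET /jobs/{job_id}; it replays canned payloads.
_canned = iter(
    [
        {"job_id": "job-1", "status": "queued", "adapter_id": "a-1", "error": None},
        {"job_id": "job-1", "status": "running", "adapter_id": "a-1", "error": None},
        {"job_id": "job-1", "status": "completed", "adapter_id": "a-1", "error": None},
    ]
)


def fake_fetch(job_id: str) -> Dict:
    return next(_canned)


result = wait_for_job(fake_fetch, "job-1", interval=0.0)
print(result["status"], result["adapter_id"])
```

Because JOB_STORE is in memory, a job_id issued before a service restart will return 404 afterwards, so a real fetcher should treat 404 as a terminal condition too.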

schemas

Pydantic request/response schemas for training service.

Classes
LoraTrainingRequest

Bases: BaseModel

Request to train a LoRA adapter.

HypernetworkTrainingRequest

Bases: BaseModel

Request to train via hypernetwork forward pass.

JobStatusResponse

Bases: BaseModel

Response for training job status.

storage

Database storage configuration for training service.

Functions
create_db_and_tables
create_db_and_tables() -> None

Create all database tables.

Source code in services/training-svc/src/training_svc/storage.py
def create_db_and_tables() -> None:
    """Create all database tables."""
    SQLModel.metadata.create_all(engine)