Worker Management and Resource Allocation in Agent-Lightning: A Complete Technical Guide

Agent-Lightning coordinates distributed workers through a store-driven API that handles heartbeat-driven lifecycle management and versioned resource allocation, enabling scalable agent execution across process boundaries.

Agent-Lightning is an open-source framework from Microsoft for orchestrating agent rollouts across distributed compute environments. Scaling agent workloads with it depends on understanding how it manages workers and allocates resources: workers are modeled as lightweight data records, and the store provides atomic operations for lifecycle tracking and resource versioning.

Worker Data Model and Lifecycle

The Worker model defined in agentlightning/types/core.py (lines 36-56) serves as the central abstraction for tracking execution units. Workers report heartbeats, maintain status timestamps, and consume versioned resources throughout their lifecycle.

Worker Creation and Initial Heartbeat

When an AgentRunner initializes, it obtains a worker_id (a generated UUID or a user-provided string). The first heartbeat calls store.update_worker(worker_id, snapshot), which creates a Worker entry whose last_heartbeat_time is set to the current Unix timestamp. By default, a newly created worker reports status="unknown" until it is assigned to a rollout; tests/store/test_threading.py covers this behavior.

Background Heartbeat Loop Implementation

The heartbeat mechanism resides in agentlightning/runner/agent.py (lines 418-470). The AgentRunner starts a background loop via _start_heartbeat_loop that operates in two modes:

  • Asyncio mode: A single coroutine captures and emits snapshots
  • Threaded mode: A producer thread captures snapshots while a consumer thread pushes them via a temporary event loop
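The threaded mode's producer/consumer split can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the code in agent.py: RecordingStore, run_threaded_heartbeats, and the fake snapshot dictionaries are hypothetical stand-ins. The consumer thread owns a private event loop so that a plain thread can await the async store API.

```python
import asyncio
import queue
import threading
import time
from typing import Any, Dict, List, Optional


class RecordingStore:
    # Hypothetical stand-in for a LightningStore: records every heartbeat received.
    def __init__(self) -> None:
        self.received: List[Dict[str, Any]] = []

    async def update_worker(self, worker_id: str, heartbeat_stats: Dict[str, Any]) -> None:
        self.received.append({"worker_id": worker_id, **heartbeat_stats})


def run_threaded_heartbeats(store: RecordingStore, worker_id: str,
                            interval: float, beats: int) -> None:
    snapshots: "queue.Queue[Optional[Dict[str, Any]]]" = queue.Queue()

    def producer() -> None:
        # Capture a (fake) snapshot every `interval` seconds, then signal completion.
        for i in range(beats):
            snapshots.put({"seq": i, "captured_at": time.time()})
            time.sleep(interval)
        snapshots.put(None)  # sentinel: no more snapshots

    def consumer() -> None:
        # A temporary event loop lets this thread drive the async store call.
        loop = asyncio.new_event_loop()
        try:
            while (snap := snapshots.get()) is not None:
                loop.run_until_complete(store.update_worker(worker_id, snap))
        finally:
            loop.close()

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


store = RecordingStore()
run_threaded_heartbeats(store, "worker-01", interval=0.01, beats=3)
print([r["seq"] for r in store.received])  # [0, 1, 2]
```

Decoupling capture from transmission this way means a slow store call delays only the consumer, while the producer keeps sampling on schedule.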

The loop applies jitter (_interval_jitter) to its timing and aborts if snapshots become stale (older than heartbeat_interval + jitter + 1 second). Each emission captures a snapshot and pushes it to the store:

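async def _emit_heartbeat(self, store: LightningStore) -> None:
    # Capture a system snapshot; GPU stats are included only if enabled.
    snap = system_snapshot(self._heartbeat_include_gpu)
    # Upsert this worker's record, which also refreshes last_heartbeat_time.
    await store.update_worker(self.get_worker_id(), snap)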
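The jitter and staleness rules described above reduce to two small calculations. In this sketch, next_sleep and is_stale are hypothetical helpers, and the jitter is assumed to be symmetric around heartbeat_interval; only the staleness threshold (heartbeat_interval + jitter + 1 second) comes from the text.

```python
import random
import time
from typing import Optional


def next_sleep(heartbeat_interval: float, jitter: float,
               rng: Optional[random.Random] = None) -> float:
    # Assumed symmetric jitter: spread beats over [interval - jitter, interval + jitter]
    # so many workers started together do not hit the store in lockstep.
    rng = rng or random.Random()
    return max(0.0, heartbeat_interval + rng.uniform(-jitter, jitter))


def is_stale(captured_at: float, heartbeat_interval: float, jitter: float,
             now: Optional[float] = None) -> bool:
    # Staleness threshold from the text: heartbeat_interval + jitter + 1 second.
    now = time.time() if now is None else now
    return now - captured_at > heartbeat_interval + jitter + 1.0


print(4.5 <= next_sleep(5.0, 0.5, random.Random(0)) <= 5.5)  # True
print(is_stale(100.0, 5.0, 0.5, now=106.0))  # False: 6.0 s old, limit 6.5 s
print(is_stale(100.0, 5.0, 0.5, now=107.0))  # True: 7.0 s old
```

The extra second of slack keeps a snapshot that was merely delayed by scheduling noise from being thrown away.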

Status Derivation from Timestamps

Rather than driving a dedicated state machine, the framework infers worker state from timestamp fields:

  • last_heartbeat_time: Indicates liveness
  • last_idle_time: Marks when worker became available
  • last_busy_time: Tracks active rollout execution

Because each transition only refreshes a timestamp, there is no state machine to keep consistent, and liveness can be checked by comparing last_heartbeat_time with the current time.
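One way to read a status off these timestamps is to let the more recent of last_idle_time and last_busy_time win. This is a minimal sketch, assuming that rule; WorkerTimestamps and derive_status are hypothetical names, and the framework's own derivation may differ in details such as tie-breaking.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WorkerTimestamps:
    # Hypothetical subset of the Worker fields listed above; None = never recorded.
    last_heartbeat_time: Optional[float] = None
    last_idle_time: Optional[float] = None
    last_busy_time: Optional[float] = None


def derive_status(w: WorkerTimestamps) -> str:
    # No idle/busy transition recorded yet: the worker has only heartbeated.
    if w.last_idle_time is None and w.last_busy_time is None:
        return "unknown"
    if w.last_busy_time is None:
        return "idle"
    if w.last_idle_time is None:
        return "busy"
    # Otherwise the more recent transition wins (ties resolve to "idle").
    return "busy" if w.last_busy_time > w.last_idle_time else "idle"


print(derive_status(WorkerTimestamps(last_heartbeat_time=1.0)))  # unknown
print(derive_status(WorkerTimestamps(1.0, last_idle_time=10.0, last_busy_time=12.0)))  # busy
print(derive_status(WorkerTimestamps(1.0, last_idle_time=15.0, last_busy_time=12.0)))  # idle
```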

Graceful Shutdown Mechanisms

The heartbeat loop returns a stopper function (type Callable[[], Awaitable[None]]) that cancels the loop and sends a final heartbeat before the process exits. Call await runner.stop() to trigger this cleanup sequence.
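The stopper pattern can be illustrated with plain asyncio: start a periodic task, and hand back a coroutine function that cancels it, waits for the cancellation to finish, and emits once more. This is a sketch, not the runner's implementation; start_heartbeat_loop and the emit callback are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, List


async def start_heartbeat_loop(
    emit: Callable[[], Awaitable[None]], interval: float
) -> Callable[[], Awaitable[None]]:
    async def loop() -> None:
        while True:
            await emit()
            await asyncio.sleep(interval)

    task = asyncio.create_task(loop())

    async def stop() -> None:
        # Cancel the periodic loop and wait until it has actually unwound...
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        # ...then send one last heartbeat so the store sees the final state.
        await emit()

    return stop


async def main() -> List[str]:
    sent: List[str] = []

    async def emit() -> None:
        sent.append("beat")  # stand-in for store.update_worker(...)

    stop = await start_heartbeat_loop(emit, interval=0.01)
    await asyncio.sleep(0.035)  # let a few periodic beats go out
    await stop()
    return sent


beats = asyncio.run(main())
print(len(beats) >= 2)  # True: at least one periodic beat plus the final one
```

Awaiting the cancelled task before the final emit guarantees the two never run concurrently.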

Store Operations for Worker Management

All worker mutations funnel through CollectionBasedLightningStore in agentlightning/store/collection_based.py (lines 1561-1674), which provides atomic operations for worker CRUD.

Atomic Upserts with update_worker

The update_worker method implements upsert semantics: it creates a new worker if none exists, or updates only the named fields if one does:

async def update_worker(self, worker_id: str,
                        heartbeat_stats: Dict[str, Any] | Unset = UNSET) -> Worker:
    update_fields = ["last_heartbeat_time"]
    new_worker = Worker(worker_id=worker_id,
                        last_heartbeat_time=time.time())
    if not isinstance(heartbeat_stats, Unset):
        update_fields.append("heartbeat_stats")
        new_worker.heartbeat_stats = dict(heartbeat_stats)
    return await self._update_or_insert_worker(new_worker,
                                               update_fields=update_fields)

The internal _update_or_insert_worker method ensures that only specified fields get overwritten, preserving existing worker metadata during partial updates.
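The partial-update behavior can be modeled with a dictionary of dataclass rows guarded by an asyncio.Lock. This is a minimal in-memory sketch, assuming a trimmed-down Worker; InMemoryWorkerTable and update_or_insert are hypothetical and only approximate what _update_or_insert_worker does against real collections.

```python
import asyncio
from dataclasses import dataclass, field, replace
from typing import Any, Dict, List, Optional


@dataclass
class Worker:
    # Hypothetical, trimmed-down worker record for this sketch.
    worker_id: str
    status: str = "unknown"
    last_heartbeat_time: Optional[float] = None
    heartbeat_stats: Dict[str, Any] = field(default_factory=dict)


class InMemoryWorkerTable:
    def __init__(self) -> None:
        self._rows: Dict[str, Worker] = {}
        self._lock = asyncio.Lock()  # serializes read-modify-write on the table

    async def update_or_insert(self, new: Worker, update_fields: List[str]) -> Worker:
        async with self._lock:
            existing = self._rows.get(new.worker_id)
            if existing is None:
                self._rows[new.worker_id] = new  # insert the full record
                return new
            # Copy only the named fields; everything else on the row survives.
            changes = {name: getattr(new, name) for name in update_fields}
            merged = replace(existing, **changes)
            self._rows[new.worker_id] = merged
            return merged


async def demo() -> Worker:
    table = InMemoryWorkerTable()
    await table.update_or_insert(
        Worker("w1", status="busy", last_heartbeat_time=1.0, heartbeat_stats={"cpu": 0.5}),
        update_fields=["last_heartbeat_time", "heartbeat_stats"],
    )
    # A bare heartbeat touches only the timestamp; status and stats are preserved.
    return await table.update_or_insert(Worker("w1", last_heartbeat_time=2.0),
                                        update_fields=["last_heartbeat_time"])


w = asyncio.run(demo())
print(w.status, w.last_heartbeat_time, w.heartbeat_stats)  # busy 2.0 {'cpu': 0.5}
```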

Worker Retrieval Patterns

For direct lookups, get_worker_by_id performs primary-key queries:

async def get_worker_by_id(self, worker_id: str) -> Optional[Worker]:
    async with self.collections.atomic(mode="r", ...) as collections:
        return await collections.workers.get({"worker_id": {"exact": worker_id}})

Use store.query_workers() to list all workers and inspect fleet-wide status, heartbeat times, and current rollout assignments.

Resource Allocation and Versioning Strategy

Resources in Agent-Lightning, including LLM endpoints, prompt templates, and tools, follow a versioned update model defined in agentlightning/types/resources.py (lines 92-104). The ResourcesUpdate class bundles named resources with versioning metadata to enable zero-downtime deployments.

Adding Versioned Resources

The add_resources method in agentlightning/store/collection_based.py (lines 938-970) generates a fresh resources_id and marks the record as latest:

resources_id = _generate_resources_id()
now = time.time()
update = ResourcesUpdate(
    resources_id=resources_id,
    resources=named_resources,
    create_time=now,
    update_time=now,
    version=1,
)
await collections.resources.insert([update])

Updating Resources with Rollback Capability

store.update_resources(resources_id, new_resources) creates a new version while preserving historical records. This enables instant rollbacks by reverting to previous resources_id values if new configurations fail.
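The add/update/latest flow can be modeled with an in-memory registry. This sketch assumes, following the FAQ below, that update_resources keeps the resources_id and bumps its version; ResourceRegistry, ResourcesRecord, the history list, and the "rs-N" ID format are all hypothetical. An injectable clock keeps the update_time ordering deterministic.

```python
import itertools
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ResourcesRecord:
    # Hypothetical mirror of the ResourcesUpdate fields used above.
    resources_id: str
    resources: Dict[str, Any]
    create_time: float
    update_time: float
    version: int


class ResourceRegistry:
    def __init__(self, clock: Callable[[], float] = time.time) -> None:
        self._clock = clock  # injectable so tests get deterministic timestamps
        self._records: Dict[str, ResourcesRecord] = {}
        self.history: List[ResourcesRecord] = []  # superseded versions, oldest first
        self._ids = itertools.count(1)

    def add_resources(self, resources: Dict[str, Any]) -> ResourcesRecord:
        now = self._clock()
        rid = f"rs-{next(self._ids)}"  # stand-in for _generate_resources_id()
        rec = ResourcesRecord(rid, dict(resources), now, now, version=1)
        self._records[rid] = rec
        return rec

    def update_resources(self, resources_id: str,
                         resources: Dict[str, Any]) -> ResourcesRecord:
        old = self._records[resources_id]
        self.history.append(old)  # keep the previous version for rollback
        rec = ResourcesRecord(resources_id, dict(resources), old.create_time,
                              self._clock(), old.version + 1)
        self._records[resources_id] = rec
        return rec

    def get_resources_by_id(self, resources_id: str) -> Optional[ResourcesRecord]:
        return self._records.get(resources_id)

    def get_latest_resources(self) -> Optional[ResourcesRecord]:
        # Most recently updated record wins, mirroring the update_time sort.
        return max(self._records.values(), key=lambda r: r.update_time, default=None)


ticks = itertools.count(100)
reg = ResourceRegistry(clock=lambda: float(next(ticks)))
a = reg.add_resources({"model": "llama3"})
b = reg.add_resources({"model": "llama3-lora"})
reg.update_resources(a.resources_id, {"model": "llama3", "temperature": 0.2})
latest = reg.get_latest_resources()
print(latest.resources_id, latest.version)  # rs-1 2
```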

Automatic Resolution in Rollouts

When enqueueing rollouts via store.enqueue_rollout(..., resources_id=None), the store automatically injects the latest resources ID if none is specified. Runners then fetch specific versions via store.get_resources_by_id(resources_id) or retrieve the current default via store.get_latest_resources(), which sorts records by update_time and returns the most recent.
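The fallback to the latest resources ID is a one-line resolution at enqueue time. In this sketch, Rollout, RolloutQueue, and the latest_resources_id attribute are hypothetical; a real store would look the latest ID up via get_latest_resources() rather than hold it in a field.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Rollout:
    # Hypothetical minimal rollout record for this sketch.
    rollout_id: str
    task: Dict[str, Any]
    resources_id: Optional[str]


class RolloutQueue:
    def __init__(self, latest_resources_id: Optional[str]) -> None:
        # In a real store this would come from get_latest_resources().
        self.latest_resources_id = latest_resources_id
        self.pending: List[Rollout] = []

    def enqueue_rollout(self, task: Dict[str, Any],
                        resources_id: Optional[str] = None) -> Rollout:
        # An explicit resources_id wins; otherwise pin the current latest one.
        resolved = resources_id if resources_id is not None else self.latest_resources_id
        rollout = Rollout(f"ro-{len(self.pending) + 1}", task, resolved)
        self.pending.append(rollout)
        return rollout


rq = RolloutQueue(latest_resources_id="rs-2")
r1 = rq.enqueue_rollout({"question": "2 + 2?"})
r2 = rq.enqueue_rollout({"question": "3 + 3?"}, resources_id="rs-1")
print(r1.resources_id, r2.resources_id)  # rs-2 rs-1
```

In this sketch, resolving at enqueue time records a concrete ID on each rollout, so a runner can later fetch exactly that version with get_resources_by_id even after newer resources are added.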

Thread-Safe Store Interface

The agentlightning/store/threading.py module (lines 152-190) provides an async façade that lets multiple threads share one store without exposing the underlying storage details. Each method forwards to the wrapped store:

async def update_worker(self, worker_id: str,
                        heartbeat_stats: Dict[str, Any] | Unset = UNSET) -> Worker:
    return await self.store.update_worker(worker_id, heartbeat_stats)

This wrapper forwards calls to the concrete store implementation (in-memory, MongoDB, or custom backends) while maintaining consistent LightningStore semantics across all deployment scenarios.
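A common way to build such a façade is to hold a single threading.Lock around each forwarded call. The sketch below assumes that approach; CounterStore, ThreadedStore, and hammer are hypothetical, and the backend deliberately yields between its read and write so that unsynchronized callers could interleave.

```python
import asyncio
import threading
from typing import Dict


class CounterStore:
    # Hypothetical backend that is NOT thread-safe: it reads, yields, then writes.
    def __init__(self) -> None:
        self.beats: Dict[str, int] = {}

    async def update_worker(self, worker_id: str) -> int:
        count = self.beats.get(worker_id, 0)
        await asyncio.sleep(0)  # a point where another caller could interleave
        self.beats[worker_id] = count + 1
        return count + 1


class ThreadedStore:
    # Hypothetical façade: every call runs while holding one threading.Lock.
    def __init__(self, store: CounterStore) -> None:
        self.store = store
        self._lock = threading.Lock()

    async def update_worker(self, worker_id: str) -> int:
        with self._lock:
            return await self.store.update_worker(worker_id)


def hammer(facade: ThreadedStore, calls: int) -> None:
    # Each thread drives the async façade from its own private event loop.
    loop = asyncio.new_event_loop()
    try:
        for _ in range(calls):
            loop.run_until_complete(facade.update_worker("worker-01"))
    finally:
        loop.close()


facade = ThreadedStore(CounterStore())
threads = [threading.Thread(target=hammer, args=(facade, 50)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(facade.store.beats["worker-01"])  # 400
```

Holding a threading.Lock across an await is only safe when each thread issues one store call at a time, as here; two coroutines contending for it on the same event loop would block that loop.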

Production Implementation Patterns

Starting a Runner with Heartbeat

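import asyncio

from agentlightning.runner.agent import AgentRunner
from agentlightning.store import CollectionBasedLightningStore


async def main() -> None:
    store = CollectionBasedLightningStore()
    runner = AgentRunner(
        store=store,
        worker_id="worker-01",            # omit to get an auto-generated UUID
        heartbeat_interval=5.0,           # seconds between heartbeats
        heartbeat_launch_mode="asyncio",  # or "thread"
    )

    await runner.start()  # starts the background heartbeat loop
    try:
        ...  # Execute rollouts...
    finally:
        await runner.stop()  # cancels the loop and sends a final heartbeat


asyncio.run(main())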

Manual Heartbeat for Testing

from agentlightning.utils.system_snapshot import system_snapshot

# Run inside an async function; `store` is the store created above.
snapshot = system_snapshot(include_gpu=False)
await store.update_worker("worker-01", heartbeat_stats=snapshot)
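For unit tests that should not depend on GPU drivers or the real system_snapshot output, a stdlib-only stand-in can produce a heartbeat_stats dictionary. fake_system_snapshot is hypothetical, and its keys are illustrative only; they are not the fields the real snapshot reports.

```python
import os
import platform
import time
from typing import Any, Dict


def fake_system_snapshot(include_gpu: bool = False) -> Dict[str, Any]:
    # Hypothetical stand-in for system_snapshot(): stdlib facts only, no GPU probing.
    snap: Dict[str, Any] = {
        "hostname": platform.node(),
        "cpu_count": os.cpu_count(),
        "pid": os.getpid(),
        "captured_at": time.time(),
    }
    if include_gpu:
        snap["gpus"] = []  # placeholder: GPU discovery is out of scope here
    return snap


snap = fake_system_snapshot()
print(sorted(snap))  # ['captured_at', 'cpu_count', 'hostname', 'pid']
```

The resulting dictionary can be passed as heartbeat_stats to update_worker in the same way as the real snapshot.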

Resource Version Deployment

from agentlightning.types.resources import LLM, PromptTemplate

# Run inside an async function; `store` is the store created above.
resources = {
    "main_llm": LLM(endpoint="http://localhost:8000", model="llama3"),
    "system_prompt": PromptTemplate(
        template="You are a helpful assistant.",
        engine="f-string"
    ),
}
update = await store.add_resources(resources)
print(f"Deployed resources {update.resources_id} (version {update.version})")

Summary

  • Worker lifecycle relies on timestamp-based heartbeats rather than explicit state machines, with status derived from last_heartbeat_time, last_idle_time, and last_busy_time fields.
  • Upsert operations through update_worker in collection_based.py enable idempotent worker registration without race conditions.
  • Resource versioning via ResourcesUpdate allows atomic rollouts and instant rollbacks of LLM configurations across the worker fleet.
  • Thread-safe façades in threading.py provide consistent async APIs regardless of underlying storage backend.
  • Heartbeat pluggability supports both asyncio and threaded emission modes to accommodate diverse runtime constraints.

Frequently Asked Questions

How does Agent-Lightning detect failed or stale workers?

The framework compares last_heartbeat_time against the current time and the configured heartbeat_interval. If a worker’s latest heartbeat is older than heartbeat_interval + jitter + 1 second, the worker is considered stale. Status queries then report "unknown" for such workers or filter them out, allowing the scheduler to reschedule rollouts onto healthy nodes.
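A fleet-wide staleness check over the result of a worker query can be sketched as a simple filter. WorkerView and stale_workers are hypothetical names, and treating a worker that has never heartbeated as stale is an assumption of this sketch.

```python
import time
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class WorkerView:
    # Hypothetical projection of a worker record returned by a query.
    worker_id: str
    last_heartbeat_time: Optional[float]


def stale_workers(workers: Iterable[WorkerView], heartbeat_interval: float,
                  jitter: float, now: Optional[float] = None) -> List[str]:
    # Threshold from the answer above: heartbeat_interval + jitter + 1 second.
    now = time.time() if now is None else now
    limit = heartbeat_interval + jitter + 1.0
    # Assumption: a worker with no recorded heartbeat counts as stale.
    return [w.worker_id for w in workers
            if w.last_heartbeat_time is None or now - w.last_heartbeat_time > limit]


fleet = [WorkerView("a", 95.0), WorkerView("b", 99.0), WorkerView("c", None)]
print(stale_workers(fleet, heartbeat_interval=5.0, jitter=0.5, now=102.0))  # ['a', 'c']
```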

What is the difference between add_resources and update_resources?

add_resources creates a brand-new ResourcesUpdate record with a fresh ID and version 1, marking it as the latest default for new rollouts. update_resources increments the version number on an existing resources ID while preserving the history, enabling rollback to previous configurations by referencing older resource IDs.

Can workers use custom identifiers instead of UUIDs?

Yes. The AgentRunner accepts an optional worker_id parameter that can be any string. If it is omitted, the framework generates a UUID automatically. Custom IDs should be unique across the store: because update_worker is an upsert, a duplicate ID does not raise an error or create a second record. The later writer simply updates the existing one, so two runners sharing an ID would overwrite each other's heartbeats.

How does the threading façade maintain consistency across different store backends?

agentlightning/store/threading.py provides a thin asynchronous wrapper that delegates to the underlying store implementation (such as CollectionBasedLightningStore or MongoDB variants). All high-level APIs inherit from LightningStore, ensuring that method signatures and semantics remain identical whether using in-memory collections for testing or distributed databases for production.
