How to Build Custom Training Algorithms for Agent-Lightning: A Complete Guide

To build a custom training algorithm for Agent-Lightning, subclass the Algorithm class from agentlightning/algorithm/base.py, implement the asynchronous run method, and decorate it with @with_store (and optionally @with_llm_proxy) to automatically inject the distributed store and LLM proxy dependencies.

Agent-Lightning separates training strategies from execution infrastructure, allowing researchers to plug custom optimization loops into a robust distributed rollout system. This guide explains how to create custom algorithms using the core abstractions provided in the microsoft/agent-lightning repository, referencing actual source files and production examples.

Subclass the Algorithm Base Class

The Algorithm class in agentlightning/algorithm/base.py defines the contract that all training strategies must implement. It exposes lifecycle hooks for linking to the Trainer, accessing the distributed store, and managing an optional LLMProxy.


# agentlightning/algorithm/base.py

class Algorithm:
    """Algorithm is the strategy, or tuner to train the agent."""
    
    async def run(
        self,
        train_dataset: Optional[Dataset[Any]] = None,
        val_dataset: Optional[Dataset[Any]] = None,
    ) -> Union[None, Awaitable[None]]:
        """Subclasses should implement this method."""
        raise NotImplementedError

When subclassing, you only need to focus on implementing the run coroutine. The base class already handles set_trainer / get_trainer, set_store / get_store, and set_llm_proxy / get_llm_proxy plumbing.
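
To make that division of labor concrete, here is a minimal sketch that runs without the library installed. It uses a small stand-in base class rather than the real Algorithm; SketchAlgorithm, EchoAlgorithm, and the list used as a store are hypothetical names chosen for illustration. Only set_store, get_store, and run come from the text above.

```python
import asyncio
from typing import Any, List, Optional, Sequence


class SketchAlgorithm:
    """Stand-in for the base class: owns the plumbing, leaves run() abstract."""

    def __init__(self) -> None:
        self._store: Optional[Any] = None

    def set_store(self, store: Any) -> None:
        self._store = store

    def get_store(self) -> Any:
        if self._store is None:
            raise ValueError("store has not been set")
        return self._store

    async def run(
        self,
        train_dataset: Optional[Sequence[Any]] = None,
        val_dataset: Optional[Sequence[Any]] = None,
    ) -> None:
        raise NotImplementedError


class EchoAlgorithm(SketchAlgorithm):
    """Hypothetical subclass: the only method it defines is run()."""

    async def run(self, train_dataset=None, val_dataset=None) -> None:
        store: List[Any] = self.get_store()
        for sample in train_dataset or []:
            store.append(sample)  # a real algorithm would enqueue a rollout here


def demo() -> List[Any]:
    algorithm = EchoAlgorithm()
    shared: List[Any] = []
    algorithm.set_store(shared)  # wiring step; code inside run() never does this
    asyncio.run(algorithm.run(train_dataset=["a", "b"]))
    return shared
```

The subclass never touches the wiring itself: it asks for the store through the inherited getter and spends the rest of run on its own logic.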

Inject Dependencies with Decorators

The agentlightning/algorithm/utils.py module provides decorators that eliminate boilerplate when accessing shared resources. Instead of manually calling self.get_store() inside run, you inject dependencies directly into the method signature.

@with_store automatically passes self.get_store() as the first positional argument after self.

@with_llm_proxy(required=False, auto_start=True) injects the configured LLMProxy and manages its lifecycle. When auto_start=True, the decorator starts the proxy before the call and stops it afterward.


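# Using the decorators from agentlightning/algorithm/utils.py

from typing import Optional

from agentlightning.algorithm import Algorithm
from agentlightning.algorithm.utils import with_store, with_llm_proxy
from agentlightning.llm_proxy import LLMProxy
from agentlightning.store import LightningStore
from agentlightning.types import Dataset  # import path assumed

class MyCustomAlgorithm(Algorithm):
    @with_llm_proxy(required=True, auto_start=True)
    @with_store
    async def run(
        self,
        store: LightningStore,
        llm_proxy: LLMProxy,  # not Optional here because required=True
        train_dataset: Optional[Dataset] = None,
        val_dataset: Optional[Dataset] = None,
    ) -> None:
        # store and llm_proxy are now available without manual wiring
        pass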

If required=True and no proxy is configured, the decorator raises a clear ValueError before execution begins.
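
The article does not show the decorators' internals, so the following is only a sketch of how injection decorators of this kind could be written, assuming the behavior described above. inject_store, inject_proxy, FakeProxy, and Demo are hypothetical stand-ins, not the library's code. The sketch also shows why the parameter order is store first and llm_proxy second when @with_store sits closest to the function.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, List, Optional


class FakeProxy:
    """Hypothetical proxy that records lifecycle calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def start(self) -> None:
        self.events.append("start")

    def stop(self) -> None:
        self.events.append("stop")


def inject_store(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Pass self.get_store() as the first argument after self."""

    @functools.wraps(func)
    async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        return await func(self, self.get_store(), *args, **kwargs)

    return wrapper


def inject_proxy(required: bool = False, auto_start: bool = True):
    """Pass self.get_llm_proxy() after self; optionally manage its lifecycle."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            proxy: Optional[FakeProxy] = self.get_llm_proxy()
            if proxy is None:
                if required:
                    raise ValueError("an LLM proxy is required but none is configured")
                return await func(self, None, *args, **kwargs)
            if auto_start:
                proxy.start()
            try:
                return await func(self, proxy, *args, **kwargs)
            finally:
                if auto_start:
                    proxy.stop()  # runs even if the wrapped coroutine raises

        return wrapper

    return decorator


class Demo:
    def __init__(self, store: Any, proxy: Optional[FakeProxy]) -> None:
        self._store, self._proxy = store, proxy

    def get_store(self) -> Any:
        return self._store

    def get_llm_proxy(self) -> Optional[FakeProxy]:
        return self._proxy

    @inject_proxy(required=True, auto_start=True)
    @inject_store
    async def run(self, store: Any, llm_proxy: FakeProxy, train_dataset=None) -> tuple:
        # The inner decorator supplies store, the outer one supplies llm_proxy.
        return store, list(llm_proxy.events), train_dataset
```

Because the outer wrapper prepends the proxy and the inner wrapper then prepends the store, the undecorated function receives them in the order (self, store, llm_proxy, ...). Swapping the two decorators would swap the two parameters.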

Orchestrate Rollouts via the LightningStore

The LightningStore (defined in agentlightning/store/base.py) serves as the central data bus for enqueuing work, polling for completion, and retrieving execution traces. Inside your run method, use these three core operations:

1. Enqueue rollouts for training samples and link them to resource configurations:

resources_update = await store.add_resources(
    {"main_llm": llm_proxy.as_resource()}
)
resources_id = resources_update.resources_id

rollouts = [
    await store.enqueue_rollout(
        input=sample,
        mode="train",
        resources_id=resources_id,
    )
    for sample in train_dataset
]

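2. Wait for completion by polling. With timeout=0.0 each call returns immediately with only the rollouts that have finished so far, so repeat it until every rollout is accounted for:

rollout_ids = [r.rollout_id for r in rollouts]
while True:
    completed = await store.wait_for_rollouts(
        rollout_ids=rollout_ids,
        timeout=0.0,  # non-blocking; returns only finished rollouts
    )
    if len(completed) >= len(rollout_ids):
        break
    await asyncio.sleep(1.0)  # requires `import asyncio`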

3. Query spans (execution traces) from finished rollouts:

for rollout in completed:
    spans = await store.query_spans(
        rollout_id=rollout.rollout_id, 
        attempt_id="latest"
    )
    # Process spans for training
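
The three steps above can be exercised end to end without a running cluster. The sketch below uses a hypothetical in-memory stand-in (InMemoryStore, FakeRollout, fake_runner) that only mimics the method names used in this section; the real LightningStore signatures and return types may differ. Its point is the control flow: enqueue everything, poll with a zero timeout until all rollouts are done, then read the spans.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FakeRollout:
    rollout_id: str
    input: Any
    status: str = "queuing"


@dataclass
class InMemoryStore:
    """Hypothetical stand-in that mimics the three calls used above."""

    rollouts: Dict[str, FakeRollout] = field(default_factory=dict)
    spans: Dict[str, List[dict]] = field(default_factory=dict)

    async def enqueue_rollout(self, input: Any, mode: str = "train") -> FakeRollout:
        rollout = FakeRollout(rollout_id=f"ro-{len(self.rollouts)}", input=input)
        self.rollouts[rollout.rollout_id] = rollout
        return rollout

    async def wait_for_rollouts(self, rollout_ids: List[str], timeout: float = 0.0) -> List[FakeRollout]:
        # Returns immediately with whatever has finished so far.
        return [self.rollouts[i] for i in rollout_ids if self.rollouts[i].status == "succeeded"]

    async def query_spans(self, rollout_id: str) -> List[dict]:
        return self.spans.get(rollout_id, [])


async def fake_runner(store: InMemoryStore) -> None:
    """Pretend worker: finishes one queued rollout per tick."""
    while any(r.status == "queuing" for r in store.rollouts.values()):
        await asyncio.sleep(0.01)
        rollout = next(r for r in store.rollouts.values() if r.status == "queuing")
        store.spans[rollout.rollout_id] = [{"name": "reward", "value": float(len(rollout.input))}]
        rollout.status = "succeeded"


async def collect(store: InMemoryStore, samples: List[str], poll_interval: float = 0.005) -> Dict[str, List[dict]]:
    # 1. Enqueue one rollout per sample.
    rollouts = [await store.enqueue_rollout(input=s) for s in samples]
    worker = asyncio.create_task(fake_runner(store))
    ids = [r.rollout_id for r in rollouts]
    # 2. Poll until every rollout has finished.
    while True:
        completed = await store.wait_for_rollouts(rollout_ids=ids, timeout=0.0)
        if len(completed) >= len(rollouts):
            break
        await asyncio.sleep(poll_interval)  # yield so the worker can make progress
    await worker
    # 3. Read the spans of each finished rollout.
    return {r.rollout_id: await store.query_spans(r.rollout_id) for r in completed}
```

The sleep between polls matters: without it, the polling loop would never yield to the event loop and the worker task could not advance.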

Transform Traces with Adapters

If your trainer expects specific data shapes (e.g., reward-model triplets or preference pairs), implement a TraceAdapter from agentlightning/adapter/base.py to convert raw spans into your target format.


# agentlightning/adapter/base.py

class TraceAdapter(Generic[T]):
    def adapt(self, spans: List[Span]) -> T:
        ...

Inject the adapter via self.set_adapter() before training, or instantiate it directly inside run. For example, LlmProxyTraceToTriplet converts LLM interaction traces into training triplets.

from agentlightning.adapter import LlmProxyTraceToTriplet

adapter = LlmProxyTraceToTriplet()
triplets = adapter.adapt(spans)
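
For a custom target format, an adapter only has to implement adapt. The sketch below is an illustration under stated assumptions, not the logic of LlmProxyTraceToTriplet: FakeSpan, SketchAdapter, SpansToTriplets, and the span names "llm_call" and "reward" are all hypothetical. It pairs each model call with the reward span that follows it, if one appears before the next call.

```python
from dataclasses import dataclass
from typing import Generic, List, Optional, Tuple, TypeVar

T = TypeVar("T")


@dataclass
class FakeSpan:
    """Hypothetical span: a named event with free-form attributes."""

    name: str
    attributes: dict


class SketchAdapter(Generic[T]):
    """Stand-in for the adapter contract: spans in, target format out."""

    def adapt(self, spans: List[FakeSpan]) -> T:
        raise NotImplementedError


Triplet = Tuple[str, str, Optional[float]]


class SpansToTriplets(SketchAdapter[List[Triplet]]):
    """Pair each 'llm_call' span with the reward that follows it, if any."""

    def adapt(self, spans: List[FakeSpan]) -> List[Triplet]:
        triplets: List[Triplet] = []
        for index, span in enumerate(spans):
            if span.name != "llm_call":
                continue
            reward = None
            for later in spans[index + 1:]:
                if later.name == "llm_call":
                    break  # any reward past this point belongs to a later call
                if later.name == "reward":
                    reward = later.attributes["value"]
                    break
            triplets.append((span.attributes["prompt"], span.attributes["response"], reward))
        return triplets
```

Calls with no reward before the next call keep a reward of None, which leaves the decision to drop or back-fill them to the training code.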

Complete Working Example

The examples/unsloth/sft_algorithm.py file demonstrates a full supervised fine-tuning algorithm that integrates all components. Below is a simplified adaptation showing the core pattern for a custom REINFORCE-style trainer:

import asyncio

from agentlightning.algorithm import Algorithm
from agentlightning.algorithm.utils import with_store, with_llm_proxy
from agentlightning.store import LightningStore
from agentlightning.llm_proxy import LLMProxy

class ReinforceAlgorithm(Algorithm):
    """Custom policy-gradient trainer."""

    @with_llm_proxy(required=True, auto_start=True)
    @with_store
    async def run(
        self,
        store: LightningStore,
        llm_proxy: LLMProxy,
        train_dataset=None,
        val_dataset=None,
    ) -> None:
        if train_dataset is None:
            raise ValueError("ReinforceAlgorithm requires a train_dataset")

        # Register the current LLM proxy endpoint as a resource
        res = await store.add_resources({"main_llm": llm_proxy.as_resource()})
        resources_id = res.resources_id

        # Enqueue one rollout per training sample
        rollouts = [
            await store.enqueue_rollout(
                input=sample,
                mode="train",
                resources_id=resources_id,
            )
            for sample in train_dataset
        ]

        # Poll until distributed execution has finished; timeout=0.0 returns
        # immediately with only the rollouts completed so far
        rollout_ids = [r.rollout_id for r in rollouts]
        while True:
            completed = await store.wait_for_rollouts(
                rollout_ids=rollout_ids,
                timeout=0.0,
            )
            if len(completed) >= len(rollout_ids):
                break
            await asyncio.sleep(1.0)

        # Gather traces from every finished rollout
        all_data = []
        for rollout in completed:
            spans = await store.query_spans(rollout.rollout_id, "latest")
            all_data.extend(self.process_spans(spans))

        # Execute training step (framework-specific)
        await self.update_policy(all_data)

    def process_spans(self, spans):
        """Hook: turn raw spans into training records, e.g. with a TraceAdapter."""
        return list(spans)

    async def update_policy(self, batch_data):
        """Hook for PyTorch/TensorFlow/JAX optimization logic."""
        pass

This pattern cleanly separates infrastructure (store operations, proxy lifecycle) from optimization logic (loss computation, model updates).
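
The example leaves update_policy empty. One conventional way to fill it in for a REINFORCE-style trainer is the surrogate loss with a batch-mean baseline, sketched below in plain Python so the arithmetic is easy to follow. The function names are hypothetical, and in practice log_probs would be autograd tensors from your framework rather than floats.

```python
from typing import List, Sequence


def baseline_advantages(rewards: Sequence[float]) -> List[float]:
    """Subtract the batch-mean reward from each reward (a common baseline)."""
    if not rewards:
        return []
    baseline = sum(rewards) / len(rewards)
    return [r - baseline for r in rewards]


def reinforce_loss(log_probs: Sequence[float], rewards: Sequence[float]) -> float:
    """Scalar surrogate loss: -mean(advantage_i * log_prob_i)."""
    if len(log_probs) != len(rewards):
        raise ValueError("log_probs and rewards must have the same length")
    if not rewards:
        return 0.0
    advantages = baseline_advantages(rewards)
    return -sum(a * lp for a, lp in zip(advantages, log_probs)) / len(rewards)
```

With rewards [1.0, 0.0] and log-probabilities [-0.5, -1.5], the baseline is 0.5, the advantages are [0.5, -0.5], and the loss is -(0.5 * -0.5 + -0.5 * -1.5) / 2 = -0.25. Minimizing it raises the probability of the above-average rollout and lowers that of the below-average one.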

Functional-Style Algorithm Definition

If you prefer plain functions over classes, use the algo decorator from agentlightning/algorithm/decorator.py. This wraps a function into a FunctionalAlgorithm instance that respects the same injection decorators.

from agentlightning.algorithm.decorator import algo
from agentlightning.algorithm.utils import with_store
from agentlightning.store import LightningStore

@algo
@with_store
async def my_custom_trainer(store: LightningStore, train_dataset=None):
    # Same body as class-based run
    rollout = await store.enqueue_rollout(...)
    ...

The functional approach is ideal for simple, stateless training loops that do not require complex internal configuration.
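
To illustrate the general idea of turning a function into an algorithm object, here is a minimal sketch. It is not the implementation of algo or FunctionalAlgorithm; as_algorithm, FunctionBackedAlgorithm, and count_samples are hypothetical, and the way the real decorator passes the store and datasets to the function may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class FunctionBackedAlgorithm:
    """Hypothetical wrapper: exposes a plain coroutine function through run()."""

    def __init__(self, func: Callable[..., Awaitable[Any]]) -> None:
        self._func = func
        self._store: Optional[Any] = None
        self.__name__ = getattr(func, "__name__", "algorithm")

    def set_store(self, store: Any) -> None:
        self._store = store

    async def run(self, train_dataset=None, val_dataset=None) -> Any:
        # Forward the wired-in store and the training dataset as keywords.
        return await self._func(store=self._store, train_dataset=train_dataset)


def as_algorithm(func: Callable[..., Awaitable[Any]]) -> FunctionBackedAlgorithm:
    return FunctionBackedAlgorithm(func)


@as_algorithm
async def count_samples(store, train_dataset=None):
    # Stateless loop body: everything it needs arrives as an argument.
    store["seen"] = len(train_dataset or [])
    return store["seen"]
```

After decoration, count_samples is an object with set_store and run, so code that drives class-based algorithms can drive it the same way.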

Summary

  • Subclass Algorithm from agentlightning/algorithm/base.py and implement the async run method.
  • Use @with_store and @with_llm_proxy to inject dependencies without manual wiring.
  • Interact with LightningStore via enqueue_rollout, wait_for_rollouts, and query_spans to drive distributed data collection.
  • Apply TraceAdapter instances to convert raw execution spans into training-compatible formats.
  • Reference examples/unsloth/sft_algorithm.py for a production-grade implementation integrating vLLM, resource updates, and HuggingFace datasets.

Frequently Asked Questions

What is the minimum code required to create a valid custom algorithm?

You must subclass Algorithm from agentlightning/algorithm/base.py and implement the run coroutine. Decorate it with @with_store from agentlightning/algorithm/utils.py so that the LightningStore instance arrives as the first argument after self, giving the signature async def run(self, store, train_dataset=None, val_dataset=None). This skeleton (the class statement, the decorator, and the method) satisfies the framework contract and plugs into Trainer.fit().

How do I handle LLM proxy lifecycle management in my algorithm?

Use the @with_llm_proxy(auto_start=True) decorator on your run method. When auto_start=True, the decorator starts the proxy before run is called and stops it afterward, so the inference endpoint is active only for the duration of run. If your algorithm requires a proxy, also pass required=True: when none is configured, the decorator raises a ValueError before execution begins.

Can I convert Agent-Lightning execution traces into HuggingFace datasets?

Yes. Use a TraceAdapter from agentlightning/adapter/base.py to convert List[Span] objects into your desired format. For example, the LlmProxyTraceToTriplet adapter transforms traces into triplet structures suitable for supervised fine-tuning. After adapting spans, construct a datasets.Dataset inside your run method before passing it to your model trainer.
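
As a sketch of that last step, the helper below pivots adapted triplets into column lists. It assumes each triplet is a (prompt, completion, reward) tuple, which is an illustrative shape and not necessarily what LlmProxyTraceToTriplet returns; triplets_to_columns and min_reward are hypothetical names.

```python
from typing import Dict, List, Optional, Sequence, Tuple

Triplet = Tuple[str, str, Optional[float]]


def triplets_to_columns(triplets: Sequence[Triplet], min_reward: float = 0.0) -> Dict[str, List]:
    """Keep triplets whose reward is at least min_reward and pivot them into columns."""
    columns: Dict[str, List] = {"prompt": [], "completion": [], "reward": []}
    for prompt, completion, reward in triplets:
        if reward is None or reward < min_reward:
            continue  # skip unrewarded or low-reward samples
        columns["prompt"].append(prompt)
        columns["completion"].append(completion)
        columns["reward"].append(reward)
    return columns
```

If the HuggingFace datasets package is installed, the returned dictionary can be handed to datasets.Dataset.from_dict(columns) to obtain a Dataset for your model trainer.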

Where can I find a complete reference implementation?

The file examples/unsloth/sft_algorithm.py in the microsoft/agent-lightning repository provides a full end-to-end example. It demonstrates vLLM server management, resource updates via store.add_resources(), batch rollout enqueuing, triplet adaptation, and process-isolated training with Unsloth. Study this file for patterns on handling distributed rollouts and checkpointing.
