# How to Use the HugeGraph-LLM Scheduler API: A Complete Guide

> Master the HugeGraph-LLM Scheduler API to orchestrate AI workflows. Learn to use schedule_flow and schedule_stream_flow for seamless pipeline management and resource pooling.

- Repository: [apache/incubator-hugegraph-ai](https://github.com/apache/incubator-hugegraph-ai)
- Tags: how-to-guide
- Published: 2026-02-24

---

**The HugeGraph-LLM Scheduler API provides a singleton-based orchestrator that executes AI workflows through `schedule_flow()` for synchronous processing and `schedule_stream_flow()` for asynchronous streaming, abstracting pipeline management and resource pooling.**

The HugeGraph-LLM Scheduler API serves as the central command layer for Apache HugeGraph's AI module, managing the execution of complex LLM workflows (called "flows") without requiring manual pipeline setup. Implemented in the `apache/incubator-hugegraph-ai` repository, this API handles resource pooling, model loading, and workflow orchestration through a simple singleton interface.

## HugeGraph-LLM Scheduler API Architecture

The Scheduler API architecture centers on four primary components working together to manage workflow execution.

**FlowName Enum** – Defined in [`hugegraph_llm/flows/__init__.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/flows/__init__.py), this enumeration provides symbolic identifiers for every supported workflow (e.g., `TEXT2GREMLIN`, `RAG_GRAPH_ONLY`, `BUILD_VECTOR_INDEX`). The scheduler uses these constants to look up and instantiate the correct pipeline.
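To illustrate how symbolic flow identifiers can be resolved, here is a minimal sketch built on Python's standard `enum` module. `DemoFlowName`, its string values, and `resolve_flow` are assumptions made for this example; the real members and values are defined in `hugegraph_llm/flows/__init__.py`.

```python
from enum import Enum


class DemoFlowName(str, Enum):
    """Hypothetical stand-in for FlowName; the string values are assumed."""

    TEXT2GREMLIN = "text2gremlin"
    RAG_GRAPH_ONLY = "rag_graph_only"
    BUILD_VECTOR_INDEX = "build_vector_index"


def resolve_flow(name: str) -> DemoFlowName:
    """Map a member name such as 'TEXT2GREMLIN' to its enum member."""
    try:
        return DemoFlowName[name.upper()]
    except KeyError:
        known = ", ".join(member.name for member in DemoFlowName)
        raise ValueError(f"Unknown flow {name!r}; expected one of: {known}") from None


flow = resolve_flow("text2gremlin")
```

Using an enum instead of raw strings means a misspelled flow name fails immediately with a clear error rather than deep inside pipeline setup.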

**Scheduler Class** – Located in [`hugegraph_llm/flows/scheduler.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/flows/scheduler.py), this class maintains a pipeline pool that maps `FlowName` values to their corresponding managers and flow instances. It exposes two primary methods: `schedule_flow()` for synchronous execution and `schedule_stream_flow()` for asynchronous streaming.

**SchedulerSingleton** – Also defined in [`scheduler.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/flows/scheduler.py), this singleton wrapper ensures that only one `Scheduler` instance exists per process, which prevents duplicate pipeline managers and resource leaks.
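The general shape of a per-process singleton accessor can be sketched as follows. This is not the repository's code: `DemoScheduler` and `DemoSchedulerSingleton` are hypothetical names, and the use of a lock with double-checked initialization is an assumption about one reasonable way to make `get_instance()` safe under threads.

```python
import threading


class DemoScheduler:
    """Hypothetical stand-in for the real Scheduler class."""


class DemoSchedulerSingleton:
    """Lazily creates one DemoScheduler per process (illustrative only)."""

    _instance = None
    _lock = threading.Lock()

    @classmethod
    def get_instance(cls) -> DemoScheduler:
        # Double-checked locking: skip the lock once the instance exists.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = DemoScheduler()
        return cls._instance


first = DemoSchedulerSingleton.get_instance()
second = DemoSchedulerSingleton.get_instance()
```

Both calls return the same object, so any state attached to the scheduler (such as pooled pipelines) is shared by every caller in the process.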

**GPipeline Integration** – The scheduler leverages `GPipeline` and `GPipelineManager` from the external `pycgraph` library to pool reusable pipelines, ensuring heavy resources like LLM models and vector indices persist across requests without reloading.
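The pooling idea can be shown without `pycgraph` at all. The sketch below is a simplified model under stated assumptions: `DemoPipeline` and `DemoPipelinePool` are hypothetical classes, flow names are plain strings, and the real `GPipelineManager` API is not reproduced here.

```python
from typing import Any, Dict


class DemoPipeline:
    """Hypothetical stand-in for a pooled pipeline holding heavy resources."""

    builds = 0  # counts how many times the expensive setup ran

    def __init__(self, flow_name: str) -> None:
        DemoPipeline.builds += 1
        self.flow_name = flow_name

    def run(self, **params: Any) -> Dict[str, Any]:
        # A real pipeline would execute the flow; this one echoes its input.
        return {"flow": self.flow_name, "params": params}


class DemoPipelinePool:
    """Maps a flow name to a pipeline that is built once and then reused."""

    def __init__(self) -> None:
        self._pool: Dict[str, DemoPipeline] = {}

    def fetch(self, flow_name: str) -> DemoPipeline:
        # Build on first use only; later calls get the cached pipeline.
        if flow_name not in self._pool:
            self._pool[flow_name] = DemoPipeline(flow_name)
        return self._pool[flow_name]


pool = DemoPipelinePool()
first = pool.fetch("text2gremlin").run(query="g.V()")
second = pool.fetch("text2gremlin").run(query="g.E()")
```

After both calls, `DemoPipeline.builds` is still 1: the second request reuses the pipeline created for the first, which is the behavior that keeps models and indices from being reloaded.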

## How to Use the HugeGraph-LLM Scheduler API

### Step 1: Import and Instantiate

First, import the `SchedulerSingleton` and retrieve the global instance:

```python
from hugegraph_llm.flows.scheduler import SchedulerSingleton

scheduler = SchedulerSingleton.get_instance()

```

### Step 2: Select a Flow and Execute

Import the `FlowName` enum and call `schedule_flow()` with the appropriate parameters for your chosen workflow:

```python
from hugegraph_llm.flows import FlowName

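# Arguments are positional, in the same order used by Example 1 below.
# (A positional argument may not follow keyword arguments in Python.)
result = scheduler.schedule_flow(
    FlowName.TEXT2GREMLIN,
    "Find all vertices labeled 'person'",  # query
    2,                                      # example_num
    "...",                                  # schema
    "...",                                  # gremlin_prompt
    ["match_result", "template_gremlin"],   # requested output keys
)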
```

### Step 3: Process the Results

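Each flow returns a dictionary whose keys depend on the flow. For `TEXT2GREMLIN`, the available keys are `match_result`, `template_gremlin`, `raw_gremlin`, `template_execution_result`, and `raw_execution_result`; the dictionary contains the ones you requested in the outputs list.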
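One defensive way to read such a dictionary is sketched below. The key names are taken from the `outputs` list in Example 1 of this guide; `summarize_result` is a hypothetical helper, and `sample` is a hand-written placeholder rather than real scheduler output.

```python
from typing import Any, Dict, List

EXPECTED_KEYS: List[str] = [
    "match_result",
    "template_gremlin",
    "raw_gremlin",
    "template_execution_result",
    "raw_execution_result",
]


def summarize_result(result: Dict[str, Any]) -> Dict[str, Any]:
    """Return only the expected keys, using None for any that are absent."""
    return {key: result.get(key) for key in EXPECTED_KEYS}


# Hand-written placeholder, not real scheduler output.
sample = {"raw_gremlin": "g.V().hasLabel('person')", "match_result": []}
summary = summarize_result(sample)

# Keys that were not present in the result at all.
missing = [key for key, value in summary.items() if value is None]
```

Reading with `dict.get` avoids a `KeyError` when a caller requested only a subset of outputs.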

## Practical Code Examples

### Example 1: Synchronous Text-to-Gremlin Conversion

The following implementation demonstrates how to convert natural language queries into Gremlin traversal strings using the scheduler:

```python
from hugegraph_llm.flows import FlowName
from hugegraph_llm.flows.scheduler import SchedulerSingleton

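def text2gremlin(
    query: str,
    example_num: int = 2,
    schema: str = "",
    gremlin_prompt: str = "",
    outputs: list | None = None,
) -> dict:
    """Run the TEXT2GREMLIN flow and return its result dictionary."""
    scheduler = SchedulerSingleton.get_instance()

    # Request every documented output key unless the caller narrows the list.
    requested_outputs = outputs or [
        "match_result",
        "template_gremlin",
        "raw_gremlin",
        "template_execution_result",
        "raw_execution_result",
    ]

    # Arguments are passed positionally, in the order the flow expects them.
    return scheduler.schedule_flow(
        FlowName.TEXT2GREMLIN,
        query,
        example_num,
        schema,
        gremlin_prompt,
        requested_outputs,
    )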
```

This pattern mirrors the implementation found in the demo UI at [`hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py), where `SchedulerSingleton.get_instance().schedule_flow` handles the pipeline execution.

### Example 2: Building Vector Indices

To construct vector indices for graph embeddings, use the `BUILD_VECTOR_INDEX` flow:

```python
from hugegraph_llm.flows import FlowName
from hugegraph_llm.flows.scheduler import SchedulerSingleton

def build_vector_index(graph_name: str, space: str, data_path: str):
    scheduler = SchedulerSingleton.get_instance()
    
    return scheduler.schedule_flow(
        FlowName.BUILD_VECTOR_INDEX,
        graph_name,
        space,
        data_path
    )

```

The scheduler initializes the pipeline entries for this flow in [`scheduler.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/flows/scheduler.py) (lines 47-50), handling the heavy initialization of embedding models and storage backends automatically.

### Example 3: Streaming RAG with Graph Context

For real-time streaming responses in retrieval-augmented generation (RAG) workflows, use the asynchronous `schedule_stream_flow()` method:

```python
import asyncio
from hugegraph_llm.flows import FlowName
from hugegraph_llm.flows.scheduler import SchedulerSingleton

async def streaming_rag(query: str):
    scheduler = SchedulerSingleton.get_instance()
    
    async for chunk in scheduler.schedule_stream_flow(
        FlowName.RAG_GRAPH_ONLY,
        query=query,
        gremlin_tmpl_num=3,
        rerank_method="bleu",
        near_neighbor_first=True,
        custom_related_information="",
        gremlin_prompt="",
        max_graph_items=10,
        topk_return_results=5,
        vector_dis_threshold=0.6,
        topk_per_keyword=3
    ):
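        # Flush so each chunk appears as soon as it arrives.
        print(chunk, end="", flush=True)


if __name__ == "__main__":
    # Drive the coroutine from synchronous code.
    asyncio.run(streaming_rag("Find all vertices labeled 'person'"))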
```

The `schedule_stream_flow` implementation (lines 43-78 in [`scheduler.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/flows/scheduler.py)) manages the async generator pattern, yielding partial responses as the LLM produces them while maintaining the graph context throughout the stream.
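The async generator pattern itself can be tried in isolation. In this sketch, `demo_stream` is a hypothetical stand-in for a streaming flow that yields fixed tokens instead of LLM output, and `collect` is an illustrative consumer; neither exists in the repository.

```python
import asyncio
from typing import AsyncIterator, Iterable, List


async def demo_stream(tokens: Iterable[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for a streaming flow: yields one chunk at a time."""
    for token in tokens:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield token


async def collect(tokens: Iterable[str]) -> List[str]:
    """Consume the stream with `async for`, keeping every chunk."""
    chunks: List[str] = []
    async for chunk in demo_stream(tokens):
        chunks.append(chunk)
    return chunks


chunks = asyncio.run(collect(["Graph ", "context ", "answer"]))
full_text = "".join(chunks)
```

Collecting chunks this way is useful when you want to stream to a client and also keep the complete response for logging or caching.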

## Key Source Files

Understanding the Scheduler API requires familiarity with these specific files in the `apache/incubator-hugegraph-ai` repository:

- **[`hugegraph-llm/src/hugegraph_llm/flows/scheduler.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/flows/scheduler.py)** – Contains the core `Scheduler` class, `SchedulerSingleton`, and both `schedule_flow()` and `schedule_stream_flow()` implementations.

- **[`hugegraph-llm/src/hugegraph_llm/flows/__init__.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/flows/__init__.py)** – Defines the `FlowName` enum that enumerates all available workflows.

- **[`hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py)** – Demo UI example showing scheduler integration within a UI component.

- **[`hugegraph-llm/src/hugegraph_llm/flows/text2gremlin.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/flows/text2gremlin.py)** and **[`rag_flow_graph_only.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/rag_flow_graph_only.py)** – Flow implementations that define the business logic executed by the scheduler.

## Summary

- The **HugeGraph-LLM Scheduler API** orchestrates AI workflows through a singleton interface, eliminating manual pipeline management.
- Access the scheduler via **`SchedulerSingleton.get_instance()`** from [`hugegraph_llm/flows/scheduler.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/flows/scheduler.py).
- Use **`schedule_flow()`** for synchronous execution and **`schedule_stream_flow()`** for real-time streaming responses.
- Reference workflows using the **`FlowName`** enum (e.g., `TEXT2GREMLIN`, `RAG_GRAPH_ONLY`, `BUILD_VECTOR_INDEX`).
- The scheduler automatically handles **GPipeline** pooling via `pycgraph`, ensuring efficient reuse of LLM models and vector indices across requests.

## Frequently Asked Questions

### What is the difference between `schedule_flow` and `schedule_stream_flow`?

The `schedule_flow()` method executes workflows synchronously and returns complete results as a dictionary, making it suitable for standard request-response patterns. In contrast, `schedule_stream_flow()` returns an async generator that yields partial results as they become available, which is essential for streaming LLM outputs in real-time applications.

### How does the Scheduler API manage resource pooling?

According to the source code in [`scheduler.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/flows/scheduler.py), the scheduler maintains an internal pipeline pool that maps `FlowName` values to `GPipeline` instances provided by the `pycgraph` library. Because pooled pipelines are reused, heavy resources such as loaded LLM models and initialized vector indices stay in memory between requests, so subsequent calls avoid the cost of reloading them.

### Which flows are available through the FlowName enum?

The `FlowName` enum in [`flows/__init__.py`](https://github.com/apache/incubator-hugegraph-ai/blob/main/hugegraph-llm/src/hugegraph_llm/flows/__init__.py) defines symbolic names for all supported workflows, including `TEXT2GREMLIN` for natural language to graph query conversion, `RAG_GRAPH_ONLY` for graph-based retrieval-augmented generation, and `BUILD_VECTOR_INDEX` for constructing embedding indices. Additional flows may be available depending on your HugeGraph-LLM version.

### Can I use the Scheduler API without understanding the underlying pipeline implementation?

Yes. The Scheduler API abstracts away the complexity of `GPipeline` creation, model loading, and resource cleanup. You only need to provide the correct `FlowName` and the specific parameters required by that flow, allowing you to execute complex graph-AI workflows without managing the underlying `pycgraph` infrastructure directly.