How to Use the HugeGraph-LLM Scheduler API: A Complete Guide
The HugeGraph-LLM Scheduler API provides a singleton-based orchestrator that executes AI workflows through schedule_flow() for synchronous processing and schedule_stream_flow() for asynchronous streaming, abstracting pipeline management and resource pooling.
The HugeGraph-LLM Scheduler API serves as the central command layer for Apache HugeGraph's AI module, managing the execution of complex LLM workflows (called "flows") without requiring manual pipeline setup. Implemented in the apache/incubator-hugegraph-ai repository, this API handles resource pooling, model loading, and workflow orchestration through a simple singleton interface.
HugeGraph-LLM Scheduler API Architecture
The Scheduler API architecture centers on four primary components working together to manage workflow execution.
FlowName Enum – Defined in hugegraph_llm/flows/__init__.py, this enumeration provides symbolic identifiers for every supported workflow (e.g., TEXT2GREMLIN, RAG_GRAPH_ONLY, BUILD_VECTOR_INDEX). The scheduler uses these constants to look up and instantiate the correct pipeline.
Scheduler Class – Located in hugegraph_llm/flows/scheduler.py, this class maintains a pipeline pool that maps FlowName values to their corresponding managers and flow instances. It exposes two primary methods: schedule_flow() for synchronous execution and schedule_stream_flow() for asynchronous streaming.
SchedulerSingleton – Also in scheduler.py, this class applies the singleton pattern so that only one Scheduler instance exists per process, preventing duplicate pipeline managers and resource leaks.
GPipeline Integration – The scheduler leverages GPipeline and GPipelineManager from the external pycgraph library to pool reusable pipelines, ensuring heavy resources like LLM models and vector indices persist across requests without reloading.
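To make the relationship between these components concrete, here is a stdlib-only toy model of an enum-keyed pipeline pool that dispatches calls to flow objects. It is not the hugegraph_llm code. The enum members, the EchoFlow class, its build_flow/post_deal methods and ToyScheduler are all hypothetical stand-ins. The sketch only shows the lookup-and-forward idea: find the entry for a FlowName, reject unknown names, and hand the caller's arguments to that flow.

```python
from enum import Enum
from typing import Any, Dict


class FlowName(str, Enum):
    """Hypothetical stand-in for hugegraph_llm.flows.FlowName."""
    TEXT2GREMLIN = "text2gremlin"
    BUILD_VECTOR_INDEX = "build_vector_index"


class EchoFlow:
    """Hypothetical flow: build_flow() prepares work, post_deal() shapes the output."""

    def __init__(self, name: str) -> None:
        self.name = name

    def build_flow(self, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        # A real flow would assemble a pipeline; here we only capture the inputs.
        return {"flow": self.name, "args": args, "kwargs": kwargs}

    def post_deal(self, state: Dict[str, Any]) -> Dict[str, Any]:
        # Convert the pipeline state into the dictionary returned to the caller.
        return dict(state)


class ToyScheduler:
    def __init__(self) -> None:
        # One pool entry per FlowName, holding the object that runs that workflow.
        self.pipeline_pool: Dict[FlowName, Dict[str, Any]] = {
            name: {"flow": EchoFlow(name.value)} for name in FlowName
        }

    def schedule_flow(self, flow: FlowName, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        if flow not in self.pipeline_pool:
            raise ValueError(f"Unsupported workflow {flow}")
        impl = self.pipeline_pool[flow]["flow"]
        # Forward the caller's arguments unchanged to the selected flow.
        return impl.post_deal(impl.build_flow(*args, **kwargs))


scheduler = ToyScheduler()
print(scheduler.schedule_flow(FlowName.TEXT2GREMLIN, "Find all persons", 2))
```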
How to Use the HugeGraph-LLM Scheduler API
Step 1: Import and Instantiate
First, import the SchedulerSingleton and retrieve the global instance:
```python
from hugegraph_llm.flows.scheduler import SchedulerSingleton

scheduler = SchedulerSingleton.get_instance()
```
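get_instance() is a lazy, process-wide accessor. A common way to build such an accessor safely is double-checked locking. The sketch below shows that technique using only the stdlib threading module. It assumes the real SchedulerSingleton behaves similarly, so check scheduler.py for the actual implementation. The Scheduler class here is a placeholder that only counts how many times it is constructed.

```python
import threading


class Scheduler:
    """Placeholder for an expensive-to-construct scheduler."""

    instances_created = 0

    def __init__(self) -> None:
        Scheduler.instances_created += 1


class SchedulerSingleton:
    _instance = None
    _lock = threading.Lock()

    @classmethod
    def get_instance(cls) -> Scheduler:
        # Fast path: no locking once the instance exists.
        if cls._instance is None:
            with cls._lock:
                # Re-check inside the lock so racing threads build only one instance.
                if cls._instance is None:
                    cls._instance = Scheduler()
        return cls._instance


results = []
threads = [
    threading.Thread(target=lambda: results.append(SchedulerSingleton.get_instance()))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Every thread receives the same object, and the constructor runs once.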
Step 2: Select a Flow and Execute
Import the FlowName enum and call schedule_flow() with the appropriate parameters for your chosen workflow:
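```python
from hugegraph_llm.flows import FlowName

# Flow inputs are passed positionally, in the same order as Example 1 below:
# query, example_num, schema, gremlin_prompt, requested outputs.
result = scheduler.schedule_flow(
    FlowName.TEXT2GREMLIN,
    "Find all vertices labeled 'person'",  # natural-language query
    2,                                     # number of few-shot examples
    "...",                                 # graph schema
    "...",                                 # Gremlin prompt template
    ["match_result", "template_gremlin"],  # outputs to return
)
```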
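The examples in this guide pass flow inputs to schedule_flow() both positionally and by keyword. This suggests the inputs are forwarded to the flow's own build method, so an argument mismatch may only surface when the flow actually runs. One way to fail early is to bind the arguments against a callable's signature with inspect.signature(). In this sketch, build_text2gremlin and its parameter names are hypothetical. Substitute the real build method of the flow you call.

```python
import inspect
from typing import Any, Callable, Dict, List, Optional


def build_text2gremlin(
    query: str,
    example_num: int,
    schema: str,
    gremlin_prompt: str,
    requested_outputs: Optional[List[str]] = None,
) -> None:
    """Hypothetical flow entry point; only its signature matters here."""


def check_flow_args(target: Callable[..., Any], *args: Any, **kwargs: Any) -> Dict[str, Any]:
    # bind() raises TypeError on missing, surplus, or misspelled arguments.
    bound = inspect.signature(target).bind(*args, **kwargs)
    bound.apply_defaults()
    return dict(bound.arguments)


params = check_flow_args(
    build_text2gremlin,
    "Find all vertices labeled 'person'",  # query
    2,                                     # example_num
    "...",                                 # schema
    "...",                                 # gremlin_prompt
    ["match_result", "template_gremlin"],  # requested_outputs
)
print(params["requested_outputs"])
```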
Step 3: Process the Results
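Flows such as TEXT2GREMLIN return a dictionary keyed by output name. For TEXT2GREMLIN the keys include match_result, template_gremlin, and raw_gremlin, plus the execution results template_execution_result and raw_execution_result. Request only the keys you need through the outputs list.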
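A small post-processing step is often needed, for example preferring the template-based query and falling back to the raw one. The sketch below assumes the result is a plain dict keyed as described above, and that keys you did not request may be absent or empty. The preference order and the sample values are invented for illustration.

```python
from typing import Any, Dict, Optional


def pick_gremlin(result: Dict[str, Any]) -> Optional[str]:
    # Prefer the template-guided query; fall back to the raw LLM output.
    for key in ("template_gremlin", "raw_gremlin"):
        value = result.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip()
    return None


sample = {
    "match_result": "[]",
    "template_gremlin": "",
    "raw_gremlin": "g.V().hasLabel('person')",
}
print(pick_gremlin(sample))
```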
Practical Code Examples
Example 1: Synchronous Text-to-Gremlin Conversion
The following implementation demonstrates how to convert natural language queries into Gremlin traversal strings using the scheduler:
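```python
from typing import List, Optional

from hugegraph_llm.flows import FlowName
from hugegraph_llm.flows.scheduler import SchedulerSingleton

# Output keys requested when the caller does not pass any.
DEFAULT_OUTPUTS = [
    "match_result",
    "template_gremlin",
    "raw_gremlin",
    "template_execution_result",
    "raw_execution_result",
]


def text2gremlin(
    query: str,
    example_num: int = 2,
    schema: str = "",
    gremlin_prompt: str = "",
    outputs: Optional[List[str]] = None,
):
    """Convert a natural-language query to Gremlin via the shared scheduler."""
    scheduler = SchedulerSingleton.get_instance()
    # Inputs are forwarded positionally to the TEXT2GREMLIN flow.
    return scheduler.schedule_flow(
        FlowName.TEXT2GREMLIN,
        query,
        example_num,
        schema,
        gremlin_prompt,
        outputs or list(DEFAULT_OUTPUTS),
    )
```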
This pattern mirrors the implementation found in the demo UI at hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py, where SchedulerSingleton.get_instance().schedule_flow handles the pipeline execution.
Example 2: Building Vector Indices
To construct vector indices for graph embeddings, use the BUILD_VECTOR_INDEX flow:
```python
from hugegraph_llm.flows import FlowName
from hugegraph_llm.flows.scheduler import SchedulerSingleton


def build_vector_index(graph_name: str, space: str, data_path: str):
    scheduler = SchedulerSingleton.get_instance()
    return scheduler.schedule_flow(
        FlowName.BUILD_VECTOR_INDEX,
        graph_name,
        space,
        data_path,
    )
```
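The Scheduler constructor in scheduler.py registers a pipeline manager and flow object for BUILD_VECTOR_INDEX. Embedding-model and storage setup therefore happens inside the flow rather than in your code. The positional arguments above are illustrative: the inputs this flow expects are defined by its flow class, so verify them against your HugeGraph-LLM version.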
Example 3: Streaming RAG with Graph Context
For real-time streaming responses in retrieval-augmented generation (RAG) workflows, use the asynchronous schedule_stream_flow() method:
```python
import asyncio

from hugegraph_llm.flows import FlowName
from hugegraph_llm.flows.scheduler import SchedulerSingleton


async def streaming_rag(query: str) -> None:
    scheduler = SchedulerSingleton.get_instance()
    # schedule_stream_flow() is an async generator, so iterate it with "async for".
    async for chunk in scheduler.schedule_stream_flow(
        FlowName.RAG_GRAPH_ONLY,
        query=query,
        gremlin_tmpl_num=3,
        rerank_method="bleu",
        near_neighbor_first=True,
        custom_related_information="",
        gremlin_prompt="",
        max_graph_items=10,
        topk_return_results=5,
        vector_dis_threshold=0.6,
        topk_per_keyword=3,
    ):
        print(chunk, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(streaming_rag("Who does Alice know?"))
```
The schedule_stream_flow() method in scheduler.py implements this async generator pattern. It yields partial responses as the LLM produces them, so callers can render output incrementally instead of waiting for the complete answer. The graph context retrieved by the flow applies to the whole streamed answer.
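The async generator pattern itself can be demonstrated without HugeGraph. In this sketch, fake_stream_flow is a hypothetical stand-in for schedule_stream_flow() that yields text fragments, briefly returning control to the event loop between them. The consumer prints each fragment as it arrives and also keeps the full answer. Real chunks from the scheduler may not be plain strings, so inspect what your version yields before concatenating them.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_stream_flow(fragments: List[str]) -> AsyncIterator[str]:
    # Stand-in producer: yields each piece of output as soon as it is "ready".
    for fragment in fragments:
        await asyncio.sleep(0)  # give the event loop a chance to run other tasks
        yield fragment


async def consume(fragments: List[str]) -> str:
    parts: List[str] = []
    async for chunk in fake_stream_flow(fragments):
        print(chunk, end="", flush=True)  # render incrementally
        parts.append(chunk)
    print()
    return "".join(parts)


if __name__ == "__main__":
    answer = asyncio.run(consume(["Alice ", "knows ", "Bob."]))
```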
Key Source Files
Understanding the Scheduler API requires familiarity with these specific files in the apache/incubator-hugegraph-ai repository:
- hugegraph-llm/src/hugegraph_llm/flows/scheduler.py – Contains the core Scheduler class, SchedulerSingleton, and the schedule_flow() and schedule_stream_flow() implementations.
- hugegraph-llm/src/hugegraph_llm/flows/__init__.py – Defines the FlowName enum that enumerates all available workflows.
- hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py – Demo UI component showing how the scheduler is integrated in practice.
- hugegraph-llm/src/hugegraph_llm/flows/text2gremlin.py and rag_flow_graph_only.py – Flow implementations that define the business logic executed by the scheduler.
Summary
- The HugeGraph-LLM Scheduler API orchestrates AI workflows through a singleton interface, eliminating manual pipeline management.
- Access the scheduler via SchedulerSingleton.get_instance() from hugegraph_llm/flows/scheduler.py.
- Use schedule_flow() for synchronous execution and schedule_stream_flow() for real-time streaming responses.
- Reference workflows using the FlowName enum (e.g., TEXT2GREMLIN, RAG_GRAPH_ONLY, BUILD_VECTOR_INDEX).
- The scheduler automatically handles GPipeline pooling via pycgraph, ensuring efficient reuse of LLM models and vector indices across requests.
Frequently Asked Questions
What is the difference between schedule_flow and schedule_stream_flow?
The schedule_flow() method executes workflows synchronously and returns complete results as a dictionary, making it suitable for standard request-response patterns. In contrast, schedule_stream_flow() returns an async generator that yields partial results as they become available, which is essential for streaming LLM outputs in real-time applications.
How does the Scheduler API manage resource pooling?
According to scheduler.py, the scheduler's internal pipeline pool maps each FlowName value to a GPipelineManager (from the pycgraph library) and the matching flow object. The manager hands out previously built GPipeline instances for reuse. Heavy resources such as loaded LLM models and initialized vector indices can therefore stay in memory between requests instead of being rebuilt, which reduces latency on later calls.
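The fetch-or-build cycle behind this pooling can be modeled with a small stdlib class. ToyPipelineManager only mimics the idea. It hands out an idle pipeline if one exists, otherwise the caller builds a new one, and the pipeline goes back to the pool after the run. Its fetch and release method names are hypothetical and are not a reference for the pycgraph API. The build counter shows that the expensive construction happens only once across sequential requests.

```python
from typing import Callable, List, Optional


class ToyPipeline:
    builds = 0  # counts expensive constructions across all instances

    def __init__(self) -> None:
        ToyPipeline.builds += 1  # imagine model and index loading happening here

    def run(self, query: str) -> str:
        return f"answer to {query!r}"


class ToyPipelineManager:
    def __init__(self) -> None:
        self._idle: List[ToyPipeline] = []

    def fetch(self) -> Optional[ToyPipeline]:
        # Hand out an idle pipeline if one is available.
        return self._idle.pop() if self._idle else None

    def release(self, pipeline: ToyPipeline) -> None:
        # Return the pipeline so the next request can reuse it.
        self._idle.append(pipeline)


def run_pooled(manager: ToyPipelineManager, factory: Callable[[], ToyPipeline], query: str) -> str:
    # Reuse an idle pipeline, or build one only when the pool is empty.
    pipeline = manager.fetch() or factory()
    try:
        return pipeline.run(query)
    finally:
        manager.release(pipeline)


manager = ToyPipelineManager()
for q in ["q1", "q2", "q3"]:
    run_pooled(manager, ToyPipeline, q)
print(ToyPipeline.builds)
```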
Which flows are available through the FlowName enum?
The FlowName enum in flows/__init__.py defines symbolic names for all supported workflows, including TEXT2GREMLIN for natural language to graph query conversion, RAG_GRAPH_ONLY for graph-based retrieval-augmented generation, and BUILD_VECTOR_INDEX for constructing embedding indices. The enum defines further flows as well, and the exact set depends on your HugeGraph-LLM version.
Can I use the Scheduler API without understanding the underlying pipeline implementation?
Yes. The Scheduler API abstracts away the complexity of GPipeline creation, model loading, and resource cleanup. You only need to provide the correct FlowName and the specific parameters required by that flow, allowing you to execute complex graph-AI workflows without managing the underlying pycgraph infrastructure directly.