%load_ext watermark
%load_ext autoreload
%autoreload 2
import re
import json
import vllm
import asyncio
from enum import Enum
from uuid import uuid4
from omegaconf import OmegaConf
from dataclasses import dataclass
from vllm.inputs import TokensPrompt
from vllm import AsyncLLMEngine, SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.utils import get_json_schema
from verl.tools.utils.tool_registry import get_tool_class
%watermark -a 'Ethen' -d -u -v -iv
An agent is a system that leverages an LLM to reason, plan, and execute actions through external tools, enabling it to interact with its environment and accomplish specific goals.
The ReAct (Reasoning + Acting) pattern demonstrates this overall process:
Thought: I need to find the latest weather in Paris.
Action: Search["weather in Paris"]
Observation: 22C.
Thought: Now that I know the weather ...
Action: Respond["The current temperature is 22C in Paris."]
In this pattern, the LLM alternates between thinking (articulating its reasoning and intent) and acting (invoking tools and incorporating their output) until the goal is achieved.
We will implement this so-called "agent loop" in the upcoming sections.
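As a preview, here is a minimal sketch of that loop, where llm_generate, parse_tool_call, and execute_tool are hypothetical placeholders for the real components we will build below.
# minimal sketch of a ReAct-style agent loop; llm_generate, parse_tool_call,
# and execute_tool are hypothetical placeholders, not real APIs
def agent_loop(messages: list[dict], max_turns: int = 5):
    for _ in range(max_turns):
        response = llm_generate(messages)  # Thought (and possibly an Action)
        messages.append({"role": "assistant", "content": response})
        tool_call = parse_tool_call(response)  # e.g. Search["weather in Paris"]
        if tool_call is None:
            # no action requested, treat the response as the final answer
            return response
        observation = execute_tool(tool_call)  # Observation
        messages.append({"role": "tool", "content": observation})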
For Hugging Face transformers, end users pass in each tool's JSON schema as part of apply_chat_template. If we pass in Python functions, they will automatically be converted to JSON schemas for us.
# "Qwen/Qwen3-4B-Instruct-2507" # "Qwen/Qwen3-1.7B"
model_name = "Qwen/Qwen3-4B-Instruct-2507"
cache_dir = "/data"
tokenizer = AutoTokenizer.from_pretrained(model_name, cache_dir=cache_dir)
model = AutoModelForCausalLM.from_pretrained(model_name, cache_dir=cache_dir)
def get_current_temperature(location: str, unit: str):
"""
Get the current temperature at a location.
Args:
location: The location to get the temperature for, in the format "City, Country"
unit: The unit to return the temperature in. (choices: ["celsius", "fahrenheit"])
"""
# dummy code, in actual implementation, call some weather API
return 22.
While not strictly necessary, we typically rely on a system message to shape the overall behavior and role, following the pattern: You are a [role] who [behavior]. e.g. You are a senior software engineer who explains concepts with code examples.
messages = [
{"role": "system", "content": "You are a bot that responds to weather queries. You should reply with the unit used in the queried location."},
{"role": "user", "content": "Hey, what's the temperature in Paris right now?"}
]
tools = [get_current_temperature]
inputs = tokenizer.apply_chat_template(
messages,
# pass additional tools
tools=tools,
add_generation_prompt=True,
return_dict=True,
return_tensors="pt",
enable_thinking=False,
)
print(tokenizer.decode(inputs["input_ids"][0]))
Under the hood, it parses our tool function's docstring to create a JSON schema. These tool definitions are typically placed within special tags such as <tools></tools>. Note: the LLM has no visibility into the tool function's actual implementation, so the clarity and comprehensiveness of its docstring directly impacts the model's ability to use these tools effectively.
# example json schema parsing
get_json_schema(get_current_temperature)
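For reference, the returned schema follows the OpenAI function-calling format and looks roughly like the following (descriptions abbreviated here):
# abbreviated illustration of the schema returned above
{
    "type": "function",
    "function": {
        "name": "get_current_temperature",
        "description": "Get the current temperature at a location.",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "..."},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"], "description": "..."},
            },
            "required": ["location", "unit"],
        },
    },
}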
Given our input messages and tool definition, the model should emit a tool call in response to this query.
tool_outputs = model.generate(**inputs.to(model.device), max_new_tokens=128)
# generate includes the original input, slice to only retain response
tool_response_ids = tool_outputs[0][len(inputs["input_ids"][0]):]
tool_response = tokenizer.decode(tool_response_ids)
print(tool_response)
It is important to note that a model cannot directly execute tool calls by itself. Instead, the model signals that a tool should be invoked by generating a tool call request through a structured interface, typically encapsulated within special tokens such as <tool_call></tool_call>. It's the end user's responsibility to handle this request, execute the tool, and append the result to the main conversation history. Consequently, when a tool call is triggered, two messages must be appended: the assistant's tool call request and the corresponding tool response.
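Extracting that request usually boils down to pulling the JSON payload out of the tool call tokens. A minimal sketch (the ToolParser class later in this notebook handles this more robustly):
# minimal sketch: pull the JSON payload out of the tool call tokens;
# assumes the model emitted well-formed JSON, which is not guaranteed
match = re.search(r"<tool_call>(.*?)</tool_call>", tool_response, re.DOTALL)
parsed_tool_call = json.loads(match.group(1)) if match else None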
# add the tool call to our chat messages; this typically requires a bit of parsing
tool_call = {"name": "get_current_temperature", "arguments": {"location": "Paris, France", "unit": "celsius"}}
messages.append({"role": "assistant", "tool_calls": [{"type": "function", "function": tool_call}]})
# the tool role indicates a response from a tool, which will
# get wrapped in special <tool_response> tokens
messages.append({"role": "tool", "content": "22"})
inputs = tokenizer.apply_chat_template(
messages,
tools=tools,
add_generation_prompt=True,
return_dict=True,
return_tensors="pt",
enable_thinking=False
)
print(tokenizer.decode(inputs["input_ids"][0]))
Our model now tailors its response using all three pieces of information: the original input prompt, the tool call, and the tool response.
outputs = model.generate(**inputs.to(model.device), max_new_tokens=128)
response = tokenizer.decode(outputs[0][len(inputs["input_ids"][0]):])
print(response)
# del model
Having established a high-level understanding of agentic tool use, we now turn to a concrete implementation. Specifically, let's look at how verl implements this.
Verl's ToolAgentLoop implements a state machine that orchestrates multi-turn conversations between a language model and a set of external tools. The flow cycles through a sequence of pre-defined states until one or more termination conditions are met, e.g.
User: "What's the weather in Seattle?"
PENDING → Tokenize prompt with tool schemas
GENERATING → Model outputs: <tool_call>get_weather(city="Seattle")</tool_call>
response_mask = [1,1,1,1,1,1,1,1]
PROCESSING_TOOLS → Execute get_weather tool
Tool returns: "72°F, Sunny"
response_mask = [1,1,1,1,1,1,1,1,0,0,0,0]
GENERATING → Model outputs: "The weather in Seattle is 72°F and sunny."
response_mask = [1,1,1,1,1,1,1,1,0,0,0,0,1,1,1,1,1,1]
TERMINATED → No more tool calls, return output
Compared to the simplified example in the previous section, the main enhancements include:
- AsyncLLMEngine for generation.
- A response_mask maintained throughout the conversation to distinguish model generated tokens (marked as 1) from injected context tokens (marked as 0, which includes the original prompt, tool responses, and user messages). This distinction is critical for training, since external information should be excluded from the training signal to avoid corrupting the model's learning objective.
class AsyncLLMServerManager:
"""leverage AsyncLLMEngine for async request processing
LLM class is more tailored towards offline batch inference"""
def __init__(self, model, engine_kwargs):
engine_args = AsyncEngineArgs(model=model, **engine_kwargs)
self.engine = AsyncLLMEngine.from_engine_args(engine_args)
async def generate(self, prompt_ids, sampling_params):
# vllm also accepts tokenized id as input instead of raw text
# request_id required for vllm's async llm engine
request_id = uuid4().hex
generator = self.engine.generate(
request_id=request_id,
prompt=TokensPrompt(prompt_token_ids=prompt_ids),
sampling_params=SamplingParams(**sampling_params)
)
async for output in generator:
final_res = output
token_ids = final_res.outputs[0].token_ids
return token_ids
engine_kwargs = {"tensor_parallel_size": 8, "gpu_memory_utilization": 0.6, "max_model_len": 8192}
server_manager = AsyncLLMServerManager(model_name, engine_kwargs=engine_kwargs)
sampling_params = {
"temperature": 0.7,
"top_p": 0.9,
"max_tokens": 512
}
token_ids = await server_manager.generate(inputs["input_ids"][0].tolist(), sampling_params)
tokenizer.decode(token_ids)
@dataclass
class FunctionCall:
arguments: str
"""
The arguments to call the function with, as generated by the model in JSON
format (dumped to a str). Note that the model does not always generate valid JSON,
and may hallucinate parameters not defined by your function schema. Validate the
arguments in your code before calling your function.
"""
name: str
"""The name of the function to call."""
class ToolParser:
"""Parses tool call's name and argument, tool call token/regex should match the ones
used by our model/prompt"""
def __init__(self, tokenizer):
self.tokenizer = tokenizer
self.tool_call_start_token = "<tool_call>"
self.tool_call_end_token = "</tool_call>"
self.tool_call_regex = re.compile(r"<tool_call>(.*?)</tool_call>", re.DOTALL)
self.loop = asyncio.get_running_loop()
async def extract_tool_calls(self, response_ids):
text = await self.loop.run_in_executor(None, self.tokenizer.decode, response_ids)
if self.tool_call_start_token not in text or self.tool_call_end_token not in text:
return text, []
matches = self.tool_call_regex.findall(text)
function_calls = []
for match in matches:
try:
function_call = json.loads(match)
name, arguments = function_call["name"], function_call["arguments"]
function_call = FunctionCall(name=name, arguments=json.dumps(arguments))
function_calls.append(function_call)
except Exception as e:
print(f"Failed to decode tool call: {e}")
# remaining text, excluding tool call tokens
content = self.tool_call_regex.sub("", text)
return content, function_calls
tool_parser = ToolParser(tokenizer)
parsed_output = await tool_parser.extract_tool_calls(tool_response_ids)
parsed_output
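Assuming the model did emit a tool call (as it did earlier), the parsed output contains the stripped response text plus a FunctionCall targeting our weather tool, whose arguments round-trip through JSON:
# quick check on the parsed structure
content, function_calls = parsed_output
assert function_calls[0].name == "get_current_temperature"
json.loads(function_calls[0].arguments)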
To use tools in verl, we need to define a tool class that inherits from BaseTool and implement the following methods:
import json
from typing import Any
from transformers.utils import get_json_schema
from verl.tools.schemas import ToolResponse
from verl.tools.base_tool import BaseTool, OpenAIFunctionToolSchema
class WeatherTool(BaseTool):
def get_current_temperature(self, city: str):
"""Get current temperature at a location.
Args:
city: The location to get the temperature for, in the format "City, State, Country".
Returns:
the temperature, the location, and the unit in a dict
"""
return {
"temperature": 72,
"city": city,
}
def get_openai_tool_schema(self) -> OpenAIFunctionToolSchema:
schema = get_json_schema(self.get_current_temperature)
return OpenAIFunctionToolSchema(**schema)
async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) -> tuple[ToolResponse, float, dict]:
"""trigger the tool call and return response, reward, metrics"""
try:
result = self.get_current_temperature(**parameters)
return ToolResponse(text=json.dumps(result)), 0, {}
except Exception as e:
return ToolResponse(text=str(e)), 0, {}
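Before wiring this into the agent loop, we can sanity check the tool directly. This sketch assumes BaseTool falls back to our get_openai_tool_schema implementation when tool_schema is None, which is also why the config file below can omit the schema:
# hypothetical sanity check outside the agent loop
weather_tool = WeatherTool(config={}, tool_schema=None)
tool_response, tool_reward, tool_metrics = await weather_tool.execute(
    instance_id="test", parameters={"city": "Seattle, WA, USA"}
)
print(tool_response.text)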
Once defined, these tool modules can be configured via YAML and lazily loaded only when needed.
tools:
- class_name: "tools.tools.WeatherTool"
  config: {}  # accessed by initialize_tools_from_config below
def initialize_tools_from_config(tools_config_file):
tools_config = OmegaConf.load(tools_config_file)
tool_list = []
for tool_config in tools_config.tools:
cls_name = tool_config.class_name
# verl's lazy loading util, where we can specify tool class in our yaml config
# file, and dynamically load them at runtime
tool_cls = get_tool_class(cls_name)
tool = tool_cls(
config=OmegaConf.to_container(tool_config.config, resolve=True),
# our tool class already implements the tool's schema, so there's no need to re-define it in the tool config file
tool_schema=None,
)
tool_list.append(tool)
return tool_list
tools_config_file = "tools_config.yaml"
tool_list = initialize_tools_from_config(tools_config_file)
tool_list
class AgentState(Enum):
PENDING = "pending"
GENERATING = "generating"
PROCESSING_TOOLS = "processing_tools"
TERMINATED = "terminated"
class AgentData:
"""Encapsulates all state variables for the agent loop."""
def __init__(
self,
messages: list[dict],
):
self.messages = messages
# State variables
self.prompt_ids: list[int] = []
self.response_ids: list[int] = []
self.response_mask: list[int] = []
self.tool_rewards: list[float] = []
self.user_turns = 0
self.assistant_turns = 0
# Temporary state for tool calls
self.tool_calls: list = []
# Extra fields for dynamic addition
self.extra_fields: dict = {}
class AgentLoopOutput:
"""Agent loop output."""
def __init__(
self,
prompt_ids: list[int],
response_ids: list[int],
response_mask: list[int],
extra_fields: dict = None,
):
self.prompt_ids = prompt_ids
self.response_ids = response_ids
self.response_mask = response_mask
# avoid sharing one mutable default dict across instances
self.extra_fields = extra_fields if extra_fields is not None else {}
class ToolAgentLoop:
"""simplified version of verl's tool agent loop"""
def __init__(self, config, server_manager, tokenizer):
self.server_manager = server_manager
self.tokenizer = tokenizer
self.apply_chat_template_kwargs = config.apply_chat_template_kwargs
self.response_length = config.response_length
self.max_user_turns = config.max_user_turns
self.max_assistant_turns = config.max_assistant_turns
self.max_parallel_calls = config.max_parallel_calls
self.max_tool_response_length = config.max_tool_response_length
self.tool_response_truncate_side = config.tool_response_truncate_side
self.tool_config_path = config.tool_config_path
# Initialize tools and parser
tool_config_path = config.tool_config_path
tool_list = initialize_tools_from_config(tool_config_path) if tool_config_path else []
self.tools = {tool.name: tool for tool in tool_list}
self.tool_schemas = [tool.tool_schema.model_dump(exclude_unset=True, exclude_none=True) for tool in tool_list]
self.tool_parser = ToolParser(tokenizer)
self.loop = asyncio.get_running_loop()
async def run(self, sampling_params, messages):
"""run agent loop terminates when there are:
- no tool calls
- reaches configurable max turns
- reaches configurable max response length
"""
agent_data = AgentData(
messages=messages,
)
# state machine loop
state = AgentState.PENDING
while state != AgentState.TERMINATED:
if state == AgentState.PENDING:
state = await self._handle_pending_state(agent_data)
elif state == AgentState.GENERATING:
state = await self._handle_generating_state(agent_data, sampling_params)
elif state == AgentState.PROCESSING_TOOLS:
state = await self._handle_processing_tools_state(agent_data)
else:
state = AgentState.TERMINATED
# finalize output
response_ids = agent_data.prompt_ids[-len(agent_data.response_mask):]
prompt_ids = agent_data.prompt_ids[:len(agent_data.prompt_ids) - len(agent_data.response_mask)]
output = AgentLoopOutput(
prompt_ids=prompt_ids,
response_ids=response_ids[:self.response_length],
response_mask=agent_data.response_mask[:self.response_length],
extra_fields={"tool_rewards": agent_data.tool_rewards}
)
return output
async def _handle_pending_state(self, agent_data):
"""Prepare tokenized prompt id including tool schema and transition to generating state"""
# tokenization related blocking/synchronous/CPU-bound functions are
# submitted via run_in_executor to be executed in a separate thread
agent_data.prompt_ids = await self.loop.run_in_executor(
None,
lambda: self.tokenizer.apply_chat_template(
agent_data.messages,
tools=self.tool_schemas,
add_generation_prompt=True,
tokenize=True,
**self.apply_chat_template_kwargs,
)
)
return AgentState.GENERATING
async def _handle_generating_state(self, agent_data, sampling_params):
"""generate model response, append assistant response, parse tool calls (name/argument) if present"""
# generate response output (token id)
token_ids = await self.server_manager.generate(agent_data.prompt_ids, sampling_params)
# update agent state with generation results
agent_data.assistant_turns += 1
agent_data.response_ids = token_ids
agent_data.prompt_ids += token_ids
# mark as model generated
agent_data.response_mask += [1] * len(agent_data.response_ids)
if len(agent_data.response_mask) >= self.response_length:
return AgentState.TERMINATED
if self.max_assistant_turns and agent_data.assistant_turns >= self.max_assistant_turns:
return AgentState.TERMINATED
if self.max_user_turns and agent_data.user_turns >= self.max_user_turns:
return AgentState.TERMINATED
# extract tool calls from generated response
_, agent_data.tool_calls = await self.tool_parser.extract_tool_calls(agent_data.response_ids)
# add assistant message to conversation history
assistant_message = await self.loop.run_in_executor(
None,
lambda: self.tokenizer.decode(agent_data.response_ids, skip_special_tokens=True)
)
agent_data.messages.append({"role": "assistant", "content": assistant_message})
if agent_data.tool_calls:
return AgentState.PROCESSING_TOOLS
else:
return AgentState.TERMINATED
async def _handle_processing_tools_state(self, agent_data):
"""execute all tool calls, and append tool result"""
# execute tools in parallel
add_messages = []
tasks = []
for tool_call in agent_data.tool_calls[:self.max_parallel_calls]:
tasks.append(self._call_tool(tool_call, agent_data))
responses = await asyncio.gather(*tasks)
for tool_response, tool_reward, _ in responses:
message = {"role": "tool", "content": tool_response or ""}
add_messages.append(message)
if tool_reward is not None:
agent_data.tool_rewards.append(tool_reward)
# encode tool response into tokens
response_ids = await self.loop.run_in_executor(
None,
lambda: self.tokenizer.apply_chat_template(
add_messages,
add_generation_prompt=True,
tokenize=True,
**self.apply_chat_template_kwargs,
)
)
if len(agent_data.response_mask) + len(response_ids) >= self.response_length:
return AgentState.TERMINATED
agent_data.messages.extend(add_messages)
agent_data.prompt_ids += response_ids
# mark as context (not model generated)
agent_data.response_mask += [0] * len(response_ids)
agent_data.user_turns += 1
return AgentState.GENERATING
async def _call_tool(self, tool_call: FunctionCall, agent_data):
"""execute a single tool call, return tool response, reward and metric"""
tool, instance_id = None, None
try:
tool_name = tool_call.name
tool_args = json.loads(tool_call.arguments)
tool = self.tools[tool_name]
instance_id, _ = await tool.create()
tool_execution_response, tool_reward, tool_metrics = await tool.execute(
instance_id, tool_args, agent_data=agent_data
)
except Exception as e:
print(f"Error when executing tool: {e}")
return (
f"Error when executing tool: {e}",
0.0,
{},
)
# Truncate tool response if needed
tool_response_text = tool_execution_response.text
if tool_response_text and len(tool_response_text) > self.max_tool_response_length:
if self.tool_response_truncate_side == "left":
tool_response_text = tool_response_text[:self.max_tool_response_length] + "...(truncated)"
elif self.tool_response_truncate_side == "right":
tool_response_text = "(truncated)..." + tool_response_text[-self.max_tool_response_length:]
else:
length = self.max_tool_response_length // 2
tool_response_text = tool_response_text[:length] + "...(truncated)..." + tool_response_text[-length:]
return tool_response_text, tool_reward, tool_metrics
@dataclass
class Config:
apply_chat_template_kwargs: dict
response_length: int = 256
# multi turn config
max_user_turns: int = 3
max_assistant_turns: int = 5
max_parallel_calls: int = 3
max_tool_response_length: int = 1000
tool_response_truncate_side: str = "middle"
tool_config_path: str = "tools_config.yaml"
config = Config(
apply_chat_template_kwargs={"enable_thinking": False}
)
agent_loop = ToolAgentLoop(
config=config,
server_manager=server_manager,
tokenizer=tokenizer,
)
messages = [
{"role": "user", "content": "What's the weather in Seattle?"}
]
sampling_params = {
"temperature": 0.7,
"top_p": 0.9,
"max_tokens": 512
}
tools_kwargs = {}
# Run the agent loop
output = await agent_loop.run(
sampling_params=sampling_params,
messages=messages,
)
print("prompt ids: \n", tokenizer.decode(output.prompt_ids))
print("response ids: \n", tokenizer.decode(output.response_ids))
print("\nresponse mask: \n", output.response_mask)