
In my prior article on building a DAX LLM, I utilized base API calls to stream data from Anthropic. In this article, we’ll look to utilize Pydantic AI to further productionize and expand on this pattern to also incorporate tool calls.
Pydantic AI is a newer LLM agent framework. It’s built by the team behind Pydantic, which I’m a fan of through past use in FastAPI projects. It’s smaller in scope compared to something like LangChain and provides a bridge for creating LLM building blocks.
Prior Design
The most relevant example with the base API call approach is the following function:
async def anthropic_stream_api_call(chat_input_list: list) -> AsyncGenerator[str, None]:
"""Streams anthropic response.
Args:
chat_input_list (list): List of chat inputs to send to the API.
Yields:
AsyncGenerator[str, None]: Stream of anthropic response.
"""
# Build message list
message_input = build_anthropic_message_input(chat_input_list=chat_input_list)
# Setup and make api call.
client = AsyncAnthropic(api_key=ANTHROPIC_API_KEY)
stream = await client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=4096,
temperature=0.2,
system=get_system_prompt(chat_input_list),
messages=message_input,
stream=True
)
async for event in stream:
if event.type in ['message_start', 'message_delta', 'message_stop', 'content_block_start', 'content_block_stop']:
pass
elif event.type == 'content_block_delta':
yield event.delta.text
else:
yield event.type
The API call is made in an async function and then events handled for streaming and displaying where needed via a generator pattern.
Pydantic AI Approach/Design
In the pydantic docs, a pattern is identified for streaming. We’ll break this apart a bit and refactor to plug into something like a FastAPI streaming endpoint.
Messages and Events Overview
Pydantic AI exposes various messages, one good relevant example to our build is a PartDeltaEvent. This is similar to our ‘contentblockdelta’ test above, where we use event types to determine what to display back to the end user via our stream or ignore/log only. We’re also incorporating tool calls into our example, so the list of possible event types expands. The full list of messages we’ll utilize are:
from pydantic_ai.messages import (
AgentStreamEvent,
FinalResultEvent,
FunctionToolCallEvent,
FunctionToolResultEvent,
PartDeltaEvent,
PartStartEvent,
TextPartDelta,
ToolCallPartDelta,
ToolCallPart
)
From the above, we need to delineate which pieces to yield back to caller or not. Using an enum approach and pydantic basemodels, we’ll create a delineation of a ‘display’ (a user should see these) vs. a ‘debug’ event (events such as underlying tool call operations).
from pydantic import BaseModel
from enum import Enum
class EventType(str, Enum):
DISPLAY = "display"
DEBUG = "debug"
class StreamEvent(BaseModel):
event_type: EventType
content: str
def is_display(self) -> bool:
return self.event_type == EventType.DISPLAY
Adding simple wrappers to translate base events into these events:
def build_display_event(content: str) -> StreamEvent:
"""
Build a display event from the provided content.
Args:
content (str): The content to include in the display event.
Returns:
StreamEvent: A validated event with display type and the provided content.
"""
return StreamEvent(
event_type=EventType.DISPLAY,
content=content
)
def build_debug_event(event: AgentStreamEvent) -> StreamEvent:
"""
Build a debug event from an AgentStreamEvent.
Args:
event (AgentStreamEvent): The event to build the debug event from.
Returns:
StreamEvent: A validated event with debug type and the event details.
"""
return StreamEvent(
event_type=EventType.DEBUG,
content=f'{type(event)}: {event}'
)
Message Parsing
Now that we have basics of how messages work in Pydantic AI and how we’ll represent these in two larger buckets (display vs. debug), we’ll need a function to parse a given event and determine which bucket to place it in. For a given stream event (revealed later, but similar to our prior base API example), we need to parse out which type of event - display or debug - as well as the relevant content that should be extracted for the event. We care most about displaying the start and delta events that contain text and logging underlying tool calls to a debugging list of strings.
We utilize testing against message classes using the isinstance function against the prior imports from the messages overview section above.
def parse_event(event: AgentStreamEvent) -> StreamEvent:
"""
Parse an AgentStreamEvent and return a StreamEvent.
Args:
event (AgentStreamEvent): The event to parse.
Returns:
StreamEvent: A validated event with display type and the provided content.
"""
# Content we care about printing
if isinstance(event, PartStartEvent) and hasattr(event.part, 'content'):
return build_display_event(event.part.content)
elif isinstance(event, PartDeltaEvent) and hasattr(event, 'delta') and isinstance(event.delta, TextPartDelta):
return build_display_event(event.delta.content_delta)
# Debug only (tool calls/results)
elif isinstance(event, PartStartEvent) and isinstance(event.part, ToolCallPart):
return build_debug_event(event)
elif isinstance(event, PartDeltaEvent) and isinstance(event.delta, ToolCallPartDelta):
return build_debug_event(event)
elif isinstance(event, FinalResultEvent):
return build_debug_event(event)
elif isinstance(event, FunctionToolCallEvent):
return build_debug_event(event)
elif isinstance(event, FunctionToolResultEvent):
return build_debug_event(event)
else:
return StreamEvent(
event_type=EventType.DEBUG,
content=f'UNKNOWN {type(event)}: {event}'
)
Looping through a stream of events, we can then parse and classify each with the following handler:
async def handle_stream_events(stream: AsyncIterator[AgentStreamEvent], debug_messages: list[str]) -> AsyncIterator[str]:
"""
Process stream events and handle display/debug routing.
Args:
stream: The event stream to process, yields AgentStreamEvent objects.
debug_messages: List to collect debug messages. Modified in place.
Yields:
str: Content from display events
"""
async for event in stream:
parsed_event = parse_event(event)
if parsed_event.is_display():
yield parsed_event.content
else:
debug_messages.append(parsed_event.content)
For each event in a stream (iterator with various AgentEventStream objects), we parse the event and if it’s a display event we yield the content - otherwise just append to a debugging list.
Main Stream Iterator
Utilizing the above functions we can iterate over an agent graph and handle event streams from each node. The agent class from pydantic exposes various helper functions such as isuserpromptnode, ismodelrequestnode, and iscalltools_node most importantly. These will be used to determine which to display vs. log for debugging.
The below function takes an agent and prompt to iterate over nodes in a run. We key in on model requests nodes as containing relevant potentially displayable events in the stream. Other nodes are either directly added to a debug list or lower level parsed events added for more interesting operations such as tool calls.
async def main_stream(agent: Agent, user_prompt: str) -> AsyncIterator[str]:
"""
Main function to handle the streaming of events from the agent based on the user prompt.
This function manages the streaming of different types of nodes (user prompt, model request, tool calls, etc.)
and processes the events generated during the stream. It yields the content of display events and collects
debug messages for other types of events.
Args:
agent (Agent): The agent responsible for handling the user prompt and generating events.
user_prompt (str): The user prompt to be processed by the agent.
Yields:
str: The content of display events generated during the stream.
Collects:
list: A list of debug messages generated during the stream and prints at the end.
"""
debug_messages = []
content_streamed = False
async with agent.iter(user_prompt) as run:
async for node in run:
if Agent.is_user_prompt_node(node):
debug_messages.append(f'UserPromptNode: {node.user_prompt}')
elif Agent.is_model_request_node(node):
debug_messages.append(f'ModelRequestNode: streaming partial request tokens')
async with node.stream(run.ctx) as request_stream:
async for content in handle_stream_events(request_stream, debug_messages):
if content:
content_streamed = True
yield content
elif Agent.is_call_tools_node(node):
debug_messages.append(f'ToolCallNode: streaming tool calls and results (debug only)')
async with node.stream(run.ctx) as tool_request_stream:
async for _ in handle_stream_events(tool_request_stream, debug_messages):
pass
elif Agent.is_end_node(node):
if content_streamed:
debug_messages.append(f'end_node: {run.result.data}')
else:
yield run.result.data
else:
debug_messages.append(f'Unknown Node: {type(node)}: {node}')
print('\n\n\nDebug:')
print('\n'.join(debug_messages))
Running the Streaming Generator
With the above pieces set up, running is as simple as setting up an agent with any tools and calling our main_stream function.
Setting up an example agent with a tool:
from pydantic_ai import Agent, Tool
async def get_user_name() -> str:
"""
This function is used to retrieve the user name.
"""
return "Joe" # Note: this would be dynamically retrieved in an actual app
agent = Agent(
'anthropic:claude-3-5-sonnet-20241022',
system_prompt=(
"You are a helpful assistant that can answer questions and help with tasks. Greet the user by name first before answering any questions."
),
tools=[
Tool(get_user_name, takes_ctx=False)
]
)
async for text_response in main_stream(agent, 'What is 2+2?'):
print(text_response, end='', flush=True)
Output (streamed):
Let me first get your name so I can greet you properly. Hi Joe! The answer to 2+2 is 4. This is a simple arithmetic calculation that I can help you with directly. Is there anything else you'd like to know?
Summary
This example provides a basic approach for streaming using Pydantic AI in a way that is extensible and can be plugged into various backend/frontend workflows. It’s certainly not feature complete to everything one would want to do (chat history, multiple agents, better error handling, etc), but provides an example of how to create building blocks utilizing Pydantic AI.
All examples and files available on Github.