from agents.mcp import MCPServer, MCPServerStdio
from agents import Agent, Runner, trace
import json
def _extract_text(output):
    """Return the text of a ``{'type': 'text', 'text': ...}`` payload, else None.

    ``output`` may be such a dict, or a non-empty list whose first item is
    such a dict (only the first item is inspected). Any other shape, or a
    ``text`` value that is not a string, yields ``None``.
    """
    if isinstance(output, list) and output:
        output = output[0]
    if isinstance(output, dict) and output.get('type') == 'text':
        text = output.get('text')
        if isinstance(text, str):
            return text
    return None


def _render_tool_output(raw_output):
    """Print a one-line summary of a tool's output.

    A string is first tried as JSON; if it does not parse it is shown as-is.
    Text payloads containing ``'Error'`` are printed in full with an error
    marker; every other result is cut to its first 100 characters.
    """
    try:
        parsed = json.loads(raw_output) if isinstance(raw_output, str) else raw_output
    except json.JSONDecodeError:
        # Not JSON: fall back to showing the raw string.
        parsed = raw_output

    text = _extract_text(parsed)
    if text is None:
        # Unrecognised shape (scalar, empty list, non-text item, ...):
        # show the value itself rather than silently printing nothing.
        print(f"✅ Result: {str(parsed)[:100]}...")
    elif 'Error' in text:
        print(f"❌ Error: {text}")
    else:
        print(f"✅ Result: {text[:100]}...")


def simple_render_chunk(chunk):
    """Print a compact, human-readable line for the stream events of interest.

    Handled events:

    * ``run_item_stream_event`` named ``tool_called``: tool name and arguments.
    * ``run_item_stream_event`` named ``tool_output``: summarised tool result.
    * ``run_item_stream_event`` named ``message_output_created``: text of the
      first content part of the message.
    * ``raw_response_event`` whose data type is ``response.output_text.delta``:
      the delta, written without a trailing newline.

    Any other chunk is ignored. Nothing is returned; output goes to stdout.
    """
    event_type = getattr(chunk, 'type', None)

    if event_type == 'run_item_stream_event':
        if chunk.name == 'tool_called':
            call = chunk.item.raw_item
            print(f"🔧 Tool: {call.name}({call.arguments})")
        elif chunk.name == 'tool_output':
            _render_tool_output(chunk.item.output)
        elif chunk.name == 'message_output_created':
            try:
                content = chunk.item.raw_item.content
                if content:
                    print(f"💬 Response: {content[0].text}")
            except (AttributeError, IndexError):
                # Item does not have the expected message layout; show a
                # truncated repr of the item instead.
                print(f"💬 Response: {str(chunk.item)[:100]}...")
    elif event_type == 'raw_response_event':
        # Text deltas for streaming
        data = getattr(chunk, 'data', None)
        if getattr(data, 'type', None) == 'response.output_text.delta':
            print(data.delta, end='', flush=True)
async with MCPServerStdio(
name="ClickHouse SQL Playground",
params={
"command": "uv",
"args": [
'run',
'--with', 'mcp-clickhouse',
'--python', '3.13',
'mcp-clickhouse'
],
"env": env
}, client_session_timeout_seconds = 60
) as server:
agent = Agent(
name="Assistant",
instructions="Use the tools to query ClickHouse and answer questions based on those files.",
mcp_servers=[server],
)
message = "What's the biggest GitHub project so far in 2025?"
print(f"\n\nRunning: {message}")
with trace("Biggest project workflow"):
result = Runner.run_streamed(starting_agent=agent, input=message, max_turns=20)
async for chunk in result.stream_events():
simple_render_chunk(chunk)