import asyncio
from dataclasses import dataclass
from typing import Callable
import agentops
from autogen_core import (
DefaultTopicId,
MessageContext,
RoutedAgent,
default_subscription,
message_handler,
AgentId,
SingleThreadedAgentRuntime
)
# Initialize AgentOps
agentops.init()
@dataclass
class CountdownMessage:
"""Message containing a number for countdown operations"""
content: int
@default_subscription
class ModifierAgent(RoutedAgent):
"""Agent that modifies numbers by applying a transformation function"""
def __init__(self, modify_val: Callable[[int], int]) -> None:
super().__init__("A modifier agent that transforms numbers.")
self._modify_val = modify_val
@message_handler
async def handle_message(self, message: CountdownMessage, ctx: MessageContext) -> None:
"""Handle incoming messages and apply modification"""
original_val = message.content
modified_val = self._modify_val(original_val)
print(f"🔧 ModifierAgent: Transformed {original_val} → {modified_val}")
# Publish the modified value to continue the workflow
await self.publish_message(
CountdownMessage(content=modified_val),
DefaultTopicId()
)
@default_subscription
class CheckerAgent(RoutedAgent):
"""Agent that checks if a condition is met and decides whether to continue"""
def __init__(self, stop_condition: Callable[[int], bool]) -> None:
super().__init__("A checker agent that validates conditions.")
self._stop_condition = stop_condition
@message_handler
async def handle_message(self, message: CountdownMessage, ctx: MessageContext) -> None:
"""Handle incoming messages and check stopping condition"""
value = message.content
if not self._stop_condition(value):
print(f"✅ CheckerAgent: {value} passed validation, continuing workflow")
# Continue the workflow by publishing the message
await self.publish_message(
CountdownMessage(content=value),
DefaultTopicId()
)
else:
print(f"🛑 CheckerAgent: {value} failed validation, stopping workflow")
print("🎉 Countdown completed successfully!")
async def run_countdown_workflow():
"""Run a countdown workflow from 10 to 1 using AutoGen agents"""
print("🚀 Starting AutoGen Countdown Workflow")
print("=" * 50)
# Create the AutoGen runtime
runtime = SingleThreadedAgentRuntime()
# Register the modifier agent (subtracts 1 from each number)
await ModifierAgent.register(
runtime,
"modifier",
lambda: ModifierAgent(modify_val=lambda x: x - 1),
)
# Register the checker agent (stops when value <= 1)
await CheckerAgent.register(
runtime,
"checker",
lambda: CheckerAgent(stop_condition=lambda x: x <= 1),
)
# Start the runtime
runtime.start()
print("🤖 AutoGen runtime started")
print("📨 Sending initial message with value: 10")
# Send initial message to start the countdown
await runtime.send_message(
CountdownMessage(10),
AgentId("checker", "default")
)
# Wait for the workflow to complete
await runtime.stop_when_idle()
print("=" * 50)
print("✨ Workflow completed! Check your AgentOps dashboard for detailed traces.")
# Run the workflow
if __name__ == "__main__":
asyncio.run(run_countdown_workflow())