import agentops
from agentops.sdk.decorators import agent, operation, tool, trace
# Initialize AgentOps without auto-starting session since we use @trace
agentops.init("your-api-key", auto_start_session=False)
@agent
class CoordinatorAgent:
def __init__(self):
self.task_queue = []
@operation
def assign_task(self, task, agent_type):
self.task_queue.append({"task": task, "agent": agent_type})
return f"Task assigned to {agent_type}: {task}"
@operation
def collect_results(self, results):
return f"Collected {len(results)} results"
@agent
class WorkerAgent:
def __init__(self, agent_id):
self.agent_id = agent_id
@tool(cost=0.05)
def process_task(self, task):
return f"Agent {self.agent_id} processed: {task}"
@trace(name="coordinated-processing")
def coordinated_processing_workflow(tasks):
"""Workflow with agent coordination"""
coordinator = CoordinatorAgent()
workers = [WorkerAgent(f"worker_{i}") for i in range(3)]
# Assign tasks
assignments = []
for i, task in enumerate(tasks):
worker_type = f"worker_{i % len(workers)}"
assignment = coordinator.assign_task(task, worker_type)
assignments.append(assignment)
# Process tasks
results = []
for i, task in enumerate(tasks):
worker = workers[i % len(workers)]
result = worker.process_task(task)
results.append(result)
# Collect results
summary = coordinator.collect_results(results)
return {
"assignments": assignments,
"results": results,
"summary": summary
}
# Run coordinated workflow
tasks = ["analyze_data", "generate_report", "send_notification"]
result = coordinated_processing_workflow(tasks)