> ## Documentation Index
> Fetch the complete documentation index at: https://docs.agentops.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# AutoGen

> Integrate AgentOps with Microsoft AutoGen for multi-agent workflow tracking

[AutoGen](https://microsoft.github.io/autogen/stable/) is Microsoft's framework for building multi-agent conversational AI systems. AgentOps provides seamless integration with AutoGen to track and monitor your multi-agent workflows.

## Installation

<CodeGroup>
  ```bash pip theme={null}
  pip install agentops autogen-core python-dotenv
  ```

  ```bash poetry theme={null}
  poetry add agentops autogen-core python-dotenv
  ```

  ```bash uv theme={null}
  uv pip install agentops autogen-core python-dotenv
  ```
</CodeGroup>

## Setting Up API Keys

Before using AutoGen with AgentOps, you need to set up your API keys. You can obtain:

* **OPENAI\_API\_KEY**: From the [OpenAI Platform](https://platform.openai.com/api-keys)
* **AGENTOPS\_API\_KEY**: From your [AgentOps Dashboard](https://app.agentops.ai/)

Then to set them up, you can either export them as environment variables or set them in a `.env` file.

<CodeGroup>
  ```bash Export to CLI theme={null}
  export OPENAI_API_KEY="your_openai_api_key_here"
  export AGENTOPS_API_KEY="your_agentops_api_key_here"
  ```

  ```txt Set in .env file theme={null}
  OPENAI_API_KEY="your_openai_api_key_here"
  AGENTOPS_API_KEY="your_agentops_api_key_here"
  ```
</CodeGroup>

Then load the environment variables in your Python code:

```python theme={null}
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Set up environment variables with fallback values
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY")
```

## Usage

AgentOps automatically instruments AutoGen agents and tracks their interactions. Simply initialize AgentOps before creating your AutoGen agents!

<CodeGroup>
  ```python Countdown theme={null}
  import asyncio
  from dataclasses import dataclass
  from typing import Callable
  import agentops

  from autogen_core import (
      DefaultTopicId, 
      MessageContext, 
      RoutedAgent, 
      default_subscription, 
      message_handler,
      AgentId,
      SingleThreadedAgentRuntime
  )

  # Initialize AgentOps
  agentops.init()

  @dataclass
  class CountdownMessage:
      """Message containing a number for countdown operations"""
      content: int

  @default_subscription
  class ModifierAgent(RoutedAgent):
      """Agent that modifies numbers by applying a transformation function"""
      
      def __init__(self, modify_val: Callable[[int], int]) -> None:
          super().__init__("A modifier agent that transforms numbers.")
          self._modify_val = modify_val

      @message_handler
      async def handle_message(self, message: CountdownMessage, ctx: MessageContext) -> None:
          """Handle incoming messages and apply modification"""
          original_val = message.content
          modified_val = self._modify_val(original_val)
          
          print(f"🔧 ModifierAgent: Transformed {original_val} → {modified_val}")
          
          # Publish the modified value to continue the workflow
          await self.publish_message(
              CountdownMessage(content=modified_val), 
              DefaultTopicId()
          )

  @default_subscription  
  class CheckerAgent(RoutedAgent):
      """Agent that checks if a condition is met and decides whether to continue"""
      
      def __init__(self, stop_condition: Callable[[int], bool]) -> None:
          super().__init__("A checker agent that validates conditions.")
          self._stop_condition = stop_condition

      @message_handler
      async def handle_message(self, message: CountdownMessage, ctx: MessageContext) -> None:
          """Handle incoming messages and check stopping condition"""
          value = message.content
          
          if not self._stop_condition(value):
              print(f"✅ CheckerAgent: {value} passed validation, continuing workflow")
              # Continue the workflow by publishing the message
              await self.publish_message(
                  CountdownMessage(content=value), 
                  DefaultTopicId()
              )
          else:
              print(f"🛑 CheckerAgent: {value} failed validation, stopping workflow")
              print("🎉 Countdown completed successfully!")

  async def run_countdown_workflow():
      """Run a countdown workflow from 10 to 1 using AutoGen agents"""
      
      print("🚀 Starting AutoGen Countdown Workflow")
      print("=" * 50)
      
      # Create the AutoGen runtime
      runtime = SingleThreadedAgentRuntime()
      
      # Register the modifier agent (subtracts 1 from each number)
      await ModifierAgent.register(
          runtime,
          "modifier",
          lambda: ModifierAgent(modify_val=lambda x: x - 1),
      )
      
      # Register the checker agent (stops when value <= 1)
      await CheckerAgent.register(
          runtime,
          "checker", 
          lambda: CheckerAgent(stop_condition=lambda x: x <= 1),
      )
      
      # Start the runtime
      runtime.start()
      print("🤖 AutoGen runtime started")
      print("📨 Sending initial message with value: 10")
      
      # Send initial message to start the countdown
      await runtime.send_message(
          CountdownMessage(10), 
          AgentId("checker", "default")
      )
      
      # Wait for the workflow to complete
      await runtime.stop_when_idle()
      
      print("=" * 50)
      print("✨ Workflow completed! Check your AgentOps dashboard for detailed traces.")

  # Run the workflow
  if __name__ == "__main__":
      asyncio.run(run_countdown_workflow())
  ```

  ```python Multi-Agent theme={null}
  import asyncio
  from dataclasses import dataclass
  from typing import List, Dict, Any
  import agentops

  from autogen_core import (
      DefaultTopicId,
      MessageContext, 
      RoutedAgent,
      default_subscription,
      message_handler,
      AgentId,
      SingleThreadedAgentRuntime
  )

  # Initialize AgentOps
  agentops.init()

  @dataclass
  class DataMessage:
      """Message containing data to be processed"""
      data: List[Dict[str, Any]]
      stage: str
      metadata: Dict[str, Any]

  @default_subscription
  class DataCollectorAgent(RoutedAgent):
      """Agent responsible for collecting and preparing initial data"""
      
      def __init__(self) -> None:
          super().__init__("Data collector agent that gathers initial dataset.")

      @message_handler
      async def handle_message(self, message: DataMessage, ctx: MessageContext) -> None:
          print(f"📊 DataCollector: Collecting data for {message.metadata.get('source', 'unknown')}")
          
          # Simulate data collection
          collected_data = [
              {"id": 1, "value": 100, "category": "A"},
              {"id": 2, "value": 200, "category": "B"}, 
              {"id": 3, "value": 150, "category": "A"},
              {"id": 4, "value": 300, "category": "C"},
          ]
          
          print(f"✅ DataCollector: Collected {len(collected_data)} records")
          
          # Send to processor
          await self.publish_message(
              DataMessage(
                  data=collected_data,
                  stage="processing",
                  metadata={**message.metadata, "collected_count": len(collected_data)}
              ),
              DefaultTopicId()
          )

  @default_subscription
  class DataProcessorAgent(RoutedAgent):
      """Agent that processes and transforms data"""
      
      def __init__(self) -> None:
          super().__init__("Data processor agent that transforms collected data.")

      @message_handler  
      async def handle_message(self, message: DataMessage, ctx: MessageContext) -> None:
          if message.stage != "processing":
              return
              
          print(f"⚙️ DataProcessor: Processing {len(message.data)} records")
          
          # Process data - add calculated fields
          processed_data = []
          for item in message.data:
              processed_item = {
                  **item,
                  "processed_value": item["value"] * 1.1,  # 10% increase
                  "status": "processed"
              }
              processed_data.append(processed_item)
          
          print(f"✅ DataProcessor: Processed {len(processed_data)} records")
          
          # Send to analyzer
          await self.publish_message(
              DataMessage(
                  data=processed_data,
                  stage="analysis", 
                  metadata={**message.metadata, "processed_count": len(processed_data)}
              ),
              DefaultTopicId()
          )

  @default_subscription
  class DataAnalyzerAgent(RoutedAgent):
      """Agent that analyzes processed data and generates insights"""
      
      def __init__(self) -> None:
          super().__init__("Data analyzer agent that generates insights.")

      @message_handler
      async def handle_message(self, message: DataMessage, ctx: MessageContext) -> None:
          if message.stage != "analysis":
              return
              
          print(f"🧠 DataAnalyzer: Analyzing {len(message.data)} records")
          
          # Perform analysis
          total_value = sum(item["processed_value"] for item in message.data)
          avg_value = total_value / len(message.data)
          categories = set(item["category"] for item in message.data)
          
          analysis_results = {
              "total_records": len(message.data),
              "total_value": total_value,
              "average_value": avg_value,
              "unique_categories": len(categories),
              "categories": list(categories)
          }
          
          print(f"📈 DataAnalyzer: Analysis complete")
          print(f"   • Total records: {analysis_results['total_records']}")
          print(f"   • Average value: {analysis_results['average_value']:.2f}")
          print(f"   • Categories: {', '.join(analysis_results['categories'])}")
          
          # Send to reporter
          await self.publish_message(
              DataMessage(
                  data=message.data,
                  stage="reporting",
                  metadata={
                      **message.metadata, 
                      "analysis": analysis_results
                  }
              ),
              DefaultTopicId()
          )

  @default_subscription
  class ReportGeneratorAgent(RoutedAgent):
      """Agent that generates final reports"""
      
      def __init__(self) -> None:
          super().__init__("Report generator agent that creates final output.")

      @message_handler
      async def handle_message(self, message: DataMessage, ctx: MessageContext) -> None:
          if message.stage != "reporting":
              return
              
          print(f"📝 ReportGenerator: Generating final report")
          
          analysis = message.metadata.get("analysis", {})
          
          report = f"""
  🎯 DATA PROCESSING REPORT
  ========================
  Source: {message.metadata.get('source', 'Unknown')}
  Processing Date: {message.metadata.get('timestamp', 'Unknown')}

  📊 SUMMARY STATISTICS:
  • Total Records Processed: {analysis.get('total_records', 0)}
  • Total Value: ${analysis.get('total_value', 0):,.2f}
  • Average Value: ${analysis.get('average_value', 0):,.2f}
  • Unique Categories: {analysis.get('unique_categories', 0)}
  • Categories Found: {', '.join(analysis.get('categories', []))}

  ✅ Processing pipeline completed successfully!
          """
          
          print(report)
          print("🎉 Multi-agent data processing workflow completed!")

  async def run_data_processing_pipeline():
      """Run a complete data processing pipeline using multiple AutoGen agents"""
      
      print("🚀 Starting AutoGen Data Processing Pipeline")
      print("=" * 60)
      
      # Create runtime
      runtime = SingleThreadedAgentRuntime()
      
      # Register all agents
      await DataCollectorAgent.register(
          runtime,
          "collector",
          lambda: DataCollectorAgent(),
      )
      
      await DataProcessorAgent.register(
          runtime,
          "processor", 
          lambda: DataProcessorAgent(),
      )
      
      await DataAnalyzerAgent.register(
          runtime,
          "analyzer",
          lambda: DataAnalyzerAgent(),
      )
      
      await ReportGeneratorAgent.register(
          runtime,
          "reporter",
          lambda: ReportGeneratorAgent(),
      )
      
      # Start runtime
      runtime.start()
      print("🤖 AutoGen runtime with 4 agents started")
      
      # Trigger the pipeline
      initial_message = DataMessage(
          data=[],
          stage="collection",
          metadata={
              "source": "customer_database",
              "timestamp": "2024-01-15T10:30:00Z",
              "pipeline_id": "data_proc_001"
          }
      )
      
      print("📨 Triggering data processing pipeline...")
      await runtime.send_message(
          initial_message,
          AgentId("collector", "default")
      )
      
      # Wait for completion
      await runtime.stop_when_idle()
      
      print("=" * 60)
      print("✨ Pipeline completed! Check AgentOps dashboard for detailed agent traces.")

  # Run the pipeline
  if __name__ == "__main__":
      asyncio.run(run_data_processing_pipeline())
  ```
</CodeGroup>

## Examples

<CardGroup cols={2}>
  <Card title="Agent Chat Example" icon="notebook" href="/v2/examples/autogen">
    Basic multi-agent chat functionality
  </Card>

  <Card title="Math Agent Example" icon="notebook" href="https://github.com/AgentOps-AI/agentops/blob/main/examples/autogen/MathAgent.ipynb" newTab={true}>
    Demonstrates an agent specialized for mathematical problem-solving.
  </Card>
</CardGroup>

Visit your [AgentOps Dashboard](https://app.agentops.ai) to see detailed traces of your AutoGen agent interactions, performance metrics, and workflow analytics.

<script type="module" src="/scripts/github_stars.js" />

<script type="module" src="/scripts/scroll-img-fadein-animation.js" />

<script type="module" src="/scripts/button_heartbeat_animation.js" />

<script type="module" src="/scripts/adjust_api_dynamically.js" />
