# Context Managers

> Use AgentOps traces as Python context managers for automatic lifecycle management

AgentOps provides native context manager support for traces, allowing you to use Python's `with` statement for automatic trace lifecycle management. This approach ensures traces are properly started and ended, even when exceptions occur.

## Basic Usage

The simplest way to use context managers is with the `start_trace()` function:

```python theme={null}
import agentops

# Initialize AgentOps
agentops.init(api_key="your-api-key")

# Use context manager for automatic trace management
with agentops.start_trace("my_workflow") as trace:
    # Your code here
    print("Processing data...")
    # Trace automatically ends when exiting the with block
```

The trace will automatically:

* Start when entering the `with` block
* End with "Success" status when exiting normally
* End with "Error" status if an exception propagates out of the `with` block
* Clean up resources properly in all cases
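
The lifecycle above maps onto Python's standard context manager protocol (`__enter__` and `__exit__`). The following is a minimal sketch of that protocol in plain Python, not the AgentOps implementation: `SketchTrace` and its `status` attribute are hypothetical names used only for illustration.

```python
class SketchTrace:
    """Hypothetical stand-in for a trace; not part of the AgentOps API."""

    def __init__(self, name):
        self.name = name
        self.status = None  # Filled in by __enter__ and __exit__

    def __enter__(self):
        # Runs when the with block is entered
        self.status = "Running"
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # exc_type is None on a normal exit and an exception class otherwise
        self.status = "Success" if exc_type is None else "Error"
        return False  # Do not suppress the exception


with SketchTrace("ok") as ok_trace:
    pass

try:
    with SketchTrace("failing") as failed_trace:
        raise ValueError("Something went wrong")
except ValueError:
    pass

print(ok_trace.status)      # Success
print(failed_trace.status)  # Error
```

Because `__exit__` returns `False`, the exception still reaches the caller after the status is recorded, which is why the failing case is wrapped in `try`/`except`.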

## Advanced Usage

### Traces with Tags

You can add tags to traces for better organization and filtering:

```python theme={null}
import agentops

agentops.init(api_key="your-api-key")

# Using list tags
with agentops.start_trace("data_processing", tags=["batch", "production"]):
    process_batch_data()

# Using dictionary tags for more structured metadata
with agentops.start_trace("user_request", tags={
    "user_id": "12345",
    "request_type": "query",
    "priority": "high"
}):
    handle_user_request()
```

### Parallel Traces

Each `start_trace()` context manager creates an independent trace. Nesting one `with` block inside another does not create a parent-child relationship; the two traces run in parallel:

```python theme={null}
import agentops

agentops.init(api_key="your-api-key")

# Sequential traces: task_1 ends before task_2 starts
with agentops.start_trace("task_1"):
    print("Task 1 executing")

with agentops.start_trace("task_2"):
    print("Task 2 executing")

# Nested context managers create parallel traces
with agentops.start_trace("outer_workflow"):
    print("Outer workflow started")
    
    with agentops.start_trace("inner_task"):
        print("Inner task executing (parallel to outer)")
    
    print("Outer workflow continuing")
```

### Exception Handling

Context managers automatically handle exceptions and set appropriate trace states:

```python theme={null}
import agentops

agentops.init(api_key="your-api-key")

# Automatic error handling
try:
    with agentops.start_trace("risky_operation"):
        # This will automatically set trace status to "Error"
        raise ValueError("Something went wrong")
except ValueError as e:
    print(f"Caught error: {e}")
    # Trace has already been ended with Error status

# Graceful degradation pattern
try:
    with agentops.start_trace("primary_service"):
        result = call_primary_service()
except ServiceUnavailableError:
    with agentops.start_trace("fallback_service"):
        result = call_fallback_service()
```
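
A context manager can only react to exceptions that leave its `with` block. If the exception is caught inside the block, the block exits normally. The sketch below shows this in plain Python with a hypothetical `record_status` helper (not an AgentOps API) that records how each block exited.

```python
from contextlib import contextmanager


@contextmanager
def record_status(statuses, name):
    """Hypothetical helper: store "Success" or "Error" for a with block."""
    try:
        yield
    except BaseException:
        statuses[name] = "Error"
        raise  # Let the exception continue to the caller
    else:
        statuses[name] = "Success"


statuses = {}

# Exception handled inside the block: the context manager sees a normal exit
with record_status(statuses, "handled_inside"):
    try:
        raise ValueError("handled locally")
    except ValueError:
        pass

# Exception escapes the block: the context manager sees the error
try:
    with record_status(statuses, "escaped"):
        raise ValueError("propagates")
except ValueError:
    pass

print(statuses)  # {'handled_inside': 'Success', 'escaped': 'Error'}
```

For this reason, place `try`/`except` outside the `with` block when a failure should be reflected in the recorded status.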

### Concurrent Execution

Context managers work with both threading and asyncio. Each thread or task opens its own independent trace:

<CodeGroup>
  ```python Threading theme={null}
  import agentops
  import threading

  agentops.init(api_key="your-api-key")

  # With threading
  def worker_function(worker_id):
      with agentops.start_trace(f"worker_{worker_id}"):
          # Each thread gets its own independent trace
          process_work(worker_id)

  threads = []
  for i in range(3):
      thread = threading.Thread(target=worker_function, args=(i,))
      threads.append(thread)
      thread.start()

  for thread in threads:
      thread.join()
  ```

  ```python Asyncio theme={null}
  import agentops
  import asyncio

  agentops.init(api_key="your-api-key")

  # With asyncio
  async def async_task(task_id):
      with agentops.start_trace(f"async_task_{task_id}"):
          await asyncio.sleep(0.1)  # Simulate async work
          return f"result_{task_id}"

  async def main():
      tasks = [async_task(i) for i in range(3)]
      results = await asyncio.gather(*tasks)
      return results

  # Run async tasks
  results = asyncio.run(main())
  ```
</CodeGroup>

## Production Patterns

### API Endpoint Monitoring

```python theme={null}
import agentops
from flask import Flask, request

app = Flask(__name__)
agentops.init(api_key="your-api-key")

@app.route('/api/process', methods=['POST'])
def process_request():
    try:
        # Create a trace for each API request
        with agentops.start_trace("api_request", tags={
            "endpoint": "/api/process",
            "method": "POST",
            "user_id": request.headers.get("user-id")
        }):
            data = request.get_json()
            result = process_data(data)
            return {"status": "success", "result": result}
    except Exception as e:
        # The exception propagated out of the with block, so the trace
        # has already ended with Error status
        return {"status": "error", "message": str(e)}, 500
```

### Batch Processing

```python theme={null}
import agentops

agentops.init(api_key="your-api-key")

def process_batch(items):
    with agentops.start_trace("batch_processing", tags={
        "batch_size": len(items),
        "batch_type": "data_processing"
    }):
        successful = 0
        failed = 0
        
        for item in items:
            try:
                with agentops.start_trace("item_processing", tags={
                    "item_id": item.get("id"),
                    "item_type": item.get("type")
                }):
                    process_item(item)
                    successful += 1
            except Exception as e:
                failed += 1
                print(f"Failed to process item {item.get('id')}: {e}")
        
        print(f"Batch completed: {successful} successful, {failed} failed")
```

### Retry Logic

```python theme={null}
import agentops
import time

agentops.init(api_key="your-api-key")

def retry_operation(operation_name, max_retries=3):
    for attempt in range(max_retries):
        try:
            with agentops.start_trace(f"{operation_name}_attempt_{attempt + 1}", tags={
                "operation": operation_name,
                "attempt": attempt + 1,
                "max_retries": max_retries
            }):
                # Your operation here
                result = perform_operation()
                return result  # Success - exit retry loop
                
        except Exception as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"All {max_retries} attempts failed")
                raise
```
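
The retry loop above sleeps for `2 ** attempt` seconds after every failed attempt except the last, which re-raises instead. A small sketch of the resulting schedule, using a hypothetical `backoff_delays` helper that is not part of AgentOps:

```python
def backoff_delays(max_retries):
    """Waits in seconds between attempts; nothing follows the final attempt."""
    return [2 ** attempt for attempt in range(max_retries - 1)]


print(backoff_delays(3))  # [1, 2]
print(backoff_delays(5))  # [1, 2, 4, 8]
```

With the default `max_retries=3`, a call that fails every time spends about 3 seconds sleeping before the final exception is raised.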

## Backward Compatibility

Context managers are fully backward compatible with existing AgentOps code patterns:

<CodeGroup>
  ```python Manual Management theme={null}
  import agentops

  agentops.init(api_key="your-api-key")

  # Manual trace management (legacy)
  trace = agentops.start_trace("manual_trace")
  # ... your code ...
  agentops.end_trace(trace, "Success")
  ```

  ```python Context Manager theme={null}
  import agentops

  agentops.init(api_key="your-api-key")

  # Context manager (new, recommended)
  with agentops.start_trace("context_managed_trace") as trace:
      # ... your code ...
      pass  # Automatically ended
  ```

  ```python Property Access theme={null}
  import agentops

  agentops.init(api_key="your-api-key")

  # Accessing trace properties
  with agentops.start_trace("property_access") as trace:
      span = trace.span  # Access underlying span
      trace_id = trace.span.get_span_context().trace_id
  ```

  ```python Mixed Usage theme={null}
  import agentops

  agentops.init(api_key="your-api-key")

  # Mixed usage
  trace = agentops.start_trace("mixed_usage")
  try:
      with trace:  # Use existing trace as context manager
          # ... your code ...
          pass
  except Exception:
      agentops.end_trace(trace, "Error")
  ```
</CodeGroup>

## Examples

For complete working examples, see the following files in the AgentOps repository:

<CardGroup cols={2}>
  <Card title="Basic Usage" icon="play" href="https://github.com/AgentOps-AI/agentops/blob/main/examples/context_manager/basic_usage.py">
    Simple context manager patterns and error handling
  </Card>

  <Card title="Parallel Traces" icon="arrows-split-up-and-left" href="https://github.com/AgentOps-AI/agentops/blob/main/examples/context_manager/parallel_traces.py">
    Sequential, nested, and concurrent trace patterns
  </Card>

  <Card title="Error Handling" icon="shield-exclamation" href="https://github.com/AgentOps-AI/agentops/blob/main/examples/context_manager/error_handling.py">
    Exception handling, retry patterns, and graceful degradation
  </Card>

  <Card title="Production Patterns" icon="server" href="https://github.com/AgentOps-AI/agentops/blob/main/examples/context_manager/production_patterns.py">
    API endpoints, batch processing, microservices, and monitoring
  </Card>
</CardGroup>

These examples demonstrate real-world usage patterns and best practices for using AgentOps context managers in production applications.

## API Reference

For detailed API information, see the [SDK Reference](/v2/usage/sdk-reference#trace-management) documentation.
