AgentOps provides native context manager support for traces, allowing you to use Python’s with statement for automatic trace lifecycle management. This approach ensures traces are properly started and ended, even when exceptions occur.
The simplest way to use context managers is with the start_trace() function:
Copy
import agentops# Initialize AgentOpsagentops.init(api_key="your-api-key")# Use context manager for automatic trace managementwith agentops.start_trace("my_workflow") as trace: # Your code here print("Processing data...") # Trace automatically ends when exiting the with block
You can add tags to traces for better organization and filtering:
Copy
import agentopsagentops.init(api_key="your-api-key")# Using list tagswith agentops.start_trace("data_processing", tags=["batch", "production"]): process_batch_data()# Using dictionary tags for more structured metadatawith agentops.start_trace("user_request", tags={ "user_id": "12345", "request_type": "query", "priority": "high"}): handle_user_request()
Context managers automatically handle exceptions and set appropriate trace states:
Copy
import agentopsagentops.init(api_key="your-api-key")# Automatic error handlingtry: with agentops.start_trace("risky_operation"): # This will automatically set trace status to "Error" raise ValueError("Something went wrong")except ValueError as e: print(f"Caught error: {e}") # Trace has already been ended with Error status# Graceful degradation patterntry: with agentops.start_trace("primary_service"): result = call_primary_service()except ServiceUnavailableError: with agentops.start_trace("fallback_service"): result = call_fallback_service()
Context managers work seamlessly with threading and asyncio:
Copy
import agentopsimport threadingagentops.init(api_key="your-api-key")# With threadingdef worker_function(worker_id): with agentops.start_trace(f"worker_{worker_id}"): # Each thread gets its own independent trace process_work(worker_id)threads = []for i in range(3): thread = threading.Thread(target=worker_function, args=(i,)) threads.append(thread) thread.start()for thread in threads: thread.join()