import agentops
from agentops.sdk.decorators import operation
agentops.init(auto_start_session=False)

@operation
def process_batch(batch_data):
    # Simulate batch processing
    return f"Processed {len(batch_data)} items"

def run_etl_pipeline(data_batches):
"""ETL pipeline with progress tracking via metadata"""
trace = agentops.start_trace("etl-pipeline", tags=["data-processing"])
total_batches = len(data_batches)
processed_records = 0
# Initial metadata
agentops.update_trace_metadata({
"operation_name": "ETL Pipeline Execution",
"pipeline_stage": "starting",
"total_batches": total_batches,
"processed_batches": 0,
"processed_records": 0,
"estimated_completion": "calculating...",
"tags": ["etl", "data-processing", "async-operation"]
})
try:
for i, batch in enumerate(data_batches):
# Update progress
agentops.update_trace_metadata({
"pipeline_stage": "processing",
"current_batch": i + 1,
"processed_batches": i,
"progress_percentage": round((i / total_batches) * 100, 2)
})
# Process the batch
result = process_batch(batch)
processed_records += len(batch)
# Update running totals
agentops.update_trace_metadata({
"processed_records": processed_records,
"last_batch_result": result
})
# Final metadata update
agentops.update_trace_metadata({
"operation_name": "ETL Pipeline Completed",
"pipeline_stage": "completed",
"processed_batches": total_batches,
"progress_percentage": 100.0,
"completion_status": "success",
"total_execution_time": "calculated_automatically",
"tags": ["etl", "completed", "success"]
})
agentops.end_trace(trace, "Success")
except Exception as e:
# Error metadata
agentops.update_trace_metadata({
"operation_name": "ETL Pipeline Failed",
"pipeline_stage": "failed",
"error_message": str(e),
"completion_status": "error",
"failed_at_batch": i + 1 if 'i' in locals() else 0,
"tags": ["etl", "failed", "error"]
})
agentops.end_trace(trace, "Error")
raise
# Example usage
data_batches = [
["record1", "record2", "record3"],
["record4", "record5"],
["record6", "record7", "record8", "record9"]
]
run_etl_pipeline(data_batches)
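
# Optional: an illustrative sketch of the failure path. The pipeline re-raises
# after recording error metadata, so wrap the call if the script should keep
# running. Here `None` is just a value that makes len() raise a TypeError inside
# process_batch (assuming the @operation decorator propagates exceptions, as
# instrumented spans normally do); the except branch above then records the
# failure metadata and ends the trace with state "Error".
try:
    run_etl_pipeline([["record1"], None])
except TypeError:
    pass  # failure metadata was recorded and the trace ended in the error state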