Executions
When you trigger a pipeline, an execution is created to track its progress. Executions contain the input, output, status, and timing information for each pipeline run.
Execution Lifecycle
Section titled “Execution Lifecycle”pending → running → succeeded ↘ failed ↘ cancelled| Status | Description |
|---|---|
pending | Execution queued, waiting to start |
running | Currently processing |
succeeded | Completed successfully |
failed | Completed with an error |
cancelled | Manually cancelled |
Triggering an Execution
Section titled “Triggering an Execution”Executions are created when you trigger a pipeline:
Trigger pipeline execution
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "input": { "dataInputs": { "query": "What are the key trends in Q4 sales?" } } }'const response = await fetch( "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ input: { dataInputs: { query: "What are the key trends in Q4 sales?", }, }, }), });const execution = await response.json();console.log(execution.executionId); // "GkR8I6rHBms3W4Qfa2-FN"response = requests.post( "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger", headers={"Authorization": f"Bearer {api_token}"}, json={ "input": { "dataInputs": { "query": "What are the key trends in Q4 sales?" } } })execution = response.json()print(execution["executionId"]) # "GkR8I6rHBms3W4Qfa2-FN"Getting Execution Status
Section titled “Getting Execution Status”Get execution details
curl https://api.catalyzed.ai/pipeline-executions/GkR8I6rHBms3W4Qfa2-FN \ -H "Authorization: Bearer $API_TOKEN"const response = await fetch( "https://api.catalyzed.ai/pipeline-executions/GkR8I6rHBms3W4Qfa2-FN", { headers: { Authorization: `Bearer ${apiToken}` } });const execution = await response.json();response = requests.get( "https://api.catalyzed.ai/pipeline-executions/GkR8I6rHBms3W4Qfa2-FN", headers={"Authorization": f"Bearer {api_token}"})execution = response.json()Response:
{ "executionId": "GkR8I6rHBms3W4Qfa2-FN", "pipelineId": "EMbMEFLyUWEgvnhMWXVVa", "pipelineName": "Document Summarizer", "pipelineHandlerType": "language_model", "pipelineInputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "query", "label": "Query", "schema": { "type": "string" }, "required": true } ] }, "pipelineOutputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "summary", "label": "Summary", "schema": { "type": "string" }, "required": true } ] }, "pipelineConfiguration": {}, "status": "running", "progressPct": 45, "progressMessage": "Processing document chunks...", "input": { "dataInputs": { "query": "What are the key trends in Q4 sales?" } }, "output": null, "outputCitations": [], "errorMessage": null, "createdAt": "2024-01-15T10:30:00Z", "startedAt": "2024-01-15T10:30:02Z", "completedAt": null, "createdBy": "usr_abc123"}Polling for Completion
Section titled “Polling for Completion”For long-running executions, poll until completion:
async function waitForExecution(executionId: string) { while (true) { const response = await fetch( `https://api.catalyzed.ai/pipeline-executions/${executionId}`, { headers: { Authorization: `Bearer ${apiToken}` } } ); const execution = await response.json();
if (execution.status === "succeeded") { return execution.output; } if (execution.status === "failed") { throw new Error(execution.errorMessage); } if (execution.status === "cancelled") { throw new Error("Execution was cancelled"); }
console.log(`Progress: ${execution.progressPct}% - ${execution.progressMessage}`); await new Promise(r => setTimeout(r, 2000)); // Poll every 2 seconds }}Streaming Execution Updates
Section titled “Streaming Execution Updates”For real-time updates, use Server-Sent Events (SSE) instead of polling:
Stream execution updates via SSE
curl -N https://api.catalyzed.ai/pipeline-executions/GkR8I6rHBms3W4Qfa2-FN/stream \ -H "Authorization: Bearer $API_TOKEN" \ -H "Accept: text/event-stream"const eventSource = new EventSource( `https://api.catalyzed.ai/pipeline-executions/${executionId}/stream`, { headers: { Authorization: `Bearer ${apiToken}`, }, });
eventSource.addEventListener("execution-state", (event) => { const execution = JSON.parse(event.data); console.log(`Status: ${execution.status}, Progress: ${execution.progressPct}%`);});
eventSource.addEventListener("stream-complete", (event) => { const { finalStatus } = JSON.parse(event.data); console.log(`Execution completed with status: ${finalStatus}`); eventSource.close();});
eventSource.addEventListener("ping", (event) => { // Keepalive event, no action needed});import sseclientimport requests
response = requests.get( f"https://api.catalyzed.ai/pipeline-executions/{execution_id}/stream", headers={"Authorization": f"Bearer {api_token}"}, stream=True)
client = sseclient.SSEClient(response)for event in client.events(): if event.event == "execution-state": execution = json.loads(event.data) print(f"Status: {execution['status']}, Progress: {execution['progressPct']}%") elif event.event == "stream-complete": data = json.loads(event.data) print(f"Execution completed with status: {data['finalStatus']}") breakSSE Event Types
Section titled “SSE Event Types”| Event | Description |
|---|---|
execution-state | Full execution state (same structure as GET endpoint) |
stream-complete | Sent when execution reaches terminal state (succeeded/failed/cancelled) |
ping | Keepalive event sent every 15 seconds |
Stream Behavior
Section titled “Stream Behavior”- Immediate state: Current execution state is sent immediately on connect
- Real-time updates: Updates are pushed as they occur
- Auto-close: Stream automatically closes when execution reaches terminal state
- Connection timeout: 5 minutes (reconnect if longer execution expected)
- Keepalive: Ping events sent every 15 seconds to maintain connection
Listing Executions
Section titled “Listing Executions”For a Specific Pipeline
Section titled “For a Specific Pipeline”List pipeline executions
curl "https://api.catalyzed.ai/pipeline-executions?pipelineId=EMbMEFLyUWEgvnhMWXVVa" \ -H "Authorization: Bearer $API_TOKEN"const response = await fetch( "https://api.catalyzed.ai/pipeline-executions?pipelineId=EMbMEFLyUWEgvnhMWXVVa", { headers: { Authorization: `Bearer ${apiToken}` } });const { executions } = await response.json();response = requests.get( "https://api.catalyzed.ai/pipeline-executions", params={"pipelineId": "EMbMEFLyUWEgvnhMWXVVa"}, headers={"Authorization": f"Bearer {api_token}"})executions = response.json()["executions"]Filter by Status
Section titled “Filter by Status”# Only failed executionscurl "https://api.catalyzed.ai/pipeline-executions?pipelineId=EMbMEFLyUWEgvnhMWXVVa&status=failed" \ -H "Authorization: Bearer $API_TOKEN"
# Only running executionscurl "https://api.catalyzed.ai/pipeline-executions?pipelineId=EMbMEFLyUWEgvnhMWXVVa&status=running" \ -H "Authorization: Bearer $API_TOKEN"Pagination
Section titled “Pagination”curl "https://api.catalyzed.ai/pipeline-executions?pipelineId=EMbMEFLyUWEgvnhMWXVVa&page=1&pageSize=10" \ -H "Authorization: Bearer $API_TOKEN"Cancelling an Execution
Section titled “Cancelling an Execution”Cancel a running or pending execution:
Cancel execution
curl -X POST https://api.catalyzed.ai/pipeline-executions/GkR8I6rHBms3W4Qfa2-FN/cancel \ -H "Authorization: Bearer $API_TOKEN"await fetch( "https://api.catalyzed.ai/pipeline-executions/GkR8I6rHBms3W4Qfa2-FN/cancel", { method: "POST", headers: { Authorization: `Bearer ${apiToken}` }, });requests.post( "https://api.catalyzed.ai/pipeline-executions/GkR8I6rHBms3W4Qfa2-FN/cancel", headers={"Authorization": f"Bearer {api_token}"})Execution Output
Section titled “Execution Output”When an execution succeeds, the output field contains the result. The outputCitations field contains citations that map output fields to source documents, enabling attribution and provenance tracking.
See the Citations guide for details on resolving and displaying citations.
{ "executionId": "GkR8I6rHBms3W4Qfa2-FN", "pipelineId": "EMbMEFLyUWEgvnhMWXVVa", "pipelineName": "Document Summarizer", "pipelineHandlerType": "language_model", "pipelineInputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "query", "label": "Query", "schema": { "type": "string" }, "required": true } ] }, "pipelineOutputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "summary", "label": "Summary", "schema": { "type": "string" }, "required": true } ] }, "pipelineConfiguration": {}, "status": "succeeded", "input": { "dataInputs": { "query": "What are the key trends in Q4 sales?" } }, "output": { "summary": "Q4 sales showed a 15% increase year-over-year...", "keyPoints": [ "Revenue grew 15% YoY", "New customer acquisition up 22%", "Average order value increased by $45" ] }, "outputCitations": [ { "outputPointer": "", "citations": [ { "type": "pdf_character_range", "pdfFileExtractionId": "extr_abc123", "charStart": 1500, "charEnd": 1550 } ] } ], "errorMessage": null, "progressPct": 100, "progressMessage": "Complete", "createdAt": "2024-01-15T10:30:00Z", "startedAt": "2024-01-15T10:30:02Z", "completedAt": "2024-01-15T10:32:15Z", "createdBy": "usr_abc123"}Error Handling
Section titled “Error Handling”When an execution fails, check the errorMessage:
{ "executionId": "GkR8I6rHBms3W4Qfa2-FN", "pipelineId": "EMbMEFLyUWEgvnhMWXVVa", "pipelineName": "Document Summarizer", "pipelineHandlerType": "language_model", "pipelineInputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "query", "label": "Query", "schema": { "type": "string" }, "required": true } ] }, "pipelineOutputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "summary", "label": "Summary", "schema": { "type": "string" }, "required": true } ] }, "pipelineConfiguration": {}, "status": "failed", "input": { "dataInputs": { "query": "What are the key trends in Q4 sales?" } }, "output": null, "outputCitations": [], "errorMessage": "Rate limit exceeded", "progressPct": 50, "progressMessage": "Processing with language model...", "createdAt": "2024-01-15T10:30:00Z", "startedAt": "2024-01-15T10:30:02Z", "completedAt": "2024-01-15T10:31:00Z", "createdBy": "usr_abc123"}Common Error Types
Section titled “Common Error Types”| Error | Cause | Resolution |
|---|---|---|
Rate limit exceeded | Too many API calls | Retry with backoff |
Input validation failed | Invalid input data | Check input schema |
Context too large | Input exceeds token limit | Reduce input size |
Insufficient credits | Team out of credits | Add credits |
Progress Tracking
Section titled “Progress Tracking”Long-running executions report progress:
| Field | Description |
|---|---|
progressPct | Completion percentage (0-100) |
progressMessage | Human-readable status message |
Example progress flow:
0% - "Initializing..."20% - "Loading context documents..."50% - "Processing with language model..."80% - "Formatting results..."100% - "Complete"Execution Properties
Section titled “Execution Properties”| Field | Type | Description |
|---|---|---|
executionId | string | Unique identifier |
pipelineId | string | Parent pipeline |
pipelineName | string | Pipeline name (at execution time) |
pipelineHandlerType | string | Handler type used for execution |
pipelineInputsSchema | object | Input schema at execution time (files, datasets, dataInputs) |
pipelineOutputsSchema | object | Output schema at execution time (files, datasets, dataInputs) |
pipelineConfiguration | object | Pipeline config at execution time |
status | string | Current status |
progressPct | number | Progress percentage (0-100) |
progressMessage | string | Status message |
input | object | Input data provided (files, datasets, dataInputs) |
output | object | Result (when succeeded) |
outputCitations | array | Citations mapping output fields to source documents |
handlerOutput | object | Handler-specific runtime metadata (see below) |
errorMessage | string | Error details (when failed) |
createdAt | timestamp | When execution was created |
startedAt | timestamp | When execution started running |
completedAt | timestamp | When execution finished |
createdBy | string | User who triggered this execution |
Handler Output Types
Section titled “Handler Output Types”The handlerOutput field contains handler-specific runtime metadata. The structure varies by handler type:
language_model:
{ "handlerType": "language_model", "trace": { /* optional execution trace */ }}streaming_language_model:
{ "handlerType": "streaming_language_model", "streamingChannelId": "ch_xyz123"}The streamingChannelId is used to subscribe to the token-level SSE stream at /channels/{channelId}/stream. Note: This field is null until the worker begins processing the execution.
code_agent_language_model:
{ "handlerType": "code_agent_language_model"}no_op:
{ "handlerType": "no_op"}Execution History in the App
Section titled “Execution History in the App”The Catalyzed app provides a pipeline execution history view:
- Navigate to Pipelines
- Click on a pipeline
- Select the History tab
From here you can:
- View all past executions
- Filter by status
- View execution details (input, output, timing)
- Cancel running executions
Promoting Executions to Examples
Section titled “Promoting Executions to Examples”Convert successful executions into ground truth examples for evaluations. You can either add to an existing example set or create a new one:
Option 1: Add to existing example set
curl -X POST https://api.catalyzed.ai/pipeline-executions/GkR8I6rHBms3W4Qfa2-FN/promote-to-example \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "existingExampleSetId": "KjR8I6rHBms3W4Qfa2-FN", "exampleName": "Verified production example", "rationale": "Output verified by domain expert" }'Option 2: Create new example set
curl -X POST https://api.catalyzed.ai/pipeline-executions/GkR8I6rHBms3W4Qfa2-FN/promote-to-example \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "newExampleSet": { "name": "Production Examples", "description": "Verified examples from production runs" }, "exampleName": "Verified production example", "rationale": "Output verified by domain expert" }'This creates an example using the execution’s input and output as ground truth. See the Promoting Executions Guide for details.
Citations
Section titled “Citations”Pipeline executions include outputCitations that map output fields to source documents. To display citations in your UI:
- Extract citations from
outputCitations - Resolve them using
/citations/resolve - Display file names, page numbers, and cited text
See the Citations guide for complete documentation on working with citations.
API Reference
Section titled “API Reference”See the Pipeline Executions API for complete endpoint documentation.