Pipelines
Pipelines are automated workflows that process data, run AI tasks, or transform information. Define a pipeline once, then trigger it on demand with different inputs.
Pipeline Structure
A pipeline consists of:
- Handler Type - The type of processing (e.g., `language_model`)
- Input Schema - What data the pipeline accepts (files, datasets, dataInputs)
- Output Schema - What data the pipeline produces (files, datasets, dataInputs)
- Configuration - Settings and parameters
Input and Output Schemas
Input and output schemas define the structure of data that flows through a pipeline. Each schema contains three types of slots:
- Files - References to uploaded files (PDFs, CSVs, documents)
- Datasets - References to datasets (collections of tables)
- Data Inputs - Structured JSON data (strings, numbers, arrays, objects)
Each slot has:
- `id` - Unique identifier used when triggering the pipeline
- `label` - Human-readable name
- `description` - Optional description
- `required` - Whether the slot must be provided
- `multiple` - Whether multiple values are allowed (for files and datasets)
- `schema` - JSON Schema defining the data structure (for dataInputs)
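Putting these fields together, a single data input slot might look like the following (illustrative values only):

```json
{
  "id": "sentiment",
  "label": "Sentiment",
  "description": "Predicted sentiment of the input text",
  "required": true,
  "schema": { "type": "string", "enum": ["positive", "negative", "neutral"] }
}
```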
Data Input Schema Types
The schema field on data inputs accepts JSON Schema objects. The following types are supported for language_model pipelines:
Basic Types
| Type | Schema | Description |
|---|---|---|
| String | { "type": "string" } | Text value |
| Integer | { "type": "integer" } | Whole number |
| Number | { "type": "number" } | Decimal number |
| Boolean | { "type": "boolean" } | True or false |
Enum (Constrained Values)
Restrict a string to a fixed set of allowed values:
```json
{ "type": "string", "enum": ["positive", "negative", "neutral"] }
```

Array

A list of values with a typed item schema:

```json
{ "type": "array", "items": { "type": "string" } }
```

Items can be any supported type, including objects:

```json
{
  "type": "array",
  "items": {
    "type": "object",
    "properties": {
      "name": { "type": "string" },
      "score": { "type": "number" }
    },
    "required": ["name", "score"]
  }
}
```

Object
A structured object with named properties. Use the required array to specify which properties must be present; properties not listed are optional and default to null:
```json
{
  "type": "object",
  "properties": {
    "summary": { "type": "string" },
    "confidence": { "type": "number" },
    "notes": { "type": "string" }
  },
  "required": ["summary", "confidence"]
}
```

In this example, `notes` is optional and may be null in the output.
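The required semantics can be sanity-checked client-side before relying on an output; a minimal sketch (not part of the API, just the `required` rule in plain Python):

```python
def check_required(instance: dict, schema: dict) -> list:
    """Return the names of required properties missing from the instance."""
    return [k for k in schema.get("required", []) if k not in instance]

schema = {
    "type": "object",
    "properties": {
        "summary": {"type": "string"},
        "confidence": {"type": "number"},
        "notes": {"type": "string"},
    },
    "required": ["summary", "confidence"],
}

# "notes" is absent but optional, so nothing is missing.
print(check_required({"summary": "ok", "confidence": 0.9}, schema))  # []
# "confidence" is required and missing.
print(check_required({"summary": "ok"}, schema))  # ['confidence']
```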
Record (Typed Map)
A dictionary with arbitrary string keys and typed values. Use additionalProperties to define the value type; this follows the standard JSON Schema pattern for Record&lt;string, T&gt;:
```json
{ "type": "object", "additionalProperties": { "type": "string" } }
```

This tells the LLM to produce a map of string keys to string values (e.g., `{ "color": "blue", "size": "large" }`). Without `additionalProperties`, a bare `{ "type": "object" }` gives the LLM no guidance on value types.
The value schema can be any supported type:
```json
{ "type": "object", "additionalProperties": { "type": "number" } }
```

```json
{
  "type": "object",
  "additionalProperties": {
    "type": "object",
    "properties": {
      "label": { "type": "string" },
      "score": { "type": "number" }
    },
    "required": ["label", "score"]
  }
}
```

Nullable Types
To indicate that a field’s value can be null, use the anyOf pattern with a null type variant:
```json
{ "anyOf": [{ "type": "string" }, { "type": "null" }] }
```

This is the standard JSON Schema Draft 7 representation and what tools like Zod produce for `.nullable()`. The shorthand `{ "type": ["string", "null"] }` is also supported.
Nullable types work with any base type:
```json
{ "anyOf": [{ "type": "integer" }, { "type": "null" }] }
```

```json
{
  "anyOf": [
    { "type": "array", "items": { "type": "string" } },
    { "type": "null" }
  ]
}
```

Nullable properties can also appear inside object schemas. A property can be both required (must always be present in the output) and nullable (its value can be null):

```json
{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "nickname": { "anyOf": [{ "type": "string" }, { "type": "null" }] }
  },
  "required": ["name", "nickname"]
}
```

Creating a Pipeline
Create a pipeline
```sh
curl -X POST https://api.catalyzed.ai/pipelines \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
    "name": "Document Summarizer",
    "description": "Summarize uploaded documents using AI",
    "handlerType": "language_model",
    "inputsSchema": {
      "files": [
        { "id": "document", "label": "Document", "description": "Document to summarize", "required": true, "multiple": false, "contextRetrievalMode": "full" }
      ],
      "datasets": [],
      "dataInputs": []
    },
    "outputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        { "id": "summary", "label": "Summary", "description": "Generated summary", "schema": { "type": "string" }, "required": true }
      ]
    },
    "configuration": { "files": [], "datasets": [], "dataInputs": [] }
  }'
```

```javascript
const response = await fetch("https://api.catalyzed.ai/pipelines", {
  method: "POST",
  headers: {
    Authorization: `Bearer ${apiToken}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    teamId: "ZkoDMyjZZsXo4VAO_nJLk",
    name: "Document Summarizer",
    description: "Summarize uploaded documents using AI",
    handlerType: "language_model",
    inputsSchema: {
      files: [
        { id: "document", label: "Document", description: "Document to summarize", required: true, multiple: false, contextRetrievalMode: "full" },
      ],
      datasets: [],
      dataInputs: [],
    },
    outputsSchema: {
      files: [],
      datasets: [],
      dataInputs: [
        { id: "summary", label: "Summary", description: "Generated summary", schema: { type: "string" }, required: true },
      ],
    },
    configuration: { files: [], datasets: [], dataInputs: [] },
  }),
});
const pipeline = await response.json();
```

```python
response = requests.post(
    "https://api.catalyzed.ai/pipelines",
    headers={"Authorization": f"Bearer {api_token}"},
    json={
        "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
        "name": "Document Summarizer",
        "description": "Summarize uploaded documents using AI",
        "handlerType": "language_model",
        "inputsSchema": {
            "files": [
                {"id": "document", "label": "Document", "description": "Document to summarize", "required": True, "multiple": False, "contextRetrievalMode": "full"}
            ],
            "datasets": [],
            "dataInputs": [],
        },
        "outputsSchema": {
            "files": [],
            "datasets": [],
            "dataInputs": [
                {"id": "summary", "label": "Summary", "description": "Generated summary", "schema": {"type": "string"}, "required": True}
            ],
        },
        "configuration": {"files": [], "datasets": [], "dataInputs": []},
    },
)
pipeline = response.json()
```

Response:

```json
{
  "pipelineId": "EMbMEFLyUWEgvnhMWXVVa",
  "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
  "name": "Document Summarizer",
  "description": "Summarize uploaded documents using AI",
  "handlerType": "language_model",
  "activeConfigurationId": "cfg_abc123",
  "status": "active",
  "inputsSchema": {
    "files": [
      { "id": "document", "label": "Document", "description": "Document to summarize", "required": true, "multiple": false, "contextRetrievalMode": "full" }
    ],
    "datasets": [],
    "dataInputs": []
  },
  "outputsSchema": {
    "files": [],
    "datasets": [],
    "dataInputs": [
      { "id": "summary", "label": "Summary", "description": "Generated summary", "schema": { "type": "string" }, "required": true }
    ]
  },
  "configuration": { ... },
  "createdAt": "2024-01-15T10:30:00Z",
  "updatedAt": "2024-01-15T10:30:00Z",
  "createdBy": "usr_abc123"
}
```

Pipeline Status
| Status | Description |
|---|---|
| `active` | Pipeline can be triggered |
| `archived` | Pipeline is disabled and cannot be triggered |
Listing Pipelines
List pipelines
```sh
curl "https://api.catalyzed.ai/pipelines?teamIds=ZkoDMyjZZsXo4VAO_nJLk" \
  -H "Authorization: Bearer $API_TOKEN"
```

```javascript
const response = await fetch(
  "https://api.catalyzed.ai/pipelines?teamIds=ZkoDMyjZZsXo4VAO_nJLk",
  { headers: { Authorization: `Bearer ${apiToken}` } }
);
const { pipelines } = await response.json();
```

```python
response = requests.get(
    "https://api.catalyzed.ai/pipelines",
    params={"teamIds": "ZkoDMyjZZsXo4VAO_nJLk"},
    headers={"Authorization": f"Bearer {api_token}"},
)
pipelines = response.json()["pipelines"]
```

Getting a Pipeline
Get pipeline details
```sh
curl https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \
  -H "Authorization: Bearer $API_TOKEN"
```

```javascript
const response = await fetch(
  "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa",
  { headers: { Authorization: `Bearer ${apiToken}` } }
);
const pipeline = await response.json();
```

```python
response = requests.get(
    "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa",
    headers={"Authorization": f"Bearer {api_token}"},
)
pipeline = response.json()
```

Triggering a Pipeline
Start a pipeline execution with input data:
Trigger a pipeline
```sh
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "files": { "document": "LvrGb8UaJk_IjmzaxuMAb" },
      "dataInputs": { "query": "Summarize the key findings from this document" }
    }
  }'
```

```javascript
const response = await fetch(
  "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger",
  {
    method: "POST",
    headers: {
      Authorization: `Bearer ${apiToken}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      input: {
        files: { document: "LvrGb8UaJk_IjmzaxuMAb" },
        dataInputs: { query: "Summarize the key findings from this document" },
      },
    }),
  }
);
const execution = await response.json();
```

```python
response = requests.post(
    "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger",
    headers={"Authorization": f"Bearer {api_token}"},
    json={
        "input": {
            "files": {"document": "LvrGb8UaJk_IjmzaxuMAb"},
            "dataInputs": {"query": "Summarize the key findings from this document"},
        }
    },
)
execution = response.json()
```

Response:

```json
{
  "executionId": "GkR8I6rHBms3W4Qfa2-FN",
  "status": "pending",
  "createdAt": "2024-01-15T10:30:00Z"
}
```

See Executions for monitoring execution progress.
Updating a Pipeline
The PUT endpoint only updates pipeline metadata (name, description) and allows setting the active configuration version. To update the actual configuration content (inputsSchema, outputsSchema, configuration), use the Pipeline Configurations endpoint.
Update pipeline
```sh
curl -X PUT https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{ "name": "Document Summarizer v2", "description": "Updated description" }'
```

```javascript
await fetch("https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa", {
  method: "PUT",
  headers: {
    Authorization: `Bearer ${apiToken}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    name: "Document Summarizer v2",
    description: "Updated description",
  }),
});
```

```python
requests.put(
    "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa",
    headers={"Authorization": f"Bearer {api_token}"},
    json={"name": "Document Summarizer v2", "description": "Updated description"},
)
```

Archiving a Pipeline
Archived pipelines cannot be triggered but retain their execution history:
Archive pipeline
```sh
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/archive \
  -H "Authorization: Bearer $API_TOKEN"
```

```javascript
await fetch("https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/archive", {
  method: "POST",
  headers: { Authorization: `Bearer ${apiToken}` },
});
```

```python
requests.post(
    "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/archive",
    headers={"Authorization": f"Bearer {api_token}"},
)
```

Reactivate an Archived Pipeline

```sh
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/reactivate \
  -H "Authorization: Bearer $API_TOKEN"
```

Deleting a Pipeline
Delete pipeline
```sh
curl -X DELETE https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \
  -H "Authorization: Bearer $API_TOKEN"
```

```javascript
await fetch("https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa", {
  method: "DELETE",
  headers: { Authorization: `Bearer ${apiToken}` },
});
```

```python
requests.delete(
    "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa",
    headers={"Authorization": f"Bearer {api_token}"},
)
```

Pipeline Configuration
Input Sources
Pipelines can reference:
- Files - Uploaded documents for processing
- Datasets - Tables for data retrieval
- Data Inputs - Specific table columns for context
Handler Types
| Type | Description | Web Context |
|---|---|---|
| `language_model` | AI-powered text generation with context retrieval | ✅ URL scraping & web search |
| `streaming_language_model` | Real-time streaming LLM with inline citations and fixed output schema | ❌ Not supported |
| `code_agent_language_model` | Code agent with Python code generation and bidirectional tool callbacks | ✅ Via tools (web_search, web_scrape) |
| `embedding` | Generate vector embeddings from text arrays with fixed input/output schemas | N/A |
| `text_classification` | Classify text into predefined categories (sentiment, topic) with confidence scores | N/A |
| `zero_shot_classification` | Classify text into user-defined categories using natural language inference | N/A |
| `ner` | Extract named entities (people, organizations, locations) with positions and confidence scores | N/A |
| `rerank` | Score and rerank documents by relevance to a query using cross-encoder models | N/A |
| `code_interpreter` | Execute Python code in a sandboxed interpreter with optional state persistence | N/A |
Language Model Configuration
Configuration contains optional pre-filled values for files, datasets, and dataInputs:
```json
{
  "files": [],
  "datasets": [],
  "dataInputs": [
    {
      "id": "instructions",
      "label": "System Instructions",
      "type": "string",
      "value": { "value": "You are a helpful assistant..." }
    }
  ]
}
```

Configuration vs Runtime Input:
- `configuration.dataInputs` - Pre-filled values that become part of the pipeline’s instructions (e.g., system prompts, settings)
- `configuration.files` / `configuration.datasets` - Pre-filled file/dataset references
- Runtime `input` - Actual data provided when triggering the pipeline
When triggering a pipeline, you provide runtime values in the input object. Configuration values are baked into the pipeline definition.
Input Priority
LLMs tend to pay more attention to content at the beginning and end of a prompt, and less to content in the middle. The inputPriority option lets you control the ordering of inputs in the rendered prompt so you can push large background context early and keep important instructions or questions late where they receive more attention.
inputPriority is set on configuration.handlerOptions as a map of input slot IDs to numeric priority values:
```json
{
  "handlerOptions": {
    "inputPriority": {
      "background_context": -10,
      "reference_material": -5,
      "user_question": 10
    }
  }
}
```

How it works:
- Lower values → earlier in the prompt (less attention)
- Higher values → later in the prompt (more attention)
- Unspecified inputs → default to `0`, preserving their original relative order
- Applies to both `configuration.dataInputs` (system message) and runtime `inputsSchema.dataInputs` (user message)
- `retrieved_context` (from files, datasets, web sources) is not affected; it is always appended last
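The ordering rules above amount to a stable sort by priority; a minimal sketch of the behavior (not the actual implementation):

```python
def order_inputs(slot_ids, input_priority):
    """Stable sort: lower priority first; unspecified slots default to 0
    and keep their original relative order."""
    return sorted(slot_ids, key=lambda sid: input_priority.get(sid, 0))

slots = ["background_context", "user_question", "reference_material", "notes"]
priority = {"background_context": -10, "reference_material": -5, "user_question": 10}
print(order_inputs(slots, priority))
# ['background_context', 'reference_material', 'notes', 'user_question']
```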
Configuration Options:
| Field | Type | Default | Description |
|---|---|---|---|
| `inputPriority` | `Record<string, number>` | `undefined` | Map of input slot IDs to numeric priority values. Lower = earlier in prompt, higher = later. Unspecified inputs default to `0`. |
Validation:
Each key in inputPriority must match an id from inputsSchema.dataInputs or configuration.dataInputs. The API returns a 400 error if any key references an unknown input slot.
Handler support: language_model only.
Example — Financial Q&A with priority ordering:
Create pipeline with input priority
```sh
curl -X POST https://api.catalyzed.ai/pipelines \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
    "name": "Financial Q&A",
    "handlerType": "language_model",
    "inputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        { "id": "company_background", "label": "Company Background", "description": "Background information about the company", "schema": { "type": "string" }, "required": false },
        { "id": "financial_data", "label": "Financial Data", "description": "Key financial metrics and figures", "schema": { "type": "string" }, "required": true },
        { "id": "question", "label": "Question", "description": "The financial question to answer", "schema": { "type": "string" }, "required": true }
      ]
    },
    "outputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        { "id": "answer", "label": "Answer", "description": "The financial analysis answer", "schema": { "type": "string" }, "required": true }
      ]
    },
    "configuration": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        { "id": "guidelines", "label": "Analysis Guidelines", "value": { "value": "Use conservative estimates. Cite specific figures." } }
      ],
      "handlerOptions": {
        "inputPriority": { "company_background": -10, "financial_data": 0, "question": 10 }
      }
    }
  }'
```

```javascript
const response = await fetch("https://api.catalyzed.ai/pipelines", {
  method: "POST",
  headers: {
    "Authorization": `Bearer ${apiToken}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    teamId: "ZkoDMyjZZsXo4VAO_nJLk",
    name: "Financial Q&A",
    handlerType: "language_model",
    inputsSchema: {
      files: [],
      datasets: [],
      dataInputs: [
        { id: "company_background", label: "Company Background", description: "Background information about the company", schema: { type: "string" }, required: false },
        { id: "financial_data", label: "Financial Data", description: "Key financial metrics and figures", schema: { type: "string" }, required: true },
        { id: "question", label: "Question", description: "The financial question to answer", schema: { type: "string" }, required: true },
      ],
    },
    outputsSchema: {
      files: [],
      datasets: [],
      dataInputs: [
        { id: "answer", label: "Answer", description: "The financial analysis answer", schema: { type: "string" }, required: true },
      ],
    },
    configuration: {
      files: [],
      datasets: [],
      dataInputs: [
        { id: "guidelines", label: "Analysis Guidelines", value: { value: "Use conservative estimates. Cite specific figures." } },
      ],
      handlerOptions: {
        inputPriority: { company_background: -10, financial_data: 0, question: 10 },
      },
    },
  }),
});
const pipeline = await response.json();
```

```python
response = requests.post(
    "https://api.catalyzed.ai/pipelines",
    headers={
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
    },
    json={
        "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
        "name": "Financial Q&A",
        "handlerType": "language_model",
        "inputsSchema": {
            "files": [],
            "datasets": [],
            "dataInputs": [
                {"id": "company_background", "label": "Company Background", "description": "Background information about the company", "schema": {"type": "string"}, "required": False},
                {"id": "financial_data", "label": "Financial Data", "description": "Key financial metrics and figures", "schema": {"type": "string"}, "required": True},
                {"id": "question", "label": "Question", "description": "The financial question to answer", "schema": {"type": "string"}, "required": True},
            ],
        },
        "outputsSchema": {
            "files": [],
            "datasets": [],
            "dataInputs": [
                {"id": "answer", "label": "Answer", "description": "The financial analysis answer", "schema": {"type": "string"}, "required": True}
            ],
        },
        "configuration": {
            "files": [],
            "datasets": [],
            "dataInputs": [
                {"id": "guidelines", "label": "Analysis Guidelines", "value": {"value": "Use conservative estimates. Cite specific figures."}}
            ],
            "handlerOptions": {
                "inputPriority": {"company_background": -10, "financial_data": 0, "question": 10}
            },
        },
    },
)
pipeline = response.json()
```

With this configuration, when the pipeline executes, the LLM prompt will order the runtime inputs as: company_background (priority -10) → financial_data (priority 0) → question (priority 10). The question, the most important part, appears last where the model pays the most attention.
Dataset Filtering
Dataset slots in inputsSchema can include optional row-level filters to restrict which rows are queried. Filters are defined as an array of predicates that are combined with AND logic and applied server-side in the query engine.
Key Benefits:
- Row-level security - Enforce data isolation between tenants or users
- Performance optimization - Reduce query scope by filtering at the source
- Dynamic filtering - Use runtime values via `$ref` to reference execution inputs
- Server-side enforcement - Filters cannot be bypassed by SQL injection
Filter Structure:
Each dataset slot can include a filter array with predicates:
```json
{
  "datasets": [
    {
      "id": "sales",
      "label": "Sales Data",
      "required": true,
      "multiple": false,
      "filter": [
        { "field": "tenant_id", "op": "eq", "value": { "$ref": "input.dataInputs.tenantId" } },
        { "field": "deleted_at", "op": "is_null" }
      ]
    }
  ],
  "dataInputs": [
    { "id": "tenantId", "label": "Tenant ID", "schema": { "type": "string" }, "required": true }
  ]
}
```

Supported Operators:
| Operator | Description | Example |
|---|---|---|
| `eq` | Equal to | `{"field": "status", "op": "eq", "value": "active"}` |
| `neq` | Not equal to | `{"field": "type", "op": "neq", "value": "draft"}` |
| `gt` | Greater than | `{"field": "amount", "op": "gt", "value": 100}` |
| `gte` | Greater than or equal | `{"field": "score", "op": "gte", "value": 80}` |
| `lt` | Less than | `{"field": "age", "op": "lt", "value": 18}` |
| `lte` | Less than or equal | `{"field": "count", "op": "lte", "value": 10}` |
| `in` | In list | `{"field": "category", "op": "in", "value": ["A", "B"]}` |
| `not_in` | Not in list | `{"field": "status", "op": "not_in", "value": ["deleted", "archived"]}` |
| `like` | Pattern match | `{"field": "email", "op": "like", "value": "%@example.com"}` |
| `is_null` | Is NULL | `{"field": "deleted_at", "op": "is_null"}` |
| `is_not_null` | Is not NULL | `{"field": "email", "op": "is_not_null"}` |
Dynamic Values with $ref:
Use $ref to reference runtime values from dataInputs:
```json
{ "field": "tenant_id", "op": "eq", "value": { "$ref": "input.dataInputs.tenantId" } }
```

The `$ref` path must follow the pattern `input.dataInputs.<id>` where `<id>` matches a data input slot.
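A client-side sanity check of the `$ref` pattern might look like the following sketch (the allowed slot-id character set here is an assumption; the API performs its own validation):

```python
import re

# Assumed slot-id charset: letters, digits, underscore, hyphen.
REF_PATTERN = re.compile(r"^input\.dataInputs\.([A-Za-z0-9_-]+)$")

def ref_slot_id(ref):
    """Return the referenced data input slot id, or None if the path is malformed."""
    m = REF_PATTERN.match(ref)
    return m.group(1) if m else None

print(ref_slot_id("input.dataInputs.tenantId"))  # tenantId
print(ref_slot_id("input.files.document"))       # None (only dataInputs may be referenced)
```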
Complete Example - Multi-tenant Sales Pipeline:
Create pipeline with dataset filtering
```sh
curl -X POST https://api.catalyzed.ai/pipelines \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
    "name": "Sales Analysis",
    "description": "Analyze sales data with tenant isolation",
    "handlerType": "language_model",
    "inputsSchema": {
      "files": [],
      "datasets": [
        {
          "id": "sales",
          "label": "Sales Data",
          "description": "Sales transactions dataset",
          "required": true,
          "multiple": false,
          "filter": [
            { "field": "tenant_id", "op": "eq", "value": { "$ref": "input.dataInputs.tenantId" } },
            { "field": "deleted_at", "op": "is_null" },
            { "field": "status", "op": "in", "value": ["completed", "pending"] }
          ]
        }
      ],
      "dataInputs": [
        { "id": "tenantId", "label": "Tenant ID", "description": "Tenant identifier for data isolation", "schema": { "type": "string" }, "required": true },
        { "id": "question", "label": "Question", "description": "Analysis question", "schema": { "type": "string" }, "required": true }
      ]
    },
    "outputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        { "id": "answer", "label": "Answer", "schema": { "type": "string" }, "required": true }
      ]
    },
    "configuration": { "files": [], "datasets": [], "dataInputs": [] }
  }'
```

```javascript
const response = await fetch("https://api.catalyzed.ai/pipelines", {
  method: "POST",
  headers: {
    Authorization: `Bearer ${apiToken}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    teamId: "ZkoDMyjZZsXo4VAO_nJLk",
    name: "Sales Analysis",
    description: "Analyze sales data with tenant isolation",
    handlerType: "language_model",
    inputsSchema: {
      files: [],
      datasets: [
        {
          id: "sales",
          label: "Sales Data",
          description: "Sales transactions dataset",
          required: true,
          multiple: false,
          filter: [
            { field: "tenant_id", op: "eq", value: { $ref: "input.dataInputs.tenantId" } },
            { field: "deleted_at", op: "is_null" },
            { field: "status", op: "in", value: ["completed", "pending"] },
          ],
        },
      ],
      dataInputs: [
        { id: "tenantId", label: "Tenant ID", description: "Tenant identifier for data isolation", schema: { type: "string" }, required: true },
        { id: "question", label: "Question", description: "Analysis question", schema: { type: "string" }, required: true },
      ],
    },
    outputsSchema: {
      files: [],
      datasets: [],
      dataInputs: [
        { id: "answer", label: "Answer", schema: { type: "string" }, required: true },
      ],
    },
    configuration: { files: [], datasets: [], dataInputs: [] },
  }),
});
```

```python
response = requests.post(
    "https://api.catalyzed.ai/pipelines",
    headers={"Authorization": f"Bearer {api_token}"},
    json={
        "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
        "name": "Sales Analysis",
        "description": "Analyze sales data with tenant isolation",
        "handlerType": "language_model",
        "inputsSchema": {
            "files": [],
            "datasets": [
                {
                    "id": "sales",
                    "label": "Sales Data",
                    "description": "Sales transactions dataset",
                    "required": True,
                    "multiple": False,
                    "filter": [
                        {"field": "tenant_id", "op": "eq", "value": {"$ref": "input.dataInputs.tenantId"}},
                        {"field": "deleted_at", "op": "is_null"},
                        {"field": "status", "op": "in", "value": ["completed", "pending"]},
                    ],
                }
            ],
            "dataInputs": [
                {"id": "tenantId", "label": "Tenant ID", "description": "Tenant identifier for data isolation", "schema": {"type": "string"}, "required": True},
                {"id": "question", "label": "Question", "description": "Analysis question", "schema": {"type": "string"}, "required": True},
            ],
        },
        "outputsSchema": {
            "files": [],
            "datasets": [],
            "dataInputs": [
                {"id": "answer", "label": "Answer", "schema": {"type": "string"}, "required": True}
            ],
        },
        "configuration": {"files": [], "datasets": [], "dataInputs": []},
    },
)
```

Triggering with Filter Values:
When triggering the pipeline, provide the tenant ID that will be substituted into the filter:
```sh
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "datasets": { "sales": "9Yh1BRvQhmFUYrSDZTcRz" },
      "dataInputs": {
        "tenantId": "tenant-xyz",
        "question": "What were the total sales last month?"
      }
    }
  }'
```

The query engine will automatically filter the sales table to only include rows where:
- `tenant_id = 'tenant-xyz'`
- `deleted_at IS NULL`
- `status IN ('completed', 'pending')`
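The AND semantics of the predicate list can be illustrated with a small in-memory evaluator (a sketch only, with `like` omitted for brevity; the real filtering happens server-side in the query engine):

```python
def matches(row, predicates):
    """Return True if the row satisfies every predicate (AND logic)."""
    ops = {
        "eq": lambda v, x: v == x,
        "neq": lambda v, x: v != x,
        "gt": lambda v, x: v is not None and v > x,
        "gte": lambda v, x: v is not None and v >= x,
        "lt": lambda v, x: v is not None and v < x,
        "lte": lambda v, x: v is not None and v <= x,
        "in": lambda v, x: v in x,
        "not_in": lambda v, x: v not in x,
        "is_null": lambda v, x: v is None,
        "is_not_null": lambda v, x: v is not None,
    }
    return all(ops[p["op"]](row.get(p["field"]), p.get("value")) for p in predicates)

predicates = [
    {"field": "tenant_id", "op": "eq", "value": "tenant-xyz"},
    {"field": "deleted_at", "op": "is_null"},
    {"field": "status", "op": "in", "value": ["completed", "pending"]},
]
rows = [
    {"tenant_id": "tenant-xyz", "deleted_at": None, "status": "completed"},
    {"tenant_id": "other", "deleted_at": None, "status": "completed"},
]
print([matches(r, predicates) for r in rows])  # [True, False]
```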
Security Considerations:
- Filters are applied server-side in the query engine using DataFusion’s DataFrame API
- Filters cannot be bypassed via SQL injection or query manipulation
- All filter values are parameterized and SQL-escaped before execution
- The LLM-generated SQL queries the filtered view transparently
- Filter logic is validated when creating the pipeline
Web Context Retrieval
The language_model handler can automatically enrich LLM context by fetching content from the web. This feature enables pipelines to work with real-time information and user-provided URLs without manual data ingestion.
Available features:
- URL Scraping - Automatically detect and scrape URLs in user inputs
- Web Search - Generate search queries and fetch results via Tavily API
Both features inject content before LLM generation, making web data available in the prompt context. These features are available only in the language_model handler: streaming_language_model does not support them, and code_agent_language_model provides web access through its own tools instead.
URL Scraping
URL scraping automatically detects HTTP/HTTPS URLs in user inputs, fetches their content, and injects it into the LLM prompt context. This feature is enabled by default.
Key Features:
- Automatic URL detection in user inputs via regex pattern `/https?:\/\/[^\s]+/gi`
- Pre-LLM content injection (scrape completes before model runs)
- JavaScript rendering support for dynamic content
- Multiple content formats: HTML, Markdown, Text
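The detection regex translates directly to Python; a quick way to preview which URLs would be scraped from an input string:

```python
import re

# Same pattern the handler uses for detection: /https?:\/\/[^\s]+/gi
URL_RE = re.compile(r"https?://[^\s]+", re.IGNORECASE)

text = "Compare https://example.com/a and HTTP://example.com/b please"
print(URL_RE.findall(text))  # ['https://example.com/a', 'HTTP://example.com/b']
```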
Configuration:
URL scraping is configured in configuration.handlerOptions.urlScraping:
```json
{
  "urlScraping": {
    "enabled": true,           // default: true
    "contentType": "markdown", // "html" | "markdown" | "text", default: "markdown"
    "renderJs": true           // default: true, enables JavaScript rendering
  }
}
```

Configuration Options:
| Field | Type | Default | Description |
|---|---|---|---|
| `enabled` | boolean | `true` | Enable automatic URL detection and scraping |
| `contentType` | string | `"markdown"` | Content format: `"html"`, `"markdown"`, or `"text"` |
| `renderJs` | boolean | `true` | Enable JavaScript rendering to capture dynamic content |
Creating a Pipeline with URL Scraping:
Create pipeline with URL scraping
```sh
curl -X POST https://api.catalyzed.ai/pipelines \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
    "name": "Article Summarizer",
    "handlerType": "language_model",
    "inputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        { "id": "article_url", "label": "Article URL", "description": "URL of the article to summarize", "schema": { "type": "string" }, "required": true }
      ]
    },
    "outputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        { "id": "summary", "label": "Summary", "description": "Generated article summary", "schema": { "type": "string" }, "required": true }
      ]
    },
    "configuration": {
      "files": [],
      "datasets": [],
      "dataInputs": [],
      "handlerOptions": {
        "urlScraping": { "enabled": true, "contentType": "markdown", "renderJs": true }
      }
    }
  }'
```

```javascript
const response = await fetch("https://api.catalyzed.ai/pipelines", {
  method: "POST",
  headers: {
    "Authorization": `Bearer ${apiToken}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    teamId: "ZkoDMyjZZsXo4VAO_nJLk",
    name: "Article Summarizer",
    handlerType: "language_model",
    inputsSchema: {
      files: [],
      datasets: [],
      dataInputs: [
        { id: "article_url", label: "Article URL", description: "URL of the article to summarize", schema: { type: "string" }, required: true },
      ],
    },
    outputsSchema: {
      files: [],
      datasets: [],
      dataInputs: [
        { id: "summary", label: "Summary", description: "Generated article summary", schema: { type: "string" }, required: true },
      ],
    },
    configuration: {
      files: [],
      datasets: [],
      dataInputs: [],
      handlerOptions: {
        urlScraping: { enabled: true, contentType: "markdown", renderJs: true },
      },
    },
  }),
});
const pipeline = await response.json();
```

```python
response = requests.post(
    "https://api.catalyzed.ai/pipelines",
    headers={
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
    },
    json={
        "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
        "name": "Article Summarizer",
        "handlerType": "language_model",
        "inputsSchema": {
            "files": [],
            "datasets": [],
            "dataInputs": [
                {"id": "article_url", "label": "Article URL", "description": "URL of the article to summarize", "schema": {"type": "string"}, "required": True}
            ],
        },
        "outputsSchema": {
            "files": [],
            "datasets": [],
            "dataInputs": [
                {"id": "summary", "label": "Summary", "description": "Generated article summary", "schema": {"type": "string"}, "required": True}
            ],
        },
        "configuration": {
            "files": [],
            "datasets": [],
            "dataInputs": [],
            "handlerOptions": {
                "urlScraping": {"enabled": True, "contentType": "markdown", "renderJs": True}
            },
        },
    },
)
pipeline = response.json()
```

Triggering the Pipeline:
Trigger pipeline with URL in input
```sh
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "dataInputs": { "article_url": "https://example.com/article" }
    }
  }'
```

```javascript
const response = await fetch(
  "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger",
  {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${apiToken}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      input: {
        dataInputs: { article_url: "https://example.com/article" },
      },
    }),
  }
);
const execution = await response.json();
```

```python
response = requests.post(
    "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger",
    headers={
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
    },
    json={
        "input": {
            "dataInputs": {"article_url": "https://example.com/article"}
        }
    },
)
execution = response.json()
```

The URL https://example.com/article will be automatically detected, scraped, and its content will be injected into the LLM context before generation.
Use Cases:
- Summarizing news articles or blog posts shared by users
- Extracting structured data from documentation pages
- Analyzing competitor websites or product pages
- Processing user-submitted links in support tickets or feedback forms
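The platform's URL-detection logic is internal, but conceptually it amounts to scanning string-valued data inputs for http(s) URLs. A simplified sketch (the regex and helper below are our own illustration, not the actual implementation):

```python
import re

# Simplified sketch of URL auto-detection in string data inputs.
# The platform's actual detection logic is internal and may differ.
URL_PATTERN = re.compile(r"https?://[^\s\"'<>]+")

def find_urls(data_inputs: dict) -> list[str]:
    """Collect http(s) URLs from every string-valued data input."""
    urls: list[str] = []
    for value in data_inputs.values():
        if isinstance(value, str):
            urls.extend(URL_PATTERN.findall(value))
    return urls

print(find_urls({"article_url": "https://example.com/article", "note": "no link here"}))
# ['https://example.com/article']
```

Each detected URL is then scraped and its content injected into the LLM context ahead of generation.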
Web Search
Section titled “Web Search”Web search enables pipelines to fetch real-time information from the web using the Tavily search API. The system uses a two-phase approach: first, an LLM generates focused search queries based on the task, then Tavily executes those searches and formats the results.
Key Features:
- Two-phase execution: LLM generates queries → Tavily executes searches
- Pre-LLM result injection (search completes before main model runs)
- Configurable query count (1-5 queries) and results per query (1-20 results)
- Search depth control: basic (1 credit) or advanced (2 credits)
- Optional AI-generated answer summaries from Tavily
Configuration:
Web search is configured in configuration.handlerOptions.webSearch. It is disabled by default - you must explicitly set enabled: true.
```jsonc
{
  "webSearch": {
    "enabled": true,          // required to enable web search
    "maxQueries": 3,          // 1-5, default: 3
    "maxResultsPerQuery": 5,  // 1-20, default: 5
    "searchDepth": "basic",   // "basic" | "advanced", default: "basic"
    "includeAnswer": false    // default: false
  }
}
```

Configuration Options:
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | boolean | (required) | Enable automatic web search query generation and execution |
| maxQueries | number | 3 | Maximum number of queries to generate (1-5) |
| maxResultsPerQuery | number | 5 | Maximum results per query (1-20) |
| searchDepth | string | "basic" | Search depth: "basic" (1 credit) or "advanced" (2 credits) |
| includeAnswer | boolean | false | Include AI-generated answer summary from Tavily |
How It Works:
1. The LLM analyzes the task and user input to generate 1-5 focused search queries
2. The Tavily API executes the searches in parallel
3. Results are formatted with titles, URLs, snippets, and relevance scores
4. Formatted search results are injected into the LLM context
5. The main LLM generation runs with search results available in the prompt
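The two-phase flow above can be sketched with stand-ins for both phases. Note that generate_queries and tavily_search below are hypothetical stubs for the real LLM and Tavily calls, shown only to make the data flow concrete:

```python
# Sketch of the two-phase web search flow. Both helpers below are
# hypothetical stubs standing in for the real LLM and Tavily calls.
def generate_queries(task: str, max_queries: int = 3) -> list[str]:
    # Phase 1: an LLM derives focused queries from the task description.
    candidates = [f"{task} overview", f"{task} recent developments"]
    return candidates[:max_queries]

def tavily_search(query: str, max_results: int = 5) -> list[dict]:
    # Phase 2: Tavily executes the search (stubbed with a fixed result).
    return [{"title": f"Result for: {query}",
             "url": "https://example.com",
             "score": 0.9}][:max_results]

def run_web_search(task: str, max_queries: int = 3,
                   max_results_per_query: int = 5) -> str:
    queries = generate_queries(task, max_queries)
    lines = []
    for q in queries:
        for r in tavily_search(q, max_results_per_query):
            lines.append(f"[{r['score']:.2f}] {r['title']} ({r['url']})")
    # The formatted block is injected into the LLM context before generation.
    return "\n".join(lines)

print(run_web_search("EV market share"))
```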
Creating a Pipeline with Web Search:
Create pipeline with web search
curl -X POST https://api.catalyzed.ai/pipelines \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Market Research Assistant", "handlerType": "language_model", "inputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "research_query", "label": "Research Query", "description": "What would you like to research?", "schema": { "type": "string" }, "required": true } ] }, "outputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "analysis", "label": "Analysis", "description": "Research analysis and findings", "schema": { "type": "string" }, "required": true } ] }, "configuration": { "files": [], "datasets": [], "dataInputs": [], "handlerOptions": { "webSearch": { "enabled": true, "maxQueries": 3, "maxResultsPerQuery": 5, "searchDepth": "basic", "includeAnswer": false } } } }'const response = await fetch("https://api.catalyzed.ai/pipelines", { method: "POST", headers: { "Authorization": `Bearer ${apiToken}`, "Content-Type": "application/json" }, body: JSON.stringify({ teamId: "ZkoDMyjZZsXo4VAO_nJLk", name: "Market Research Assistant", handlerType: "language_model", inputsSchema: { files: [], datasets: [], dataInputs: [ { id: "research_query", label: "Research Query", description: "What would you like to research?", schema: { type: "string" }, required: true } ] }, outputsSchema: { files: [], datasets: [], dataInputs: [ { id: "analysis", label: "Analysis", description: "Research analysis and findings", schema: { type: "string" }, required: true } ] }, configuration: { files: [], datasets: [], dataInputs: [], handlerOptions: { webSearch: { enabled: true, maxQueries: 3, maxResultsPerQuery: 5, searchDepth: "basic", includeAnswer: false } } } })});const pipeline = await response.json();response = requests.post( "https://api.catalyzed.ai/pipelines", headers={ "Authorization": f"Bearer {api_token}", "Content-Type": "application/json" }, json={ "teamId": 
"ZkoDMyjZZsXo4VAO_nJLk", "name": "Market Research Assistant", "handlerType": "language_model", "inputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "research_query", "label": "Research Query", "description": "What would you like to research?", "schema": {"type": "string"}, "required": True } ] }, "outputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "analysis", "label": "Analysis", "description": "Research analysis and findings", "schema": {"type": "string"}, "required": True } ] }, "configuration": { "files": [], "datasets": [], "dataInputs": [], "handlerOptions": { "webSearch": { "enabled": True, "maxQueries": 3, "maxResultsPerQuery": 5, "searchDepth": "basic", "includeAnswer": False } } } })pipeline = response.json()Use Cases:
- Real-time market research and competitive analysis
- Current events analysis and news monitoring
- Fact-checking claims with recent information
- Gathering background information for decision-making
Context Assembly Order
Section titled “Context Assembly Order”When multiple context sources are configured (URL scraping, web search, files, datasets), they are assembled in a specific priority order optimized for LLM attention patterns:
1. Scraped URLs - Most specific (user explicitly provided URLs)
2. Web Search Results - Current/dynamic information from the web
3. File Context - Uploaded documents (semantic or full retrieval)
4. Dataset Context - Structured data from SQL queries (most general)

This order ensures the most relevant and specific information appears first in the LLM context, where attention mechanisms are most effective.
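As an illustration of the ordering rule, assembly can be sketched as a fixed priority list. The section keys below are invented for the example; only the relative order comes from the documentation:

```python
# Sketch: assemble available context sections in the documented
# priority order, regardless of the order they were produced in.
PRIORITY = ["scraped_urls", "web_search", "files", "datasets"]

def assemble_context(sections: dict[str, str]) -> str:
    ordered = [sections[key] for key in PRIORITY if sections.get(key)]
    return "\n\n".join(ordered)

context = assemble_context({
    "datasets": "rows from historical_data",
    "scraped_urls": "content of https://example.com/article",
})
print(context.splitlines()[0])
# content of https://example.com/article
```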
Combined Context Example:
Pipeline with multiple context sources
curl -X POST https://api.catalyzed.ai/pipelines \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Comprehensive Research Assistant", "handlerType": "language_model", "inputsSchema": { "files": [ { "id": "background_docs", "label": "Background Documents", "required": false, "multiple": true, "contextRetrievalMode": "semantic" } ], "datasets": [ { "id": "historical_data", "label": "Historical Data", "required": false, "multiple": false } ], "dataInputs": [ { "id": "question", "label": "Research Question", "schema": { "type": "string" }, "required": true } ] }, "outputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "comprehensive_analysis", "label": "Comprehensive Analysis", "schema": { "type": "string" }, "required": true } ] }, "configuration": { "files": [], "datasets": [], "dataInputs": [], "handlerOptions": { "urlScraping": { "enabled": true, "contentType": "markdown" }, "webSearch": { "enabled": true, "maxQueries": 2, "maxResultsPerQuery": 5 } } } }'const response = await fetch("https://api.catalyzed.ai/pipelines", { method: "POST", headers: { "Authorization": `Bearer ${apiToken}`, "Content-Type": "application/json" }, body: JSON.stringify({ teamId: "ZkoDMyjZZsXo4VAO_nJLk", name: "Comprehensive Research Assistant", handlerType: "language_model", inputsSchema: { files: [ { id: "background_docs", label: "Background Documents", required: false, multiple: true, contextRetrievalMode: "semantic" } ], datasets: [ { id: "historical_data", label: "Historical Data", required: false, multiple: false } ], dataInputs: [ { id: "question", label: "Research Question", schema: { type: "string" }, required: true } ] }, outputsSchema: { files: [], datasets: [], dataInputs: [ { id: "comprehensive_analysis", label: "Comprehensive Analysis", schema: { type: "string" }, required: true } ] }, configuration: { files: [], datasets: [], dataInputs: [], handlerOptions: { urlScraping: { enabled: 
true, contentType: "markdown" }, webSearch: { enabled: true, maxQueries: 2, maxResultsPerQuery: 5 } } } })});const pipeline = await response.json();response = requests.post( "https://api.catalyzed.ai/pipelines", headers={ "Authorization": f"Bearer {api_token}", "Content-Type": "application/json" }, json={ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Comprehensive Research Assistant", "handlerType": "language_model", "inputsSchema": { "files": [ { "id": "background_docs", "label": "Background Documents", "required": False, "multiple": True, "contextRetrievalMode": "semantic" } ], "datasets": [ { "id": "historical_data", "label": "Historical Data", "required": False, "multiple": False } ], "dataInputs": [ { "id": "question", "label": "Research Question", "schema": {"type": "string"}, "required": True } ] }, "outputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "comprehensive_analysis", "label": "Comprehensive Analysis", "schema": {"type": "string"}, "required": True } ] }, "configuration": { "files": [], "datasets": [], "dataInputs": [], "handlerOptions": { "urlScraping": { "enabled": True, "contentType": "markdown" }, "webSearch": { "enabled": True, "maxQueries": 2, "maxResultsPerQuery": 5 } } } })pipeline = response.json()Handler Support Comparison
Section titled “Handler Support Comparison”Different handler types have different approaches to web context retrieval:
| Feature | language_model | streaming_language_model | code_agent_language_model |
|---|---|---|---|
| URL Scraping | ✅ Automatic pre-LLM | ❌ Not supported | ✅ Via web_scrape tool |
| Web Search | ✅ Automatic pre-LLM | ❌ Not supported | ✅ Via web_search tool |
| Implementation | Configuration-driven | N/A | Tool-based (agent decides) |
| Latency | Higher (pre-processing) | N/A | Variable (agent reasoning) |
Key Differences:
- language_model: Features run automatically before every LLM call when enabled in configuration. Context is always fetched, even if not needed for the specific query.
- streaming_language_model: Does not support web context features. This handler has a fixed output schema and no pre-processing phase.
- code_agent_language_model: The Python code agent can call web_search and web_scrape tools dynamically during execution. The agent decides when and how to use these tools based on the task, but this adds reasoning overhead and latency.
Streaming Language Model Handler
Section titled “Streaming Language Model Handler”The streaming_language_model handler enables real-time token-level streaming with inline citations. Unlike the standard language_model handler, it has a fixed output schema that cannot be modified.
Key Features:
- Real-time streaming - Tokens are delivered as they’re generated
- Inline citations - References appear as human-readable chunk IDs like [swift_falcon] and [blue_river] in the output
- Fixed output schema - Always returns { content: string }
- Channel-based delivery - Uses Server-Sent Events (SSE) for streaming
Output Schema (Fixed):
The output schema is automatically set and cannot be modified:
```json
{
  "files": [],
  "datasets": [],
  "dataInputs": [
    {
      "id": "content",
      "label": "Response Content",
      "description": "Generated text response with inline citation markers",
      "schema": {
        "type": "object",
        "properties": {
          "content": { "type": "string" }
        },
        "required": ["content"],
        "additionalProperties": false
      },
      "required": true
    }
  ]
}
```

Creating a Streaming Pipeline:
Create streaming language model pipeline
curl -X POST https://api.catalyzed.ai/pipelines \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Streaming Document Q&A", "description": "Answer questions about documents with real-time streaming", "handlerType": "streaming_language_model", "inputsSchema": { "files": [ { "id": "document", "label": "Document", "description": "Document to analyze", "required": true, "multiple": false, "contextRetrievalMode": "full" } ], "datasets": [], "dataInputs": [ { "id": "question", "label": "Question", "description": "Question to answer", "schema": { "type": "string" }, "required": true } ] }, "configuration": { "files": [], "datasets": [], "dataInputs": [ { "id": "systemPrompt", "label": "System Prompt", "type": "string", "value": { "value": "You are a helpful assistant that answers questions based on the provided documents. Reference sources using [chunk_id] markers after factual claims." } } ] } }'const response = await fetch("https://api.catalyzed.ai/pipelines", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ teamId: "ZkoDMyjZZsXo4VAO_nJLk", name: "Streaming Document Q&A", description: "Answer questions about documents with real-time streaming", handlerType: "streaming_language_model", inputsSchema: { files: [ { id: "document", label: "Document", description: "Document to analyze", required: true, multiple: false, contextRetrievalMode: "full", }, ], datasets: [], dataInputs: [ { id: "question", label: "Question", description: "Question to answer", schema: { type: "string" }, required: true, }, ], }, configuration: { files: [], datasets: [], dataInputs: [ { id: "systemPrompt", label: "System Prompt", type: "string", value: { value: "You are a helpful assistant that answers questions based on the provided documents. 
Reference sources using [chunk_id] markers after factual claims.", }, }, ], }, }),});response = requests.post( "https://api.catalyzed.ai/pipelines", headers={"Authorization": f"Bearer {api_token}"}, json={ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Streaming Document Q&A", "description": "Answer questions about documents with real-time streaming", "handlerType": "streaming_language_model", "inputsSchema": { "files": [ { "id": "document", "label": "Document", "description": "Document to analyze", "required": True, "multiple": False, "contextRetrievalMode": "full" } ], "datasets": [], "dataInputs": [ { "id": "question", "label": "Question", "description": "Question to answer", "schema": {"type": "string"}, "required": True } ] }, "configuration": { "files": [], "datasets": [], "dataInputs": [ { "id": "systemPrompt", "label": "System Prompt", "type": "string", "value": { "value": "You are a helpful assistant that answers questions based on the provided documents. Reference sources using [chunk_id] markers after factual claims." } } ] } })Note: The outputsSchema field is optional when creating a streaming_language_model pipeline. The output schema is always overridden with the fixed format shown above.
Consuming Streaming Output:
Streaming pipelines require a three-step flow to consume the output:
1. Trigger the pipeline:
When you trigger a streaming pipeline, the response contains only the executionId:
```json
{
  "executionId": "GkR8I6rHBms3W4Qfa2-FN",
  "status": "pending",
  "createdAt": "2024-01-15T10:30:00Z"
}
```

2. Poll for the streaming channel ID:
Poll the execution endpoint until handlerOutput.streamingChannelId becomes available:
// Trigger the pipelineconst { executionId } = await fetch( `https://api.catalyzed.ai/pipelines/${pipelineId}/trigger`, { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ input: { /* ... */ } }), }).then(r => r.json());
// Poll for channel IDlet channelId = null;while (!channelId) { const execution = await fetch( `https://api.catalyzed.ai/pipeline-executions/${executionId}`, { headers: { Authorization: `Bearer ${apiToken}` } } ).then(r => r.json());
channelId = execution.handlerOutput?.streamingChannelId ?? null;
if (!channelId) { // Channel not ready yet, wait before polling again await new Promise(resolve => setTimeout(resolve, 500)); }}Once the worker processes the execution, the response will include the channel ID:
```json
{
  "executionId": "GkR8I6rHBms3W4Qfa2-FN",
  "status": "running",
  "handlerOutput": {
    "handlerType": "streaming_language_model",
    "streamingChannelId": "ch_xyz123"
  }
}
```

3. Subscribe to the SSE stream:
Once you have the streamingChannelId, subscribe using Server-Sent Events (SSE):
```js
const eventSource = new EventSource(
  `https://api.catalyzed.ai/channels/${channelId}/stream`,
  {
    headers: {
      Authorization: `Bearer ${apiToken}`,
    },
  }
);

eventSource.addEventListener("channel-message", (event) => {
  const message = JSON.parse(event.data);

  switch (message.dataType) {
    case "streaming.start":
      console.log("Streaming started:", message.data);
      break;

    case "conversation.assistant.delta": {
      // Token chunk received
      const { delta } = message.data;
      displayText += delta;
      break;
    }

    case "streaming.done": {
      // Processing complete
      const { content } = message.data;
      console.log("Final content:", content);
      eventSource.close();
      break;
    }

    case "streaming.error":
      console.error("Streaming error:", message.data);
      eventSource.close();
      break;
  }
});
```

Example Output:
The streaming handler returns content with inline citation markers using human-readable chunk IDs:
```json
{
  "content": "The Q4 revenue was $1.65M [swift_falcon] which exceeded the Q3 figure of $1.42M [blue_river]."
}
```

Citation Markers:
Each marker uses a human-readable [adjective_noun] format (e.g., [swift_falcon], [blue_river], [calm_peak]). These chunk IDs are:
- Deterministic - Same content always generates the same ID
- Unique - Each chunk gets a different identifier (collisions are resolved with _2, _3 suffixes)
- Human-readable - Easier to reference and debug than numeric markers
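For post-processing, markers can be located with a regular expression. The pattern below is our own approximation of the [adjective_noun] format (including the numeric collision suffixes), not an official grammar:

```python
import re

# Sketch: find citation markers and their character spans in the
# streamed content. The regex approximates the documented
# [adjective_noun] format, including _2/_3 collision suffixes.
MARKER = re.compile(r"\[([a-z]+_[a-z]+(?:_\d+)?)\]")

def find_markers(content: str) -> list[dict]:
    return [{"chunkId": m.group(1), "start": m.start(), "end": m.end()}
            for m in MARKER.finditer(content)]

text = "Revenue was $1.65M [swift_falcon] versus $1.42M [blue_river]."
for marker in find_markers(text):
    print(marker["chunkId"], marker["start"], marker["end"])
```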
The execution also includes outputCitations that map markers to source chunks:
```json
{
  "outputCitations": [
    {
      "outputPointer": "/content",
      "outputCharStart": 25,
      "outputCharEnd": 28,
      "citations": [
        { "type": "file_chunk", "fileChunkId": "chunk_abc123" }
      ]
    },
    {
      "outputPointer": "/content",
      "outputCharStart": 67,
      "outputCharEnd": 70,
      "citations": [
        { "type": "file_chunk", "fileChunkId": "chunk_def456" }
      ]
    }
  ]
}
```

Embedding Handler
Section titled “Embedding Handler”The embedding handler generates vector embeddings from text arrays. Unlike LLM handlers, it has fixed input and output schemas that cannot be modified.
Key Features:
- Fixed schemas - Both input and output schemas are predefined
- Batch processing - Generate embeddings for multiple texts at once
- Model selection - Choose embedding model per-execution via input
- No streaming - Results returned when complete
Input Schema (Fixed):
| Field | Type | Required | Description |
|---|---|---|---|
| texts | string[] | Yes | Array of text strings to generate embeddings for |
| model | string | No | Embedding model (default: BAAI/bge-small-en-v1.5) |
| normalize | boolean | No | Whether to L2-normalize embeddings (default: true) |
Output Schema (Fixed):
| Field | Type | Description |
|---|---|---|
| embeddings | number[][] | Array of embedding vectors |
| dimensions | number | Dimension of each embedding vector (e.g., 384) |
Creating an Embedding Pipeline:
Create embedding pipeline
curl -X POST https://api.catalyzed.ai/pipelines \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Text Embedding Pipeline", "description": "Generate embeddings for text arrays", "handlerType": "embedding" }'const response = await fetch("https://api.catalyzed.ai/pipelines", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ teamId: "ZkoDMyjZZsXo4VAO_nJLk", name: "Text Embedding Pipeline", description: "Generate embeddings for text arrays", handlerType: "embedding", }),});const pipeline = await response.json();response = requests.post( "https://api.catalyzed.ai/pipelines", headers={"Authorization": f"Bearer {api_token}"}, json={ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Text Embedding Pipeline", "description": "Generate embeddings for text arrays", "handlerType": "embedding" })pipeline = response.json()Triggering an Embedding Pipeline:
Trigger embedding pipeline
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "input": { "dataInputs": { "texts": [ "Machine learning is a subset of artificial intelligence.", "Natural language processing helps computers understand text.", "Deep learning uses neural networks with multiple layers." ] } } }'const response = await fetch( `https://api.catalyzed.ai/pipelines/${pipelineId}/trigger`, { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ input: { dataInputs: { texts: [ "Machine learning is a subset of artificial intelligence.", "Natural language processing helps computers understand text.", "Deep learning uses neural networks with multiple layers.", ], }, }, }), });const { executionId } = await response.json();response = requests.post( f"https://api.catalyzed.ai/pipelines/{pipeline_id}/trigger", headers={"Authorization": f"Bearer {api_token}"}, json={ "input": { "dataInputs": { "texts": [ "Machine learning is a subset of artificial intelligence.", "Natural language processing helps computers understand text.", "Deep learning uses neural networks with multiple layers." ] } } })execution = response.json()Using a Custom Model:
Specify a different embedding model by including the model field:
```json
{
  "input": {
    "dataInputs": {
      "texts": ["Your text here"],
      "model": "BAAI/bge-large-en-v1.5"
    }
  }
}
```

Example Output:
Once the execution completes, the output contains embedding vectors:
```json
{
  "executionId": "GkR8I6rHBms3W4Qfa2-FN",
  "status": "succeeded",
  "output": {
    "embeddings": [
      [0.0123, -0.0456, 0.0789, ...],
      [0.0234, -0.0567, 0.0890, ...],
      [0.0345, -0.0678, 0.0901, ...]
    ],
    "dimensions": 384
  },
  "outputCitations": []
}
```

Use Cases:
- Semantic search - Generate embeddings for search queries and documents
- Document similarity - Compare documents by embedding distance
- Clustering - Group similar texts using embedding vectors
- RAG preprocessing - Generate embeddings for knowledge base indexing
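For semantic search and document similarity, the usual next step is cosine similarity over the returned vectors. A minimal sketch with toy 3-dimensional vectors in place of real 384-dimensional output:

```python
import math

# Sketch: compare embeddings by cosine similarity. With the default
# normalize=true the vectors are unit length, so this reduces to a
# plain dot product; the general form is shown for safety.
def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

ml = [0.8, 0.6, 0.0]      # toy embedding for an ML sentence
nlp = [0.6, 0.8, 0.0]     # toy embedding for an NLP sentence
sports = [0.0, 0.0, 1.0]  # toy embedding for an unrelated sentence

assert cosine_similarity(ml, nlp) > cosine_similarity(ml, sports)
```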
Text Classification Handler
Section titled “Text Classification Handler”The text_classification handler classifies text into predefined categories using HuggingFace classification models. Like the embedding handler, it has fixed input and output schemas that cannot be modified.
Key Features:
- Fixed schemas - Both input and output schemas are predefined
- Confidence scores - Each predicted label includes a confidence score
- Model selection - Choose a HuggingFace classification model per-execution
- No streaming - Results returned when complete
Input Schema (Fixed):
| Field | Type | Required | Description |
|---|---|---|---|
| text | string | Yes | Input text for classification (sentiment, topics, intent, etc.) |
| model | string | No | HuggingFace model ID (default: distilbert-base-uncased-finetuned-sst-2-english) |
Output Schema (Fixed):
| Field | Type | Description |
|---|---|---|
| predictions | {label: string, score: number}[] | Array of label predictions with confidence scores |
Creating a Text Classification Pipeline:
Create text classification pipeline
curl -X POST https://api.catalyzed.ai/pipelines \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Sentiment Classifier", "description": "Classify text sentiment", "handlerType": "text_classification" }'const response = await fetch("https://api.catalyzed.ai/pipelines", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ teamId: "ZkoDMyjZZsXo4VAO_nJLk", name: "Sentiment Classifier", description: "Classify text sentiment", handlerType: "text_classification", }),});const pipeline = await response.json();response = requests.post( "https://api.catalyzed.ai/pipelines", headers={"Authorization": f"Bearer {api_token}"}, json={ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Sentiment Classifier", "description": "Classify text sentiment", "handlerType": "text_classification" })pipeline = response.json()Triggering a Text Classification Pipeline:
Trigger text classification pipeline
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "input": { "dataInputs": { "text": "The product quality is excellent and shipping was fast. Very satisfied with my purchase." } } }'const response = await fetch( `https://api.catalyzed.ai/pipelines/${pipelineId}/trigger`, { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ input: { dataInputs: { text: "The product quality is excellent and shipping was fast. Very satisfied with my purchase.", }, }, }), });const { executionId } = await response.json();response = requests.post( f"https://api.catalyzed.ai/pipelines/{pipeline_id}/trigger", headers={"Authorization": f"Bearer {api_token}"}, json={ "input": { "dataInputs": { "text": "The product quality is excellent and shipping was fast. Very satisfied with my purchase." } } })execution = response.json()Using a Custom Model:
Specify a different classification model by including the model field:
```json
{
  "input": {
    "dataInputs": {
      "text": "Your text here",
      "model": "cardiffnlp/twitter-roberta-base-sentiment-latest"
    }
  }
}
```

Example Output:
Once the execution completes, the output contains classification predictions:
```json
{
  "executionId": "GkR8I6rHBms3W4Qfa2-FN",
  "status": "succeeded",
  "output": {
    "predictions": [
      { "label": "POSITIVE", "score": 0.9987 },
      { "label": "NEGATIVE", "score": 0.0013 }
    ]
  },
  "outputCitations": []
}
```

Use Cases:
- Sentiment analysis - Determine positive/negative sentiment in reviews, feedback, or social media
- Content moderation - Flag inappropriate or harmful content
- Topic classification - Route documents to the correct category
- Intent detection - Classify user messages by intent for chatbot routing
Zero-Shot Classification Handler
Section titled “Zero-Shot Classification Handler”The zero_shot_classification handler classifies text into user-defined categories without requiring any training data. It uses natural language inference (NLI) models to determine how well each candidate label describes the input text. Like other NLP handlers, it has fixed input and output schemas.
Key Features:
- Fixed schemas - Both input and output schemas are predefined
- Dynamic labels - Define your own classification categories at runtime
- Hypothesis template - Customize the NLI hypothesis for better accuracy
- No streaming - Results returned when complete
Input Schema (Fixed):
| Field | Type | Required | Description |
|---|---|---|---|
| text | string | Yes | Input text to classify into user-defined categories |
| candidate_labels | string[] | Yes | Array of possible classification labels (e.g., ["positive", "negative", "neutral"]) |
| hypothesis_template | string | No | Template for NLI hypothesis (default: "This text is about {}.") |
| model | string | No | HuggingFace model ID (default: facebook/bart-large-mnli) |
Output Schema (Fixed):
| Field | Type | Description |
|---|---|---|
| labels | string[] | Classification labels sorted by confidence (highest first) |
| scores | number[] | Confidence scores corresponding to each label |
Creating a Zero-Shot Classification Pipeline:
Create zero-shot classification pipeline
curl -X POST https://api.catalyzed.ai/pipelines \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Topic Classifier", "description": "Classify text into custom categories", "handlerType": "zero_shot_classification" }'const response = await fetch("https://api.catalyzed.ai/pipelines", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ teamId: "ZkoDMyjZZsXo4VAO_nJLk", name: "Topic Classifier", description: "Classify text into custom categories", handlerType: "zero_shot_classification", }),});const pipeline = await response.json();response = requests.post( "https://api.catalyzed.ai/pipelines", headers={"Authorization": f"Bearer {api_token}"}, json={ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Topic Classifier", "description": "Classify text into custom categories", "handlerType": "zero_shot_classification" })pipeline = response.json()Triggering a Zero-Shot Classification Pipeline:
Trigger zero-shot classification pipeline
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "input": { "dataInputs": { "text": "The Federal Reserve announced a 25 basis point increase in interest rates, citing persistent inflation concerns.", "candidate_labels": ["finance", "politics", "technology", "healthcare", "sports"] } } }'const response = await fetch( `https://api.catalyzed.ai/pipelines/${pipelineId}/trigger`, { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ input: { dataInputs: { text: "The Federal Reserve announced a 25 basis point increase in interest rates, citing persistent inflation concerns.", candidate_labels: [ "finance", "politics", "technology", "healthcare", "sports", ], }, }, }), });const { executionId } = await response.json();response = requests.post( f"https://api.catalyzed.ai/pipelines/{pipeline_id}/trigger", headers={"Authorization": f"Bearer {api_token}"}, json={ "input": { "dataInputs": { "text": "The Federal Reserve announced a 25 basis point increase in interest rates, citing persistent inflation concerns.", "candidate_labels": ["finance", "politics", "technology", "healthcare", "sports"] } } })execution = response.json()Customizing the Hypothesis Template:
The hypothesis template controls how the model frames the classification. The {} placeholder is replaced with each candidate label:
```json
{
  "input": {
    "dataInputs": {
      "text": "I need to return this product, it arrived damaged.",
      "candidate_labels": ["refund request", "product inquiry", "shipping issue", "complaint"],
      "hypothesis_template": "The customer intent is {}."
    }
  }
}
```

Example Output:
Once the execution completes, the output contains labels and scores sorted by confidence:
```json
{
  "executionId": "GkR8I6rHBms3W4Qfa2-FN",
  "status": "succeeded",
  "output": {
    "labels": ["finance", "politics", "technology", "healthcare", "sports"],
    "scores": [0.8234, 0.1245, 0.0312, 0.0118, 0.0091]
  },
  "outputCitations": []
}
```

Use Cases:
- Dynamic topic routing - Route documents to teams or workflows based on custom categories
- Content tagging - Tag content with custom taxonomies that change over time
- Intent detection - Classify user messages without training a dedicated model
- Multi-domain classification - Apply different label sets to the same pipeline for different use cases
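Two small details are easy to get wrong here: how the {} placeholder expands into one NLI hypothesis per candidate label, and re-pairing the parallel labels/scores arrays in the output. A sketch of both:

```python
# Sketch: expand the hypothesis template into one NLI hypothesis per
# candidate label, and re-pair the parallel output arrays.
def build_hypotheses(candidate_labels: list[str],
                     template: str = "This text is about {}.") -> list[str]:
    return [template.format(label) for label in candidate_labels]

def pair_results(labels: list[str], scores: list[float]) -> list[tuple[str, float]]:
    return list(zip(labels, scores))

print(build_hypotheses(["finance", "sports"]))
# ['This text is about finance.', 'This text is about sports.']
print(pair_results(["finance", "politics"], [0.8234, 0.1245])[0])
# ('finance', 0.8234)
```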
NER Handler
Section titled “NER Handler”The ner (Named Entity Recognition) handler extracts named entities from text, identifying people, organizations, locations, and other entity types along with their positions and confidence scores. Like other NLP handlers, it has fixed input and output schemas.
Key Features:
- Fixed schemas - Both input and output schemas are predefined
- Entity position tracking - Each entity includes start/end character positions
- Confidence scores - Each entity includes a confidence score
- No streaming - Results returned when complete
Input Schema (Fixed):
| Field | Type | Required | Description |
|---|---|---|---|
| text | string | Yes | Input text to extract named entities from |
| model | string | No | HuggingFace model ID (default: dslim/bert-base-NER) |
Output Schema (Fixed):
| Field | Type | Description |
|---|---|---|
| entities | {text: string, label: string, start: number, end: number, score: number}[] | Array of named entities with labels, positions, and confidence scores |
Common Entity Labels:
| Label | Description |
|---|---|
| PER | Person names |
| ORG | Organizations |
| LOC | Locations |
| MISC | Miscellaneous entities |
Creating a NER Pipeline:
Create NER pipeline
curl -X POST https://api.catalyzed.ai/pipelines \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Entity Extractor", "description": "Extract named entities from text", "handlerType": "ner" }'const response = await fetch("https://api.catalyzed.ai/pipelines", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ teamId: "ZkoDMyjZZsXo4VAO_nJLk", name: "Entity Extractor", description: "Extract named entities from text", handlerType: "ner", }),});const pipeline = await response.json();response = requests.post( "https://api.catalyzed.ai/pipelines", headers={"Authorization": f"Bearer {api_token}"}, json={ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Entity Extractor", "description": "Extract named entities from text", "handlerType": "ner" })pipeline = response.json()Triggering a NER Pipeline:
Trigger NER pipeline
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "input": { "dataInputs": { "text": "Pfizer announced a partnership with BioNTech in New York to develop mRNA vaccines, with CEO Albert Bourla leading the initiative." } } }'const response = await fetch( `https://api.catalyzed.ai/pipelines/${pipelineId}/trigger`, { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ input: { dataInputs: { text: "Pfizer announced a partnership with BioNTech in New York to develop mRNA vaccines, with CEO Albert Bourla leading the initiative.", }, }, }), });const { executionId } = await response.json();response = requests.post( f"https://api.catalyzed.ai/pipelines/{pipeline_id}/trigger", headers={"Authorization": f"Bearer {api_token}"}, json={ "input": { "dataInputs": { "text": "Pfizer announced a partnership with BioNTech in New York to develop mRNA vaccines, with CEO Albert Bourla leading the initiative." } } })execution = response.json()Example Output:
Once the execution completes, the output contains extracted entities with their types and positions:
{ "executionId": "GkR8I6rHBms3W4Qfa2-FN", "status": "succeeded", "output": { "entities": [ { "text": "Pfizer", "label": "ORG", "start": 0, "end": 6, "score": 0.9991 }, { "text": "BioNTech", "label": "ORG", "start": 35, "end": 43, "score": 0.9987 }, { "text": "New York", "label": "LOC", "start": 47, "end": 55, "score": 0.9994 }, { "text": "Albert Bourla", "label": "PER", "start": 101, "end": 114, "score": 0.9982 } ] }, "outputCitations": []}Use Cases:
- Information extraction - Pull structured entities from unstructured documents
- Document indexing - Index documents by the entities they mention
- Entity linking - Identify entities for linking to knowledge bases or databases
- Compliance analysis - Extract person and organization names from regulatory filings
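Because the NER output schema is fixed, client-side post-processing is straightforward. As an illustrative sketch (the helper name `group_entities_by_label` is not part of any SDK; the sample data mirrors the example output above):

```python
from collections import defaultdict

def group_entities_by_label(entities):
    """Group NER output entities by label (e.g. PER, ORG, LOC)."""
    grouped = defaultdict(list)
    for entity in entities:
        grouped[entity["label"]].append(entity["text"])
    return dict(grouped)

# Sample data shaped like the fixed output schema's `entities` array
entities = [
    {"text": "Pfizer", "label": "ORG", "start": 0, "end": 6, "score": 0.9991},
    {"text": "New York", "label": "LOC", "start": 47, "end": 55, "score": 0.9994},
    {"text": "Albert Bourla", "label": "PER", "start": 101, "end": 114, "score": 0.9982},
]
grouped = group_entities_by_label(entities)
print(grouped)  # {'ORG': ['Pfizer'], 'LOC': ['New York'], 'PER': ['Albert Bourla']}
```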
Rerank Handler
Section titled “Rerank Handler”The rerank handler scores and reranks documents by relevance to a query using cross-encoder models. This is useful for improving search result quality by re-scoring candidate documents against a specific query. Like other NLP handlers, it has fixed input and output schemas.
Key Features:
- Fixed schemas - Both input and output schemas are predefined
- Top-N filtering - Optionally return only the most relevant results
- Model selection - Choose a cross-encoder model per execution
- No streaming - Results returned when complete
Input Schema (Fixed):
| Field | Type | Required | Description |
|---|---|---|---|
query | string | Yes | The search query to score documents against |
documents | {id: string, text: string}[] | Yes | Array of candidate documents with id and text fields |
top_n | integer | No | Return only the top N results by relevance score |
model | string | No | Cross-encoder model ID (default: cross-encoder/ms-marco-MiniLM-L-6-v2) |
Output Schema (Fixed):
| Field | Type | Description |
|---|---|---|
results | {id: string, relevance_score: number}[] | Array of document IDs with relevance scores, in original document order (or top N subset) |
Creating a Rerank Pipeline:
Create rerank pipeline
curl -X POST https://api.catalyzed.ai/pipelines \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Search Reranker", "description": "Rerank search results by relevance", "handlerType": "rerank" }'const response = await fetch("https://api.catalyzed.ai/pipelines", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ teamId: "ZkoDMyjZZsXo4VAO_nJLk", name: "Search Reranker", description: "Rerank search results by relevance", handlerType: "rerank", }),});const pipeline = await response.json();response = requests.post( "https://api.catalyzed.ai/pipelines", headers={"Authorization": f"Bearer {api_token}"}, json={ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Search Reranker", "description": "Rerank search results by relevance", "handlerType": "rerank" })pipeline = response.json()Triggering a Rerank Pipeline:
Trigger rerank pipeline
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "input": { "dataInputs": { "query": "What are the side effects of aspirin?", "documents": [ { "id": "doc-1", "text": "Aspirin is commonly used as a pain reliever and anti-inflammatory medication." }, { "id": "doc-2", "text": "Common side effects of aspirin include stomach upset, heartburn, and increased bleeding risk." }, { "id": "doc-3", "text": "The history of aspirin dates back to ancient Greece where willow bark was used medicinally." }, { "id": "doc-4", "text": "Aspirin may cause allergic reactions in some individuals, including skin rashes and breathing difficulties." } ], "top_n": 3 } } }'const response = await fetch( `https://api.catalyzed.ai/pipelines/${pipelineId}/trigger`, { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ input: { dataInputs: { query: "What are the side effects of aspirin?", documents: [ { id: "doc-1", text: "Aspirin is commonly used as a pain reliever and anti-inflammatory medication.", }, { id: "doc-2", text: "Common side effects of aspirin include stomach upset, heartburn, and increased bleeding risk.", }, { id: "doc-3", text: "The history of aspirin dates back to ancient Greece where willow bark was used medicinally.", }, { id: "doc-4", text: "Aspirin may cause allergic reactions in some individuals, including skin rashes and breathing difficulties.", }, ], top_n: 3, }, }, }), });const { executionId } = await response.json();response = requests.post( f"https://api.catalyzed.ai/pipelines/{pipeline_id}/trigger", headers={"Authorization": f"Bearer {api_token}"}, json={ "input": { "dataInputs": { "query": "What are the side effects of aspirin?", "documents": [ {"id": "doc-1", "text": "Aspirin is commonly used as a pain reliever and anti-inflammatory medication."}, {"id": "doc-2", "text": "Common side effects of aspirin include stomach upset, heartburn, and increased bleeding risk."}, {"id": "doc-3", "text": "The history of aspirin dates back to ancient Greece where willow bark was used medicinally."}, {"id": "doc-4", "text": "Aspirin may cause allergic reactions in some individuals, including skin rashes and breathing difficulties."} ], "top_n": 3 } } })execution = response.json()Using a Different Model:
Specify a different cross-encoder model by including the model field. Supported models include BAAI/bge-reranker-base and BAAI/bge-reranker-v2-m3:
{ "input": { "dataInputs": { "query": "Your search query", "documents": [{ "id": "doc-1", "text": "Document text" }], "model": "BAAI/bge-reranker-v2-m3" } }}Example Output:
Once the execution completes, the output contains document IDs with relevance scores:
{ "executionId": "GkR8I6rHBms3W4Qfa2-FN", "status": "succeeded", "output": { "results": [ { "id": "doc-2", "relevance_score": 0.9821 }, { "id": "doc-4", "relevance_score": 0.8934 }, { "id": "doc-1", "relevance_score": 0.4215 } ] }, "outputCitations": []}Use Cases:
- Search quality improvement - Re-score keyword or vector search results for better relevance ranking
- RAG retrieval - Rerank retrieved passages before feeding them to a language model
- Two-stage retrieval - Use fast vector search for recall, then cross-encoder reranking for precision
- Document filtering - Use top_n to keep only the most relevant documents from a large candidate set
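Note that per the output schema, `results` come back in original document order (or a top-N subset), so sorting by score is a client-side step. A minimal sketch joining scores back to document text (helper names are illustrative, not SDK functions):

```python
def rank_documents(documents, results):
    """Sort rerank output by relevance_score (descending) and join
    the original document text back in by id. Documents dropped by
    top_n simply do not appear in `results`."""
    text_by_id = {doc["id"]: doc["text"] for doc in documents}
    ranked = sorted(results, key=lambda r: r["relevance_score"], reverse=True)
    return [
        {"id": r["id"], "relevance_score": r["relevance_score"], "text": text_by_id[r["id"]]}
        for r in ranked
    ]

docs = [
    {"id": "doc-1", "text": "Aspirin is commonly used as a pain reliever."},
    {"id": "doc-2", "text": "Common side effects of aspirin include stomach upset."},
]
results = [
    {"id": "doc-1", "relevance_score": 0.4215},
    {"id": "doc-2", "relevance_score": 0.9821},
]
ranked = rank_documents(docs, results)  # doc-2 first, then doc-1
```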
Code Interpreter Handler
Section titled “Code Interpreter Handler”The code_interpreter handler executes user-provided Python code in a sandboxed AST-walking interpreter with optional output schema validation via final_answer(). It supports state persistence across executions via signed interpreter state blobs, enabling REPL-like workflows where users build up computation across multiple calls.
Key Features:
- Customizable schemas - Define your own input and output fields
- Sandboxed execution - Code runs in a restricted interpreter (no filesystem, network, or subprocess access)
- final_answer() validation - Structured output is validated against the pipeline’s output schema
- Optional final_answer() - Code can run for side effects only (print, state mutation) without producing structured output
- State persistence - Interpreter state (variables, functions, imports) is serialized and returned as a signed blob that can be passed to subsequent executions
- Variable injection - Additional data inputs are injected as Python variables available to the code
- Built-in modules - Standard library modules like math, json, re, and datetime are available
Input Schema (Customizable):
The code input is always required. Additional inputs are injected as Python variables:
| Field | Type | Required | Description |
|---|---|---|---|
code | string | Yes | Python code to execute |
interpreter_state | string | No | Signed state blob from a previous execution |
| (custom) | (any) | (varies) | Additional fields are injected as variables (see type mapping below) |
Variable Type Mapping:
Custom data inputs are automatically converted from JSON to Python types:
| JSON Schema Type | JSON Value | Python Type | Example |
|---|---|---|---|
string | "hello" | str | greeting = "hello" |
integer | 42 | int | count = 42 |
number | 3.14 | float | rate = 3.14 |
boolean | true | bool | enabled = True |
null | null | NoneType | value = None |
array | [1, 2, 3] | list | items = [1, 2, 3] |
object | {"a": 1} | dict | config = {"a": 1} |
No manual JSON parsing is needed — variables are available as native Python types in your code.
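The mapping above is exactly what a standard JSON decoder produces, so it can be approximated locally: decode the data inputs and use the result as the code's namespace. A rough local illustration only (the real handler runs a restricted AST-walking sandbox, not plain `exec`):

```python
import json

# json.loads already yields the native Python types listed in the
# mapping table (str, int, float, bool, None, list, dict).
data_inputs = json.loads('{"count": 42, "rate": 3.14, "enabled": true, "items": [1, 2, 3]}')

# The handler injects each data input as a plain variable; locally we
# can mimic that by executing user code with the inputs as globals.
namespace = dict(data_inputs)
exec("total = count + sum(items)", namespace)
print(namespace["total"])  # 48
```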
Output Schema (Customizable):
Output fields correspond to final_answer() keyword arguments. The handler also manages these reserved slots:
| Field | Type | Description |
|---|---|---|
| (custom) | (any) | Fields populated by final_answer(field=value) |
stdout | string | Captured print output |
interpreter_state | string | Signed state blob for the next execution |
Creating a Code Interpreter Pipeline:
Create code interpreter pipeline
curl -X POST https://api.catalyzed.ai/pipelines \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Data Processor", "description": "Execute Python code with validated output", "handlerType": "code_interpreter", "inputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "code", "label": "Code", "schema": { "type": "string" }, "required": true }, { "id": "interpreter_state", "label": "State", "schema": { "type": "string" }, "required": false } ] }, "outputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "result", "label": "Result", "schema": { "type": "number" }, "required": false }, { "id": "stdout", "label": "Stdout", "schema": { "type": "string" }, "required": false }, { "id": "interpreter_state", "label": "State", "schema": { "type": "string" }, "required": false } ] } }'const response = await fetch("https://api.catalyzed.ai/pipelines", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ teamId: "ZkoDMyjZZsXo4VAO_nJLk", name: "Data Processor", description: "Execute Python code with validated output", handlerType: "code_interpreter", inputsSchema: { files: [], datasets: [], dataInputs: [ { id: "code", label: "Code", schema: { type: "string" }, required: true }, { id: "interpreter_state", label: "State", schema: { type: "string" }, required: false }, ], }, outputsSchema: { files: [], datasets: [], dataInputs: [ { id: "result", label: "Result", schema: { type: "number" }, required: false }, { id: "stdout", label: "Stdout", schema: { type: "string" }, required: false }, { id: "interpreter_state", label: "State", schema: { type: "string" }, required: false }, ], }, }),});const pipeline = await response.json();response = requests.post( "https://api.catalyzed.ai/pipelines", headers={"Authorization": f"Bearer {api_token}"}, json={ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Data Processor", "description": "Execute Python code with validated output", "handlerType": "code_interpreter", "inputsSchema": { "files": [], "datasets": [], "dataInputs": [ {"id": "code", "label": "Code", "schema": {"type": "string"}, "required": True}, {"id": "interpreter_state", "label": "State", "schema": {"type": "string"}, "required": False}, ], }, "outputsSchema": { "files": [], "datasets": [], "dataInputs": [ {"id": "result", "label": "Result", "schema": {"type": "number"}, "required": False}, {"id": "stdout", "label": "Stdout", "schema": {"type": "string"}, "required": False}, {"id": "interpreter_state", "label": "State", "schema": {"type": "string"}, "required": False}, ], }, },)pipeline = response.json()Triggering a Code Interpreter Pipeline:
Trigger code interpreter pipeline
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "input": { "dataInputs": { "code": "import math\nresult = round(math.pi * 100, 2)\nprint(f\"Computed: {result}\")\nfinal_answer(result=result)" } } }'const response = await fetch( `https://api.catalyzed.ai/pipelines/${pipelineId}/trigger`, { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ input: { dataInputs: { code: 'import math\nresult = round(math.pi * 100, 2)\nprint(f"Computed: {result}")\nfinal_answer(result=result)', }, }, }), });const { executionId } = await response.json();response = requests.post( f"https://api.catalyzed.ai/pipelines/{pipeline_id}/trigger", headers={"Authorization": f"Bearer {api_token}"}, json={ "input": { "dataInputs": { "code": "import math\nresult = round(math.pi * 100, 2)\nprint(f'Computed: {result}')\nfinal_answer(result=result)" } } },)execution = response.json()Example Output:
{ "executionId": "GkR8I6rHBms3W4Qfa2-FN", "status": "succeeded", "output": { "result": 314.16, "stdout": "Computed: 314.16\n", "interpreter_state": "dG9rZW4..." }, "outputCitations": []}Resuming with State:
Pass the interpreter_state from a previous execution to continue where you left off. Variables, function definitions, and imported modules are restored:
{ "input": { "dataInputs": { "code": "final_answer(result=result * 2)", "interpreter_state": "dG9rZW4..." } }}Error Handling:
When code execution fails, the execution status is failed and errorMessage contains the Python error description (e.g., "name 'queries' is not defined", "division by zero"). The error message is passed through directly from the interpreter — it is not a generic message.
| Error Type | errorMessage Contains | stdout in Output | interpreter_state in Output |
|---|---|---|---|
| Syntax error | Python SyntaxError with line number | Not available (code did not execute) | Not available |
| Runtime error (NameError, TypeError, ZeroDivisionError, etc.) | Python error description | Partial stdout up to the point of failure | Available (state captured before error check) |
final_answer() validation error | Schema validation details | Full stdout | Available |
Example Failed Execution:
{ "executionId": "WLJagIwiQF0gvMu8jaSbx", "status": "failed", "errorMessage": "name 'queries' is not defined", "output": { "stdout": "processing step 1...\n", "interpreter_state": "eyJzaWdu..." }}Use Cases:
- Data transformation - Clean, reshape, or aggregate data with Python logic
- Computational workflows - Run multi-step calculations across iterative executions
- REPL environments - Build interactive computing sessions with state persistence
- Custom scoring - Implement domain-specific scoring or validation logic in Python
- Prototyping - Quickly test data processing logic before building dedicated pipelines
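The resume-with-state and error-handling behaviors described above combine naturally in client code. A hedged sketch (helper names are illustrative; field names follow the execution objects shown earlier in this section):

```python
def check_execution(execution):
    """Return the output of a succeeded execution; raise on failure,
    surfacing the interpreter's error message and any partial stdout."""
    if execution["status"] == "failed":
        stdout = execution.get("output", {}).get("stdout", "")
        raise RuntimeError(
            f"Execution {execution['executionId']} failed: "
            f"{execution['errorMessage']} (partial stdout: {stdout!r})"
        )
    return execution["output"]

def resume_payload(previous_output, code):
    """Build the next trigger payload, carrying the signed
    interpreter_state blob forward so variables, functions, and
    imports persist into the next execution."""
    data_inputs = {"code": code}
    state = previous_output.get("interpreter_state")
    if state is not None:
        data_inputs["interpreter_state"] = state
    return {"input": {"dataInputs": data_inputs}}
```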
Pipeline Properties
Section titled “Pipeline Properties”| Field | Type | Description |
|---|---|---|
pipelineId | string | Unique identifier |
teamId | string | Team that owns this pipeline |
name | string | Human-readable name |
description | string | Optional description |
handlerType | string | Type of pipeline handler |
activeConfigurationId | string | ID of the currently active configuration version |
status | string | active or archived |
configuration | object | Handler-specific settings |
inputsSchema | object | Schema for input data |
outputsSchema | object | Schema for output data |
createdAt | timestamp | Creation time |
updatedAt | timestamp | Last modification time |
Question Generation Example
Section titled “Question Generation Example”Pipelines can generate arrays of questions or other structured outputs. Here’s an example of a question generation pipeline:
Create question generation pipeline
curl -X POST https://api.catalyzed.ai/pipelines \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Statement Question Generator", "description": "Generates recommended questions from financial statements", "handlerType": "language_model", "inputsSchema": { "files": [ { "id": "statements", "label": "Financial Statements", "description": "Bank, credit card, or account statements", "required": true, "multiple": true, "contextRetrievalMode": "full" } ], "datasets": [], "dataInputs": [] }, "outputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "questions", "label": "Recommended Questions", "description": "List of recommended questions users can ask", "schema": { "type": "array", "items": { "type": "string" } }, "required": true } ] }, "configuration": { "files": [], "datasets": [], "dataInputs": [ { "id": "instructions", "label": "System Instructions", "type": "string", "value": { "value": "You are a financial assistant. Analyze the provided financial statements and generate relevant questions that users might ask about them. Return the questions as a JSON array of strings." } } ] } }'const response = await fetch("https://api.catalyzed.ai/pipelines", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ teamId: "ZkoDMyjZZsXo4VAO_nJLk", name: "Statement Question Generator", description: "Generates recommended questions from financial statements", handlerType: "language_model", inputsSchema: { files: [ { id: "statements", label: "Financial Statements", description: "Bank, credit card, or account statements", required: true, multiple: true, // Allow multiple files contextRetrievalMode: "full", }, ], datasets: [], dataInputs: [], }, outputsSchema: { files: [], datasets: [], dataInputs: [ { id: "questions", label: "Recommended Questions", description: "List of recommended questions users can ask", schema: { type: "array", items: { type: "string" }, }, required: true, }, ], }, configuration: { files: [], datasets: [], dataInputs: [ { id: "instructions", label: "System Instructions", type: "string", value: { value: "You are a financial assistant. Analyze the provided financial statements and generate relevant questions that users might ask about them. Return the questions as a JSON array of strings.", }, }, ], }, }),});response = requests.post( "https://api.catalyzed.ai/pipelines", headers={"Authorization": f"Bearer {api_token}"}, json={ "teamId": "ZkoDMyjZZsXo4VAO_nJLk", "name": "Statement Question Generator", "description": "Generates recommended questions from financial statements", "handlerType": "language_model", "inputsSchema": { "files": [ { "id": "statements", "label": "Financial Statements", "description": "Bank, credit card, or account statements", "required": True, "multiple": True, # Allow multiple files "contextRetrievalMode": "full" } ], "datasets": [], "dataInputs": [] }, "outputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "questions", "label": "Recommended Questions", "description": "List of recommended questions users can ask", "schema": { "type": "array", "items": {"type": "string"} }, "required": True } ] }, "configuration": { "files": [], "datasets": [], "dataInputs": [ { "id": "instructions", "label": "System Instructions", "type": "string", "value": { "value": "You are a financial assistant. Analyze the provided financial statements and generate relevant questions that users might ask about them. Return the questions as a JSON array of strings." } } ] } })When triggering this pipeline, provide files as an array (since multiple: true):
{ "input": { "files": { "statements": ["fileId1", "fileId2"] } }}The output will contain an array of questions:
{ "output": { "questions": [ "What is the current balance?", "What was the total amount of new purchases?", "When is the payment due date?" ] }}Multiple File Inputs
Section titled “Multiple File Inputs”When a file slot has multiple: true, you can provide multiple file IDs as an array when triggering the pipeline:
{ "input": { "files": { "documents": ["fileId1", "fileId2", "fileId3"] } }}For single file slots (multiple: false), provide a single file ID:
{ "input": { "files": { "document": "fileId1" } }}Pipeline Configurations
Section titled “Pipeline Configurations”Pipeline configurations are versioned snapshots of a pipeline’s inputsSchema, outputsSchema, and configuration. Each time you update a pipeline’s configuration via the configurations endpoint, a new version is created, allowing you to track changes over time and roll back to previous versions.
Creating a Configuration
Section titled “Creating a Configuration”Create a new configuration version (becomes the active configuration automatically):
Create configuration version
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/configurations \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "inputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "query", "label": "Query", "schema": { "type": "string" }, "required": true } ] }, "outputsSchema": { "files": [], "datasets": [], "dataInputs": [ { "id": "answer", "label": "Answer", "schema": { "type": "string" }, "required": true } ] }, "configuration": { "files": [], "datasets": [], "dataInputs": [] }, "changeReason": "Updated output schema to include answer field" }'const response = await fetch( `https://api.catalyzed.ai/pipelines/${pipelineId}/configurations`, { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify({ inputsSchema: { /* ... */ }, outputsSchema: { /* ... */ }, configuration: { /* ... */ }, changeReason: "Updated output schema", }), });const config = await response.json();response = requests.post( f"https://api.catalyzed.ai/pipelines/{pipeline_id}/configurations", headers={"Authorization": f"Bearer {api_token}"}, json={ "inputsSchema": { ... }, "outputsSchema": { ... }, "configuration": { ... }, "changeReason": "Updated output schema" })config = response.json()Response:
{ "pipelineConfigurationId": "cfg_xyz789", "pipelineId": "EMbMEFLyUWEgvnhMWXVVa", "inputsSchema": { ... }, "outputsSchema": { ... }, "configuration": { ... }, "createdAt": "2024-01-15T10:30:00Z", "createdBy": "usr_abc123", "changeReason": "Updated output schema to include answer field"}Listing Configurations
Section titled “Listing Configurations”View all configuration versions for a pipeline (newest first by default):
List configuration versions
curl "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/configurations?page=1&pageSize=10" \ -H "Authorization: Bearer $API_TOKEN"const response = await fetch( `https://api.catalyzed.ai/pipelines/${pipelineId}/configurations?page=1&pageSize=10`, { headers: { Authorization: `Bearer ${apiToken}` } });const { configurations, total } = await response.json();response = requests.get( f"https://api.catalyzed.ai/pipelines/{pipeline_id}/configurations", params={"page": 1, "pageSize": 10}, headers={"Authorization": f"Bearer {api_token}"})data = response.json()configurations = data["configurations"]Query Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
page | number | 1 | Page number for pagination (starts at 1) |
pageSize | number | 20 | Number of results per page (1-100) |
orderDirection | string | desc | Sort direction: asc (oldest first) or desc (newest first) |
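Using these parameters, the full version history can be walked page by page. A sketch with the HTTP call abstracted behind a `fetch_page` callable (an illustrative helper, not an SDK function), assuming the list response shape documented for this endpoint:

```python
def iter_configurations(fetch_page, page_size=20):
    """Yield every configuration version across all pages.

    fetch_page(page, page_size) should return the endpoint's list
    response: {"configurations": [...], "total": N, ...}.
    """
    page = 1
    while True:
        data = fetch_page(page, page_size)
        yield from data["configurations"]
        if page * page_size >= data["total"]:
            return  # last page reached
        page += 1
```

In real code, `fetch_page` would wrap a GET to `/pipelines/{pipelineId}/configurations` with `page` and `pageSize` query parameters.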
Response:
{ "configurations": [ { "pipelineConfigurationId": "cfg_xyz789", "pipelineId": "EMbMEFLyUWEgvnhMWXVVa", "inputsSchema": { ... }, "outputsSchema": { ... }, "configuration": { ... }, "createdAt": "2024-01-15T10:30:00Z", "createdBy": "usr_abc123", "changeReason": "Updated output schema" } ], "total": 5, "page": 1, "pageSize": 10}Getting a Specific Configuration
Section titled “Getting a Specific Configuration”Retrieve a specific configuration version by ID:
Get configuration version
curl "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/configurations/cfg_xyz789" \ -H "Authorization: Bearer $API_TOKEN"const response = await fetch( `https://api.catalyzed.ai/pipelines/${pipelineId}/configurations/${configurationId}`, { headers: { Authorization: `Bearer ${apiToken}` } });const config = await response.json();response = requests.get( f"https://api.catalyzed.ai/pipelines/{pipeline_id}/configurations/{configuration_id}", headers={"Authorization": f"Bearer {api_token}"})config = response.json()Rolling Back to a Previous Configuration
Section titled “Rolling Back to a Previous Configuration”To roll back to a previous configuration version, update the pipeline’s activeConfigurationId:
curl -X PUT https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "activeConfigurationId": "cfg_abc123" }'This sets the specified configuration as the active one without creating a new version.
Configuration Properties
Section titled “Configuration Properties”| Field | Type | Description |
|---|---|---|
pipelineConfigurationId | string | Unique identifier for this configuration version |
pipelineId | string | ID of the parent pipeline |
inputsSchema | object | Pipeline inputs schema at this version |
outputsSchema | object | Pipeline outputs schema at this version |
configuration | object | Pipeline configuration at this version |
createdAt | timestamp | When this version was created |
createdBy | string | User who created this version |
changeReason | string | Optional description of why this version was created |
See the Pipeline Configurations API for complete endpoint documentation.
API Reference
Section titled “API Reference”See the Pipelines API for complete endpoint documentation.