
Pipelines

Pipelines are automated workflows that process data, run AI tasks, or transform information. Define a pipeline once, then trigger it on demand with different inputs.

A pipeline consists of:

  • Handler Type - The type of processing (e.g., language_model)
  • Input Schema - What data the pipeline accepts (files, datasets, dataInputs)
  • Output Schema - What data the pipeline produces (files, datasets, dataInputs)
  • Configuration - Settings and parameters

Input and output schemas define the structure of data that flows through a pipeline. Each schema contains three types of slots:

  • Files - References to uploaded files (PDFs, CSVs, documents)
  • Datasets - References to datasets (collections of tables)
  • Data Inputs - Structured JSON data (strings, numbers, arrays, objects)

Each slot has:

  • id - Unique identifier used when triggering the pipeline
  • label - Human-readable name
  • description - Optional description
  • required - Whether the slot must be provided
  • multiple - Whether multiple values are allowed (for files and datasets)
  • schema - JSON Schema defining the data structure (for dataInputs)

The schema field on data inputs accepts JSON Schema objects. The following types are supported for language_model pipelines:

| Type    | Schema                | Description    |
| ------- | --------------------- | -------------- |
| String  | { "type": "string" }  | Text value     |
| Integer | { "type": "integer" } | Whole number   |
| Number  | { "type": "number" }  | Decimal number |
| Boolean | { "type": "boolean" } | True or false  |

Restrict a string to a fixed set of allowed values:

{
  "type": "string",
  "enum": ["positive", "negative", "neutral"]
}

A list of values with a typed item schema:

{
  "type": "array",
  "items": { "type": "string" }
}

Items can be any supported type, including objects:

{
  "type": "array",
  "items": {
    "type": "object",
    "properties": {
      "name": { "type": "string" },
      "score": { "type": "number" }
    },
    "required": ["name", "score"]
  }
}

A structured object with named properties. Use the required array to specify which properties must be present — properties not listed are optional and default to null:

{
  "type": "object",
  "properties": {
    "summary": { "type": "string" },
    "confidence": { "type": "number" },
    "notes": { "type": "string" }
  },
  "required": ["summary", "confidence"]
}

In this example, notes is optional and may be null in the output.

A dictionary with arbitrary string keys and typed values. Use additionalProperties to define the value type — this follows the standard JSON Schema pattern for Record<string, T>:

{
  "type": "object",
  "additionalProperties": { "type": "string" }
}

This tells the LLM to produce a map of string keys to string values (e.g., { "color": "blue", "size": "large" }). Without additionalProperties, a bare { "type": "object" } gives the LLM no guidance on value types.

The value schema can be any supported type:

{
  "type": "object",
  "additionalProperties": { "type": "number" }
}

{
  "type": "object",
  "additionalProperties": {
    "type": "object",
    "properties": {
      "label": { "type": "string" },
      "score": { "type": "number" }
    },
    "required": ["label", "score"]
  }
}

To indicate that a field’s value can be null, use the anyOf pattern with a null type variant:

{
  "anyOf": [{ "type": "string" }, { "type": "null" }]
}

This is the standard JSON Schema Draft 7 representation and what tools like Zod produce for .nullable(). The shorthand { "type": ["string", "null"] } is also supported.

Nullable types work with any base type:

{
  "anyOf": [{ "type": "integer" }, { "type": "null" }]
}

{
  "anyOf": [
    { "type": "array", "items": { "type": "string" } },
    { "type": "null" }
  ]
}

Nullable properties can also appear inside object schemas. A property can be both required (must always be present in the output) and nullable (its value can be null):

{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "nickname": { "anyOf": [{ "type": "string" }, { "type": "null" }] }
  },
  "required": ["name", "nickname"]
}
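
The interaction between required and nullable can be checked mechanically. A minimal JavaScript sketch (hypothetical helper functions, not part of the API, covering only the flat object patterns shown above):

```javascript
// Returns true if a property schema admits null, via either the anyOf
// pattern or the { "type": [..., "null"] } shorthand.
function isNullable(propSchema) {
  if (Array.isArray(propSchema.type)) return propSchema.type.includes("null");
  if (Array.isArray(propSchema.anyOf)) {
    return propSchema.anyOf.some((variant) => variant.type === "null");
  }
  return false;
}

// Checks only the required/nullable rules for a flat object schema:
// required keys must be present, and null is allowed only when the
// property schema is nullable. Not a full JSON Schema validator.
function checkObject(schema, value) {
  for (const key of schema.required ?? []) {
    if (!(key in value)) return false;
    if (value[key] === null && !isNullable(schema.properties[key])) return false;
  }
  return true;
}
```

With the name/nickname schema above, `{ "name": "Ada", "nickname": null }` passes (nickname is required but nullable), while omitting nickname entirely fails.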

Create a pipeline

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
    "name": "Document Summarizer",
    "description": "Summarize uploaded documents using AI",
    "handlerType": "language_model",
    "inputsSchema": {
      "files": [
        {
          "id": "document",
          "label": "Document",
          "description": "Document to summarize",
          "required": true,
          "multiple": false,
          "contextRetrievalMode": "full"
        }
      ],
      "datasets": [],
      "dataInputs": []
    },
    "outputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        {
          "id": "summary",
          "label": "Summary",
          "description": "Generated summary",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "configuration": {
      "files": [],
      "datasets": [],
      "dataInputs": []
    }
  }'

Response:

{
  "pipelineId": "EMbMEFLyUWEgvnhMWXVVa",
  "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
  "name": "Document Summarizer",
  "description": "Summarize uploaded documents using AI",
  "handlerType": "language_model",
  "activeConfigurationId": "cfg_abc123",
  "status": "active",
  "inputsSchema": {
    "files": [
      {
        "id": "document",
        "label": "Document",
        "description": "Document to summarize",
        "required": true,
        "multiple": false,
        "contextRetrievalMode": "full"
      }
    ],
    "datasets": [],
    "dataInputs": []
  },
  "outputsSchema": {
    "files": [],
    "datasets": [],
    "dataInputs": [
      {
        "id": "summary",
        "label": "Summary",
        "description": "Generated summary",
        "schema": { "type": "string" },
        "required": true
      }
    ]
  },
  "configuration": { ... },
  "createdAt": "2024-01-15T10:30:00Z",
  "updatedAt": "2024-01-15T10:30:00Z",
  "createdBy": "usr_abc123"
}

| Status   | Description                                  |
| -------- | -------------------------------------------- |
| active   | Pipeline can be triggered                    |
| archived | Pipeline is disabled and cannot be triggered |

List pipelines

Terminal window
curl "https://api.catalyzed.ai/pipelines?teamIds=ZkoDMyjZZsXo4VAO_nJLk" \
  -H "Authorization: Bearer $API_TOKEN"

Get pipeline details

Terminal window
curl https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \
  -H "Authorization: Bearer $API_TOKEN"

Trigger a pipeline

Start a pipeline execution with input data:

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "files": {
        "document": "LvrGb8UaJk_IjmzaxuMAb"
      },
      "dataInputs": {
        "query": "Summarize the key findings from this document"
      }
    }
  }'

Response:

{
  "executionId": "GkR8I6rHBms3W4Qfa2-FN",
  "status": "pending",
  "createdAt": "2024-01-15T10:30:00Z"
}

See Executions for monitoring execution progress.

Update pipeline

The PUT endpoint only updates pipeline metadata (name, description) and allows setting the active configuration version. To update the actual configuration content (inputsSchema, outputsSchema, configuration), use the Pipeline Configurations endpoint.

Terminal window
curl -X PUT https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Document Summarizer v2",
    "description": "Updated description"
  }'

Archive pipeline

Archived pipelines cannot be triggered but retain their execution history:

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/archive \
  -H "Authorization: Bearer $API_TOKEN"

Reactivate pipeline

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/reactivate \
  -H "Authorization: Bearer $API_TOKEN"

Delete pipeline

Terminal window
curl -X DELETE https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \
  -H "Authorization: Bearer $API_TOKEN"

Pipelines can reference:

  • Files - Uploaded documents for processing
  • Datasets - Tables for data retrieval
  • Data Inputs - Specific table columns for context

| Type                      | Description                                                                                    | Web Context                           |
| ------------------------- | ---------------------------------------------------------------------------------------------- | ------------------------------------- |
| language_model            | AI-powered text generation with context retrieval                                              | ✅ URL scraping & web search          |
| streaming_language_model  | Real-time streaming LLM with inline citations and fixed output schema                          | ❌ Not supported                      |
| code_agent_language_model | Code agent with Python code generation and bidirectional tool callbacks                        | ✅ Via tools (web_search, web_scrape) |
| embedding                 | Generate vector embeddings from text arrays with fixed input/output schemas                    | N/A                                   |
| text_classification       | Classify text into predefined categories (sentiment, topic) with confidence scores             | N/A                                   |
| zero_shot_classification  | Classify text into user-defined categories using natural language inference                    | N/A                                   |
| ner                       | Extract named entities (people, organizations, locations) with positions and confidence scores | N/A                                   |
| rerank                    | Score and rerank documents by relevance to a query using cross-encoder models                  | N/A                                   |
| code_interpreter          | Execute Python code in a sandboxed interpreter with optional state persistence                 | N/A                                   |

Configuration contains optional pre-filled values for files, datasets, and dataInputs:

{
  "files": [],
  "datasets": [],
  "dataInputs": [
    {
      "id": "instructions",
      "label": "System Instructions",
      "type": "string",
      "value": { "value": "You are a helpful assistant..." }
    }
  ]
}

Configuration vs Runtime Input:

  • configuration.dataInputs - Pre-filled values that become part of the pipeline’s instructions (e.g., system prompts, settings)
  • configuration.files / configuration.datasets - Pre-filled file/dataset references
  • Runtime input - Actual data provided when triggering the pipeline

When triggering a pipeline, you provide runtime values in the input object. Configuration values are baked into the pipeline definition.

LLMs tend to pay more attention to content at the beginning and end of a prompt, and less to content in the middle. The inputPriority option lets you control the ordering of inputs in the rendered prompt so you can push large background context early and keep important instructions or questions late where they receive more attention.

inputPriority is set on configuration.handlerOptions as a map of input slot IDs to numeric priority values:

{
  "handlerOptions": {
    "inputPriority": {
      "background_context": -10,
      "reference_material": -5,
      "user_question": 10
    }
  }
}

How it works:

  • Lower values → earlier in the prompt (less attention)
  • Higher values → later in the prompt (more attention)
  • Unspecified inputs → default to 0, preserving their original relative order
  • Applies to both configuration.dataInputs (system message) and runtime inputsSchema.dataInputs (user message)
  • retrieved_context (from files, datasets, web sources) is not affected — it is always appended last
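
The ordering rules above can be modeled as a stable sort by priority. An illustrative JavaScript sketch (a model of the documented behavior, not the actual prompt renderer):

```javascript
// Order input slot IDs by their inputPriority value. Unspecified inputs
// default to priority 0; ties keep their original relative order
// (stable sort via the original index as a tiebreaker).
function orderInputs(slotIds, inputPriority = {}) {
  return slotIds
    .map((id, index) => ({ id, index, priority: inputPriority[id] ?? 0 }))
    .sort((a, b) => a.priority - b.priority || a.index - b.index)
    .map((entry) => entry.id);
}
```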

Configuration Options:

| Field         | Type                   | Default   | Description |
| ------------- | ---------------------- | --------- | ----------- |
| inputPriority | Record<string, number> | undefined | Map of input slot IDs to numeric priority values. Lower = earlier in prompt, higher = later. Unspecified inputs default to 0. |

Validation:

Each key in inputPriority must match an id from inputsSchema.dataInputs or configuration.dataInputs. The API returns a 400 error if any key references an unknown input slot.

Handler support: language_model only.

Example — Financial Q&A with priority ordering:

Create pipeline with input priority

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
    "name": "Financial Q&A",
    "handlerType": "language_model",
    "inputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        {
          "id": "company_background",
          "label": "Company Background",
          "description": "Background information about the company",
          "schema": { "type": "string" },
          "required": false
        },
        {
          "id": "financial_data",
          "label": "Financial Data",
          "description": "Key financial metrics and figures",
          "schema": { "type": "string" },
          "required": true
        },
        {
          "id": "question",
          "label": "Question",
          "description": "The financial question to answer",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "outputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        {
          "id": "answer",
          "label": "Answer",
          "description": "The financial analysis answer",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "configuration": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        {
          "id": "guidelines",
          "label": "Analysis Guidelines",
          "value": { "value": "Use conservative estimates. Cite specific figures." }
        }
      ],
      "handlerOptions": {
        "inputPriority": {
          "company_background": -10,
          "financial_data": 0,
          "question": 10
        }
      }
    }
  }'

With this configuration, when the pipeline executes, the LLM prompt will order the runtime inputs as: company_background (priority -10) → financial_data (priority 0) → question (priority 10). The question — the most important part — appears last where the model pays the most attention.

Dataset slots in inputsSchema can include optional row-level filters to restrict which rows are queried. Filters are defined as an array of predicates that are combined with AND logic and applied server-side in the query engine.

Key Benefits:

  • Row-level security - Enforce data isolation between tenants or users
  • Performance optimization - Reduce query scope by filtering at the source
  • Dynamic filtering - Use runtime values via $ref to reference execution inputs
  • Server-side enforcement - Filters cannot be bypassed by SQL injection

Filter Structure:

Each dataset slot can include a filter array with predicates:

{
  "datasets": [
    {
      "id": "sales",
      "label": "Sales Data",
      "required": true,
      "multiple": false,
      "filter": [
        {
          "field": "tenant_id",
          "op": "eq",
          "value": { "$ref": "input.dataInputs.tenantId" }
        },
        {
          "field": "deleted_at",
          "op": "is_null"
        }
      ]
    }
  ],
  "dataInputs": [
    {
      "id": "tenantId",
      "label": "Tenant ID",
      "schema": { "type": "string" },
      "required": true
    }
  ]
}

Supported Operators:

| Operator    | Description           | Example |
| ----------- | --------------------- | ------- |
| eq          | Equal to              | {"field": "status", "op": "eq", "value": "active"} |
| neq         | Not equal to          | {"field": "type", "op": "neq", "value": "draft"} |
| gt          | Greater than          | {"field": "amount", "op": "gt", "value": 100} |
| gte         | Greater than or equal | {"field": "score", "op": "gte", "value": 80} |
| lt          | Less than             | {"field": "age", "op": "lt", "value": 18} |
| lte         | Less than or equal    | {"field": "count", "op": "lte", "value": 10} |
| in          | In list               | {"field": "category", "op": "in", "value": ["A", "B"]} |
| not_in      | Not in list           | {"field": "status", "op": "not_in", "value": ["deleted", "archived"]} |
| like        | Pattern match         | {"field": "email", "op": "like", "value": "%@example.com"} |
| is_null     | Is NULL               | {"field": "deleted_at", "op": "is_null"} |
| is_not_null | Is not NULL           | {"field": "email", "op": "is_not_null"} |
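
The operator semantics can be modeled as a simple row filter with AND-combined predicates. An illustrative JavaScript sketch (a model of the documented behavior, not the server-side DataFusion implementation):

```javascript
// Evaluate one predicate against a row, following the operator table above.
function matchesPredicate(row, { field, op, value }) {
  const v = row[field];
  switch (op) {
    case "eq": return v === value;
    case "neq": return v !== value;
    case "gt": return v > value;
    case "gte": return v >= value;
    case "lt": return v < value;
    case "lte": return v <= value;
    case "in": return value.includes(v);
    case "not_in": return !value.includes(v);
    case "is_null": return v === null || v === undefined;
    case "is_not_null": return v !== null && v !== undefined;
    case "like":
      // Translate SQL LIKE wildcards (% and _) into a regular expression,
      // escaping any regex metacharacters in the pattern first.
      return new RegExp(
        "^" +
          value
            .replace(/[.*+?^${}()|[\]\\]/g, "\\$&")
            .replace(/%/g, ".*")
            .replace(/_/g, ".") +
          "$"
      ).test(v);
    default:
      throw new Error(`Unknown operator: ${op}`);
  }
}

// All predicates in a filter array must hold (AND logic).
function applyFilter(rows, filter) {
  return rows.filter((row) => filter.every((p) => matchesPredicate(row, p)));
}
```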

Dynamic Values with $ref:

Use $ref to reference runtime values from dataInputs:

{
  "field": "tenant_id",
  "op": "eq",
  "value": { "$ref": "input.dataInputs.tenantId" }
}

The $ref path must follow the pattern input.dataInputs.<id> where <id> matches a data input slot.
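
Resolution of $ref values can be sketched as a path lookup against the trigger payload. This is an illustrative helper (resolveRef is hypothetical, not an API function):

```javascript
// Substitute { "$ref": "input.dataInputs.<id>" } filter values with the
// matching runtime data input from the trigger payload; literal values
// pass through unchanged.
function resolveRef(value, triggerInput) {
  if (value === null || typeof value !== "object" || !("$ref" in value)) {
    return value; // literal value, use as-is
  }
  const match = /^input\.dataInputs\.(.+)$/.exec(value.$ref);
  if (!match) throw new Error(`Invalid $ref path: ${value.$ref}`);
  const id = match[1];
  if (!(id in (triggerInput.dataInputs ?? {}))) {
    throw new Error(`Unknown data input slot: ${id}`);
  }
  return triggerInput.dataInputs[id];
}
```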

Complete Example - Multi-tenant Sales Pipeline:

Create pipeline with dataset filtering

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
    "name": "Sales Analysis",
    "description": "Analyze sales data with tenant isolation",
    "handlerType": "language_model",
    "inputsSchema": {
      "files": [],
      "datasets": [
        {
          "id": "sales",
          "label": "Sales Data",
          "description": "Sales transactions dataset",
          "required": true,
          "multiple": false,
          "filter": [
            {
              "field": "tenant_id",
              "op": "eq",
              "value": { "$ref": "input.dataInputs.tenantId" }
            },
            {
              "field": "deleted_at",
              "op": "is_null"
            },
            {
              "field": "status",
              "op": "in",
              "value": ["completed", "pending"]
            }
          ]
        }
      ],
      "dataInputs": [
        {
          "id": "tenantId",
          "label": "Tenant ID",
          "description": "Tenant identifier for data isolation",
          "schema": { "type": "string" },
          "required": true
        },
        {
          "id": "question",
          "label": "Question",
          "description": "Analysis question",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "outputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        {
          "id": "answer",
          "label": "Answer",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "configuration": {
      "files": [],
      "datasets": [],
      "dataInputs": []
    }
  }'

Triggering with Filter Values:

When triggering the pipeline, provide the tenant ID that will be substituted into the filter:

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "datasets": {
        "sales": "9Yh1BRvQhmFUYrSDZTcRz"
      },
      "dataInputs": {
        "tenantId": "tenant-xyz",
        "question": "What were the total sales last month?"
      }
    }
  }'

The query engine will automatically filter the sales table to only include rows where:

  • tenant_id = 'tenant-xyz'
  • deleted_at IS NULL
  • status IN ('completed', 'pending')

Security Considerations:

  • Filters are applied server-side in the query engine using DataFusion’s DataFrame API
  • Filters cannot be bypassed via SQL injection or query manipulation
  • All filter values are parameterized and SQL-escaped before execution
  • The LLM-generated SQL queries the filtered view transparently
  • Filter logic is validated when creating the pipeline

The language_model handler can automatically enrich LLM context by fetching content from the web. This feature enables pipelines to work with real-time information and user-provided URLs without manual data ingestion.

Available features:

  • URL Scraping - Automatically detect and scrape URLs in user inputs
  • Web Search - Generate search queries and fetch results via Tavily API

Both features inject content before LLM generation, making web data available in the prompt context. They are only available in the language_model handler: streaming_language_model does not support them, and code_agent_language_model instead exposes equivalent functionality through its web_search and web_scrape tools.

URL scraping automatically detects HTTP/HTTPS URLs in user inputs, fetches their content, and injects it into the LLM prompt context. This feature is enabled by default.

Key Features:

  • Automatic URL detection in user inputs via regex pattern /https?:\/\/[^\s]+/gi
  • Pre-LLM content injection (scrape completes before model runs)
  • JavaScript rendering support for dynamic content
  • Multiple content formats: HTML, Markdown, Text
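
The detection step can be reproduced with the documented regex. A client-side sketch (the server-side scraper may normalize matches differently, e.g. trimming trailing punctuation):

```javascript
// Extract candidate URLs from user input using the documented pattern.
// Returns an empty array when no URLs are present.
function detectUrls(text) {
  return text.match(/https?:\/\/[^\s]+/gi) ?? [];
}
```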

Configuration:

URL scraping is configured in configuration.handlerOptions.urlScraping:

{
  "urlScraping": {
    "enabled": true,           // default: true
    "contentType": "markdown", // "html" | "markdown" | "text", default: "markdown"
    "renderJs": true           // default: true, enables JavaScript rendering
  }
}

Configuration Options:

| Field       | Type    | Default    | Description |
| ----------- | ------- | ---------- | ----------- |
| enabled     | boolean | true       | Enable automatic URL detection and scraping |
| contentType | string  | "markdown" | Content format: "html", "markdown", or "text" |
| renderJs    | boolean | true       | Enable JavaScript rendering to capture dynamic content |

Creating a Pipeline with URL Scraping:

Create pipeline with URL scraping

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
    "name": "Article Summarizer",
    "handlerType": "language_model",
    "inputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        {
          "id": "article_url",
          "label": "Article URL",
          "description": "URL of the article to summarize",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "outputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        {
          "id": "summary",
          "label": "Summary",
          "description": "Generated article summary",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "configuration": {
      "files": [],
      "datasets": [],
      "dataInputs": [],
      "handlerOptions": {
        "urlScraping": {
          "enabled": true,
          "contentType": "markdown",
          "renderJs": true
        }
      }
    }
  }'

Triggering the Pipeline:

Trigger pipeline with URL in input

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "dataInputs": {
        "article_url": "https://example.com/article"
      }
    }
  }'

The URL https://example.com/article will be automatically detected, scraped, and its content will be injected into the LLM context before generation.

Use Cases:

  • Summarizing news articles or blog posts shared by users
  • Extracting structured data from documentation pages
  • Analyzing competitor websites or product pages
  • Processing user-submitted links in support tickets or feedback forms

Web search enables pipelines to fetch real-time information from the web using the Tavily search API. The system uses a two-phase approach: first, an LLM generates focused search queries based on the task, then Tavily executes those searches and formats the results.

Key Features:

  • Two-phase execution: LLM generates queries → Tavily executes searches
  • Pre-LLM result injection (search completes before main model runs)
  • Configurable query count (1-5 queries) and results per query (1-20 results)
  • Search depth control: basic (1 credit) or advanced (2 credits)
  • Optional AI-generated answer summaries from Tavily

Configuration:

Web search is configured in configuration.handlerOptions.webSearch. Web search is disabled by default - you must explicitly set enabled: true.

{
  "webSearch": {
    "enabled": true,         // required to enable web search
    "maxQueries": 3,         // 1-5, default: 3
    "maxResultsPerQuery": 5, // 1-20, default: 5
    "searchDepth": "basic",  // "basic" | "advanced", default: "basic"
    "includeAnswer": false   // default: false
  }
}

Configuration Options:

| Field              | Type    | Default    | Description |
| ------------------ | ------- | ---------- | ----------- |
| enabled            | boolean | (required) | Enable automatic web search query generation and execution |
| maxQueries         | number  | 3          | Maximum number of queries to generate (1-5) |
| maxResultsPerQuery | number  | 5          | Maximum results per query (1-20) |
| searchDepth        | string  | "basic"    | Search depth: "basic" (1 credit) or "advanced" (2 credits) |
| includeAnswer      | boolean | false      | Include AI-generated answer summary from Tavily |

How It Works:

  1. LLM analyzes the task and user input to generate 1-5 focused search queries
  2. Tavily API executes searches in parallel
  3. Results are formatted with titles, URLs, snippets, and relevance scores
  4. Formatted search results are injected into the LLM context
  5. Main LLM generation runs with search results available in the prompt

Creating a Pipeline with Web Search:

Create pipeline with web search

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
    "name": "Market Research Assistant",
    "handlerType": "language_model",
    "inputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        {
          "id": "research_query",
          "label": "Research Query",
          "description": "What would you like to research?",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "outputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        {
          "id": "analysis",
          "label": "Analysis",
          "description": "Research analysis and findings",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "configuration": {
      "files": [],
      "datasets": [],
      "dataInputs": [],
      "handlerOptions": {
        "webSearch": {
          "enabled": true,
          "maxQueries": 3,
          "maxResultsPerQuery": 5,
          "searchDepth": "basic",
          "includeAnswer": false
        }
      }
    }
  }'

Use Cases:

  • Real-time market research and competitive analysis
  • Current events analysis and news monitoring
  • Fact-checking claims with recent information
  • Gathering background information for decision-making

When multiple context sources are configured (URL scraping, web search, files, datasets), they are assembled in a specific priority order optimized for LLM attention patterns:

1. Scraped URLs ← Most specific (user explicitly provided URLs)
2. Web Search Results ← Current/dynamic information from the web
3. File Context ← Uploaded documents (semantic or full retrieval)
4. Dataset Context ← Structured data from SQL queries
↓ Most general

This order ensures the most relevant and specific information appears first in the LLM context, where attention mechanisms are most effective.

Combined Context Example:

Pipeline with multiple context sources

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
    "name": "Comprehensive Research Assistant",
    "handlerType": "language_model",
    "inputsSchema": {
      "files": [
        {
          "id": "background_docs",
          "label": "Background Documents",
          "required": false,
          "multiple": true,
          "contextRetrievalMode": "semantic"
        }
      ],
      "datasets": [
        {
          "id": "historical_data",
          "label": "Historical Data",
          "required": false,
          "multiple": false
        }
      ],
      "dataInputs": [
        {
          "id": "question",
          "label": "Research Question",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "outputsSchema": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        {
          "id": "comprehensive_analysis",
          "label": "Comprehensive Analysis",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "configuration": {
      "files": [],
      "datasets": [],
      "dataInputs": [],
      "handlerOptions": {
        "urlScraping": {
          "enabled": true,
          "contentType": "markdown"
        },
        "webSearch": {
          "enabled": true,
          "maxQueries": 2,
          "maxResultsPerQuery": 5
        }
      }
    }
  }'

Different handler types have different approaches to web context retrieval:

| Feature        | language_model          | streaming_language_model | code_agent_language_model  |
| -------------- | ----------------------- | ------------------------ | -------------------------- |
| URL Scraping   | ✅ Automatic pre-LLM    | ❌ Not supported         | ✅ Via web_scrape tool     |
| Web Search     | ✅ Automatic pre-LLM    | ❌ Not supported         | ✅ Via web_search tool     |
| Implementation | Configuration-driven    | N/A                      | Tool-based (agent decides) |
| Latency        | Higher (pre-processing) | N/A                      | Variable (agent reasoning) |

Key Differences:

  • language_model: Features run automatically before every LLM call when enabled in configuration. Context is always fetched, even if not needed for the specific query.

  • streaming_language_model: Does not support web context features. This handler has a fixed output schema and no pre-processing phase.

  • code_agent_language_model: Python code agent can call web_search and web_scrape tools dynamically during execution. The agent decides when and how to use these tools based on the task, but this adds reasoning overhead and latency.

The streaming_language_model handler enables real-time token-level streaming with inline citations. Unlike the standard language_model handler, it has a fixed output schema that cannot be modified.

Key Features:

  • Real-time streaming - Tokens are delivered as they’re generated
  • Inline citations - References appear as human-readable chunk IDs like [swift_falcon], [blue_river] in the output
  • Fixed output schema - Always returns { content: string }
  • Channel-based delivery - Uses server-sent events (SSE) for streaming

Output Schema (Fixed):

The output schema is automatically set and cannot be modified:

{
  "files": [],
  "datasets": [],
  "dataInputs": [
    {
      "id": "content",
      "label": "Response Content",
      "description": "Generated text response with inline citation markers",
      "schema": {
        "type": "object",
        "properties": {
          "content": { "type": "string" }
        },
        "required": ["content"],
        "additionalProperties": false
      },
      "required": true
    }
  ]
}

Creating a Streaming Pipeline:

Create streaming language model pipeline

Terminal window
curl -X POST https://api.catalyzed.ai/pipelines \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "teamId": "ZkoDMyjZZsXo4VAO_nJLk",
    "name": "Streaming Document Q&A",
    "description": "Answer questions about documents with real-time streaming",
    "handlerType": "streaming_language_model",
    "inputsSchema": {
      "files": [
        {
          "id": "document",
          "label": "Document",
          "description": "Document to analyze",
          "required": true,
          "multiple": false,
          "contextRetrievalMode": "full"
        }
      ],
      "datasets": [],
      "dataInputs": [
        {
          "id": "question",
          "label": "Question",
          "description": "Question to answer",
          "schema": { "type": "string" },
          "required": true
        }
      ]
    },
    "configuration": {
      "files": [],
      "datasets": [],
      "dataInputs": [
        {
          "id": "systemPrompt",
          "label": "System Prompt",
          "type": "string",
          "value": {
            "value": "You are a helpful assistant that answers questions based on the provided documents. Reference sources using [chunk_id] markers after factual claims."
          }
        }
      ]
    }
  }'

Note: The outputsSchema field is optional when creating a streaming_language_model pipeline. The output schema is always overridden with the fixed format shown above.

Consuming Streaming Output:

Streaming pipelines require a three-step flow to consume the output:

1. Trigger the pipeline:

When you trigger a streaming pipeline, the response contains only the executionId:

{
  "executionId": "GkR8I6rHBms3W4Qfa2-FN",
  "status": "pending",
  "createdAt": "2024-01-15T10:30:00Z"
}

2. Poll for the streaming channel ID:

Poll the execution endpoint until handlerOutput.streamingChannelId becomes available:

// Trigger the pipeline
const { executionId } = await fetch(
  `https://api.catalyzed.ai/pipelines/${pipelineId}/trigger`,
  {
    method: "POST",
    headers: {
      Authorization: `Bearer ${apiToken}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ input: { /* ... */ } }),
  }
).then(r => r.json());

// Poll for channel ID
let channelId = null;
while (!channelId) {
  const execution = await fetch(
    `https://api.catalyzed.ai/pipeline-executions/${executionId}`,
    { headers: { Authorization: `Bearer ${apiToken}` } }
  ).then(r => r.json());
  channelId = execution.handlerOutput?.streamingChannelId ?? null;
  if (!channelId) {
    // Channel not ready yet, wait before polling again
    await new Promise(resolve => setTimeout(resolve, 500));
  }
}

Once the worker processes the execution, the response will include the channel ID:

{
  "executionId": "GkR8I6rHBms3W4Qfa2-FN",
  "status": "running",
  "handlerOutput": {
    "handlerType": "streaming_language_model",
    "streamingChannelId": "ch_xyz123"
  }
}

3. Subscribe to the SSE stream:

Once you have the streamingChannelId, subscribe using Server-Sent Events (SSE):

const eventSource = new EventSource(
  `https://api.catalyzed.ai/channels/${channelId}/stream`,
  {
    headers: {
      Authorization: `Bearer ${apiToken}`,
    },
  }
);

let displayText = "";

eventSource.addEventListener("channel-message", (event) => {
  const message = JSON.parse(event.data);
  switch (message.dataType) {
    case "streaming.start":
      console.log("Streaming started:", message.data);
      break;
    case "conversation.assistant.delta": {
      // Token chunk received
      const { delta } = message.data;
      displayText += delta;
      break;
    }
    case "streaming.done": {
      // Processing complete
      const { content } = message.data;
      console.log("Final content:", content);
      eventSource.close();
      break;
    }
    case "streaming.error":
      console.error("Streaming error:", message.data);
      eventSource.close();
      break;
  }
});

Example Output:

The streaming handler returns content with inline citation markers using human-readable chunk IDs:

{
  "content": "The Q4 revenue was $1.65M [swift_falcon] which exceeded the Q3 figure of $1.42M [blue_river]."
}

Citation Markers:

Each marker uses a human-readable [adjective_noun] format (e.g., [swift_falcon], [blue_river], [calm_peak]). These chunk IDs are:

  • Deterministic - Same content always generates the same ID
  • Unique - Each chunk gets a different identifier (collisions are resolved with _2, _3 suffixes)
  • Human-readable - Easier to reference and debug than numeric markers

The execution also includes outputCitations that map markers to source chunks:

{
  "outputCitations": [
    {
      "outputPointer": "/content",
      "outputCharStart": 25,
      "outputCharEnd": 28,
      "citations": [
        {
          "type": "file_chunk",
          "fileChunkId": "chunk_abc123"
        }
      ]
    },
    {
      "outputPointer": "/content",
      "outputCharStart": 67,
      "outputCharEnd": 70,
      "citations": [
        {
          "type": "file_chunk",
          "fileChunkId": "chunk_def456"
        }
      ]
    }
  ]
}
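
The character offsets can be used to pull each cited span out of the content string. A hedged sketch (it assumes outputCharStart/outputCharEnd are 0-based, end-exclusive offsets into the string addressed by outputPointer; verify against your execution output before relying on this):

```javascript
// Resolve each citation entry's character range against the output content,
// collecting the file chunk IDs that back that span.
function extractCitedSpans(content, outputCitations) {
  return outputCitations.map((entry) => ({
    text: content.slice(entry.outputCharStart, entry.outputCharEnd),
    fileChunkIds: entry.citations
      .filter((c) => c.type === "file_chunk")
      .map((c) => c.fileChunkId),
  }));
}
```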

The embedding handler generates vector embeddings from text arrays. Unlike LLM handlers, it has fixed input and output schemas that cannot be modified.

Key Features:

  • Fixed schemas - Both input and output schemas are predefined
  • Batch processing - Generate embeddings for multiple texts at once
  • Model selection - Choose embedding model per-execution via input
  • No streaming - Results returned when complete

Input Schema (Fixed):

| Field     | Type     | Required | Description |
| --------- | -------- | -------- | ----------- |
| texts     | string[] | Yes      | Array of text strings to generate embeddings for |
| model     | string   | No       | Embedding model (default: BAAI/bge-small-en-v1.5) |
| normalize | boolean  | No       | Whether to L2 normalize embeddings (default: true) |

Output Schema (Fixed):

| Field | Type | Description |
| --- | --- | --- |
| embeddings | number[][] | Array of embedding vectors |
| dimensions | number | Dimension of each embedding vector (e.g., 384) |

Creating an Embedding Pipeline:

Create embedding pipeline

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Text Embedding Pipeline",
"description": "Generate embeddings for text arrays",
"handlerType": "embedding"
}'

Triggering an Embedding Pipeline:

Trigger embedding pipeline

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"input": {
"dataInputs": {
"texts": [
"Machine learning is a subset of artificial intelligence.",
"Natural language processing helps computers understand text.",
"Deep learning uses neural networks with multiple layers."
]
}
}
}'

Using a Custom Model:

Specify a different embedding model by including the model field:

{
"input": {
"dataInputs": {
"texts": ["Your text here"],
"model": "BAAI/bge-large-en-v1.5"
}
}
}

Example Output:

Once the execution completes, the output contains embedding vectors:

{
"executionId": "GkR8I6rHBms3W4Qfa2-FN",
"status": "succeeded",
"output": {
"embeddings": [
[0.0123, -0.0456, 0.0789, ...],
[0.0234, -0.0567, 0.0890, ...],
[0.0345, -0.0678, 0.0901, ...]
],
"dimensions": 384
},
"outputCitations": []
}
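
Because the output is plain number arrays, the vectors can be compared directly on the client. A minimal cosine-similarity sketch (with normalize: true, the default, the dot product alone already equals the cosine similarity):

```javascript
// Cosine similarity between two embedding vectors from the output.
function cosineSimilarity(a, b) {
  let dot = 0, normA = 0, normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Identical vectors score 1, orthogonal vectors score 0.
console.log(cosineSimilarity([1, 0], [1, 0])); // 1
console.log(cosineSimilarity([1, 0], [0, 1])); // 0
```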

Use Cases:

  • Semantic search - Generate embeddings for search queries and documents
  • Document similarity - Compare documents by embedding distance
  • Clustering - Group similar texts using embedding vectors
  • RAG preprocessing - Generate embeddings for knowledge base indexing

The text_classification handler classifies text into predefined categories using HuggingFace classification models. Like the embedding handler, it has fixed input and output schemas that cannot be modified.

Key Features:

  • Fixed schemas - Both input and output schemas are predefined
  • Confidence scores - Each predicted label includes a confidence score
  • Model selection - Choose a HuggingFace classification model per-execution
  • No streaming - Results returned when complete

Input Schema (Fixed):

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| text | string | Yes | Input text for classification (sentiment, topics, intent, etc.) |
| model | string | No | HuggingFace model ID (default: distilbert-base-uncased-finetuned-sst-2-english) |

Output Schema (Fixed):

| Field | Type | Description |
| --- | --- | --- |
| predictions | {label: string, score: number}[] | Array of label predictions with confidence scores |

Creating a Text Classification Pipeline:

Create text classification pipeline

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Sentiment Classifier",
"description": "Classify text sentiment",
"handlerType": "text_classification"
}'

Triggering a Text Classification Pipeline:

Trigger text classification pipeline

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"input": {
"dataInputs": {
"text": "The product quality is excellent and shipping was fast. Very satisfied with my purchase."
}
}
}'

Using a Custom Model:

Specify a different classification model by including the model field:

{
"input": {
"dataInputs": {
"text": "Your text here",
"model": "cardiffnlp/twitter-roberta-base-sentiment-latest"
}
}
}

Example Output:

Once the execution completes, the output contains classification predictions:

{
"executionId": "GkR8I6rHBms3W4Qfa2-FN",
"status": "succeeded",
"output": {
"predictions": [
{ "label": "POSITIVE", "score": 0.9987 },
{ "label": "NEGATIVE", "score": 0.0013 }
]
},
"outputCitations": []
}
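
Downstream code typically takes the highest-scoring prediction and applies a confidence threshold before acting on it. A minimal sketch (the 0.8 threshold is an arbitrary example, not a platform default):

```javascript
// Take the highest-scoring prediction and require a minimum confidence
// before acting on it; returns null when no label is confident enough.
function topLabel(predictions, minScore = 0.8) {
  const best = [...predictions].sort((a, b) => b.score - a.score)[0];
  return best && best.score >= minScore ? best.label : null;
}

const predictions = [
  { label: "POSITIVE", score: 0.9987 },
  { label: "NEGATIVE", score: 0.0013 },
];
console.log(topLabel(predictions)); // "POSITIVE"
```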

Use Cases:

  • Sentiment analysis - Determine positive/negative sentiment in reviews, feedback, or social media
  • Content moderation - Flag inappropriate or harmful content
  • Topic classification - Route documents to the correct category
  • Intent detection - Classify user messages by intent for chatbot routing

The zero_shot_classification handler classifies text into user-defined categories without requiring any training data. It uses natural language inference (NLI) models to determine how well each candidate label describes the input text. Like other NLP handlers, it has fixed input and output schemas.

Key Features:

  • Fixed schemas - Both input and output schemas are predefined
  • Dynamic labels - Define your own classification categories at runtime
  • Hypothesis template - Customize the NLI hypothesis for better accuracy
  • No streaming - Results returned when complete

Input Schema (Fixed):

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| text | string | Yes | Input text to classify into user-defined categories |
| candidate_labels | string[] | Yes | Array of possible classification labels (e.g., ["positive", "negative", "neutral"]) |
| hypothesis_template | string | No | Template for NLI hypothesis (default: "This text is about {}.") |
| model | string | No | HuggingFace model ID (default: facebook/bart-large-mnli) |

Output Schema (Fixed):

| Field | Type | Description |
| --- | --- | --- |
| labels | string[] | Classification labels sorted by confidence (highest first) |
| scores | number[] | Confidence scores corresponding to each label |

Creating a Zero-Shot Classification Pipeline:

Create zero-shot classification pipeline

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Topic Classifier",
"description": "Classify text into custom categories",
"handlerType": "zero_shot_classification"
}'

Triggering a Zero-Shot Classification Pipeline:

Trigger zero-shot classification pipeline

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"input": {
"dataInputs": {
"text": "The Federal Reserve announced a 25 basis point increase in interest rates, citing persistent inflation concerns.",
"candidate_labels": ["finance", "politics", "technology", "healthcare", "sports"]
}
}
}'

Customizing the Hypothesis Template:

The hypothesis template controls how the model frames the classification. The {} placeholder is replaced with each candidate label:

{
"input": {
"dataInputs": {
"text": "I need to return this product, it arrived damaged.",
"candidate_labels": ["refund request", "product inquiry", "shipping issue", "complaint"],
"hypothesis_template": "The customer intent is {}."
}
}
}

Example Output:

Once the execution completes, the output contains labels and scores sorted by confidence:

{
"executionId": "GkR8I6rHBms3W4Qfa2-FN",
"status": "succeeded",
"output": {
"labels": ["finance", "politics", "technology", "healthcare", "sports"],
"scores": [0.8234, 0.1245, 0.0312, 0.0118, 0.0091]
},
"outputCitations": []
}
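
Because labels and scores are parallel arrays, it is often convenient to zip them into label/score pairs on the client. A minimal sketch:

```javascript
// Zip the parallel labels/scores arrays from the output into objects.
function zipPredictions(output) {
  return output.labels.map((label, i) => ({ label, score: output.scores[i] }));
}

const output = {
  labels: ["finance", "politics", "technology"],
  scores: [0.8234, 0.1245, 0.0312],
};
console.log(zipPredictions(output)[0]); // { label: "finance", score: 0.8234 }
```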

Use Cases:

  • Dynamic topic routing - Route documents to teams or workflows based on custom categories
  • Content tagging - Tag content with custom taxonomies that change over time
  • Intent detection - Classify user messages without training a dedicated model
  • Multi-domain classification - Apply different label sets to the same pipeline for different use cases

The ner (Named Entity Recognition) handler extracts named entities from text, identifying people, organizations, locations, and other entity types along with their positions and confidence scores. Like other NLP handlers, it has fixed input and output schemas.

Key Features:

  • Fixed schemas - Both input and output schemas are predefined
  • Entity position tracking - Each entity includes start/end character positions
  • Confidence scores - Each entity includes a confidence score
  • No streaming - Results returned when complete

Input Schema (Fixed):

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| text | string | Yes | Input text to extract named entities from |
| model | string | No | HuggingFace model ID (default: dslim/bert-base-NER) |

Output Schema (Fixed):

| Field | Type | Description |
| --- | --- | --- |
| entities | {text: string, label: string, start: number, end: number, score: number}[] | Array of named entities with labels, positions, and confidence scores |

Common Entity Labels:

| Label | Description |
| --- | --- |
| PER | Person names |
| ORG | Organizations |
| LOC | Locations |
| MISC | Miscellaneous entities |

Creating an NER Pipeline:

Create NER pipeline

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Entity Extractor",
"description": "Extract named entities from text",
"handlerType": "ner"
}'

Triggering an NER Pipeline:

Trigger NER pipeline

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"input": {
"dataInputs": {
"text": "Pfizer announced a partnership with BioNTech in New York to develop mRNA vaccines, with CEO Albert Bourla leading the initiative."
}
}
}'

Example Output:

Once the execution completes, the output contains extracted entities with their types and positions:

{
"executionId": "GkR8I6rHBms3W4Qfa2-FN",
"status": "succeeded",
"output": {
"entities": [
{ "text": "Pfizer", "label": "ORG", "start": 0, "end": 6, "score": 0.9991 },
{ "text": "BioNTech", "label": "ORG", "start": 35, "end": 43, "score": 0.9987 },
{ "text": "New York", "label": "LOC", "start": 47, "end": 55, "score": 0.9994 },
{ "text": "Albert Bourla", "label": "PER", "start": 101, "end": 114, "score": 0.9982 }
]
},
"outputCitations": []
}
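
A common follow-up step is grouping the extracted entities by label, e.g. collecting all organizations mentioned in a document. A minimal sketch over the output schema above:

```javascript
// Group extracted entities by label, collecting the matched text spans.
// start/end are character offsets into the original input text.
function groupEntities(entities) {
  const groups = {};
  for (const e of entities) {
    if (!groups[e.label]) groups[e.label] = [];
    groups[e.label].push(e.text);
  }
  return groups;
}

const entities = [
  { text: "Pfizer", label: "ORG", start: 0, end: 6, score: 0.9991 },
  { text: "BioNTech", label: "ORG", start: 35, end: 43, score: 0.9987 },
  { text: "New York", label: "LOC", start: 47, end: 55, score: 0.9994 },
];
console.log(groupEntities(entities)); // { ORG: ["Pfizer", "BioNTech"], LOC: ["New York"] }
```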

Use Cases:

  • Information extraction - Pull structured entities from unstructured documents
  • Document indexing - Index documents by the entities they mention
  • Entity linking - Identify entities for linking to knowledge bases or databases
  • Compliance analysis - Extract person and organization names from regulatory filings

The rerank handler scores and reranks documents by relevance to a query using cross-encoder models. This is useful for improving search result quality by re-scoring candidate documents against a specific query. Like other NLP handlers, it has fixed input and output schemas.

Key Features:

  • Fixed schemas - Both input and output schemas are predefined
  • Top-N filtering - Optionally return only the most relevant results
  • Model selection - Choose a cross-encoder model per-execution
  • No streaming - Results returned when complete

Input Schema (Fixed):

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| query | string | Yes | The search query to score documents against |
| documents | {id: string, text: string}[] | Yes | Array of candidate documents with id and text fields |
| top_n | integer | No | Return only the top N results by relevance score |
| model | string | No | Cross-encoder model ID (default: cross-encoder/ms-marco-MiniLM-L-6-v2) |

Output Schema (Fixed):

| Field | Type | Description |
| --- | --- | --- |
| results | {id: string, relevance_score: number}[] | Array of document IDs with relevance scores, sorted by relevance score (highest first); limited to the top N when top_n is set |

Creating a Rerank Pipeline:

Create rerank pipeline

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Search Reranker",
"description": "Rerank search results by relevance",
"handlerType": "rerank"
}'

Triggering a Rerank Pipeline:

Trigger rerank pipeline

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"input": {
"dataInputs": {
"query": "What are the side effects of aspirin?",
"documents": [
{ "id": "doc-1", "text": "Aspirin is commonly used as a pain reliever and anti-inflammatory medication." },
{ "id": "doc-2", "text": "Common side effects of aspirin include stomach upset, heartburn, and increased bleeding risk." },
{ "id": "doc-3", "text": "The history of aspirin dates back to ancient Greece where willow bark was used medicinally." },
{ "id": "doc-4", "text": "Aspirin may cause allergic reactions in some individuals, including skin rashes and breathing difficulties." }
],
"top_n": 3
}
}
}'

Using a Different Model:

Specify a different cross-encoder model by including the model field. Supported models include BAAI/bge-reranker-base and BAAI/bge-reranker-v2-m3:

{
"input": {
"dataInputs": {
"query": "Your search query",
"documents": [{ "id": "doc-1", "text": "Document text" }],
"model": "BAAI/bge-reranker-v2-m3"
}
}
}

Example Output:

Once the execution completes, the output contains document IDs with relevance scores:

{
"executionId": "GkR8I6rHBms3W4Qfa2-FN",
"status": "succeeded",
"output": {
"results": [
{ "id": "doc-2", "relevance_score": 0.9821 },
{ "id": "doc-4", "relevance_score": 0.8934 },
{ "id": "doc-1", "relevance_score": 0.4215 }
]
},
"outputCitations": []
}
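
Since the output contains only IDs and scores, the caller joins results back to the original documents by id to recover the reranked texts. A minimal sketch:

```javascript
// Join rerank results back to the original documents by id, preserving
// the order of the results array.
function applyRerank(documents, results) {
  const byId = new Map(documents.map((d) => [d.id, d]));
  return results.map((r) => ({ ...byId.get(r.id), relevance_score: r.relevance_score }));
}

const documents = [
  { id: "doc-1", text: "Aspirin is a pain reliever." },
  { id: "doc-2", text: "Side effects include stomach upset." },
];
const results = [{ id: "doc-2", relevance_score: 0.9821 }];
console.log(applyRerank(documents, results));
// [{ id: "doc-2", text: "Side effects include stomach upset.", relevance_score: 0.9821 }]
```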

Use Cases:

  • Search quality improvement - Re-score keyword or vector search results for better relevance ranking
  • RAG retrieval - Rerank retrieved passages before feeding them to a language model
  • Two-stage retrieval - Use fast vector search for recall, then cross-encoder reranking for precision
  • Document filtering - Use top_n to keep only the most relevant documents from a large candidate set

The code_interpreter handler executes user-provided Python code in a sandboxed AST-walking interpreter with optional output schema validation via final_answer(). It supports state persistence across executions via signed interpreter state blobs, enabling REPL-like workflows where users build up computation across multiple calls.

Key Features:

  • Customizable schemas - Define your own input and output fields
  • Sandboxed execution - Code runs in a restricted interpreter (no filesystem, network, or subprocess access)
  • final_answer() validation - Structured output is validated against the pipeline’s output schema
  • Optional final_answer() - Code can run for side effects only (print, state mutation) without producing structured output
  • State persistence - Interpreter state (variables, functions, imports) is serialized and returned as a signed blob that can be passed to subsequent executions
  • Variable injection - Additional data inputs are injected as Python variables available to the code
  • Built-in modules - Standard library modules like math, json, re, datetime are available

Input Schema (Customizable):

The code input is always required. Additional inputs are injected as Python variables:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| code | string | Yes | Python code to execute |
| interpreter_state | string | No | Signed state blob from a previous execution |
| (custom) | (any) | (varies) | Additional fields are injected as variables (see type mapping below) |

Variable Type Mapping:

Custom data inputs are automatically converted from JSON to Python types:

| JSON Schema Type | JSON Value | Python Type | Example |
| --- | --- | --- | --- |
| string | "hello" | str | greeting = "hello" |
| integer | 42 | int | count = 42 |
| number | 3.14 | float | rate = 3.14 |
| boolean | true | bool | enabled = True |
| null | null | NoneType | value = None |
| array | [1, 2, 3] | list | items = [1, 2, 3] |
| object | {"a": 1} | dict | config = {"a": 1} |

No manual JSON parsing is needed — variables are available as native Python types in your code.
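
For example, a trigger payload for a pipeline whose input schema declares custom dataInputs rate (number) and items (array) could be built like this (rate and items are illustrative names, not part of the fixed schema):

```javascript
// Trigger payload for a code_interpreter pipeline whose input schema
// declares custom dataInputs "rate" (number) and "items" (array).
// NOTE: "rate" and "items" are illustrative names from a hypothetical schema.
// Inside the sandbox each custom input arrives as a native Python variable.
const body = {
  input: {
    dataInputs: {
      code: [
        "total = sum(items) * rate",            // items -> list, rate -> float
        "final_answer(result=round(total, 2))",
      ].join("\n"),
      rate: 1.08,
      items: [10, 20, 30],
    },
  },
};
console.log(body.input.dataInputs.code);
```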

Output Schema (Customizable):

Output fields correspond to final_answer() keyword arguments. The handler also manages these reserved slots:

| Field | Type | Description |
| --- | --- | --- |
| (custom) | (any) | Fields populated by final_answer(field=value) |
| stdout | string | Captured print output |
| interpreter_state | string | Signed state blob for the next execution |

Creating a Code Interpreter Pipeline:

Create code interpreter pipeline

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Data Processor",
"description": "Execute Python code with validated output",
"handlerType": "code_interpreter",
"inputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{ "id": "code", "label": "Code", "schema": { "type": "string" }, "required": true },
{ "id": "interpreter_state", "label": "State", "schema": { "type": "string" }, "required": false }
]
},
"outputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{ "id": "result", "label": "Result", "schema": { "type": "number" }, "required": false },
{ "id": "stdout", "label": "Stdout", "schema": { "type": "string" }, "required": false },
{ "id": "interpreter_state", "label": "State", "schema": { "type": "string" }, "required": false }
]
}
}'

Triggering a Code Interpreter Pipeline:

Trigger code interpreter pipeline

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"input": {
"dataInputs": {
"code": "import math\nresult = round(math.pi * 100, 2)\nprint(f\"Computed: {result}\")\nfinal_answer(result=result)"
}
}
}'

Example Output:

{
"executionId": "GkR8I6rHBms3W4Qfa2-FN",
"status": "succeeded",
"output": {
"result": 314.16,
"stdout": "Computed: 314.16\n",
"interpreter_state": "dG9rZW4..."
},
"outputCitations": []
}

Resuming with State:

Pass the interpreter_state from a previous execution to continue where you left off. Variables, function definitions, and imported modules are restored:

{
"input": {
"dataInputs": {
"code": "final_answer(result=result * 2)",
"interpreter_state": "dG9rZW4..."
}
}
}

Error Handling:

When code execution fails, the execution status is failed and errorMessage contains the Python error description (e.g., "name 'queries' is not defined", "division by zero"). The error message is passed through directly from the interpreter — it is not a generic message.

| Error Type | errorMessage Contains | stdout in Output | interpreter_state in Output |
| --- | --- | --- | --- |
| Syntax error | Python SyntaxError with line number | Not available (code did not execute) | Not available |
| Runtime error (NameError, TypeError, ZeroDivisionError, etc.) | Python error description | Partial stdout up to the point of failure | Available (state captured before error check) |
| final_answer() validation error | Schema validation details | Full stdout | Available |

Example Failed Execution:

{
"executionId": "WLJagIwiQF0gvMu8jaSbx",
"status": "failed",
"errorMessage": "name 'queries' is not defined",
"output": {
"stdout": "processing step 1...\n",
"interpreter_state": "eyJzaWdu..."
}
}
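
A client can branch on status to distinguish success from failure while still recovering any partial stdout. A minimal sketch over the execution shapes above:

```javascript
// Branch on execution status; a failed run still carries partial stdout
// (and, for runtime errors, an interpreter_state blob) in its output.
function summarizeExecution(execution) {
  if (execution.status === "failed") {
    return {
      ok: false,
      error: execution.errorMessage,
      partialStdout: (execution.output && execution.output.stdout) || "",
    };
  }
  return { ok: true, result: execution.output };
}

const failed = {
  executionId: "WLJagIwiQF0gvMu8jaSbx",
  status: "failed",
  errorMessage: "name 'queries' is not defined",
  output: { stdout: "processing step 1...\n", interpreter_state: "eyJzaWdu..." },
};
console.log(summarizeExecution(failed).error); // "name 'queries' is not defined"
```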

Use Cases:

  • Data transformation - Clean, reshape, or aggregate data with Python logic
  • Computational workflows - Run multi-step calculations across iterative executions
  • REPL environments - Build interactive computing sessions with state persistence
  • Custom scoring - Implement domain-specific scoring or validation logic in Python
  • Prototyping - Quickly test data processing logic before building dedicated pipelines

Pipeline Object Fields:

| Field | Type | Description |
| --- | --- | --- |
| pipelineId | string | Unique identifier |
| teamId | string | Team that owns this pipeline |
| name | string | Human-readable name |
| description | string | Optional description |
| handlerType | string | Type of pipeline handler |
| activeConfigurationId | string | ID of the currently active configuration version |
| status | string | active or archived |
| configuration | object | Handler-specific settings |
| inputsSchema | object | Schema for input data |
| outputsSchema | object | Schema for output data |
| createdAt | timestamp | Creation time |
| updatedAt | timestamp | Last modification time |

Pipelines can generate arrays of questions or other structured outputs. Here’s an example of a question generation pipeline:

Create question generation pipeline

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Statement Question Generator",
"description": "Generates recommended questions from financial statements",
"handlerType": "language_model",
"inputsSchema": {
"files": [
{
"id": "statements",
"label": "Financial Statements",
"description": "Bank, credit card, or account statements",
"required": true,
"multiple": true,
"contextRetrievalMode": "full"
}
],
"datasets": [],
"dataInputs": []
},
"outputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "questions",
"label": "Recommended Questions",
"description": "List of recommended questions users can ask",
"schema": {
"type": "array",
"items": { "type": "string" }
},
"required": true
}
]
},
"configuration": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "instructions",
"label": "System Instructions",
"type": "string",
"value": {
"value": "You are a financial assistant. Analyze the provided financial statements and generate relevant questions that users might ask about them. Return the questions as a JSON array of strings."
}
}
]
}
}'

When triggering this pipeline, provide files as an array (since multiple: true):

{
"input": {
"files": {
"statements": ["fileId1", "fileId2"]
}
}
}

The output will contain an array of questions:

{
"output": {
"questions": [
"What is the current balance?",
"What was the total amount of new purchases?",
"When is the payment due date?"
]
}
}

When a file slot has multiple: true, you can provide multiple file IDs as an array when triggering the pipeline:

{
"input": {
"files": {
"documents": ["fileId1", "fileId2", "fileId3"]
}
}
}

For single file slots (multiple: false), provide a single file ID:

{
"input": {
"files": {
"document": "fileId1"
}
}
}

Pipeline configurations are versioned snapshots of a pipeline’s inputsSchema, outputsSchema, and configuration. Each time you update a pipeline’s configuration via the configurations endpoint, a new version is created, allowing you to track changes over time and rollback to previous versions.

Create a new configuration version (becomes the active configuration automatically):

Create configuration version

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/configurations \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"inputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "query",
"label": "Query",
"schema": { "type": "string" },
"required": true
}
]
},
"outputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "answer",
"label": "Answer",
"schema": { "type": "string" },
"required": true
}
]
},
"configuration": {
"files": [],
"datasets": [],
"dataInputs": []
},
"changeReason": "Updated output schema to include answer field"
}'

Response:

{
"pipelineConfigurationId": "cfg_xyz789",
"pipelineId": "EMbMEFLyUWEgvnhMWXVVa",
"inputsSchema": { ... },
"outputsSchema": { ... },
"configuration": { ... },
"createdAt": "2024-01-15T10:30:00Z",
"createdBy": "usr_abc123",
"changeReason": "Updated output schema to include answer field"
}

View all configuration versions for a pipeline (newest first by default):

List configuration versions

curl "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/configurations?page=1&pageSize=10" \
-H "Authorization: Bearer $API_TOKEN"

Query Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| page | number | 1 | Page number for pagination (starts at 1) |
| pageSize | number | 20 | Number of results per page (1-100) |
| orderDirection | string | desc | Sort direction: asc (oldest first) or desc (newest first) |

Response:

{
"configurations": [
{
"pipelineConfigurationId": "cfg_xyz789",
"pipelineId": "EMbMEFLyUWEgvnhMWXVVa",
"inputsSchema": { ... },
"outputsSchema": { ... },
"configuration": { ... },
"createdAt": "2024-01-15T10:30:00Z",
"createdBy": "usr_abc123",
"changeReason": "Updated output schema"
}
],
"total": 5,
"page": 1,
"pageSize": 10
}

Retrieve a specific configuration version by ID:

Get configuration version

curl "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/configurations/cfg_xyz789" \
-H "Authorization: Bearer $API_TOKEN"

To rollback to a previous configuration version, update the pipeline’s activeConfigurationId:

curl -X PUT https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"activeConfigurationId": "cfg_abc123"
}'

This sets the specified configuration as the active one without creating a new version.

Configuration Object Fields:

| Field | Type | Description |
| --- | --- | --- |
| pipelineConfigurationId | string | Unique identifier for this configuration version |
| pipelineId | string | ID of the parent pipeline |
| inputsSchema | object | Pipeline inputs schema at this version |
| outputsSchema | object | Pipeline outputs schema at this version |
| configuration | object | Pipeline configuration at this version |
| createdAt | timestamp | When this version was created |
| createdBy | string | User who created this version |
| changeReason | string | Optional description of why this version was created |

See the Pipeline Configurations API for complete endpoint documentation.

See the Pipelines API for complete endpoint documentation.