Pipelines

Pipelines are automated workflows that process data, run AI tasks, or transform information. Define a pipeline once, then trigger it on demand with different inputs.

A pipeline consists of:

  • Handler Type - The type of processing (e.g., language_model)
  • Input Schema - What data the pipeline accepts (files, datasets, dataInputs)
  • Output Schema - What data the pipeline produces (files, datasets, dataInputs)
  • Configuration - Settings and parameters

Input and output schemas define the structure of data that flows through a pipeline. Each schema contains three types of slots:

  • Files - References to uploaded files (PDFs, CSVs, documents)
  • Datasets - References to datasets (collections of tables)
  • Data Inputs - Structured JSON data (strings, numbers, arrays, objects)

Each slot has:

  • id - Unique identifier used when triggering the pipeline
  • label - Human-readable name
  • description - Optional description
  • required - Whether the slot must be provided
  • multiple - Whether multiple values are allowed (for files and datasets)
  • schema - JSON Schema defining the data structure (for dataInputs)
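
Putting these fields together, here is a minimal TypeScript sketch of the three slot shapes. The field names are taken from the examples on this page; the exact server-side types are not published, so treat this as an illustration only:

// Illustrative client-side types for input/output schema slots.
interface FileSlot {
  id: string; // unique identifier used when triggering the pipeline
  label: string; // human-readable name
  description?: string;
  required: boolean;
  multiple: boolean; // allow an array of file IDs at trigger time
  contextRetrievalMode?: "full" | "semantic"; // as seen in the examples below
}

interface DatasetSlot {
  id: string;
  label: string;
  description?: string;
  required: boolean;
  multiple: boolean;
  filter?: FilterPredicate[]; // optional row-level filters (see dataset filtering below)
}

interface DataInputSlot {
  id: string;
  label: string;
  description?: string;
  required: boolean;
  schema: Record<string, unknown>; // JSON Schema describing the value
}

interface FilterPredicate {
  field: string;
  op: string; // e.g. "eq", "in", "is_null"
  value?: unknown; // literal or { "$ref": "input.dataInputs.<id>" }
}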

Create a pipeline

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Document Summarizer",
"description": "Summarize uploaded documents using AI",
"handlerType": "language_model",
"inputsSchema": {
"files": [
{
"id": "document",
"label": "Document",
"description": "Document to summarize",
"required": true,
"multiple": false,
"contextRetrievalMode": "full"
}
],
"datasets": [],
"dataInputs": []
},
"outputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "summary",
"label": "Summary",
"description": "Generated summary",
"schema": { "type": "string" },
"required": true
}
]
},
"configuration": {
"files": [],
"datasets": [],
"dataInputs": []
}
}'

Response:

{
"pipelineId": "EMbMEFLyUWEgvnhMWXVVa",
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Document Summarizer",
"description": "Summarize uploaded documents using AI",
"handlerType": "language_model",
"activeConfigurationId": "cfg_abc123",
"status": "active",
"inputsSchema": {
"files": [
{
"id": "document",
"label": "Document",
"description": "Document to summarize",
"required": true,
"multiple": false,
"contextRetrievalMode": "full"
}
],
"datasets": [],
"dataInputs": []
},
"outputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "summary",
"label": "Summary",
"description": "Generated summary",
"schema": { "type": "string" },
"required": true
}
]
},
"configuration": { ... },
"createdAt": "2024-01-15T10:30:00Z",
"updatedAt": "2024-01-15T10:30:00Z",
"createdBy": "usr_abc123"
}

Status values:

| Status   | Description                                  |
| -------- | -------------------------------------------- |
| active   | Pipeline can be triggered                    |
| archived | Pipeline is disabled and cannot be triggered |

List pipelines

curl "https://api.catalyzed.ai/pipelines?teamIds=ZkoDMyjZZsXo4VAO_nJLk" \
-H "Authorization: Bearer $API_TOKEN"

Get pipeline details

curl https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \
-H "Authorization: Bearer $API_TOKEN"

Start a pipeline execution with input data:

Trigger a pipeline

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"input": {
"files": {
"document": "LvrGb8UaJk_IjmzaxuMAb"
}
}
}'

Response:

{
"executionId": "GkR8I6rHBms3W4Qfa2-FN",
"status": "pending",
"createdAt": "2024-01-15T10:30:00Z"
}

See Executions for monitoring execution progress.
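
For programmatic use, a trigger-then-poll round trip in TypeScript might look like the sketch below. It reuses the execution endpoint and the pending/running/succeeded status values shown elsewhere on this page; any other terminal status names are not listed here, so the loop simply waits until the status leaves pending or running:

const apiToken = process.env.API_TOKEN!;
const pipelineId = "EMbMEFLyUWEgvnhMWXVVa";

// Trigger the pipeline with a file reference for the "document" slot.
const { executionId } = await fetch(
  `https://api.catalyzed.ai/pipelines/${pipelineId}/trigger`,
  {
    method: "POST",
    headers: {
      Authorization: `Bearer ${apiToken}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      input: { files: { document: "LvrGb8UaJk_IjmzaxuMAb" } },
    }),
  }
).then((r) => r.json());

// Poll the execution until it reaches a terminal state.
let execution;
do {
  await new Promise((resolve) => setTimeout(resolve, 1000));
  execution = await fetch(
    `https://api.catalyzed.ai/pipeline-executions/${executionId}`,
    { headers: { Authorization: `Bearer ${apiToken}` } }
  ).then((r) => r.json());
} while (execution.status === "pending" || execution.status === "running");

console.log(execution.status, execution.output);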

The PUT endpoint only updates pipeline metadata (name, description) and allows setting the active configuration version. To update the actual configuration content (inputsSchema, outputsSchema, configuration), use the Pipeline Configurations endpoint.

Update pipeline

curl -X PUT https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "Document Summarizer v2",
"description": "Updated description"
}'

Archived pipelines cannot be triggered but retain their execution history:

Archive pipeline

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/archive \
-H "Authorization: Bearer $API_TOKEN"

Reactivate pipeline

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/reactivate \
-H "Authorization: Bearer $API_TOKEN"

Delete pipeline

curl -X DELETE https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \
-H "Authorization: Bearer $API_TOKEN"

Pipelines can reference:

  • Files - Uploaded documents for processing
  • Datasets - Tables for data retrieval
  • Data Inputs - Specific table columns for context

Handler types

| Type | Description | Web Context |
| ---- | ----------- | ----------- |
| language_model | AI-powered text generation with context retrieval | ✅ URL scraping & web search |
| streaming_language_model | Real-time streaming LLM with inline citations and fixed output schema | ❌ Not supported |
| code_agent_language_model | Code agent with Python code generation and bidirectional tool callbacks | ✅ Via tools (web_search, web_scrape) |
| embedding | Generate vector embeddings from text arrays with fixed input/output schemas | N/A |

Configuration contains optional pre-filled values for files, datasets, and dataInputs:

{
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "instructions",
"label": "System Instructions",
"type": "string",
"value": { "value": "You are a helpful assistant..." }
}
]
}

Configuration vs Runtime Input:

  • configuration.dataInputs - Pre-filled values that become part of the pipeline’s instructions (e.g., system prompts, settings)
  • configuration.files / configuration.datasets - Pre-filled file/dataset references
  • Runtime input - Actual data provided when triggering the pipeline

When triggering a pipeline, you provide runtime values in the input object. Configuration values are baked into the pipeline definition.
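
As a compact illustration of the split (the slot ids here are hypothetical), the configuration value below is part of the pipeline definition, while the runtime value is sent on each trigger:

// Baked into the pipeline definition: contributes to every execution's instructions.
const configurationDataInputs = [
  {
    id: "instructions",
    label: "System Instructions",
    type: "string",
    value: { value: "You are a helpful assistant..." },
  },
];

// Provided per execution in the trigger payload's "input" object.
const runtimeInput = {
  dataInputs: {
    query: "Summarize the key findings from this document",
  },
};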

Dataset slots in inputsSchema can include optional row-level filters to restrict which rows are queried. Filters are defined as an array of predicates that are combined with AND logic and applied server-side in the query engine.

Key Benefits:

  • Row-level security - Enforce data isolation between tenants or users
  • Performance optimization - Reduce query scope by filtering at the source
  • Dynamic filtering - Use runtime values via $ref to reference execution inputs
  • Server-side enforcement - Filters cannot be bypassed by SQL injection

Filter Structure:

Each dataset slot can include a filter array with predicates:

{
"datasets": [
{
"id": "sales",
"label": "Sales Data",
"required": true,
"multiple": false,
"filter": [
{
"field": "tenant_id",
"op": "eq",
"value": { "$ref": "input.dataInputs.tenantId" }
},
{
"field": "deleted_at",
"op": "is_null"
}
]
}
],
"dataInputs": [
{
"id": "tenantId",
"label": "Tenant ID",
"schema": { "type": "string" },
"required": true
}
]
}

Supported Operators:

| Operator | Description | Example |
| -------- | ----------- | ------- |
| eq | Equal to | {"field": "status", "op": "eq", "value": "active"} |
| neq | Not equal to | {"field": "type", "op": "neq", "value": "draft"} |
| gt | Greater than | {"field": "amount", "op": "gt", "value": 100} |
| gte | Greater than or equal | {"field": "score", "op": "gte", "value": 80} |
| lt | Less than | {"field": "age", "op": "lt", "value": 18} |
| lte | Less than or equal | {"field": "count", "op": "lte", "value": 10} |
| in | In list | {"field": "category", "op": "in", "value": ["A", "B"]} |
| not_in | Not in list | {"field": "status", "op": "not_in", "value": ["deleted", "archived"]} |
| like | Pattern match | {"field": "email", "op": "like", "value": "%@example.com"} |
| is_null | Is NULL | {"field": "deleted_at", "op": "is_null"} |
| is_not_null | Is not NULL | {"field": "email", "op": "is_not_null"} |

Dynamic Values with $ref:

Use $ref to reference runtime values from dataInputs:

{
"field": "tenant_id",
"op": "eq",
"value": { "$ref": "input.dataInputs.tenantId" }
}

The $ref path must follow the pattern input.dataInputs.<id> where <id> matches a data input slot.
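
Conceptually, the query engine substitutes the referenced runtime value before applying the filter. A hedged TypeScript sketch of that lookup (the real implementation is server-side and not exposed; this only illustrates the input.dataInputs.<id> resolution):

// Resolve a filter value against the runtime trigger input.
function resolveFilterValue(
  value: unknown,
  triggerInput: { dataInputs?: Record<string, unknown> }
): unknown {
  if (typeof value === "object" && value !== null && "$ref" in value) {
    const ref = String((value as { $ref: unknown }).$ref);
    const match = /^input\.dataInputs\.([A-Za-z0-9_-]+)$/.exec(ref);
    if (!match) throw new Error(`Unsupported $ref path: ${ref}`);
    return triggerInput.dataInputs?.[match[1]];
  }
  return value; // literal value, used as-is
}

// Resolves to "tenant-xyz" for the trigger payload shown further below.
resolveFilterValue(
  { $ref: "input.dataInputs.tenantId" },
  { dataInputs: { tenantId: "tenant-xyz" } }
);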

Complete Example - Multi-tenant Sales Pipeline:

Create pipeline with dataset filtering

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Sales Analysis",
"description": "Analyze sales data with tenant isolation",
"handlerType": "language_model",
"inputsSchema": {
"files": [],
"datasets": [
{
"id": "sales",
"label": "Sales Data",
"description": "Sales transactions dataset",
"required": true,
"multiple": false,
"filter": [
{
"field": "tenant_id",
"op": "eq",
"value": { "$ref": "input.dataInputs.tenantId" }
},
{
"field": "deleted_at",
"op": "is_null"
},
{
"field": "status",
"op": "in",
"value": ["completed", "pending"]
}
]
}
],
"dataInputs": [
{
"id": "tenantId",
"label": "Tenant ID",
"description": "Tenant identifier for data isolation",
"schema": { "type": "string" },
"required": true
},
{
"id": "question",
"label": "Question",
"description": "Analysis question",
"schema": { "type": "string" },
"required": true
}
]
},
"outputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "answer",
"label": "Answer",
"schema": { "type": "string" },
"required": true
}
]
},
"configuration": {
"files": [],
"datasets": [],
"dataInputs": []
}
}'

Triggering with Filter Values:

When triggering the pipeline, provide the tenant ID that will be substituted into the filter:

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"input": {
"datasets": {
"sales": "9Yh1BRvQhmFUYrSDZTcRz"
},
"dataInputs": {
"tenantId": "tenant-xyz",
"question": "What were the total sales last month?"
}
}
}'

The query engine will automatically filter the sales table to only include rows where:

  • tenant_id = 'tenant-xyz'
  • deleted_at IS NULL
  • status IN ('completed', 'pending')
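
Conceptually, the three predicates combine with AND. A hedged TypeScript sketch of the equivalent per-row check (the actual filtering happens server-side in the query engine, never in client code):

interface SalesRow {
  tenant_id: string;
  deleted_at: string | null;
  status: string;
}

// Equivalent of: tenant_id = 'tenant-xyz' AND deleted_at IS NULL
// AND status IN ('completed', 'pending')
function matchesFilter(row: SalesRow): boolean {
  return (
    row.tenant_id === "tenant-xyz" &&
    row.deleted_at === null &&
    ["completed", "pending"].includes(row.status)
  );
}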

Security Considerations:

  • Filters are applied server-side in the query engine using DataFusion’s DataFrame API
  • Filters cannot be bypassed via SQL injection or query manipulation
  • All filter values are parameterized and SQL-escaped before execution
  • The LLM-generated SQL queries the filtered view transparently
  • Filter logic is validated when creating the pipeline

The language_model handler can automatically enrich LLM context by fetching content from the web. This feature enables pipelines to work with real-time information and user-provided URLs without manual data ingestion.

Available features:

  • URL Scraping - Automatically detect and scrape URLs in user inputs
  • Web Search - Generate search queries and fetch results via Tavily API

Both features inject content before LLM generation, making web data available in the prompt context. They are specific to the language_model handler: streaming_language_model does not support them, and code_agent_language_model takes a different, tool-based approach (the agent calls web_search and web_scrape tools itself).

URL scraping automatically detects HTTP/HTTPS URLs in user inputs, fetches their content, and injects it into the LLM prompt context. This feature is enabled by default.

Key Features:

  • Automatic URL detection in user inputs via regex pattern /https?:\/\/[^\s]+/gi
  • Pre-LLM content injection (scrape completes before model runs)
  • JavaScript rendering support for dynamic content
  • Multiple content formats: HTML, Markdown, Text
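
As an illustration of the detection step, the documented pattern can be applied as below (the helper function is illustrative and not part of the API):

// Extract HTTP/HTTPS URLs from user-provided input text using the
// documented detection pattern.
function detectUrls(text: string): string[] {
  return text.match(/https?:\/\/[^\s]+/gi) ?? [];
}

detectUrls("Summarize https://example.com/article and compare it with https://example.com/other");
// => ["https://example.com/article", "https://example.com/other"]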

Configuration:

URL scraping is configured in configuration.handlerOptions.urlScraping:

{
"urlScraping": {
"enabled": true, // default: true
"contentType": "markdown", // "html" | "markdown" | "text", default: "markdown"
"renderJs": true // default: true, enables JavaScript rendering
}
}

Configuration Options:

| Field | Type | Default | Description |
| ----- | ---- | ------- | ----------- |
| enabled | boolean | true | Enable automatic URL detection and scraping |
| contentType | string | "markdown" | Content format: "html", "markdown", or "text" |
| renderJs | boolean | true | Enable JavaScript rendering to capture dynamic content |

Creating a Pipeline with URL Scraping:

Create pipeline with URL scraping

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Article Summarizer",
"handlerType": "language_model",
"inputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "article_url",
"label": "Article URL",
"description": "URL of the article to summarize",
"schema": { "type": "string" },
"required": true
}
]
},
"outputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "summary",
"label": "Summary",
"description": "Generated article summary",
"schema": { "type": "string" },
"required": true
}
]
},
"configuration": {
"files": [],
"datasets": [],
"dataInputs": [],
"handlerOptions": {
"urlScraping": {
"enabled": true,
"contentType": "markdown",
"renderJs": true
}
}
}
}'

Triggering the Pipeline:

Trigger pipeline with URL in input

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"input": {
"dataInputs": {
"article_url": "https://example.com/article"
}
}
}'

The URL https://example.com/article is automatically detected and scraped, and its content is injected into the LLM context before generation.

Use Cases:

  • Summarizing news articles or blog posts shared by users
  • Extracting structured data from documentation pages
  • Analyzing competitor websites or product pages
  • Processing user-submitted links in support tickets or feedback forms

Web search enables pipelines to fetch real-time information from the web using the Tavily search API. The system uses a two-phase approach: first, an LLM generates focused search queries based on the task, then Tavily executes those searches and formats the results.

Key Features:

  • Two-phase execution: LLM generates queries → Tavily executes searches
  • Pre-LLM result injection (search completes before main model runs)
  • Configurable query count (1-5 queries) and results per query (1-20 results)
  • Search depth control: basic (1 credit) or advanced (2 credits)
  • Optional AI-generated answer summaries from Tavily

Configuration:

Web search is configured in configuration.handlerOptions.webSearch. It is disabled by default; you must explicitly set enabled: true.

{
"webSearch": {
"enabled": true, // required to enable web search
"maxQueries": 3, // 1-5, default: 3
"maxResultsPerQuery": 5, // 1-20, default: 5
"searchDepth": "basic", // "basic" | "advanced", default: "basic"
"includeAnswer": false // default: false
}
}

Configuration Options:

| Field | Type | Default | Description |
| ----- | ---- | ------- | ----------- |
| enabled | boolean | (required) | Enable automatic web search query generation and execution |
| maxQueries | number | 3 | Maximum number of queries to generate (1-5) |
| maxResultsPerQuery | number | 5 | Maximum results per query (1-20) |
| searchDepth | string | "basic" | Search depth: "basic" (1 credit) or "advanced" (2 credits) |
| includeAnswer | boolean | false | Include AI-generated answer summary from Tavily |

How It Works:

  1. LLM analyzes the task and user input to generate 1-5 focused search queries
  2. Tavily API executes searches in parallel
  3. Results are formatted with titles, URLs, snippets, and relevance scores
  4. Formatted search results are injected into the LLM context
  5. Main LLM generation runs with search results available in the prompt

Creating a Pipeline with Web Search:

Create pipeline with web search

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Market Research Assistant",
"handlerType": "language_model",
"inputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "research_query",
"label": "Research Query",
"description": "What would you like to research?",
"schema": { "type": "string" },
"required": true
}
]
},
"outputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "analysis",
"label": "Analysis",
"description": "Research analysis and findings",
"schema": { "type": "string" },
"required": true
}
]
},
"configuration": {
"files": [],
"datasets": [],
"dataInputs": [],
"handlerOptions": {
"webSearch": {
"enabled": true,
"maxQueries": 3,
"maxResultsPerQuery": 5,
"searchDepth": "basic",
"includeAnswer": false
}
}
}
}'

Use Cases:

  • Real-time market research and competitive analysis
  • Current events analysis and news monitoring
  • Fact-checking claims with recent information
  • Gathering background information for decision-making

When multiple context sources are configured (URL scraping, web search, files, datasets), they are assembled in a specific priority order optimized for LLM attention patterns:

1. Scraped URLs ← Most specific (user explicitly provided URLs)
2. Web Search Results ← Current/dynamic information from the web
3. File Context ← Uploaded documents (semantic or full retrieval)
4. Dataset Context ← Structured data from SQL queries
↓ Most general

This order ensures the most relevant and specific information appears first in the LLM context, where attention mechanisms are most effective.
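
For intuition, a minimal TypeScript sketch of assembling context sections in this priority order (the section titles and join format are illustrative; the handler's actual prompt construction is not exposed):

interface ContextSections {
  scrapedUrls?: string;
  webSearchResults?: string;
  fileContext?: string;
  datasetContext?: string;
}

// Concatenate available sections from most specific to most general,
// mirroring the priority order described above.
function assembleContext(sections: ContextSections): string {
  const ordered: Array<[string, string | undefined]> = [
    ["Scraped URLs", sections.scrapedUrls],
    ["Web Search Results", sections.webSearchResults],
    ["File Context", sections.fileContext],
    ["Dataset Context", sections.datasetContext],
  ];
  return ordered
    .filter((entry): entry is [string, string] => Boolean(entry[1]))
    .map(([title, content]) => `## ${title}\n${content}`)
    .join("\n\n");
}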

Combined Context Example:

Pipeline with multiple context sources

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Comprehensive Research Assistant",
"handlerType": "language_model",
"inputsSchema": {
"files": [
{
"id": "background_docs",
"label": "Background Documents",
"required": false,
"multiple": true,
"contextRetrievalMode": "semantic"
}
],
"datasets": [
{
"id": "historical_data",
"label": "Historical Data",
"required": false,
"multiple": false
}
],
"dataInputs": [
{
"id": "question",
"label": "Research Question",
"schema": { "type": "string" },
"required": true
}
]
},
"outputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "comprehensive_analysis",
"label": "Comprehensive Analysis",
"schema": { "type": "string" },
"required": true
}
]
},
"configuration": {
"files": [],
"datasets": [],
"dataInputs": [],
"handlerOptions": {
"urlScraping": {
"enabled": true,
"contentType": "markdown"
},
"webSearch": {
"enabled": true,
"maxQueries": 2,
"maxResultsPerQuery": 5
}
}
}
}'

Different handler types have different approaches to web context retrieval:

| Feature | language_model | streaming_language_model | code_agent_language_model |
| ------- | -------------- | ------------------------ | ------------------------- |
| URL Scraping | ✅ Automatic pre-LLM | ❌ Not supported | ✅ Via web_scrape tool |
| Web Search | ✅ Automatic pre-LLM | ❌ Not supported | ✅ Via web_search tool |
| Implementation | Configuration-driven | N/A | Tool-based (agent decides) |
| Latency | Higher (pre-processing) | N/A | Variable (agent reasoning) |

Key Differences:

  • language_model: Features run automatically before every LLM call when enabled in configuration. Context is always fetched, even if not needed for the specific query.

  • streaming_language_model: Does not support web context features. This handler has a fixed output schema and no pre-processing phase.

  • code_agent_language_model: Python code agent can call web_search and web_scrape tools dynamically during execution. The agent decides when and how to use these tools based on the task, but this adds reasoning overhead and latency.

The streaming_language_model handler enables real-time token-level streaming with inline citations. Unlike the standard language_model handler, it has a fixed output schema that cannot be modified.

Key Features:

  • Real-time streaming - Tokens are delivered as they’re generated
  • Inline citations - References appear as human-readable chunk IDs like [swift_falcon], [blue_river] in the output
  • Fixed output schema - Always returns { content: string }
  • Channel-based delivery - Uses server-sent events (SSE) for streaming

Output Schema (Fixed):

The output schema is automatically set and cannot be modified:

{
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "content",
"label": "Response Content",
"description": "Generated text response with inline citation markers",
"schema": {
"type": "object",
"properties": {
"content": { "type": "string" }
},
"required": ["content"],
"additionalProperties": false
},
"required": true
}
]
}

Creating a Streaming Pipeline:

Create streaming language model pipeline

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Streaming Document Q&A",
"description": "Answer questions about documents with real-time streaming",
"handlerType": "streaming_language_model",
"inputsSchema": {
"files": [
{
"id": "document",
"label": "Document",
"description": "Document to analyze",
"required": true,
"multiple": false,
"contextRetrievalMode": "full"
}
],
"datasets": [],
"dataInputs": [
{
"id": "question",
"label": "Question",
"description": "Question to answer",
"schema": { "type": "string" },
"required": true
}
]
},
"configuration": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "systemPrompt",
"label": "System Prompt",
"type": "string",
"value": {
"value": "You are a helpful assistant that answers questions based on the provided documents. Reference sources using [chunk_id] markers after factual claims."
}
}
]
}
}'

Note: You cannot specify outputsSchema when creating a streaming_language_model pipeline. The output schema is automatically set to the fixed format shown above.

Consuming Streaming Output:

Streaming pipelines require a three-step flow to consume the output:

1. Trigger the pipeline:

When you trigger a streaming pipeline, the response contains only the executionId:

{
"executionId": "GkR8I6rHBms3W4Qfa2-FN",
"status": "pending",
"createdAt": "2024-01-15T10:30:00Z"
}

2. Poll for the streaming channel ID:

Poll the execution endpoint until handlerOutput.streamingChannelId becomes available:

// Trigger the pipeline
const { executionId } = await fetch(
`https://api.catalyzed.ai/pipelines/${pipelineId}/trigger`,
{
method: "POST",
headers: {
Authorization: `Bearer ${apiToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify({ input: { /* ... */ } }),
}
).then(r => r.json());
// Poll for channel ID
let channelId = null;
while (!channelId) {
const execution = await fetch(
`https://api.catalyzed.ai/pipeline-executions/${executionId}`,
{ headers: { Authorization: `Bearer ${apiToken}` } }
).then(r => r.json());
channelId = execution.handlerOutput?.streamingChannelId ?? null;
if (!channelId) {
// Channel not ready yet, wait before polling again
await new Promise(resolve => setTimeout(resolve, 500));
}
}

Once the worker processes the execution, the response will include the channel ID:

{
"executionId": "GkR8I6rHBms3W4Qfa2-FN",
"status": "running",
"handlerOutput": {
"handlerType": "streaming_language_model",
"streamingChannelId": "ch_xyz123"
}
}

3. Subscribe to the SSE stream:

Once you have the streamingChannelId, subscribe using Server-Sent Events (SSE):

// Note: the native browser EventSource does not accept custom headers;
// this assumes an EventSource implementation that does (for example, the
// "eventsource" package in Node.js) or another way to attach the token.
const eventSource = new EventSource(
`https://api.catalyzed.ai/channels/${channelId}/stream`,
{
headers: {
Authorization: `Bearer ${apiToken}`,
},
}
);
let displayText = "";
eventSource.addEventListener("channel-message", (event) => {
const message = JSON.parse(event.data);
switch (message.dataType) {
case "streaming.start":
console.log("Streaming started:", message.data);
break;
case "conversation.assistant.delta":
// Token chunk received
const { delta } = message.data;
displayText += delta;
break;
case "streaming.done":
// Processing complete
const { content } = message.data;
console.log("Final content:", content);
eventSource.close();
break;
case "streaming.error":
console.error("Streaming error:", message.data);
eventSource.close();
break;
}
});

Example Output:

The streaming handler returns content with inline citation markers using human-readable chunk IDs:

{
"content": "The Q4 revenue was $1.65M [swift_falcon] which exceeded the Q3 figure of $1.42M [blue_river]."
}

Citation Markers:

Each marker uses a human-readable [adjective_noun] format (e.g., [swift_falcon], [blue_river], [calm_peak]). These chunk IDs are:

  • Deterministic - Same content always generates the same ID
  • Unique - Each chunk gets a different identifier (collisions are resolved with _2, _3 suffixes)
  • Human-readable - Easier to reference and debug than numeric markers

The execution also includes outputCitations that map markers to source chunks:

{
"outputCitations": [
{
"outputPointer": "/content",
"outputCharStart": 25,
"outputCharEnd": 28,
"citations": [
{
"type": "file_chunk",
"fileChunkId": "chunk_abc123"
}
]
},
{
"outputPointer": "/content",
"outputCharStart": 67,
"outputCharEnd": 70,
"citations": [
{
"type": "file_chunk",
"fileChunkId": "chunk_def456"
}
]
}
]
}
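
For client-side rendering, a small sketch of locating citation markers in the final content (the [adjective_noun] pattern follows the description above; how you join markers with outputCitations entries is up to your application):

// Locate [adjective_noun] markers (optionally suffixed _2, _3, ...) in the
// streamed content, along with their character offsets.
function extractCitationMarkers(content: string) {
  const markerPattern = /\[[a-z]+_[a-z]+(?:_\d+)?\]/g;
  return Array.from(content.matchAll(markerPattern), (match) => ({
    marker: match[0],
    start: match.index ?? 0,
    end: (match.index ?? 0) + match[0].length,
  }));
}

extractCitationMarkers(
  "The Q4 revenue was $1.65M [swift_falcon] which exceeded the Q3 figure of $1.42M [blue_river]."
);
// => markers "[swift_falcon]" and "[blue_river]" with their offsets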

The embedding handler generates vector embeddings from text arrays. Unlike LLM handlers, it has fixed input and output schemas that cannot be modified.

Key Features:

  • Fixed schemas - Both input and output schemas are predefined
  • Batch processing - Generate embeddings for multiple texts at once
  • Model selection - Choose embedding model per-execution via input
  • No streaming - Results returned when complete

Input Schema (Fixed):

| Field | Type | Required | Description |
| ----- | ---- | -------- | ----------- |
| texts | string[] | Yes | Array of text strings to generate embeddings for |
| model | string | No | Embedding model (default: BAAI/bge-small-en-v1.5) |

Output Schema (Fixed):

| Field | Type | Description |
| ----- | ---- | ----------- |
| embeddings | number[][] | Array of embedding vectors |
| dimensions | number | Dimension of each embedding vector (e.g., 384) |

Creating an Embedding Pipeline:

Create embedding pipeline

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Text Embedding Pipeline",
"description": "Generate embeddings for text arrays",
"handlerType": "embedding"
}'

Triggering an Embedding Pipeline:

Trigger embedding pipeline

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/trigger \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"input": {
"dataInputs": {
"texts": [
"Machine learning is a subset of artificial intelligence.",
"Natural language processing helps computers understand text.",
"Deep learning uses neural networks with multiple layers."
]
}
}
}'

Using a Custom Model:

Specify a different embedding model by including the model field:

{
"input": {
"dataInputs": {
"texts": ["Your text here"],
"model": "BAAI/bge-large-en-v1.5"
}
}
}

Example Output:

Once the execution completes, the output contains embedding vectors:

{
"executionId": "GkR8I6rHBms3W4Qfa2-FN",
"status": "succeeded",
"output": {
"embeddings": [
[0.0123, -0.0456, 0.0789, ...],
[0.0234, -0.0567, 0.0890, ...],
[0.0345, -0.0678, 0.0901, ...]
],
"dimensions": 384
},
"outputCitations": []
}
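
As a usage sketch, the returned vectors can be compared directly, for example with cosine similarity (this helper is illustrative and not part of the API):

// Cosine similarity between two embedding vectors of equal dimension.
function cosineSimilarity(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Compare two of the returned vectors (shortened here for illustration).
const embeddings: number[][] = [
  [0.0123, -0.0456, 0.0789],
  [0.0234, -0.0567, 0.089],
];
console.log(cosineSimilarity(embeddings[0], embeddings[1]));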

Use Cases:

  • Semantic search - Generate embeddings for search queries and documents
  • Document similarity - Compare documents by embedding distance
  • Clustering - Group similar texts using embedding vectors
  • RAG preprocessing - Generate embeddings for knowledge base indexing

Pipeline object fields

| Field | Type | Description |
| ----- | ---- | ----------- |
| pipelineId | string | Unique identifier |
| teamId | string | Team that owns this pipeline |
| name | string | Human-readable name |
| description | string | Optional description |
| handlerType | string | Type of pipeline handler |
| activeConfigurationId | string | ID of the currently active configuration version |
| status | string | active or archived |
| configuration | object | Handler-specific settings |
| inputsSchema | object | Schema for input data |
| outputsSchema | object | Schema for output data |
| createdAt | timestamp | Creation time |
| updatedAt | timestamp | Last modification time |

Measure pipeline quality by running evaluations against example sets:

  1. Create an example set - Define ground truth input/output pairs
  2. Run an evaluation - Compare pipeline outputs against expected outputs
  3. Analyze results - Identify areas for improvement

See Evaluations and the Evaluation Workflow Guide for details.

Use signals and synthesis to improve pipeline performance:

  1. Capture signals - Record expert feedback on pipeline outputs
  2. Run synthesis - Generate AI-proposed improvements
  3. Apply changes - Implement improvements to configuration

See Signals and Synthesis Runs for details.

Pipelines can generate arrays of questions or other structured outputs. Here’s an example of a question generation pipeline:

Create question generation pipeline

curl -X POST https://api.catalyzed.ai/pipelines \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"teamId": "ZkoDMyjZZsXo4VAO_nJLk",
"name": "Statement Question Generator",
"description": "Generates recommended questions from financial statements",
"handlerType": "language_model",
"inputsSchema": {
"files": [
{
"id": "statements",
"label": "Financial Statements",
"description": "Bank, credit card, or account statements",
"required": true,
"multiple": true,
"contextRetrievalMode": "full"
}
],
"datasets": [],
"dataInputs": []
},
"outputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "questions",
"label": "Recommended Questions",
"description": "List of recommended questions users can ask",
"schema": {
"type": "array",
"items": { "type": "string" }
},
"required": true
}
]
},
"configuration": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "instructions",
"label": "System Instructions",
"type": "string",
"value": {
"value": "You are a financial assistant. Analyze the provided financial statements and generate relevant questions that users might ask about them. Return the questions as a JSON array of strings."
}
}
]
}
}'

When triggering this pipeline, provide files as an array (since multiple: true):

{
"input": {
"files": {
"statements": ["fileId1", "fileId2"]
}
}
}

The output will contain an array of questions:

{
"output": {
"questions": [
"What is the current balance?",
"What was the total amount of new purchases?",
"When is the payment due date?"
]
}
}

When a file slot has multiple: true, you can provide multiple file IDs as an array when triggering the pipeline:

{
"input": {
"files": {
"documents": ["fileId1", "fileId2", "fileId3"]
}
}
}

For single file slots (multiple: false), provide a single file ID:

{
"input": {
"files": {
"document": "fileId1"
}
}
}

Pipeline configurations are versioned snapshots of a pipeline’s inputsSchema, outputsSchema, and configuration. Each time you update a pipeline’s configuration via the configurations endpoint, a new version is created, allowing you to track changes over time and rollback to previous versions.

Create a new configuration version (becomes the active configuration automatically):

Create configuration version

curl -X POST https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/configurations \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"inputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "query",
"label": "Query",
"schema": { "type": "string" },
"required": true
}
]
},
"outputsSchema": {
"files": [],
"datasets": [],
"dataInputs": [
{
"id": "answer",
"label": "Answer",
"schema": { "type": "string" },
"required": true
}
]
},
"configuration": {
"files": [],
"datasets": [],
"dataInputs": []
},
"changeReason": "Updated output schema to include answer field"
}'

Response:

{
"pipelineConfigurationId": "cfg_xyz789",
"pipelineId": "EMbMEFLyUWEgvnhMWXVVa",
"inputsSchema": { ... },
"outputsSchema": { ... },
"configuration": { ... },
"createdAt": "2024-01-15T10:30:00Z",
"createdBy": "usr_abc123",
"changeReason": "Updated output schema to include answer field"
}

View all configuration versions for a pipeline (newest first by default):

List configuration versions

curl "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/configurations?page=1&pageSize=10" \
-H "Authorization: Bearer $API_TOKEN"

Query Parameters:

| Parameter | Type | Default | Description |
| --------- | ---- | ------- | ----------- |
| page | number | 1 | Page number for pagination (starts at 1) |
| pageSize | number | 20 | Number of results per page (1-100) |
| orderDirection | string | desc | Sort direction: asc (oldest first) or desc (newest first) |

Response:

{
"configurations": [
{
"pipelineConfigurationId": "cfg_xyz789",
"pipelineId": "EMbMEFLyUWEgvnhMWXVVa",
"inputsSchema": { ... },
"outputsSchema": { ... },
"configuration": { ... },
"createdAt": "2024-01-15T10:30:00Z",
"createdBy": "usr_abc123",
"changeReason": "Updated output schema"
}
],
"total": 5,
"page": 1,
"pageSize": 10
}

Retrieve a specific configuration version by ID:

Get configuration version

curl "https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa/configurations/cfg_xyz789" \
-H "Authorization: Bearer $API_TOKEN"

To rollback to a previous configuration version, update the pipeline’s activeConfigurationId:

curl -X PUT https://api.catalyzed.ai/pipelines/EMbMEFLyUWEgvnhMWXVVa \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"activeConfigurationId": "cfg_abc123"
}'

This sets the specified configuration as the active one without creating a new version.
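
Putting the last two calls together, a small rollback helper might look like the sketch below (the endpoints are the ones documented above; choosing the second-newest version as the rollback target is an assumption for illustration):

// Roll a pipeline back to its most recent previous configuration version.
async function rollbackToPreviousConfiguration(pipelineId: string, apiToken: string) {
  const headers = { Authorization: `Bearer ${apiToken}` };

  // List versions, newest first (the default order).
  const { configurations } = await fetch(
    `https://api.catalyzed.ai/pipelines/${pipelineId}/configurations?page=1&pageSize=2`,
    { headers }
  ).then((r) => r.json());

  if (configurations.length < 2) {
    throw new Error("No previous configuration version to roll back to");
  }

  // Set the second-newest version as the active configuration.
  await fetch(`https://api.catalyzed.ai/pipelines/${pipelineId}`, {
    method: "PUT",
    headers: { ...headers, "Content-Type": "application/json" },
    body: JSON.stringify({
      activeConfigurationId: configurations[1].pipelineConfigurationId,
    }),
  });
}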

Configuration version object fields

| Field | Type | Description |
| ----- | ---- | ----------- |
| pipelineConfigurationId | string | Unique identifier for this configuration version |
| pipelineId | string | ID of the parent pipeline |
| inputsSchema | object | Pipeline inputs schema at this version |
| outputsSchema | object | Pipeline outputs schema at this version |
| configuration | object | Pipeline configuration at this version |
| createdAt | timestamp | When this version was created |
| createdBy | string | User who created this version |
| changeReason | string | Optional description of why this version was created |

See the Pipeline Configurations API for complete endpoint documentation.

See the Pipelines API for complete endpoint documentation.