Ingesting Data
Write data to your dataset tables using the /dataset-tables/{tableId}/rows endpoint. Before ingesting data, your table must exist with a defined schema—see the Schema Management guide to create tables.
Supported Formats
Catalyzed accepts data in two formats:
| Format | Content-Type | Use Case |
|---|---|---|
| JSON | application/json | Simple integration, readable, array of row objects |
| Arrow IPC | application/vnd.apache.arrow.stream | High performance, typed data, binary streaming |
Write Modes
Choose a write mode based on how you want to modify the table:
| Mode | Description | Primary Key Required |
|---|---|---|
| append | Insert new rows without checking for duplicates (fastest) | No |
| upsert | Update existing rows by primary key, insert new rows | Yes |
| overwrite | Replace all existing data in the table | No |
| delete | Remove rows matching the provided primary keys | Yes |
Specify the mode using the mode query parameter: ?mode=append, ?mode=upsert, etc.
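Several snippets later in this guide call an ingestRows(tableId, rows) helper. A minimal sketch of such a wrapper is shown below, assuming JSON input and an apiToken variable already in scope; the helper itself and its WriteMode type are illustrative conveniences, not part of the Catalyzed API:

```ts
// Illustrative helper, not part of the Catalyzed API surface.
type WriteMode = "append" | "upsert" | "overwrite" | "delete";

async function ingestRows(
  tableId: string,
  rows: unknown[],
  mode: WriteMode = "append",
) {
  const response = await fetch(
    `https://api.catalyzed.ai/dataset-tables/${tableId}/rows?mode=${mode}`,
    {
      method: "POST",
      headers: {
        Authorization: `Bearer ${apiToken}`, // assumes apiToken is defined in scope
        "Content-Type": "application/json",
      },
      body: JSON.stringify(rows),
    },
  );

  if (!response.ok) {
    const error = await response.json();
    throw new Error(`Ingestion failed: ${error.message}`);
  }

  return response.json(); // ingestion metrics (see "Response Format" below)
}
```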
Appending Rows
Append mode is the simplest way to add data: it inserts rows without duplicate checking, making it the fastest option:
Append rows to a table
curl -X POST "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=append" \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '[ {"id": "1", "name": "Alice", "email": "[email protected]"}, {"id": "2", "name": "Bob", "email": "[email protected]"} ]'const response = await fetch( "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=append", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify([ ]), });const result = await response.json();response = requests.post( "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=append", headers={"Authorization": f"Bearer {api_token}"}, json=[ ])result = response.json()Upserting Rows
Upsert mode updates existing rows by primary key and inserts new rows. The table must have a primary key defined:
Upsert rows (update or insert)
curl -X POST "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=upsert" \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '[ {"id": "1", "name": "Alice Updated", "email": "[email protected]"}, {"id": "3", "name": "Charlie", "email": "[email protected]"} ]'const response = await fetch( "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=upsert", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify([ ]), });const result = await response.json();response = requests.post( "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=upsert", headers={"Authorization": f"Bearer {api_token}"}, json=[ ])result = response.json()In this example, if row with id="1" exists, it gets updated. Row with id="3" is inserted as new.
Overwriting Data
Overwrite mode replaces all existing data in the table with the new rows:
Overwrite entire table
curl -X POST "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=overwrite" \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '[ {"id": "10", "name": "New User", "email": "[email protected]"} ]'const response = await fetch( "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=overwrite", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify([ ]), });const result = await response.json();response = requests.post( "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=overwrite", headers={"Authorization": f"Bearer {api_token}"}, json=[ ])result = response.json()Deleting Rows
Delete mode removes rows matching the provided primary keys. The request body is an array of primary key values:
Delete rows by primary key
curl -X POST "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=delete" \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '["1", "2", "3"]'const response = await fetch( "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=delete", { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify(["1", "2", "3"]), });response = requests.post( "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=delete", headers={"Authorization": f"Bearer {api_token}"}, json=["1", "2", "3"])Using Arrow IPC
For large datasets or when working with typed columnar data, use Apache Arrow IPC format. This is more efficient than JSON for bulk operations:
Ingest using Arrow IPC
```bash
# Arrow IPC is a binary format - use the TypeScript/Python libraries below
# cURL example omitted (not practical for binary data)
```

```ts
import { tableToIPC, tableFromArrays } from "apache-arrow";

// Create Arrow table from columnar data
const table = tableFromArrays({
  id: ["1", "2", "3"],
  name: ["Alice", "Bob", "Charlie"],
});

// Serialize to Arrow IPC format
const arrowData = tableToIPC(table);

const response = await fetch(
  "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=append",
  {
    method: "POST",
    headers: {
      Authorization: `Bearer ${apiToken}`,
      "Content-Type": "application/vnd.apache.arrow.stream",
    },
    body: arrowData,
  },
);

const result = await response.json();
```

```python
import pyarrow as pa
import requests

# Create Arrow table from columnar data
table = pa.table({
    "id": ["1", "2", "3"],
    "name": ["Alice", "Bob", "Charlie"],
})

# Serialize to Arrow IPC format
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
arrow_data = sink.getvalue().to_pybytes()

response = requests.post(
    "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=append",
    headers={
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/vnd.apache.arrow.stream",
    },
    data=arrow_data,
)
result = response.json()
```

When to use Arrow IPC:
- Datasets larger than 10MB
- You already have data in columnar format
- Type safety is critical (no JSON string/number ambiguity)
- Maximum performance is required
Response Format
All ingestion requests return metrics about the operation:
{ "rows_affected": 100, "rows_inserted": 95, "rows_updated": 5, "rows_deleted": 0, "dataset_version": 42, "duration_ms": 150, "usage": { "bytes_read": 1024, "bytes_written": 2048 }}| Field | Description |
|---|---|
| rows_affected | Total rows modified |
| rows_inserted | New rows added (append, upsert) |
| rows_updated | Existing rows changed (upsert) |
| rows_deleted | Rows removed (delete, overwrite) |
| dataset_version | New version number after the operation |
| duration_ms | Time taken for the operation, in milliseconds |
| usage | Storage I/O metrics for billing |
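In TypeScript, the response can be modeled roughly as the interface below. The field names and types are taken from the example response above; the IngestionResult name is just an illustrative label, not an official client type:

```ts
// Sketch of the ingestion response shape, based on the example above.
interface IngestionResult {
  rows_affected: number;
  rows_inserted: number;
  rows_updated: number;
  rows_deleted: number;
  dataset_version: number;
  duration_ms: number;
  usage: {
    bytes_read: number;
    bytes_written: number;
  };
}

// Assuming `response` is a fetch Response from one of the ingestion calls above:
const result = (await response.json()) as IngestionResult;
console.log(`Version ${result.dataset_version}: ${result.rows_affected} rows affected`);
```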
Query Parameters
Control ingestion behavior with query parameters:
| Parameter | Type | Description |
|---|---|---|
| mode | append \| upsert \| overwrite \| delete | Write operation mode (required) |
| skip_validation | boolean | Skip schema validation for faster writes (optional) |
Skip Validation
By default, incoming data is validated against the table schema. For trusted data sources, you can skip validation for faster writes:
curl -X POST "https://api.catalyzed.ai/dataset-tables/KzaMsfA0LSw_Ld0KyaXIS/rows?mode=append&skip_validation=true" \ -H "Authorization: Bearer $API_TOKEN" \ -H "Content-Type: application/json" \ -d '[{"id": "1", "name": "Alice"}]'Best Practices
1. Batch Your Writes
Send up to 100MB per request for optimal performance:
```ts
// Good: Batch 1000 rows
const batch = rows.slice(0, 1000);
await ingestRows(tableId, batch);

// Avoid: Single row per request
for (const row of rows) {
  await ingestRows(tableId, [row]); // Too many HTTP requests
}
```

2. Choose the Right Format
```ts
// For small batches (<1,000 rows): JSON is simpler
const jsonResponse = await fetch(url, {
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify(rows),
});

// For large batches (>10,000 rows): Arrow IPC is faster
const arrowData = tableToIPC(table);
const arrowResponse = await fetch(url, {
  headers: { "Content-Type": "application/vnd.apache.arrow.stream" },
  body: arrowData,
});
```

3. Use Idempotency Keys
For batch imports or critical data, always use idempotency keys:
```ts
const key = `import-${dataSource}-${timestamp}-${batchId}`;
await fetch(`${url}?mode=append&idempotency_key=${key}`, {
  method: "POST",
  body: JSON.stringify(rows),
});
```

4. Prefer Append for Initial Loads
When loading data for the first time, use append mode, since it's the fastest:
```bash
# Initial load: use append
curl -X POST ".../rows?mode=append" -d '[...]'

# Subsequent updates: use upsert
curl -X POST ".../rows?mode=upsert" -d '[...]'
```

5. Monitor Dataset Versions
Track dataset_version in responses to detect concurrent writes:
```ts
const { dataset_version } = await ingestRows(tableId, rows);
console.log(`Data written at version ${dataset_version}`);
```

Error Handling
Common errors and solutions:
| Error Code | Cause | Solution |
|---|---|---|
| TABLE_NOT_FOUND | Table ID doesn’t exist | Verify table ID and team access |
| INVALID_BODY | Request body is not an array | Send a JSON array of row objects |
| EMPTY_BODY | Array is empty | Include at least one row |
| TABLE_NOT_REGISTERED | Table not linked to data engine | Contact support (rare) |
| SCHEMA_VALIDATION_ERROR | Data doesn’t match schema | Check field types and names |
Example: Handling Validation Errors
Section titled “Example: Handling Validation Errors”try { const response = await fetch(url, { method: "POST", headers: { Authorization: `Bearer ${apiToken}`, "Content-Type": "application/json", }, body: JSON.stringify(rows), });
if (!response.ok) { const error = await response.json();
if (error.code === "SCHEMA_VALIDATION_ERROR") { console.error("Schema mismatch:", error.message); // Log problematic rows or field types }
throw new Error(`Ingestion failed: ${error.message}`); }
const result = await response.json(); console.log(`Ingested ${result.rows_affected} rows`);} catch (err) { console.error("Failed to ingest data:", err);}Next Steps
- Querying Data - Read and analyze your ingested data with SQL
- Schema Management - Create tables and evolve schemas safely
- Tables - Learn about table schemas, indexes, and data types