CREATE STREAM CAPTURE
Description
Creates a new Stream Capture.
Note that the SQL statement does not end with a semicolon (;).
Syntax
CREATE STREAM CAPTURE <name>
SOURCE = <string>
LAKE = <string>
DATABASE = <string>
TABLE_NAME = <string>
WRITE_MODE = { 'IMMUTABLE' | 'MUTABLE' }
CLUSTER = <string>
[ PERFORMANCE_PROFILE = { 'BALANCED' | 'FASTEST_READ' | 'FASTEST_WRITE' } ]
[ SOURCE_DATA_SCHEMA = <string> ]
[ CATALOGS = ( ['<CATALOG_NAME>'], ... ) ]
[ TRANSFORMATIONS = ( ['<TRANSFORMATION_NAME>'], ... ) ]
[ RECORD_KEY_FIELDS = ( [<string>, <string>] ) ]
[ PARTITION_KEY_FIELDS = ( [ field = <string> partition_type = { '' | 'DATE_STRING' | 'EPOCH_MILLIS' | 'EPOCH_MICROS' } input_format = { '' | 'yyyy-mm-dd' | 'yyyy-mm' ...} output_format = { '' | 'yyyy-mm-dd' | 'yyyy-mm' ...} ], ... ) ]
[ PRECOMBINE_KEY_FIELD = <string> ]
[ SORTING_KEY_FIELDS = ( [<string>, <string>] ) ]
[ MIN_SYNC_FREQUENCY_MINS = <integer> ]
[ QUARANTINE_ENABLED = 'true' | 'false' ]
[ VALIDATIONS = ( ['<VALIDATION_NAME>'], ... ) ]
WITH 'key1' = 'value1', 'key2' = 'value2' ....
The order of fields must be maintained as specified in the syntax above.
Writing to multiple destination tables
The Onehouse API supports writing to only one destination table per Stream Capture. To write to multiple tables (e.g. one table per Kafka topic), create a separate Stream Capture for each table, as sketched below.
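For example, ingesting two Kafka topics into two tables could look like the following (a minimal sketch assuming a Kafka source without a schema registry; the source, cluster, topic, and table names are illustrative):
{
"statement": "
CREATE STREAM CAPTURE orders_stream
SOURCE = 'my-kafka-source'
LAKE = 'staging'
DATABASE = 'analytics'
TABLE_NAME = 'orders'
WRITE_MODE = 'IMMUTABLE'
CLUSTER = 'my-cluster'
WITH 'kafka.topic.name' = 'orders'
"
}
{
"statement": "
CREATE STREAM CAPTURE payments_stream
SOURCE = 'my-kafka-source'
LAKE = 'staging'
DATABASE = 'analytics'
TABLE_NAME = 'payments'
WRITE_MODE = 'IMMUTABLE'
CLUSTER = 'my-cluster'
WITH 'kafka.topic.name' = 'payments'
"
}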
Example
Kafka Stream Capture
{
"statement": "
CREATE STREAM CAPTURE test_stream10
SOURCE = 'avro-kafka-source-palak'
LAKE = 'staging'
DATABASE = 'andy_api_test'
TABLE_NAME = 'andy_api_table'
WRITE_MODE = 'MUTABLE'
SOURCE_DATA_SCHEMA = 'ghCfAvroPublicEvent-value'
CATALOGS = ('test_duplicates7')
RECORD_KEY_FIELDS = ('date', 'timestamp')
PARTITION_KEY_FIELDS = (field = 'date' partition_type = 'DATE_STRING' input_format = 'yyyy-mm-dd' output_format = 'yyyy-mm-dd')
MIN_SYNC_FREQUENCY_MINS = 5
QUARANTINE_ENABLED = 'true'
VALIDATIONS = ('schemavalidation', 'timestampvalidation')
WITH
'kafka.topic.name' = 'ghCfAvroPublicEvent',
'kafka.topic.type' = 'event',
'streamCapture.delayThreshold.numSyncIntervals' = '3'
"
}
Required parameters
<name>
: Identifier for the Stream Capture.
- Must be unique for your account
- Must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (e.g. "Github Incremental Pipeline")
- Case-sensitive
SOURCE
: Specify the source for the Stream Capture.
- You may reference a source you have created in the Onehouse UI or using the CREATE SOURCE command
LAKE
: Specify the name of the lake where the destination Onehouse table will exist.
- You may reference a lake you have created in the Onehouse UI or using the CREATE LAKE command
DATABASE
: Specify the name of the database where the destination Onehouse table will exist.
- You may reference a database you have created in the Onehouse UI or using the CREATE DATABASE command
TABLE_NAME
: Identifier for the new destination table to be created by the Stream Capture.
- Must be unique within the parent database
- Must start with an alphabetic character and cannot contain spaces or special characters
WRITE_MODE
: Specify whether the destination table will be mutable or immutable.
CLUSTER
: Specify the name of an existing Cluster that will run the Stream Capture.
SOURCE_DATA_SCHEMA
: For Kafka sources only, specify the name of the schema in your schema registry to use for the source data.
- Required and used only for Kafka sources with a schema registry. Exclude this parameter for other source types.
- If your Schema Registry uses Schema Contexts, use the format ':.context.subcontext:schemaName'. Example: ':.prod-us:schema1'
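For instance, a Kafka Stream Capture reading its schema from a context-scoped subject might set (the context and subject names are illustrative):
SOURCE_DATA_SCHEMA = ':.prod-us:orders-value'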
Optional parameters
PERFORMANCE_PROFILE
: Specify the desired Performance Profile to optimize how the Stream Capture writes data to the table. Optional for Stream Captures in Append-only write mode.
- Default: 'BALANCED'
CATALOGS
: Specify the catalogs Onehouse will sync to your newly generated tables. Optional for all sources.
- You may reference catalogs you have created in the Onehouse UI or using the CREATE CATALOG command
TRANSFORMATIONS
: Specify a list of transformations to apply to the data in your Stream Capture. Optional for sources: Kafka, Onehouse table.
- You must first create any transformations with the CREATE TRANSFORMATION API command before referencing them. This applies to CDC transformations as well; they need to be explicitly created. See the CREATE TRANSFORMATION API for information on how to create one.
- Transformation names are case-agnostic.
RECORD_KEY_FIELDS
: Specify the record key fields that Onehouse will use to identify a particular record for deduping, updating, and deleting records. Required for sources: S3, Onehouse table. Optional for sources: Kafka.
- The values in your record key field should be unique (e.g. UUID)
PARTITION_KEY_FIELDS
: Specify the partition key fields that Onehouse will use to partition data. Optional for sources: Kafka, Onehouse table.
- If the partition key is not of timestamp type, set partition_type, input_format, and output_format to empty strings. Note that these fields must all be specified if you are adding partition(s).
- For partition keys of timestamp type, ensure the partition_type and input_format of your partition key fields match your data; otherwise, your Stream Capture will fail. Also make sure to provide the correct output_format for your partition key fields based on your use case, e.g. yyyy-MM-dd or yyyyMMddHH.
- Timestamp-type keys can look like 2021-01-01T00:00:00.000000Z, but can also be of a primitive type such as long. In such cases you need to pick the right partition_type and input_format to match the data. For example, for epoch-microsecond values you would set partition_type to EPOCH_MICROS and input_format to ''.
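As a minimal sketch, a long column of epoch microseconds (here named event_ts, an illustrative field name) partitioned into daily folders could be configured as:
PARTITION_KEY_FIELDS = (field = 'event_ts' partition_type = 'EPOCH_MICROS' input_format = '' output_format = 'yyyy-MM-dd')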
PRECOMBINE_KEY_FIELD
: Specify the precombine key field that Onehouse will use. Optional for sources: Kafka, Onehouse table.
- If your Stream Capture attempts to write two records with the same key value, Onehouse will only write the record with the largest value for the precombine key field you select
SORTING_KEY_FIELDS
: Specify the sorting key fields to sort incoming records when they are ingested. This may speed up incremental queries of the table. Optional for sources: Kafka, Onehouse table.
MIN_SYNC_FREQUENCY_MINS
: Specify the minimum number of minutes between Stream Capture runs. Optional for all sources.
- Default: 1
Special parameters
Include special parameters and advanced configs after WITH as String key-value pairs.
Available across sources
Table configuration
table.configured.base.path
: Optional configuration to specify a custom storage location for the table (ex: "s3://your-bucket/custom-path/table=your-table"). "Clean & Restart" functionality will be disabled for tables using this configuration. See https://docs.onehouse.ai/docs/product/ingest-data/stream-captures#clean--restart
table.partition.style
: Partition style for the table (ex: "default", "hive").
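For example, to place the table under a custom bucket prefix and write Hive-style partition paths, you might pass (the bucket and path are illustrative):
WITH
'table.configured.base.path' = 's3://your-bucket/custom-path/table=your-table',
'table.partition.style' = 'hive'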
Schema registry
schema.registry.type
: Valid values are `glue`, `confluent`, `jar` and `file`, which imply `GlueSchemaRegistry`, `ConfluentSchemaRegistry`, `Jar` (for Proto schemas), and `FileBasedSchemaRegistry` respectively.
schema.registry.glue.name
: Name for the Glue schema registry.
schema.registry.confluent.servers
: Confluent Schema Registry servers.
schema.registry.confluent.key
: Confluent Schema Registry key.
schema.registry.confluent.secret
: Confluent Schema Registry secret.
schema.registry.confluent.subject.name
: Schema name in the Confluent Schema Registry.
schema.registry.confluent.subject.prefix
: Schema prefix to filter schemas.
schema.registry.proto.jar.location
: Jar location for Proto schemas.
schema.registry.file.base.path
: Base path for the file-based schema registry.
schema.registry.file.full.path
: Full path for the file-based schema registry.
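A hedged sketch of a Confluent Schema Registry configuration (the server URL, credentials, and subject name are placeholders):
WITH
'schema.registry.type' = 'confluent',
'schema.registry.confluent.servers' = 'https://your-registry.example.com',
'schema.registry.confluent.key' = 'your-api-key',
'schema.registry.confluent.secret' = 'your-api-secret',
'schema.registry.confluent.subject.name' = 'orders-value'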
Delays
streamCapture.delayThreshold.numSyncIntervals
: This value is multiplied by the Sync Frequency to determine the delay threshold. When a sync's duration surpasses the delay threshold, the Stream Capture is considered delayed. This value cannot be negative. Set this value to 0 to disable the Delayed state for this Stream Capture.
- Default: 0 (Delayed state & notifications disabled)
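For example, assuming the Sync Frequency here corresponds to MIN_SYNC_FREQUENCY_MINS = 5, the setting below yields a delay threshold of 5 × 3 = 15 minutes; a sync running longer than that marks the Stream Capture as Delayed:
WITH 'streamCapture.delayThreshold.numSyncIntervals' = '3'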
Deduplication
streamCapture.deduplicationPolicy
: For append-only Stream Captures, deduplicate records based on the record key. Options:
- 'none' (default): No deduplication
- 'drop': Drop duplicate records
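For instance, an append-only Stream Capture that should drop duplicate records based on the record key could set:
WITH 'streamCapture.deduplicationPolicy' = 'drop'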
Kafka source
kafka.topic.name
: Specify the name of the source topic.
kafka.startingOffsets
: [Optional] Specify the start point for Kafka ingestion as 'earliest' or 'latest'. Default is 'earliest'.
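A minimal Kafka WITH clause that starts ingestion from the newest offsets might look like (the topic name is illustrative):
WITH
'kafka.topic.name' = 'orders',
'kafka.startingOffsets' = 'latest'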
S3 source
s3.folder.uri
: Specify the parent folder to ingest your data from. All data in this folder must be of the same format and schema.
s3.file.format
: Specify the format of the files as 'AVRO', 'JSON', 'CSV', 'ORC', or 'PARQUET'.
s3.file.extension
: Provide the extension of the files (e.g. '.gz', '.json', etc.).
s3.source.bootstrap
: Specify whether to bootstrap the source as 'TRUE' or 'FALSE'.
s3.source.if_infer_fields_from_source_path
: Enable or disable field inference from S3 object paths. Set to 'TRUE' to extract field values from the directory structure, or 'FALSE' to disable this feature.
- When enabled, Onehouse will parse the S3 object path and extract field values from directory names that follow the pattern field=value
- Example: For path s3://your-bucket/schema=orders/table=daily_sales/date=2025-07-17/data.parquet, Onehouse can infer schema, table, and date as fields
s3.source.fields_to_infer
: Comma-separated list of field names to extract from the S3 object path structure.
- Only used when s3.source.if_infer_fields_from_source_path is set to 'TRUE'
- Example: 'schema,table,date' will extract values from paths like s3://your-bucket/schema=orders/table=daily_sales/date=2025-07-17/
- Field names must match the directory structure in your S3 bucket
CSV files
s3.file.csv.header
: Specify whether the first row is a header as 'TRUE' or 'FALSE'.
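Putting these together, ingesting gzipped CSV files with header rows and path-based field inference might look like (the bucket, prefix, and inferred field names are illustrative):
WITH
's3.folder.uri' = 's3://your-bucket/orders/',
's3.file.format' = 'CSV',
's3.file.extension' = '.gz',
's3.file.csv.header' = 'TRUE',
's3.source.if_infer_fields_from_source_path' = 'TRUE',
's3.source.fields_to_infer' = 'table,date'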
GCS source
gcs.folder.uri
: Specify the parent folder to ingest your data from. All data in this folder must be of the same format and schema.
gcs.file.format
: Specify the format of the files as 'AVRO', 'JSON', 'CSV', 'ORC', or 'PARQUET'.
gcs.file.extension
: Provide the extension of the files (e.g. '.gz', '.json', etc.).
gcs.source.bootstrap
: Specify whether to bootstrap the source as 'TRUE' or 'FALSE'.
gcs.source.if_infer_fields_from_source_path
: Enable or disable field inference from GCS object paths. Set to 'TRUE' to extract field values from the directory structure, or 'FALSE' to disable this feature.
- When enabled, Onehouse will parse the GCS object path and extract field values from directory names that follow the pattern field=value
- Example: For path gs://your-bucket/schema=orders/table=daily_sales/date=2025-07-17/data.parquet, Onehouse can infer schema, table, and date as fields
gcs.source.fields_to_infer
: Comma-separated list of field names to extract from the GCS object path structure.
- Only used when gcs.source.if_infer_fields_from_source_path is set to 'TRUE'
- Example: 'schema,table,date' will extract values from paths like gs://your-bucket/schema=orders/table=daily_sales/date=2025-07-17/
- Field names must match the directory structure in your GCS bucket
CSV files
gcs.file.csv.header
: Specify whether the first row is a header as 'TRUE' or 'FALSE'.
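Similarly, a Parquet ingestion from GCS with path-based field inference might look like (the bucket and inferred field names are illustrative):
WITH
'gcs.folder.uri' = 'gs://your-bucket/orders/',
'gcs.file.format' = 'PARQUET',
'gcs.file.extension' = '.parquet',
'gcs.source.if_infer_fields_from_source_path' = 'TRUE',
'gcs.source.fields_to_infer' = 'table,date'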
Onehouse source
source.table.name
: Specify the name of the source table in Onehouse.
source.table.database
: Specify the database of the source table in Onehouse.
source.table.lake
: Specify the lake of the source table in Onehouse.
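For example, reading from an existing Onehouse table might be configured as (the table, database, and lake names are illustrative):
WITH
'source.table.name' = 'orders_raw',
'source.table.database' = 'analytics',
'source.table.lake' = 'staging'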
Postgres source
postgres.table.name
: Specify the name of the Postgres table to ingest.
postgres.schema.name
: Specify the name of the Postgres schema.
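For example, to ingest the orders table from the public schema (illustrative names):
WITH
'postgres.table.name' = 'orders',
'postgres.schema.name' = 'public'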
MySQL source
mysql.table.name
: Specify the name of the MySQL table to ingest.
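For example (illustrative table name):
WITH 'mysql.table.name' = 'orders'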