
CREATE STREAM CAPTURE

Description

Creates a new Stream Capture.

Note that the SQL statement does not end with a semicolon (;).

Syntax

CREATE STREAM CAPTURE <name>
SOURCE = <string>
LAKE = <string>
DATABASE = <string>
TABLE_NAME = <string>
WRITE_MODE = { 'IMMUTABLE' | 'MUTABLE' }
CLUSTER = <string>
[ PERFORMANCE_PROFILE = { 'BALANCED' | 'FASTEST_READ' | 'FASTEST_WRITE' } ]
[ SOURCE_DATA_SCHEMA = <string> ]
[ CATALOGS = ( ['<CATALOG_NAME>'], ... ) ]
[ TRANSFORMATIONS = ( ['<TRANSFORMATION_NAME>'], ... ) ]
[ RECORD_KEY_FIELDS = ( [<string>, <string>] ) ]
[ PARTITION_KEY_FIELDS = ( [ field = <string> partition_type = { '' | 'DATE_STRING' | 'EPOCH_MILLIS' | 'EPOCH_MICROS' } input_format = { '' | 'yyyy-mm-dd' | 'yyyy-mm' ...} output_format = { '' | 'yyyy-mm-dd' | 'yyyy-mm' ...} ], ... ) ]
[ PRECOMBINE_KEY_FIELD = <string> ]
[ SORTING_KEY_FIELDS = ( [<string>, <string>] ) ]
[ MIN_SYNC_FREQUENCY_MINS = <integer> ]
[ QUARANTINE_ENABLED = 'true' | 'false' ]
[ VALIDATIONS = ( ['<VALIDATION_NAME>'], ... ) ]
WITH 'key1' = 'value1', 'key2' = 'value2' ....

The order of fields must be maintained as specified in the syntax above.

Writing to multiple destination tables

The Onehouse API only supports writing to one destination table per Stream Capture. If you'd like to write to multiple tables (e.g. writing tables for multiple Kafka topics), create a separate Stream Capture for each table, as shown in the sketch below.
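A minimal sketch of this pattern, assuming two hypothetical Kafka topics (orders and payments) and placeholder source, lake, database, and cluster names:

CREATE STREAM CAPTURE orders_stream
SOURCE = 'my-kafka-source'
LAKE = 'staging'
DATABASE = 'sales'
TABLE_NAME = 'orders'
WRITE_MODE = 'MUTABLE'
CLUSTER = 'my-cluster'
WITH
'kafka.topic.name' = 'orders'

CREATE STREAM CAPTURE payments_stream
SOURCE = 'my-kafka-source'
LAKE = 'staging'
DATABASE = 'sales'
TABLE_NAME = 'payments'
WRITE_MODE = 'MUTABLE'
CLUSTER = 'my-cluster'
WITH
'kafka.topic.name' = 'payments'

Each statement would be submitted in its own API request, as in the full example below.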

Example

Kafka Stream Capture

{
"statement": "
CREATE STREAM CAPTURE test_stream10
SOURCE = 'avro-kafka-source-palak'
LAKE = 'staging'
DATABASE = 'andy_api_test'
TABLE_NAME = 'andy_api_table'
WRITE_MODE = 'MUTABLE'
SOURCE_DATA_SCHEMA = 'ghCfAvroPublicEvent-value'
CATALOGS = ('test_duplicates7')
RECORD_KEY_FIELDS = ('date', 'timestamp')
PARTITION_KEY_FIELDS = (field = 'date' partition_type = 'DATE_STRING' input_format = 'yyyy-mm-dd' output_format = 'yyyy-mm-dd')
MIN_SYNC_FREQUENCY_MINS = 5
QUARANTINE_ENABLED = 'true'
VALIDATIONS = ('schemavalidation', 'timestampvalidation')
WITH
'kafka.topic.name' = 'ghCfAvroPublicEvent',
'kafka.topic.type' = 'event',
'streamCapture.delayThreshold.numSyncIntervals' = '3'
"
}

Sample response

Required parameters

  • <name>: Identifier for the Stream Capture
    • Must be unique for your account
    • Must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (e.g. "Github Incremental Pipeline")
    • Case-sensitive
  • SOURCE: Specify the source for the Stream Capture.
    • You may reference a source you have created in the Onehouse UI or using the CREATE SOURCE command
  • LAKE: Specify the name of the lake where the destination Onehouse table will exist
    • You may reference a lake you have created in the Onehouse UI or using the CREATE LAKE command
  • DATABASE: Specify the name of the database where the destination Onehouse table will exist
    • You may reference a database you have created in the Onehouse UI or using the CREATE DATABASE command
  • TABLE_NAME: Identifier for the new destination table to be created by the Stream Capture
    • Must be unique within the parent database
    • Must start with an alphabetic character and cannot contain spaces or special characters
  • WRITE_MODE: Specify whether the destination table will be mutable or immutable
  • CLUSTER: Specify the name of an existing Cluster that will run the Stream Capture.
  • SOURCE_DATA_SCHEMA: For Kafka sources only, specify the name of the schema in your schema registry to use for the source data.
    • Required and used only for Kafka sources with a schema registry. Exclude this parameter for other source types.
    • If your Schema Registry uses Schema Contexts, use the format: ':.context.subcontext:schemaName'. Example: ':.prod-us:schema1'

Optional parameters

  • PERFORMANCE_PROFILE: Specify the desired Performance Profile to optimize how the Stream Capture writes data to the table.
    Optional for Stream Captures in Append-only write mode.
    • Default: 'BALANCED'
  • MIN_SYNC_FREQUENCY_MINS: Specify the minimum number of minutes between Stream Capture triggers.
    Optional for all sources
    • Default: 1
  • CATALOGS: Specify the catalogs Onehouse will sync to your newly generated tables.
    Optional for all sources
    • You may reference catalogs you have created in the Onehouse UI or using the CREATE CATALOG command
  • TRANSFORMATIONS: Specify a list of transformations to apply to the data in your Stream Capture. Optional for sources: Kafka, Onehouse table
    • You must first create any transformations with the CREATE TRANSFORMATION API command before referencing them. This applies to CDC transformations as well; they must be explicitly created. See CREATE TRANSFORMATION API for information on how to create one.
    • Transformation names are case-insensitive.
  • RECORD_KEY_FIELDS: Specify the record key fields that Onehouse will use to identify a particular record for deduping, updating, and deleting records.
    Required for sources: S3, Onehouse table
    Optional for sources: Kafka
    • The values in your record key field should be unique (e.g. UUID)
  • PARTITION_KEY_FIELDS: Specify the partition key fields that Onehouse will use to partition data. Optional for sources: Kafka, Onehouse table
    • If the partition key is not of timestamp type, set partition_type, input_format, and output_format to empty strings. Note that all of these fields must be specified if you are adding partition(s).
    • For partition keys of timestamp type, ensure that the partition_type and input_format of your partition key fields match your data; otherwise, your Stream Capture will fail. Also provide the correct output_format for your partition key fields based on your use case (e.g. yyyy-MM-dd or yyyyMMddHH).
Note about timestamp type partition keys

Timestamp keys can look like 2021-01-01T00:00:00.000000Z, but they can also be stored as a primitive type such as long. In that case, pick the partition_type and input_format that match the data; for example, for a long value holding epoch microseconds, you would set partition_type to 'EPOCH_MICROS' and input_format to ''.
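For instance, a hedged sketch of the clause in both cases (the field names are hypothetical, and the output_format values are just one reasonable choice of partition path format):

PARTITION_KEY_FIELDS = (field = 'event_ts' partition_type = 'EPOCH_MICROS' input_format = '' output_format = 'yyyy-mm-dd')

PARTITION_KEY_FIELDS = (field = 'order_date' partition_type = 'DATE_STRING' input_format = 'yyyy-mm-dd' output_format = 'yyyy-mm')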

  • PRECOMBINE_KEY_FIELD: Specify the precombine key field that Onehouse will use.
    Optional for sources: Kafka, Onehouse table
    • If your Stream Capture attempts to write two records with the same key value, Onehouse will only write the record with the largest value for the precombine key field you select
  • SORTING_KEY_FIELDS: Specify the sorting key fields to sort incoming records when they are ingested. This may speed up incremental queries of the table.
    Optional for sources: Kafka, Onehouse table

Special parameters

Include special parameters and advanced configs after WITH as string key-value pairs ('key' = 'value').

Available across sources

Table configuration

Schema registry

  • schema.registry.type: Valid values are `glue`, `confluent`, `jar`, and `file`, corresponding to `GlueSchemaRegistry`, `ConfluentSchemaRegistry`, `Jar` (for Proto schemas), and `FileBasedSchemaRegistry`.
  • schema.registry.glue.name: Name for the Glue schema registry.
  • schema.registry.confluent.servers: Confluent Schema registry servers.
  • schema.registry.confluent.key: Confluent schema registry key.
  • schema.registry.confluent.secret: Confluent schema registry secret.
  • schema.registry.confluent.subject.name: Schema name in Confluent Schema registry.
  • schema.registry.confluent.subject.prefix: Schema prefix to filter schemas.
  • schema.registry.proto.jar.location: Jar location for proto schemas.
  • schema.registry.file.base.path: Base path for file based schema registry.
  • schema.registry.file.full.path: Full path for file based schema registry.
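As an illustrative sketch of how these keys are passed (the server URL, credentials, and subject name below are placeholders, not real values), a Confluent schema registry would be configured in the WITH clause alongside any other special parameters:

WITH
'schema.registry.type' = 'confluent',
'schema.registry.confluent.servers' = 'https://my-registry.example.com',
'schema.registry.confluent.key' = '<API_KEY>',
'schema.registry.confluent.secret' = '<API_SECRET>',
'schema.registry.confluent.subject.name' = 'orders-value'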

Delays

  • streamCapture.delayThreshold.numSyncIntervals: This value is multiplied by the Sync Frequency to determine the delay threshold. When a sync's duration surpasses the delay threshold, the Stream Capture is considered delayed. This value cannot be negative. Set this value to 0 to disable the Delayed state for this Stream Capture.
    • Default: 0 (Delayed state & notifications disabled)

Deduplication

  • streamCapture.deduplicationPolicy: For append-only Stream Captures, deduplicate records based on the record key. Options:
    • 'none' (default): No deduplication
    • 'drop': Drop duplicate records
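For example, a hedged snippet for an append-only Stream Capture that drops duplicates by record key (the record key field is hypothetical):

WRITE_MODE = 'IMMUTABLE'
RECORD_KEY_FIELDS = ('event_id')
WITH
'streamCapture.deduplicationPolicy' = 'drop'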

Kafka source

  • kafka.topic.name: Specify the name of the source topic.
  • kafka.startingOffsets: [Optional] Specify the start point for Kafka ingestion as 'earliest' or 'latest'. Default is 'earliest'.
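For instance, to begin ingesting from the latest offsets instead of the default earliest (the topic name is a placeholder):

WITH
'kafka.topic.name' = 'orders',
'kafka.startingOffsets' = 'latest'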

S3 source

  • s3.folder.uri: Specify the parent folder to ingest your data from. All data in this folder must be of the same format and schema.
  • s3.file.format: Specify the format of the files as 'AVRO', 'JSON', 'CSV', 'ORC', or 'PARQUET'.
  • s3.file.extension: Provide the extension of the files (e.g. '.gz', '.json').
  • s3.source.bootstrap: Specify whether to bootstrap the source as 'TRUE' or 'FALSE'.
  • s3.source.if_infer_fields_from_source_path: Enable or disable field inference from S3 object paths. Set to 'TRUE' to extract field values from directory structure, or 'FALSE' to disable this feature.
    • When enabled, Onehouse will parse the S3 object path and extract field values from directory names that follow the pattern field=value
    • Example: For path s3://your-bucket/schema=orders/table=daily_sales/date=2025-07-17/data.parquet, Onehouse can infer schema, table, and date as fields
  • s3.source.fields_to_infer: Comma-separated list of field names to extract from the S3 object path structure.
    • Only used when s3.source.if_infer_fields_from_source_path is set to 'TRUE'
    • Example: 'schema,table,date' will extract values from paths like s3://your-bucket/schema=orders/table=daily_sales/date=2025-07-17/
    • Field names must match the directory structure in your S3 bucket

CSV files

  • s3.file.csv.header: Specify whether the first row is a header as 'TRUE' or 'FALSE'.
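A hedged sketch of an S3 Stream Capture that ingests Parquet files and infers fields from the object path (the bucket, folder, and all identifiers are placeholders):

CREATE STREAM CAPTURE s3_sales_stream
SOURCE = 'my-s3-source'
LAKE = 'staging'
DATABASE = 'sales'
TABLE_NAME = 'daily_sales'
WRITE_MODE = 'MUTABLE'
CLUSTER = 'my-cluster'
RECORD_KEY_FIELDS = ('order_id')
WITH
's3.folder.uri' = 's3://your-bucket/schema=orders/',
's3.file.format' = 'PARQUET',
's3.file.extension' = '.parquet',
's3.source.bootstrap' = 'TRUE',
's3.source.if_infer_fields_from_source_path' = 'TRUE',
's3.source.fields_to_infer' = 'table,date'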

GCS source

  • gcs.folder.uri: Specify the parent folder to ingest your data from. All data in this folder must be of the same format and schema.
  • gcs.file.format: Specify the format of the files as 'AVRO', 'JSON', 'CSV', 'ORC', or 'PARQUET'.
  • gcs.file.extension: Provide the extension of the files (e.g. '.gz', '.json').
  • gcs.source.bootstrap: Specify whether to bootstrap the source as 'TRUE' or 'FALSE'.
  • gcs.source.if_infer_fields_from_source_path: Enable or disable field inference from GCS object paths. Set to 'TRUE' to extract field values from directory structure, or 'FALSE' to disable this feature.
    • When enabled, Onehouse will parse the GCS object path and extract field values from directory names that follow the pattern field=value
    • Example: For path gs://your-bucket/schema=orders/table=daily_sales/date=2025-07-17/data.parquet, Onehouse can infer schema, table, and date as fields
  • gcs.source.fields_to_infer: Comma-separated list of field names to extract from the GCS object path structure.
    • Only used when gcs.source.if_infer_fields_from_source_path is set to 'TRUE'
    • Example: 'schema,table,date' will extract values from paths like gs://your-bucket/schema=orders/table=daily_sales/date=2025-07-17/
    • Field names must match the directory structure in your GCS bucket

CSV files

  • gcs.file.csv.header: Specify whether the first row is a header as 'TRUE' or 'FALSE'.
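The GCS configuration mirrors the S3 sketch above; for example, a hedged WITH clause for CSV files with a header row (the bucket and folder are placeholders):

WITH
'gcs.folder.uri' = 'gs://your-bucket/events/',
'gcs.file.format' = 'CSV',
'gcs.file.extension' = '.csv',
'gcs.file.csv.header' = 'TRUE',
'gcs.source.bootstrap' = 'FALSE'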

Onehouse source

  • source.table.name: Specify the name of the source table in Onehouse.
  • source.table.database: Specify the database of the source table in Onehouse.
  • source.table.lake: Specify the lake of the source table in Onehouse.
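For example, a hedged WITH clause pointing at an existing Onehouse table (the table, database, and lake names are hypothetical):

WITH
'source.table.name' = 'raw_orders',
'source.table.database' = 'bronze_db',
'source.table.lake' = 'staging'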

Postgres source

  • postgres.table.name: Specify the name of the Postgres table to ingest.
  • postgres.schema.name: Specify the name of the Postgres schema.
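For example, a hedged WITH clause for a table in the default public schema (the table name is a placeholder):

WITH
'postgres.table.name' = 'orders',
'postgres.schema.name' = 'public'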

MySQL source

  • mysql.table.name: Specify the name of the MySQL table to ingest.