Flow Configurations

Below are all the possible configurations for a Flow. Note that certain configurations may only appear for specific data sources.

Configurations Summary

| Configuration | Description | Applies To |
| --- | --- | --- |
| Write Mode | Choose between Mutable (Inserts, Updates, Deletes with changelog support) or Append-only (Inserts only). | All |
| Performance Profile | Optimize for Balanced, Fastest Read, or Fastest Write. | Append-only write mode only |
| Sync Frequency | Configure how frequently to capture new data. | All |
| Bootstrap Data | Choose to bulk load existing files or skip them and only ingest new files going forward. | File storage sources (S3, GCS) |
| Source Data Location | Specify folder path, optional file path pattern (regex), file format, and file extension for source data. | File storage sources (S3, GCS) |
| Ingest from Multiple Kafka Clusters | Add multiple Kafka sources to ingest a distributed topic into a single table. | Kafka sources |
| Select Source Tables/Folders/Topics | Select which source tables, folders, or topics to capture. Each creates a separate Onehouse table. | All (varies by source type) |
| Auto Capture | Continuously monitor and capture new topics using regex filter conditions with a configurable table name prefix. | Kafka sources |
| Source Data Schema | Provide or infer schema for incoming data. | All (schema inference available for certain sources) |
| Pipeline Quarantine | Choose to quarantine invalid records to a separate table or fail the pipeline when records cause errors or fail validations. | All |
| Transformations | Add transformations to modify data during ingestion. | All |
| Record Key Fields | Define unique identifier fields for de-duplication, updates, and deletes (similar to a Primary Key). | All (required for Mutable write mode) |
| Partition Key Fields | Partition data using specified fields. | All |
| Precombine Key Fields | When multiple records share the same key, only the record with the largest precombine key value is written. | Mutable write mode only |
| Sorting Key Fields | Sort incoming records during ingestion. | Append-only write mode only |
| Kafka Starting Offsets | Set ingestion to start from the earliest or latest available offset. | Kafka sources |
| Advanced Configurations | Optional advanced configurations for further control. | All (specific configs vary by source/mode) |
| Table Type | Optional configuration to set the table type (MoR vs. CoW). | All |
| Multi-Table Select | Ingest multiple tables under one S3 folder by selecting from discovered sub-folders. | File storage sources (S3) |

Write Mode

Select between the following write modes:

  1. Mutable (Inserts, Updates, and Deletes): Flow will append Inserts and merge Updates and Deletes into your table. Streams in the Mutable write mode also support syncing database changelogs from a database connector (e.g. Postgres, MySQL, MongoDB).
  2. Append-only (Inserts only): Flow will append all records to your table. This avoids the overhead of looking up records to merge or delete.

Both write modes will create an Apache Hudi Merge on Read table.
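The difference between the two modes can be sketched as follows. This is a simplified Python illustration of the semantics, not Onehouse's implementation; the `id` record key and `_deleted` marker are hypothetical field names.

```python
# Illustrative sketch (not Onehouse code): how the two write modes
# treat a batch of incoming records, keyed by a record key ("id").

def apply_append_only(table, batch):
    """Append-only: every incoming record is appended; no lookups or merges."""
    return table + batch

def apply_mutable(table, batch):
    """Mutable: merge by record key -- later records upsert earlier ones,
    and records flagged as deletes remove the row entirely."""
    merged = {rec["id"]: rec for rec in table}
    for rec in batch:
        if rec.get("_deleted"):
            merged.pop(rec["id"], None)   # delete by key
        else:
            merged[rec["id"]] = rec       # insert or update by key
    return list(merged.values())

existing = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
batch = [{"id": 2, "v": "b2"}, {"id": 3, "v": "c"}, {"id": 1, "_deleted": True}]

print(len(apply_append_only(existing, batch)))              # 5 rows kept
print(sorted(r["id"] for r in apply_mutable(existing, batch)))  # [2, 3]
```

Append-only skips the key lookups entirely, which is why it avoids merge overhead.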

Performance Profile

The Performance Profile allows you to optimize your Flow for fastest read, fastest write, or a balance between the two. You can edit this configuration after creating the Flow, so we suggest starting with the "Balanced" profile and later adjusting based on the Flow's performance.

Select from the following options for Performance Profile:

| | Balanced | Fastest Read | Fastest Write |
| --- | --- | --- | --- |
| When to Use | Use Balanced for a combination of fast write and read performance out of the box. | Use Fastest Read to get optimal query performance on the table with the initial write. Optimizations take place during writing, which will cause ingestion to take longer. | Use Fastest Write for high-throughput sources requiring rapid ingestion. You can then run clustering to optimize the read performance after the initial write. |
| Operation Performed | Bulk Insert | Insert | Bulk Insert |
| Sorting | Sorted by the partition path and Sorting Key Fields (or Record Key Fields if no Sorting Key) | No sorting by writer (coming soon). Use Clustering to sort. | No sorting by writer. Use Clustering to sort. |
| File Sizing | Enabled; Best Effort | Enabled; Guaranteed | Disabled |

Usage notes:

  • Performance Profile is currently available only for Flows in Append-only write mode.
  • "Fastest Write" is not suggested for high-cardinality updates where you are updating >100 partitions in a single input batch.
  • Sorting is currently not available when using "Fastest Write" or "Fastest Read".

Sync Frequency

Configure how frequently to capture new data. Syncing more frequently keeps data more up-to-date, but increases cost.

Bootstrap Data

For file storage sources (e.g. S3 or GCS), you can choose to capture only new data or bootstrap the table with the existing data in the bucket.

  • If you bootstrap data, the Flow will bulk load existing files into the Onehouse table, then ingest new files.
  • If you do not bootstrap data, the Flow will skip all existing files and only ingest new files into the Onehouse table.

Source Data Location

For file storage sources (e.g. S3 or GCS), specify the location and format of the source data.

  • Folder: Specify the parent folder to ingest your data from.
  • File Path Pattern: Optionally specify a regex pattern filter for files in the source folder.
  • File format: Select the file format of the data in your selected folder.
  • File extension: Onehouse will automatically fill this in. If the data is compressed, change this to the compressed file extension (e.g. .zip).
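As an illustration of how a File Path Pattern narrows down which files are ingested, the snippet below applies a hypothetical regex (the pattern and file names are examples, not values Onehouse prescribes):

```python
import re

# Hypothetical File Path Pattern: match daily JSON files under "events/".
pattern = re.compile(r"events/\d{4}-\d{2}-\d{2}/.*\.json$")

files = [
    "events/2024-05-01/part-000.json",      # matches
    "events/2024-05-01/part-000.json.tmp",  # wrong extension, skipped
    "logs/2024-05-01/part-000.json",        # wrong folder, skipped
]
matched = [f for f in files if pattern.search(f)]
print(matched)  # ['events/2024-05-01/part-000.json']
```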

Ingest from Multiple Kafka Clusters

If you have a Kafka topic distributed across multiple Kafka clusters, you can ingest it into a single Onehouse table by adding multiple Kafka sources.

First, add multiple Kafka sources to your stream. You will then be prompted to input the details for a distributed lock client to coordinate concurrent writes to the table.

The following distributed lock clients are currently supported:

  1. DynamoDB (AWS projects only): Provide your DynamoDB instance details and ensure you've granted Onehouse write permissions for DynamoDB.

File-based lock clients are on the roadmap. Contact us to request this feature.

Select Source Tables, Folders, or Topics

Depending on your Source type, you will select the source tables, folders, or topics for your Flow. Onehouse will create a separate table for each source table/folder/topic you select.

For Kafka sources, you can enable Auto Capture to continuously monitor and capture new topics using filter conditions you define with regex. If you enable Auto Capture, you'll enter a Table Name Prefix that Onehouse will prepend to each topic name (joined by an underscore) to help you identify the tables captured by the stream.
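The Auto Capture naming behavior can be sketched as below. This is an illustration only; the prefix, topic names, and the choice of full-match regex semantics are assumptions, not documented Onehouse behavior.

```python
import re

def captured_tables(prefix, topics, topic_regex):
    """Sketch of Auto Capture: topics matching the regex filter are
    captured, and each table is named <prefix>_<topic>."""
    return [f"{prefix}_{t}" for t in topics if re.fullmatch(topic_regex, t)]

topics = ["orders", "orders_dlq", "payments"]
print(captured_tables("kafka", topics, r"orders.*"))
# ['kafka_orders', 'kafka_orders_dlq']
```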

Source Data Schema

Onehouse allows you to provide a source schema for your incoming data. When you provide a source schema, Onehouse will attempt to match incoming records to the provided schema.

For certain types of Sources, Onehouse can infer the schema from the source data, so you do not need to provide a source schema.

Onehouse also supports schema evolution when source data changes.

Pipeline Quarantine

When incoming records cause an error (e.g. schema mismatch) or fail to meet a Data Quality Validation during ingestion, you may either quarantine those records or fail the pipeline.

If Pipeline Quarantine is enabled, Onehouse will write invalid records to a separate quarantine table so your Flow can continue uninterrupted. You may find the quarantine table storage path in the Table details page under Data.

If you choose to fail the pipeline upon invalid records, the pipeline will fail and stop running when an incoming record causes an error or fails to meet a Data Quality Validation.

Transformations

Add transformations to modify data during ingestion. Learn more about transformations in the docs.

Key Configurations

Record Key Fields

Onehouse uses the record key to identify a particular record for de-duplicating, updating, and deleting records. This is similar to a Primary Key in traditional databases.

The values in your record key field should be unique (e.g. UUID). Setting a record key field is required for mutable tables.
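To illustrate why the record key must be unique, the sketch below de-duplicates a batch by record key (a simplified illustration; the `order_id` field and last-record-wins choice are hypothetical, not Onehouse's documented resolution order):

```python
def deduplicate(records, key="order_id"):
    """Sketch: de-duplication keeps one record per record key value
    (the last one seen, in this illustration)."""
    return list({rec[key]: rec for rec in records}.values())

batch = [
    {"order_id": "a", "v": 1},
    {"order_id": "a", "v": 2},  # duplicate key "a"
    {"order_id": "b", "v": 3},
]
print(len(deduplicate(batch)))  # 2 records remain
```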

Partition Key Fields

You may partition data using partition key fields. Ensure the value type (and format if applicable) of your partition key field matches your data; otherwise, your stream will fail.

You may configure a setting to partition your Onehouse tables using Hive style (folder names in the form field=value).

Partitioning Guidelines
  • Avoid creating too many partitions to ensure the Flow runs efficiently. For example, if your partition key field is a Timestamp, set the output path format to Year, Month, or Day so that all records with the same year, month, or day are written to the same partition.
  • Partition key fields play a critical role in query performance. Query performance will be optimal when partition columns are used as a predicate in the query.
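For example, a daily Hive-style partition path derived from a timestamp field might look like the following (an illustrative sketch; the epoch-seconds input and path layout are assumptions, not Onehouse's exact output format):

```python
from datetime import datetime, timezone

def day_partition_path(ts_epoch_seconds):
    """Hive-style daily partition path derived from a timestamp value."""
    dt = datetime.fromtimestamp(ts_epoch_seconds, tz=timezone.utc)
    return f"year={dt.year}/month={dt.month:02d}/day={dt.day:02d}"

# All records from the same UTC day land in the same partition folder.
print(day_partition_path(1714521600))  # year=2024/month=05/day=01
```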

Precombine Key Fields

For Mutable Write Mode

If your Flow attempts to write multiple records with the same key value, Onehouse will only write the record with the largest value for the precombine key field you select. Setting a precombine key field is required for mutable tables.
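The precombine semantics can be sketched as follows (a simplified illustration, not Onehouse code; the `id` and `updated_at` field names are hypothetical):

```python
def precombine(records, key="id", precombine_field="updated_at"):
    """Sketch of precombine: among records sharing a record key, only
    the one with the largest precombine value is written."""
    best = {}
    for rec in records:
        k = rec[key]
        if k not in best or rec[precombine_field] > best[k][precombine_field]:
            best[k] = rec
    return list(best.values())

batch = [
    {"id": 1, "updated_at": 10, "v": "old"},
    {"id": 1, "updated_at": 20, "v": "new"},  # larger precombine value wins
    {"id": 2, "updated_at": 5,  "v": "x"},
]
print(sorted((r["id"], r["v"]) for r in precombine(batch)))
# [(1, 'new'), (2, 'x')]
```

A monotonically increasing field such as an update timestamp is a natural precombine key, since the latest change then always wins.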

Sorting Key Fields

For Append-only Write Mode

Set a sorting key to sort incoming records when they are ingested. This may accelerate queries with predicates on the sorting key field. Sorting by date can also help efficiently handle late arriving data.

Kafka Starting Offsets

For Kafka sources

Set the starting offset for Kafka source ingestion. With earliest, ingestion starts from the earliest available Kafka offsets; with latest, ingestion starts from new data arriving after the Flow is created. This configuration cannot be modified after the stream has been created.

Advanced Configurations

Advanced configurations for Flows can be entered through a text box in the Onehouse console or provided via the API (see the API reference). These configurations are always optional.

To add advanced configurations while creating a Flow in the Onehouse console, open the advanced configurations toggle and enter each configuration and value in single quotes as a comma-separated key-value pair. Example:

'flow.configA' = 'true',
'flow.configB' = '100'

Below are the available advanced configurations:

| Config Name | Default | Options | Description | Editable |
| --- | --- | --- | --- | --- |
| `flow.delayThreshold.numSyncIntervals` | 0 (off) | Any positive integer, or zero to disable the Delay Threshold. | This value is multiplied by the Sync Frequency to determine the delay threshold. When a sync's duration surpasses the delay threshold, the Flow is considered delayed. | Yes |
| `flow.deduplicationPolicy` | 'none' | 'none': No deduplication. 'drop': Drop duplicate records. | Available for append-only Flows only. Drop incoming duplicate records based on the record key. | Yes |
| `flow.kafka.missingMessages` | 'FAIL' | 'FAIL': Fail the Flow when missing messages are encountered. 'SKIP': Skip missing messages and continue running. | Case-sensitive. For Kafka sources, controls Flow behavior when encountering missing or expired messages. | Yes |
| `streamCapture.targetFileSize` | '134217728' (128 MB) | Between 10485760 (10 MB) and 1073741824 (1 GB) | Sets the target file size for stream capture files. This controls the maximum size of Parquet files created during ingestion. Larger files improve read performance but may increase latency, i.e. the delay between when data is ingested and when it becomes available for queries. | Yes |
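As a worked example of how flow.delayThreshold.numSyncIntervals interacts with Sync Frequency (the 15-minute frequency below is a hypothetical value):

```python
def delay_threshold_minutes(sync_frequency_minutes, num_sync_intervals):
    """The threshold is numSyncIntervals multiplied by the Sync Frequency;
    0 disables the Delay Threshold (represented as None here)."""
    if num_sync_intervals == 0:
        return None
    return sync_frequency_minutes * num_sync_intervals

# With a 15-minute Sync Frequency and numSyncIntervals = 3, a sync
# running longer than 45 minutes marks the Flow as delayed.
print(delay_threshold_minutes(15, 3))  # 45
print(delay_threshold_minutes(15, 0))  # None (threshold disabled)
```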

Table Type

You can choose between the two Apache Hudi table types: Merge on Read (MoR) and Copy on Write (CoW). Learn more in this guide.

Toggle open the advanced configurations section in the Flow creation page of the Onehouse console to specify the table type for your destination table. By default, Onehouse uses MoR tables. Project Admins can set project level defaults for Table Types on the Project Settings page.

Multi-Table Select

If you have multiple tables under one S3 folder, Onehouse can ingest them all at once. Provide a directory, and Onehouse will list the discovered sub-folders so you can select which ones to ingest.