Flow Configurations
Below are all the possible configurations for a Flow. Note that certain configurations may only appear for specific data sources.
Configurations Summary
| Configuration | Description | Applies To |
|---|---|---|
| Write Mode | Choose between Mutable (Inserts, Updates, Deletes with changelog support) or Append-only (Inserts only). | All |
| Performance Profile | Optimize for Balanced, Fastest Read, or Fastest Write. | Append-only write mode only |
| Sync Frequency | Configure how frequently to capture new data. | All |
| Bootstrap Data | Choose to bulk load existing files or skip them and only ingest new files going forward. | File storage sources (S3, GCS) |
| Source Data Location | Specify folder path, optional file path pattern (regex), file format, and file extension for source data. | File storage sources (S3, GCS) |
| Ingest from Multiple Kafka Clusters | Add multiple Kafka sources to ingest a distributed topic into a single table. | Kafka sources |
| Select Source Tables/Folders/Topics | Select which source tables, folders, or topics to capture. Each creates a separate Onehouse table. | All (varies by source type) |
| Auto Capture | Continuously monitor and capture new topics using regex filter conditions with a configurable table name prefix. | Kafka sources |
| Source Data Schema | Provide or infer schema for incoming data. | All (schema inference available for certain sources) |
| Pipeline Quarantine | Choose to quarantine invalid records to a separate table or fail the pipeline when records cause errors or fail validations. | All |
| Transformations | Add transformations to modify data during ingestion. | All |
| Record Key Fields | Define unique identifier fields for de-duplication, updates, and deletes (similar to Primary Key). | All (required for Mutable write mode) |
| Partition Key Fields | Partition data using specified fields. | All |
| Precombine Key Fields | When multiple records share the same key, only the record with the largest precombine key value is written. | Mutable write mode only |
| Sorting Key Fields | Sort incoming records during ingestion. | Append-only write mode only |
| Kafka Starting Offsets | Set ingestion to start from the earliest or latest available offset. | Kafka sources |
| Advanced Configurations | Optional advanced configurations for further control. | All (specific configs vary by source/mode) |
| Table Type | Optional configuration to set the table type (MoR vs. CoW). | All |
| Multi-Table Select | Ingest multiple tables under one S3 folder by selecting from discovered sub-folders. | File storage sources (S3) |
Write Mode
Select between the following write modes:
- Mutable (Inserts, Updates, and Deletes): Flow will append Inserts and merge Updates and Deletes into your table. Flows in the Mutable write mode also support syncing database changelogs from a database connector (e.g. Postgres, MySQL, MongoDB).
- Append-only (Inserts only): Flow will append all records to your table. This avoids the overhead of looking up records to merge or delete.
By default, both write modes create an Apache Hudi Merge on Read table; see Table Type below to change this.
Performance Profile
The Performance Profile allows you to optimize your Flow for fastest read, fastest write, or a balance between the two. You can edit this configuration after creating the Flow, so we suggest starting with the "Balanced" profile and later adjusting based on the Flow's performance.
Select from the following options for Performance Profile:
| | Balanced | Fastest Read | Fastest Write |
|---|---|---|---|
| When to Use | Use Balanced for a combination of fast write and read performance out of the box. | Use Fastest Read to get optimal query performance on the table with the initial write. Optimizations take place during writing, which will cause ingestion to take longer. | Use Fastest Write for high-throughput sources requiring rapid ingestion. You can then run clustering to optimize the read performance after the initial write. |
| Operation Performed | Bulk Insert | Insert | Bulk Insert |
| Sorting | Sorted by the partition path and Sorting Key Fields (or Record Key Fields if no Sorting Key) | No sorting by the writer (writer-side sorting coming soon). Use Clustering to sort. | No sorting by the writer. Use Clustering to sort. |
| File Sizing | Enabled; Best Effort | Enabled; Guaranteed | Disabled |
Usage notes:
- Performance Profile is currently available only for Flows in Append-only write mode.
- "Fastest Write" is not suggested for high-cardinality updates where you are updating >100 partitions in a single input batch.
- Sorting is currently not available when using "Fastest Write" or "Fastest Read".
Sync Frequency
Configure how frequently to capture new data. A higher sync frequency keeps data more up-to-date, but is more costly.
Bootstrap Data
For file storage sources (e.g. S3 or GCS), you can choose to capture only new data or bootstrap the table with the existing data in the bucket.
- If you bootstrap data, the Flow will bulk load existing files into the Onehouse table, then ingest new files.
- If you do not bootstrap data, the Flow will skip all existing files and only ingest new files into the Onehouse table.
Source Data Location
For file storage sources (e.g. S3 or GCS), specify the location and format of the source data.
- Folder: Specify the parent folder to ingest your data from.
- File Path Pattern: Optionally specify a regex pattern filter for files in the source folder.
- File format: Select the file format of the data in your selected folder.
- File extension: Onehouse will automatically fill this in. If the data is compressed, change this to the compressed file extension (e.g. .zip).
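For example, a Flow ingesting newline-delimited JSON files from S3 might be configured as follows (bucket and paths are hypothetical):
Folder: s3://my-bucket/events/
File Path Pattern: .*\.json$
File format: JSON
File extension: .json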
Ingest from Multiple Kafka Clusters
If you have a Kafka topic distributed across multiple Kafka clusters, you can ingest it into a single Onehouse table by adding multiple Kafka sources.
First, add multiple Kafka sources to your Flow. You will then be prompted to enter the details of a distributed lock client to coordinate concurrent writes to the table.
The following distributed lock clients are currently supported:
- DynamoDB (AWS projects only): Provide your DynamoDB instance details and ensure you've granted Onehouse write permissions for DynamoDB.
- On the roadmap: File-based clients. Contact us to request this feature.
Select Source Tables, Folders, or Topics
Depending on your Source type, you will select the source tables, folders, or topics for your Flow. Onehouse will create a separate table for each source table/folder/topic you select.
For Kafka sources, you can enable Auto Capture to continuously monitor and capture new topics that match regex filter conditions you define. If you enable Auto Capture, you'll enter a Table Name Prefix that Onehouse prepends to each topic name (joined by an underscore) to help you identify the tables captured by the Flow.
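For example (topic names hypothetical), with an Auto Capture regex of orders_.* and a Table Name Prefix of prod, newly captured topics map to tables as follows:
Topic orders_us → table prod_orders_us
Topic orders_eu → table prod_orders_eu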
Source Data Schema
Onehouse allows you to provide a source schema for your incoming data. When you provide a source schema, Onehouse will attempt to match incoming records to the provided schema.
For certain types of Sources, Onehouse can infer the schema from the source data, so you do not need to provide a source schema.
Onehouse also supports schema evolution when source data changes.
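The exact schema format you provide depends on the source type and is not reproduced here; purely as an illustration (field names hypothetical), a schema for an events table would declare each field's name and type:
id: string
event_time: timestamp
amount: double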
Pipeline Quarantine
When incoming records cause an error (e.g. schema mismatch) or fail to meet a Data Quality Validation during ingestion, you may either quarantine those records or fail the pipeline.
If Pipeline Quarantine is enabled, Onehouse will write invalid records to a separate quarantine table so your Flow can continue uninterrupted. You may find the quarantine table storage path in the Table details page under Data.
If you choose to fail the pipeline upon invalid records, the pipeline will fail and stop running when an incoming record causes an error or fails to meet a Data Quality Validation.
Transformations
Add transformations to modify data during ingestion. Learn more about transformations in the docs.
Key Configurations
Record Key Fields
Onehouse uses the record key to identify a particular record for de-duplicating, updating, and deleting records. This is similar to a Primary Key in traditional databases.
The values in your record key field should be unique (e.g. UUID). Setting a record key field is required for mutable tables.
Partition Key Fields
You may partition data using partition key fields. Ensure the value type (and format, if applicable) of your partition key field matches your data; otherwise, your Flow will fail.
You may configure a setting to partition your Onehouse tables using Hive style.
- Avoid creating too many partitions to ensure the Flow runs efficiently. For example, if your partition field is a Timestamp, set the output path format to Year, Month, or Day so that all records from the same period are written to the same partition (see the example below).
- Partition key fields play a critical role in query performance. Query performance will be optimal when partition columns are used as a predicate in the query.
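As a concrete illustration (bucket, table, and field names hypothetical), a table partitioned Hive-style on a Timestamp field with a Day output path format produces one partition folder per day:
s3://my-bucket/my_table/created_date=2024-05-01/<data files>
s3://my-bucket/my_table/created_date=2024-05-02/<data files>
A query filtering on created_date can then prune all other partition folders.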
Precombine Key Fields
For Mutable Write Mode
If your Flow attempts to write multiple records with the same key value, Onehouse will only write the record with the largest value for the precombine key field you select. Setting a precombine key field is required for mutable tables.
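For example (field names hypothetical), with Record Key id and Precombine Key updated_at, two incoming records with the same key resolve to the one with the larger updated_at value:
Incoming: (id=42, updated_at=2024-05-01 10:00, status=pending)
Incoming: (id=42, updated_at=2024-05-01 11:30, status=shipped)
Written: (id=42, updated_at=2024-05-01 11:30, status=shipped)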
Sorting Key Fields
For Append-only Write Mode
Set a sorting key to sort incoming records when they are ingested. This may accelerate queries with predicates on the sorting key field. Sorting by date can also help efficiently handle late arriving data.
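For instance (field name hypothetical), sorting on an event_date field groups records for the same date into the same files, so queries filtering on it may scan fewer files:
Sorting Key Field: event_date
Query predicate: event_date = '2024-05-01' → only files containing that date need to be scanned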
Kafka Starting Offsets
For Kafka sources
Set the starting offset for Kafka source ingestion. With earliest, ingestion starts from the earliest available Kafka offsets; with latest, ingestion starts from the latest offsets and captures only newly arriving data. This configuration cannot be modified after the Flow has been created.
Advanced Configurations
Advanced configurations for Flows can be entered through a text box in the Onehouse console or provided via the API (see the API reference). These configurations are always optional.
To add advanced configurations while creating a Flow in the Onehouse console, open the advanced configurations toggle and enter the configurations as comma-separated key-value pairs, with each key and value in single quotes. Example:
'flow.configA' = 'true',
'flow.configB' = '100'
Below are the available advanced configurations:
| Config Name | Default | Options | Description | Editable |
|---|---|---|---|---|
| flow.delayThreshold.numSyncIntervals | 0 (off) | Any positive integer, or zero to disable the Delay Threshold. | This value is multiplied by the Sync Frequency to determine the delay threshold. When a sync's duration surpasses the delay threshold, the Flow is considered delayed. | Yes |
| flow.deduplicationPolicy | 'none' | | Available for append-only Flows only. Drop incoming duplicate records based on the record key. | Yes |
| flow.kafka.missingMessages | 'FAIL' | | Case-sensitive. For Kafka sources, controls Flow behavior when encountering missing or expired messages. | Yes |
| streamCapture.targetFileSize | '134217728' (128MB) | Between 10485760 (10MB) and 1073741824 (1GB) | Sets the target size of Parquet files created during ingestion. Larger files improve read performance but may increase ingestion latency (the delay between when data is ingested and when it becomes queryable). | Yes |
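For example, the following (illustrative) values mark the Flow as delayed after three sync intervals and raise the target file size to 256MB:
'flow.delayThreshold.numSyncIntervals' = '3',
'streamCapture.targetFileSize' = '268435456'
With a Sync Frequency of 5 minutes, a delay threshold of 3 means a sync running longer than 15 minutes marks the Flow as delayed.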
Table Type
You can choose between the two Apache Hudi table types: Merge on Read (MoR) and Copy on Write (CoW). Learn more in this guide.
Toggle open the advanced configurations section on the Flow creation page of the Onehouse console to specify the table type for your destination table. By default, Onehouse uses MoR tables. Project Admins can set project-level defaults for Table Type on the Project Settings page.
Multi-Table Select
If you have multiple tables under one S3 folder, Onehouse can ingest them all at once. Provide a parent directory, and Onehouse will show all of its sub-folders, which you can choose to ingest.
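For example (folder names hypothetical), providing the parent directory s3://my-bucket/warehouse/ would surface its sub-folders, each of which becomes a separate Onehouse table if selected:
s3://my-bucket/warehouse/
    orders/
    users/
    payments/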