oh flow
Manage ingestion pipelines that stream data from a source to a target table.
Write modes: `IMMUTABLE`, `MUTABLE`
Performance profiles: `BALANCED`, `FASTEST_READ`, `FASTEST_WRITE`
oh flow show
Lists all flows.
```bash
oh flow show
```
oh flow describe NAME
Describes a flow. NAME can be either the flow name or the fully-qualified flow ID in name:table_name form (as returned by oh flow show).
```bash
oh flow describe web-events
oh flow describe web-events:clicks
```
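A script may hold a fully-qualified ID and also need the table name on its own. In that case, bash parameter expansion can split the ID. This is a minimal sketch. `split_flow_id` is a hypothetical helper and is not part of the `oh` CLI. It assumes flow names never contain a colon.

```bash
#!/usr/bin/env bash
# Hypothetical helper: split a fully-qualified flow ID (name:table_name)
# into flow name and table name, printed one per line.
# Assumes the flow name itself contains no ':' character.
split_flow_id() {
  local id="$1"
  if [[ "$id" != *:* ]]; then
    # No colon: the value is a plain flow name with no table part.
    printf '%s\n\n' "$id"
    return 0
  fi
  # ${id%%:*} keeps text before the first ':'; ${id#*:} keeps text after it.
  printf '%s\n%s\n' "${id%%:*}" "${id#*:}"
}

# Read both parts into variables.
{ read -r flow_name; read -r table_name; } < <(split_flow_id "web-events:clicks")
echo "flow=$flow_name table=$table_name"
```

Either value can then be passed to `oh flow describe`, because it accepts both forms.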
oh flow create NAME
Creates a flow.
Flags
| Flag | Required | Description |
|---|---|---|
| --source | Yes | Source name |
| --lake | Yes | Target lake |
| --database | Yes | Target database |
| --table-name | Yes | Target table name |
| --write-mode | Yes | IMMUTABLE or MUTABLE |
| --cluster | Yes | Cluster to run the flow on |
| --performance-profile | No | BALANCED, FASTEST_READ, or FASTEST_WRITE |
| --table-type | No | Hudi table type: COPY_ON_WRITE or MERGE_ON_READ (default: MERGE_ON_READ) |
| --record-key-fields FIELD | No | Record key field(s); repeatable |
| --partition-key-fields FIELD[:TYPE[:IN_FMT[:OUT_FMT]]] | No | Partition key field(s); repeatable (see Partition keys below) |
| --precombine-key-field | No | Precombine key field name |
| --sorting-key-fields FIELD | No | Sorting key field(s); repeatable |
| --min-sync-frequency-mins INT | No | Minimum sync frequency in minutes (default: 1) |
| --quarantine-enabled | No | Enable quarantine for records that fail validation |
| --catalogs NAME | No | Catalog name(s) to register the table in; repeatable |
| --transformations NAME | No | Transformation name(s) to apply; repeatable |
| --validations NAME | No | Validation name(s) to apply; repeatable |
| --options KEY=VALUE | No | WITH clause parameters; repeatable (see Special parameters) |
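A wrapper script can catch a mistyped enum value before it calls the CLI. The sketch below checks values against the sets listed in the table above. `check_enum` is a hypothetical helper, and the values in the usage lines are only examples.

```bash
#!/usr/bin/env bash
# Hypothetical pre-flight check: succeed only if VALUE is one of the allowed
# values listed after it; otherwise print an error to stderr and fail.
check_enum() {
  local flag="$1" value="$2"
  shift 2
  local allowed
  for allowed in "$@"; do
    if [[ "$value" == "$allowed" ]]; then
      return 0
    fi
  done
  echo "invalid $flag value '$value' (expected one of: $*)" >&2
  return 1
}

# Example values; the allowed sets are copied from the flag table.
check_enum --write-mode MUTABLE IMMUTABLE MUTABLE &&
  check_enum --performance-profile FASTEST_READ BALANCED FASTEST_READ FASTEST_WRITE &&
  check_enum --table-type MERGE_ON_READ COPY_ON_WRITE MERGE_ON_READ &&
  echo "enum flags OK"
```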
Partition keys
--partition-key-fields takes a colon-separated string: FIELD:TYPE:INPUT_FORMAT:OUTPUT_FORMAT.
- For non-timestamp partitions, set `TYPE`, `INPUT_FORMAT`, and `OUTPUT_FORMAT` to empty strings, e.g. `date:::`.
- For timestamp partitions, set `TYPE` and `INPUT_FORMAT` to match your data. Valid `TYPE` values: `DATE_STRING`, `EPOCH_MILLIS`, `EPOCH_MICROS`. In the epoch millis example below, `INPUT_FORMAT` is left empty.
```bash
# Non-timestamp partition
--partition-key-fields region:::
# Date string partition (e.g. "2024-01-15")
--partition-key-fields event_date:DATE_STRING:yyyy-MM-dd:yyyy-MM-dd
# Epoch millis partition
--partition-key-fields created_at:EPOCH_MILLIS::yyyy-MM-dd
```
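When a script generates partition specs, a small helper can assemble the `FIELD:TYPE:INPUT_FORMAT:OUTPUT_FORMAT` string described above and reject unknown `TYPE` values. `partition_key_spec` is a hypothetical name. The accepted types are the ones listed above, plus an empty `TYPE` for non-timestamp partitions.

```bash
#!/usr/bin/env bash
# Hypothetical helper: build a --partition-key-fields value.
# Omitted arguments become empty strings, so "region" yields "region:::".
partition_key_spec() {
  local field="$1" type="${2:-}" in_fmt="${3:-}" out_fmt="${4:-}"
  case "$type" in
    ""|DATE_STRING|EPOCH_MILLIS|EPOCH_MICROS) ;;
    *) echo "unsupported partition TYPE: $type" >&2; return 1 ;;
  esac
  printf '%s:%s:%s:%s\n' "$field" "$type" "$in_fmt" "$out_fmt"
}

partition_key_spec region                                          # region:::
partition_key_spec event_date DATE_STRING yyyy-MM-dd yyyy-MM-dd    # event_date:DATE_STRING:yyyy-MM-dd:yyyy-MM-dd
partition_key_spec created_at EPOCH_MILLIS "" yyyy-MM-dd           # created_at:EPOCH_MILLIS::yyyy-MM-dd
```

The three calls reproduce the three example values above. The result can be passed as `--partition-key-fields "$(partition_key_spec region)"`.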
Special parameters
All WITH clause parameters from the CREATE FLOW API are passed via --options KEY=VALUE. This includes source-specific configs, schema registry settings, and advanced configurations.
```bash
--options kafka.topic.name=my-topic
--options flow.delayThreshold.numSyncIntervals=3
--options table.configured.base.path=s3://my-bucket/custom/path
```
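Each WITH clause parameter becomes its own `--options KEY=VALUE` pair. A script can therefore keep the parameters in one array and expand them into arguments. This is a minimal bash sketch: the array names are illustrative, and it prints the arguments instead of invoking `oh`.

```bash
#!/usr/bin/env bash
# WITH clause parameters kept as KEY=VALUE strings (illustrative values).
flow_options=(
  "kafka.topic.name=my-topic"
  "flow.delayThreshold.numSyncIntervals=3"
)

# Expand each entry into a separate "--options KEY=VALUE" pair.
option_args=()
for kv in "${flow_options[@]}"; do
  option_args+=(--options "$kv")
done

# Print the generated arguments, one per line, instead of running the CLI.
printf '%s\n' "${option_args[@]}"

# To run for real, append them to a full create command, for example:
# oh flow create web-events --source kafka-prod --lake my-lake --database events \
#   --table-name clicks --write-mode MUTABLE --cluster prod "${option_args[@]}"
```

Quoting `"${option_args[@]}"` keeps each `KEY=VALUE` intact as a single argument, even if a value contains spaces.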
Examples
Kafka flow
```bash
oh flow create web-events \
--source kafka-prod \
--lake my-lake \
--database events \
--table-name clicks \
--write-mode MUTABLE \
--cluster prod \
--record-key-fields event_id \
--partition-key-fields event_date:DATE_STRING:yyyy-MM-dd:yyyy-MM-dd \
--precombine-key-field updated_at \
--min-sync-frequency-mins 5 \
--quarantine-enabled \
--catalogs glue-catalog \
--transformations mask-pii \
--validations schema-check \
--options kafka.topic.name=web-clicks \
--options kafka.startingOffsets=earliest \
--options flow.delayThreshold.numSyncIntervals=3
```
Kafka flow with Avro schema registry
```bash
oh flow create avro-events \
--source kafka-prod \
--lake my-lake \
--database events \
--table-name user-actions \
--write-mode MUTABLE \
--cluster prod \
--options kafka.topic.name=user-actions \
--options schema.registry.type=confluent \
--options schema.registry.confluent.servers=https://schema-registry.example.com \
--options schema.registry.confluent.key=MY_KEY \
--options schema.registry.confluent.secret=MY_SECRET
```
S3 flow (JSON files)
```bash
oh flow create s3-orders \
--source s3-prod \
--lake my-lake \
--database orders \
--table-name raw \
--write-mode IMMUTABLE \
--cluster prod \
--record-key-fields order_id \
--partition-key-fields order_date:DATE_STRING:yyyy-MM-dd:yyyy-MM-dd \
--options s3.folder.uri=s3://my-bucket/orders/ \
--options s3.file.format=JSON \
--options s3.file.extension=.json \
--options s3.source.bootstrap=TRUE
```
S3 flow (CSV files)
```bash
oh flow create s3-csv-events \
--source s3-prod \
--lake my-lake \
--database events \
--table-name raw-csv \
--write-mode IMMUTABLE \
--cluster prod \
--record-key-fields id \
--options s3.folder.uri=s3://my-bucket/events/ \
--options s3.file.format=CSV \
--options s3.file.extension=.csv \
--options s3.file.csv.header=TRUE
```
GCS flow
```bash
oh flow create gcs-events \
--source gcs-prod \
--lake my-lake \
--database events \
--table-name raw \
--write-mode IMMUTABLE \
--cluster prod \
--record-key-fields event_id \
--options gcs.folder.uri=gs://my-bucket/events/ \
--options gcs.file.format=PARQUET \
--options gcs.file.extension=.parquet \
--options gcs.source.bootstrap=TRUE
```
Onehouse table flow
```bash
oh flow create table-copy \
--source onehouse-table-src \
--lake my-lake \
--database derived \
--table-name clicks-copy \
--write-mode MUTABLE \
--cluster prod \
--record-key-fields event_id \
--options source.table.lake=my-lake \
--options source.table.database=events \
--options source.table.name=clicks
```
Postgres CDC flow
```bash
oh flow create pg-orders \
--source pg-prod \
--lake my-lake \
--database orders \
--table-name orders \
--write-mode MUTABLE \
--cluster prod \
--record-key-fields id \
--options postgres.table.name=orders \
--options postgres.schema.name=public
```
oh flow alter NAME
Alters one property of a flow per invocation (the API enforces a single change per request).
Flags
| Flag | Description |
|---|---|
| --state | PAUSE, RESUME, or CLEAN_AND_RESTART |
| --source | New source name |
| --cluster | New cluster |
| --performance-profile | New performance profile |
| --min-sync-frequency-mins INT | New minimum sync frequency |
| --transformations NAME | New transformation list; repeatable |
| --validations NAME | New validation list; repeatable |
| --quarantine-enabled / --no-quarantine-enabled | Enable or disable quarantine |
| --options KEY=VALUE | WITH clause parameters; repeatable (see Special parameters) |
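To make several changes, issue one `oh flow alter` call per property. This is a minimal sketch. `run`, `FLOW`, and `DRY_RUN` are names introduced here for illustration. With the default dry-run setting, the script only prints the commands.

```bash
#!/usr/bin/env bash
# Apply several changes to one flow as separate `oh flow alter` calls,
# because each invocation may change only one property.
# DRY_RUN=1 (the default here) prints each command instead of running it;
# set DRY_RUN=0 to execute the commands for real.
FLOW="web-events"
DRY_RUN="${DRY_RUN:-1}"

run() {
  if [[ "$DRY_RUN" == 1 ]]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# One property per call; the && chain stops at the first failing call.
run oh flow alter "$FLOW" --cluster prod-v2 &&
  run oh flow alter "$FLOW" --min-sync-frequency-mins 10 &&
  run oh flow alter "$FLOW" --no-quarantine-enabled
```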
Special parameters
When changing the source (--source), pass the new source's parameters (such as the Kafka topic and starting offsets) via --options. When updating advanced configurations, pass them via --options without any other flags.
Important: When updating advanced configurations, any existing configs not included in the request will be removed.
| Parameter | Description |
|---|---|
| kafka.topic.name | New Kafka topic name (when changing source) |
| kafka.startingOffsets | earliest or latest (when changing source) |
| kafka.topic.schema.name | Schema registry schema name (when changing source) |
| flow.delayThreshold.numSyncIntervals | Delay threshold multiplier. Set to 0 to disable. |
| flow.deduplicationPolicy | Deduplication policy for append-only flows: none (default) or drop |
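An advanced-configuration update removes any existing config that is missing from the request. Changing a single value safely therefore means resending the whole set. This is a minimal sketch. It assumes you already track the flow's current advanced configs yourself as `KEY=VALUE` strings. `current` and `set_config` are hypothetical names, and the script prints the command rather than running it.

```bash
#!/usr/bin/env bash
# The complete set of advanced configs currently on the flow (assumed known).
current=(
  "flow.delayThreshold.numSyncIntervals=3"
  "flow.deduplicationPolicy=drop"
)

# Replace the value for KEY (or append KEY=VALUE) while keeping other entries.
set_config() {
  local key="$1" value="$2" entry found=0
  local -a updated=()
  for entry in "${current[@]}"; do
    if [[ "${entry%%=*}" == "$key" ]]; then
      updated+=("$key=$value")
      found=1
    else
      updated+=("$entry")
    fi
  done
  (( found )) || updated+=("$key=$value")
  current=("${updated[@]}")
}

set_config flow.delayThreshold.numSyncIntervals 5

# Send every config, not just the changed one, so none are removed.
args=(oh flow alter web-events)
for entry in "${current[@]}"; do
  args+=(--options "$entry")
done
printf '%s\n' "${args[*]}"
```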
Examples
```bash
# Pause / resume / clean & restart
oh flow alter web-events --state PAUSE
oh flow alter web-events --state RESUME
oh flow alter web-events --state CLEAN_AND_RESTART
# Change cluster
oh flow alter web-events --cluster prod-v2
# Update sync frequency
oh flow alter web-events --min-sync-frequency-mins 10
# Change Kafka source and topic
oh flow alter web-events \
--source kafka-prod-v2 \
--options kafka.topic.name=web-clicks-v2 \
--options kafka.startingOffsets=latest
# Update advanced configurations
oh flow alter web-events \
--options flow.delayThreshold.numSyncIntervals=5 \
--options flow.deduplicationPolicy=drop
```
oh flow delete NAME
Deletes a flow.
```bash
oh flow delete web-events
```