oh flow

Manage ingestion pipelines that stream data from a source to a target table.

Write modes: IMMUTABLE, MUTABLE
Performance profiles: BALANCED, FASTEST_READ, FASTEST_WRITE


oh flow show

Lists all flows.

oh flow show

oh flow describe NAME

Describes a flow. NAME can be either the flow name or the fully-qualified flow ID in name:table_name form (as returned by oh flow show).

oh flow describe web-events
oh flow describe web-events:clicks
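
Because the fully-qualified ID uses the name:table_name form, a script can split it with standard shell parameter expansion. This is a minimal sketch, assuming the flow name itself contains no colon; flow_id is a sample value, not output captured from the CLI.

```shell
# Sample fully-qualified flow ID (hypothetical value in name:table_name form).
flow_id="web-events:clicks"

# Everything before the first colon is the flow name.
flow_name="${flow_id%%:*}"
# Everything after the first colon is the table name.
table_name="${flow_id#*:}"

echo "flow=$flow_name table=$table_name"
```

Either part can then be passed on to other commands, for example oh flow describe "$flow_name".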

oh flow create NAME

Creates a flow.

Flags

| Flag | Required | Description |
| --- | --- | --- |
| --source | Yes | Source name |
| --lake | Yes | Target lake |
| --database | Yes | Target database |
| --table-name | Yes | Target table name |
| --write-mode | Yes | IMMUTABLE or MUTABLE |
| --cluster | Yes | Cluster to run the flow on |
| --performance-profile | No | BALANCED, FASTEST_READ, or FASTEST_WRITE |
| --table-type | No | Hudi table type: COPY_ON_WRITE or MERGE_ON_READ (default: MERGE_ON_READ) |
| --record-key-fields FIELD | No | Record key field(s); repeatable |
| --partition-key-fields FIELD[:TYPE[:IN_FMT[:OUT_FMT]]] | No | Partition key field(s); repeatable (see Partition keys below) |
| --precombine-key-field | No | Precombine key field name |
| --sorting-key-fields FIELD | No | Sorting key field(s); repeatable |
| --min-sync-frequency-mins INT | No | Minimum sync frequency in minutes (default: 1) |
| --quarantine-enabled | No | Enable quarantine for records that fail validation |
| --catalogs NAME | No | Catalog name(s) to register the table in; repeatable |
| --transformations NAME | No | Transformation name(s) to apply; repeatable |
| --validations NAME | No | Validation name(s) to apply; repeatable |
| --options KEY=VALUE | No | WITH clause parameters; repeatable (see Special parameters) |

Partition keys

--partition-key-fields takes a colon-separated string: FIELD:TYPE:INPUT_FORMAT:OUTPUT_FORMAT.

  • For non-timestamp partitions, leave TYPE, INPUT_FORMAT, and OUTPUT_FORMAT empty but keep the colons: date:::.
  • For timestamp partitions, match TYPE and INPUT_FORMAT to your data. Valid TYPE values: DATE_STRING, EPOCH_MILLIS, EPOCH_MICROS. The epoch millis example below leaves INPUT_FORMAT empty.

# Non-timestamp partition
--partition-key-fields region:::

# Date string partition (e.g. "2024-01-15")
--partition-key-fields event_date:DATE_STRING:yyyy-MM-dd:yyyy-MM-dd

# Epoch millis partition
--partition-key-fields created_at:EPOCH_MILLIS::yyyy-MM-dd
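
When partition specs are generated by a script, a small helper keeps the colon count right. The sketch below is illustrative only: partition_spec is a hypothetical shell function, not part of the oh CLI, and it simply joins up to four parts in FIELD:TYPE:INPUT_FORMAT:OUTPUT_FORMAT order.

```shell
# Hypothetical helper: joins the parts into FIELD:TYPE:INPUT_FORMAT:OUTPUT_FORMAT.
# Omitted parts default to empty strings, which yields the non-timestamp form FIELD:::.
partition_spec() {
  printf '%s:%s:%s:%s\n' "$1" "${2:-}" "${3:-}" "${4:-}"
}

partition_spec region
partition_spec event_date DATE_STRING yyyy-MM-dd yyyy-MM-dd
partition_spec created_at EPOCH_MILLIS "" yyyy-MM-dd
```

The three calls print the same strings as the examples above, so the result can be used as --partition-key-fields "$(partition_spec region)".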

Special parameters

All WITH clause parameters from the CREATE FLOW API are passed via --options KEY=VALUE. This includes source-specific configs, schema registry settings, and advanced configurations.

--options kafka.topic.name=my-topic
--options flow.delayThreshold.numSyncIntervals=3
--options table.configured.base.path=s3://my-bucket/custom/path

Examples

Kafka flow

oh flow create web-events \
--source kafka-prod \
--lake my-lake \
--database events \
--table-name clicks \
--write-mode MUTABLE \
--cluster prod \
--record-key-fields event_id \
--partition-key-fields event_date:DATE_STRING:yyyy-MM-dd:yyyy-MM-dd \
--precombine-key-field updated_at \
--min-sync-frequency-mins 5 \
--quarantine-enabled \
--catalogs glue-catalog \
--transformations mask-pii \
--validations schema-check \
--options kafka.topic.name=web-clicks \
--options kafka.startingOffsets=earliest \
--options flow.delayThreshold.numSyncIntervals=3

Kafka flow with Avro schema registry

oh flow create avro-events \
--source kafka-prod \
--lake my-lake \
--database events \
--table-name user-actions \
--write-mode MUTABLE \
--cluster prod \
--options kafka.topic.name=user-actions \
--options schema.registry.type=confluent \
--options schema.registry.confluent.servers=https://schema-registry.example.com \
--options schema.registry.confluent.key=MY_KEY \
--options schema.registry.confluent.secret=MY_SECRET

S3 flow (JSON files)

oh flow create s3-orders \
--source s3-prod \
--lake my-lake \
--database orders \
--table-name raw \
--write-mode IMMUTABLE \
--cluster prod \
--record-key-fields order_id \
--partition-key-fields order_date:DATE_STRING:yyyy-MM-dd:yyyy-MM-dd \
--options s3.folder.uri=s3://my-bucket/orders/ \
--options s3.file.format=JSON \
--options s3.file.extension=.json \
--options s3.source.bootstrap=TRUE

S3 flow (CSV files)

oh flow create s3-csv-events \
--source s3-prod \
--lake my-lake \
--database events \
--table-name raw-csv \
--write-mode IMMUTABLE \
--cluster prod \
--record-key-fields id \
--options s3.folder.uri=s3://my-bucket/events/ \
--options s3.file.format=CSV \
--options s3.file.extension=.csv \
--options s3.file.csv.header=TRUE

GCS flow

oh flow create gcs-events \
--source gcs-prod \
--lake my-lake \
--database events \
--table-name raw \
--write-mode IMMUTABLE \
--cluster prod \
--record-key-fields event_id \
--options gcs.folder.uri=gs://my-bucket/events/ \
--options gcs.file.format=PARQUET \
--options gcs.file.extension=.parquet \
--options gcs.source.bootstrap=TRUE

Onehouse table flow

oh flow create table-copy \
--source onehouse-table-src \
--lake my-lake \
--database derived \
--table-name clicks-copy \
--write-mode MUTABLE \
--cluster prod \
--record-key-fields event_id \
--options source.table.lake=my-lake \
--options source.table.database=events \
--options source.table.name=clicks

Postgres CDC flow

oh flow create pg-orders \
--source pg-prod \
--lake my-lake \
--database orders \
--table-name orders \
--write-mode MUTABLE \
--cluster prod \
--record-key-fields id \
--options postgres.table.name=orders \
--options postgres.schema.name=public

oh flow alter NAME

Alters one property of a flow per invocation (the API enforces a single change per request).

| Flag | Description |
| --- | --- |
| --state | PAUSE, RESUME, or CLEAN_AND_RESTART |
| --source | New source name |
| --cluster | New cluster |
| --performance-profile | New performance profile |
| --min-sync-frequency-mins INT | New minimum sync frequency |
| --transformations NAME | New transformation list; repeatable |
| --validations NAME | New validation list; repeatable |
| --quarantine-enabled / --no-quarantine-enabled | Enable or disable quarantine |
| --options KEY=VALUE | WITH clause parameters; repeatable (see Special parameters) |

Special parameters

When changing the source (--source), pass the new source's connection details via --options. When updating advanced configurations, pass them via --options without any other flags.

Important: When updating advanced configurations, any existing configs not included in the request will be removed.

| Parameter | Description |
| --- | --- |
| kafka.topic.name | New Kafka topic name (when changing source) |
| kafka.startingOffsets | earliest or latest (when changing source) |
| kafka.topic.schema.name | Schema registry schema name (when changing source) |
| flow.delayThreshold.numSyncIntervals | Delay threshold multiplier. Set to 0 to disable. |
| flow.deduplicationPolicy | none (default) or drop; for append-only flows |
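
Because an advanced-configuration update removes any config missing from the request, one option is to keep the full set for a flow in a file and always send all of it. The following is a minimal sketch under that assumption: the file and its contents are hypothetical, and the command is only printed, not executed.

```shell
# Hypothetical file holding the complete set of advanced configs for one flow.
conf_file="$(mktemp)"
cat > "$conf_file" <<'EOF'
flow.delayThreshold.numSyncIntervals=5
flow.deduplicationPolicy=drop
EOF

# Turn every non-empty line into its own --options KEY=VALUE pair.
set --
while IFS= read -r kv; do
  if [ -n "$kv" ]; then
    set -- "$@" --options "$kv"
  fi
done < "$conf_file"
rm -f "$conf_file"

# Dry run: assemble and print the command rather than executing it.
alter_cmd="oh flow alter web-events $*"
echo "$alter_cmd"
```

To run the update for real, replace the last two lines with oh flow alter web-events "$@", which keeps each KEY=VALUE pair as a separate argument.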

Examples

# Pause / resume / clean & restart
oh flow alter web-events --state PAUSE
oh flow alter web-events --state RESUME
oh flow alter web-events --state CLEAN_AND_RESTART

# Change cluster
oh flow alter web-events --cluster prod-v2

# Update sync frequency
oh flow alter web-events --min-sync-frequency-mins 10

# Change Kafka source and topic
oh flow alter web-events \
--source kafka-prod-v2 \
--options kafka.topic.name=web-clicks-v2 \
--options kafka.startingOffsets=latest

# Update advanced configurations
oh flow alter web-events \
--options flow.delayThreshold.numSyncIntervals=5 \
--options flow.deduplicationPolicy=drop
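
Since each invocation carries a single change, updating several properties means running oh flow alter once per property. A rough sketch of that pattern follows; run is a hypothetical dry-run wrapper that only prints each command, and the flow and cluster names are the sample values used above.

```shell
# Dry-run wrapper (hypothetical helper): prints the command instead of running it.
# Change the body to "$@" to execute the commands for real.
run() { echo "$*"; }

flow="web-events"

# One invocation per property; && stops the sequence at the first failure.
run oh flow alter "$flow" --cluster prod-v2 &&
run oh flow alter "$flow" --min-sync-frequency-mins 10
```

Chaining with && avoids applying later changes when an earlier one fails, which keeps the flow in a known state.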

oh flow delete NAME

Deletes a flow.

oh flow delete web-events