Skip to main content

Flows

Configure ingestion pipelines that read from sources and write to Onehouse tables.

Helper dataclass

Flows use one helper dataclass for partition definitions. Import it from onehouse_python_sdk.resources.sql.commands:

from onehouse_python_sdk.resources.sql.commands import PartitionKeyField

PartitionKeyField(
field: str, # column name
*,
partition_type: str = "", # "" | "DATE_STRING" | "EPOCH_MILLIS" | "EPOCH_MICROS"
input_format: str = "", # e.g. "yyyy-mm-dd"
output_format: str = "", # e.g. "yyyy-mm-dd"
)

For non-timestamp partition keys, leave partition_type, input_format, and output_format at their defaults (empty strings).

Methods

| Method | Description |
|---|---|
| `create_flow` | Create a new ingestion flow |
| `alter_flow` | Change one aspect of an existing flow |
| `delete_flow` | Delete a flow |
| `describe_flow` | Show the full configuration of a flow |
| `show_flows` | List all flows in the project |

create_flow

create_flow(
name: str,
*,
source: str,
lake: str,
database: str,
table_name: str,
write_mode: str,
cluster: str,
performance_profile: str | None = None,
source_data_schema: str | None = None,
catalogs: Sequence[str] | None = None,
transformations: Sequence[str] | None = None,
record_key_fields: Sequence[str] | None = None,
partition_key_fields: Sequence[PartitionKeyField] | None = None,
precombine_key_field: str | None = None,
sorting_key_fields: Sequence[str] | None = None,
min_sync_frequency_mins: int | None = None,
quarantine_enabled: bool | None = None,
validations: Sequence[str] | None = None,
table_type: str | None = None,
options: Mapping[str, str] | None = None,
unsafe_raw: bool = False,
timeout: float | None = None,
poll_interval: float | None = None,
)
| Parameter | Required | Type / values |
|---|---|---|
| `name` | yes | `str` |
| `source` | yes | `str` — name of an existing source |
| `lake` | yes | `str` — target lake |
| `database` | yes | `str` — target database |
| `table_name` | yes | `str` — target table |
| `write_mode` | yes | `"IMMUTABLE"`, `"MUTABLE"` |
| `cluster` | yes | `str` — compute cluster that runs the flow |
| `performance_profile` | no | `"BALANCED"`, `"FASTEST_READ"`, `"FASTEST_WRITE"` |
| `source_data_schema` | no | `str` |
| `catalogs` | no | `Sequence[str]` — names of catalogs to sync to |
| `transformations` | no | `Sequence[str]` — names of transformations to apply |
| `record_key_fields` | no | `Sequence[str]` |
| `partition_key_fields` | no | `Sequence[PartitionKeyField]` |
| `precombine_key_field` | no | `str` |
| `sorting_key_fields` | no | `Sequence[str]` |
| `min_sync_frequency_mins` | no | `int` |
| `quarantine_enabled` | no | `bool` |
| `validations` | no | `Sequence[str]` — names of validations to apply |
| `table_type` | no | `"COPY_ON_WRITE"`, `"MERGE_ON_READ"` |
| `options` | no | `Mapping[str, str]` — source-specific runtime options |

Example

from onehouse_python_sdk.resources.sql.commands import PartitionKeyField

client.create_flow(
"events_pipeline",
source="my_kafka_source",
lake="analytics",
database="events",
table_name="page_views",
write_mode="MUTABLE",
cluster="ingest_cluster",
catalogs=["my_glue_catalog"],
record_key_fields=["id"],
partition_key_fields=[
PartitionKeyField("date", partition_type="DATE_STRING",
input_format="yyyy-mm-dd", output_format="yyyy-mm-dd"),
],
min_sync_frequency_mins=5,
options={"kafka.topic.name": "page_views"},
)

alter_flow

alter_flow(
name: str,
*,
state: str | None = None,
source: str | None = None,
cluster: str | None = None,
performance_profile: str | None = None,
min_sync_frequency_mins: int | None = None,
transformations: Sequence[str] | None = None,
validations: Sequence[str] | None = None,
quarantine_enabled: bool | None = None,
advanced_configs: bool = False,
options: Mapping[str, str] | None = None,
unsafe_raw: bool = False,
timeout: float | None = None,
poll_interval: float | None = None,
)

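Each call changes exactly one aspect of the flow, so pass exactly one of: state, source, cluster, performance_profile, min_sync_frequency_mins, transformations, validations, quarantine_enabled, or advanced_configs=True. When using advanced_configs=True, supply the settings through options.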

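| Parameter | Required | Type / values |
|---|---|---|
| `name` | yes | `str` |
| `state` | no | `"PAUSE"`, `"RESUME"`, `"CLEAN_AND_RESTART"` |
| `source` | no | `str` |
| `cluster` | no | `str` |
| `performance_profile` | no | `"BALANCED"`, `"FASTEST_READ"`, `"FASTEST_WRITE"` |
| `min_sync_frequency_mins` | no | `int` |
| `transformations` | no | `Sequence[str]` |
| `validations` | no | `Sequence[str]` |
| `quarantine_enabled` | no | `bool` |
| `advanced_configs` | no | `bool` — when `True`, emits `SET ADVANCED_CONFIGS` with the key/value pairs in `options` |
| `options` | no | `Mapping[str, str]` — configuration key/value pairs (used with `advanced_configs=True`) |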

Examples

# Pause a flow
client.alter_flow("events_pipeline", state="PAUSE")

# Move to a different cluster
client.alter_flow("events_pipeline", cluster="ingest_cluster_v2")

# Update advanced configs
client.alter_flow(
"events_pipeline",
advanced_configs=True,
options={"hoodie.parquet.compression.codec": "zstd"},
)

delete_flow

delete_flow(name: str, *, unsafe_raw=False, timeout=None, poll_interval=None)

Example

client.delete_flow("events_pipeline")

describe_flow

describe_flow(name: str, *, unsafe_raw=False, timeout=None, poll_interval=None)

Example

result = client.describe_flow("events_pipeline")

show_flows

show_flows(*, timeout=None, poll_interval=None)

Example

result = client.show_flows()