Flows
Configure ingestion pipelines that read from sources and write to Onehouse tables.
Helper dataclass
Flows use one helper dataclass for partition definitions. Import it from onehouse_python_sdk.resources.sql.commands:
```python
from onehouse_python_sdk.resources.sql.commands import PartitionKeyField

PartitionKeyField(
    field: str,                # column name
    *,
    partition_type: str = "",  # "" | "DATE_STRING" | "EPOCH_MILLIS" | "EPOCH_MICROS"
    input_format: str = "",    # e.g. "yyyy-mm-dd"
    output_format: str = "",   # e.g. "yyyy-mm-dd"
)
```
For non-timestamp partition keys, leave partition_type, input_format, and output_format as empty strings.
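Only timestamp-based keys take a partition_type and formats, so a small pre-flight check can catch a misconfigured key before it reaches create_flow. The sketch below is hypothetical and not part of the SDK. PartitionKeySpec is a local stand-in with the same four fields as PartitionKeyField, which lets the example run without the SDK installed. partition_key_problems reads only those four attributes, so it should also accept real PartitionKeyField instances.

```python
from dataclasses import dataclass

# Documented partition_type values; "" marks a non-timestamp partition key.
ALLOWED_PARTITION_TYPES = {"", "DATE_STRING", "EPOCH_MILLIS", "EPOCH_MICROS"}


@dataclass(frozen=True)
class PartitionKeySpec:
    """Hypothetical local stand-in mirroring PartitionKeyField's documented fields."""
    field: str
    partition_type: str = ""
    input_format: str = ""
    output_format: str = ""


def partition_key_problems(key) -> list[str]:
    """Return the problems found in one partition key definition (empty if none)."""
    problems = []
    if not key.field:
        problems.append("field must be a non-empty column name")
    if key.partition_type not in ALLOWED_PARTITION_TYPES:
        problems.append(f"unknown partition_type {key.partition_type!r}")
    # Non-timestamp keys should leave both format strings empty.
    if key.partition_type == "" and (key.input_format or key.output_format):
        problems.append("formats must be empty for a non-timestamp partition key")
    return problems
```

For example, PartitionKeySpec("region") passes, and PartitionKeySpec("region", input_format="yyyy-mm-dd") is flagged because it sets a format without a partition_type.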
Methods
| Method | Description |
|---|---|
| create_flow | Create a new ingestion flow |
| alter_flow | Change one aspect of an existing flow |
| delete_flow | Delete a flow |
| describe_flow | Show full configuration for a flow |
| show_flows | List all flows in the project |
create_flow
```python
create_flow(
    name: str,
    *,
    source: str,
    lake: str,
    database: str,
    table_name: str,
    write_mode: str,
    cluster: str,
    performance_profile: str | None = None,
    source_data_schema: str | None = None,
    catalogs: Sequence[str] | None = None,
    transformations: Sequence[str] | None = None,
    record_key_fields: Sequence[str] | None = None,
    partition_key_fields: Sequence[PartitionKeyField] | None = None,
    precombine_key_field: str | None = None,
    sorting_key_fields: Sequence[str] | None = None,
    min_sync_frequency_mins: int | None = None,
    quarantine_enabled: bool | None = None,
    validations: Sequence[str] | None = None,
    table_type: str | None = None,
    options: Mapping[str, str] | None = None,
    unsafe_raw: bool = False,
    timeout: float | None = None,
    poll_interval: float | None = None,
)
```
| Parameter | Required | Type / values |
|---|---|---|
| name | yes | str |
| source | yes | str (name of an existing source) |
| lake | yes | str (target lake) |
| database | yes | str (target database) |
| table_name | yes | str (target table) |
| write_mode | yes | "IMMUTABLE", "MUTABLE" |
| cluster | yes | str (compute cluster that runs the flow) |
| performance_profile | no | "BALANCED", "FASTEST_READ", "FASTEST_WRITE" |
| source_data_schema | no | str |
| catalogs | no | Sequence[str] (catalog names to sync to) |
| transformations | no | Sequence[str] (transformation names to apply) |
| record_key_fields | no | Sequence[str] |
| partition_key_fields | no | Sequence[PartitionKeyField] |
| precombine_key_field | no | str |
| sorting_key_fields | no | Sequence[str] |
| min_sync_frequency_mins | no | int |
| quarantine_enabled | no | bool |
| validations | no | Sequence[str] (validation names to apply) |
| table_type | no | "COPY_ON_WRITE", "MERGE_ON_READ" |
| options | no | Mapping[str, str] (source-specific runtime options) |
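Several create_flow parameters accept only the enumerated values listed in the table. The hypothetical helper below, check_create_flow_kwargs, is not part of the SDK. It checks a dictionary of keyword arguments against those documented values and against the Mapping[str, str] type of options, so typos surface before a request is sent. It does not check whether the named source, cluster, or catalogs actually exist.

```python
from typing import Any, Mapping

# Enumerated values taken from the create_flow parameter table.
ALLOWED_VALUES = {
    "write_mode": {"IMMUTABLE", "MUTABLE"},
    "performance_profile": {"BALANCED", "FASTEST_READ", "FASTEST_WRITE"},
    "table_type": {"COPY_ON_WRITE", "MERGE_ON_READ"},
}
# Required keyword-only arguments; name is passed positionally and not checked here.
REQUIRED = ("source", "lake", "database", "table_name", "write_mode", "cluster")


def check_create_flow_kwargs(kwargs: Mapping[str, Any]) -> list[str]:
    """Hypothetical pre-flight check; returns a list of problems (empty if none)."""
    errors = [
        f"missing or empty required argument {name!r}"
        for name in REQUIRED
        if not kwargs.get(name)
    ]
    for name, allowed in ALLOWED_VALUES.items():
        value = kwargs.get(name)
        # None means "not set"; any other value must be one of the documented ones.
        if value is not None and value not in allowed:
            errors.append(f"{name}={value!r} is not one of {sorted(allowed)}")
    # options is typed Mapping[str, str], so every key and value should be a string.
    for key, value in (kwargs.get("options") or {}).items():
        if not isinstance(key, str) or not isinstance(value, str):
            errors.append(f"options entry {key!r}: {value!r} is not str -> str")
    return errors
```

When the returned list is empty, the same dictionary can be passed straight through, for example client.create_flow("events_pipeline", **kwargs).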
Example
```python
from onehouse_python_sdk.resources.sql.commands import PartitionKeyField

# Assumes `client` is an already-initialized SDK client.
client.create_flow(
    "events_pipeline",
    source="my_kafka_source",
    lake="analytics",
    database="events",
    table_name="page_views",
    write_mode="MUTABLE",
    cluster="ingest_cluster",
    catalogs=["my_glue_catalog"],
    record_key_fields=["id"],
    partition_key_fields=[
        PartitionKeyField(
            "date",
            partition_type="DATE_STRING",
            input_format="yyyy-mm-dd",
            output_format="yyyy-mm-dd",
        ),
    ],
    min_sync_frequency_mins=5,
    options={"kafka.topic.name": "page_views"},
)
```
alter_flow
```python
alter_flow(
    name: str,
    *,
    state: str | None = None,
    source: str | None = None,
    cluster: str | None = None,
    performance_profile: str | None = None,
    min_sync_frequency_mins: int | None = None,
    transformations: Sequence[str] | None = None,
    validations: Sequence[str] | None = None,
    quarantine_enabled: bool | None = None,
    advanced_configs: bool = False,
    options: Mapping[str, str] | None = None,
    unsafe_raw: bool = False,
    timeout: float | None = None,
    poll_interval: float | None = None,
)
```
Each call changes one setting, so pass exactly one of: state, source, cluster, performance_profile, min_sync_frequency_mins, transformations, validations, quarantine_enabled, or advanced_configs=True (with the configs to set in options).
| Parameter | Required | Type / values |
|---|---|---|
| name | yes | str |
| state | no | "PAUSE", "RESUME", "CLEAN_AND_RESTART" |
| source | no | str |
| cluster | no | str |
| performance_profile | no | "BALANCED", "FASTEST_READ", "FASTEST_WRITE" |
| min_sync_frequency_mins | no | int |
| transformations | no | Sequence[str] |
| validations | no | Sequence[str] |
| quarantine_enabled | no | bool |
| advanced_configs | no | bool (when True, emits SET ADVANCED_CONFIGS with options) |
| options | no | Mapping[str, str] |
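The "exactly one of" rule for alter_flow can be checked client-side before a call is made. This sketch makes two assumptions. First, a setting counts as passed whenever it is not None, so quarantine_enabled=False still counts. Second, advanced_configs counts only when it is True, matching its default of False. The allowed states come from the table. chosen_alter_setting is a hypothetical helper, not an SDK function.

```python
from typing import Any, Mapping

# Mutually exclusive settings listed for alter_flow (advanced_configs handled separately).
ALTER_SETTINGS = (
    "state", "source", "cluster", "performance_profile",
    "min_sync_frequency_mins", "transformations", "validations",
    "quarantine_enabled",
)
ALLOWED_STATES = {"PAUSE", "RESUME", "CLEAN_AND_RESTART"}


def chosen_alter_setting(kwargs: Mapping[str, Any]) -> str:
    """Return the one setting an alter_flow call would change, or raise ValueError."""
    chosen = [name for name in ALTER_SETTINGS if kwargs.get(name) is not None]
    # advanced_configs defaults to False, so it counts only when explicitly True.
    if kwargs.get("advanced_configs") is True:
        chosen.append("advanced_configs")
    if len(chosen) != 1:
        raise ValueError(f"alter_flow needs exactly one setting, got {chosen or 'none'}")
    if chosen == ["state"] and kwargs["state"] not in ALLOWED_STATES:
        raise ValueError(f"unknown state {kwargs['state']!r}")
    return chosen[0]
```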
Examples
```python
# Pause a flow
client.alter_flow("events_pipeline", state="PAUSE")

# Move to a different cluster
client.alter_flow("events_pipeline", cluster="ingest_cluster_v2")

# Update advanced configs
client.alter_flow(
    "events_pipeline",
    advanced_configs=True,
    options={"hoodie.parquet.compression.codec": "zstd"},
)
```
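Because options is typed Mapping[str, str], numeric and boolean settings must be converted to strings before they are passed with advanced_configs=True. The hypothetical helper below performs that conversion. Rendering booleans as lowercase "true"/"false" is an assumption based on Hudi-style config conventions. Whether Onehouse accepts a particular key is not covered here.

```python
from typing import Mapping, Union

ConfigValue = Union[str, bool, int, float]


def to_string_options(values: Mapping[str, ConfigValue]) -> dict[str, str]:
    """Convert typed config values into the str -> str mapping that options expects."""
    result = {}
    for key, value in values.items():
        # Check bool before anything else: bool is a subclass of int in Python.
        if isinstance(value, bool):
            result[key] = "true" if value else "false"  # assumed lowercase convention
        else:
            result[key] = str(value)
    return result
```

For example, to_string_options({"hoodie.parquet.compression.codec": "zstd", "hoodie.parquet.max.file.size": 125829120}) returns {"hoodie.parquet.compression.codec": "zstd", "hoodie.parquet.max.file.size": "125829120"}, which can then be passed as options.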
delete_flow
```python
delete_flow(name: str, *, unsafe_raw=False, timeout=None, poll_interval=None)
```
Example
```python
client.delete_flow("events_pipeline")
```
describe_flow
```python
describe_flow(name: str, *, unsafe_raw=False, timeout=None, poll_interval=None)
```
Example
```python
result = client.describe_flow("events_pipeline")
```
show_flows
```python
show_flows(*, timeout=None, poll_interval=None)
```
Example
```python
result = client.show_flows()
```