Sources

Define data sources for ingestion: Kafka (Apache, MSK, Confluent), object storage (S3, GCS), relational databases (Postgres, MySQL), and Onehouse tables.

Methods

| Method | Description |
| --- | --- |
| create_source | Register a new source |
| delete_source | Delete a source |
| describe_source | Show full configuration for a source |
| show_sources | List all sources in the project |

create_source

create_source(
    name: str,
    *,
    source_type: str,
    credential_type: str | None = None,
    options: Mapping[str, str] | None = None,
    if_not_exists: bool = False,
    unsafe_raw: bool = False,
    timeout: float | None = None,
    poll_interval: float | None = None,
)
| Parameter | Required | Type / values |
| --- | --- | --- |
| name | yes | str |
| source_type | yes | "APACHE_KAFKA", "MSK_KAFKA", "CONFLUENT_KAFKA", "S3", "GCS", "ONEHOUSE_TABLE", "POSTGRES", "MY_SQL" |
| credential_type | no | "CREDENTIAL_TYPE_ONEHOUSE", "CREDENTIAL_TYPE_SECRET_MANAGER" |
| options | no | Mapping[str, str]: source-specific config (broker URLs, region, paths, etc.) |
| if_not_exists | no | bool: emit IF NOT EXISTS so the call is idempotent |
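
Because source_type and credential_type are plain strings, a typo such as "MYSQL" instead of "MY_SQL" would only surface once the request is sent. A minimal sketch of a local pre-check is shown here. The value sets are copied from the table above. The helper check_source_args is hypothetical and is not part of the client.

```python
# Allowed values copied from the create_source parameter table.
SOURCE_TYPES = frozenset({
    "APACHE_KAFKA", "MSK_KAFKA", "CONFLUENT_KAFKA",
    "S3", "GCS", "ONEHOUSE_TABLE", "POSTGRES", "MY_SQL",
})
CREDENTIAL_TYPES = frozenset({
    "CREDENTIAL_TYPE_ONEHOUSE", "CREDENTIAL_TYPE_SECRET_MANAGER",
})


def check_source_args(source_type, credential_type=None):
    """Hypothetical helper (not part of the client): fail fast on typos."""
    if source_type not in SOURCE_TYPES:
        raise ValueError(
            f"unknown source_type {source_type!r}; "
            f"expected one of {sorted(SOURCE_TYPES)}"
        )
    # credential_type is optional, so None is accepted as-is.
    if credential_type is not None and credential_type not in CREDENTIAL_TYPES:
        raise ValueError(
            f"unknown credential_type {credential_type!r}; "
            f"expected one of {sorted(CREDENTIAL_TYPES)}"
        )
```

Calling check_source_args("S3") returns without error, while check_source_args("MYSQL") raises ValueError before any request would be made.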

Examples

# Kafka source
client.create_source(
    "my_kafka_source",
    source_type="APACHE_KAFKA",
    options={"bootstrap.servers": "broker1:9092,broker2:9092"},
)

# S3 source
client.create_source(
    "events_bucket",
    source_type="S3",
    options={"path": "s3://my-bucket/events/", "region": "us-west-2"},
    if_not_exists=True,
)
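
When several sources are declared together, passing if_not_exists=True on every call should let the same setup script run repeatedly. The sketch below shows one way to drive create_source from a dictionary. RecordingClient is a hypothetical stand-in that only records calls so the example runs offline. It does not emulate the real client's behavior, and SOURCES and register_all are illustrative names.

```python
class RecordingClient:
    """Stand-in for the real client; it only records create_source calls."""

    def __init__(self):
        self.calls = []

    def create_source(self, name, *, source_type, options=None,
                      if_not_exists=False, **kwargs):
        self.calls.append((name, source_type, dict(options or {}), if_not_exists))


# Source definitions reuse the names and options from the examples above.
SOURCES = {
    "my_kafka_source": {
        "source_type": "APACHE_KAFKA",
        "options": {"bootstrap.servers": "broker1:9092,broker2:9092"},
    },
    "events_bucket": {
        "source_type": "S3",
        "options": {"path": "s3://my-bucket/events/", "region": "us-west-2"},
    },
}


def register_all(client, sources):
    """Register every source, always requesting IF NOT EXISTS."""
    for name, spec in sources.items():
        client.create_source(
            name,
            source_type=spec["source_type"],
            options=spec.get("options"),
            if_not_exists=True,
        )


client = RecordingClient()
register_all(client, SOURCES)
register_all(client, SOURCES)  # a rerun issues the same calls again
```

With the real client, the second pass relies on the documented IF NOT EXISTS behavior to leave existing sources in place. The stand-in here simply records four calls.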

delete_source

delete_source(name: str, *, unsafe_raw=False, timeout=None, poll_interval=None)

Example

client.delete_source("my_kafka_source")

describe_source

describe_source(name: str, *, unsafe_raw=False, timeout=None, poll_interval=None)

Example

result = client.describe_source("my_kafka_source")

show_sources

show_sources(*, timeout=None, poll_interval=None)

Example

result = client.show_sources()