Sources
Define data sources for ingestion: Kafka (Apache, MSK, Confluent), object storage (S3, GCS), relational databases (Postgres, MySQL), and Onehouse tables.
Methods
| Method | Description |
|---|---|
| create_source | Register a new source |
| delete_source | Delete a source |
| describe_source | Show the full configuration of a source |
| show_sources | List all sources in the project |
create_source
```python
create_source(
    name: str,
    *,
    source_type: str,
    credential_type: str | None = None,
    options: Mapping[str, str] | None = None,
    if_not_exists: bool = False,
    unsafe_raw: bool = False,
    timeout: float | None = None,
    poll_interval: float | None = None,
)
```
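| Parameter | Required | Type / values |
|---|---|---|
| name | yes | str |
| source_type | yes | "APACHE_KAFKA", "MSK_KAFKA", "CONFLUENT_KAFKA", "S3", "GCS", "ONEHOUSE_TABLE", "POSTGRES", "MY_SQL" |
| credential_type | no | str, default None: "CREDENTIAL_TYPE_ONEHOUSE" or "CREDENTIAL_TYPE_SECRET_MANAGER" |
| options | no | Mapping[str, str], default None: source-specific configuration (broker URLs, region, paths, etc.) |
| if_not_exists | no | bool, default False: emit IF NOT EXISTS so that the call is idempotent and does not fail when the source already exists |
| unsafe_raw | no | bool, default False |
| timeout | no | float or None, default None |
| poll_interval | no | float or None, default None |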
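The documented `source_type` and `credential_type` values form a closed set, so a misspelling (for example "MYSQL" instead of "MY_SQL") would otherwise surface only when the call is rejected. The sketch below checks arguments on the client side before calling `create_source`. `validate_source_args` is a hypothetical helper, not part of the client API, and it assumes values are matched case-sensitively exactly as listed in the table.

```python
from collections.abc import Mapping

# Values copied from the create_source parameter table.
# validate_source_args is a hypothetical helper, not part of the client.
SOURCE_TYPES = frozenset({
    "APACHE_KAFKA", "MSK_KAFKA", "CONFLUENT_KAFKA",
    "S3", "GCS", "ONEHOUSE_TABLE", "POSTGRES", "MY_SQL",
})
CREDENTIAL_TYPES = frozenset({
    "CREDENTIAL_TYPE_ONEHOUSE", "CREDENTIAL_TYPE_SECRET_MANAGER",
})


def validate_source_args(source_type, credential_type=None, options=None):
    """Raise ValueError or TypeError for arguments outside the documented values."""
    if source_type not in SOURCE_TYPES:
        raise ValueError(f"unknown source_type: {source_type!r}")
    if credential_type is not None and credential_type not in CREDENTIAL_TYPES:
        raise ValueError(f"unknown credential_type: {credential_type!r}")
    if options is not None:
        if not isinstance(options, Mapping):
            raise TypeError("options must be a mapping")
        # options is typed Mapping[str, str]: reject non-string keys or values
        for key, value in options.items():
            if not isinstance(key, str) or not isinstance(value, str):
                raise TypeError(f"option {key!r} must map str to str")
```

For example, `validate_source_args("S3", options={"region": "us-west-2"})` returns without error, while `validate_source_args("MYSQL")` raises `ValueError` and an integer option value such as `{"port": 9092}` raises `TypeError`.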
Examples
```python
# Kafka source
client.create_source(
    "my_kafka_source",
    source_type="APACHE_KAFKA",
    options={"bootstrap.servers": "broker1:9092,broker2:9092"},
)

# S3 source
client.create_source(
    "events_bucket",
    source_type="S3",
    options={"path": "s3://my-bucket/events/", "region": "us-west-2"},
    if_not_exists=True,
)
```
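Because `if_not_exists=True` makes each call idempotent, a set of source definitions can be re-applied safely, for example on every deployment. The sketch below uses only the documented `create_source` keywords. `register_sources` and `RecordingClient` are hypothetical: the stub records calls instead of executing them so the example runs without a Onehouse connection, and in practice you would pass your real client.

```python
from dataclasses import dataclass, field


@dataclass
class RecordingClient:
    """Hypothetical stand-in that records create_source calls instead of executing them."""
    calls: list = field(default_factory=list)

    def create_source(self, name, *, source_type, credential_type=None,
                      options=None, if_not_exists=False, unsafe_raw=False,
                      timeout=None, poll_interval=None):
        self.calls.append((name, source_type, dict(options or {}), if_not_exists))


def register_sources(client, definitions):
    """Create every source in `definitions`, tolerating ones that already exist."""
    for name, spec in definitions.items():
        client.create_source(
            name,
            source_type=spec["source_type"],
            options=spec.get("options"),
            if_not_exists=True,  # documented IF NOT EXISTS: safe to re-run
        )


SOURCES = {
    "my_kafka_source": {
        "source_type": "APACHE_KAFKA",
        "options": {"bootstrap.servers": "broker1:9092,broker2:9092"},
    },
    "events_bucket": {
        "source_type": "S3",
        "options": {"path": "s3://my-bucket/events/", "region": "us-west-2"},
    },
}

client = RecordingClient()
register_sources(client, SOURCES)
```

IF NOT EXISTS conventionally skips an existing object rather than updating it, so changed `options` for an already-registered source would likely need a delete followed by a re-create.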
delete_source
```python
delete_source(name: str, *, unsafe_raw=False, timeout=None, poll_interval=None)
```
Example
```python
client.delete_source("my_kafka_source")
```
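For integration tests it can help to pair `create_source` with `delete_source` so that a scratch source is removed even if the test body fails. A minimal sketch using `contextlib.contextmanager`; `temporary_source` is a hypothetical helper, and `client` is assumed to be any object exposing the documented `create_source` and `delete_source` methods.

```python
from contextlib import contextmanager


@contextmanager
def temporary_source(client, name, *, source_type, options=None):
    """Create a source for the duration of a `with` block, then delete it.

    Hypothetical helper built only on the documented create_source and
    delete_source calls.
    """
    # Created outside the try: if creation fails, nothing is deleted
    client.create_source(name, source_type=source_type, options=options)
    try:
        yield name
    finally:
        # Runs even if the body raises, so scratch sources do not accumulate
        client.delete_source(name)


# Example usage (hypothetical bucket path; `client` is an initialized client):
# with temporary_source(client, "scratch_events", source_type="S3",
#                       options={"path": "s3://my-bucket/scratch/",
#                                "region": "us-west-2"}) as name:
#     print(client.describe_source(name))
```

The helper deliberately does not pass `if_not_exists=True`: if a source with that name already exists, creation presumably fails before the `try`, so the helper never deletes a source it did not create.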
describe_source
```python
describe_source(name: str, *, unsafe_raw=False, timeout=None, poll_interval=None)
```
Example
```python
result = client.describe_source("my_kafka_source")
```
show_sources
```python
show_sources(*, timeout=None, poll_interval=None)
```
Example
```python
result = client.show_sources()
```