
oh source

Manage data sources that flows read from.

Source types: APACHE_KAFKA, MSK_KAFKA, CONFLUENT_KAFKA, S3, GCS, ONEHOUSE_TABLE, POSTGRES, MY_SQL


oh source show

Lists all sources.

oh source show

oh source describe NAME

Describes a source.

oh source describe kafka-prod

oh source create NAME

Creates a source.

Flags

| Flag | Required | Description |
| --- | --- | --- |
| --type | Yes | Source type |
| --credential-type | No | CREDENTIAL_TYPE_ONEHOUSE (default) or CREDENTIAL_TYPE_SECRET_MANAGER |
| --options KEY=VALUE | No | WITH clause parameters; repeatable (see Special parameters) |
| --if-not-exists | No | Skip creation silently if the source already exists |
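Because --if-not-exists skips creation when the source is already present, a provisioning script that uses it can be re-run safely. The following is a minimal sketch of that idea, not an official recipe: OH_CMD and create_bucket_source are hypothetical names introduced here for illustration, and only the flags listed above are used.

```shell
# Sketch of a re-runnable provisioning helper.
# OH_CMD and create_bucket_source are hypothetical names, not part of the oh CLI.
# Setting OH_CMD="echo oh" prints each command instead of running it.
OH_CMD=${OH_CMD:-oh}

create_bucket_source() {
  # $1 = source name, $2 = source type (S3 or GCS), $3 = bucket name
  # $OH_CMD is left unquoted on purpose so "echo oh" splits into two words.
  $OH_CMD source create "$1" \
    --type "$2" \
    --options "object.store.bucket.name=$3" \
    --if-not-exists
}
```

Calling create_bucket_source s3-prod S3 my-data-bucket twice should create the source on the first run and skip it on the second. With OH_CMD="echo oh" the same call only prints the command, which is handy for reviewing a script before it touches anything.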

Special parameters

Pass every connection and configuration setting as --options KEY=VALUE, repeating the flag once per setting. Each pair maps directly to a parameter in the WITH clause of the CREATE SOURCE API.
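To make the one-flag-per-pair convention concrete, here is a small dry-run sketch that assembles the command line from a list of KEY=VALUE pairs and prints it without running it. build_create_cmd is a hypothetical helper, not part of the oh CLI, and it assumes names and values contain no spaces, since the output is meant for review rather than for safe re-execution.

```shell
# Dry-run sketch: print the `oh source create` command for a set of options.
# build_create_cmd is a hypothetical helper, not part of the oh CLI.
build_create_cmd() {
  # $1 = source name, $2 = source type, remaining args = KEY=VALUE pairs
  src_name=$1
  src_type=$2
  shift 2
  cmd="oh source create $src_name --type $src_type"
  for pair in "$@"; do
    # Each pair gets its own --options flag, because the flag is repeatable.
    cmd="$cmd --options $pair"
  done
  printf '%s\n' "$cmd"
}

build_create_cmd s3-prod S3 object.store.bucket.name=my-data-bucket
# prints: oh source create s3-prod --type S3 --options object.store.bucket.name=my-data-bucket
```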

Examples

Apache Kafka (TLS)

oh source create kafka-prod \
--type APACHE_KAFKA \
--options kafka.bootstrap.servers=broker1:9092,broker2:9092 \
--options kafka.connection.protocol=TLS \
--options kafka.security.protocol=SSL \
--options kafka.payload.serialization=avro \
--options kafka.tls.trust.store.path=s3://my-bucket/truststore.jks \
--options kafka.tls.key.store.path=s3://my-bucket/keystore.jks \
--options kafka.tls.key.store.password=MY_PASSWORD \
--options kafka.tls.key.password=MY_KEY_PASSWORD

Confluent Kafka (SASL)

oh source create confluent-prod \
--type CONFLUENT_KAFKA \
--options kafka.bootstrap.servers=broker1:9092,broker2:9092 \
--options kafka.connection.protocol=SASL \
--options kafka.security.protocol=SASL_SSL \
--options kafka.payload.serialization=avro \
--options kafka.sasl.mechanism=SCRAM_SHA_256 \
--options kafka.sasl.key=MY_SASL_KEY \
--options kafka.sasl.secret=MY_SASL_SECRET \
--options schema.registry.type=confluent \
--options schema.registry.confluent.servers=https://schema-registry.example.com \
--options schema.registry.confluent.key=MY_SR_KEY \
--options schema.registry.confluent.secret=MY_SR_SECRET

MSK Kafka

oh source create msk-prod \
--type MSK_KAFKA \
--options kafka.bootstrap.servers=broker1:9092,broker2:9092 \
--options kafka.cloud.resource.identifier=arn:aws:kafka:us-east-1:123456789:cluster/my-cluster/abc \
--options kafka.payload.serialization=json

S3

oh source create s3-prod \
--type S3 \
--options object.store.bucket.name=my-data-bucket

GCS

oh source create gcs-prod \
--type GCS \
--options object.store.bucket.name=my-data-bucket

Onehouse table

oh source create onehouse-table-src \
--type ONEHOUSE_TABLE \
--options source.table.lake=my-lake \
--options source.table.database=events \
--options source.table.name=clicks

Postgres CDC

oh source create pg-prod \
--type POSTGRES \
--options postgres.host=db.example.com \
--options postgres.port=5432 \
--options postgres.database.name=mydb \
--options postgres.user=onehouse_user \
--options postgres.password=MY_PASSWORD

MySQL CDC

oh source create mysql-prod \
--type MY_SQL \
--options mysql.host=db.example.com \
--options mysql.port=3306 \
--options mysql.database.name=mydb \
--options mysql.user=onehouse_user \
--options mysql.password=MY_PASSWORD
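The Postgres and MySQL examples above write the password inline. In a script, one option is to take it from an environment variable so the secret stays out of the script file and out of shell history. The sketch below assumes a hypothetical helper, credential_option, and a hypothetical variable name, MYSQL_PASSWORD; neither is defined by the oh CLI. The value is still visible in the process arguments while the command runs.

```shell
# Sketch: build a KEY=VALUE credential option and refuse empty values.
# credential_option is a hypothetical helper, not part of the oh CLI.
credential_option() {
  # $1 = option key, $2 = value (normally expanded from an environment variable)
  if [ -z "$2" ]; then
    echo "no value supplied for $1" >&2
    return 1
  fi
  printf '%s=%s\n' "$1" "$2"
}
```

A script might run pw_opt=$(credential_option mysql.password "${MYSQL_PASSWORD:-}") || exit 1 and then pass --options "$pw_opt" to oh source create. Checking the exit status before calling oh matters, because a failure inside a command substitution does not by itself stop the outer command.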

Using Secrets Manager for credentials

When --credential-type CREDENTIAL_TYPE_SECRET_MANAGER is set, replace plaintext credential options with a secret reference:

oh source create kafka-prod \
--type APACHE_KAFKA \
--credential-type CREDENTIAL_TYPE_SECRET_MANAGER \
--options kafka.bootstrap.servers=broker1:9092 \
--options kafka.connection.protocol=TLS \
--options kafka.security.protocol=SSL \
--options kafka.payload.serialization=avro \
--options kafka.tls.trust.store.path=s3://my-bucket/truststore.jks \
--options kafka.tls.key.store.path=s3://my-bucket/keystore.jks \
--options kafka.tls.key.store.password.key.password.secret.reference=arn:aws:secretsmanager:us-east-1:123456789:secret:my-tls-secret

oh source delete NAME

Deletes a source.

oh source delete kafka-prod