CREATE SOURCE
Description
Creates a new data source.
Note that the SQL statement does not end with a semicolon (;).
Syntax
CREATE SOURCE [ IF NOT EXISTS ] <name>
SOURCE_TYPE = { 'APACHE_KAFKA' | 'MSK_KAFKA' | 'CONFLUENT_KAFKA' | 'S3' | 'GCS' | 'ONEHOUSE_TABLE' | 'POSTGRES' | 'MY_SQL' }
[ CREDENTIAL_TYPE = { 'CREDENTIAL_TYPE_ONEHOUSE' | 'CREDENTIAL_TYPE_SECRET_MANAGER' } ]
WITH 'key1' = 'value1', 'key2' = 'value2', ...
Example
Example syntax for creating an Apache Kafka source.
CREATE SOURCE mykafkasource SOURCE_TYPE = 'APACHE_KAFKA'
WITH
'kafka.connection.protocol' = 'TLS',
'kafka.security.protocol' = 'SSL',
'kafka.bootstrap.servers' = 'server1, server2',
'kafka.payload.serialization' = 'json',
'kafka.cloud.resource.identifier' = 'arn://abc',
'kafka.tls.trust.store.path' = 'sample/truststore/path',
'kafka.tls.key.store.path' = 'sample/keystore/path',
'kafka.tls.key.store.password' = 'secret_password',
'kafka.tls.key.password' = 'another_secret',
'schema.registry.type' = 'confluent',
'schema.registry.confluent.servers' = 'schema_registry_server1, schema_registry_server2',
'schema.registry.confluent.key' = 'schema_registry_key',
'schema.registry.confluent.secret' = 'schema_registry_secret',
'schema.registry.confluent.subject.prefix' = 'schema_registry_subject_prefix',
'schema.registry.confluent.subject.name' = 'schema_registry_subject'
Example syntax for creating a Confluent Kafka source.
CREATE SOURCE myconfluentkafkasource
SOURCE_TYPE = 'CONFLUENT_KAFKA'
WITH
'kafka.bootstrap.servers' = 'server1, server2',
'kafka.cloud.resource.identifier' = 'arn://abc',
'kafka.connection.protocol' = 'SASL',
'kafka.security.protocol' = 'SASL_SSL',
'kafka.payload.serialization' = 'avro',
'kafka.sasl.key' = 'sasl/key',
'kafka.sasl.key.password' = 'sasl/key/password',
'kafka.sasl.keystore.path' = 'sasl/keystore/path',
'kafka.sasl.keystore.password' = 'sasl/keystore/password',
'kafka.sasl.keystore.type' = 'jks',
'kafka.sasl.mechanism' = 'scram_sha_256',
'kafka.sasl.secret' = 'sasl/secret',
'kafka.sasl.trust.store.path' = 'sasl/truststore/path',
'kafka.sasl.trust.store.password' = 'sasl/truststore/password',
'kafka.sasl.trust.store.type' = 'pkcs12',
'schema.registry.type' = 'file',
'schema.registry.file.full.path' = 's3://c/d'
Example syntax for creating an S3 source.
CREATE SOURCE mys3source SOURCE_TYPE = 'S3'
WITH 'object.store.bucket.name' = 'mybucket'
Example syntax for creating a GCS source.
CREATE SOURCE mygcssource SOURCE_TYPE = 'GCS'
WITH 'object.store.bucket.name' = 'mybucket'
Example syntax for creating an ONEHOUSE_TABLE source.
CREATE SOURCE myonehousetablesource SOURCE_TYPE = 'ONEHOUSE_TABLE'
WITH 'source.table.lake' = 'testlake',
'schema.registry.proto.jar.location' = 'gs://e/f.jar',
'source.table.database' = 'testdatabase',
'schema.registry.type' = 'JAR',
'source.table.name' = 'testtable'
Sample response
Required parameters
name
: Identifier for the source.
- Must be unique for your account
- Must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (e.g. "Payments Kafka Cluster")
- Case-sensitive
SOURCE_TYPE
: Specifies the type of data source.
Optional parameters
CREDENTIAL_TYPE
: Credential management type. Refer to Credential Management to read more.
- CREDENTIAL_TYPE_ONEHOUSE: Credentials are stored in the Onehouse control plane.
- CREDENTIAL_TYPE_SECRET_MANAGER: Credentials are stored in the customer data plane; users must create the secret and share the cloud-specific secret identifier. Applicable only for certain sources.
- Default: CREDENTIAL_TYPE_ONEHOUSE.
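To make the two credential modes concrete, here is a hedged sketch of a Postgres source that keeps its credentials in the customer's secret manager; the source name, host, and secret ARN are illustrative placeholders, and the WITH keys follow the Postgres parameters documented later on this page.

```sql
-- Illustrative only: host, database, and secret ARN are placeholders.
CREATE SOURCE IF NOT EXISTS mypostgressource
SOURCE_TYPE = 'POSTGRES'
CREDENTIAL_TYPE = 'CREDENTIAL_TYPE_SECRET_MANAGER'
WITH
'postgres.host' = 'db.internal.example',
'postgres.port' = '5432',
'postgres.database.name' = 'orders',
'postgres.user.password.reference' = 'arn:aws:secretsmanager:us-east-1:123456789012:secret:pg-creds'
```

If CREDENTIAL_TYPE is omitted, the default CREDENTIAL_TYPE_ONEHOUSE applies, and the user and password are passed directly via 'postgres.user' and 'postgres.password' instead of a secret reference.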
Special parameters
Include special parameters after WITH as 'key' = 'value' pairs; both keys and values are strings.
Kafka types
Applies to Apache Kafka, Confluent Kafka, and MSK Kafka.
Bootstrap
kafka.bootstrap.servers
: Comma-separated list of Kafka bootstrap servers.
Cloud
kafka.cloud.resource.identifier
: For MSK Kafka, specify the cluster ARN here.
Message Serialization Type
kafka.payload.serialization
: Serialization type of the Kafka message values. Valid values: json, avro, proto, confluent_proto, confluent_json_sr.
Connection
kafka.connection.protocol
: Connection protocol. Valid values: PLAINTEXT, TLS, and SASL.
kafka.security.protocol
: Security protocol. Valid values: PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, and SSL.
TLS
kafka.tls.trust.store.path
: TLS trust store path.
kafka.tls.key.store.path
: TLS key store path.
If CREDENTIAL_TYPE is CREDENTIAL_TYPE_ONEHOUSE (default):
kafka.tls.key.store.password
: TLS key store password.
kafka.tls.key.password
: TLS key password.
If CREDENTIAL_TYPE is CREDENTIAL_TYPE_SECRET_MANAGER:
kafka.tls.key.store.password.key.password.secret.reference
: Cloud-specific identifier (e.g., ARN for AWS) of a secret containing the TLS key store password and TLS key password in the specified format.
SASL
kafka.sasl.mechanism
: SASL mechanism. Valid values: PLAIN, SCRAM_SHA_256, and SCRAM_SHA_512.
If CREDENTIAL_TYPE is CREDENTIAL_TYPE_ONEHOUSE (default):
kafka.sasl.key
: SASL key.
kafka.sasl.secret
: SASL secret.
If CREDENTIAL_TYPE is CREDENTIAL_TYPE_SECRET_MANAGER:
kafka.sasl.key.secret.reference
: Cloud-specific identifier (e.g., ARN for AWS) of a secret containing the SASL key and SASL secret in the specified format.
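Putting the SASL parameters together, the sketch below shows a hypothetical MSK Kafka source that resolves its SASL key and secret from the customer's secret manager; the bootstrap servers and ARNs are illustrative placeholders.

```sql
-- Illustrative only: bootstrap servers and ARNs are placeholders.
CREATE SOURCE mymskkafkasource
SOURCE_TYPE = 'MSK_KAFKA'
CREDENTIAL_TYPE = 'CREDENTIAL_TYPE_SECRET_MANAGER'
WITH
'kafka.bootstrap.servers' = 'b-1.msk.example:9096, b-2.msk.example:9096',
'kafka.cloud.resource.identifier' = 'arn:aws:kafka:us-east-1:123456789012:cluster/example/uuid',
'kafka.connection.protocol' = 'SASL',
'kafka.security.protocol' = 'SASL_SSL',
'kafka.sasl.mechanism' = 'scram_sha_512',
'kafka.payload.serialization' = 'json',
'kafka.sasl.key.secret.reference' = 'arn:aws:secretsmanager:us-east-1:123456789012:secret:msk-sasl-creds'
```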
S3 type
object.store.bucket.name
: Name of the bucket to create the S3 source from.
GCS type
object.store.bucket.name
: Name of the bucket to create the GCS source from.
Onehouse table type
source.table.name
: Name of the Onehouse table.
source.table.database
: Database the Onehouse table is in.
source.table.lake
: Lake the Onehouse table is in.
Postgres type
postgres.host
: Hostname of the Postgres server.
postgres.port
: Port number the Postgres server is listening on.
postgres.database.name
: Name of the Postgres database.
If CREDENTIAL_TYPE is CREDENTIAL_TYPE_ONEHOUSE (default):
postgres.user
: Postgres user.
postgres.password
: Password of the Postgres user.
If CREDENTIAL_TYPE is CREDENTIAL_TYPE_SECRET_MANAGER:
postgres.user.password.reference
: Cloud-specific identifier (e.g., ARN for AWS) of a secret containing the Postgres user and password in the specified format.
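As a minimal sketch of the default (Onehouse-managed) credential mode, the example below creates a Postgres source; the host, database, user, and password values are illustrative placeholders.

```sql
-- Illustrative only: all connection values are placeholders.
CREATE SOURCE mypostgressource SOURCE_TYPE = 'POSTGRES'
WITH
'postgres.host' = 'pg.internal.example',
'postgres.port' = '5432',
'postgres.database.name' = 'testdatabase',
'postgres.user' = 'replication_user',
'postgres.password' = 'secret_password'
```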
MySQL type
mysql.host
: Hostname of the MySQL server.
mysql.port
: Port number the MySQL server is listening on.
mysql.database.name
: Name of the MySQL database.
If CREDENTIAL_TYPE is CREDENTIAL_TYPE_ONEHOUSE (default):
mysql.user
: MySQL user.
mysql.password
: Password of the MySQL user.
If CREDENTIAL_TYPE is CREDENTIAL_TYPE_SECRET_MANAGER:
mysql.user.password.reference
: Cloud-specific identifier (e.g., ARN for AWS) of a secret containing the MySQL user and password in the specified format.
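The MySQL parameters follow the same shape; a hedged sketch with Onehouse-managed credentials, where the host, database, user, and password are illustrative placeholders:

```sql
-- Illustrative only: all connection values are placeholders.
CREATE SOURCE mymysqlsource SOURCE_TYPE = 'MY_SQL'
WITH
'mysql.host' = 'mysql.internal.example',
'mysql.port' = '3306',
'mysql.database.name' = 'testdatabase',
'mysql.user' = 'replication_user',
'mysql.password' = 'secret_password'
```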
Schema registry properties
These are shared by Stream Captures created with this source.
schema.registry.type
: Valid values are `glue`, `confluent`, `jar`, and `file`, corresponding to GlueSchemaRegistry, ConfluentSchemaRegistry, Jar (for Proto schemas), and FileBasedSchemaRegistry respectively.
Glue schema registry
schema.registry.glue.name
: Name for the Glue schema registry.
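As a sketch of how the Glue registry property fits into a source definition, the hypothetical example below pairs it with an MSK Kafka source; the servers, ARN, and registry name are illustrative placeholders.

```sql
-- Illustrative only: servers, ARN, and registry name are placeholders.
CREATE SOURCE mymskkafkasource SOURCE_TYPE = 'MSK_KAFKA'
WITH
'kafka.bootstrap.servers' = 'b-1.msk.example:9096',
'kafka.cloud.resource.identifier' = 'arn:aws:kafka:us-east-1:123456789012:cluster/example/uuid',
'kafka.connection.protocol' = 'PLAINTEXT',
'kafka.security.protocol' = 'PLAINTEXT',
'kafka.payload.serialization' = 'avro',
'schema.registry.type' = 'glue',
'schema.registry.glue.name' = 'my_glue_registry'
```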
Confluent schema registry
schema.registry.confluent.servers
: Confluent schema registry servers.
schema.registry.confluent.subject.name
: Schema name in the Confluent schema registry.
schema.registry.confluent.subject.prefix
: Schema prefix used to filter schemas.
If CREDENTIAL_TYPE is CREDENTIAL_TYPE_ONEHOUSE (default):
schema.registry.confluent.key
: Confluent schema registry key.
schema.registry.confluent.secret
: Confluent schema registry secret.
If CREDENTIAL_TYPE is CREDENTIAL_TYPE_SECRET_MANAGER:
schema.registry.confluent.key.secret.reference
: Cloud-specific identifier (e.g., ARN for AWS) of a secret containing the Confluent schema registry key and secret in the specified format.
Proto schemas
schema.registry.proto.jar.location
: Jar location for proto schemas.
File based schemas
schema.registry.file.base.path
: Base path for the file-based schema registry.
schema.registry.file.full.path
: Full path for the file-based schema registry.