CREATE SOURCE

Description

Creates a new data source.

Note that the SQL statement does not end with a semicolon (;).

Syntax

CREATE SOURCE [ IF NOT EXISTS ] <name>
SOURCE_TYPE = { 'APACHE_KAFKA' | 'MSK_KAFKA' | 'CONFLUENT_KAFKA' | 'S3' | 'GCS' | 'ONEHOUSE_TABLE' | 'POSTGRES' | 'MY_SQL' }
[ CREDENTIAL_TYPE = { 'CREDENTIAL_TYPE_ONEHOUSE' | 'CREDENTIAL_TYPE_SECRET_MANAGER' } ]
WITH 'key1' = 'value1' [, 'key2' = 'value2', ... ]

Examples

Example syntax for creating an Apache Kafka source.

CREATE SOURCE mykafkasource SOURCE_TYPE = 'APACHE_KAFKA'
WITH
'kafka.connection.protocol' = 'TLS',
'kafka.security.protocol' = 'SSL',
'kafka.bootstrap.servers' = 'server1, server2',
'kafka.payload.serialization' = 'json',
'kafka.cloud.resource.identifier' = 'arn://abc',
'kafka.tls.trust.store.path' = 'sample/truststore/path',
'kafka.tls.key.store.path' = 'sample/keystore/path',
'kafka.tls.key.store.password' = 'secret_password',
'kafka.tls.key.password' = 'another_secret',
'schema.registry.type' = 'confluent',
'schema.registry.confluent.servers' = 'schema_registry_server1, schema_registry_server2',
'schema.registry.confluent.key' = 'schema_registry_key',
'schema.registry.confluent.secret' = 'schema_registry_secret',
'schema.registry.confluent.subject.prefix' = 'schema_registry_subject_prefix',
'schema.registry.confluent.subject.name' = 'schema_registry_subject'

Example syntax for creating a Confluent Kafka source.

CREATE SOURCE myconfluentkafkasource 
SOURCE_TYPE = 'CONFLUENT_KAFKA'
WITH
'kafka.bootstrap.servers' = 'server1, server2',
'kafka.cloud.resource.identifier' = 'arn://abc',
'kafka.connection.protocol' = 'SASL',
'kafka.security.protocol' = 'SASL_SSL',
'kafka.payload.serialization' = 'avro',
'kafka.sasl.key' = 'sasl/key',
'kafka.sasl.key.password' = 'sasl/key/password',
'kafka.sasl.keystore.path' = 'sasl/keystore/path',
'kafka.sasl.keystore.password' = 'sasl/keystore/password',
'kafka.sasl.keystore.type' = 'jks',
'kafka.sasl.mechanism' = 'scram_sha_256',
'kafka.sasl.secret' = 'sasl/secret',
'kafka.sasl.trust.store.path' = 'sasl/truststore/path',
'kafka.sasl.trust.store.password' = 'sasl/truststore/password',
'kafka.sasl.trust.store.type' = 'pkcs12',
'schema.registry.type' = 'file',
'schema.registry.file.full.path' = 's3://c/d'

Example syntax for creating an S3 source.

CREATE SOURCE mys3source SOURCE_TYPE = 'S3' 
WITH 'object.store.bucket.name' = 'mybucket'

Example syntax for creating a GCS source.

CREATE SOURCE mygcssource SOURCE_TYPE = 'GCS' 
WITH 'object.store.bucket.name' = 'mybucket'

Example syntax for creating a ONEHOUSE_TABLE source.

CREATE SOURCE myonehousetablesource SOURCE_TYPE = 'ONEHOUSE_TABLE'
WITH 'source.table.lake' = 'testlake',
'source.table.database' = 'testdatabase',
'source.table.name' = 'testtable',
'schema.registry.type' = 'jar',
'schema.registry.proto.jar.location' = 'gs://e/f.jar'

Required parameters

  • name: Identifier for the source
    • Must be unique for your account
    • Must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (e.g. "Payments Kafka Cluster"), as shown in the example after this list
    • Case-sensitive
  • SOURCE_TYPE: Specifies the type of data source.
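
Example of a quoted source identifier, as referenced in the naming rules above (a minimal sketch; the servers and schema path are placeholder values):

CREATE SOURCE "Payments Kafka Cluster" SOURCE_TYPE = 'APACHE_KAFKA'
WITH
'kafka.bootstrap.servers' = 'server1, server2',
'kafka.connection.protocol' = 'PLAINTEXT',
'kafka.security.protocol' = 'PLAINTEXT',
'kafka.payload.serialization' = 'json',
'schema.registry.type' = 'file',
'schema.registry.file.full.path' = 's3://c/d'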

Optional parameters

  • CREDENTIAL_TYPE: Credential management type. Refer to Credential Management to read more. See the example after this list.
    • CREDENTIAL_TYPE_ONEHOUSE: Credentials are stored in the Onehouse control plane.
    • CREDENTIAL_TYPE_SECRET_MANAGER: Credentials are stored in the customer data plane; users need to create the secret and share the cloud-specific secret identifier.
    • Applicable only for certain sources.
    • Default: CREDENTIAL_TYPE_ONEHOUSE.
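
Example of overriding the credential management type (a minimal sketch using the Postgres parameters described under Postgres type below; the host, database, and secret reference values are placeholders):

CREATE SOURCE mypostgressource SOURCE_TYPE = 'POSTGRES'
CREDENTIAL_TYPE = 'CREDENTIAL_TYPE_SECRET_MANAGER'
WITH
'postgres.host' = 'postgres.example.com',
'postgres.port' = '5432',
'postgres.database.name' = 'mydatabase',
'postgres.user.password.reference' = 'arn://my-postgres-secret'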

Special parameters

Include special parameters after WITH as string key-value pairs.

Kafka types

Applies to the Apache Kafka, Confluent Kafka, and MSK Kafka source types.

Bootstrap

  • kafka.bootstrap.servers: Kafka Bootstrap Servers.

Cloud

  • kafka.cloud.resource.identifier: For MSK Kafka, specify the ARN here.

Message Serialization Type

  • kafka.payload.serialization: Serialization type of the Kafka message values. Valid values: json, avro, proto, confluent_proto, confluent_json_sr

Connection

  • kafka.connection.protocol: Valid values are PLAINTEXT, TLS & SASL
  • kafka.security.protocol: Valid values are PLAINTEXT, SASL_PLAINTEXT, SASL_SSL & SSL

TLS

  • kafka.tls.trust.store.path: TLS trust store path.
  • kafka.tls.key.store.path: TLS key store path.

If CREDENTIAL_TYPE is CREDENTIAL_TYPE_ONEHOUSE (Default):

  • kafka.tls.key.store.password: TLS key store password.
  • kafka.tls.key.password: TLS key password.

If CREDENTIAL_TYPE is CREDENTIAL_TYPE_SECRET_MANAGER:

  • kafka.tls.key.store.password.key.password.secret.reference: Cloud-specific identifier (e.g., ARN for AWS) of a secret containing the TLS key store password and TLS key password in the specified format.
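
Example sketch of the TLS properties with CREDENTIAL_TYPE_SECRET_MANAGER (the servers, store paths, and secret reference are placeholder values):

CREATE SOURCE mytlskafkasource SOURCE_TYPE = 'APACHE_KAFKA'
CREDENTIAL_TYPE = 'CREDENTIAL_TYPE_SECRET_MANAGER'
WITH
'kafka.bootstrap.servers' = 'server1, server2',
'kafka.connection.protocol' = 'TLS',
'kafka.security.protocol' = 'SSL',
'kafka.payload.serialization' = 'json',
'kafka.tls.trust.store.path' = 'sample/truststore/path',
'kafka.tls.key.store.path' = 'sample/keystore/path',
'kafka.tls.key.store.password.key.password.secret.reference' = 'arn://my-tls-secret',
'schema.registry.type' = 'file',
'schema.registry.file.full.path' = 's3://c/d'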

SASL

  • kafka.sasl.mechanism: SASL mechanism. Valid values are PLAIN, SCRAM_SHA_256 & SCRAM_SHA_512.

If CREDENTIAL_TYPE is CREDENTIAL_TYPE_ONEHOUSE (Default):

  • kafka.sasl.key: SASL key.
  • kafka.sasl.secret: SASL secret.

If CREDENTIAL_TYPE is CREDENTIAL_TYPE_SECRET_MANAGER:

  • kafka.sasl.key.secret.reference: Cloud-specific identifier (e.g., ARN for AWS) of a secret containing the SASL key and SASL secret in the specified format.
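
Example sketch of the SASL properties with CREDENTIAL_TYPE_SECRET_MANAGER (the servers and secret reference are placeholder values; whether trust store properties are also required depends on your cluster configuration):

CREATE SOURCE mysaslkafkasource SOURCE_TYPE = 'APACHE_KAFKA'
CREDENTIAL_TYPE = 'CREDENTIAL_TYPE_SECRET_MANAGER'
WITH
'kafka.bootstrap.servers' = 'server1, server2',
'kafka.connection.protocol' = 'SASL',
'kafka.security.protocol' = 'SASL_SSL',
'kafka.payload.serialization' = 'json',
'kafka.sasl.mechanism' = 'PLAIN',
'kafka.sasl.key.secret.reference' = 'arn://my-sasl-secret',
'schema.registry.type' = 'file',
'schema.registry.file.full.path' = 's3://c/d'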

S3 type

  • object.store.bucket.name: Name of the bucket to create the S3 source from.

GCS type

  • object.store.bucket.name: Name of the bucket to create the GCS source from.

Onehouse table type

  • source.table.name: Name of the Onehouse table.
  • source.table.database: Database the Onehouse table is in.
  • source.table.lake: Lake the Onehouse table is in.

Postgres type

  • postgres.host: Hostname of the Postgres server.
  • postgres.port: Port number the Postgres server is listening on.
  • postgres.database.name: Name of the Postgres database.

If CREDENTIAL_TYPE is CREDENTIAL_TYPE_ONEHOUSE (Default):

  • postgres.user: Postgres user.
  • postgres.password: Password of the Postgres user.

If CREDENTIAL_TYPE is CREDENTIAL_TYPE_SECRET_MANAGER:

  • postgres.user.password.reference: Cloud-specific identifier (e.g., ARN for AWS) of a secret containing the Postgres user and password in the specified format.
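
Example syntax for creating a Postgres source with the default CREDENTIAL_TYPE_ONEHOUSE (the host, database, and user values below are placeholders):

CREATE SOURCE mypostgressource2 SOURCE_TYPE = 'POSTGRES'
WITH
'postgres.host' = 'postgres.example.com',
'postgres.port' = '5432',
'postgres.database.name' = 'mydatabase',
'postgres.user' = 'myuser',
'postgres.password' = 'secret_password'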

MySQL type

  • mysql.host: Hostname of the MySQL server.
  • mysql.port: Port number the MySQL server is listening on.
  • mysql.database.name: Name of the MySQL database.

If CREDENTIAL_TYPE is CREDENTIAL_TYPE_ONEHOUSE (Default):

  • mysql.user: MySQL user.
  • mysql.password: Password of the MySQL user.

If CREDENTIAL_TYPE is CREDENTIAL_TYPE_SECRET_MANAGER:

  • mysql.user.password.reference: Cloud-specific identifier (e.g., ARN for AWS) of a secret containing the MySQL user and password in the specified format.
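
Example syntax for creating a MySQL source with the default CREDENTIAL_TYPE_ONEHOUSE (the host, database, and user values below are placeholders):

CREATE SOURCE mymysqlsource SOURCE_TYPE = 'MY_SQL'
WITH
'mysql.host' = 'mysql.example.com',
'mysql.port' = '3306',
'mysql.database.name' = 'mydatabase',
'mysql.user' = 'myuser',
'mysql.password' = 'secret_password'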

Schema registry properties

These are shared by Stream Captures created with this source.

  • schema.registry.type: Valid values are glue, confluent, jar, and file, which correspond to GlueSchemaRegistry, ConfluentSchemaRegistry, Jar (for Proto schemas), and FileBasedSchemaRegistry respectively.

Glue schema registry

  • schema.registry.glue.name: Name for the Glue schema registry.
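
Example sketch of a Glue schema registry configuration on an MSK Kafka source (the servers, ARN, and registry name are placeholder values; which connection properties your cluster requires depends on its configuration):

CREATE SOURCE mymskkafkasource SOURCE_TYPE = 'MSK_KAFKA'
WITH
'kafka.bootstrap.servers' = 'server1, server2',
'kafka.cloud.resource.identifier' = 'arn://abc',
'kafka.connection.protocol' = 'TLS',
'kafka.security.protocol' = 'SSL',
'kafka.payload.serialization' = 'avro',
'kafka.tls.trust.store.path' = 'sample/truststore/path',
'kafka.tls.key.store.path' = 'sample/keystore/path',
'kafka.tls.key.store.password' = 'secret_password',
'kafka.tls.key.password' = 'another_secret',
'schema.registry.type' = 'glue',
'schema.registry.glue.name' = 'myglueregistry'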

Confluent schema registry

  • schema.registry.confluent.servers: Confluent Schema Registry servers.
  • schema.registry.confluent.subject.name: Schema name in the Confluent Schema Registry.
  • schema.registry.confluent.subject.prefix: Schema prefix to filter schemas.

If CREDENTIAL_TYPE is CREDENTIAL_TYPE_ONEHOUSE (Default):

  • schema.registry.confluent.key: Confluent schema registry key.
  • schema.registry.confluent.secret: Confluent schema registry secret.

If CREDENTIAL_TYPE is CREDENTIAL_TYPE_SECRET_MANAGER:

  • schema.registry.confluent.key.secret.reference: Cloud-specific identifier (e.g., ARN for AWS) of a secret containing the Confluent schema registry key and secret in the specified format.
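
Example sketch of the Confluent schema registry properties with CREDENTIAL_TYPE_SECRET_MANAGER (the servers, subject, and secret reference are placeholder values):

CREATE SOURCE mykafkasource2 SOURCE_TYPE = 'APACHE_KAFKA'
CREDENTIAL_TYPE = 'CREDENTIAL_TYPE_SECRET_MANAGER'
WITH
'kafka.bootstrap.servers' = 'server1, server2',
'kafka.connection.protocol' = 'PLAINTEXT',
'kafka.security.protocol' = 'PLAINTEXT',
'kafka.payload.serialization' = 'avro',
'schema.registry.type' = 'confluent',
'schema.registry.confluent.servers' = 'schema_registry_server1, schema_registry_server2',
'schema.registry.confluent.subject.name' = 'schema_registry_subject',
'schema.registry.confluent.key.secret.reference' = 'arn://my-registry-secret'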

Proto schemas

  • schema.registry.proto.jar.location: Jar location for proto schemas.

File based schemas

  • schema.registry.file.base.path: Base path for file based schema registry.
  • schema.registry.file.full.path: Full path for file based schema registry.