Convert CDC Data
Description
Updates records in a table using CDC (change data capture) data. The transformation reads a Spark struct of "before" and "after" values and applies insert, update, or delete logic to records in the target table.
Supported CDC formats:
- PostgreSQL (Debezium)
- MySQL (Debezium)
- MongoDB (Atlas)
Parameters
- CDC Format: Specify the format of the CDC data.
- Destination Table Schema (only for Atlas format): Specify the final schema of the destination table after all transformations have been applied.
Input Requirements
The input dataset must follow the exact same schema that the Debezium or Atlas connector generates.
Expected Output
- Debezium Formats: The transformation reads the CDC data and extracts the "before" values for deletes or the "after" values for inserts/updates. Metadata from the transaction is also added as fields in the row. Rows are deleted in the target table when the row has op = d. See the sketch following this list for how this flattening maps onto Spark.
- Atlas Formats: The transformation reads the CDC data and produces a struct whose fields are the "before" values for deletes or the "after" values for updates. Rows are deleted in the target table when the row has operationType = delete.
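The following PySpark sketch illustrates the Debezium flattening logic described above. It is a minimal example under stated assumptions, not the transformation's actual implementation: the function name flatten_debezium_cdc and the source fields txId and lsn are assumptions, and the field paths mirror the PostgreSQL example in the Example section below.

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def flatten_debezium_cdc(cdc_df: DataFrame) -> DataFrame:
    # Hypothetical sketch: deletes carry the row image in "before",
    # while inserts/updates carry it in "after".
    record = (
        F.when(F.col("op") == "d", F.col("payload.before"))
         .otherwise(F.col("payload.after"))
    )

    return (
        cdc_df
        .select(
            record.alias("record"),
            F.col("op").alias("_change_operation_type"),
            F.col("ts_ms").alias("_event_origin_ts_ms"),
            # "txId" and "lsn" are assumed Debezium source-field names.
            F.col("payload.source.txId").alias("_event_tx_id"),
            F.col("payload.source.lsn").alias("_event_lsn"),
        )
        # Promote the chosen row image's fields to top-level columns.
        .select(
            "record.*",
            "_change_operation_type",
            "_event_origin_ts_ms",
            "_event_tx_id",
            "_event_lsn",
        )
    )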
Example
// Input
// CDC Format: PostgreSQL
{
"schema": {...},
"payload": {
"before": {
"id": 123,
"name": "bill",
"rides": 5
},
"after": {
"id": 123,
"name": "bill",
"rides": 7
},
"source": {
"version": 123,
"connector": "my_connector"
...
}
},
"op": "u",
"ts_ms": 1589362330904
}
// Output
{
"id": 123,
"name": "bill",
"rides": 7
// Metafields specific to the CDC source (PostgreSQL, MySQL, etc.)
"_change_operation_type" // Type: String
"_upstream_event_processed_ts_ms" // Type: String
"db_shard_source_partition" // Type: String
"_event_origin_ts_ms" // Type: Long
"_event_tx_id" // Type: Long
"_event_lsn" // Type: Long
"_event_xmin" // Type: Long
}
Tips
- Ensure the Write Mode is set to Mutable if you want to update records in the table using CDC logs, as illustrated in the sketch below. If you prefer to land the raw CDC logs in the table without updating records, use the Append-only Write Mode to improve write performance.
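As a rough illustration of what a Mutable-style write does with the flattened output, here is a minimal Delta Lake MERGE sketch. It assumes a Delta target, id as the primary key, and the _change_operation_type metafield from the output above; the actual write behavior is controlled by the Write Mode setting, so treat this only as a conceptual sketch.

from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession

def merge_cdc_into_target(spark: SparkSession, flattened: DataFrame, target_path: str) -> None:
    # Hypothetical sketch: upsert non-deletes and remove rows whose
    # change operation is "d", keyed on the assumed primary key "id".
    target = DeltaTable.forPath(spark, target_path)
    (
        target.alias("t")
        .merge(flattened.alias("s"), "t.id = s.id")
        .whenMatchedDelete(condition="s._change_operation_type = 'd'")
        .whenMatchedUpdateAll(condition="s._change_operation_type != 'd'")
        .whenNotMatchedInsertAll(condition="s._change_operation_type != 'd'")
        .execute()
    )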