ALTER FLOW
Description
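Modifies the configuration of a Flow. Supported changes include pausing, resuming, or cleaning and restarting the Flow, and updating its source, cluster, performance profile, minimum sync frequency, transformations, validations, quarantine setting, and advanced configurations.
Note that the SQL statement does not end with a semicolon (;).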
Syntax
ALTER FLOW `<name>`
[ SET STATE = { PAUSE | RESUME | CLEAN_AND_RESTART } ]
[ SET SOURCE = <string> ]
[ SET CLUSTER = <string> ]
[ SET PERFORMANCE_PROFILE = { 'BALANCED' | 'FASTEST_READ' | 'FASTEST_WRITE' } ]
[ SET MIN_SYNC_FREQUENCY_MINS = <integer> ]
[ SET TRANSFORMATIONS = ( ['<TRANSFORMATION_NAME>'], ... ) ]
[ SET VALIDATIONS = ( ['<VALIDATION_NAME>'], ... ) ]
[ SET QUARANTINE_ENABLED = 'false' ]
[ SET ADVANCED_CONFIGS ]
Sample response
Examples
Alter State
ALTER FLOW `testStreamCapture`
SET STATE = PAUSE
Alter Source
ALTER FLOW `testStreamCapture`
SET SOURCE = 'kafka_source_name'
WITH
'kafka.topic.name' = 'kafka-topic-name',
'kafka.startingOffsets' = 'latest',
'kafka.topic.schema.name' = 'kafka-topic-name-value'
Alter Advanced Configurations
ALTER FLOW `testStreamCapture`
SET ADVANCED_CONFIGS
WITH
'flow.advancedConfig1' = 'value1',
'flow.advancedConfig2' = 'value2'
Alter transformations
ALTER FLOW `testStreamCapture`
SET TRANSFORMATIONS = ('transformationName1', 'transformationName2')
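The remaining single-value parameters from the Syntax section follow the same pattern. The statements below are a sketch rather than verified output: the cluster name 'my_cluster' and the value 5 are hypothetical, and the `--` lines are annotations only (it is an assumption that the SQL endpoint accepts comments, so strip them if it does not). Because exactly one optional parameter is allowed per query, each statement is submitted on its own.

```sql
-- Statement 1: move the Flow to an existing Cluster (hypothetical name)
ALTER FLOW `testStreamCapture`
SET CLUSTER = 'my_cluster'

-- Statement 2: change the Performance Profile (append-only Flows only)
ALTER FLOW `testStreamCapture`
SET PERFORMANCE_PROFILE = 'FASTEST_READ'

-- Statement 3: trigger the Flow at most once every 5 minutes (hypothetical value)
ALTER FLOW `testStreamCapture`
SET MIN_SYNC_FREQUENCY_MINS = 5
```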
Required parameters
<name>: Identifier for the Flow.
Optional parameters
Use one optional parameter
You must use exactly one of the optional parameters in the query.
STATE: Pause, resume, or clean and restart the Flow.
SOURCE: Specify the name of the new source to use for the Flow.
- Important: Read the docs on editing sources for details on usage and limitations.
CLUSTER: Specify the name of an existing Cluster that will run the Flow.
PERFORMANCE_PROFILE: Specify the desired Performance Profile to optimize how the Flow writes data to the table.
- Only available for Flows in Append-only write mode.
MIN_SYNC_FREQUENCY_MINS: Specify the minimum number of minutes between Flow triggers.
ADVANCED_CONFIGS: Include this parameter to update the advanced configurations for the Flow. Then, specify the updated advanced configurations after the WITH clause as comma-separated key-value pairs.
- Important: Any existing advanced configurations that should remain unchanged must also be included in the request. Omitting any existing configurations will result in their removal.
QUARANTINE_ENABLED: Specify whether to disable quarantine of invalid records.
- Pass 'false' to disable quarantine of invalid records.
- To enable quarantine of invalid records, pass VALIDATIONS instead; quarantine is then enabled with the validations you pass.
TRANSFORMATIONS: Specify the names of transformations to use for the Flow.
- You must first create each transformation with the CREATE TRANSFORMATION API command before referencing it. See the CREATE TRANSFORMATION API for information on how to create one.
- Transformation names are case-insensitive.
- You can view created transformations using the SHOW TRANSFORMATIONS API command.
VALIDATIONS: Specify the names of validations to use for the Flow.
- You must first create each validation with the CREATE VALIDATION API command before referencing it. See the CREATE VALIDATION API for information on how to create one.
- Validation names are case-insensitive.
- You can view created validations using the SHOW VALIDATIONS API command.
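As a sketch of how the quarantine-related parameters might be used together, the two statements below first attach validations (which, per the notes above, enables quarantine with those validations) and later disable quarantine. The validation names are hypothetical and are assumed to have been created already with CREATE VALIDATION. The `--` lines are annotations only; whether the SQL endpoint accepts comments is an assumption, so remove them if it does not. Each statement is a separate query.

```sql
-- Statement 1: apply two existing validations (hypothetical names);
-- this also enables quarantine of records that fail them
ALTER FLOW `testStreamCapture`
SET VALIDATIONS = ('validationName1', 'validationName2')

-- Statement 2: later, disable quarantine of invalid records
ALTER FLOW `testStreamCapture`
SET QUARANTINE_ENABLED = 'false'
```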
Advanced Configs
Include advanced configs after WITH, as String values, only when using SET ADVANCED_CONFIGS.
flow.delayThreshold.numSyncIntervals: This value is multiplied by the Sync Frequency to determine the delay threshold. When a sync's duration surpasses the delay threshold, the Flow is considered delayed. This value cannot be negative. Set this value to 0 to disable the Delayed state for this Flow.
- Default: 0 (Delayed state & notifications disabled)
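A worked example may help with the multiplication. Assuming, hypothetically, that the Flow's Sync Frequency is 5 minutes, a value of 3 gives a delay threshold of 3 x 5 = 15 minutes, so a sync running longer than 15 minutes would mark the Flow as delayed. The sketch below sets that value; the number is passed as a String, and the `--` lines are annotations that should be removed if the SQL endpoint does not accept comments (an assumption either way). Any other advanced configurations already set on the Flow would need to be repeated in the same WITH clause to avoid their removal.

```sql
-- Assumption: Sync Frequency is 5 minutes, so the delay threshold
-- becomes 3 x 5 = 15 minutes
ALTER FLOW `testStreamCapture`
SET ADVANCED_CONFIGS
WITH
'flow.delayThreshold.numSyncIntervals' = '3'
```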
Special parameters
Include special parameters after WITH as type String.
Editing the source
Include these parameters when editing the source of the Flow.
kafka.topic.name: Specify the name of the new source topic.
kafka.startingOffsets: [Optional] Specify the start point for ingestion from the new Kafka source as 'earliest' or 'latest'. Default is 'earliest'.
kafka.topic.schema.name: Specify the name of the schema in your schema registry to use for the source data (for Kafka sources with a schema registry).
- If your Schema Registry uses Schema Contexts, use the format: ':.context.subcontext:schemaName'. Example: ':.prod-us:schema1'
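For a Schema Registry that uses Schema Contexts, the source edit might look like the sketch below. It reuses the source and topic names from the Alter Source example and the ':.prod-us:schema1' value from the format note above; all of these are placeholders rather than names from a real deployment. The `--` line is an annotation only and should be removed if the SQL endpoint does not accept comments (an assumption).

```sql
-- Placeholder names; the schema name uses the ':.context:schemaName' form
ALTER FLOW `testStreamCapture`
SET SOURCE = 'kafka_source_name'
WITH
'kafka.topic.name' = 'kafka-topic-name',
'kafka.startingOffsets' = 'earliest',
'kafka.topic.schema.name' = ':.prod-us:schema1'
```

Here 'kafka.startingOffsets' is written out for clarity even though 'earliest' is the documented default.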
Delays
flow.delayThreshold.numSyncIntervals: This value is multiplied by the Sync Frequency to determine the delay threshold. When a sync's duration surpasses the delay threshold, the Flow is considered delayed. This value cannot be negative. Set this value to 0 to disable the Delayed state for this Flow.
Deduplication
flow.deduplicationPolicy: For append-only Flows, deduplicate records based on the record key. Options:
- 'none': No deduplication
- 'drop': Drop duplicate records