Skip to main content

ALTER FLOW

Description

Modifies the configurations for a Flow. Currently supports pausing the stream, resuming the stream, and updating the minimum sync frequency.

Note that the SQL statement does not end with ;

Syntax

ALTER FLOW `<name>`
[ SET STATE = { PAUSE | RESUME | CLEAN_AND_RESTART} ]
[ SET SOURCE = <string> ]
[ SET CLUSTER = <string> ]
[ SET PERFORMANCE_PROFILE = { 'BALANCED' | 'FASTEST_READ' | 'FASTEST_WRITE' } ]
[ SET MIN_SYNC_FREQUENCY_MINS = <integer> ]
[ SET TRANSFORMATIONS = ( ['<TRANSFORMATION_NAME>'], ... ) ]
[ SET VALIDATIONS = ( ['<VALIDATION_NAME>'], ... ) ]
[ SET QUARANTINE_ENABLED = 'false' ]
[ SET ADVANCED_CONFIGS]

Sample response

Examples

Alter State

ALTER FLOW `testStreamCapture`
SET STATE = PAUSE

Alter Source

ALTER FLOW `testStreamCapture`
SET SOURCE = 'kafka_source_name'
WITH
'kafka.topic.name' = 'kafka-topic-name',
'kafka.startingOffsets' = 'latest',
'kafka.topic.schema.name' = 'kafka-topic-name-value'

Alter Advanced Configurations

ALTER FLOW `testStreamCapture`
SET ADVANCED_CONFIGS
WITH
'flow.advancedConfig1' = 'value1',
'flow.advancedConfig2' = 'value2'

Alter transformations

ALTER STREAM CAPTURE `testStreamCapture`
SET TRANSFORMATIONS = ('transformationName1', 'transformationName2')

Required parameters

  • <name>: Identifier for the Flow.

Optional parameters

Use one optional parameter

You must use exactly one of the optional parameters in the query.

  • STATE: Pause, resume or clean & restart the Flow.
  • SOURCE: Specify the name of the new source to use for the Flow.
    • Important: Read the docs on editing sources here for details on usage and limitations.
  • CLUSTER: Specify the name of an existing Cluster that will run the Flow.
  • PERFORMANCE_PROFILE: Specify the desired Performance Profile to optimize how the Flow writes data to the table.
    • Only available for Flows in Append-only write mode.
  • MIN_SYNC_FREQUENCY_MINS: Specify the minimum number of minutes between Flow triggers.
  • ADVANCED_CONFIGS: Include this parameter to update the advanced configurations for the Flow. Then, specify the updated advanced configurations after the WITH clause as comma-separated key value pairs.
    • Important: Any existing advanced configurations that should remain unchanged must also be included in the request. Omitting any existing configurations will result in their removal.
  • QUARANTINE_ENABLED: Specify whether to disable quarantine of invalid records.
    • Pass false to disable quarantine of invalid records.
    • If you want to enable quarantine of invalid records, pass the VALIDATIONS in API, and it will enable quarantine with passed validation.
  • TRANSFORMATIONS: Specify the names of transformations to use for the Stream Capture.
    • You must first create any transformation with the CREATE TRANSFORMATION API command before referencing them. See CREATE TRANSFORMATION API for information on how to create one.
    • Transformation names are case-agnostic.
    • You can view created transformations using SHOW TRANSFORMATIONS API command.
  • VALIDATIONS: Specify the names of validations to use for the Stream Capture.
    • You must first create any validation with the CREATE VALIDATION API command before referencing them. See CREATE VALIDATION API for information on how to create one.
    • Validation names are case-agnostic.
    • You can view created validations using SHOW VALIDATIONS API command.

Advanced Configs

Include advanced configs after WITH as type String only when doing SET ADVANCED_CONFIGS.

  • flow.delayThreshold.numSyncIntervals: This value is multiplied by the Sync Frequency to determine the delay threshold. When a sync's duration surpasses the delay threshold, the Flow is considered delayed. This value cannot be negative. Set this value to 0 to disable the Delayed state for this Flow.
    • Default: 0 (Delayed state & notifications disabled)

Special parameters

Include special parameters after WITH as type String.

Editing the source

Include these parameters when editing the source of the Flow.

  • kafka.topic.name: Specify the name of the new source topic.
  • kafka.startingOffsets: [Optional] Specify the start point for ingestion from the new Kafka source as 'earliest' or 'latest'. Default is 'earliest'.
  • kafka.topic.schema.name: Specify the name of the schema in your schema registry to use for the source data (for Kafka sources with a schema registry).
    • If your Schema Registry uses Schema Contexts, use the format: ':.context.subcontext:schemaName'. Example: ':.prod-us:schema1'

Delays

  • flow.delayThreshold.numSyncIntervals: This value is multiplied by the Sync Frequency to determine the delay threshold. When a sync's duration surpasses the delay threshold, the Flow is considered delayed. This value cannot be negative. Set this value to 0 to disable the Delayed state for this Flow.

Deduplication

  • flow.deduplicationPolicy: For append-only Flows, deduplicate records based on the record key. Options:
    • 'none': No deduplication
    • 'drop': Drop duplicate records