
Orchestrate Onehouse SQL with MWAA (Amazon Managed Airflow)

This guide walks through setting up MWAA to orchestrate queries against the Onehouse SQL Endpoint.

Tutorial

  1. Follow the AWS MWAA quickstart to set up MWAA in the same VPC as your SQL Endpoint: https://docs.aws.amazon.com/mwaa/latest/userguide/quick-start.html#quick-start-upload-dag
    1. Don’t use the CloudFormation template from the quickstart, as it creates a new VPC. Instead, create the MWAA environment manually so it runs in the same VPC as your SQL Endpoint.
  2. Install the Apache Spark provider in MWAA. Create a requirements.txt file containing the line below, upload it to S3, and set it as the requirements file for your MWAA environment:

    apache-airflow-providers-apache-spark
  3. Add a Generic Connection to pass SQL Endpoint details to the DAG. In the Airflow UI, open Admin > Connections and add a new connection (a code-based way to express the same connection is sketched after the example DAG):
    1. Connection Id: oh_sql_endpoint (or any other name)
    2. Connection Type: Generic
    3. Schema (optional): name of the default database to use; e.g. andy
    4. Host: your SQL Endpoint; e.g. internal-a2fe841b825jr04b124df90bd2ae396e4-2011043636.us-west-2.elb.amazonaws.com
    5. Port: 10000
    6. Leave Login and Password blank
  4. In the sample DAG below, fill in your connection ID. The example uses the BashOperator; you may also use Airflow's other operators, such as PythonOperator or SQLExecuteQueryOperator, to submit queries to the Onehouse SQL Endpoint (a minimal PythonOperator sketch follows the example).
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.hooks.base import BaseHook
    from airflow.utils.dates import days_ago

    # Default args for the DAG
    default_args = {
        'owner': 'airflow',
        'start_date': days_ago(1),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
    }

    # Define the DAG
    dag = DAG(
        'bash_operator_with_sql_connection',
        default_args=default_args,
        description='Query the top 10 orders by cost using BashOperator',
        schedule_interval=None,
        catchup=False,
    )

    # Define the Hive query you want to run
    sql_query = """
    SELECT order_id, customer_id, order_date, total_cost
    FROM orders_ro
    ORDER BY total_cost DESC
    LIMIT 10;
    """

    # Fetch the connection details entered in Admin > Connections
    conn = BaseHook.get_connection('oh_sql_endpoint')
    jdbc_url = f"jdbc:hive2://{conn.host}:{conn.port}/{conn.schema}"

    # Construct the beeline command that submits the query over JDBC
    beeline_command = f'beeline -u "{jdbc_url}" -e "{sql_query}"'

    # Define the task that runs the query; passing dag=dag registers it with the DAG
    query_task = BashOperator(
        task_id='execute_sql_query',
        bash_command=beeline_command,
        dag=dag,
    )
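
As noted in step 4, operators other than BashOperator can submit the same query. Below is a minimal sketch using PythonOperator with the PyHive client, intended to be appended to the example DAG file above. It assumes pyhive[hive] (or pyhive[hive_pure_sasl]) has been added to requirements.txt; depending on how your endpoint authenticates, you may need to pass auth='NOSASL' to hive.Connection. The task ID execute_sql_query_python is illustrative.

    from airflow.operators.python import PythonOperator

    def run_query():
        # PyHive speaks the HiveServer2 Thrift protocol directly,
        # so no beeline binary is needed on the worker.
        from pyhive import hive

        details = BaseHook.get_connection('oh_sql_endpoint')
        connection = hive.Connection(
            host=details.host,
            port=details.port,
            database=details.schema,
        )
        cursor = connection.cursor()
        # Unlike beeline, PyHive rejects a trailing semicolon.
        cursor.execute(sql_query.strip().rstrip(';'))
        for row in cursor.fetchall():
            print(row)

    python_query_task = PythonOperator(
        task_id='execute_sql_query_python',
        python_callable=run_query,
        dag=dag,
    )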
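
Separately, if you manage Airflow connections outside the UI (for example, in a secrets backend such as AWS Secrets Manager, which MWAA supports), the Generic connection from step 3 can be expressed as a URI. Below is a minimal sketch using Airflow's Connection model; the host and schema values are the examples from step 3.

    from airflow.models import Connection

    # Mirror the Generic connection created in the Airflow UI in step 3.
    conn = Connection(
        conn_id='oh_sql_endpoint',
        conn_type='generic',
        host='internal-a2fe841b825jr04b124df90bd2ae396e4-2011043636.us-west-2.elb.amazonaws.com',
        schema='andy',
        port=10000,
    )

    # Prints a URI such as generic://<host>:10000/andy; with the Secrets
    # Manager backend, connections are stored by default under the
    # airflow/connections/<conn_id> key.
    print(conn.get_uri())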