# Orchestrate Onehouse SQL with MWAA (Amazon Managed Airflow)
This guide walks through setting up MWAA to orchestrate queries via the Onehouse SQL Endpoint.
## Tutorial
- Follow the AWS MWAA quickstart to set up MWAA in the same VPC as your SQL Endpoint: https://docs.aws.amazon.com/mwaa/latest/userguide/quick-start.html#quick-start-upload-dag
  - Don't use the CloudFormation template, as it will create a new VPC! Instead, create the MWAA environment from the Amazon MWAA console so it runs in the existing VPC that hosts your SQL Endpoint.
- Install the Apache Spark provider in MWAA. Create a `requirements.txt` file and upload it to S3, then set it as your requirements file in MWAA. The file should contain:

  ```
  apache-airflow-providers-apache-spark
  ```

- Add a Generic Connection to pass SQL Endpoint details to the DAG. In the Airflow UI, open
  Admin > Connections and add a new connection:
  - Connection Id: `oh_sql_endpoint` (or any other name)
  - Connection Type: `Generic`
  - Schema (optional): name of the default database to use, e.g. `andy`
  - Host: your SQL Endpoint, e.g. `internal-a2fe841b825jr04b124df90bd2ae396e4-2011043636.us-west-2.elb.amazonaws.com`
  - Port: `10000`
  - Leave Login and Password blank
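  To verify the connection before wiring it into a DAG, you can resolve it with Airflow's `BaseHook` and preview the JDBC URL the sample DAG will build (a quick sanity-check sketch, assuming the connection ID `oh_sql_endpoint` created above):

  ```python
  from airflow.hooks.base import BaseHook

  # Resolve the connection and print the JDBC URL constructed from it
  conn = BaseHook.get_connection("oh_sql_endpoint")
  print(f"jdbc:hive2://{conn.host}:{conn.port}/{conn.schema}")
  ```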
- In the sample DAG below, fill in your connection ID. The example uses the `BashOperator`; you may also use Airflow's other operators, such as `PythonOperator` and `SQLExecuteQueryOperator`, to submit queries to the Onehouse SQL Endpoint (see the sketch after the DAG).

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.hooks.base import BaseHook
from airflow.utils.dates import days_ago
# Default args for the DAG
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
}
# Define the DAG
dag = DAG(
'bash_operator_with_sql_connection',
default_args=default_args,
description='Query the top 10 orders by cost using BashOperator',
schedule_interval=None,
catchup=False,
)
# Define the SQL query to run against the SQL Endpoint
sql_query = """
SELECT order_id, customer_id, order_date, total_cost
FROM orders_ro
ORDER BY total_cost DESC
LIMIT 10;
"""
# Fetch the connection details
conn = BaseHook.get_connection('oh_sql_endpoint')
jdbc_url = f"jdbc:hive2://{conn.host}:{conn.port}/{conn.schema}"
# Construct the beeline command
beeline_command = f"""
beeline -u {jdbc_url} \
-e "{sql_query}"
"""
query_task = BashOperator(
task_id='execute_sql_query',
bash_command=beeline_command,
dag=dag
)
# The task is already registered with the DAG via dag=dag; a single task needs no dependencies
query_task
```
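As an alternative to shelling out to beeline, the same query can be submitted over the HiveServer2 protocol with a `PythonOperator` and PyHive. The sketch below is illustrative rather than part of the original guide: it assumes you add `pyhive[hive]` (plus its thrift/sasl dependencies) to the MWAA `requirements.txt`, and it reuses the `dag`, `conn`, and `sql_query` objects defined above.

```python
from airflow.operators.python import PythonOperator

def run_query_with_pyhive():
    # Import inside the task so the DAG file still parses if pyhive is absent
    from pyhive import hive

    connection = hive.connect(
        host=conn.host,
        port=conn.port,
        database=conn.schema or "default",
    )
    cursor = connection.cursor()
    # PyHive rejects trailing semicolons, so strip them before executing
    cursor.execute(sql_query.strip().rstrip(";"))
    for row in cursor.fetchall():
        print(row)
    cursor.close()
    connection.close()

pyhive_query_task = PythonOperator(
    task_id='execute_sql_query_pyhive',
    python_callable=run_query_with_pyhive,
    dag=dag,
)
```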