Orchestrate Onehouse SQL with MWAA (Amazon Managed Airflow)
This guide shows how to set up MWAA to orchestrate queries via the Onehouse SQL Endpoint.
Tutorial
- Follow the AWS MWAA quickstart to set up MWAA in the same VPC as your SQL Endpoint: https://docs.aws.amazon.com/mwaa/latest/userguide/quick-start.html#quick-start-upload-dag
  - Don't use the CloudFormation template, as this will create a new VPC! Instead, launch MWAA in the existing VPC that contains your SQL Endpoint.
- Install the Apache Spark provider in MWAA. Create a requirements.txt file, upload it to S3, and set it as the requirements file for your MWAA environment. The file should contain a single line: apache-airflow-providers-apache-spark
- Add a Generic connection to pass the SQL Endpoint details to the DAG. In the Airflow UI, open Admin > Connections and add a new connection (a quick way to verify the saved connection is sketched after this list):
  - Connection Id: oh_sql_endpoint (or any other name)
  - Connection Type: Generic
  - Schema (optional): name of the default database to use; e.g. andy
  - Host: your SQL Endpoint hostname; e.g. internal-a2fe841b825jr04b124df90bd2ae396e4-2011043636.us-west-2.elb.amazonaws.com
  - Port: 10000
  - Leave Login and Password blank
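To confirm the connection resolves as expected, the lookup that the sample DAG performs can be run on its own from within any task on the environment. This is a minimal sketch, assuming the connection id oh_sql_endpoint used above:

from airflow.hooks.base import BaseHook

# Resolve the Generic connection created above (connection id assumed to be oh_sql_endpoint)
conn = BaseHook.get_connection('oh_sql_endpoint')

# These are the fields the sample DAG uses to build the JDBC URL
print(conn.host, conn.port, conn.schema)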
- In the sample DAG below, fill in your connection ID. The example uses the BashOperator to run a beeline query; you may also use other Airflow operators such as PythonOperator or SQLExecuteQueryOperator to submit queries to the Onehouse SQL Endpoint (see the PythonOperator sketch after the sample DAG).
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.hooks.base import BaseHook
from airflow.utils.dates import days_ago
# Default args for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}
# Define the DAG
dag = DAG(
    'bash_operator_with_sql_connection',
    default_args=default_args,
    description='Query the top 10 orders by cost using BashOperator',
    schedule_interval=None,
    catchup=False,
)
# Define the Hive query you want to run
sql_query = """
SELECT order_id, customer_id, order_date, total_cost
FROM orders_ro
ORDER BY total_cost DESC
LIMIT 10;
"""
# Fetch the connection details
conn = BaseHook.get_connection('oh_sql_endpoint')
jdbc_url = f"jdbc:hive2://{conn.host}:{conn.port}/{conn.schema}"
# Construct the beeline command
beeline_command = f"""
beeline -u {jdbc_url} \
-e "{sql_query}"
"""
query_task = BashOperator(
    task_id='execute_sql_query',
    bash_command=beeline_command,
    dag=dag,
)
# The task is attached to the DAG via the dag argument above; with a single task, no dependencies need to be set
query_task
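For a PythonOperator-based alternative to the BashOperator example above, one option is to open a HiveServer2 connection to the endpoint directly from Python. The sketch below uses the PyHive library, which is not installed by the requirements.txt above and would need to be added (along with its Thrift dependencies); the connection id and query are carried over from the example, and depending on the endpoint's authentication settings an auth argument may be required on hive.connect.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.hooks.base import BaseHook
from airflow.utils.dates import days_ago

def run_query():
    # PyHive is an assumed dependency; add it to requirements.txt before using this
    from pyhive import hive

    conn_details = BaseHook.get_connection('oh_sql_endpoint')
    connection = hive.connect(
        host=conn_details.host,
        port=conn_details.port,
        database=conn_details.schema or 'default',
    )
    cursor = connection.cursor()
    cursor.execute(
        "SELECT order_id, customer_id, order_date, total_cost "
        "FROM orders_ro ORDER BY total_cost DESC LIMIT 10"
    )
    for row in cursor.fetchall():
        print(row)
    cursor.close()
    connection.close()

with DAG(
    'python_operator_with_sql_connection',
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
) as python_dag:
    query_task_python = PythonOperator(
        task_id='execute_sql_query',
        python_callable=run_query,
    )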