Orchestrate Jobs with Airflow
Overview
Onehouse allows you to orchestrate jobs with Airflow. This guide walks you through creating and running a Spark job using the Onehouse API and Airflow.
Prerequisites
- Configured Onehouse API
- Existing Airflow setup
Use the Onehouse operator
Coming soon. Contact us for early access.
Use the HTTP operator with Onehouse APIs
Run an existing Spark Job
This section will walk you through running an existing Spark job using the Onehouse API and Airflow.
- Create a new DAG in Airflow
- Add a new task to the DAG
- Add apache-airflow-providers-http and requests to the requirements.txt file in your Airflow requirements folder (json ships with Python's standard library, so it does not need to be installed)
- Configure the task to use the Spark Job SQL definition via the HTTP Operator
- Run the DAG
You will need to create a new HTTP connection in Airflow to the Onehouse API (see the Airflow docs). The Onehouse API endpoint is https://api.onehouse.ai/. There is no need to specify a username or password, as the API is authenticated via the HTTP headers.
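You can create the connection from the Airflow UI (Admin > Connections, connection type HTTP, host https://api.onehouse.ai/) or with the airflow connections add CLI command. As a minimal sketch, the connection can also be registered programmatically against the Airflow metadata database; the connection id my_http_conn below is an arbitrary name that just has to match the http_conn_id used in the DAGs in this guide.
from airflow import settings
from airflow.models import Connection

# HTTP connection pointing at the Onehouse API; no login or password is needed.
onehouse_conn = Connection(
    conn_id="my_http_conn",   # must match http_conn_id in the operators below
    conn_type="http",
    host="https://api.onehouse.ai/",
)

session = settings.Session()
# Only add the connection if it does not already exist.
if not session.query(Connection).filter(Connection.conn_id == onehouse_conn.conn_id).first():
    session.add(onehouse_conn)
    session.commit()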
Example running an existing Spark Job DAG
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago
import json

# Path of the Onehouse API resource endpoint, relative to the HTTP connection's base URL
url = "v1/resource/"

# Identifiers and credentials for your Onehouse project
onehouse_project_uid = 'YOUR_PROJECT_UID'
onehouse_api_key = 'YOUR_API_KEY'
onehouse_api_secret = 'YOUR_API_SECRET'
onehouse_link_uid = 'YOUR_ONEHOUSE_REQUEST_ID'
onehouse_region = 'YOUR_CLOUD_REGION'
onehouse_uuid = 'YOUR_ONEHOUSE_USER_ID'

# The Onehouse API is authenticated via these request headers
headers = {
    'x-onehouse-project-uid': onehouse_project_uid,
    'x-onehouse-api-key': onehouse_api_key,
    'x-onehouse-api-secret': onehouse_api_secret,
    'x-onehouse-link-uid': onehouse_link_uid,
    'x-onehouse-region': onehouse_region,
    'x-onehouse-uuid': onehouse_uuid
}

cluster_name = "YOUR_ONEHOUSE_CLUSTER_NAME"  # not referenced by the RUN JOB statement below
job_name = "YOUR_JOB_NAME"

# Onehouse SQL statement that triggers the existing job
onehouse_sql = f"""RUN JOB {job_name}"""

# Request body expected by the Onehouse API
payload = json.dumps({
    "statement": onehouse_sql
})

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

with DAG(
    'run_spark_job',
    default_args=default_args,
    description='Run a Spark job',
    schedule_interval=None,
) as dag:

    run_job_request = SimpleHttpOperator(
        task_id='run_spark_job',
        http_conn_id='my_http_conn',  # The name of the HTTP connection you created in Airflow
        endpoint=url,
        method='POST',
        data=payload,  # JSON payload
        headers=headers,
        log_response=True,
    )

    run_job_request
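By default, SimpleHttpOperator fails the task when the API returns a non-2xx status, but it does not inspect the response body. If you want to validate the body or hand it to downstream tasks, the operator's response_check and response_filter arguments accept callables. A minimal sketch that can replace the operator in the DAG above, reusing the same url, payload, and headers variables, and assuming the Onehouse API returns a JSON body:
    run_job_request = SimpleHttpOperator(
        task_id='run_spark_job',
        http_conn_id='my_http_conn',
        endpoint=url,
        method='POST',
        data=payload,
        headers=headers,
        log_response=True,
        # Fails the task unless the body parses as JSON and is not null
        # (assumption: the Onehouse API responds with a JSON body).
        response_check=lambda response: response.json() is not None,
        # Pushes the parsed body to XCom so downstream tasks can read it.
        response_filter=lambda response: response.json(),
    )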
Create a new Spark Job
This section will walk you through creating a new Spark job using the Onehouse API.
- Create a new Spark Job SQL definition in your DAG code
  - Use the CREATE JOB SQL statement to create a new Spark job. Documentation can be found here
  - Use the CLUSTER parameter to specify the cluster to use
  - Use the PARAMETERS parameter to specify the parameters for the Spark job
- Add a new task to the DAG
- Add apache-airflow-providers-http and requests to the requirements.txt file in your Airflow requirements folder (json ships with Python's standard library, so it does not need to be installed)
- Configure the task to use the Spark Job SQL definition via the HTTP Operator
- Run the DAG
As in the previous section, you will need an HTTP connection in Airflow to the Onehouse API (see the Airflow docs). The Onehouse API endpoint is https://api.onehouse.ai/. There is no need to specify a username or password, as the API is authenticated via the HTTP headers.
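Before wiring a statement into a DAG, it can help to call the Onehouse API directly with requests to confirm that your headers and project identifiers are accepted. Below is a minimal sketch using the same endpoint and payload shape as the examples in this guide; the RUN JOB statement from the first example is shown, and submitting it really does trigger that job, so substitute whichever statement is appropriate.
import json
import requests

# Same authentication headers as in the DAG examples.
headers = {
    'x-onehouse-project-uid': 'YOUR_PROJECT_UID',
    'x-onehouse-api-key': 'YOUR_API_KEY',
    'x-onehouse-api-secret': 'YOUR_API_SECRET',
    'x-onehouse-link-uid': 'YOUR_ONEHOUSE_REQUEST_ID',
    'x-onehouse-region': 'YOUR_CLOUD_REGION',
    'x-onehouse-uuid': 'YOUR_ONEHOUSE_USER_ID',
}

# Any Onehouse SQL statement from this guide can be submitted this way.
payload = json.dumps({"statement": "RUN JOB YOUR_JOB_NAME"})

response = requests.post(
    "https://api.onehouse.ai/v1/resource/",
    headers=headers,
    data=payload,
)
print(response.status_code, response.text)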
Example creating a new Spark Job DAG using the CREATE JOB SQL statement (JAR-based job)
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago
import json

# Path of the Onehouse API resource endpoint, relative to the HTTP connection's base URL
url = "/v1/resource/"

# Identifiers and credentials for your Onehouse project
onehouse_project_uid = 'YOUR_PROJECT_UID'
onehouse_api_key = 'YOUR_API_KEY'
onehouse_api_secret = 'YOUR_API_SECRET'
onehouse_link_uid = 'YOUR_ONEHOUSE_REQUEST_ID'
onehouse_region = 'YOUR_CLOUD_REGION'
onehouse_uuid = 'YOUR_ONEHOUSE_USER_ID'

# The Onehouse API is authenticated via these request headers
headers = {
    'x-onehouse-project-uid': onehouse_project_uid,
    'x-onehouse-api-key': onehouse_api_key,
    'x-onehouse-api-secret': onehouse_api_secret,
    'x-onehouse-link-uid': onehouse_link_uid,
    'x-onehouse-region': onehouse_region,
    'x-onehouse-uuid': onehouse_uuid
}

cluster_name = "YOUR_ONEHOUSE_SPARK_CLUSTER_NAME"
job_name = "YOUR_JOB_NAME"

# Onehouse SQL statement that defines the JAR-based Spark job
onehouse_sql = (
    f"CREATE JOB {job_name} TYPE = 'JAR' "
    'PARAMETERS = ( "--class", "ai.onehouse.CreateTableAndInsertValues", '
    '"--conf", "spark.kubernetes.driver.request.cores=500m", '
    '"--conf", "spark.kubernetes.executor.request.cores=500m", '
    '"--conf", "spark.kubernetes.driver.limit.cores=500m", '
    '"--conf", "spark.kubernetes.executor.limit.cores=500m", '
    '"s3a://onehouse-customer-bucket-94efc324/spark-code/create-table-and-insert-values.jar", '
    '"--table-path", "s3://onehouse-customer-bucket-94efc324/spark-code-examples/table3", '
    '"--database-name", "onehouse_internal_default", '
    '"--table-name", "demo_table3", '
    '"--table-type", "cow" ) '
    f"CLUSTER = '{cluster_name}'"
)

# Request body expected by the Onehouse API
payload = json.dumps({
    "statement": onehouse_sql
})

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

with DAG(
    'create_spark_job',
    default_args=default_args,
    description='Create a Spark job',
    schedule_interval=None,
) as dag:

    create_job_request = SimpleHttpOperator(
        task_id='create_spark_job',
        http_conn_id='my_http_conn',  # The name of the HTTP connection you created in Airflow
        endpoint=url,
        method='POST',
        data=payload,  # JSON payload
        headers=headers,
        log_response=True,
    )

    create_job_request
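CREATE JOB and RUN JOB can also be combined in a single DAG by defining one SimpleHttpOperator per statement and chaining them. Below is a sketch of the tail of the DAG above, replacing the bare create_job_request line; it reuses url, headers, and job_name from that example and assumes the job can be run immediately after CREATE JOB succeeds (the RUN JOB statement mirrors the first example in this guide).
    # Second statement: trigger the job that was just created.
    run_payload = json.dumps({
        "statement": f"RUN JOB {job_name}"
    })

    run_job_request = SimpleHttpOperator(
        task_id='run_spark_job',
        http_conn_id='my_http_conn',
        endpoint=url,
        method='POST',
        data=run_payload,
        headers=headers,
        log_response=True,
    )

    # Create the job first, then run it.
    create_job_request >> run_job_request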