
Orchestrate Jobs with Airflow

Overview

Onehouse allows you to orchestrate jobs with Airflow. This guide walks you through creating and running a Spark job using the Onehouse API and Airflow.

Prerequisites

Before you start, make sure you have:

  • An Onehouse project with API credentials (project UID, API key, and API secret)
  • An Onehouse cluster that can run Spark jobs
  • An Airflow environment where you can install Python packages and manage connections

Use the Onehouse operator

Coming soon. Contact us for early access.

Use the HTTP operator with Onehouse APIs

Run an existing Spark Job

This section will walk you through running an existing Spark job using the Onehouse API and Airflow.

  1. Create a new DAG in Airflow
  2. Add a new task to the DAG
  3. Add apache-airflow-providers-http and requests to the requirements.txt file in your Airflow requirements folder (the json module ships with Python, so it needs no entry); a minimal example is shown after this list
  4. Configure the task to submit the Spark Job SQL statement via the HTTP Operator
  5. Run the DAG
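
For reference, the requirements file only needs the HTTP provider and the requests library. A minimal requirements.txt might look like the sketch below; pin versions to match your Airflow deployment.

apache-airflow-providers-http
requests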
info

You will need to create a new HTTP connection in Airflow to the Onehouse API (see Airflow docs). The Onehouse API endpoint is https://api.onehouse.ai/. There is no need to specify a username or password as the API is authenticated via the HTTP headers.
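
As an alternative to creating the connection in the Airflow UI, a connection can be supplied as configuration. The one-liner below is a sketch that assumes Airflow 2.3+ (which accepts JSON-encoded connection environment variables) and a connection id of my_http_conn, matching the http_conn_id used in the examples below.

export AIRFLOW_CONN_MY_HTTP_CONN='{"conn_type": "http", "host": "https://api.onehouse.ai/"}'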

Example: running an existing Spark job DAG

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago
import json

# Endpoint appended to the base URL of the Airflow HTTP connection
url = "v1/resource/"

# Onehouse API credentials and request metadata
onehouse_project_uid = 'YOUR_PROJECT_UID'
onehouse_api_key = 'YOUR_API_KEY'
onehouse_api_secret = 'YOUR_API_SECRET'
onehouse_link_uid = 'YOUR_ONEHOUSE_REQUEST_ID'
onehouse_region = 'YOUR_CLOUD_REGION'
onehouse_uuid = 'YOUR_ONEHOUSE_USER_ID'

# Authentication headers for the Onehouse API
headers = {
    'x-onehouse-project-uid': onehouse_project_uid,
    'x-onehouse-api-key': onehouse_api_key,
    'x-onehouse-api-secret': onehouse_api_secret,
    'x-onehouse-link-uid': onehouse_link_uid,
    'x-onehouse-region': onehouse_region,
    'x-onehouse-uuid': onehouse_uuid
}

cluster_name = "YOUR_ONEHOUSE_CLUSTER_NAME"
job_name = "YOUR_JOB_NAME"

# SQL statement that runs an existing Onehouse Spark job
onehouse_sql = f"""RUN JOB {job_name}"""

payload = json.dumps({
    "statement": onehouse_sql
})

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

with DAG(
    'run_spark_job',
    default_args=default_args,
    description='Run a Spark job',
    schedule_interval=None,
) as dag:

    run_job_request = SimpleHttpOperator(
        task_id='run_spark_job',
        http_conn_id='my_http_conn',  # This is the name of the HTTP connection you created in Airflow
        endpoint=url,
        method='POST',
        data=payload,  # JSON payload
        headers=headers,
        log_response=True,
    )

    run_job_request  # Single-task DAG; no dependencies to declare
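
If you want to inspect what the API returned, the response body that SimpleHttpOperator pushes to XCom can be read by a downstream task. The snippet below is a sketch to add inside the same with DAG(...) block as run_job_request above (with the import placed alongside the other imports); how useful the response body is depends on what the Onehouse API returns, which is not shown here.

from airflow.operators.python import PythonOperator  # add with the other imports

def log_onehouse_response(ti):
    # SimpleHttpOperator pushes the response body to XCom as its return value
    print("Onehouse API response:", ti.xcom_pull(task_ids='run_spark_job'))

inspect_response = PythonOperator(  # define inside the `with DAG(...)` block
    task_id='inspect_response',
    python_callable=log_onehouse_response,
)

run_job_request >> inspect_response  # inspect only after the job has been submitted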

Create a new Spark Job

This section will walk you through creating a new Spark job using the Onehouse API.

  1. Create a new Spark Job SQL definition in your DAG code (a minimal template of the statement is sketched after this list)
    • Use the CREATE JOB SQL statement to create a new Spark job. Documentation can be found here
    • Use the CLUSTER parameter to specify which Onehouse cluster the job runs on
    • Use the PARAMETERS parameter to pass arguments and Spark configuration to the job
  2. Add a new task to the DAG
    • Add apache-airflow-providers-http and requests to the requirements.txt file in your Airflow requirements folder (the json module ships with Python, so it needs no entry)
  3. Configure the task to submit the Spark Job SQL statement via the HTTP Operator
  4. Run the DAG
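
For orientation before the full example below, this is the general shape of the statement the DAG submits. The class, jar path, and arguments here are illustrative placeholders; the complete, working values appear in the example that follows.

# Skeleton of a jar-based CREATE JOB statement (placeholder values)
job_name = "YOUR_JOB_NAME"
cluster_name = "YOUR_ONEHOUSE_SPARK_CLUSTER_NAME"

onehouse_sql = f"""CREATE JOB {job_name} TYPE = 'JAR' PARAMETERS = ( "--class", "your.main.Class", "s3a://your-bucket/path/to/job.jar", "--your-job-arg", "value" ) CLUSTER = '{cluster_name}'"""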
info

You will need to create a new HTTP connection in Airflow to the Onehouse API (see Airflow docs). The Onehouse API endpoint is https://api.onehouse.ai/. There is no need to specify a username or password as the API is authenticated via the HTTP headers.

Example: creating a new Spark job DAG using the CREATE JOB SQL statement (JAR-based job)

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago
import json

# Endpoint appended to the base URL of the Airflow HTTP connection
url = "/v1/resource/"

# Onehouse API credentials and request metadata
onehouse_project_uid = 'YOUR_PROJECT_UID'
onehouse_api_key = 'YOUR_API_KEY'
onehouse_api_secret = 'YOUR_API_SECRET'
onehouse_link_uid = 'YOUR_ONEHOUSE_REQUEST_ID'
onehouse_region = 'YOUR_CLOUD_REGION'
onehouse_uuid = 'YOUR_ONEHOUSE_USER_ID'

# Authentication headers for the Onehouse API
headers = {
    'x-onehouse-project-uid': onehouse_project_uid,
    'x-onehouse-api-key': onehouse_api_key,
    'x-onehouse-api-secret': onehouse_api_secret,
    'x-onehouse-link-uid': onehouse_link_uid,
    'x-onehouse-region': onehouse_region,
    'x-onehouse-uuid': onehouse_uuid
}

cluster_name = "YOUR_ONEHOUSE_SPARK_CLUSTER_NAME"
job_name = "YOUR_JOB_NAME"

# CREATE JOB statement for a jar-based Spark job: Spark configuration, jar location,
# application arguments, and the cluster the job is pinned to
onehouse_sql = f"""CREATE JOB {job_name} TYPE = 'JAR' PARAMETERS = ( "--class", "ai.onehouse.CreateTableAndInsertValues", "--conf", "spark.kubernetes.driver.request.cores=500m", "--conf", "spark.kubernetes.executor.request.cores=500m", "--conf", "spark.kubernetes.driver.limit.cores=500m", "--conf", "spark.kubernetes.executor.limit.cores=500m", "s3a://onehouse-customer-bucket-94efc324/spark-code/create-table-and-insert-values.jar", "--table-path", "s3://onehouse-customer-bucket-94efc324/spark-code-examples/table3", "--database-name", "onehouse_internal_default", "--table-name", "demo_table3", "--table-type", "cow" ) CLUSTER = '{cluster_name}'"""

payload = json.dumps({
    "statement": onehouse_sql
})

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

with DAG(
    'create_spark_job',
    default_args=default_args,
    description='Create a Spark job',
    schedule_interval=None,
) as dag:

    create_job_request = SimpleHttpOperator(
        task_id='create_spark_job',
        http_conn_id='my_http_conn',  # This is the name of the HTTP connection you created in Airflow
        endpoint=url,
        method='POST',
        data=payload,  # JSON payload
        headers=headers,
        log_response=True,
    )

    create_job_request  # Single-task DAG; no dependencies to declare
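
You can also combine the two patterns in a single DAG that creates a job and then runs it. The sketch below assumes the same imports, headers, url, job_name, cluster_name, and default_args as the examples above; the elided PARAMETERS must be filled in as in the full CREATE JOB example, and whether RUN JOB can be submitted immediately after the CREATE JOB call returns is an assumption to verify for your job.

# Sketch: create a job, then run it, from one DAG (assumes the variables defined above)
create_payload = json.dumps({
    "statement": f"""CREATE JOB {job_name} TYPE = 'JAR' PARAMETERS = ( ... ) CLUSTER = '{cluster_name}'"""  # fill in PARAMETERS as in the full example above
})
run_payload = json.dumps({
    "statement": f"""RUN JOB {job_name}"""
})

with DAG(
    'create_and_run_spark_job',
    default_args=default_args,
    description='Create a Spark job, then run it',
    schedule_interval=None,
) as dag:

    create_job = SimpleHttpOperator(
        task_id='create_spark_job',
        http_conn_id='my_http_conn',
        endpoint=url,
        method='POST',
        data=create_payload,
        headers=headers,
        log_response=True,
    )

    run_job = SimpleHttpOperator(
        task_id='run_spark_job',
        http_conn_id='my_http_conn',
        endpoint=url,
        method='POST',
        data=run_payload,
        headers=headers,
        log_response=True,
    )

    create_job >> run_job  # run the job only after the create call has succeeded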