Orchestrate Jobs with Apache Airflow
Overview
Onehouse enables you to orchestrate Jobs with any tool using the RUN JOB API command. This example walks through the process of setting up orchestration with Apache Airflow.
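Under the hood, the RUN JOB command is a SQL-like statement POSTed to the Onehouse API over HTTPS, so any orchestrator that can make an HTTP call can trigger a job. As a minimal sketch using the requests library (the endpoint and headers match the Airflow examples later in this guide; "my_job" is a placeholder job name):
import requests

# Endpoint and headers match the Airflow examples later in this guide.
url = "https://api.onehouse.ai/v1/resource/"
headers = {
    "x-onehouse-project-uid": "YOUR_PROJECT_UID",
    "x-onehouse-api-key": "YOUR_API_KEY",
    "x-onehouse-api-secret": "YOUR_API_SECRET",
    "x-onehouse-link-uid": "YOUR_ONEHOUSE_REQUEST_ID",
    "x-onehouse-region": "YOUR_CLOUD_REGION",
    "x-onehouse-uuid": "YOUR_ONEHOUSE_USER_ID",
}

# "my_job" is a placeholder for the name of an existing Onehouse job.
response = requests.post(url, headers=headers, json={"statement": "RUN JOB my_job"})
response.raise_for_status()
print(response.text)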
Prerequisites
- Onehouse API token
- Apache Airflow instance with version >= 2.9.2
- Python >= 3.10
Use the Onehouse Apache Airflow operator
Instructions can also be found on PyPI.
- Install the provider package via pip:
pip install apache-airflow-providers-onehouse
- Set up an Airflow connection with the following details:
- Connection Id: onehouse_default (or your custom connection id)
- Connection Type: Generic
- Host: https://api.onehouse.ai
- Extra: Configure the following JSON:
{
    "project_uid": "your-project-uid",
    "user_id": "your-user-id",
    "api_key": "your-api-key",
    "api_secret": "your-api-secret",
    "link_uid": "your-link-uid",
    "region": "your-region"
}
- Run a basic example DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow_providers_onehouse.operators.clusters import (
    OnehouseCreateClusterOperator,
    OnehouseDeleteClusterOperator,
)
from airflow_providers_onehouse.operators.jobs import (
    OnehouseCreateJobOperator,
    OnehouseRunJobOperator,
    OnehouseDeleteJobOperator,
)
from airflow_providers_onehouse.sensors.onehouse import (
    OnehouseJobRunSensor,
    OnehouseCreateClusterSensor,
)
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

cluster_name = "cluster_1"
job_name = "job_1"
bucket_name = "bucket-name"
# f-strings so the bucket name is substituted into the S3 paths
job_path = f"s3a://{bucket_name}/path/to/hello_world_job.py"
venv_path = f"s3a://{bucket_name}/path/to/venv.tar.gz"
with DAG(
    dag_id="example_dag",
    default_args=default_args,
    description="Example DAG",
    schedule_interval=None,
    start_date=datetime(2025, 4, 28),
    catchup=False,
    tags=["onehouse", "example", "dag"],
) as dag:

    create_cluster = OnehouseCreateClusterOperator(
        task_id="create_onehouse_cluster",
        cluster_name=cluster_name,
        cluster_type="Spark",
        max_ocu=1,
        min_ocu=1,
        conn_id="onehouse_default",
    )

    wait_for_cluster_ready = OnehouseCreateClusterSensor(
        task_id="wait_for_cluster_ready",
        cluster_name="{{ ti.xcom_pull(task_ids='create_onehouse_cluster') }}",
        conn_id="onehouse_default",
        poke_interval=30,
        timeout=60 * 30,
    )

    create_onehouse_job = OnehouseCreateJobOperator(
        task_id="create_onehouse_job",
        job_name=job_name,
        job_type="PYTHON",
        parameters=[
            "--conf", f"spark.archives={venv_path}#environment",
            "--conf", "spark.pyspark.python=./environment/bin/python",
            job_path,
        ],
        cluster_name=cluster_name,
        conn_id="onehouse_default",
    )

    run_onehouse_job = OnehouseRunJobOperator(
        task_id="run_onehouse_job",
        job_name=job_name,
        conn_id="onehouse_default",
    )

    wait_for_job = OnehouseJobRunSensor(
        task_id="wait_for_job_completion",
        job_name=job_name,
        job_run_id="{{ ti.xcom_pull(task_ids='run_onehouse_job') }}",
        conn_id="onehouse_default",
        poke_interval=30,
        timeout=60 * 60,
    )

    delete_onehouse_job = OnehouseDeleteJobOperator(
        task_id="delete_onehouse_job",
        job_name=job_name,
        conn_id="onehouse_default",
    )

    delete_onehouse_cluster = OnehouseDeleteClusterOperator(
        task_id="delete_onehouse_cluster",
        cluster_name=cluster_name,
        conn_id="onehouse_default",
    )

    (
        create_cluster
        >> wait_for_cluster_ready
        >> create_onehouse_job
        >> run_onehouse_job
        >> wait_for_job
        >> delete_onehouse_job
        >> delete_onehouse_cluster
    )
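The example above assumes hello_world_job.py and a packed virtual environment (venv.tar.gz) have already been uploaded to S3. For reference, a minimal sketch of what such a PySpark script might look like (this file is hypothetical and not part of the provider package):
# hello_world_job.py -- hypothetical PySpark script referenced by job_path above.
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("hello_world_job").getOrCreate()

    # Create a tiny DataFrame and print it, just to show the job ran.
    df = spark.createDataFrame([("hello", 1), ("world", 2)], ["word", "count"])
    df.show()

    spark.stop()
The spark.archives={venv_path}#environment and spark.pyspark.python=./environment/bin/python parameters in the job definition tell Spark to unpack the uploaded archive and run the script with the Python interpreter inside it; such an archive can be produced with a tool like venv-pack.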
Use the HTTP operator with Onehouse APIs
Run an existing Spark Job
This section will walk you through running an existing Spark job using the Onehouse API and Apache Airflow.
- Create a new DAG in Apache Airflow
- Add a new task to the DAG
- Add apache-airflow-providers-http, requests, and json to the requirements.txt file in your Apache Airflow requirements folder
- Configure the task to use the Spark Job SQL definition via HTTP Operator
- Run the DAG
Info: You will need to create a new HTTP connection in Apache Airflow to the Onehouse API (see Apache Airflow docs).
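The connection can be created from the Airflow UI or CLI; as a sketch, it can also be created programmatically with Airflow's Connection model. The connection id my_http_conn below matches the http_conn_id used in the example DAGs that follow:
# One-off setup snippet (not a DAG task): registers the HTTP connection
# referenced as http_conn_id by the example DAGs below.
from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id='my_http_conn',
    conn_type='http',
    host='https://api.onehouse.ai',
)

session = settings.Session()
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()
session.close()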
Example running an existing Spark Job DAG
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago
import json
url = "v1/resource/"
onehouse_project_uid = 'YOUR_PROJECT_UID'
onehouse_api_key = 'YOUR_API_KEY'
onehouse_api_secret = 'YOUR_API_SECRET'
onehouse_link_uid = 'YOUR_ONEHOUSE_REQUEST_ID'
onehouse_region = 'YOUR_CLOUD_REGION'
onehouse_uuid = 'YOUR_ONEHOUSE_USER_ID'
headers = {
    'x-onehouse-project-uid': onehouse_project_uid,
    'x-onehouse-api-key': onehouse_api_key,
    'x-onehouse-api-secret': onehouse_api_secret,
    'x-onehouse-link-uid': onehouse_link_uid,
    'x-onehouse-region': onehouse_region,
    'x-onehouse-uuid': onehouse_uuid
}

cluster_name = "YOUR_ONEHOUSE_CLUSTER_NAME"
job_name = "YOUR_JOB_NAME"

onehouse_sql = f"""RUN JOB {job_name}"""

payload = json.dumps({
    "statement": onehouse_sql
})

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

with DAG(
    'run_spark_job',
    default_args=default_args,
    description='Run a Spark job',
    schedule_interval=None,
) as dag:

    run_job_request = SimpleHttpOperator(
        task_id='run_spark_job',
        http_conn_id='my_http_conn',  # name of the HTTP connection you created in Airflow
        endpoint=url,
        method='POST',
        data=payload,  # JSON payload
        headers=headers,
        log_response=True,
    )

    run_job_request
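SimpleHttpOperator pushes the response body to XCom by default, so you can optionally add a downstream task to inspect what the Onehouse API returned. A minimal sketch (the import goes with the others at the top of the file, and the task and dependency go inside the same with DAG block):
from airflow.operators.python import PythonOperator  # add alongside the other imports

def log_api_response(ti):
    # SimpleHttpOperator pushes the response text to XCom by default,
    # so the body returned by the Onehouse API can be pulled here.
    print('Onehouse API response:', ti.xcom_pull(task_ids='run_spark_job'))

# Inside the `with DAG(...)` block:
log_response = PythonOperator(
    task_id='log_api_response',
    python_callable=log_api_response,
)

run_job_request >> log_response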
Create a new Spark Job
This section will walk you through creating a new Spark job using the Onehouse API.
- Create a new Spark Job SQL definition in your DAG code
- Use the CREATE JOB SQL statement to create a new Spark job. Documentation can be found here
- Use the CLUSTER parameter to specify the cluster to use
- Use the PARAMETERS parameter to specify the parameters for the Spark job
- Add a new task to the DAG
- Add apache-airflow-providers-http, requests, and json to the requirements.txt file in your Apache Airflow requirements folder
- Configure the task to use the Spark Job SQL definition via HTTP Operator
- Run the DAG
Info: You will need to create a new HTTP connection in Apache Airflow to the Onehouse API (see Apache Airflow docs).
Example creating a new Spark Job DAG using the CREATE JOB SQL statement (Jar-based job)
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago
import json
url = "/v1/resource/"
onehouse_project_uid = 'YOUR_PROJECT_UID'
onehouse_api_key = 'YOUR_API_KEY'
onehouse_api_secret = 'YOUR_API_SECRET'
onehouse_link_uid = 'YOUR_ONEHOUSE_REQUEST_ID'
onehouse_region = 'YOUR_CLOUD_REGION'
onehouse_uuid = 'YOUR_ONEHOUSE_USER_ID'
headers = {
    'x-onehouse-project-uid': onehouse_project_uid,
    'x-onehouse-api-key': onehouse_api_key,
    'x-onehouse-api-secret': onehouse_api_secret,
    'x-onehouse-link-uid': onehouse_link_uid,
    'x-onehouse-region': onehouse_region,
    'x-onehouse-uuid': onehouse_uuid
}

cluster_name = "YOUR_ONEHOUSE_SPARK_CLUSTER_NAME"
job_name = "YOUR_JOB_NAME"
onehouse_sql = f"""CREATE JOB {job_name} TYPE = 'JAR' PARAMETERS = ( "--class", "ai.onehouse.CreateTableAndInsertValues", "--conf", "spark.kubernetes.driver.request.cores=500m", "--conf", "spark.kubernetes.executor.request.cores=500m", "--conf", "spark.kubernetes.driver.limit.cores=500m", "--conf", "spark.kubernetes.executor.limit.cores=500m", "s3a://onehouse-customer-bucket-94efc324/spark-code/create-table-and-insert-values.jar", "--table-path", "s3://onehouse-customer-bucket-94efc324/spark-code-examples/table3", "--database-name", "onehouse_internal_default", "--table-name", "demo_table3", "--table-type", "cow" ) CLUSTER = '{cluster_name}'"""
payload = json.dumps({
    "statement": onehouse_sql
})

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

with DAG(
    'create_spark_job',
    default_args=default_args,
    description='Create a Spark job',
    schedule_interval=None,
) as dag:

    create_job_request = SimpleHttpOperator(
        task_id='create_spark_job',
        http_conn_id='my_http_conn',  # name of the HTTP connection you created in Airflow
        endpoint=url,
        method='POST',
        data=payload,  # JSON payload
        headers=headers,
        log_response=True,
    )

    create_job_request
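To create the job and run it from a single DAG, you can chain a second request that reuses the headers and job_name above. This is a sketch that assumes the job can be run as soon as the CREATE JOB statement succeeds; add it inside the same with DAG block:
    # Hypothetical follow-up task: trigger the job that was just created.
    run_job_request = SimpleHttpOperator(
        task_id='run_spark_job',
        http_conn_id='my_http_conn',
        endpoint=url,
        method='POST',
        data=json.dumps({"statement": f"RUN JOB {job_name}"}),
        headers=headers,
        log_response=True,
    )

    create_job_request >> run_job_request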