Skip to main content

Orchestrate Jobs with Apache Airflow

Overview

Onehouse enables you to orchestrate Jobs with any tool using the RUN JOB API command. This example walks through the process of setting up orchestration with Apache Airflow.

Prerequisites

Use the Onehouse Apache Airflow operator

Instructions can also be found on pypi.

  1. Install the provider package via pip:
    pip install apache-airflow-providers-onehouse
  2. Set up an Airflow connection with the following details:
    • Connection Id: onehouse_default (or your custom connection id)
    • Connection Type: Generic
    • Host: https://api.onehouse.ai
    • Extra: Configure the following JSON:
      {
      "project_uid": "your-project-uid",
      "user_id": "your-user-id",
      "api_key": "your-api-key",
      "api_secret": "your-api-secret",
      "link_uid": "your-link-uid",
      "region": "your-region"
      }
  3. Run a basic example DAG
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow_providers_onehouse.operators.clusters import (
    OnehouseCreateClusterOperator,
    OnehouseDeleteClusterOperator,
    )
    from airflow_providers_onehouse.operators.jobs import (
    OnehouseCreateJobOperator,
    OnehouseRunJobOperator,
    OnehouseDeleteJobOperator,
    )
    from airflow_providers_onehouse.sensors.onehouse import OnehouseJobRunSensor, OnehouseCreateClusterSensor

    default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    }

    cluster_name = "cluster_1"
    job_name = "job_1"

    bucket_name = "bucket-name"
    job_path = "s3a://{bucket_name}/path/to/hello_world_job.py"
    venv_path = "s3a://{bucket_name}/path/to/venv.tar.gz"

    with DAG(
    dag_id="example_dag",
    default_args=default_args,
    description="Example DAG",
    schedule_interval=None,
    start_date=datetime(2025, 4, 28),
    catchup=False,
    tags=["onehouse", "example", "dag"],
    ) as dag:

    create_cluster = OnehouseCreateClusterOperator(
    task_id="create_onehouse_cluster",
    cluster_name=cluster_name,
    cluster_type="Spark",
    max_ocu=1,
    min_ocu=1,
    conn_id="onehouse_default",
    )

    wait_for_cluster_ready = OnehouseCreateClusterSensor(
    task_id="wait_for_cluster_ready",
    cluster_name="{{ ti.xcom_pull(task_ids='create_onehouse_cluster') }}",
    conn_id="onehouse_default",
    poke_interval=30,
    timeout=60 * 30,
    )

    create_onehouse_job = OnehouseCreateJobOperator(
    task_id="create_onehouse_job",
    job_name=job_name,
    job_type="PYTHON",
    parameters=[
    "--conf", f"spark.archives={venv_path}#environment",
    "--conf", "spark.pyspark.python=./environment/bin/python",
    job_path,
    ],
    cluster_name=cluster_name,
    conn_id="onehouse_default",
    )

    run_onehouse_job = OnehouseRunJobOperator(
    task_id="run_onehouse_job",
    job_name=job_name,
    conn_id="onehouse_default",
    )

    wait_for_job = OnehouseJobRunSensor(
    task_id="wait_for_job_completion",
    job_name=job_name,
    job_run_id="{{ ti.xcom_pull(task_ids='run_onehouse_job') }}",
    conn_id="onehouse_default",
    poke_interval=30,
    timeout=60 * 60,
    )

    delete_onehouse_job = OnehouseDeleteJobOperator(
    task_id="delete_onehouse_job",
    job_name=job_name,
    conn_id="onehouse_default",
    )

    delete_onehouse_cluster = OnehouseDeleteClusterOperator(
    task_id="delete_onehouse_cluster",
    cluster_name=cluster_name,
    conn_id="onehouse_default",
    )

    (
    create_cluster
    >> wait_for_cluster_ready
    >> create_onehouse_job
    >> run_onehouse_job
    >> wait_for_job
    >> delete_onehouse_job
    >> delete_onehouse_cluster
    )

Use the HTTP operator with Onehouse APIs

Run an existing Spark Job

This section will walk you through running an existing Spark job using the Onehouse API and Apache Airflow.

  1. Create a new DAG in Apache Airflow
  2. Add a new task to the DAG
  3. Add apache-airflow-providers-http, requests, json to your requirements.txt file that is in your Apache Airflow requirements folder
  4. Configure the task to use the Spark Job SQL definition via HTTP Operator
  5. Run the DAG
info

You will need to create a new HTTP connection in Apache Airflow to the Onehouse API (see Apache Airflow docs).

Example running an existing Spark Job DAG

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago
import json

url = "v1/resource/"

onehouse_project_uid = 'YOUR_PROJECT_UID'
onehouse_api_key = 'YOUR_API_KEY'
onehouse_api_secret = 'YOUR_API_SECRET'
onehouse_link_uid = 'YOUR_ONEHOUSE_REQUEST_ID'
onehouse_region = 'YOUR_CLOUD_REGION'
onehouse_uuid = 'YOUR_ONEHOUSE_USER_ID'

headers = {
'x-onehouse-project-uid': onehouse_project_uid,
'x-onehouse-api-key': onehouse_api_key,
'x-onehouse-api-secret': onehouse_api_secret,
'x-onehouse-link-uid': onehouse_link_uid,
'x-onehouse-region': onehouse_region,
'x-onehouse-uuid': onehouse_uuid
}
cluster_name = "YOUR_ONEHOUSE_CLUSTER_NAME"
job_name = "YOUR_JOB_NAME"

onehouse_sql = f"""RUN JOB {job_name}"""

payload = json.dumps({
"statement": onehouse_sql
})

default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}

with DAG(
'run_spark_job',
default_args=default_args,
description='Run a Spark job',
schedule_interval=None,
) as dag:

run_job_request = SimpleHttpOperator(
task_id='run_spark_job',
http_conn_id='my_http_conn', # This is the name of the HTTP connection you created in Airflow
endpoint=url,
method='POST',
data=payload, # JSON payload
headers=headers,
log_response=True,
)

run_job_request

Create a new Spark Job

This section will walk you through creating a new Spark job using the Onehouse API.

  1. Create a new Spark Job SQL definition in your DAG code
    • Use the CREATE JOB SQL statement to create a new Spark job. Documentation can be found here
    • Use the CLUSTER parameter to specify the cluster to use
    • Use the PARAMETERS parameter to specify the parameters for the Spark job
  2. Add a new task to the DAG
    • Add apache-airflow-providers-http, requests, json to your requirements.txt file that is in your Apache Airflow requirements folder
  3. Configure the task to use the Spark Job SQL definition via HTTP Operator
  4. Run the DAG
info

You will need to create a new HTTP connection in Apache Airflow to the Onehouse API (see Apache Airflow docs).

Example creating a new Spark Job DAG using the CREATE JOB SQL statement (Jar-based job)

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago
import json

url = "/v1/resource/"

onehouse_project_uid = 'YOUR_PROJECT_UID'
onehouse_api_key = 'YOUR_API_KEY'
onehouse_api_secret = 'YOUR_API_SECRET'
onehouse_link_uid = 'YOUR_ONEHOUSE_REQUEST_ID'
onehouse_region = 'YOUR_CLOUD_REGION'
onehouse_uuid = 'YOUR_ONEHOUSE_USER_ID'

headers = {
'x-onehouse-project-uid': onehouse_project_uid,
'x-onehouse-api-key': onehouse_api_key,
'x-onehouse-api-secret': onehouse_api_secret,
'x-onehouse-link-uid': onehouse_link_uid,
'x-onehouse-region': onehouse_region,
'x-onehouse-uuid': onehouse_uuid
}
cluster_name = "YOUR_ONEHOUSE_SPARK_CLUSTER_NAME"
job_name = "YOUR_JOB_NAME"

onehouse_sql = f"""CREATE JOB {job_name} TYPE = 'JAR' PARAMETERS = ( "--class", "ai.onehouse.CreateTableAndInsertValues", "--conf", "spark.kubernetes.driver.request.cores=500m", "--conf", "spark.kubernetes.executor.request.cores=500m", "--conf", "spark.kubernetes.driver.limit.cores=500m", "--conf", "spark.kubernetes.executor.limit.cores=500m", "s3a://onehouse-customer-bucket-94efc324/spark-code/create-table-and-insert-values.jar", "--table-path", "s3://onehouse-customer-bucket-94efc324/spark-code-examples/table3", "--database-name", "onehouse_internal_default", "--table-name", "demo_table3", "--table-type", "cow" ) CLUSTER = '{cluster_name}'"""

payload = json.dumps({
"statement": onehouse_sql
})

default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}

with DAG(
'create_spark_job',
default_args=default_args,
description='Create a Spark job',
schedule_interval=None,
) as dag:

create_job_request = SimpleHttpOperator(
task_id='create_spark_job',
http_conn_id='my_http_conn', # This is the name of the HTTP connection you created in Airflow
endpoint=url,
method='POST',
data=payload, # JSON payload
headers=headers,
log_response=True,
)

create_job_request