Onehouse Sensors

This document describes the available sensors for monitoring Onehouse operations.

Cluster Sensors

OnehouseCreateClusterSensor

Monitors the creation of a Onehouse cluster until it completes. Ref: DESCRIBE CLUSTER

Parameters:

  • cluster_name (str): Name of the cluster to monitor
  • conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')

Returns:

  • bool: True if the cluster creation completes successfully, False if still in progress

Raises:

  • AirflowException: If the cluster creation fails

Example:

from airflow_providers_onehouse.sensors.onehouse import OnehouseCreateClusterSensor

wait_for_cluster_ready = OnehouseCreateClusterSensor(
    task_id="wait_for_cluster_ready",
    # Templated value: pulls the cluster name from the upstream operator's XCom
    cluster_name="{{ ti.xcom_pull(task_ids='create_onehouse_cluster') }}",
    conn_id="onehouse_default",
    poke_interval=30,   # seconds between status checks
    timeout=60 * 30,    # give up after 30 minutes
)

Job Sensors

OnehouseJobRunSensor

Monitors a Onehouse Spark job run until it completes. Ref: DESCRIBE JOB_RUN

Parameters:

  • job_name (str): Name of the job to monitor
  • job_run_id (str): ID of the specific job run to monitor
  • conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')

Returns:

  • bool: True if the job run completes successfully, False if still running

Raises:

  • AirflowException: If the job run fails

Example:

from airflow_providers_onehouse.sensors.onehouse import OnehouseJobRunSensor

wait_for_job = OnehouseJobRunSensor(
    task_id="wait_for_job_completion",
    job_name="job_1",
    # Templated value: pulls the job_run_id from the upstream operator's XCom
    job_run_id="{{ ti.xcom_pull(task_ids='run_onehouse_job') }}",
    conn_id="onehouse_default",
    poke_interval=30,   # seconds between status checks
    timeout=60 * 60,    # give up after 60 minutes
)
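
The Returns and Raises contract above, together with poke_interval and timeout, can be pictured as a simple polling loop. The following is a minimal sketch of that idea, not the provider's or Airflow's actual implementation: wait_until_done and SensorTimeout are hypothetical names, and elapsed time is modeled as one poke_interval per unsuccessful check so the example runs instantly.

```python
from typing import Callable


class SensorTimeout(Exception):
    """Hypothetical stand-in for a sensor timeout error."""


def wait_until_done(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
) -> int:
    """Call poke() until it returns True and return the number of calls made.

    poke() returning False means "still running"; an exception raised by
    poke() (for example, a failed job run) propagates immediately.
    Time is simulated: each unsuccessful poke adds poke_interval seconds.
    """
    elapsed = 0.0
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if elapsed >= timeout:
            raise SensorTimeout(f"gave up after {pokes} pokes")
        elapsed += poke_interval  # a real sensor would sleep here


# Simulated job run: reported as still running twice, then complete.
results = iter([False, False, True])
pokes_needed = wait_until_done(
    lambda: next(results), poke_interval=30, timeout=60 * 60
)
```

Under this simplified model, a shorter poke_interval detects completion sooner at the cost of more status requests, while timeout caps the total wait.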

OnehouseDescribeJobSensor

Polls the description of a Onehouse job until the describe operation succeeds. Ref: DESCRIBE JOB

Parameters:

  • job_name (str): Name of the job to monitor
  • conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')

Returns:

  • bool: True if the job description succeeds, False if it is still pending

Raises:

  • AirflowException: If the job description fails or returns an unexpected status

Example:

from airflow_providers_onehouse.sensors.onehouse import OnehouseDescribeJobSensor

describe_job = OnehouseDescribeJobSensor(
    task_id='describe_onehouse_job',
    job_name='my_spark_job',
    conn_id='onehouse_default',
)

Behavior:

  • Returns True when the job description succeeds (API_OPERATION_STATUS_SUCCESS)
  • Returns False when the job description is pending (API_OPERATION_STATUS_PENDING)
  • Raises an AirflowException when the job description fails or has an unexpected status
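
The three rules above amount to a small status-to-result mapping. The sketch below illustrates that mapping under stated assumptions: interpret_status and SensorFailure are hypothetical names (SensorFailure stands in for AirflowException so the example needs no Airflow installation), and only the two status strings named above come from this document.

```python
API_OPERATION_STATUS_SUCCESS = "API_OPERATION_STATUS_SUCCESS"
API_OPERATION_STATUS_PENDING = "API_OPERATION_STATUS_PENDING"


class SensorFailure(Exception):
    """Hypothetical stand-in for AirflowException."""


def interpret_status(status: str) -> bool:
    """Map an API operation status to a sensor poke result.

    True  -> the sensor is done
    False -> the sensor should poke again later
    Any other status is treated as a failure and raises.
    """
    if status == API_OPERATION_STATUS_SUCCESS:
        return True
    if status == API_OPERATION_STATUS_PENDING:
        return False
    raise SensorFailure(f"Job description failed or returned unexpected status: {status!r}")


done = interpret_status(API_OPERATION_STATUS_SUCCESS)
still_pending = interpret_status(API_OPERATION_STATUS_PENDING)
```

Treating every unrecognized status as a failure, rather than as pending, keeps the sensor from polling until timeout on a status it does not understand.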