Onehouse Sensors
This document describes the available sensors for monitoring Onehouse operations.
Cluster Sensors
OnehouseCreateClusterSensor
Monitors the creation of a Onehouse cluster until it completes. Ref: DESCRIBE CLUSTER
Parameters:
- cluster_name (str): Name of the cluster to monitor
- conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')
Returns:
bool: True if the cluster creation completes successfully, False if it is still in progress
Raises:
AirflowException: If the cluster creation fails
Example:
from airflow_providers_onehouse.sensors.onehouse import OnehouseCreateClusterSensor
wait_for_cluster_ready = OnehouseCreateClusterSensor(
    task_id="wait_for_cluster_ready",
    cluster_name="{{ ti.xcom_pull(task_ids='create_onehouse_cluster') }}",  # Jinja template that pulls the cluster name from the create-cluster operator's XCom
    conn_id="onehouse_default",
    poke_interval=30,  # seconds between checks
    timeout=60 * 30,  # give up after 30 minutes
)
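Like other Airflow sensors, these sensors are polled: Airflow calls the check repeatedly, waiting poke_interval seconds between calls, until it returns True, raises, or exceeds timeout. With the values in the example above (poke_interval=30, timeout=60 * 30), that means a check roughly every 30 seconds for at most 30 minutes, or about 60 checks. The sketch below simulates that loop with a fake clock so it never sleeps. It is a simplified illustration, not Airflow's actual implementation, which measures real elapsed time and also supports reschedule mode and exponential backoff; simulate_poke_mode and SensorTimeout are hypothetical names.

```python
class SensorTimeout(Exception):
    """Hypothetical stand-in for Airflow's sensor-timeout error."""


def simulate_poke_mode(poke, poke_interval, timeout):
    """Call poke() every poke_interval simulated seconds until it returns True.

    Returns the number of pokes made. Raises SensorTimeout when the next poke
    would start after `timeout` seconds. Uses a simulated clock, so it never sleeps.
    """
    elapsed = 0
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if elapsed + poke_interval > timeout:
            raise SensorTimeout(f"gave up after {pokes} pokes ({elapsed} s simulated)")
        elapsed += poke_interval


# Hypothetical scenario: the cluster is reported ready on the third check.
statuses = iter([False, False, True])
print(simulate_poke_mode(lambda: next(statuses), poke_interval=30, timeout=60 * 30))  # 3
```

Counting the first check at t = 0, a cluster that never becomes ready is checked 61 times in this simulation before the sensor gives up.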
Job Sensors
OnehouseJobRunSensor
Monitors a Onehouse Spark job run until it completes. Ref: DESCRIBE JOB_RUN
Parameters:
- job_name (str): Name of the job to monitor
- job_run_id (str): ID of the specific job run to monitor
- conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')
Returns:
bool: True if the job run completes successfully, False if it is still running
Raises:
AirflowException: If the job run fails
Example:
from airflow_providers_onehouse.sensors.onehouse import OnehouseJobRunSensor
wait_for_job = OnehouseJobRunSensor(
    task_id="wait_for_job_completion",
    job_name="job_1",
    job_run_id="{{ ti.xcom_pull(task_ids='run_onehouse_job') }}",  # Jinja template that pulls the job_run_id from the run-job operator's XCom
    conn_id="onehouse_default",
    poke_interval=30,  # seconds between checks
    timeout=60 * 60,  # give up after 1 hour
)
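The Returns and Raises contract of OnehouseJobRunSensor maps each run status to one of three outcomes. A minimal sketch of that mapping for one run, identified by the (job_name, job_run_id) pair: the status strings RUNNING, COMPLETED and FAILED, the in-memory FAKE_RUN_STATUSES table, and poke_job_run are all hypothetical placeholders, not Onehouse's actual status values or the provider's code. The local AirflowException class stands in for airflow.exceptions.AirflowException so the sketch runs without Airflow installed.

```python
class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException."""


# Hypothetical run statuses keyed by (job_name, job_run_id). A real sensor
# would obtain the status from a DESCRIBE JOB_RUN request instead.
FAKE_RUN_STATUSES = {
    ("job_1", "run-001"): "RUNNING",
    ("job_1", "run-002"): "COMPLETED",
    ("job_1", "run-003"): "FAILED",
}


def poke_job_run(job_name, job_run_id, statuses=FAKE_RUN_STATUSES):
    """Return True when the run has completed, False while it is still running."""
    status = statuses[(job_name, job_run_id)]
    if status == "COMPLETED":
        return True
    if status == "RUNNING":
        return False
    # Any other status (here, "FAILED") fails the sensor task.
    raise AirflowException(f"Job run {job_run_id} of {job_name} ended with status {status}")


print(poke_job_run("job_1", "run-001"))  # False
print(poke_job_run("job_1", "run-002"))  # True
```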
OnehouseDescribeJobSensor
Monitors a DESCRIBE JOB request for a Onehouse job until the operation completes. Ref: DESCRIBE JOB
Parameters:
- job_name (str): Name of the job to monitor
- conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')
Returns:
bool: True if the job description succeeds, False if it is still pending
Raises:
AirflowException: If the job description fails or returns an unexpected status
Example:
from airflow_providers_onehouse.sensors.onehouse import OnehouseDescribeJobSensor
describe_job = OnehouseDescribeJobSensor(
    task_id='describe_onehouse_job',
    job_name='my_spark_job',
    conn_id='onehouse_default',
)
Behavior:
- Returns True when the job description succeeds (API_OPERATION_STATUS_SUCCESS)
- Returns False when the job description is pending (API_OPERATION_STATUS_PENDING)
- Raises an AirflowException when the job description fails or has an unexpected status
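The three Behavior rules for OnehouseDescribeJobSensor amount to a small status-to-outcome mapping. A minimal sketch, assuming the DESCRIBE JOB status arrives as a plain string such as API_OPERATION_STATUS_SUCCESS; the function name poke_describe_job is hypothetical, and the local AirflowException class stands in for airflow.exceptions.AirflowException so the sketch runs without Airflow installed.

```python
class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException."""


def poke_describe_job(api_operation_status):
    """Map a DESCRIBE JOB operation status to the sensor's poke result."""
    if api_operation_status == "API_OPERATION_STATUS_SUCCESS":
        return True   # description is ready: the sensor succeeds
    if api_operation_status == "API_OPERATION_STATUS_PENDING":
        return False  # still pending: Airflow pokes again after poke_interval
    # Failed or unrecognised status: fail the task instead of polling until timeout.
    raise AirflowException(f"DESCRIBE JOB ended with status {api_operation_status!r}")


print(poke_describe_job("API_OPERATION_STATUS_PENDING"))  # False
print(poke_describe_job("API_OPERATION_STATUS_SUCCESS"))  # True
```

Raising on an unrecognised status, rather than returning False, keeps the sensor from polling until its timeout on a request that will never succeed.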