# Onehouse Operators

This document describes the available operators for interacting with Onehouse clusters and jobs.
## Cluster Operators

### OnehouseCreateClusterOperator

Creates a new Onehouse cluster. Ref: CREATE CLUSTER
**Parameters:**

- `cluster_name` (str): Name of the cluster to create
- `cluster_type` (str): Type of the cluster (e.g., `'Spark'`)
- `max_ocu` (int): Maximum OCU (Onehouse Compute Unit) for the cluster
- `min_ocu` (int): Minimum OCU for the cluster
- `parameters` (dict, optional): Additional parameters for the cluster
- `conn_id` (str, optional): Airflow connection ID for Onehouse (default: `'onehouse_default'`)

**Returns:**

- `str`: The name of the created cluster
**Example:**

```python
from airflow_providers_onehouse.operators.clusters import OnehouseCreateClusterOperator

create_cluster = OnehouseCreateClusterOperator(
    task_id="create_onehouse_cluster",
    cluster_name="my_spark_cluster",
    cluster_type="Spark",
    max_ocu=1,
    min_ocu=1,
    conn_id="onehouse_default",
)
```
### OnehouseDeleteClusterOperator

Deletes an existing Onehouse cluster. Ref: DELETE CLUSTER
**Parameters:**

- `cluster_name` (str): Name of the cluster to delete
- `conn_id` (str, optional): Airflow connection ID for Onehouse (default: `'onehouse_default'`)

**Returns:**

- `str`: The name of the deleted cluster
**Example:**

```python
from airflow_providers_onehouse.operators.clusters import OnehouseDeleteClusterOperator

delete_cluster = OnehouseDeleteClusterOperator(
    task_id="delete_onehouse_cluster",
    cluster_name="my_spark_cluster",
    conn_id="onehouse_default",
)
```
### OnehouseAlterClusterOperator

Alters an existing Onehouse cluster's configuration. Ref: ALTER CLUSTER
**Parameters:**

- `cluster_name` (str): Name of the cluster to alter
- `max_ocu` (int, optional): New maximum OCU (Onehouse Compute Unit) for the cluster
- `min_ocu` (int, optional): New minimum OCU for the cluster
- `parameters` (dict, optional): Additional parameters to update for the cluster
- `conn_id` (str, optional): Airflow connection ID for Onehouse (default: `'onehouse_default'`)

**Returns:**

- `str`: The name of the altered cluster
**Example:**

```python
from airflow_providers_onehouse.operators.clusters import OnehouseAlterClusterOperator

alter_cluster = OnehouseAlterClusterOperator(
    task_id="alter_onehouse_cluster",
    cluster_name="my_spark_cluster",
    max_ocu=2,
    min_ocu=1,
    conn_id="onehouse_default",
)
```
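Since a cluster's minimum OCU presumably must not exceed its maximum, a small client-side sanity check (illustrative only, not part of the provider) can catch bad values before issuing the ALTER CLUSTER call:

```python
def validate_ocu_bounds(min_ocu: int, max_ocu: int) -> None:
    # Illustrative check (assumed constraint, not from the Onehouse API docs):
    # the minimum OCU should be positive and no greater than the maximum OCU.
    if min_ocu < 1:
        raise ValueError(f"min_ocu must be >= 1, got {min_ocu}")
    if max_ocu < min_ocu:
        raise ValueError(f"max_ocu ({max_ocu}) must be >= min_ocu ({min_ocu})")
```

Calling `validate_ocu_bounds(1, 2)` passes silently, while `validate_ocu_bounds(2, 1)` raises a `ValueError` before any API call is made.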
## Job Operators

### OnehouseCreateJobOperator

Creates a new Onehouse job. Ref: CREATE JOB
**Parameters:**

- `job_name` (str): Name of the job to create
- `job_type` (str): Type of job (`'PYTHON'` or `'JAR'`)
- `parameters` (List[str]): List of parameters for the job
- `cluster_name` (str): Name of the cluster to run the job on
- `conn_id` (str, optional): Airflow connection ID for Onehouse (default: `'onehouse_default'`)

**Returns:**

- `str`: The name of the created job
**Example:**

```python
from airflow_providers_onehouse.operators.jobs import OnehouseCreateJobOperator

create_onehouse_job = OnehouseCreateJobOperator(
    task_id="create_onehouse_job",
    job_name="spark_job",
    job_type="PYTHON",
    parameters=[
        "--conf", "spark.archives=s3a://lakehouse-albert-load-us-west-2/python/venv.tar.gz#environment",
        "--conf", "spark.pyspark.python=./environment/bin/python",
        "s3a://lakehouse-albert-load-us-west-2/python/hello_world_job.py",
    ],
    cluster_name="my_spark_cluster",
    conn_id="onehouse_default",
)
```
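The flat `parameters` list interleaves `--conf` flags with their values and ends with the job entrypoint, as in the example above. A small helper along these lines (hypothetical, not part of the provider) can make that construction less error-prone:

```python
def build_job_parameters(spark_confs: dict, entrypoint: str) -> list:
    """Assemble the flat parameter list used in the examples above:
    alternating "--conf", "key=value" pairs, followed by the entrypoint."""
    params = []
    for key, value in spark_confs.items():
        params += ["--conf", f"{key}={value}"]
    params.append(entrypoint)
    return params


params = build_job_parameters(
    {"spark.pyspark.python": "./environment/bin/python"},
    "s3a://my-bucket/python/hello_world_job.py",  # hypothetical bucket/path
)
```

The resulting list can be passed directly as the `parameters` argument of `OnehouseCreateJobOperator`.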
### OnehouseRunJobOperator

Runs an existing Onehouse job. Ref: RUN JOB
**Parameters:**

- `job_name` (str): Name of the job to run
- `conn_id` (str, optional): Airflow connection ID for Onehouse (default: `'onehouse_default'`)

**Returns:**

- `str`: The job run ID of the started job
**Example:**

```python
from airflow_providers_onehouse.operators.jobs import OnehouseRunJobOperator

run_job = OnehouseRunJobOperator(
    task_id="run_onehouse_job",
    job_name="my_spark_job",
    conn_id="onehouse_default",
)
```
### OnehouseDeleteJobOperator

Deletes an existing Onehouse job. Ref: DELETE JOB
**Parameters:**

- `job_name` (str): Name of the job to delete
- `conn_id` (str, optional): Airflow connection ID for Onehouse (default: `'onehouse_default'`)

**Returns:**

- `str`: The name of the deleted job
**Example:**

```python
from airflow_providers_onehouse.operators.jobs import OnehouseDeleteJobOperator

delete_job = OnehouseDeleteJobOperator(
    task_id="delete_onehouse_job",
    job_name="my_spark_job",
    conn_id="onehouse_default",
)
```
### OnehouseAlterJobOperator

Alters an existing Onehouse job's configuration. Ref: ALTER JOB
**Parameters:**

- `job_name` (str): Name of the job to alter
- `cluster_name` (str, optional): New cluster name to run the job on
- `parameters` (List[str], optional): New list of parameters for the job
- `conn_id` (str, optional): Airflow connection ID for Onehouse (default: `'onehouse_default'`)

**Returns:**

- `str`: The name of the altered job

**Limitation:**

Currently, only one attribute (either `cluster_name` or `parameters`) can be modified per ALTER JOB command. To modify both the cluster and the parameters, use two separate `OnehouseAlterJobOperator` tasks.
**Example:**

```python
from airflow_providers_onehouse.operators.jobs import OnehouseAlterJobOperator

# First task to update the cluster
alter_job_cluster = OnehouseAlterJobOperator(
    task_id="alter_onehouse_job",
    job_name="my_spark_job",
    cluster_name="new_spark_cluster",
    conn_id="onehouse_default",
)

# Second task to update the parameters
alter_job_parameters = OnehouseAlterJobOperator(
    task_id="alter_onehouse_job_parameters",
    job_name="my_spark_job",
    parameters=[
        "--conf", "spark.archives=s3a://my-bucket/python/venv.tar.gz#environment",
        "--conf", "spark.pyspark.python=./environment/bin/python",
        "--conf", "spark.driver.memory=2g",
        "s3a://my-bucket/python/updated_job.py",
    ],
    conn_id="onehouse_default",
)

# Set up the task dependency
alter_job_cluster >> alter_job_parameters
```
### OnehouseCancelJobRunOperator

Cancels an in-progress Onehouse job run. Ref: CANCEL JOB_RUN
**Parameters:**

- `job_run_id` (str): ID of the job run to cancel (should be surrounded by backticks)
- `job_name` (str): Name of the job that is running
- `conn_id` (str, optional): Airflow connection ID for Onehouse (default: `'onehouse_default'`)

**Returns:**

- `str`: The job run ID of the cancelled job
**Example:**

```python
from airflow_providers_onehouse.operators.jobs import OnehouseCancelJobRunOperator

cancel_job_run = OnehouseCancelJobRunOperator(
    task_id="cancel_onehouse_job_run",
    job_run_id="acde070d-8c4c-4f0d-9d8a-162843c10333",
    job_name="my_job",
    conn_id="onehouse_default",
)
```
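Since Airflow pushes an operator's return value to XCom by default, the run ID returned by `OnehouseRunJobOperator` can be fed to `OnehouseCancelJobRunOperator` with a Jinja template instead of a hard-coded ID. A sketch, assuming the standard return-value XCom behavior and the `run_onehouse_job` task from the example above:

```python
# Build a Jinja template that pulls the run ID pushed to XCom by the run task
# (standard Airflow return-value XCom; task_id taken from the earlier example).
run_task_id = "run_onehouse_job"
job_run_id_template = "{{ ti.xcom_pull(task_ids='" + run_task_id + "') }}"

# The template string would then be passed as job_run_id, e.g.:
#   OnehouseCancelJobRunOperator(
#       task_id="cancel_onehouse_job_run",
#       job_run_id=job_run_id_template,
#       job_name="my_job",
#   )
```

This only works if `job_run_id` is a templated field of the operator; if it is not, pull the XCom value in a preceding task instead.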