Onehouse Operators

This document describes the available operators for interacting with Onehouse clusters and jobs.

Cluster Operators

OnehouseCreateClusterOperator

Creates a new Onehouse cluster. Ref: CREATE CLUSTER

Parameters:

  • cluster_name (str): Name of the cluster to create
  • cluster_type (str): Type of the cluster (e.g., 'Spark')
  • max_ocu (int): Maximum OCU (Onehouse Compute Unit) for the cluster
  • min_ocu (int): Minimum OCU for the cluster
  • parameters (dict, optional): Additional parameters for the cluster
  • conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')

Returns:

  • str: The name of the created cluster

Example:

from airflow_providers_onehouse.operators.clusters import OnehouseCreateClusterOperator

create_cluster = OnehouseCreateClusterOperator(
    task_id="create_onehouse_cluster",
    cluster_name="my_spark_cluster",
    cluster_type="Spark",
    max_ocu=1,
    min_ocu=1,
    conn_id="onehouse_default",
)
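
As with other Airflow operators, the returned cluster name is pushed to XCom, so downstream tasks can reference the new cluster without hard-coding its name. A minimal sketch, assuming cluster_name on OnehouseCreateJobOperator accepts Jinja templates (if it does not, pull the value in a Python callable instead); the script path is illustrative:

from airflow_providers_onehouse.operators.jobs import OnehouseCreateJobOperator

# Pull the cluster name returned by create_onehouse_cluster via XCom.
create_job_on_new_cluster = OnehouseCreateJobOperator(
    task_id="create_job_on_new_cluster",
    job_name="spark_job",
    job_type="PYTHON",
    parameters=["s3a://my-bucket/python/hello_world_job.py"],  # illustrative path
    cluster_name="{{ ti.xcom_pull(task_ids='create_onehouse_cluster') }}",
    conn_id="onehouse_default",
)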

OnehouseDeleteClusterOperator

Deletes an existing Onehouse cluster. Ref: DELETE CLUSTER

Parameters:

  • cluster_name (str): Name of the cluster to delete
  • conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')

Returns:

  • str: The name of the deleted cluster

Example:

from airflow_providers_onehouse.operators.clusters import OnehouseDeleteClusterOperator

delete_cluster = OnehouseDeleteClusterOperator(
    task_id="delete_onehouse_cluster",
    cluster_name="my_spark_cluster",
    conn_id="onehouse_default",
)

OnehouseAlterClusterOperator

Alters an existing Onehouse cluster's configuration. Ref: ALTER CLUSTER

Parameters:

  • cluster_name (str): Name of the cluster to alter
  • max_ocu (int, optional): New maximum OCU (Onehouse Compute Unit) for the cluster
  • min_ocu (int, optional): New minimum OCU for the cluster
  • parameters (dict, optional): Additional parameters to update for the cluster
  • conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')

Returns:

  • str: The name of the altered cluster

Example:

from airflow_providers_onehouse.operators.clusters import OnehouseAlterClusterOperator

alter_cluster = OnehouseAlterClusterOperator(
    task_id="alter_onehouse_cluster",
    cluster_name="my_spark_cluster",
    max_ocu=2,
    min_ocu=1,
    conn_id="onehouse_default",
)
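
A common use of this operator is to scale a cluster up before a heavy job run and back down afterward. A minimal sketch using only the documented max_ocu and min_ocu parameters; the OCU values and task names are illustrative, and OnehouseRunJobOperator is described under Job Operators below:

from airflow_providers_onehouse.operators.clusters import OnehouseAlterClusterOperator
from airflow_providers_onehouse.operators.jobs import OnehouseRunJobOperator

scale_up = OnehouseAlterClusterOperator(
    task_id="scale_up_cluster",
    cluster_name="my_spark_cluster",
    max_ocu=4,
    min_ocu=2,
    conn_id="onehouse_default",
)

run_job = OnehouseRunJobOperator(
    task_id="run_onehouse_job",
    job_name="my_spark_job",
    conn_id="onehouse_default",
)

scale_down = OnehouseAlterClusterOperator(
    task_id="scale_down_cluster",
    cluster_name="my_spark_cluster",
    max_ocu=1,
    min_ocu=1,
    conn_id="onehouse_default",
)

scale_up >> run_job >> scale_down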

Job Operators

OnehouseCreateJobOperator

Creates a new Onehouse job. Ref: CREATE JOB

Parameters:

  • job_name (str): Name of the job to create
  • job_type (str): Type of job ('PYTHON' or 'JAR')
  • parameters (List[str]): List of parameters for the job
  • cluster_name (str): Name of the cluster to run the job on
  • conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')

Returns:

  • str: The name of the created job

Example:

from airflow_providers_onehouse.operators.jobs import OnehouseCreateJobOperator

create_onehouse_job = OnehouseCreateJobOperator(
    task_id="create_onehouse_job",
    job_name="spark_job",
    job_type="PYTHON",
    parameters=[
        "--conf", "spark.archives=s3a://lakehouse-albert-load-us-west-2/python/venv.tar.gz#environment",
        "--conf", "spark.pyspark.python=./environment/bin/python",
        "s3a://lakehouse-albert-load-us-west-2/python/hello_world_job.py",
    ],
    cluster_name="my_spark_cluster",
    conn_id="onehouse_default",
)

OnehouseRunJobOperator

Runs an existing Onehouse job. Ref: RUN JOB

Parameters:

  • job_name (str): Name of the job to run
  • conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')

Returns:

  • str: The job run ID of the started job

Example:

from airflow_providers_onehouse.operators.jobs import OnehouseRunJobOperator

run_job = OnehouseRunJobOperator(
    task_id="run_onehouse_job",
    job_name="my_spark_job",
    conn_id="onehouse_default",
)
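
The returned job run ID is pushed to XCom, which makes it straightforward to hand off to a downstream task such as OnehouseCancelJobRunOperator (described below). A minimal sketch, assuming job_run_id accepts Jinja templates:

from airflow_providers_onehouse.operators.jobs import OnehouseCancelJobRunOperator

# Pull the run ID returned by run_onehouse_job via XCom.
cancel_run = OnehouseCancelJobRunOperator(
    task_id="cancel_onehouse_job_run",
    job_run_id="{{ ti.xcom_pull(task_ids='run_onehouse_job') }}",
    job_name="my_spark_job",
    conn_id="onehouse_default",
)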

OnehouseDeleteJobOperator

Deletes an existing Onehouse job. Ref: DELETE JOB

Parameters:

  • job_name (str): Name of the job to delete
  • conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')

Returns:

  • str: The name of the deleted job

Example:

from airflow_providers_onehouse.operators.jobs import OnehouseDeleteJobOperator

delete_job = OnehouseDeleteJobOperator(
    task_id="delete_onehouse_job",
    job_name="my_spark_job",
    conn_id="onehouse_default",
)

OnehouseAlterJobOperator

Alters an existing Onehouse job's configuration. Ref: ALTER JOB

Parameters:

  • job_name (str): Name of the job to alter
  • cluster_name (str, optional): New cluster name to run the job on
  • parameters (List[str], optional): New list of parameters for the job
  • conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')

Returns:

  • str: The name of the altered job

Limitation: Currently, only one attribute (either cluster_name or parameters) can be modified per ALTER JOB command. To change both the cluster and the parameters, use two separate OnehouseAlterJobOperator tasks, as in the example below.

Example:

from airflow_providers_onehouse.operators.jobs import OnehouseAlterJobOperator

# First task to update the cluster
# First task to update the cluster
alter_job_cluster = OnehouseAlterJobOperator(
    task_id="alter_onehouse_job_cluster",
    job_name="my_spark_job",
    cluster_name="new_spark_cluster",
    conn_id="onehouse_default",
)

# Second task to update the parameters
alter_job_parameters = OnehouseAlterJobOperator(
    task_id="alter_onehouse_job_parameters",
    job_name="my_spark_job",
    parameters=[
        "--conf", "spark.archives=s3a://my-bucket/python/venv.tar.gz#environment",
        "--conf", "spark.pyspark.python=./environment/bin/python",
        "--conf", "spark.driver.memory=2g",
        "s3a://my-bucket/python/updated_job.py",
    ],
    conn_id="onehouse_default",
)

# Set up the task dependency
alter_job_cluster >> alter_job_parameters

OnehouseCancelJobRunOperator

Cancels an in-progress Onehouse job run. Ref: CANCEL JOB_RUN

Parameters:

  • job_run_id (str): ID of the job run to cancel (should be surrounded by backticks)
  • job_name (str): Name of the job that is running
  • conn_id (str, optional): Airflow connection ID for Onehouse (default: 'onehouse_default')

Returns:

  • str: The job run ID of the cancelled job

Example:

from airflow_providers_onehouse.operators.jobs import OnehouseCancelJobRunOperator

cancel_job_run = OnehouseCancelJobRunOperator(
    task_id="cancel_onehouse_job_run",
    job_run_id="acde070d-8c4c-4f0d-9d8a-162843c10333",
    job_name="my_job",
    conn_id="onehouse_default",
)
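
Putting it together, the operators compose into a full lifecycle DAG: provision a cluster, create and run a job, then tear everything down. A minimal sketch; the DAG settings, bucket path, and the assumption that each task completes before its successor starts are illustrative, not part of this reference:

from datetime import datetime

from airflow import DAG
from airflow_providers_onehouse.operators.clusters import (
    OnehouseCreateClusterOperator,
    OnehouseDeleteClusterOperator,
)
from airflow_providers_onehouse.operators.jobs import (
    OnehouseCreateJobOperator,
    OnehouseDeleteJobOperator,
    OnehouseRunJobOperator,
)

with DAG(
    dag_id="onehouse_job_lifecycle",  # illustrative name
    start_date=datetime(2024, 1, 1),
    schedule=None,  # trigger manually
    catchup=False,
) as dag:
    create_cluster = OnehouseCreateClusterOperator(
        task_id="create_onehouse_cluster",
        cluster_name="my_spark_cluster",
        cluster_type="Spark",
        max_ocu=1,
        min_ocu=1,
    )

    create_job = OnehouseCreateJobOperator(
        task_id="create_onehouse_job",
        job_name="my_spark_job",
        job_type="PYTHON",
        parameters=["s3a://my-bucket/python/hello_world_job.py"],  # illustrative path
        cluster_name="my_spark_cluster",
    )

    run_job = OnehouseRunJobOperator(
        task_id="run_onehouse_job",
        job_name="my_spark_job",
    )

    delete_job = OnehouseDeleteJobOperator(
        task_id="delete_onehouse_job",
        job_name="my_spark_job",
    )

    delete_cluster = OnehouseDeleteClusterOperator(
        task_id="delete_onehouse_cluster",
        cluster_name="my_spark_cluster",
    )

    # conn_id defaults to "onehouse_default" on every operator above.
    create_cluster >> create_job >> run_job >> delete_job >> delete_cluster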