
Create and Run Jobs

Create a Job

First, create a Onehouse Cluster with type Spark to run your Jobs.

Next, open the Jobs page to create your Job. You will configure the Job by filling in the following fields:

  • Name: Specify a unique name to identify the Job (max 100 characters).
  • Type: Specify the type of Job - this can be a JAR (for Java or Scala code) or Python.
  • Cluster: Select a Onehouse Cluster with type Spark to run the Job.
  • Parameters: Specify an array of Strings to pass as parameters to the Job, which will be used in a spark-submit (see Apache Spark docs). This should include the following:
    • For JAR Jobs, you must include the --class property.
    • Include the cloud storage bucket path containing the code for your Job. The Onehouse agent must have access to read this path.
    • Optionally, include any arguments you'd like to pass to the Job.
    • Currently, the only supported properties are --class and --conf.

Example Parameters:

["--class", "com.example.MySparkApp', "--conf", "<KEY>=<VALUE>", "/path/to/my-spark-app.jar", "arg1", "arg2"]
Caution

Certain configurations can cause issues when changed. Follow these guidelines to ensure your Jobs run smoothly:

  • Do not set spark.extraListeners in your code. Instead, set it in the Job Parameters.
  • If you are setting spark.driver.extraJavaOptions or spark.executor.extraJavaOptions, you must add "-Dlog4j.configuration=file:///opt/spark/log4j.properties" (see the example after this list).
  • Do not modify the following Spark configs:
    <!-- Kubernetes -->
    spark.kubernetes.node.selector.*
    spark.kubernetes.driver.label.*
    spark.kubernetes.executor.label.*
    spark.kubernetes.driver.podTemplateFile
    spark.kubernetes.executor.podTemplateFile
    <!-- Observability -->
    spark.metrics.*
    spark.eventLog.*
  • Do not modify the following Hudi configs in your code:
    hoodie.write.concurrency.mode
    hoodie.cleaner.policy.failed.writes
    hoodie.write.lock.*
  • By default, hoodie.table.services.enabled is set to false for Jobs, as Onehouse runs the table services asynchronously. If you set this to true, the Job will run table services in-line.
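
For example, if you set spark.driver.extraJavaOptions through the Job Parameters, include the required log4j configuration alongside your own flags. This is a minimal sketch: the class name and JAR path reuse the placeholders from the earlier example, and -Dsample.flag=value is a hypothetical option of your own.

["--class", "com.example.MySparkApp", "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/log4j.properties -Dsample.flag=value", "/path/to/my-spark-app.jar"]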
tip

Importing pyspark version 3.4.3 is required for Python Jobs to run successfully. Follow the steps in the Dependency Management section below to set this up.

Run a Job

After a Job is created, you may run it as many times as you'd like. Open the Jobs page and find your Job. Open the Job and click Run, or use the Onehouse APIs to run the Job.

When you run a Job, Onehouse will pick up the latest version of the JAR or Python script from the cloud storage bucket path in the Job definition. If the path does not exist, the Job run will fail.

Job runs can have the following states:

  • Queued: Each Job run starts in this state when it is triggered and remains in it until the Spark driver starts.
  • Running: The Job run is actively running on the specified Cluster.
  • Completed: The Job run completed without errors.
  • Failed: The Job run failed with error(s).
  • Canceled: The Job was manually canceled while it was queued or running.
info

Jobs can only have one concurrent run. In other words, when a Job is already running, triggering a new run is blocked.

Dependency Management

You may install Python libraries or Java Maven/Gradle dependencies for your Jobs. Dependencies are installed by each Job's Spark driver, rather than at the Cluster-level, to prevent dependency conflicts between Jobs running on the same Cluster.

Pre-Installed Libraries

Apache Hudi 0.14.1 is pre-installed on Onehouse Clusters.

Install Libraries for Java Jobs

You can upload any necessary Java libraries (JAR files) and reference them in the Parameters field when you create a Job for your application.

First, upload your dependencies as JAR files to an S3/GCS location that Onehouse can access.

When you create a Job, you will reference these in the Job Parameters with "--conf", followed by "spark.jars=<JARS>". Below is an example of Job Parameters:

"--class", "com.example.MyApp", "--conf", "spark.jars=s3a://path/to/dependency1.jar,s3a://path/to/dependency2.jar", "s3a://path/to/myapp.jar"

Gradle Build settings and code additions for Java Jobs

For JAR Jobs, you should import Hudi 0.14.1 as a compileOnly dependency in your build.gradle file:

dependencies {
    compileOnly group: "org.apache.hudi", name: "hudi-spark3.4-bundle_2.12", version: "0.14.1"
}

Note: Import any other Hudi packages as compileOnly dependencies as needed.
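
For example, if your code references the DynamoDB-based lock provider used in the concurrency snippet below, you would likely also need the Hudi AWS module at compile time. This is a sketch under the assumption that the hudi-aws artifact at version 0.14.1 provides the classes you reference; verify the module and version against your Hudi dependency.

dependencies {
    compileOnly group: "org.apache.hudi", name: "hudi-spark3.4-bundle_2.12", version: "0.14.1"
    // Assumed extra module; only needed if your code references Hudi AWS classes
    // such as the DynamoDB-based lock providers shown below.
    compileOnly group: "org.apache.hudi", name: "hudi-aws", version: "0.14.1"
}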

Add the following code to support writer concurrency. You can find your parameter values in the concurrency dropdown for your table on the Data tab.

                .option("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.lock.provider", "org.apache.hudi.aws.transaction.lock.DynamoDBBasedImplicitPartitionKeyLockProvider")
.option("hoodie.write.lock.dynamodb.region", "us-west-2")
.option("hoodie.write.lock.dynamodb.table", "ohprojectb624e0f6")
.option("hoodie.write.lock.dynamodb.partition_key", "raw_customers-D68E786B7C90E03C")

Install Libraries for Python Jobs

To provide Python libraries for your Job, first create a Python virtual environment (venv) and install all necessary dependencies within it. Then, package this entire venv directory into a compressed archive named venv.tar.gz. Finally, reference this venv.tar.gz file in the appropriate parameter field when creating your Job.

Critical

Onehouse Spark clusters may run on different CPU architectures depending on your instance configuration. You must build architecture-specific Python dependencies to ensure binary compatibility.

Architecture Detection: Contact Onehouse support to determine your cluster architecture, or check with your infrastructure team whether your instances use:

  • AMD64/x86_64 (Intel/AMD processors)
  • ARM64/aarch64 (AWS Graviton)

Build Process: Create separate venv packages for each architecture using the appropriate Dockerfile (see Architecture-Specific Docker Builds below).

tip

Importing pyspark version 3.4.3 is required for Python Jobs to run successfully.

Code additions for PySpark Jobs

Add the following code to support writer concurrency. You can find your parameter values in the concurrency dropdown for your table on the Data tab.

                .option("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.lock.provider", "org.apache.hudi.aws.transaction.lock.DynamoDBBasedImplicitPartitionKeyLockProvider")
.option("hoodie.write.lock.dynamodb.region", "us-west-2")
.option("hoodie.write.lock.dynamodb.table", "ohprojectb624e0f6")
.option("hoodie.write.lock.dynamodb.partition_key", "raw_customers-D68E786B7C90E03C")

Let's say we have the following requirements.txt:

pyspark==3.4.3
pandas==2.2.3
python-dateutil==2.9.0
pytz==2024.2
tzdata==2024.2
venv-pack==0.2.0

Sample Commands to Build venv.tar.gz in Linux Environment

In a Linux environment, use the following sample commands to build the venv.tar.gz archive:

$ python -m venv venv
$ source ./venv/bin/activate
$ pip install -r requirements.txt
$ venv-pack -o venv.tar.gz

Architecture-Specific Docker Builds

For AMD64 Clusters:

# Dockerfile.amd64
FROM --platform=linux/amd64 amazonlinux:2023
RUN yum update -y && \
    yum install -y python3.12 python3.12-pip && \
    yum clean all
WORKDIR /workspace
COPY requirements.txt .
RUN python3.12 -m venv venv
RUN . venv/bin/activate && pip install setuptools
RUN . venv/bin/activate && pip install -r requirements.txt
RUN . venv/bin/activate && venv-pack -o venv.tar.gz

Build commands:

docker buildx build -f Dockerfile.amd64 --platform linux/amd64 -t amazonlinux-venv-amd64 . --load
docker create --name temp-amd64 amazonlinux-venv-amd64
docker cp temp-amd64:/workspace/venv.tar.gz ./venv-amd64.tar.gz
docker rm temp-amd64

For ARM64 Clusters:

# Dockerfile.arm64
FROM --platform=linux/arm64 amazonlinux:2023
RUN yum update -y && \
    yum install -y python3.12 python3.12-pip && \
    yum clean all
WORKDIR /workspace
COPY requirements.txt .
RUN python3.12 -m venv venv
RUN . venv/bin/activate && pip install setuptools
RUN . venv/bin/activate && pip install -r requirements.txt
RUN . venv/bin/activate && venv-pack -o venv.tar.gz

Build commands:

docker buildx build -f Dockerfile.arm64 --platform linux/arm64 -t amazonlinux-venv-arm64 . --load
docker create --name temp-arm64 amazonlinux-venv-arm64
docker cp temp-arm64:/workspace/venv.tar.gz ./venv-arm64.tar.gz
docker rm temp-arm64

Job Parameters

Use the architecture-specific package in your job configuration:

  • AMD64: spark.archives=s3a://bucket/venv-amd64.tar.gz#environment
  • ARM64: spark.archives=s3a://bucket/venv-arm64.tar.gz#environment

Reference the venv in Jobs

Upload the architecture-specific venv file to an S3/GCS location that Onehouse can access. When you create a Job, you will specify these Spark configs in the Job Parameters to load the venv. Use the appropriate architecture-specific package as shown in the Job Parameters section above.

Example Job Parameters for AMD64:

"--conf", "spark.archives=s3a://<BUCKET>/<FOLDER>/venv-amd64.tar.gz#environment", "--conf", "spark.pyspark.python=./environment/bin/python", "s3a://path/to/pythonscript.py"

Example Job Parameters for ARM64:

"--conf", "spark.archives=s3a://<BUCKET>/<FOLDER>/venv-arm64.tar.gz#environment", "--conf", "spark.pyspark.python=./environment/bin/python", "s3a://path/to/pythonscript.py"

Installing custom packages (.whl files)

If you need to install a custom package (.whl file), you can add the file reference to the requirements.txt file and build the venv.tar.gz file similar to the steps above.

pyspark==3.4.3
pandas==2.2.3
python-dateutil==2.9.0
pytz==2024.2
tzdata==2024.2
venv-pack==0.2.0
./path/to/test-0.1.0-py3-none-any.whl

Default Configurations

By default, Jobs will use the following configurations:

| Component | Spark Configuration Key | Property | Value |
| --- | --- | --- | --- |
| Driver | spark.kubernetes.driver.limit.cores | cpu limit | 1700m |
| Driver | spark.kubernetes.driver.request.cores | cpu request | 1600m |
| Driver | spark.driver.cores | cores | 2 |
| Driver | spark.driver.memory | memory | 4500m |
| Executor | spark.kubernetes.executor.limit.cores | cpu limit | 1700m |
| Executor | spark.kubernetes.executor.request.cores | cpu request | 1600m |
| Executor | spark.executor.cores | cores | 2 |
| Executor | spark.executor.memory | memory | 4500m |
| Dynamic Allocation | spark.dynamicAllocation.enabled | enabled | true |
| Dynamic Allocation | spark.dynamicAllocation.initialExecutors | initial no. of executors | 0 |
| Dynamic Allocation | spark.dynamicAllocation.minExecutors | minimum no. of executors | 0 |
| Dynamic Allocation | spark.dynamicAllocation.maxExecutors | maximum no. of executors | 3 |
tip

When changing the default configurations, ensure that a single driver or executor does not use more resources than the available resources per instance, as specified below:

  • AWS Projects
    • Max Available CPU = 3600m
    • Max Available Memory = 14500Mi
  • GCP
    • Max Available CPU = 3700m
    • Max Available Memory = 12900Mi
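
To override a default, pass additional --conf entries in the Job Parameters. Below is a minimal sketch: the class name, JAR path, and chosen values are placeholders, and any values you pick must stay within the per-instance limits above.

["--class", "com.example.MySparkApp", "--conf", "spark.executor.memory=8000m", "--conf", "spark.dynamicAllocation.maxExecutors=5", "/path/to/my-spark-app.jar"]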

Job Permissions

Currently, only Project Admins can view and run Jobs. We plan to expand access to more users and introduce granular roles to manage permissions for Jobs.

Examples

Java Job Application

  • Environment: Use JDK 17 for the best compatibility.
  • Spark Init: Initialize the spark object for SparkSession operations.
  • Database: Target database must pre-exist in Onehouse (create via UI/API).
  • Catalog Sync: Use the Metadata Sync Table Service to register/update tables in your data catalog(s). Avoid SparkSession.builder().enableHiveSupport() and write.option("hoodie.datasource.meta_sync.condition.sync","true").

You can download the Java application here: SparkJava.zip.

Python Job Application

Here are some key ideas to keep in mind when writing your application.

  • Environment: Must use Python version 3.12.
  • Spark Init: Initialize the spark object for SparkSession operations.
  • Database: Target database must pre-exist in Onehouse (create via UI/API).
  • Catalog Sync: Use the Metadata Sync Table Service to register/update tables in your data catalog(s). Avoid SparkSession.builder().enableHiveSupport() and write.option("hoodie.datasource.meta_sync.condition.sync","true").

You can download the Python application here: PySpark.zip.

Execute Spark SQL

You may use Spark SQL functions to emulate the behavior of a SQL endpoint in a Job. Below is sample Java code that returns a DataFrame of the tables in a database using Spark SQL.

SparkSession spark = SparkSession.builder().getOrCreate();
Dataset<Row> sqlOutput = spark.sql("SHOW TABLES IN <DATABASE_NAME>");
sqlOutput.show(10);
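
A PySpark equivalent is sketched below; <DATABASE_NAME> is a placeholder for your database.

from pyspark.sql import SparkSession

# Build or reuse the active Spark session
spark = SparkSession.builder.getOrCreate()

# Run a Spark SQL statement and collect the result as a DataFrame
sql_output = spark.sql("SHOW TABLES IN <DATABASE_NAME>")
sql_output.show(10)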

Create Table from Dataframe

You may use DataFrames to create Onehouse tables. Below are example code snippets to create a table.

tip

Include the saveAsTable method to make the new table appear in the Onehouse console.

Java example:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.hudi.DataSourceWriteOptions;

import java.util.Arrays;
import java.util.List;

public class HudiWriteExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("HudiWriteExample")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .enableHiveSupport() // Enable Hive for saveAsTable
                .getOrCreate();

        // Define schema
        StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.LongType, false, Metadata.empty()),
                new StructField("name", DataTypes.StringType, true, Metadata.empty()),
                new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
                new StructField("company", DataTypes.StringType, true, Metadata.empty()),
                new StructField("join_date", DataTypes.StringType, true, Metadata.empty()),
                new StructField("location", DataTypes.StringType, true, Metadata.empty())
        });

        // Define data
        List<Row> data = Arrays.asList(
                RowFactory.create(1L, "Richard Hendricks", 30, "Pied Piper", "2014-05-15", "Mountain View"),
                RowFactory.create(2L, "Erlich Bachman", 35, "Aviato", "2013-08-22", "San Francisco"),
                RowFactory.create(3L, "Jared Dunn", 35, "Pied Piper", "2015-03-10", "Palo Alto"),
                RowFactory.create(4L, "Dinesh Chugtai", 28, "Pied Piper", "2016-01-20", "San Jose"),
                RowFactory.create(5L, "Bertram Gilfoyle", 32, "Pied Piper", "2015-11-05", "San Francisco")
        );

        // Create DataFrame
        Dataset<Row> df = spark.createDataFrame(data, schema);

        // Hudi table options
        String tableType = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(); // MOR (Merge-On-Read)
        String databaseName = "DATABASE_NAME";
        String tableName = "TABLE_NAME";
        String tablePath = "s3a://bucket/table";

        // Write to Hudi table
        df.write()
                .format("hudi")
                .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "id")
                .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "join_date")
                .option(DataSourceWriteOptions.TABLE_TYPE().key(), tableType)
                .option("hoodie.table.name", tableName)
                .option("path", tablePath)
                .mode("append")
                .saveAsTable(String.format("%s.%s", databaseName, tableName)); // Save and register table in Hive metastore
    }
}

Python example:

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HudiWriteExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .enableHiveSupport() \
    .getOrCreate()

# Define schema
schema = StructType([
    StructField("id", LongType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("company", StringType(), True),
    StructField("join_date", StringType(), True),
    StructField("location", StringType(), True)
])

# Define data
data = [
    Row(1, "Richard Hendricks", 30, "Pied Piper", "2014-05-15", "Mountain View"),
    Row(2, "Erlich Bachman", 35, "Aviato", "2013-08-22", "San Francisco"),
    Row(3, "Jared Dunn", 35, "Pied Piper", "2015-03-10", "Palo Alto"),
    Row(4, "Dinesh Chugtai", 28, "Pied Piper", "2016-01-20", "San Jose"),
    Row(5, "Bertram Gilfoyle", 32, "Pied Piper", "2015-11-05", "San Francisco")
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Write to Hudi table
df.write \
    .format("hudi") \
    .option("hoodie.datasource.write.recordkey.field", "id") \
    .option("hoodie.datasource.write.precombine.field", "join_date") \
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") \
    .option("hoodie.table.name", "TABLE_NAME") \
    .option("path", "s3a://bucket/table") \
    .mode("append") \
    .saveAsTable("DATABASE_NAME.TABLE_NAME")

Limitations

  • Only Python version 3.12 is supported for Python Jobs.
  • Only Project Admins can view and run Jobs currently.
  • Spark DataFrame methods that use the catalog (using DATABASE.TABLE to specify the table), such as saveAsTable, insertInto, append, and overwrite, do not work with tables created with Stream Capture.
  • Spark event logs, visible on the Spark History Server, are available for 30 days. Please contact Onehouse support to change the retention period.
  • Currently, the only supported properties are --class and --conf.

Delete a Job

Deleting a Job will remove it from the Onehouse console and prevent users from triggering new runs for the Job. Job runs already queued will not be canceled when you delete the Job.