Create and Run Jobs
Create a Job
First, create a Onehouse Cluster with type Spark to run your Jobs.
Next, open the Jobs page to create your Job. You will configure the Job by filling in the following fields:
- Name: Specify a unique name to identify the Job (max 100 characters).
- Type: Specify the type of Job - this can be a JAR (for Java or Scala code) or Python.
- Cluster: Select a Onehouse Cluster with type Spark to run the Job.
- Parameters: Specify an array of Strings to pass as parameters to the Job, which will be used in a spark-submit (see Apache Spark docs). This should include the following:
  - For JAR Jobs, you must include the --class property.
  - Include the cloud storage bucket path containing the code for your Job. The Onehouse agent must have access to read this path.
  - Optionally, include any arguments you'd like to pass to the Job.
  - Currently, the only supported properties are --class and --conf.
Example Parameters:
["--class", "com.example.MySparkApp', "--conf", "<KEY>=<VALUE>", "/path/to/my-spark-app.jar", "arg1", "arg2"]
Certain configurations can cause issues when changed. Follow these guidelines to ensure your Jobs run smoothly:
- Do not add spark.extraListeners in your code. You may instead add these in Parameters.
- If you are setting spark.driver.extraJavaOptions or spark.executor.extraJavaOptions, you must add "-Dlog4j.configuration=file:///opt/spark/log4j.properties" (see the example after this list).
- Do not modify the following Spark configs:
  - Kubernetes: spark.kubernetes.node.selector.*, spark.kubernetes.driver.label.*, spark.kubernetes.executor.label.*, spark.kubernetes.driver.podTemplateFile, spark.kubernetes.executor.podTemplateFile
  - Observability: spark.metrics.*, spark.eventLog.*
- Do not modify the following Hudi configs in your code: hoodie.write.concurrency.mode, hoodie.cleaner.policy.failed.writes, hoodie.write.lock.*
- By default, hoodie.table.services.enabled is set to false for Jobs, as Onehouse runs the table services asynchronously. If you set this to true, the Job will run table services in-line.
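As an illustration of the extraJavaOptions rule above, a JAR Job that sets driver JVM options could pass the required log4j property together with its own options in Parameters. This is a sketch; the class name, the custom -D flag, and the JAR path are placeholders:
["--class", "com.example.MySparkApp", "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/log4j.properties -Dmy.custom.flag=true", "/path/to/my-spark-app.jar"]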
Importing pyspark version 3.4.3 is required for Python Jobs to run successfully. Follow the steps in the Dependency Management section below to set this up.
Run a Job
After a Job is created, you may run it as many times as you'd like. Open the Jobs page and find your Job. Open the Job and click Run, or use the Onehouse APIs to run the Job.
When you run a Job, Onehouse will pick up the latest version of the JAR or Python script from the cloud storage bucket path in the Job definition. If the path does not exist, the Job run will fail.
Job runs can have the following states:
- Queued: Each Job run starts in this state when it is triggered, and stays until the Spark driver starts.
- Running: The Job run is actively running on the specified Cluster.
- Completed: The Job run completed without errors.
- Failed: The Job run failed with error(s).
- Canceled: The Job was manually canceled while it was queued or running.
Jobs can only have one concurrent run. In other words, when a Job is already running, triggering a new run is blocked.
Dependency Management
You may install Python libraries or Java Maven/Gradle dependencies for your Jobs. Dependencies are installed by each Job's Spark driver, rather than at the Cluster-level, to prevent dependency conflicts between Jobs running on the same Cluster.
Pre-Installed Libraries
Apache Hudi 0.14.1 is pre-installed on Onehouse Clusters.
Install Libraries for Java Jobs
You can upload any necessary Java libraries (JAR files) and include them in the Parameters field when you create a Job for your application.
First, upload your dependencies as JAR files to an S3/GCS location that Onehouse can access.
When you create a Job, you will reference these in the Job Parameters with "--conf", followed by "spark.jars=<JARS>". Below is an example of Job Parameters:
"--class", "com.example.MyApp", "--conf", "spark.jars=s3a://path/to/dependency1.jar,s3a://path/to/dependency2.jar", "s3a://path/to/myapp.jar"
Gradle Build settings and code additions for Java Jobs
For JAR Jobs, you should import Hudi 0.14.1 as a compileOnly dependency in your build.gradle file:
dependencies {
    compileOnly group: "org.apache.hudi", name: "hudi-spark3.4-bundle_2.12", version: "0.14.1"
}
Note: Import any other Hudi package as compileOnly dependencies as-needed.
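For example, if your application also needs classes from another Hudi module, you might add it alongside the bundle in the same way. This is a sketch only; hudi-aws is shown purely as an illustration, so include whichever modules your code actually references:
dependencies {
    compileOnly group: "org.apache.hudi", name: "hudi-spark3.4-bundle_2.12", version: "0.14.1"
    compileOnly group: "org.apache.hudi", name: "hudi-aws", version: "0.14.1"
}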
Add the following code to support writer concurrency. You can find your parameters in the concurrency drop-down for your table in the Data tab.
.option("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.lock.provider", "org.apache.hudi.aws.transaction.lock.DynamoDBBasedImplicitPartitionKeyLockProvider")
.option("hoodie.write.lock.dynamodb.region", "us-west-2")
.option("hoodie.write.lock.dynamodb.table", "ohprojectb624e0f6")
.option("hoodie.write.lock.dynamodb.partition_key", "raw_customers-D68E786B7C90E03C")
Install Libraries for Python Jobs
To provide Python libraries for your Job, first create a Python virtual environment (venv) and install all necessary dependencies within it. Then, package this entire venv directory into a compressed archive named venv.tar.gz. Finally, reference this venv.tar.gz file in the appropriate parameter field when creating your Job.
Onehouse Spark clusters may run on different CPU architectures depending on your instance configuration. You must build architecture-specific Python dependencies to ensure binary compatibility.
Architecture Detection: Contact Onehouse support to determine your cluster architecture, or check with your infrastructure team whether your instances use:
- AMD64/x86_64 (Intel/AMD processors)
- ARM64/aarch64 (AWS Graviton)
Build Process: Create separate venv packages for each architecture using the appropriate Dockerfile:
Importing pyspark version 3.4.3 is required for Python Jobs to run successfully.
Code additions for PySpark Jobs
Add the following code to support writer concurrency. You can find your parameters in the concurrency drop-down for your table in the Data tab.
.option("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.lock.provider", "org.apache.hudi.aws.transaction.lock.DynamoDBBasedImplicitPartitionKeyLockProvider")
.option("hoodie.write.lock.dynamodb.region", "us-west-2")
.option("hoodie.write.lock.dynamodb.table", "ohprojectb624e0f6")
.option("hoodie.write.lock.dynamodb.partition_key", "raw_customers-D68E786B7C90E03C")
Let's say we have the following requirements.txt:
pyspark==3.4.3
pandas==2.2.3
python-dateutil==2.9.0
pytz==2024.2
tzdata==2024.2
venv-pack==0.2.0
Sample Commands to Build venv.tar.gz in Linux Environment
In a Linux environment, use the following sample commands to build the venv.tar.gz file:
$ python -m venv venv
$ source ./venv/bin/activate
$ pip install -r requirements.txt
$ venv-pack -o venv.tar.gz
Architecture-Specific Docker Builds
For AMD64 Clusters:
# Dockerfile.amd64
FROM --platform=linux/amd64 amazonlinux:2023
RUN yum update -y && \
yum install -y python3.12 python3.12-pip && \
yum clean all
WORKDIR /workspace
COPY requirements.txt .
RUN python3.12 -m venv venv
RUN . venv/bin/activate && pip install setuptools
RUN . venv/bin/activate && pip install -r requirements.txt
RUN . venv/bin/activate && venv-pack -o venv.tar.gz
Build commands:
docker buildx build -f Dockerfile.amd64 --platform linux/amd64 -t amazonlinux-venv-amd64 . --load
docker create --name temp-amd64 amazonlinux-venv-amd64
docker cp temp-amd64:/workspace/venv.tar.gz ./venv-amd64.tar.gz
docker rm temp-amd64
For ARM64 Clusters:
# Dockerfile.arm64
FROM --platform=linux/arm64 amazonlinux:2023
RUN yum update -y && \
yum install -y python3.12 python3.12-pip && \
yum clean all
WORKDIR /workspace
COPY requirements.txt .
RUN python3.12 -m venv venv
RUN . venv/bin/activate && pip install setuptools
RUN . venv/bin/activate && pip install -r requirements.txt
RUN . venv/bin/activate && venv-pack -o venv.tar.gz
Build commands:
docker buildx build -f Dockerfile.arm64 --platform linux/arm64 -t amazonlinux-venv-arm64 . --load
docker create --name temp-arm64 amazonlinux-venv-arm64
docker cp temp-arm64:/workspace/venv.tar.gz ./venv-arm64.tar.gz
docker rm temp-arm64
Job Parameters
Use the architecture-specific package in your job configuration:
- AMD64:
spark.archives=s3a://bucket/venv-amd64.tar.gz#environment
- ARM64:
spark.archives=s3a://bucket/venv-arm64.tar.gz#environment
Reference the venv in Jobs
Upload the architecture-specific venv file to an S3/GCS location that Onehouse can access. When you create a Job, you will specify these Spark configs in the Job Parameters to load the venv. Use the appropriate architecture-specific package as shown in the Job Parameters section above.
Example Job Parameters for AMD64:
"--conf", "spark.archives=s3a://<BUCKET>/<FOLDER>/venv-amd64.tar.gz#environment", "--conf", "spark.pyspark.python=./environment/bin/python", "s3a://path/to/pythonscript.py"
Example Job Parameters for ARM64:
"--conf", "spark.archives=s3a://<BUCKET>/<FOLDER>/venv-arm64.tar.gz#environment", "--conf", "spark.pyspark.python=./environment/bin/python", "s3a://path/to/pythonscript.py"
Installing custom packages (.whl files)
If you need to install a custom package (.whl file), you can add the file reference to the requirements.txt file and build the venv.tar.gz file following the same steps as above.
pyspark==3.4.3
pandas==2.2.3
python-dateutil==2.9.0
pytz==2024.2
tzdata==2024.2
venv-pack==0.2.0
./path/to/test-0.1.0-py3-none-any.whl
Default Configurations
By default, Jobs will use the following configurations:
Component | Spark Configuration Key | Property | Value |
---|---|---|---|
Driver | spark.kubernetes.driver.limit.cores | cpu limit | 1700m |
 | spark.kubernetes.driver.request.cores | cpu request | 1600m |
 | spark.driver.cores | cores | 2 |
 | spark.driver.memory | memory | 4500m |
Executor | spark.kubernetes.executor.limit.cores | cpu limit | 1700m |
 | spark.kubernetes.executor.request.cores | cpu request | 1600m |
 | spark.executor.cores | cores | 2 |
 | spark.executor.memory | memory | 4500m |
Dynamic Allocation | spark.dynamicAllocation.enabled | enabled | true |
 | spark.dynamicAllocation.initialExecutors | initial no. of executors | 0 |
 | spark.dynamicAllocation.minExecutors | minimum no. of executors | 0 |
 | spark.dynamicAllocation.maxExecutors | maximum no. of executors | 3 |
When changing the default configurations, you should ensure that a single driver and executor do not use more resources than the available resources per instance as specified below:
- AWS Projects
- Max Available CPU = 3600m
- Max Available Memory = 14500Mi
- GCP Projects
- Max Available CPU = 3700m
- Max Available Memory = 12900Mi
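For example, to give each executor more memory and allow more executors, you could override the defaults with --conf entries in the Job Parameters. The values below are illustrative and stay within the AWS per-instance limits above; the class name and JAR path are placeholders:
["--class", "com.example.MySparkApp", "--conf", "spark.executor.memory=8000m", "--conf", "spark.dynamicAllocation.maxExecutors=5", "/path/to/my-spark-app.jar"]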
Job Permissions
Currently, only Project Admins can view and run Jobs. We plan to expand access to more users and introduce granular roles to manage permissions for Jobs.
Examples
Java Job Application
- Environment: Use JDK 17 for the best compatibility.
- Spark Init: Initialize the spark object for SparkSession operations.
- Database: Target database must pre-exist in Onehouse (create via UI/API).
- Catalog Sync: Use the Metadata Sync Table Service to register/update tables in your data catalog(s). Avoid SparkSession.builder().enableHiveSupport() and write.option("hoodie.datasource.meta_sync.condition.sync","true").
You can download the Java application here: SparkJava.zip.
Python Job Application
Here are some key ideas to keep in mind when writing your application.
- Environment: Must use Python version 3.12.
- Spark Init: Initialize the spark object for SparkSession operations.
- Database: Target database must pre-exist in Onehouse (create via UI/API).
- Catalog Sync: Use the Metadata Sync Table Service to register/update tables in your data catalog(s). Avoid SparkSession.builder().enableHiveSupport() and write.option("hoodie.datasource.meta_sync.condition.sync","true").
You can download the Python application here: PySpark.zip.
Execute Spark SQL
You may use Spark SQL functions to emulate the behavior of the SQL endpoint in a Job. Below is sample Java code that returns a DataFrame of tables in a database using Spark SQL.
SparkSession spark = SparkSession.builder().getOrCreate();
Dataset<Row> sqlOutput = spark.sql("SHOW TABLES IN <DATABASE_NAME>");
sqlOutput.show(10);
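For a Python Job, an equivalent PySpark snippet would look like the following (a minimal sketch; replace <DATABASE_NAME> with your database, as in the Java example):
from pyspark.sql import SparkSession

# Create or reuse the active Spark session
spark = SparkSession.builder.getOrCreate()

# Return a DataFrame of tables in the database
sql_output = spark.sql("SHOW TABLES IN <DATABASE_NAME>")
sql_output.show(10)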
Create Table from Dataframe
You may use Dataframes to create Onehouse tables. Below are example code snippets to create a table.
Include the saveAsTable method to make the new table appear in the Onehouse console.
Java example:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.hudi.DataSourceWriteOptions;
import java.util.Arrays;
import java.util.List;
public class HudiWriteExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("HudiWriteExample")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .enableHiveSupport() // Enable Hive for saveAsTable
                .getOrCreate();

        // Define schema
        StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.LongType, false, Metadata.empty()),
                new StructField("name", DataTypes.StringType, true, Metadata.empty()),
                new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
                new StructField("company", DataTypes.StringType, true, Metadata.empty()),
                new StructField("join_date", DataTypes.StringType, true, Metadata.empty()),
                new StructField("location", DataTypes.StringType, true, Metadata.empty())
        });

        // Define data
        List<Row> data = Arrays.asList(
                RowFactory.create(1L, "Richard Hendricks", 30, "Pied Piper", "2014-05-15", "Mountain View"),
                RowFactory.create(2L, "Erlich Bachman", 35, "Aviato", "2013-08-22", "San Francisco"),
                RowFactory.create(3L, "Jared Dunn", 35, "Pied Piper", "2015-03-10", "Palo Alto"),
                RowFactory.create(4L, "Dinesh Chugtai", 28, "Pied Piper", "2016-01-20", "San Jose"),
                RowFactory.create(5L, "Bertram Gilfoyle", 32, "Pied Piper", "2015-11-05", "San Francisco")
        );

        // Create DataFrame
        Dataset<Row> df = spark.createDataFrame(data, schema);

        // Hudi table options
        String tableType = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(); // MOR (Merge-On-Read)
        String databaseName = "DATABASE_NAME";
        String tableName = "TABLE_NAME";
        String tablePath = "s3a://bucket/table";

        // Write to Hudi table
        df.write()
                .format("hudi")
                .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "id")
                .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "join_date")
                .option(DataSourceWriteOptions.TABLE_TYPE().key(), tableType)
                .option("hoodie.table.name", tableName)
                .option("path", tablePath)
                .mode("append")
                .saveAsTable(String.format("%s.%s", databaseName, tableName)); // Save and register table in Hive metastore
    }
}
Python example:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
# Initialize Spark session
spark = SparkSession.builder \
    .appName("HudiWriteExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .enableHiveSupport() \
    .getOrCreate()

# Define schema
schema = StructType([
    StructField("id", LongType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("company", StringType(), True),
    StructField("join_date", StringType(), True),
    StructField("location", StringType(), True)
])

# Define data
data = [
    Row(1, "Richard Hendricks", 30, "Pied Piper", "2014-05-15", "Mountain View"),
    Row(2, "Erlich Bachman", 35, "Aviato", "2013-08-22", "San Francisco"),
    Row(3, "Jared Dunn", 35, "Pied Piper", "2015-03-10", "Palo Alto"),
    Row(4, "Dinesh Chugtai", 28, "Pied Piper", "2016-01-20", "San Jose"),
    Row(5, "Bertram Gilfoyle", 32, "Pied Piper", "2015-11-05", "San Francisco")
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Write to Hudi table
df.write \
    .format("hudi") \
    .option("hoodie.datasource.write.recordkey.field", "id") \
    .option("hoodie.datasource.write.precombine.field", "join_date") \
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") \
    .option("hoodie.table.name", "TABLE_NAME") \
    .option("path", "s3a://bucket/table") \
    .mode("append") \
    .saveAsTable("DATABASE_NAME.TABLE_NAME")
Limitations
- Only Python version 3.12 is supported for Python Jobs.
- Only Project Admins can view and run Jobs currently.
- Spark DataFrame methods that use the catalog (using DATABASE.TABLE to specify the table), such as saveAsTable, insertInto, append, and overwrite, do not work with tables created with Stream Capture.
- Spark event logs, visible on the Spark History Server, are available for 30 days. Please contact Onehouse support to change the retention period.
- Currently, the only supported properties are --class and --conf.
Delete a Job
Deleting a Job will remove it from the Onehouse console and prevent users from triggering new runs for the Job. Job runs already queued will not be canceled when you delete the Job.