Create an Apache Hudi table

This example shows how you can use Apache Spark DataFrames to create Apache Hudi tables and register them in Onehouse.

Prerequisites

  • You will need an existing database in Onehouse. To create a database, you can follow the steps here.

Registering the table in Onehouse

To register the Apache Hudi table in Onehouse, write the DataFrame with the saveAsTable method. This registers the table in the catalog so it appears on the Data page in the Onehouse console.
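The key detail is the table identifier passed to saveAsTable, which takes the form DATABASE_NAME.TABLE_NAME using the database you created in Onehouse. The following is a minimal PySpark sketch of just that final step, assuming an existing DataFrame df and the Hudi write options shown in the full examples below:

# End the write with saveAsTable so the table is registered under the Onehouse database
df.write \
    .format("hudi") \
    .option("hoodie.datasource.write.recordkey.field", "id") \
    .option("hoodie.datasource.write.precombine.field", "join_date") \
    .option("hoodie.table.name", "TABLE_NAME") \
    .option("path", "s3a://bucket/table") \
    .mode("append") \
    .saveAsTable("DATABASE_NAME.TABLE_NAME")  # format: <database>.<table>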

Java example code

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.hudi.DataSourceWriteOptions;

import java.util.Arrays;
import java.util.List;

public class HudiWriteExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("HudiWriteExample")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate();

        // Define schema
        StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.LongType, false, Metadata.empty()),
                new StructField("name", DataTypes.StringType, true, Metadata.empty()),
                new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
                new StructField("company", DataTypes.StringType, true, Metadata.empty()),
                new StructField("join_date", DataTypes.StringType, true, Metadata.empty()),
                new StructField("location", DataTypes.StringType, true, Metadata.empty())
        });

        // Define data
        List<Row> data = Arrays.asList(
                RowFactory.create(1L, "Richard Hendricks", 30, "Pied Piper", "2014-05-15", "Mountain View"),
                RowFactory.create(2L, "Erlich Bachman", 35, "Aviato", "2013-08-22", "San Francisco"),
                RowFactory.create(3L, "Jared Dunn", 35, "Pied Piper", "2015-03-10", "Palo Alto"),
                RowFactory.create(4L, "Dinesh Chugtai", 28, "Pied Piper", "2016-01-20", "San Jose"),
                RowFactory.create(5L, "Bertram Gilfoyle", 32, "Pied Piper", "2015-11-05", "San Francisco")
        );

        // Create DataFrame
        Dataset<Row> df = spark.createDataFrame(data, schema);

        // Hudi table options
        String tableType = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(); // MOR (Merge-On-Read)
        String databaseName = "DATABASE_NAME";
        String tableName = "TABLE_NAME";
        String tablePath = "s3a://bucket/table";

        // Write to Hudi table and register it in the catalog
        df.write()
                .format("hudi")
                .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "id")
                .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "join_date")
                .option(DataSourceWriteOptions.TABLE_TYPE().key(), tableType)
                .option("hoodie.table.name", tableName)
                .option("path", tablePath)
                .mode("append")
                .saveAsTable(String.format("%s.%s", databaseName, tableName));
    }
}

Python example code

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HudiWriteExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# Define schema
schema = StructType([
    StructField("id", LongType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("company", StringType(), True),
    StructField("join_date", StringType(), True),
    StructField("location", StringType(), True)
])

# Define data
data = [
    Row(1, "Richard Hendricks", 30, "Pied Piper", "2014-05-15", "Mountain View"),
    Row(2, "Erlich Bachman", 35, "Aviato", "2013-08-22", "San Francisco"),
    Row(3, "Jared Dunn", 35, "Pied Piper", "2015-03-10", "Palo Alto"),
    Row(4, "Dinesh Chugtai", 28, "Pied Piper", "2016-01-20", "San Jose"),
    Row(5, "Bertram Gilfoyle", 32, "Pied Piper", "2015-11-05", "San Francisco")
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Write to Hudi table and register it in the catalog
df.write \
    .format("hudi") \
    .option("hoodie.datasource.write.recordkey.field", "id") \
    .option("hoodie.datasource.write.precombine.field", "join_date") \
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") \
    .option("hoodie.table.name", "TABLE_NAME") \
    .option("path", "s3a://bucket/table") \
    .mode("append") \
    .saveAsTable("DATABASE_NAME.TABLE_NAME")
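
After the write completes, you can optionally confirm that the table was written and registered by reading it back. This is a small sketch beyond the original example, assuming the same SparkSession, database, table name, and path as above:

# Read through the table registered by saveAsTable
spark.table("DATABASE_NAME.TABLE_NAME").show()

# Or read directly from the Hudi table path in storage
spark.read.format("hudi").load("s3a://bucket/table").show()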