Skip to main content

Create an Apache Hudi table

This example shows how you can use Apache Spark DataFrames to create Apache Hudi tables and register them in Onehouse.

Prerequisites

  • You will need an existing database in Onehouse. To create a database, you can follow the steps here.

Register table in the Onehouse catalog

To make your Apache Hudi table visible in the Onehouse console and enable Table Services, you must write your DataFrame using the saveAsTable method. When calling saveAsTable, always specify the table name in the format <database>.<table> (see example code).

Java example code

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.hudi.DataSourceWriteOptions;

import java.util.Arrays;
import java.util.List;

/**
 * Example: create an Apache Hudi table from a Spark DataFrame and register it
 * in the Onehouse catalog via {@code saveAsTable("<database>.<table>")}.
 */
public class HudiWriteExample {
    public static void main(String[] args) {
        // Kryo serialization is required by Hudi for efficient record shuffling.
        SparkSession spark = SparkSession.builder()
                .appName("HudiWriteExample")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate();

        try {
            // Define schema: "id" is the record key, so it is non-nullable.
            StructType schema = new StructType(new StructField[]{
                    new StructField("id", DataTypes.LongType, false, Metadata.empty()),
                    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
                    new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
                    new StructField("company", DataTypes.StringType, true, Metadata.empty()),
                    new StructField("join_date", DataTypes.StringType, true, Metadata.empty()),
                    new StructField("location", DataTypes.StringType, true, Metadata.empty())
            });

            // Sample rows matching the schema above.
            List<Row> data = Arrays.asList(
                    RowFactory.create(1L, "Richard Hendricks", 30, "Pied Piper", "2014-05-15", "Mountain View"),
                    RowFactory.create(2L, "Erlich Bachman", 35, "Aviato", "2013-08-22", "San Francisco"),
                    RowFactory.create(3L, "Jared Dunn", 35, "Pied Piper", "2015-03-10", "Palo Alto"),
                    RowFactory.create(4L, "Dinesh Chugtai", 28, "Pied Piper", "2016-01-20", "San Jose"),
                    RowFactory.create(5L, "Bertram Gilfoyle", 32, "Pied Piper", "2015-11-05", "San Francisco")
            );

            // Create DataFrame from the rows and schema.
            Dataset<Row> df = spark.createDataFrame(data, schema);

            // Hudi table options. Replace the placeholders with your own
            // database/table names and storage path before running.
            String tableType = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(); // MOR (Merge-On-Read)
            String databaseName = "DATABASE_NAME";
            String tableName = "TABLE_NAME";
            String tablePath = "s3a://bucket/table";

            // Write to the Hudi table. saveAsTable with "<database>.<table>"
            // registers the table in the catalog so Onehouse can discover it.
            df.write()
                    .format("hudi")
                    .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "id")
                    .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "join_date")
                    .option(DataSourceWriteOptions.TABLE_TYPE().key(), tableType)
                    .option("hoodie.table.name", tableName)
                    .option("path", tablePath)
                    .mode("append")
                    .saveAsTable(String.format("%s.%s", databaseName, tableName));
        } finally {
            // Always release the SparkSession (executors, UI, temp dirs),
            // even if the write fails.
            spark.stop();
        }
    }
}

Python example code

"""Create an Apache Hudi table from a Spark DataFrame and register it in the
Onehouse catalog via ``saveAsTable("<database>.<table>")``."""

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Initialize Spark session. Kryo serialization is required by Hudi.
spark = SparkSession.builder \
    .appName("HudiWriteExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

try:
    # Define schema: "id" is the record key, so it is non-nullable.
    schema = StructType([
        StructField("id", LongType(), False),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("company", StringType(), True),
        StructField("join_date", StringType(), True),
        StructField("location", StringType(), True)
    ])

    # Sample rows matching the schema above.
    data = [
        Row(1, "Richard Hendricks", 30, "Pied Piper", "2014-05-15", "Mountain View"),
        Row(2, "Erlich Bachman", 35, "Aviato", "2013-08-22", "San Francisco"),
        Row(3, "Jared Dunn", 35, "Pied Piper", "2015-03-10", "Palo Alto"),
        Row(4, "Dinesh Chugtai", 28, "Pied Piper", "2016-01-20", "San Jose"),
        Row(5, "Bertram Gilfoyle", 32, "Pied Piper", "2015-11-05", "San Francisco")
    ]

    # Create DataFrame from the rows and schema.
    df = spark.createDataFrame(data, schema)

    # Write to the Hudi table. Replace DATABASE_NAME / TABLE_NAME and the
    # storage path with your own values. saveAsTable with
    # "<database>.<table>" registers the table so Onehouse can discover it.
    df.write \
        .format("hudi") \
        .option("hoodie.datasource.write.recordkey.field", "id") \
        .option("hoodie.datasource.write.precombine.field", "join_date") \
        .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") \
        .option("hoodie.table.name", "TABLE_NAME") \
        .option("path", "s3a://bucket/table") \
        .mode("append") \
        .saveAsTable("DATABASE_NAME.TABLE_NAME")
finally:
    # Always release the SparkSession, even if the write fails.
    spark.stop()