Submitting a Spark Jar Job with Delta on DLI
1. Add the following dependency:
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-core_2.12</artifactId>
    <version>2.3.0</version>
</dependency>
2. Add the following two parameters to the SparkSession builder:
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
3. Write the code. Either SQL or the API can be used.
1) SQL example, as follows; for the detailed syntax, see "Delta SQL Syntax Reference".
import org.apache.spark.sql.SparkSession;

public class DeltaSqlDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport()
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .appName("DeltaDemo")
                .getOrCreate();

        // Create a Delta table partitioned by start_year and class, stored on OBS
        String sql_create = "CREATE TABLE IF NOT EXISTS dligms.deltaTest1012 (\n" +
                "  id int,\n" +
                "  name string,\n" +
                "  start_year int,\n" +
                "  class string\n" +
                ") USING DELTA\n" +
                "PARTITIONED BY (start_year, class)\n" +
                "LOCATION 'obs://bucket_name/path/deltaTest1012'";
        spark.sql(sql_create);

        // Insert sample rows
        String sql_insert = "INSERT INTO dligms.deltaTest1012 VALUES\n" +
                "(1, 'zhangsan', 2024, 'whlg0905')," +
                "(2, 'lisi', 2024, 'whlg0905')," +
                "(3, 'wangwu', 2024, 'whlg0905')," +
                "(4, 'zhaoliu', 2024, 'whlg0905')";
        spark.sql(sql_insert);

        // Query the table
        String sql_select = "SELECT * FROM dligms.deltaTest1012";
        spark.sql(sql_select).show();

        spark.stop();
    }
}
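The SQL route is not limited to CREATE/INSERT/SELECT: the Delta extension also accepts row-level DML such as UPDATE and DELETE. A minimal sketch follows; the class name is illustrative, and it assumes the dligms.deltaTest1012 table created above already exists.

```java
import org.apache.spark.sql.SparkSession;

// Illustrative class name, not part of the original example
public class DeltaDmlDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport()
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .appName("DeltaDmlDemo")
                .getOrCreate();

        // Delta supports row-level UPDATE and DELETE on the table
        spark.sql("UPDATE dligms.deltaTest1012 SET class = 'whlg0906' WHERE id = 2");
        spark.sql("DELETE FROM dligms.deltaTest1012 WHERE id = 4");

        // Verify the result
        spark.sql("SELECT * FROM dligms.deltaTest1012").show();

        spark.stop();
    }
}
```

Because the table is backed by OBS, these statements rewrite the affected data files and append a new entry to the Delta transaction log rather than mutating files in place.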
2) API example, as follows (Java).
import io.delta.tables.DeltaTable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DeltaApiDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport()
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .appName("DeltaDemo")
                .getOrCreate();

        // Create a partitioned Delta table on OBS via the DeltaTable builder API
        DeltaTable.createIfNotExists(spark)
                .tableName("deltaJava1011")
                .addColumn("id", "INT")
                .addColumn("name", "STRING")
                .addColumn("start_year", "INT")
                .addColumn("class", "STRING")
                .partitionedBy("start_year", "class")
                .location("obs://bucket_name/path/deltaTest1011")
                .execute();

        // Load source data from CSV files on OBS
        Dataset<Row> data = spark.read().format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load("obs://bucket_name/path/export/test1011/");

        // insertInto matches columns by position, so the CSV column order
        // must match the table schema (id, name, start_year, class)
        data.write().insertInto("deltaJava1011");

        spark.stop();
    }
}
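After the write, the Delta table can be read back either through the catalog or directly from its storage path. A minimal sketch, assuming the deltaJava1011 table and OBS path created above; the class name is illustrative:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative class name, not part of the original example
public class DeltaReadDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport()
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .appName("DeltaReadDemo")
                .getOrCreate();

        // Read through the catalog table name
        Dataset<Row> byName = spark.table("deltaJava1011");
        byName.show();

        // Or read the Delta files directly from the OBS location
        Dataset<Row> byPath = spark.read().format("delta")
                .load("obs://bucket_name/path/deltaTest1011");
        byPath.filter("start_year = 2024").show();

        spark.stop();
    }
}
```

Reading by path works even when the table is not registered in the catalog, which is useful when the same OBS location is shared across jobs.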
4. After compiling and packaging the project, create a Spark Jar job by referring to https://support.huaweicloud.com/devg-dli/dli_09_0205.html.