Using Delta to Submit a Spark Jar Job in DLI
1. Add the following dependency to the project's pom.xml:
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-core_2.12</artifactId>
    <version>2.3.0</version>
</dependency>
2. Add the following options when building the SparkSession:
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
3. Write code (through SQL or API).
The following is a SQL development example. For the detailed SQL syntax, see "Delta SQL Syntax Reference".
public static void main(String[] args) {
    SparkSession spark = SparkSession
            .builder()
            .enableHiveSupport()
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .appName("DeltaDemo")
            .getOrCreate();

    // Create a partitioned Delta table stored in OBS.
    String sql_create = "CREATE TABLE if not exists dligms.deltaTest1012 (\n"
            + "  id int,\n"
            + "  name string,\n"
            + "  start_year int,\n"
            + "  class string\n"
            + ") USING DELTA\n"
            + "partitioned by (start_year, class)\n"
            + "location 'obs://bucket_name/path/deltaTest1012'";
    spark.sql(sql_create);

    // Insert sample rows.
    String sql_insert = "insert into dligms.deltaTest1012 values\n"
            + "(1, 'zhangsan', 2024, 'whlg0905'),"
            + "(2, 'lisi', 2024, 'whlg0905'),"
            + "(3, 'wangwu', 2024, 'whlg0905'),"
            + "(4, 'zhaoliu', 2024, 'whlg0905')";
    spark.sql(sql_insert);

    // Query and print the table contents.
    String sql_select = "select * from dligms.deltaTest1012";
    spark.sql(sql_select).show();

    spark.stop();
}
The following is an API development example (Java).
public static void main(String[] args) {
    SparkSession spark = SparkSession
            .builder()
            .enableHiveSupport()
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .appName("DeltaDemo")
            .getOrCreate();

    // Create a partitioned Delta table in OBS using the DeltaTable builder API.
    DeltaTable.createIfNotExists(spark)
            .tableName("deltaJava1011")
            .addColumn("id", "INT")
            .addColumn("name", "STRING")
            .addColumn("start_year", "INT")
            .addColumn("class", "STRING")
            .partitionedBy("start_year", "class")
            .location("obs://bucket_name/path/deltaTest1011")
            .execute();

    // Load CSV data from OBS and insert it into the Delta table.
    Dataset<Row> data = spark.read().format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load("obs://bucket_name/path/export/test1011/");
    data.write().insertInto("deltaJava1011");

    spark.stop();
}
4. Compile and package the code, and then run the job.
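As a rough sketch of step 4, the commands below build the JAR with Maven and show a generic spark-submit invocation for local testing. The class name, JAR name, and paths are placeholders, not values from this document; on DLI you would upload the packaged JAR and create a Spark Jar job through the console or API rather than calling spark-submit directly.

```shell
# Build the fat JAR (placeholder artifact name).
mvn clean package

# Hypothetical local test run; DLI jobs are submitted via the console/API instead.
spark-submit \
  --class com.example.DeltaDemo \
  --packages io.delta:delta-core_2.12:2.3.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
  target/delta-demo-1.0.jar
```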