Help Center/
MapReduce Service/
Developer Guide (Normal_3.x)/
Spark2x Development Guide (Normal Mode)/
Developing Spark Applications/
Sample Project for Customizing Configuration Items in Hudi/
HoodieDeltaStreamer
Updated on 2024-10-23 GMT+08:00
HoodieDeltaStreamer
Write a user-defined transformation class that implements the Transformer interface.
Write a user-defined schema class that extends SchemaProvider.
Add the following parameters when running HoodieDeltaStreamer:
--schemaprovider-class <fully qualified name of the user-defined schema class> --transformer-class <fully qualified name of the user-defined transformer class>
Examples of Transformer and SchemaProvider:
public class TransformerExample implements Transformer, Serializable { @Override public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) { JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD(); List<Row> rowList =new ArrayList<>(); for(Row row: rowJavaRdd.collect()){ rowList.add(buildRow(row)); } JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList); List<StructField> fields = new ArrayList<>(); builFields(fields); StructType schema = DataTypes.createStructType(fields); Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema); return dataFrame; } private void builFields(List<StructField> fields) { fields.add(DataTypes.createStructField("age", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("id", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("job", DataTypes.StringType, true)); } private Row buildRow(Row row){ String age = row.getString(0); String id = row.getString(1); String job = row.getString(2); String name = row.getString(3); Row returnRow = RowFactory.create(age, id, job, name); return returnRow; } } public class DataSchemaProviderExample extends SchemaProvider { public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); } @Override public Schema getSourceSchema() { Schema avroSchema = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"job\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}"); return avroSchema; } @Override public Schema getTargetSchema() { Schema avroSchema = new Schema.Parser().parse( 
"{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"job\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}"); return avroSchema; } }
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot