HoodieDeltaStreamer
Updated: 2024-06-27 GMT+08:00
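Write a custom transformation class that implements the Transformer interface.
Write a custom schema class that extends SchemaProvider.
When running HoodieDeltaStreamer, add the following parameters:
--schemaprovider-class <fully qualified name of the custom SchemaProvider class> --transformer-class <fully qualified name of the custom Transformer class>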
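The hypothetical helper below shows how the two parameters fit into a complete HoodieDeltaStreamer argument list. It is a minimal sketch that assembles the arguments in plain Java.

Only --schemaprovider-class and --transformer-class come from this guide. The other options (--table-type, --source-class, --source-ordering-field, --target-base-path, --target-table, --props) are standard HoodieDeltaStreamer options, included here as assumptions. Every value is a placeholder, including the com.example.hudi package.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeltaStreamerArgsSketch {

    // Builds a HoodieDeltaStreamer argument list. Only --schemaprovider-class and
    // --transformer-class are taken from this guide; the other options and all values
    // are illustrative assumptions to adapt to the actual job and Hudi version.
    static List<String> buildArgs(String schemaProviderClass, String transformerClass) {
        List<String> args = new ArrayList<>(Arrays.asList(
                "--table-type", "COPY_ON_WRITE",                                      // assumed table type
                "--source-class", "org.apache.hudi.utilities.sources.JsonKafkaSource", // assumed source
                "--source-ordering-field", "id",                                      // assumed ordering field
                "--target-base-path", "/tmp/hudi/mytest",                             // hypothetical path
                "--target-table", "mytest",                                           // hypothetical table name
                "--props", "/tmp/hudi/kafka-source.properties"));                     // hypothetical properties file
        // The two parameters described above: fully qualified names of the custom classes
        args.add("--schemaprovider-class");
        args.add(schemaProviderClass);
        args.add("--transformer-class");
        args.add(transformerClass);
        return args;
    }

    public static void main(String[] unused) {
        List<String> args = buildArgs("com.example.hudi.DataSchemaProviderExample",
                "com.example.hudi.TransformerExample");
        System.out.println(String.join(" ", args));
    }
}
```

In a real job, these arguments would follow the HoodieDeltaStreamer main class and the hudi-utilities jar on the spark-submit command line. Check the option names against the Hudi version shipped with the cluster, because older releases used different flags (for example --storage-type instead of --table-type).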
Transformer and SchemaProvider samples:
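// TransformerExample.java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.hudi.common.config.TypedProperties; // org.apache.hudi.common.util.TypedProperties in older Hudi releases
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TransformerExample implements Transformer, Serializable {
    @Override
    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
            TypedProperties properties) {
        // Transform each row on the executors; collecting the batch to the driver and
        // re-parallelizing it would pull all records into driver memory
        JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD().map(this::buildRow);
        List<StructField> fields = new ArrayList<>();
        buildFields(fields);
        StructType schema = DataTypes.createStructType(fields);
        return sparkSession.createDataFrame(rowJavaRdd, schema);
    }

    // Output schema: the field order must match the value order produced by buildRow
    private void buildFields(List<StructField> fields) {
        fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("id", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("job", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
    }

    // Copies one input row; positions follow the source schema order age, id, job, name
    private Row buildRow(Row row) {
        String age = row.getString(0);
        String id = row.getString(1);
        String job = row.getString(2);
        String name = row.getString(3);
        return RowFactory.create(age, id, job, name);
    }
}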
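buildRow reads fixed column positions, and RowFactory.create places values positionally. The order of the values must therefore equal the order of the StructType fields. Otherwise, for example, job values end up in the name column.

The minimal pure-Java sketch below (hypothetical class RowReorderSketch) derives the output order from field names instead of hard-coded indices. Plain Object[] arrays stand in for Spark Row objects, as an assumption, so the mapping can be checked without a Spark runtime. In Spark code, the same name lookup is available through Row.fieldIndex(String).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowReorderSketch {

    // Returns the values of one record in targetFields order, looking each field up by
    // name in sourceFields. Object[] stands in for a Spark Row (assumption).
    static Object[] reorder(List<String> sourceFields, List<String> targetFields, Object[] values) {
        Map<String, Integer> position = new HashMap<>();
        for (int i = 0; i < sourceFields.size(); i++) {
            position.put(sourceFields.get(i), i);
        }
        Object[] out = new Object[targetFields.size()];
        for (int i = 0; i < targetFields.size(); i++) {
            Integer src = position.get(targetFields.get(i));
            if (src == null) {
                throw new IllegalArgumentException("Unknown field: " + targetFields.get(i));
            }
            out[i] = values[src];
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("age", "id", "job", "name");
        Object[] record = {"30", "1", "engineer", "Alice"};
        // Same order as the sample schemas: values are copied through unchanged
        System.out.println(Arrays.toString(reorder(source, source, record)));
        // A different output order: each value still lands under its own field name
        System.out.println(Arrays.toString(reorder(source, Arrays.asList("age", "id", "name", "job"), record)));
    }
}
```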
// DataSchemaProviderExample.java
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties; // org.apache.hudi.common.util.TypedProperties in older Hudi releases
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;

public class DataSchemaProviderExample extends SchemaProvider {
    public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) {
        super(props, jssc);
    }

    // Avro schema of the records read from the source: string fields age, id, job, name
    @Override
    public Schema getSourceSchema() {
        return new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"job\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}");
    }

    // Avro schema of the records written to the Hudi table: same fields, with its own record name and namespace
    @Override
    public Schema getTargetSchema() {
        return new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"job\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}");
    }
}
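The source and target schemas above share the same four string fields and differ only in record name and namespace. Generating both JSON strings from one field list keeps them in step with each other and with the Transformer's StructType. The hypothetical helper below sketches this in plain Java. It assumes that every field is a string and that field names need no JSON escaping. Its output is the text that getSourceSchema and getTargetSchema would pass to new Schema.Parser().parse(...).

```java
import java.util.Arrays;
import java.util.List;

public class AvroSchemaJsonSketch {

    // Builds an Avro record schema JSON string in which every field has type "string".
    // namespace may be null, in which case the attribute is omitted.
    // Assumes field and record names need no JSON escaping.
    static String recordSchemaJson(String name, String namespace, List<String> fieldNames) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"type\":\"record\",\"name\":\"").append(name).append('"');
        if (namespace != null) {
            sb.append(",\"namespace\":\"").append(namespace).append('"');
        }
        sb.append(",\"fields\":[");
        for (int i = 0; i < fieldNames.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"name\":\"").append(fieldNames.get(i)).append("\",\"type\":\"string\"}");
        }
        return sb.append("]}").toString();
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("age", "id", "job", "name");
        // Same JSON as the source schema in the sample
        System.out.println(recordSchemaJson("hoodie_source", null, fields));
        // Same JSON as the target schema in the sample
        System.out.println(recordSchemaJson("mytest_record", "hoodie.mytest", fields));
    }
}
```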