文档首页/
MapReduce服务 MRS/
开发指南(普通版_3.x)/
Spark2x开发指南(安全模式)/
开发Spark应用/
Hudi的自定义配置项样例程序/
HoodieDeltaStreamer
更新时间:2024-06-27 GMT+08:00
HoodieDeltaStreamer
编写自定义的转化类实现Transformer。
编写自定义的Schema实现SchemaProvider。
在执行HoodieDeltaStreamer时加入参数:
--schemaprovider-class 定义的schema类 --transformer-class 定义的transform类
Transformer和SchemaProvider样例:
public class TransformerExample implements Transformer, Serializable { @Override public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) { JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD(); List<Row> rowList =new ArrayList<>(); for(Row row: rowJavaRdd.collect()){ rowList.add(buildRow(row)); } JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList); List<StructField> fields = new ArrayList<>(); builFields(fields); StructType schema = DataTypes.createStructType(fields); Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema); return dataFrame; } private void builFields(List<StructField> fields) { fields.add(DataTypes.createStructField("age", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("id", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("job", DataTypes.StringType, true)); } private Row buildRow(Row row){ String age = row.getString(0); String id = row.getString(1); String job = row.getString(2); String name = row.getString(3); Row returnRow = RowFactory.create(age, id, job, name); return returnRow; } } public class DataSchemaProviderExample extends SchemaProvider { public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); } @Override public Schema getSourceSchema() { Schema avroSchema = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"job\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}"); return avroSchema; } @Override public Schema getTargetSchema() { Schema avroSchema = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"job\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}"); return avroSchema; } }
父主题: Hudi的自定义配置项样例程序