HoodieDeltaStreamer
Write a user-defined transformation class that implements Transformer.
Write a user-defined schema class that extends SchemaProvider.
Add the following parameters when running HoodieDeltaStreamer:
--schemaprovider-class <user-defined schema provider class> --transformer-class <user-defined transformer class>
Examples of Transformer and SchemaProvider:
public class TransformerExample implements Transformer, Serializable { @Override public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) { JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD(); List<Row> rowList =new ArrayList<>(); for(Row row: rowJavaRdd.collect()){ rowList.add(buildRow(row)); } JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList); List<StructField> fields = new ArrayList<>(); builFields(fields); StructType schema = DataTypes.createStructType(fields); Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema); return dataFrame; } private void builFields(List<StructField> fields) { fields.add(DataTypes.createStructField("age", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("id", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("job", DataTypes.StringType, true)); } private Row buildRow(Row row){ String age = row.getString(0); String id = row.getString(1); String job = row.getString(2); String name = row.getString(3); Row returnRow = RowFactory.create(age, id, job, name); return returnRow; } } public class DataSchemaProviderExample extends SchemaProvider { public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); } @Override public Schema getSourceSchema() { Schema avroSchema = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"job\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}"); return avroSchema; } @Override public Schema getTargetSchema() { Schema avroSchema = new Schema.Parser().parse( 
"{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"job\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}"); return avroSchema; } }
Feedback
Was this page helpful?
Provide feedback
Thank you very much for your feedback. We will continue working to improve the documentation.