分类 标签 存档 黑客派 订阅 搜索

flink 自定义 TableSink 输出数据到控制台

448 浏览

官方文档 中关于 TableSink 仅给出了接口名称和少量描述,关于接口方法的解释所提甚少。

为了能更好地理解 TableSink 的功能和实现方式,这里写了一个简单的场景:
读取一个英文文本文件,统计各单词出现的次数,并将统计结果转为 Table 对象,再把 Table 内容输出到控制台。

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    EnvironmentSettings settings = EnvironmentSettings
                    .newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode().build();

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);


    // 从 hdfs 读取文件
    DataStreamSource<String> text = streamEnv.readTextFile("hdfs://xxxx/file");

    // 统计
    SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
            .flatMap(new Tokenizer())
            .keyBy(0)
            .sum(1);

    // 将统计结果转为表对象
    Table countsTable = tableEnv.fromDataStream(counts, "word,count");

    // 创建 TableSink
    PrintTableSink sink = PrintTableSink.ofConsole();

    // 注册 TableSink
    tableEnv.registerTableSink(
            "ConsoleSinkTable",
            new String[]{"word", "count"},
            new TypeInformation[]{Types.STRING, Types.INT},
            sink
    );

    // 将表内容输出
    countsTable.insertInto("ConsoleSinkTable");

    // 启动任务
    streamEnv.execute("this is job name");

}

/**
 * 分词器
 */
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        
        String[] words = value.toLowerCase().split("\\W+");

        for (String word : words) {
            if (word.length() > 0) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}


自定义 TableSink 的实现:

public class PrintTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

    private String target;
    private PrintTableSinkFunction sinkFunction;

    public PrintTableSink(String target) {
        this.target = target;

        /**
         * 重点!!!
         *
         * PrintTableSinkFunction 是一个自定义的 SinkFunction
         * 描述了当接收到一条数据时,该如何 sink 的具体逻辑
         */
        this.sinkFunction = new PrintTableSinkFunction(target);
    }

    /**
     * 添加当流被消费时的 sink 逻辑
     */
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(this.sinkFunction);
    }

    /**
     * 对 "流" 添加 sink 逻辑(单条数据)
     */
    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream.addSink(this.sinkFunction);
    }

    /**
     * 对 "批" 添加 sink 逻辑(多条数据)
     */
    @Override
    public void emitDataSet(DataSet<Row> dataSet) {

        try {

            List<Row> elements = dataSet.collect();
            for (Iterator<Row> it = elements.iterator(); it.hasNext(); ) {
                Row row = it.next();
                this.sinkFunction.invoke(row);
            }

            dataSet.print();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    /**
     * 当 StreamTableEnvironment.registerTableSink() 时,会通过此方法完成 TableSink 对象的创建。
     *
     * @param strings          字段名列表
     * @param typeInformations 字段类型列表
     * @return
     */
    @Override
    public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) {
        PrintTableSink sink = new PrintTableSink(target);
        sink.fieldNames = strings;
        sink.fieldTypes = typeInformations;

        return sink;
    }

    /**
     * 表的字段列表
     */
    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    /**
     * 表字段的数据类型
     */
    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    /**
     * 表字段类型的描述信息
     */
    @Override
    public TypeInformation<Row> getOutputType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }


    /**
     * 这里定义了当接收到一条数据时,该如何 sink 的具体逻辑
     */
    public static class PrintTableSinkFunction implements SinkFunction<Row> {
        private static Logger LOG = LoggerFactory.getLogger(PrintTableSink.class);
        private String target;

        public PrintTableSinkFunction(String target) {
            this.target = target;
        }

        @Override
        public void invoke(Row row, Context context) throws Exception {
            switch (target) {
                case "Console":
                    System.out.println(row);
                    break;
                case "Logger":
                    LOG.info(row.toString());
                    break;
                default:
            }
        }

        @Override
        public void invoke(Row value) throws Exception {
            invoke(value, null);
        }
    }

}

如果需要将数据保存到 数据库、Redis、Hadoop 平台 等地方,只需在上面的 PrintTableSinkFunction 内修改输出逻辑即可。

这里只演示了写入的两种情况(AppendStreamTableSink、BatchTableSink),还有支持删除和修改的 RetractStreamTableSink、UpsertStreamTableSink 接口没有演示,我尽量这几天补上 :)

注:以上代码基于 flink 1.9

评论  
留下你的脚步
推荐阅读