Process API介绍

GeaFlow对外提供了一系列Process API,这些API和通用的流批类似,但不完全相同。我们在Source API中已有介绍,其构建出来的source是带有window的,因此GeaFlow所有的Process API也都带有window语义。

接口

API 接口说明 入参说明
PWindowStream map(MapFunction mapFunction) 通过实现mapFunction,可以将输入的T转换成R向下游输出。 mapFunction:用户自定义转换逻辑,T表示输入类型,R表示输出类型
PWindowStream filter(FilterFunction filterFunction) 通过实现filterFunction,可以将不符合要求的T进行过滤。 filterFunction:用户自定义过滤逻辑,T表示输入类型
PWindowStream flatMap(FlatMapFunction flatMapFunction) 通过实现flatMapFunction,可以将输入的一个T,生成n个R向下游输出。 flatMapFunction:用户自定义实现逻辑,T表示输入类型,R表示输出类型。
PWindowStream union(PStream uStream) 用于实现将两个输入流进行合并。 uStream:输入流,T表示输入流类型
PWindowBroadcastStream broadcast() 将数据流进行广播。
PWindowKeyStream keyBy(KeySelector selectorFunction) 对输入的record,按照KEY进行shuffle输出。 selectorFunction:用户自定义选取KEY的逻辑,T表示输入record类型,KEY表示定义的KEY类型。
PWindowStream reduce(ReduceFunction reduceFunction) 支持两种模式的reduce,对于批而言,其是基于当前一个window内的reduce聚合计算;而对于流而言,则是基于动态增量的全局reduce聚合计算(等同于flink的流式聚合计算)。GeaFlow默认是流聚合计算语义,如果需要批语义,用户可以通过参数开启。 reduceFunction:用户自定义reduce聚合逻辑,T表示输入record类型。
PWindowStream aggregate(AggregateFunction aggregateFunction) 支持两种模式的aggregate,对于批而言,其是基于当前一个window内的aggregate聚合计算;而对于流而言,则是基于动态增量的全局aggregate聚合计算(等同于flink的流式聚合计算)。GeaFlow默认是流聚合计算语义,如果需要批语义,用户可以通过参数开启。 aggregateFunction:用户自定义聚合计算逻辑,T表示输入类型,ACC为聚合值类型,OUT表示输出类型。
PIncStreamView materialize() 用于标识聚合计算是基于流还是批,默认无须调用该接口。

示例


public class StreamUnionPipeline implements Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(StreamUnionPipeline.class);

    public static final String RESULT_FILE_PATH = "./target/tmp/data/result/union";
    public static final String REF_FILE_PATH = "data/reference/union";
    public static final String SPLIT = ",";

    public static void main(String[] args) {
        // 获取作业执行环境
        Environment environment = EnvironmentFactory.onLocalEnvironment();
        // 执行作业提交
        IPipelineResult<?> result = submit(environment);
        result.get();
        // 关闭执行环境
        environment.shutdown();
    }

    public static IPipelineResult<?> submit(Environment environment) {
        Pipeline pipeline = PipelineFactory.buildPipeline(environment);
        Configuration envConfig = ((EnvironmentContext) environment.getEnvironmentContext()).getConfig();
        envConfig.getConfigMap().put(FileSink.OUTPUT_DIR, RESULT_FILE_PATH);
        pipeline.submit(new PipelineTask() {
            @Override
            public void execute(IPipelineTaskContext pipelineTaskCxt) {
                Configuration conf = pipelineTaskCxt.getConfig();
                PWindowSource<String> streamSource =
                    pipelineTaskCxt.buildSource(new FileSource<String>("data/input"
                        + "/email_edge",
                        Collections::singletonList) {}, SizeTumblingWindow.of(5000));

                PWindowSource<String> streamSource2 =
                    pipelineTaskCxt.buildSource(new FileSource<String>("data/input"
                        + "/email_edge",
                        Collections::singletonList) {}, SizeTumblingWindow.of(5000));

                SinkFunction<String> sink = ExampleSinkFunctionFactory.getSinkFunction(conf);
                streamSource
                    // 先将streamSource和streamSource2进行union合并。
                    .union(streamSource2)
                    // 对每条消息进行按照SPLIT分隔符进行解析,并将每条数据向下游分发。
                    .flatMap(new FlatMapFunction<String, Long>() {
                        @Override
                        public void flatMap(String value, Collector collector) {
                            String[] records = value.split(SPLIT);
                            for (String record : records) {
                                collector.partition(Long.valueOf(record));
                            }
                        }
                    })
                    // 构建tuple。
                    .map(p -> Tuple.of(p, p))
                    // 按照tuple作为key进行shuffle。
                    .keyBy(p -> p)
                    // 进行动态流式增量计算。
                    .aggregate(new AggFunc())
                    // 指定agg并发。
                    .withParallelism(conf.getInteger(AGG_PARALLELISM))
                    .map(v -> String.format("%s", v))
                    .sink(sink)
                    .withParallelism(conf.getInteger(SINK_PARALLELISM));
            }
        });

        return pipeline.execute();
    }

    public static class AggFunc implements
        AggregateFunction<Tuple<Long, Long>, Tuple<Long, Long>, Tuple<Long, Long>> {

        // 初始化和创建Accumulator。
        @Override
        public Tuple<Long, Long> createAccumulator() {
            return Tuple.of(0L, 0L);
        }

        // 将value add到accumulator中。
        @Override
        public void add(Tuple<Long, Long> value, Tuple<Long, Long> accumulator) {
            accumulator.setF0(value.f0);
            accumulator.setF1(value.f1 + accumulator.f1);
        }

        // 从accumulator中获取tuple2结果。
        @Override
        public Tuple<Long, Long> getResult(Tuple<Long, Long> accumulator) {
            return Tuple.of(accumulator.f0, accumulator.f1);
        }

        @Override
        public Tuple<Long, Long> merge(Tuple<Long, Long> a, Tuple<Long, Long> b) {
            return null;
        }
    }
}