Compute API介绍

GeaFlow对外提供了实现图计算算法的接口,通过实现相应接口可进行静态图计算或动态图计算,用户可在compute算法中定义具体的计算逻辑及迭代最大次数。

动态图

接口

API 接口说明 入参说明
void init(IncGraphComputeContext<K, VV, EV, M> incGraphContext) 图计算初始化接口 incGraphContext:增量动态图计算的上下文,其中K表示vertex id的类型,VV表示vertex value类型,EV表示edge value类型,M表示发送消息的类型。
void evolve(K vertexId, TemporaryGraph<K, VV, EV> temporaryGraph) 首轮迭代对增量图实现处理逻辑 vertexId:当前计算点的id,其中K表示vertex id的类型。
temporaryGraph:临时增量图,其中K表示vertex id的类型,VV表示vertex value类型,EV表示edge value类型。
void compute(K vertexId, Iterator<M> messageIterator) 迭代计算接口 vertexId:当前计算点的id,其中K表示vertex id的类型。
messageIterator:迭代过程中所有发送给当前vertex的消息,其中M表示迭代计算过程中定义的发送消息类型。
void finish(K vertexId, MutableGraph<K, VV, EV> mutableGraph) 迭代计算完成接口 vertexId:当前计算点的id,其中K表示vertex id的类型。
mutableGraph:可变图,其中K表示vertex id的类型,VV表示vertex value类型,EV表示edge value类型。
  • 详细接口
/**
 * Vertex-centric function for incremental (dynamic) graph computation.
 *
 * @param <K> type of the vertex id
 * @param <VV> type of the vertex value
 * @param <EV> type of the edge value
 * @param <M> type of the messages sent during the computation
 */
public interface IncVertexCentricFunction<K, VV, EV, M> extends Function {

    /**
     * Applies the processing logic to the incremental graph in the first iteration.
     *
     * @param vertexId id of the vertex currently being computed
     * @param temporaryGraph the temporary incremental graph
     */
    void evolve(K vertexId, TemporaryGraph<K, VV, EV> temporaryGraph);

    /**
     * Iterative computation step.
     *
     * @param vertexId id of the vertex currently being computed
     * @param messageIterator iterator over messages of type {@code M}
     *     (presumably those sent to {@code vertexId} - confirm against the runtime)
     */
    void compute(K vertexId, Iterator<M> messageIterator);

    /**
     * Called when the iterative computation is complete.
     *
     * @param vertexId id of the vertex currently being computed
     * @param mutableGraph the mutable graph
     */
    void finish(K vertexId, MutableGraph<K, VV, EV> mutableGraph);

    /**
     * Context exposed to an incremental vertex-centric function.
     */
    interface IncGraphContext<K, VV, EV, M> {

        /**
         * Returns the job id.
         */
        long getJobId();

        /**
         * Returns the id of the current iteration.
         */
        long getIterationId();

        /**
         * Returns the runtime context.
         */
        RuntimeContext getRuntimeContext();

        /**
         * Returns the mutable graph.
         */
        MutableGraph<K, VV, EV> getMutableGraph();

        /**
         * Returns the incremental graph.
         */
        TemporaryGraph<K, VV, EV> getTemporaryGraph();

        /**
         * Returns the historical graph held in graph storage.
         */
        HistoricalGraph<K, VV, EV> getHistoricalGraph();

        /**
         * Sends a message to the specified vertex.
         */
        void sendMessage(K vertexId, M message);

        /**
         * Sends a message to the neighbors of the current vertex.
         */
        void sendMessageToNeighbors(M message);
    }

    /**
     * View of the incremental graph.
     */
    interface TemporaryGraph<K, VV, EV> {

        /**
         * Returns the vertex from the incremental graph.
         */
        IVertex<K, VV> getVertex();

        /**
         * Returns the edges from the incremental graph.
         */
        List<IEdge<K, EV>> getEdges();

        /**
         * Updates the vertex value.
         */
        void updateVertexValue(VV value);
    }

    /**
     * View of the versioned graph data.
     */
    interface HistoricalGraph<K, VV, EV> {

        /**
         * Returns the id of the latest version of the graph data.
         */
        Long getLatestVersionId();

        /**
         * Returns all versions of the graph data.
         */
        List<Long> getAllVersionIds();

        /**
         * Returns all vertices of the graph data.
         * NOTE(review): the {@code Long} map key is presumably the version id - confirm.
         */
        Map<Long, IVertex<K, VV>> getAllVertex();

        /**
         * Returns the vertices of the given versions of the graph data.
         */
        Map<Long, IVertex<K, VV>> getAllVertex(List<Long> versions);

        /**
         * Returns the vertices of the given versions that satisfy the filter.
         */
        Map<Long, IVertex<K, VV>> getAllVertex(List<Long> versions, IVertexFilter<K, VV> vertexFilter);

        /**
         * Returns the snapshot of the given version of the graph data.
         */
        GraphSnapShot<K, VV, EV> getSnapShot(long version);
    }

    /**
     * Snapshot of the graph at one version.
     */
    interface GraphSnapShot<K, VV, EV> {

        /**
         * Returns the id of the current version.
         */
        long getVersion();

        /**
         * Returns the vertex query.
         */
        VertexQuery<K, VV> vertex();

        /**
         * Returns the edge query.
         */
        EdgeQuery<K, EV> edges();
    }

    /**
     * Graph to which vertices and edges can be added.
     */
    interface MutableGraph<K, VV, EV> {

        /**
         * Adds a vertex to the graph under the given version id.
         */
        void addVertex(long version, IVertex<K, VV> vertex);

        /**
         * Adds an edge to the graph under the given version id.
         */
        void addEdge(long version, IEdge<K, EV> edge);
    }
}

/**
 * Incremental vertex-centric function that additionally receives a compute context on
 * initialization.
 *
 * @param <K> type of the vertex id
 * @param <VV> type of the vertex value
 * @param <EV> type of the edge value
 * @param <M> type of the messages sent during the computation
 */
public interface IncVertexCentricComputeFunction<K, VV, EV, M>
    extends IncVertexCentricFunction<K, VV, EV, M> {

    /**
     * Graph computation initialization hook.
     *
     * @param incGraphContext context of the incremental dynamic graph computation
     */
    void init(IncGraphComputeContext<K, VV, EV, M> incGraphContext);

    /**
     * Incremental graph context extended with a {@code collect} operation.
     */
    interface IncGraphComputeContext<K, VV, EV, M> extends IncGraphContext<K, VV, EV, M> {

        /**
         * Collects the given vertex.
         * NOTE(review): presumably emits it as a result of the computation - confirm.
         */
        void collect(IVertex<K, VV> vertex);
    }
}

示例

/**
 * Example of an incremental (dynamic) graph computation job.
 *
 * <p>Vertices and edges are both parsed from the {@code data/input/email_edge} file, appended to
 * the graph view named {@code graph_view_name}, and processed by {@link IncGraphAlgorithms}.
 * Every vertex returned by the computation is logged as {@code id,value}.
 */
public class IncrGraphCompute {

    private static final Logger LOGGER = LoggerFactory.getLogger(IncrGraphCompute.class);

    /**
     * Creates a local environment, submits the pipeline, calls {@code get()} on its result and
     * shuts the environment down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Environment environment = EnvironmentFactory.onLocalEnvironment();
        try {
            IPipelineResult<?> result = submit(environment);
            result.get();
        } finally {
            // Shut the environment down even when submission or execution throws.
            environment.shutdown();
        }
    }

    /**
     * Builds the pipeline on the given environment, registers the graph view, submits the
     * incremental computation task and executes the pipeline.
     *
     * @param environment environment the pipeline is built on
     * @return the result returned by {@code pipeline.execute()}
     */
    public static IPipelineResult<?> submit(Environment environment) {
        final Pipeline pipeline = PipelineFactory.buildPipeline(environment);
        final String graphName = "graph_view_name";
        GraphViewDesc graphViewDesc = GraphViewBuilder.createGraphView(graphName)
                .withShardNum(4)
                .withBackend(BackendType.RocksDB)
                .withSchema(new GraphMetaType(IntegerType.INSTANCE, ValueVertex.class, Integer.class,
                        ValueEdge.class, IntegerType.class))
                .build();
        pipeline.withView(graphName, graphViewDesc);
        pipeline.submit(new PipelineTask() {
            @Override
            public void execute(IPipelineTaskContext pipelineTaskCxt) {
                // Extract vertices from the edge file: the first two comma-separated fields of
                // each line become two vertices, each with value 1.
                PWindowSource<IVertex<Integer, Integer>> vertices =
                        pipelineTaskCxt.buildSource(new RecoverableFileSource<>("data/input/email_edge",
                                line -> {
                                    String[] fields = line.split(",");
                                    IVertex<Integer, Integer> vertex1 = new ValueVertex<>(
                                            Integer.valueOf(fields[0]), 1);
                                    IVertex<Integer, Integer> vertex2 = new ValueVertex<>(
                                            Integer.valueOf(fields[1]), 1);
                                    return Arrays.asList(vertex1, vertex2);
                                }), SizeTumblingWindow.of(10000));

                // The same file yields one edge per line, built from the same two fields with
                // edge value 1.
                PWindowSource<IEdge<Integer, Integer>> edges =
                        pipelineTaskCxt.buildSource(new RecoverableFileSource<>("data/input/email_edge",
                                line -> {
                                    String[] fields = line.split(",");
                                    IEdge<Integer, Integer> edge = new ValueEdge<>(Integer.valueOf(fields[0]),
                                            Integer.valueOf(fields[1]), 1);
                                    return Collections.singletonList(edge);
                                }), SizeTumblingWindow.of(5000));

                PGraphView<Integer, Integer, Integer> fundGraphView = pipelineTaskCxt.getGraphView(graphName);

                PIncGraphView<Integer, Integer, Integer> incGraphView = fundGraphView.appendGraph(vertices, edges);
                incGraphView.incrementalCompute(new IncGraphAlgorithms(3))
                        .getVertices()
                        .map(v -> String.format("%s,%s", v.getId(), v.getValue()))
                        .sink(v -> {
                            LOGGER.info("result: {}", v);
                        });
            }
        });
        return pipeline.execute();
    }

    /**
     * Incremental algorithm used by this example: in the first iteration each vertex of the
     * incremental graph sends its own id along its edges; afterwards every vertex that receives
     * messages stores the largest received value as its vertex value.
     */
    public static class IncGraphAlgorithms extends IncVertexCentricCompute<Integer, Integer, Integer, Integer> {

        /**
         * @param iterations value passed to the {@code IncVertexCentricCompute} constructor
         *     (the maximum number of iterations, per the guide text above the example)
         */
        public IncGraphAlgorithms(long iterations) {
            super(iterations);
        }

        @Override
        public IncVertexCentricComputeFunction<Integer, Integer, Integer, Integer> getIncComputeFunction() {
            return new IncVertexCentricComputeFunction<Integer, Integer, Integer, Integer>() {

                private IncGraphComputeContext<Integer, Integer, Integer, Integer> graphContext;

                /** Stores the context for use by the other callbacks. */
                @Override
                public void init(IncGraphComputeContext<Integer, Integer, Integer, Integer> graphContext) {
                    this.graphContext = graphContext;
                }

                /**
                 * Sends {@code vertexId} to the target of every edge of the incremental graph,
                 * provided the incremental graph has both a vertex and an edge list.
                 */
                @Override
                public void evolve(Integer vertexId,
                                   TemporaryGraph<Integer, Integer, Integer> temporaryGraph) {
                    IVertex<Integer, Integer> vertex = temporaryGraph.getVertex();
                    if (vertex == null || temporaryGraph.getEdges() == null) {
                        return;
                    }
                    for (IEdge<Integer, Integer> edge : temporaryGraph.getEdges()) {
                        graphContext.sendMessage(edge.getTargetId(), vertexId);
                    }
                }

                /**
                 * Writes the maximum of the received messages (starting from 0) as the new
                 * vertex value of the incremental graph.
                 */
                @Override
                public void compute(Integer vertexId, Iterator<Integer> messageIterator) {
                    int max = 0;
                    while (messageIterator.hasNext()) {
                        int value = messageIterator.next();
                        max = Math.max(max, value);
                    }
                    graphContext.getTemporaryGraph().updateVertexValue(max);
                }

                /**
                 * Collects the vertex of the incremental graph, if there is one.
                 */
                @Override
                public void finish(Integer vertexId, MutableGraph<Integer, Integer, Integer> mutableGraph) {
                    IVertex<Integer, Integer> vertex = graphContext.getTemporaryGraph().getVertex();
                    // evolve() already treats getVertex() as nullable; never hand null to collect().
                    if (vertex == null) {
                        LOGGER.debug("vertex {} not found in temporary graph, nothing to collect", vertexId);
                        return;
                    }
                    graphContext.collect(vertex);
                }
            };
        }

        /** Returns {@code null}: this example supplies no combine function. */
        @Override
        public VertexCentricCombineFunction<Integer> getCombineFunction() {
            return null;
        }
    }
}

静态图

接口

API 接口说明 入参说明
void init(VertexCentricComputeFuncContext<K, VV, EV, M> vertexCentricFuncContext) 迭代计算初始化接口 vertexCentricFuncContext:静态图计算的上下文,其中K表示vertex id的类型,VV表示vertex value类型,EV表示edge value类型,M表示发送消息的类型。
void compute(K vertexId, Iterator<M> messageIterator) 迭代计算接口 vertexId:当前计算点的id,其中K表示vertex id的类型。
messageIterator:迭代过程中所有发送给当前vertex的消息,其中M表示迭代计算过程中定义的发送消息类型。
void finish() 迭代计算完成接口
  • 详细接口
/**
 * Vertex-centric compute function for static graph computation.
 *
 * @param <K> type of the vertex id
 * @param <VV> type of the vertex value
 * @param <EV> type of the edge value
 * @param <M> type of the messages sent during the computation
 */
public interface VertexCentricComputeFunction<K, VV, EV, M>
    extends VertexCentricFunction<K, VV, EV, M> {

    /**
     * Iterative computation initialization hook.
     *
     * @param vertexCentricFuncContext context of the static graph computation
     */
    void init(VertexCentricComputeFuncContext<K, VV, EV, M> vertexCentricFuncContext);

    /**
     * Iterative computation step.
     *
     * @param vertex id of the vertex currently being computed
     * @param messageIterator all messages sent to the current vertex during the iteration
     */
    void compute(K vertex, Iterator<M> messageIterator);

    /**
     * Called when the iterative computation is complete.
     */
    void finish();

    /**
     * Static graph context extended with the ability to set the vertex value.
     */
    interface VertexCentricComputeFuncContext<K, VV, EV, M>
        extends VertexCentricFuncContext<K, VV, EV, M> {

        /**
         * Sets the vertex value.
         */
        void setNewVertexValue(VV value);
    }
}

示例

/**
 * Example of a static graph computation job.
 *
 * <p>Vertices are parsed from {@code data/input/email_vertex} and edges from
 * {@code data/input/email_edge}; the resulting window graph is processed by
 * {@link SSSPAlgorithm} and the computed vertices are sent to a no-op sink.
 */
public class StaticsGraphCompute {

    /**
     * Creates a local environment, submits the pipeline, calls {@code get()} on its result and
     * shuts the environment down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Environment environment = EnvironmentFactory.onLocalEnvironment();
        try {
            IPipelineResult<?> result = submit(environment);
            result.get();
        } finally {
            // Shut the environment down even when submission or execution throws.
            environment.shutdown();
        }
    }

    /**
     * Builds the pipeline on the given environment, submits the static graph computation task
     * and executes the pipeline.
     *
     * @param environment environment the pipeline is built on
     * @return the result returned by {@code pipeline.execute()}
     */
    public static IPipelineResult<?> submit(Environment environment) {
        Pipeline pipeline = PipelineFactory.buildPipeline(environment);

        pipeline.submit((PipelineTask) pipelineTaskCxt -> {
            // Each vertex line provides the vertex id and the vertex value as its first two
            // comma-separated fields.
            PWindowSource<IVertex<Integer, Integer>> prVertices =
                pipelineTaskCxt.buildSource(new FileSource<>("data/input/email_vertex",
                    line -> {
                        String[] fields = line.split(",");
                        IVertex<Integer, Integer> vertex = new ValueVertex<>(
                            Integer.valueOf(fields[0]), Integer.valueOf(fields[1]));
                        return Collections.singletonList(vertex);
                    }), AllWindow.getInstance())
                    .withParallelism(2);

            // Each edge line provides two ids as its first two fields; the edge value is 1.
            PWindowSource<IEdge<Integer, Integer>> prEdges = pipelineTaskCxt.buildSource(new FileSource<>(
                "data/input/email_edge", line -> {
                String[] fields = line.split(",");
                IEdge<Integer, Integer> edge = new ValueEdge<>(Integer.valueOf(fields[0]), Integer.valueOf(fields[1]), 1);
                return Collections.singletonList(edge);
            }), AllWindow.getInstance()).withParallelism(2);

            GraphViewDesc graphViewDesc = GraphViewBuilder
                .createGraphView(GraphViewBuilder.DEFAULT_GRAPH)
                .withShardNum(2)
                .withBackend(BackendType.Memory)
                .build();

            PGraphWindow<Integer, Integer, Integer> graphWindow =
                pipelineTaskCxt.buildWindowStreamGraph(prVertices, prEdges, graphViewDesc);
            graphWindow.compute(new SSSPAlgorithm(1, 10))
                .compute(2)
                .getVertices()
                .sink(v -> {});
        });
        return pipeline.execute();
    }

    /**
     * Shortest-distance propagation from a single source vertex: a vertex whose candidate
     * distance is smaller than its current value adopts it and forwards
     * {@code distance + edge value} along its out-edges.
     */
    public static class SSSPAlgorithm extends VertexCentricCompute<Integer, Integer, Integer, Integer> {

        /** Id of the source vertex, whose candidate distance starts at 0. */
        private final int srcId;

        /**
         * @param srcId id of the source vertex
         * @param iterations value passed to the {@code VertexCentricCompute} constructor
         *     (the maximum number of iterations, per the guide text at the top of this document)
         */
        public SSSPAlgorithm(int srcId, long iterations) {
            super(iterations);
            this.srcId = srcId;
        }

        @Override
        public VertexCentricComputeFunction<Integer, Integer, Integer, Integer> getComputeFunction() {
            return new VertexCentricComputeFunction<Integer, Integer, Integer, Integer>() {

                private VertexCentricComputeFuncContext<Integer, Integer, Integer, Integer> context;

                /** Stores the context for use by {@code compute}. */
                @Override
                public void init(VertexCentricComputeFuncContext<Integer, Integer, Integer, Integer> vertexCentricFuncContext) {
                    this.context = vertexCentricFuncContext;
                }

                /**
                 * Takes the minimum over the received messages, starting from 0 for the source
                 * vertex and {@code Integer.MAX_VALUE} otherwise. When that minimum is smaller
                 * than the current vertex value, it becomes the new value and
                 * {@code minDistance + edge value} is sent to the target of every out-edge.
                 */
                @Override
                public void compute(Integer vertex, Iterator<Integer> messageIterator) {
                    int minDistance = vertex == srcId ? 0 : Integer.MAX_VALUE;
                    if (messageIterator != null) {
                        while (messageIterator.hasNext()) {
                            Integer value = messageIterator.next();
                            minDistance = Math.min(minDistance, value);
                        }
                    }
                    IVertex<Integer, Integer> iVertex = this.context.vertex().get();
                    if (minDistance < iVertex.getValue()) {
                        this.context.setNewVertexValue(minDistance);
                        for (IEdge<Integer, Integer> edge : this.context.edges().getOutEdges()) {
                            this.context.sendMessage(edge.getTargetId(), minDistance + edge.getValue());
                        }
                    }
                }

                /** Nothing to do when the iteration completes. */
                @Override
                public void finish() {
                }
            };
        }

        /** Returns {@code null}: this example supplies no combine function. */
        @Override
        public VertexCentricCombineFunction<Integer> getCombineFunction() {
            return null;
        }
    }
}