Stream Graph
基本概念
Streaming Graph指的是流式、动态、变化的图数据,同时在GeaFlow内部Streaming Graph也指对流式图的计算模式,它针对流式变化的图,基于图的变化进行图遍历、图匹配和图计算等操作。
基于GeaFlow框架,可以方便的针对流式变化的图动态计算。在GeaFlow中,我们抽象了Dynamic Graph和Static Graph两个核心的概念。 * Dynamic Graph 是指流式变化的图,它是图在时间轴上不断变化的切片所组成,可以方便的研究图随着时间推移的演化过程。 * Static Graph 是图在某个时间点的 Snapshot,相当于 Dynamic Graph 的一个时间切片。
功能描述
Streaming Graph 主要有以下几个功能:
- 支持流式地处理点、边数据,支持在最新的图做查询。
- 支持持续不断的更新和查询图结构,支持图结构变化带来的增量数据处理。
- 支持回溯历史,基于历史快照做查询。
- 支持图计算的计算逻辑顺序,例如基于边的时间序做计算。
示例介绍
读取点、边两个无限数据流增量构图,对于每次增量数据构图完成,会触发 traversal 计算,查找 'Bob' 的2度内的朋友随着时间推移的演进过程。
DSL 代码
set geaflow.dsl.window.size = 1;
CREATE TABLE table_knows (
personId int,
friendId int,
weight int
) WITH (
type='file',
geaflow.dsl.file.path = 'resource:///data/table_knows.txt'
);
INSERT INTO social_network.knows
SELECT personId, friendId, weight
FROM table_knows;
CREATE TABLE result (
personName varchar,
friendName varchar,
weight int
) WITH (
type='console'
);
-- Graph View Name Defined in Graph View Concept --
USE GRAPH social_network;
-- find person id 3's known persons triggered every window.
INSERT INTO result
SELECT
name,
known_name,
weight
FROM (
MATCH (a:person where a.name = 'Bob') -[e:knows]->{1, 2}(b)
RETURN a.name as name, b.name as known_name, e.weight as weight
)
HLA 代码
//build graph view.
final String graphName = "social_network";
GraphViewDesc graphViewDesc = GraphViewBuilder.createGraphView(graphName).build();
pipeline.withView(graphName, graphViewDesc);
// submit pipeLine task.
pipeline.submit(new PipelineTask() {
@Override
public void execute(IPipelineTaskContext pipelineTaskCxt) {
// build vertices streaming source.
PStreamSource<IVertex<Integer, String>> persons =
pipelineTaskCxt.buildSource(
new CollectionSource.(getVertices()), SizeTumblingWindow.of(5000));
// build edges streaming source.
PStreamSource<IEdge<Integer, Integer>> knows =
pipelineTaskCxt.buildSource(
new CollectionSource<>(getEdges()), SizeTumblingWindow.of(5000));
// build graphview by graph name.
PGraphView<Integer, String, Integer> socialNetwork =
pipelineTaskCxt.buildGraphView(graphName);
// incremental build graph view.
PIncGraphView<Integer, String, Integer> incSocialNetwor =
socialNetwork.appendGraph(vertices, edges);
// traversal by 'Bob'.
incGraphView.incrementalTraversal(new IncGraphTraversalAlgorithms(2))
.start('Bob')
.map(res -> String.format("%s,%s", res.getResponseId(), res.getResponse()))
.sink(new ConsoleSink<>());
}
});