更新时间:2024-05-23 GMT+08:00
自定义图分析算法编程示例
自定义SSSP算法
# 导入必要的包
from hyg.analytics.graph import load_base_graph
from hyg.analytics.model import pregel_types, PregelModel
# 指定graph_name参数,加载图数据
graph = load_base_graph("movie")
# 通过外部id获取内部id
SOURCE_NODE = graph.nid(100)
# 基于Pregel模型实现自定义SSSP算法,并设置顶点值类型ntype为int,
# 消息类型mtype默认和ntype保持一致,combiner类型设置为min
@pregel_types(ntype=int, combiner=min)
class PregelSSSP(PregelModel):
@staticmethod
def compute(ctx, nid, msgs):
if ctx.superstep == 0:
ctx.set_value(nid, 10000)
min_dist = 0 if nid == SOURCE_NODE else 10000
if len(msgs) != 0:
min_dist = min(min_dist, min(msgs))
if min_dist < ctx.value(nid):
ctx.set_value(nid, min_dist)
for e in ctx.out_edges(nid):
ctx.send(ctx.edge_dst(e), min_dist + 1)
ctx.halt(nid)
# 运行自定义SSSP算法,并获取结果
result = graph.run_pregel(PregelSSSP)
print(type(result), result)
自定义PageRank算法
# 导入必要的包
from hyg.analytics.graph import load_base_graph
from hyg.analytics.model import pregel_types, PregelModel
# 指定graph_name参数,加载图数据
graph = load_base_graph("movie")
# 基于Pregel模型实现自定义PageRank算法,
# 设置顶点值类型ntype为float,消息类型mtype默认
# 和ntype保持一致,combiner类型设置为sum
@pregel_types(ntype=float, combiner=sum)
class PregelPageRank(PregelModel):
@staticmethod
def init(ctx, nid):
ctx.set_value(nid, 1.0)
@staticmethod
def compute(ctx, nid, msgs):
if ctx.superstep >= 1:
new_value = 0.85 * sum(msgs) + 0.15 / ctx.num_nodes
if (abs(new_value - ctx.value(nid)) < 0.001
or ctx.superstep == 1000):
ctx.halt(nid)
return
ctx.set_value(nid, new_value)
# 仅在debug模式debug_mode=True时,
# 支持自定义算法中使用print调试语句
print(f"in step {ctx.superstep}, node nid {nid}, "
f"value {ctx.value(nid)}")
out_edges = ctx.out_edges(nid)
if len(out_edges) > 0:
new_msg = ctx.value(nid) / len(out_edges)
for e in out_edges:
ctx.send(ctx.edge_dst(e), new_msg)
# debug运行自定义PageRank算法,并获取结果
result = graph.run_pregel(PregelPageRank, debug_mode=True)
print(type(result), result)
自定义khop算法
# 导入必要的包
from hyg.analytics.graph import load_base_graph
from hyg.analytics.model import pregel_types, PregelModel
# 指定graph_name参数,加载图数据
graph = load_base_graph("movie")
# 设置全局值用于在自定义算法中使用
INFINITY_VALUE = False
SOURCE_NODE = graph.nid("100")
MAX_HOP = 3
# 基于Pregel模型实现自定义khop算法,
# 设置顶点值类型ntype为bool,用来表示是否被遍历到,
# bool类型不支持combiner使用,即使设置了combiner也会被忽略掉
@pregel_types(ntype=bool)
class PregelKHop(PregelModel):
@staticmethod
def compute(ctx, nid, msgs) -> None:
if ctx.superstep == 0:
if nid == SOURCE_NODE:
ctx.set_value(nid, True)
if ctx.superstep < MAX_HOP:
for e in ctx.out_edges(nid):
ctx.send(ctx.edge_dst(e), True)
else:
ctx.set_value(nid, INFINITY_VALUE)
else:
if ctx.value(nid) == INFINITY_VALUE:
ctx.set_value(nid, True)
if ctx.superstep < MAX_HOP:
for e in ctx.out_edges(nid):
ctx.send(ctx.edge_dst(e), True)
ctx.halt(nid)
# 运行自定义khop算法,并对结果通过result_filter参数进行过滤
result = graph.run_pregel(PregelKHop,
result_filter=
lambda ctx, nid: nid != SOURCE_NODE
and ctx.value(nid))
print(len(result), result.keys())
父主题: DSL语法说明