更新时间:2024-05-23 GMT+08:00
自定义图分析算法编程示例
自定义SSSP算法
# 导入必要的包 from hyg.analytics.graph import load_base_graph from hyg.analytics.model import pregel_types, PregelModel # 指定graph_name参数,加载图数据 graph = load_base_graph("movie") # 通过外部id获取内部id SOURCE_NODE = graph.nid(100) # 基于Pregel模型实现自定义SSSP算法,并设置顶点值类型ntype为int, # 消息类型mtype默认和ntype保持一致,combiner类型设置为min @pregel_types(ntype=int, combiner=min) class PregelSSSP(PregelModel): @staticmethod def compute(ctx, nid, msgs): if ctx.superstep == 0: ctx.set_value(nid, 10000) min_dist = 0 if nid == SOURCE_NODE else 10000 if len(msgs) != 0: min_dist = min(min_dist, min(msgs)) if min_dist < ctx.value(nid): ctx.set_value(nid, min_dist) for e in ctx.out_edges(nid): ctx.send(ctx.edge_dst(e), min_dist + 1) ctx.halt(nid) # 运行自定义SSSP算法,并获取结果 result = graph.run_pregel(PregelSSSP) print(type(result), result)
自定义PageRank算法
# 导入必要的包 from hyg.analytics.graph import load_base_graph from hyg.analytics.model import pregel_types, PregelModel # 指定graph_name参数,加载图数据 graph = load_base_graph("movie") # 基于Pregel模型实现自定义PageRank算法, # 设置顶点值类型ntype为float,消息类型mtype默认 # 和ntype保持一致,combiner类型设置为sum @pregel_types(ntype=float, combiner=sum) class PregelPageRank(PregelModel): @staticmethod def init(ctx, nid): ctx.set_value(nid, 1.0) @staticmethod def compute(ctx, nid, msgs): if ctx.superstep >= 1: new_value = 0.85 * sum(msgs) + 0.15 / ctx.num_nodes if (abs(new_value - ctx.value(nid)) < 0.001 or ctx.superstep == 1000): ctx.halt(nid) return ctx.set_value(nid, new_value) # 仅在debug模式debug_mode=True时, # 支持自定义算法中使用print调试语句 print(f"in step {ctx.superstep}, node nid {nid}, " f"value {ctx.value(nid)}") out_edges = ctx.out_edges(nid) if len(out_edges) > 0: new_msg = ctx.value(nid) / len(out_edges) for e in out_edges: ctx.send(ctx.edge_dst(e), new_msg) # debug运行自定义PageRank算法,并获取结果 result = graph.run_pregel(PregelPageRank, debug_mode=True) print(type(result), result)
自定义khop算法
# 导入必要的包 from hyg.analytics.graph import load_base_graph from hyg.analytics.model import pregel_types, PregelModel # 指定graph_name参数,加载图数据 graph = load_base_graph("movie") # 设置全局值用于在自定义算法中使用 INFINITY_VALUE = False SOURCE_NODE = graph.nid("100") MAX_HOP = 3 # 基于Pregel模型实现自定义khop算法, # 设置顶点值类型ntype为bool,用来表示是否被遍历到, # bool类型不支持combiner使用,即使设置了combiner也会被忽略掉 @pregel_types(ntype=bool) class PregelKHop(PregelModel): @staticmethod def compute(ctx, nid, msgs) -> None: if ctx.superstep == 0: if nid == SOURCE_NODE: ctx.set_value(nid, True) if ctx.superstep < MAX_HOP: for e in ctx.out_edges(nid): ctx.send(ctx.edge_dst(e), True) else: ctx.set_value(nid, INFINITY_VALUE) else: if ctx.value(nid) == INFINITY_VALUE: ctx.set_value(nid, True) if ctx.superstep < MAX_HOP: for e in ctx.out_edges(nid): ctx.send(ctx.edge_dst(e), True) ctx.halt(nid) # 运行自定义khop算法,并对结果通过result_filter参数进行过滤 result = graph.run_pregel(PregelKHop, result_filter= lambda ctx, nid: nid != SOURCE_NODE and ctx.value(nid)) print(len(result), result.keys())
父主题: DSL语法说明