更新时间:2024-05-23 GMT+08:00

自定义图分析算法编程示例

自定义SSSP算法

# 导入必要的包
from hyg.analytics.graph import load_base_graph
from hyg.analytics.model import pregel_types, PregelModel

# Load the graph data by name (the graph_name argument).
graph = load_base_graph("movie")
# Map the external vertex id 100 to the graph's internal vertex id; compute()
# compares vertex ids against this internal id to identify the SSSP source.
# NOTE(review): the k-hop example calls graph.nid("100") with a string while
# this passes the int 100 -- confirm which id type graph.nid expects.
SOURCE_NODE = graph.nid(100)

# Custom single-source shortest path (SSSP) built on the Pregel model.
# Vertex values are ints (ntype=int); the message type mtype defaults to the
# same type as ntype; incoming messages are combined with min.
# Distances count edges (each hop adds 1). 10000 is used as the "not reached"
# sentinel, so a path of 10000 or more hops is indistinguishable from an
# unreachable vertex -- NOTE(review): confirm that bound suits the graph.
@pregel_types(ntype=int, combiner=min)
class PregelSSSP(PregelModel):
    # Per-vertex step: take the smallest known distance and, if it improves on
    # the stored value, store it and offer distance + 1 to every out-neighbour;
    # then vote to halt.
    @staticmethod
    def compute(ctx, nid, msgs):
        # Superstep 0: every vertex starts at the sentinel distance 10000.
        if ctx.superstep == 0:
            ctx.set_value(nid, 10000)

        # The source is at distance 0; every other vertex starts from the
        # sentinel before considering incoming messages.
        min_dist = 0 if nid == SOURCE_NODE else 10000

        if len(msgs) != 0:
            min_dist = min(min_dist, min(msgs))

        # Only a strict improvement is stored and propagated, so a vertex
        # sends messages only when its distance actually decreases.
        if min_dist < ctx.value(nid):
            ctx.set_value(nid, min_dist)
            for e in ctx.out_edges(nid):
                ctx.send(ctx.edge_dst(e), min_dist + 1)

        # Vote to halt every step; presumably the runtime reactivates the
        # vertex when a new message arrives (standard Pregel semantics --
        # confirm for hyg).
        ctx.halt(nid)

# Run the custom SSSP algorithm, then print the result object and its type.
result = graph.run_pregel(PregelSSSP)
print(type(result), result)

自定义PageRank算法

# 导入必要的包
from hyg.analytics.graph import load_base_graph
from hyg.analytics.model import pregel_types, PregelModel

# Load the graph data by name (the graph_name argument).
graph = load_base_graph("movie")

# Custom PageRank built on the Pregel model.
# Vertex values are floats (ntype=float); the message type mtype defaults to
# the same type as ntype; incoming messages are combined with sum.
@pregel_types(ntype=float, combiner=sum)
class PregelPageRank(PregelModel):
    # Initialise every vertex's rank to 1.0.
    @staticmethod
    def init(ctx, nid):
        ctx.set_value(nid, 1.0)

    # Per-vertex step: from superstep 1 on, recompute the rank from the summed
    # incoming contributions with damping factor 0.85; then split the current
    # rank evenly across the out-edges.
    @staticmethod
    def compute(ctx, nid, msgs):
        if ctx.superstep >= 1:
            new_value = 0.85 * sum(msgs) + 0.15 / ctx.num_nodes

            # Stop this vertex once its rank changes by less than 0.001, or at
            # superstep 1000 as a hard cap on the number of iterations.
            # NOTE(review): a vertex that halts here returns without sending,
            # so under standard Pregel semantics its out-neighbours lose its
            # contribution in the next superstep and their ranks can be
            # under-estimated -- confirm how the hyg runtime handles this.
            if (abs(new_value - ctx.value(nid)) < 0.001
                    or ctx.superstep == 1000):
                ctx.halt(nid)
                return

            ctx.set_value(nid, new_value)
            # print statements inside a custom algorithm are supported only
            # when the algorithm is run with debug_mode=True.
            print(f"in step {ctx.superstep}, node nid {nid}, "
                  f"value {ctx.value(nid)}")  
        # Split the current rank evenly among the out-neighbours. Vertices
        # with no out-edges send nothing, so their rank is not redistributed.
        out_edges = ctx.out_edges(nid)
        if len(out_edges) > 0:
            new_msg = ctx.value(nid) / len(out_edges)
            for e in out_edges:
                ctx.send(ctx.edge_dst(e), new_msg)

# Run the custom PageRank algorithm in debug mode (debug_mode=True, which is
# what allows the print calls inside compute), then print the result and its
# type.
result = graph.run_pregel(PregelPageRank, debug_mode=True)
print(type(result), result)

自定义khop算法

# 导入必要的包
from hyg.analytics.graph import load_base_graph
from hyg.analytics.model import pregel_types, PregelModel

# Load the graph data by name (the graph_name argument).
graph = load_base_graph("movie")

# Module-level values read by the custom algorithm's compute().
# Value stored on vertices that have not been reached yet.
INFINITY_VALUE = False
# Internal id of the start vertex, looked up from the external id "100".
# NOTE(review): the SSSP example passes the int 100 to graph.nid -- confirm
# which id type graph.nid expects.
SOURCE_NODE = graph.nid("100")
# Maximum number of hops to traverse from SOURCE_NODE.
MAX_HOP = 3

# Custom k-hop traversal built on the Pregel model.
# Vertex values are bools (ntype=bool) recording whether the vertex has been
# reached. Combiners are not supported for bool; a combiner would be ignored
# even if one were set.
@pregel_types(ntype=bool)
class PregelKHop(PregelModel):
    # Per-vertex step. Superstep 0 marks the source as reached and every other
    # vertex as unreached (INFINITY_VALUE). In later supersteps an unreached
    # vertex that runs marks itself reached. A reached vertex forwards a
    # message to its out-neighbours only while superstep < MAX_HOP.
    @staticmethod
    def compute(ctx, nid, msgs) -> None:
        if ctx.superstep == 0:
            if nid == SOURCE_NODE:
                ctx.set_value(nid, True)
                if ctx.superstep < MAX_HOP:
                    for e in ctx.out_edges(nid):
                        ctx.send(ctx.edge_dst(e), True)
            else:
                ctx.set_value(nid, INFINITY_VALUE)
        else:
            # Message contents are not inspected. Presumably a halted vertex
            # only runs again when it receives a message (standard Pregel
            # semantics -- confirm for hyg), so running here means it was
            # reached. Only the first visit propagates, so each vertex
            # forwards at most once.
            if ctx.value(nid) == INFINITY_VALUE:
                ctx.set_value(nid, True)
                if ctx.superstep < MAX_HOP:
                    for e in ctx.out_edges(nid):
                        ctx.send(ctx.edge_dst(e), True)

        ctx.halt(nid)

# Run the custom k-hop algorithm and filter the result via the result_filter
# argument. The predicate is true for vertices other than the source whose
# value is True; presumably only those vertices are kept in the result.
result = graph.run_pregel(PregelKHop,
                          result_filter=
                          lambda ctx, nid: nid != SOURCE_NODE
                                           and ctx.value(nid))
# Print how many entries the result holds and their keys.
print(len(result), result.keys())