Help Center/ Graph Engine Service/ API Reference/ Service Plane APIs/ Database Edition/ HyG Algorithm APIs/ DSL Syntax/ Programming Example of Creating Custom Graph Analysis Algorithms
Updated on 2024-05-20 GMT+08:00

Programming Example of Creating Custom Graph Analysis Algorithms

Creating a Custom SSSP Algorithm

# Import necessary packages.
from hyg.analytics.graph import load_base_graph
from hyg.analytics.model import pregel_types, PregelModel

# Load the graph named "movie".
graph = load_base_graph("movie")
# Obtain the internal ID of the source vertex from its external ID.
# NOTE(review): the K-Hop example on this page passes the external ID as the
# string "100" rather than the integer 100 - confirm which form this graph uses.
SOURCE_NODE = graph.nid(100)

# Implement the custom SSSP algorithm based on the Pregel model, with ntype set to int.
# mtype is not specified, so it defaults to the same type as ntype; combiner is set to min.
@pregel_types(ntype=int, combiner=min)
class PregelSSSP(PregelModel):
    # Hop-count shortest paths from SOURCE_NODE. A vertex value of 10000 is
    # the placeholder for "not reached yet".

    @staticmethod
    def compute(ctx, nid, msgs):
        # In the first superstep, start the vertex at the placeholder value.
        if ctx.superstep == 0:
            ctx.set_value(nid, 10000)

        # Candidate distance: 0 for the source vertex, the placeholder for
        # any other vertex, lowered to the smallest incoming message if any.
        if nid == SOURCE_NODE:
            candidate = 0
        else:
            candidate = 10000

        if len(msgs) > 0:
            candidate = min(candidate, min(msgs))

        # Only an improvement is stored and propagated; each out-edge adds
        # one hop to the distance offered to its destination.
        if candidate < ctx.value(nid):
            ctx.set_value(nid, candidate)
            for edge in ctx.out_edges(nid):
                ctx.send(ctx.edge_dst(edge), candidate + 1)

        # NOTE(review): presumably the vertex stays halted until it receives
        # a new message (standard Pregel behaviour) - confirm for HyG.
        ctx.halt(nid)

# Run the custom SSSP algorithm on the loaded graph, then print the type and
# the contents of the returned result.
result = graph.run_pregel(PregelSSSP)
print(type(result), result)

Creating a Custom PageRank Algorithm

# Import necessary packages.
from hyg.analytics.graph import load_base_graph
from hyg.analytics.model import pregel_types, PregelModel

# Load the graph named "movie".
graph = load_base_graph("movie")

# Implement the custom PageRank algorithm based on the Pregel model.
# Set ntype to float and retain the default value for mtype.
# Set combiner to sum.
@pregel_types(ntype=float, combiner=sum)
class PregelPageRank(PregelModel):
    # Custom PageRank: float vertex values, with combiner set to sum.

    @staticmethod
    def init(ctx, nid):
        # Initial vertex value.
        ctx.set_value(nid, 1.0)

    @staticmethod
    def compute(ctx, nid, msgs):
        # Superstep 0 skips the update below and only distributes the
        # current value along the out-edges.
        if ctx.superstep >= 1:
            # 0.85 weights the summed incoming messages; the remaining 0.15
            # is divided by the number of vertices.
            updated = 0.85 * sum(msgs) + 0.15 / ctx.num_nodes

            # Halt without storing or sending anything once the change is
            # below 0.001 or superstep 1000 has been reached.
            delta = abs(updated - ctx.value(nid))
            if delta < 0.001 or ctx.superstep == 1000:
                ctx.halt(nid)
                return

            ctx.set_value(nid, updated)
            # print statements inside a custom algorithm can only be used
            # when debug_mode is set to True.
            print(f"in step {ctx.superstep}, node nid {nid}, "
                  f"value {ctx.value(nid)}")

        # Split the current value equally across the out-edges; a vertex
        # without out-edges sends nothing.
        edges = ctx.out_edges(nid)
        degree = len(edges)
        if degree > 0:
            share = ctx.value(nid) / degree
            for edge in edges:
                ctx.send(ctx.edge_dst(edge), share)

# Run the custom PageRank algorithm with debug_mode=True (needed for the print
# statements inside compute), then print the type and the contents of the
# returned result.
result = graph.run_pregel(PregelPageRank, debug_mode=True)
print(type(result), result)

Creating a Custom K-Hop Algorithm

# Import necessary packages.
from hyg.analytics.graph import load_base_graph
from hyg.analytics.model import pregel_types, PregelModel

# Load the graph named "movie".
graph = load_base_graph("movie")

# Module-level values read by the custom algorithm.
# INFINITY_VALUE is the vertex value that means "not reached yet".
INFINITY_VALUE = False
# Internal ID of the start vertex, obtained from the external ID "100".
SOURCE_NODE = graph.nid("100")
# Messages are only forwarded in supersteps below MAX_HOP.
MAX_HOP = 3

# Implement the custom K-Hop algorithm based on the Pregel model.
# Set ntype to bool; the vertex value indicates whether the vertex has been traversed.
# A combiner cannot be used with the bool type. Even if one is set, it will be ignored.
@pregel_types(ntype=bool)
class PregelKHop(PregelModel):
    # Marks vertices reachable from SOURCE_NODE; True means "reached" and
    # INFINITY_VALUE means "not reached yet".

    @staticmethod
    def compute(ctx, nid, msgs) -> None:
        first_step = ctx.superstep == 0

        # In the first superstep every vertex other than the source is only
        # initialised as unreached.
        if first_step and nid != SOURCE_NODE:
            ctx.set_value(nid, INFINITY_VALUE)
            ctx.halt(nid)
            return

        # The source in the first superstep, or a still-unreached vertex in
        # a later one, becomes reached and forwards the signal along its
        # out-edges while the superstep is below MAX_HOP.
        if first_step or ctx.value(nid) == INFINITY_VALUE:
            ctx.set_value(nid, True)
            if ctx.superstep < MAX_HOP:
                for edge in ctx.out_edges(nid):
                    ctx.send(ctx.edge_dst(edge), True)

        ctx.halt(nid)

# Run the custom K-Hop algorithm. result_filter is given a predicate that is
# true for vertices other than SOURCE_NODE whose value is truthy.
result = graph.run_pregel(
    PregelKHop,
    result_filter=lambda ctx, nid: nid != SOURCE_NODE and ctx.value(nid),
)
# Print the number of entries in the result and their keys.
print(len(result), result.keys())