Skip to content

Euler 2.0 Message Passing接口

origin edited this page Jun 30, 2020 · 4 revisions

本章的章节内容安排如下:

Euler2.0对GNN类的算法模型进行了Message Passing范式的抽象,将一个GNN类的算法模型定义为子图抽样模块,图卷积模块,以及可选的池化模块的组合。Message Passing范式能够提高高稀疏不规则数据的计算效率,同时更利于算法模型的创新或组合。

MessagePassing范式的计算流程

图神经网络的模型,可以看作消息传递的过程,每一个节点会发出它自己所含有的消息,也会接受其他节点传来的消息。然后将得到的所有信息做聚合,计算节点新的表示。

在Euler2.0中,为了提高分布式并行计算能力,Message Passing接口会对训练的全图做子图抽样,得到一个有汇聚方向的二部图。然后在这个二部图上进行消息传递过程,得到汇聚目标节点的新的表示。汇聚目标节点新的表示会经过可选的模块,完成一个GNN类算法的计算。Message Passing范式下的计算流程如下图所示。

image

Dataflow子图抽样

Message Passing通过Euler2.0提供的一系列OP来构造各种各样的子图抽样方式,来生成一组表示从源节点到目标节点信息汇聚的二部图。目前,Message Passing提供的子图生成方式包括:

DataFlow Paper Note
whole - 输入节点到输入节点邻接关系的二部图
Full GCN 输入节点到输入节点全部邻居邻接关系的二部图
sage GraphSage 输入节点到采样邻居邻接关系的二部图
fast FastGCN 输入节点到随机采样节点邻接关系的二部图(最后一层为全部邻居)
adapt AdaptiveGCN 输入节点到Layerwise采样邻居邻接关系的二部图(最后一层为全部邻居)
layerwise - 输入节点到Layerwise采样邻居邻接关系的二部图(第一层为采样邻居)
relation RGCN 输入节点到输入节点全部邻居邻接关系的二部图(可获取边信息)

Messsage Passing同样支持通过Euler2.0 OP自定义一个DataFlow。一个自定义的DataFlow可以通过实现继承UniqueDataFlow/NeighborDataFlow的类,并提供get_neighbor方法来完成。get_neighbor方法的输入为源节点,返回值为邻居节点以及与邻居节点对应的目标节点index。下面的代码为GCN Dataflow的示例。

import tensorflow as tf
import tf_euler

from tf_euler.python.dataflow.neighbor_dataflow import UniqueDataFlow, NeighborDataFlow

class GCNDataFlow(UniqueDataFlow):
    def __init__(self, metapath,
                 add_self_loops=True):
        super(GCNDataFlow, self).__init__(num_hops=len(metapath),
                                          add_self_loops=add_self_loops)
        self.metapath = metapath

    def get_neighbors(self, n_id):
        '''
        The neighbor sampler in a mini-batch of n_id.
        It returns: neighbors: a list of 'tensor';
                    neighbor_src: a list of 'tensor'
        Input:
          n_id: input nodes
        Output:
          neighbors: [[n_id's neighbor], [n_id's neighbor's neighbor], ...]
          neighbor_src: [[n_neighbor_src_idx], [n_neighbor_neighbor_src_idx], ...]
        '''
        neighbors = []
        neighbor_src = []
        for i in range(len(self.metapath)):
            n_id = tf.reshape(n_id, [-1])
            one_neighbor = tf_euler.get_full_neighbor(n_id,
                                                      self.metapath[i])[0]
            neighbors.append(tf.reshape(one_neighbor.values, [-1]))
            one_indices = one_neighbor.indices[:, 0]
            neighbor_src.append(tf.cast(one_indices, tf.int32))
            new_n_id = tf.reshape(one_neighbor.values, [-1])
            n_id = tf.concat([new_n_id, n_id], axis=0)
            n_id, _ = tf.unique(n_id)

        return neighbors, neighbor_src

Convolution图卷积

Message Passing中的图卷积模块定义了信息在二部图上的消息传递汇聚方式。convolution函数在Message Passing范式下的过程和公式如下所示: image

该公式由三部分组成:

  • 消息传递函数img:表示源节点与目标节点所含有消息在连接边的传递过程
  • 消息汇聚函数img:表示目标节点如何处理汇聚所有源节点传递过来的消息
  • 消息更新函数img:表示目标节点如何更新自己所含有的消息

可以通过Messsage Passing接口自定义一个Convolution函数。一个自定义的Convolution可以通过实现继承Conv的类,并完成表示消息汇聚函数的apply_edge,表示消息更新函数的apply_node,使用(或自定义)表示消息汇聚函数的scatter_op来组成。下面的代码为GCN Convlution的示例。

import tensorflow as tf

from tf_euler.python.convolution import conv
from tf_euler.python.euler_ops import mp_ops


class GCNConv(conv.Conv):

    def __init__(self, dim, **kwargs):
        super(GCNConv, self).__init__(aggr='add', **kwargs)
        self.fc = tf.layers.Dense(dim, use_bias=False)

    @staticmethod
    def norm(edge_index, size):
        edge_weight = tf.ones([tf.shape(edge_index)[1], 1])

        def deg_inv_sqrt(i):
            deg = mp_ops.scatter_add(edge_weight, edge_index[i], size[i])
            return deg ** -0.5

        return tuple(map(deg_inv_sqrt, [0, 1]))

    def __call__(self, x, edge_index, size=None, **kwargs):
        '''
        x和edge_index参数共同描述了一个二部子图
        x以二元组[src_node_emb, dst_node_emb],表示该二部图中src_node和dst_node集合的embedding
        edge_index以连续紧密编号的二元组[src_node_idx, dst_node_idx]表示该二部图中src_node和dst_node之间的链接关系
        a--c   x = [[a, b], [c, d]]
          \    edge_index = [[0, 0, 1],  ->表示存在3条边 a, a, b
        b--d                 [0, 1, 1]]                c, d, d
        下面的讲解均以该图为例
        '''
        # 计算子图度的normalize
        norm = self.norm(edge_index, size)
        '''
        gather_feature函数会将输入的list,分别按照edge_index做gather拓展
        x = [[a, b], [c, d]]
        edge_index = [[0, 0, 1], [0, 1, 1]]
        gather_x = [[a, a, b], [c, d, d]]
        '''
        gather_x, gather_norm, = self.gather_feature([x, norm], edge_index)
        '''
        消息传递apply_edge
        apply_edge表示对二部图中每一条边上信息的处理方式
        GCN中每一条边保留的信息为:dst_node_emb * src_norm * dst_norm
        得到每条边保留下来的message的embedding
        分别得到 c->a, d->a, d->b 分别传递的信息embedding
        '''
        out = self.apply_edge(gather_x[1], gather_norm[0], gather_norm[1])
        '''
        消息汇聚scatter: mean, add, max, softmax可选
        将多组边的信息向src node做汇聚,
        定义了a如何处理c 和 d节点分别发送过来的消息embedding,以及b如何处理d节点发送过来的embedding
        得到a和b节点汇聚之后的信息结果
        '''
        out = mp_ops.scatter_(self.aggr, out, edge_index[0], size=size[0])
        '''
        消息更新apply_node
        表示src节点如何处理自己原来的消息embedding和从邻居节点汇聚过来的消息embedding
        在GCN中仅将汇聚过来的消息过全链接作为src节点输出的embedding
        '''
        out = self.apply_node(out)
        return out

    def apply_edge(self, x_j, norm_i, norm_j):
        return norm_i * norm_j * x_j

    def apply_node(self, aggr_out):
        return self.fc(aggr_out)

自定义模型

使用Message Passing接口,在已有的convolution和dataflow模块下自定义一个算法模型,可以分为以下两步:

  • 实现GNN模型:通过继承BaseGNN的方式定义一个GNN模型。实现to_x函数,来表示convlolution过程h0层的embedding结果。
  • 定义模型损失及验证metric:可以通过继承Message Passing接口提供的SuperviseMode/UnsuperviseModel完成模型的定义。

Example:

import tensorflow as tf
import tf_euler

from tf_euler.python.mp_utils.base_gnn import BaseGNNNet
from tf_euler.python.mp_utils.base import SuperviseModel, UnsuperviseModel


class GNN(BaseGNNNet):

    def __init__(self, conv, flow,
                 dims, fanouts, metapath,
                 feature_idx, feature_dim, add_self_loops=True):
        super(GNN, self).__init__(conv=conv,
                                  flow=flow,
                                  dims=dims,
                                  fanouts=fanouts,
                                  metapath=metapath,
                                  add_self_loops=add_self_loops)
        self.feature_idx = feature_idx
        self.feature_dim = feature_dim

    def to_x(self, n_id):
        # ho层的embedding直接通过结点的dense feature决定
        x, = tf_euler.get_dense_feature(n_id,
                                        self.feature_idx,
                                        self.feature_dim)
        return x


class SupervisedGCN(SuperviseModel):
    def __init__(self, dims, metapath,
                 feature_idx, feature_dim,
                 label_idx, label_dim):
        super(SupervisedGCN, self).__init__(label_idx,
                                            label_dim)
        ''''
        定义GNN模型的dataflow,convolutin
        可选convolution:gcn, sage, gat, tag, agnn, sgcn, graphgcn, appnp,
                        arma, dna, gin, gated, relation
        可选dataflow:full, sage, adapt, layerwise, whole, relation
        '''
        self.gnn = GNN('gcn', 'full', dims, None, metapath,
                       feature_idx, feature_dim)
    # 定义root结点的表示方法
    def embed(self, n_id):
        return self.gnn(n_id)
Clone this wiki locally