Skip to content

Euler 2.0 在有属性图上的应用

origin edited this page Jun 29, 2020 · 5 revisions

本章的章节安排如下:

在本章,我们将介绍euler2.0是如何在有属性图上应用。

有属性图一般指的是,在一张图当中除了有节点和节点之间的连边以外,每一个节点还有对应的属性特征。现实世界中,有很多图都是有属性的图,比如在论文与论文之间的应用关系网络中,每个节点(即论文)都带有表示论文内容的属性特征。如何刻画网络的拓扑结构以及节点的属性特征是图神经网络研究的一个重要方向。

Euler2.0 提供了相应的api支持在有属性的图上构建相应的表示学习算法,并且也实现一些对应有属性神经网络表示学习算法,比如GCN,GraphSAGE,GAT等等,详细见这里

本章以图神经网络模型GraphSAGE,有属性图Cora为例,介绍如何利用Euler2.0构建图神经网络来解决有属性图上的表示学习,完整的代码见这里

Note:

在章会对相应的一些句子加粗或者在代码做注释来显式地区别与其他场景的应用的一些不同处理。与其他场景的联系和区别的详细对比见这里

数据准备

Euler2.0 有属性图数据生成

这里我们以有属性图Cora为例,介绍如何生成Euler2.0所能加载的有属性图数据。关于Euler2.0的图数据建模与生成的详细介绍见这里

整体上分为两步:

1.构建图数据的JSON文件和Meta文件, 其中在JSON文件中可以创建节点的属性特征

如下所示,为生成Cora数据对应的JSON文件,其中Meta文件为可选文件,这里不提供Meta文件。完整的代码见这里

通过load_data(convert_dir, 'cora'),加载原始的图数据,在通过convert2json()函数生成相应的JSON文件,其中节点的属性主要为dense属性。

def convert2json(self, convert_dir, out_dir):
    def add_node(id, type, weight, label, feature):
        node_buf = {}
        node_buf["id"] = id
        node_buf["type"] = type
        node_buf["weight"] = weight
        node_buf["features"] = [{}, {}]
        node_buf['features'][0]['name'] = 'label'
        node_buf['features'][0]['type'] = 'dense'
        node_buf['features'][0]['value'] = label.astype(
                                           float).tolist()
        node_buf['features'][1]['name'] = 'feature'
        node_buf['features'][1]['type'] = 'dense'
        feature = feature.astype(float)
        feature /= np.sum(feature) + 1e-7
        node_buf['features'][1]['value'] = feature.tolist()
        return node_buf

    def add_edge(src, dst, type, weight):
        edge_buf = {}
        edge_buf["src"] = src
        edge_buf["dst"] = dst
        edge_buf["type"] = type
        edge_buf["weight"] = weight
        edge_buf["features"] = []
        return edge_buf

    node_ids, node_type, node_label, node_feature, edge_src, edge_dst, edge_type = \
        parse_graph_file(convert_dir, self.num_classes, 'cora', self.feature_dim + 2, self.test_start_num)


    with open(out_dir, 'w') as out, open(self.id_file, 'w') as out_test:
        buf = {}
        buf["nodes"] = []
        buf["edges"] = []
        for one_node, one_type, one_label, one_feature in zip(node_ids, node_type, node_label, node_feature):
            buf["nodes"].append(add_node(one_node, one_type, 1, one_label, one_feature))
            if one_type == "test":
                out_test.write(str(one_node) + "\n")
        for one_src, one_dst, one_type in zip(edge_src, edge_dst, edge_type):
            buf["edges"].append(add_edge(one_src, one_dst, one_type, 1))
        out.write(json.dumps(buf))

2.利用Euler 2.0的图数据转化工具将步骤1中的JSON文件和Meta文件转化成Euler2.0可以加载的二进制文件

这里利用Euler2.0 python工具(详细介绍见这里)将JSON和Meta文件转化成对应的二进制文件。

def convert2euler(self, convert_dir, out_dir):
    dir_name = os.path.dirname(os.path.realpath(__file__))
    convert_meta = self.meta_file
    g = EulerGenerator(convert_dir,
                       convert_meta,
                       out_dir,
                       self.partition_num)
    g.do()

Euler2.0 有属性图数据加载

有属性图数据生成之后,用户需要在训练的时候加载对应的数据,并在每个Batch获取具体的数据,方式如下:

import tf_euler
#加载图数据
euler_graph = tf_euler.dataset.get_dataset('cora')
euler_graph.load_graph()



#通过tf_euler.sample_node采样训练的节点,生成Batch data,
def get_train_from_input(self, inputs, params):
    result = tf_euler.sample_node(inputs, params['train_node_type'])
    result.set_shape([self.params['batch_size']])
    return result

模型实现

Euler-2.0将GNN类算法抽象成Message Passing接口范式,这里将介绍如何通过Message Passing接口范式构建编写一个有属性图上应用的GNN模型,这里以GraphSAGE为例。

整体上分为两步:

1.实现GNN Encoder

2.实现GNN Model

实现GNN Encoder

在构建GNN算法的时候,用户首先需要实现GNN Encoder,可以通过继承BaseGNNNet基类实现。GNN Encoder的作用是定义一个多层图神经网络的节点特征表达向量的多层图卷积过程。

BaseGNNNet

Euler-2.0提供了BaseGNNNet基类(详见这里)。该类已经封装好了一个多层图神经网络的节点特征表达向量的多层图卷积过程。其中图卷积在Message Passing接口范式下,会被抽象为一个子图抽样方法(flow)和一个卷积汇聚(conv)方法。

class BaseGNNNet(object):

    def __init__(self, conv, flow, dims,
                 fanouts, metapath,
                 add_self_loops=True,
                 max_id=-1,
                 **kwargs):
        conv_class = utils.get_conv_class(conv)
        flow_class = utils.get_flow_class(flow)
        if flow_class == 'whole':
            self.whole_graph = True
        else:
            self.whole_graph = False
        self.convs = []
        for dim in dims[:-1]:
            self.convs.append(self.get_conv(conv_class, dim))
        self.fc = tf.layers.Dense(dims[-1])
        self.sampler = flow_class(fanouts, metapath, add_self_loops, max_id=max_id)

    def get_conv(self, conv_class, dim):
        return conv_class(dim)

    def to_x(self, n_id):
        raise NotImplementedError

    def to_edge(self, n_id_src, n_id_dst, e_id):
        return e_id

    def get_edge_attr(self, block):
        n_id_dst = tf.cast(tf.expand_dims(block.n_id, -1),
                           dtype=tf.float32)
        n_id_src= mp_ops.gather(n_id_dst, block.res_n_id)
        n_id_src = mp_ops.gather(n_id_src,
                                 block.edge_index[0])
        n_id_dst = mp_ops.gather(n_id_dst,
                                 block.edge_index[1])
        n_id_src = tf.cast(tf.squeeze(n_id_src, -1), dtype=tf.int64)
        n_id_dst = tf.cast(tf.squeeze(n_id_dst, -1), dtype=tf.int64)
        edge_attr = self.to_edge(n_id_src, n_id_dst, block.e_id)
        return edge_attr



    def calculate_conv(self, conv, inputs, edge_index,
                       size=None, edge_attr=None):
        return conv(inputs, edge_index, size=size, edge_attr=edge_attr)

    def __call__(self, n_id):
        data_flow = self.sampler(n_id)
        num_layers = len(self.convs)
        x = self.to_x(data_flow[0].n_id)
        for i, conv, block in zip(range(num_layers), self.convs, data_flow):
            if block.e_id is None:
                edge_attr = None
            else:
                edge_attr = self.get_edge_attr(block)
            x_src = mp_ops.gather(x, block.res_n_id)
            x_dst = None if self.whole_graph else x
            x = self.calculate_conv(conv,
                                    (x_src, x_dst),
                                    block.edge_index,
                                    size=block.size,
                                    edge_attr=edge_attr)
            x = tf.nn.relu(x)
        x = self.fc(x)
        return x

继承BaseGNNNet,实现GNN Encoder

在实现自己的GraphEncoder的时候,用户需要继承这个BaseGNNNet类,并实现to_x函数来表示每一个节点H0层embedding的构建过程,这里H0层node的embedding为输入feature参数中的dense feature。

class GNN(BaseGNNNet):

    def __init__(self, conv, flow,
                 dims, fanouts, metapath,
                 feature_idx, feature_dim,
                 add_self_loops=False,
                 max_id=-1, **kwargs):
        super(GNN, self).__init__(conv=conv,
                                  flow=flow,
                                  dims=dims,
                                  fanouts=fanouts,
                                  metapath=metapath,
                                  add_self_loops=add_self_loops,
                                  max_id=max_id,
                                  **kwargs)
        if not isinstance(feature_idx, list):
            feature_idx = [feature_idx]
        if not isinstance(feature_dim, list):
            feature_dim = [feature_dim]
        self.feature_idx = feature_idx
        self.feature_dim = feature_dim

    def to_x(self, n_id):
        # ho层的embedding直接通过结点的dense feature决定
        x, = tf_euler.get_dense_feature(n_id,
                                        self.feature_idx,
                                        self.feature_dim)
        return x

参数:

  • conv:使用的卷积方法名称,参考message passing接口中的convolution
  • flow:使用的子图抽样方法名称,参考message passing接口中的dataflow
  • dims:一个列表,元素个数为[卷积层数+1],表示图卷积中每一个convolution的输出embedding维度和最后一个全连接层输出embedding的维度
  • fanouts:一个列表,对graphsage类算法有效,元素个数为[卷积层数],表示每层子图采样中邻居采样的个数
  • metapath:一个列表,元素个数为[卷积层数],表示每层子图采样的采样边类型
  • feature_idx:一个列表,表示H0层使用的dense feature名字集合
  • feature_dim:一个列表,表示H0层使用的dense feature的维度,和feature_idx一一对应
  • add_self_loops:表示是否在子图采样的过程中添加自环

该示例中to_x函数定义了,H0层node的embedding为输入feature参数中的dense feature

实现GNN模型

通过实现Graph Encoder,用户便可以得到每个节点图卷积后的embed向量。

为了实现GNN(GraphSAGE)模型,用户需要定义Graph Encoder所用的图抽样方法(dataflow)和卷积汇聚方法(convolution)以及模型的损失函数。

对于图抽样方法(dataflow)和卷积汇聚方法(convolution)而言,Euler2.0提供了:

  • 可选convolution(详见这里):gcn, sage, gat, tag, agnn, sgcn, graphgcn, appnp, arma, dna, gin, gated, relation
  • 可选dataflow(详见这里):full, sage, adapt, layerwise, whole, relation

对于模型的损失函数而言,Euler2.0针对GNN的无监督和有监督模型分别封装了SuperviseModel(详见这里)和UnsuperviseModel(详见这里)。

用户可以直接继承SuperviseModel或者UnsuperviseModel,并定义相应的dataflow和convolution来实现GNN模型,同时实现embed()方式,来定义具体获取节点embedding的方式。

对于GraphSAGE而言,其采用子图采样方式为‘sage’(具体实现详见这里),卷积方式为‘sage’(具体实现详见这里),损失为监督损失函数,实现如下

class SupervisedGraphSage(SuperviseModel):

    def __init__(self, dims, fanouts, metapath,
                 feature_idx, feature_dim,
                 label_idx, label_dim, max_id=-1):
        super(SupervisedGraphSage, self).__init__(label_idx,
                                                  label_dim)
        self.gnn = GNN('sage', 'sage', dims, fanouts, metapath,
                       feature_idx, feature_dim, max_id=max_id)

    #获取每个节点的embedding的方式,即通过多层卷积获取embedding
    def embed(self, n_id):
        return self.gnn(n_id)

参数:

  • dims:参考上一小节
  • metapath:参考上一小节
  • feature_idx:参考上一小节
  • feature_dim:参考上一小节
  • label_idx:在图数据中作为训练标签的特征名称
  • label_dim:在图数据中作为训练标签的特征维度

模型训练

加载图数据

euler_graph = tf_euler.dataset.get_dataset('cora')
euler_graph.load_graph()

创建GraphSAGE模型

model = SupervisedGraphSage(dims, fanouts, metapath,
                            euler_graph.feature_idx,
                            euler_graph.feature_dim,
                            euler_graph.label_idx,
                            euler_graph.label_dim)

利用NodeEstimator训练模型

Euler-2.0提供了NodeEstimator GraphEstimator EdgeEstimator类和相应接口(详见这里),方便用户快速的完成模型训练,预测,embedding导出任务。其中NodeEstimator为点分类模型,GraphEstimator为图分类模型,EdgeEstimator为边分类模型(link prediction任务)。

这里利用NodeEstimator来训练GraphSAGE。

params = {'train_node_type': euler_graph.train_node_type[0],
          'batch_size': flags_obj.batch_size,
          'optimizer': flags_obj.optimizer,
          'learning_rate': flags_obj.learning_rate,
          'log_steps': flags_obj.log_steps,
          'model_dir': flags_obj.model_dir,
          'id_file': euler_graph.id_file,
          'infer_dir': flags_obj.model_dir,
          'total_step': num_steps}
config = tf.estimator.RunConfig(log_step_count_steps=None)
model_estimator = NodeEstimator(model, params, config)

if flags_obj.run_mode == 'train':
    model_estimator.train()
elif flags_obj.run_mode == 'evaluate':
    model_estimator.evaluate()
elif flags_obj.run_mode == 'infer':
    model_estimator.infer()
else:
    raise ValueError('Run mode not exist!')
Clone this wiki locally