(1) Edge regression: predicting a quantitative (regression) or qualitative (classification) property of edges that already exist in the graph, by feeding the learned edge embeddings into a GNN-based predictor.

(2) An edge embedding is usually computed from its two endpoint nodes, typically via dot product or concatenation (see the sketch after this list). Node embeddings are updated the same way as before.

(3) Because concatenation is sensitive to the order of the two endpoints, an undirected graph may need to be represented as a bidirectional graph (each edge stored in both directions).
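A minimal sketch of the two scoring schemes from point (2), using DGL's built-in message functions; the toy graph, the feature name 'h', and the sizes here are invented for illustration:

import dgl
import dgl.function as fn
import torch

g = dgl.rand_graph(10, 30)          # toy graph: 10 nodes, 30 edges
g.ndata['h'] = torch.randn(10, 16)  # pretend these are learned node embeddings

with g.local_scope():
    # Dot product of the two endpoint embeddings -> one score per edge
    g.apply_edges(fn.u_dot_v('h', 'h', 'score'))
    dot_scores = g.edata['score']   # shape (30, 1)

with g.local_scope():
    # Concatenation -> an order-sensitive edge feature
    g.apply_edges(lambda e: {'h_e': torch.cat([e.src['h'], e.dst['h']], dim=1)})
    cat_feats = g.edata['h_e']      # shape (30, 32)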

import dgl
import dgl.nn as dglnn

import torch
import torch.nn as nn
import torch.nn.functional as F

import numpy as np
import itertools
from random import sample 

0. Prediction data and task

Suppose we have 100 drugs with 1000 known pairwise interactions; each edge is labeled with an interaction strength, and each drug node has 50 features. The goal is to predict the interaction strength between any two drugs.

eids_raw = list(itertools.combinations(range(100), 2))  # all possible drug pairs
eids_sp1k = sample(eids_raw, 1000)                      # 1000 known interactions
src = np.array(eids_sp1k)[:, 0]
dst = np.array(eids_sp1k)[:, 1]
g_drug = dgl.graph((src, dst), num_nodes=100)
g_drug.ndata['feature'] = torch.randn(100, 50)  # node features
g_drug.edata['label'] = torch.randn(1000)       # edge labels (interaction strength)

# Add reverse edges: edge i and edge i + 1000 form a forward/reverse pair
g_drug_bi = dgl.add_reverse_edges(g_drug)
g_drug_bi.edata["label"] = torch.cat((g_drug.edata["label"],
                                      g_drug.edata["label"]))
print(g_drug_bi.num_nodes())
# 100
print(g_drug_bi.num_edges())
# 2000
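As a side note, dgl.add_reverse_edges also accepts a copy_edata flag that copies edge features onto the reverse edges, which would make the manual torch.cat above unnecessary:

g_drug_bi = dgl.add_reverse_edges(g_drug, copy_edata=True)  # labels copied automatically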

1. Full-graph training

1.1 Define the GNN model framework

The model has two parts: the first updates node features based on the graph structure; the second computes each edge's feature from its endpoint nodes.

class GNN_MLP_model(nn.Module):
    # Define the GNN layers and related parameters
    def __init__(self, 
                 in_feats, hid_feats_1, hid_feats_2, 
                 mlp_hid_feats_1, mlp_out_feats,
                 dropout):
        super().__init__()
        # GNN: update node features
        self.SAGE1 = dglnn.SAGEConv(in_feats=in_feats,
                                    out_feats=hid_feats_1,
                                    aggregator_type="mean")
        self.SAGE2 = dglnn.SAGEConv(in_feats=hid_feats_1,
                                    out_feats=hid_feats_2,
                                    aggregator_type="mean")
        # MLP: compute edge features
        self.MLP1 = nn.Linear(hid_feats_2*2, mlp_hid_feats_1)
        self.MLP2 = nn.Linear(mlp_hid_feats_1, mlp_out_feats)
        
        self.dropout = nn.Dropout(dropout)
    
    def apply_edges(self, edges):
        h_u, h_v = edges.src['h'], edges.dst['h']
        h_concat = torch.cat([h_u, h_v], 1)
        
        h2 = F.relu(self.MLP1(h_concat))
        h2 = self.dropout(h2)
        h2 = self.MLP2(h2)
        return {'score': h2}
         
    # Forward pass: takes the graph structure and node features
    def forward(self, graph, inputs):
        # GNN part
        h = F.relu(self.SAGE1(graph, inputs))
        h = self.dropout(h)
        h = F.relu(self.SAGE2(graph, h))
        
        # MLP part
        with graph.local_scope():
            graph.ndata['h'] = h
            graph.apply_edges(self.apply_edges)
            return graph.edata['score']
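As a quick sanity check (not part of the original flow; model_check is a throwaway instance reusing g_drug_bi from section 0), the model returns one score per edge:

model_check = GNN_MLP_model(50, 32, 16, 16, 1, 0.1)
print(model_check(g_drug_bi, g_drug_bi.ndata['feature']).shape)  # torch.Size([2000, 1])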

1.2 Define the training loop

def train(model, graph, feature_name, label_name, train_mask, test_mask,
          num_epochs, learning_rate, weight_decay, patience, verbose=True):
    optimizer = torch.optim.Adam(model.parameters(),
                                 lr=learning_rate,
                                 weight_decay=weight_decay)
    node_feats = graph.ndata[feature_name]
    edge_labels = graph.edata[label_name].reshape((-1, 1))
    val_loss_best = float('inf')
    trigger_times = 0
    
    for epoch in range(num_epochs):
        model.train()
        predict_labels = model(graph, node_feats)
        loss = F.mse_loss(predict_labels[train_mask], edge_labels[train_mask])
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Evaluate with dropout disabled and without tracking gradients
        model.eval()
        with torch.no_grad():
            predict_labels = model(graph, node_feats)
            test_loss = F.mse_loss(predict_labels[test_mask], edge_labels[test_mask])
        
        if verbose:
            print("Epoch {:03d} | Loss {:.4f} | Test Loss {:.4f}".format(
                epoch, loss.item(), test_loss.item()))
        # Early stopping: stop once the test loss fails to improve `patience` times in a row
        if test_loss.item() > val_loss_best:
            trigger_times += 1
            if trigger_times >= patience:
                break
        else:
            trigger_times = 0
            val_loss_best = test_loss.item()
    return loss.item(), test_loss.item()

1.3 Train the model once

graph = g_drug_bi
feature_name = "feature"
label_name = "label"
# Split train/test so that each edge and its reverse land in the same set:
# draw one Bernoulli mask per forward edge, then tile it over both directions
train_mask = torch.zeros(int(g_drug_bi.num_edges()/2), dtype=torch.bool).bernoulli(0.7).tile((2,))
test_mask = ~train_mask

in_feats, hid_feats_1, hid_feats_2, mlp_hid_feats_1, mlp_out_feats, dropout = 50, 32, 16, 16, 1, 0.1
num_epochs, learning_rate, weight_decay, patience = 100, 0.01, 1e-4, 5

# Instantiate the model
model = GNN_MLP_model(in_feats, hid_feats_1, hid_feats_2, mlp_hid_feats_1, mlp_out_feats, dropout)
# Train the model
metrics = train(model, graph, feature_name, label_name, train_mask, test_mask,
                num_epochs, learning_rate, weight_decay, patience, verbose=True)
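Because the first 1000 edges and the last 1000 are forward/reverse pairs, tiling the per-pair mask keeps both directions in the same split. A quick check (illustrative only, not in the original pipeline):

E = g_drug_bi.num_edges() // 2
assert torch.equal(train_mask[:E], train_mask[E:])
print(train_mask.sum().item(), "of", g_drug_bi.num_edges(), "edges in the training set")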

1.4 K-fold cross-validation

  • Define a function that generates the train/test masks for fold j of k, making sure both directions of the same edge land in the same set (a usage check follows the code below)
def get_k_fold_data(k, j, edge_id, shuffle=True):
    assert k > 1
    # One mask entry per forward edge (half of all edges)
    train_mask = torch.ones(int(len(edge_id)/2), dtype=torch.bool)
    np.random.seed(42)  # fixed seed so every fold uses the same permutation
    if shuffle:
        edge_id2 = np.random.permutation(len(train_mask))
    else:
        edge_id2 = np.arange(len(train_mask))
        
    fold_size = len(train_mask) // k
    idx = slice(j*fold_size, (j+1)*fold_size)
    train_mask[edge_id2[idx]] = False  # fold j is held out as the test set
    
    # Tile so both directions of an edge share the same split
    train_mask = train_mask.tile((2,))
    test_mask = ~train_mask
    return train_mask, test_mask
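For example (an illustrative check on this toy graph), fold 0 of 10 holds out 100 of the 1000 edge pairs, i.e. 200 of the 2000 directed edges:

edge_id = g_drug_bi.edges(form="all")[2]
train_mask, test_mask = get_k_fold_data(10, 0, edge_id)
print(test_mask.sum().item(), "test edges of", len(edge_id))  # 200 of 2000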
  • K-fold training
def k_fold(k, in_feats, hid_feats_1, hid_feats_2, mlp_hid_feats_1, mlp_out_feats, dropout,
           graph, feature_name, label_name, 
           num_epochs, learning_rate, weight_decay, patience):
    k_fold_metrics = []  # collect the metrics from each fold
    edge_id = graph.edges(form="all")[2]
    for j in range(k):
        print(f'Fold-{j+1}')
        train_mask, test_mask = get_k_fold_data(k, j, edge_id)
        model = GNN_MLP_model(in_feats, hid_feats_1, hid_feats_2, mlp_hid_feats_1, mlp_out_feats, dropout)
        metrics = train(model, graph, feature_name, label_name, train_mask, test_mask,
                        num_epochs, learning_rate, weight_decay, patience, verbose=False)
        
        k_fold_metrics.append(metrics)
    return k_fold_metrics

k = 10
k_fold_metrics = k_fold(k, in_feats, hid_feats_1, hid_feats_2, mlp_hid_feats_1, mlp_out_feats, dropout,
                        graph, feature_name, label_name, 
                        num_epochs, learning_rate, weight_decay, patience)
np.array(k_fold_metrics).mean(0)  # mean train/test loss over the k folds

2. Mini-batch training

2.1 DGL edge sampler

As seen above, node features must be updated before edge features are computed, so the edge sampler also needs to sample the neighborhoods of both endpoint nodes of each edge.

The following draws a random batch of 32 edges from all edges, along with everything needed for two layers of aggregation around their endpoint nodes.

E = int(g_drug_bi.number_of_edges()/2)
# Edge i and edge i + E form a forward/reverse pair
reverse_eids = torch.cat([torch.arange(E, 2 * E), torch.arange(0, E)])

sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
sampler = dgl.dataloading.as_edge_prediction_sampler(
    sampler, exclude='reverse_id', reverse_eids=reverse_eids)

train_eid = g_drug_bi.edges(form="all")[2]
dataloader = dgl.dataloading.DataLoader(
    g_drug_bi, train_eid, sampler,
    batch_size=32,
    shuffle=True)

input_nodes, pair_graph, blocks = next(iter(dataloader))
# pair_graph: subgraph containing the 32 sampled edges
# blocks: the two-layer aggregation structures for their endpoint nodes
# input_nodes: all nodes needed as input to the first block
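To see how the pieces fit together, one can inspect the batch (exact node counts vary with the random sample):

print(pair_graph.num_edges())     # 32, the batch of edges to score
print(len(blocks))                # 2, one block per SAGE layer
print(blocks[0].num_src_nodes() == len(input_nodes))  # True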

2.2 Define the GNN model framework

Essentially the same as in 1.1, except that the forward pass takes the local blocks produced by the sampler instead of the full graph.

class GNN_MLP_model_batch(nn.Module):
    # Define the GNN layers and related parameters
    def __init__(self, 
                 in_feats, hid_feats_1, hid_feats_2, 
                 mlp_hid_feats_1, mlp_out_feats,
                 dropout):
        super().__init__()
        self.SAGE1 = dglnn.SAGEConv(in_feats=in_feats,
                                    out_feats=hid_feats_1,
                                    aggregator_type="mean")
        self.SAGE2 = dglnn.SAGEConv(in_feats=hid_feats_1,
                                    out_feats=hid_feats_2,
                                    aggregator_type="mean")
        
        self.MLP1 = nn.Linear(hid_feats_2*2, mlp_hid_feats_1)
        self.MLP2 = nn.Linear(mlp_hid_feats_1, mlp_out_feats)
        
        self.dropout = nn.Dropout(dropout)
    
    def apply_edges(self, edges):
        h_u, h_v = edges.src['h'], edges.dst['h']
        h_concat = torch.cat([h_u, h_v], 1)
        
        h2 = F.relu(self.MLP1(h_concat))
        h2 = self.dropout(h2)
        h2 = self.MLP2(h2)  # no output activation: unbounded regression target, as in 1.1
        return {'score': h2}
        
    # Forward pass: takes the sampled blocks, their input features, and the edge subgraph
    def forward(self, blocks, inputs, edge_subgraph):
        # GNN part: one block per layer
        h = F.relu(self.SAGE1(blocks[0], inputs))
        h = self.dropout(h)
        h = F.relu(self.SAGE2(blocks[1], h))
        # MLP part: score the edges of the sampled subgraph
        with edge_subgraph.local_scope():
            edge_subgraph.ndata['h'] = h
            edge_subgraph.apply_edges(self.apply_edges)
            return edge_subgraph.edata['score']
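A quick sanity check (illustrative only), reusing the batch sampled in 2.1:

model_check = GNN_MLP_model_batch(50, 32, 16, 16, 1, 0.1)
scores = model_check(blocks, g_drug_bi.ndata['feature'][input_nodes], pair_graph)
print(scores.shape)  # torch.Size([32, 1]) for a full batch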

2.3 Define the training loop

Each epoch iterates over a number of mini-batches; at the end of the epoch, the loss over all train/test edges is computed and reported as that epoch's result.

def train_batch(model, graph, feature_name, label_name, train_mask, test_mask,
                num_epochs, learning_rate, weight_decay, patience, batch_size, verbose=True):
    optimizer = torch.optim.Adam(model.parameters(),
                                 lr=learning_rate,
                                 weight_decay=weight_decay)
    
    node_feats = graph.ndata[feature_name]
    val_loss_best = float('inf')
    trigger_times = 0
    
    E = int(graph.number_of_edges()/2)
    reverse_eids = torch.cat([torch.arange(E, 2 * E), torch.arange(0, E)])
    sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
    sampler = dgl.dataloading.as_edge_prediction_sampler(
        sampler, exclude='reverse_id', reverse_eids=reverse_eids)

    train_eid = graph.edges(form="all")[2][train_mask]
    test_eid = graph.edges(form="all")[2][test_mask]
    
    train_dataloader = dgl.dataloading.DataLoader(
        graph, train_eid, sampler,
        batch_size=batch_size,
        shuffle=True)
    all_train_dataloader = dgl.dataloading.DataLoader(
        graph, train_eid, sampler,
        batch_size=len(train_eid),  # all training edges as one batch, for the per-epoch train loss
        shuffle=False)
    all_test_dataloader = dgl.dataloading.DataLoader(
        graph, test_eid, sampler,
        batch_size=len(test_eid),   # all test edges as one batch, for the per-epoch test loss
        shuffle=False)
    for epoch in range(num_epochs):
        model.train()
        for it, (input_nodes, pair_graph, blocks) in enumerate(train_dataloader):
            predict_labels = model(blocks, node_feats[input_nodes], pair_graph)
            edge_labels = pair_graph.edata["label"]
            loss = F.mse_loss(predict_labels, edge_labels.reshape((-1, 1)))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        # Per-epoch losses over all train/test edges, with dropout off and no gradients
        model.eval()
        with torch.no_grad():
            input_nodes, pair_graph, blocks = next(iter(all_train_dataloader))
            predict_labels = model(blocks, node_feats[input_nodes], pair_graph)
            edge_labels = pair_graph.edata["label"]
            epoch_loss = F.mse_loss(predict_labels, edge_labels.reshape((-1, 1)))
            
            input_nodes, pair_graph, blocks = next(iter(all_test_dataloader))
            predict_labels = model(blocks, node_feats[input_nodes], pair_graph)
            edge_labels = pair_graph.edata["label"]
            test_loss = F.mse_loss(predict_labels, edge_labels.reshape((-1, 1)))
        
        if verbose:
            print("Epoch {:03d} | Loss {:.4f} | Test Loss {:.4f}".format(
                epoch, epoch_loss.item(), test_loss.item()))
        # Early stopping on the test loss
        if test_loss.item() > val_loss_best:
            trigger_times += 1
            if trigger_times >= patience:
                break
        else:
            trigger_times = 0
            val_loss_best = test_loss.item()
    
    return epoch_loss.item(), test_loss.item()

2.4 Train the model once

# Remaining hyperparameters as before
batch_size = 32

# Instantiate the model
model = GNN_MLP_model_batch(in_feats, hid_feats_1, hid_feats_2, mlp_hid_feats_1, mlp_out_feats, dropout)
# Train the model
metrics = train_batch(model, graph, feature_name, label_name, train_mask, test_mask,
                      num_epochs, learning_rate, weight_decay, patience, batch_size, verbose=True)

2.5 K-fold cross-validation

The function that generates the train/test masks for fold i of k is the same as in 1.4.

def k_fold(k, in_feats, hid_feats_1, hid_feats_2, mlp_hid_feats_1, mlp_out_feats, dropout,
           graph, feature_name, label_name, 
           num_epochs, learning_rate, weight_decay, patience, batch_size):
    k_fold_metrics = []  # collect the metrics from each fold
    edge_id = graph.edges(form="all")[2]
    for j in range(k):
        print(f'Fold-{j+1}')
        train_mask, test_mask = get_k_fold_data(k, j, edge_id)
        model = GNN_MLP_model_batch(in_feats, hid_feats_1, hid_feats_2, mlp_hid_feats_1, mlp_out_feats, dropout)
        metrics = train_batch(model, graph, feature_name, label_name, train_mask, test_mask,
                        num_epochs, learning_rate, weight_decay, patience, batch_size, verbose=False)
        
        k_fold_metrics.append(metrics)
    return k_fold_metrics

k = 10
k_fold_metrics = k_fold(k, in_feats, hid_feats_1, hid_feats_2, mlp_hid_feats_1, mlp_out_feats, dropout,
                        graph, feature_name, label_name, 
                        num_epochs, learning_rate, weight_decay, patience, batch_size)
np.array(k_fold_metrics).mean(0)  # mean train/test loss over the k folds