节点预测与边预测任务实践

1.节点预测任务实践

本节我们将读取数据集类来实践节点预测任务。读取数据集的代码如下:

import os.path as osp
import torch
from torch_geometric.data import InMemoryDataset, download_url
from torch_geometric.io import read_planetoid_data
from torch_geometric.transforms import NormalizeFeatures
# 本节采用基于Planetoid类修改的方式得到PlanetoidPubMed数据类
class PlanetoidPubMed(InMemoryDataset):
    """ 节点代表文章,边代表引文关系。
   		 训练、验证和测试的划分通过二进制掩码给出。
    参数:
        root (string): 存储数据集的文件夹的路径
        transform (callable, optional): 数据转换函数,每一次获取数据时被调用。
        pre_transform (callable, optional): 数据转换函数,数据保存到文件前被调用。
    """

    url = 'https://github.com/kimiyoung/planetoid/raw/master/data'

    def __init__(self, root, split="public", num_train_per_class=20,
                 num_val=500, num_test=1000, transform=None,
                 pre_transform=None):

        super(PlanetoidPubMed, self).__init__(root, transform, pre_transform)
        self.data, self.slices = torch.load(self.processed_paths[0])

        self.split = split
        assert self.split in ['public', 'full', 'random']

        if split == 'full':
            data = self.get(0)
            data.train_mask.fill_(True)
            data.train_mask[data.val_mask | data.test_mask] = False
            self.data, self.slices = self.collate([data])

        elif split == 'random':
            data = self.get(0)
            data.train_mask.fill_(False)
            for c in range(self.num_classes):
                idx = (data.y == c).nonzero(as_tuple=False).view(-1)
                idx = idx[torch.randperm(idx.size(0))[:num_train_per_class]]
                data.train_mask[idx] = True

            remaining = (~data.train_mask).nonzero(as_tuple=False).view(-1)
            remaining = remaining[torch.randperm(remaining.size(0))]

            data.val_mask.fill_(False)
            data.val_mask[remaining[:num_val]] = True

            data.test_mask.fill_(False)
            data.test_mask[remaining[num_val:num_val + num_test]] = True

            self.data, self.slices = self.collate([data])

    @property
    def raw_dir(self):
        return osp.join(self.root, 'raw')

    @property
    def processed_dir(self):
        return osp.join(self.root, 'processed')

    @property
    def raw_file_names(self):
        names = ['x', 'tx', 'allx', 'y', 'ty', 'ally', 'graph', 'test.index']
        return ['ind.pubmed.{}'.format(name) for name in names]

    @property
    def processed_file_names(self):
        return 'data.pt'

    def download(self):
        for name in self.raw_file_names:
            download_url('{}/{}'.format(self.url, name), self.raw_dir)

    def process(self):
        data = read_planetoid_data(self.raw_dir, 'pubmed')
        data = data if self.pre_transform is None else self.pre_transform(data)
        torch.save(self.collate([data]), self.processed_paths[0])

    def __repr__(self):
        return 'PubMed()'

dataset = PlanetoidPubMed('/Dataset/Planetoid/PubMed', transform=NormalizeFeatures())
print('data.num_features:', dataset.num_features)
device = torch.device('cuda' if torch.cuda.torch.cuda.is_available() else 'cpu')
data = dataset[0].to(device)
# data.num_features: 500

1.1 定义GAT图神经网络

之前我们学习过由2层GATConv组成的图神经网络,现在我们重定义一个GAT图神经网络 ,使 其 能 够 通 过 参 数 来 定 义 GATConv的 层 数 ,以及每一 层GATConvout_channels。我们的图神经网络定义如下:

import torch
import torch.nn.functional as F
from torch_geometric.nn import GATConv, Sequential
from torch.nn import Linear, ReLU

class GAT(torch.nn.Module):
    def __init__(self, num_features, hidden_channnels_list, num_classes):
        super(GAT, self).__init__()
        torch.manual_seed(12345)
        hns = [num_features] + hidden_channnels_list
        conv_list = []
        for idx in range(len(hidden_channnels_list)):
            conv_list.append((GATConv(hns[idx], hns[idx+1]), 'x, edge_index -> x'))
            conv_list.append(ReLU(inplace=True), )  # inplace表示是否将得到的值计算得到的值覆盖之前的值

        self.convseq = Sequential('x, edge_index', conv_list)
        self.linear = Linear(hidden_channnels_list[-1], num_classes)

    def forward(self, x, edge_index):
        x = self.convseq(x, edge_index)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.linear(x)
        return x

由 于 我 们 的 神 经 网 络 由 多 个 GATConv 顺 序 相 连 而 构 成 , 因 此 我 们 使 用 了torch_geometric.nn.Sequential容器。

  • hidden_channels_list:来设置每一层的GATConvoutchannel,所以hidden_channels_list长度即为GATConv的层数。
    通过修改hidden_channels_list参数,我们就可以构造出不同的图神经网络。

关于PyG的Sequential容器:
CLASS Sequential(args: str,modules: List[Union[Tuple[Callable, [str], Callable]])
其扩展自torch.nn.Sequential容器,用于定义顺序的GNN模型。因为GNN的运算符接收多个输入参数,所以torch_geometric.nn.Sequential需要全局输入参数和单个运算符的函数头定义。如果省略,中间模块将对前一个模块的输出进行操作。
参数说明:

  • args(str):模型输入的全局参数
    -modules ([(str, Callable) 或 Callable]):模块列表(带有可选的函数头定义)
from torch.nn import Linear, ReLU
from torch_geometric.nn import Sequential, GCNConv

model = Sequential('x, edge_index', [
    (GCNConv(in_channels, 64), 'x, edge_index -> x'),
    ReLU(inplace=True),
    (GCNConv(64, 64), 'x, edge_index -> x'),
    ReLU(inplace=True),
    Linear(64, out_channels),
])

在上面这个例子中,'x, edge_index'定义了模型的全局输入参数。'x,edge_index->x'定义了函数头,即GCNConv层的输入参数与返回类型。
此外,PyG 的Sequential容器还允许定义更复杂的模型,比如使用JumpingKnowledge:

from torch.nn import Linear, ReLU, Dropout
from torch_geometric.nn import Sequential, GCNConv, JumpingKnowledge
from torch_geometric.nn import global_mean_pool

model = Sequential('x, edge_index, batch', [
    (Dropout(p=0.5), 'x -> x'),
    (GCNConv(dataset.num_features, 64), 'x, edge_index -> x1'),
    ReLU(inplace=True),
    (GCNConv(64, 64), 'x1, edge_index -> x2'),
    ReLU(inplace=True),
    (lambda x1, x2: [x1, x2], 'x1, x2 -> xs'),
    (JumpingKnowledge("cat", 64, num_layers=2), 'xs -> x'),
    (global_mean_pool, 'x, batch -> x'),
    Linear(2 * 64, dataset.num_classes),
])

1.2 模型训练与测试

model = GAT(num_features=dataset.num_features, hidden_channnels_list=[200, 100], num_classes=dataset.num_classes).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
criterion = torch.nn.CrossEntropyLoss()
print(model)

'''
GAT(
  (convseq): Sequential(
    (0): GATConv(500, 200, heads=1)
    (1): ReLU(inplace=True)
    (2): GATConv(200, 100, heads=1)
    (3): ReLU(inplace=True)
  )
  (linear): Linear(in_features=100, out_features=3, bias=True)
)
'''
def train():
    model.train()
    optimizer.zero_grad()   # 清空梯度
    out = model(data.x, data.edge_index)    # 执行一次前向传播
    # 基于训练的节点计算损失
    loss = criterion(out[data.train_mask], data.y[data.train_mask])
    loss.backward() # 反向传播
    optimizer.step()    # 基于梯度更新所有的参数
    return loss

def test():
    model.eval()
    out = model(data.x, data.edge_index)
    pred = out.argmax(dim=1)    # 采用可能性最高的进行预测
    test_correct = pred[data.test_mask] == data.y[data.test_mask]   # 选择预测正确的标签
    test_acc = int(test_correct.sum()) / int(data.test_mask.sum())  # 计算准确率
    return test_acc

for epoch in range(1, 201):
    loss =train()
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

test_acc = test()
print(f'Test Accuracy: {test_acc:.4f}')

2.边预测任务实践

需求:

  • 正负样本平滑:data.edge_index存储了正样本,但为了构建预测任务还需要负样本(不存在边的节点对),同时正负样本数量要平衡
  • 分训练、验证、测试集
    解决:
    -torch_geometric.utils.train_test_split_edges(data, val_ratio=0.05,test_ratio=0.1)
  • 返回六个属性取代edge_index:train_pos_edge_index 、train_neg_adj_mask、val_pos_edge_index、val_neg_edge_index、test_pos_edge_index和test_neg_edge_index
  • train_neg_adj_mask由于属性不一致所以没用
from torch_geometric.datasets import Planetoid
import torch_geometric.transforms as T
from torch_geometric.utils import train_test_split_edges
dataset = Planetoid('dataset','Cora',transform=T.NormalizeFeatures())
data = dataset[0]
data.train_mask = data.val_mask = data.test_mask = data.y = None
data = train_test_split_edges(data)
for key in data.keys:
	print(key, getattr(data,key).shape)
	"""
	class A(object):
		bar = 1
	a = A()
	getattr(a, 'bar') 
	# 1
	getattr(a, 'bar2', 3)
	# bar2不存在,但设置了默认值3
	"""

Cora是无向图,所以统计边数量时正反方向各统计一次。训练集也包含了正反方向,但验证集与测试集只包含一个方向。理由:训练集要使网络学习出节点间信息流的传递,只考虑一个方向就会使信息缺失;而验证集和测试集只做网络能力的检验作用,只考虑一个方向即可。

2.1边预测任务实践

三部分:

  • 编码(encode):生成节点表征
  • 解码(decode):根据两端点节点表征,计算有边的概率,计算方法在下述代码中是通过头尾端点属性对应相乘后求和
  • 推理(decode_all):计算所有节点彼此有边的概率,方法同解码。torch.nonzero()
import torch
import torch.nn as nn
from torch_geometric.nn import GCNConv
class Net(nn.Module):
	def __init__(self, in_channels, out_channels):
		super(Net,self).__init__()
		self.conv1 = GCNConv(in_channels, 128)
		self.conv2 = GCNConv(128, out_channels)
	def encode(self, x, edge_index): # 节点表征学习
		x = self.conv1(x,edge_index)
		x = x.relu()
		x = self.conv2(x,edge_index)
		return x
	def decode(self, z, pos_edge_index, neg_edge_index): # z传入经过表征学习的所有节点特征矩阵
		edge_index = torch.cat([pos_edge_index,neg_edge_index], dim=-1) # dim=-1, 2维就是1
		return (z[edge_index[0]] * z[edge_index[1]]).sum(dim=-1) # 头尾节点属性对应相乘后求和
		# 返回一个 [(正样本数+负样本数),1] 的向量
	def decode_all(self,z):
		prob_adj = z @ z.t() # 头节点属性和尾节点属性对应相乘后求和,[节点数,节点数]
		return (prob_adj > 0).nonzero(as_tuple=False).t() # [2,m], 列存储有边的nodes的序号

2.2训练、验证与测试

  • 单个训练过程def train()中:
    • 每次训练都进行一次负采样,可以增加负样本的多样性,借助torch_geometric.utils.negative_sampling()
    • 调用model.encode进行节点表征学习时,记得传入pos边
    • 损失函数交叉熵:F.binary_cross_entropy_with_logits()对应的类是torch.nn.BCEWithLogitsLoss,用于二分类(F.cross_entropy函数对应的类是torch.nn.CrossEntropyLoss,用于多分类)
    • 单个测试过程def test()
    • 详解python中@的用法Pytorch中with torch.no_grad()或@torch.no_grad() 用法
    • roc_auc_scorelink_logits.sigmoid()
  • 其他
import os.path as osp

import torch
import torch.nn.functional as F
from sklearn.metrics import roc_auc_score

from torch_geometric.utils import negative_sampling
from torch_geometric.datasets import Planetoid
import torch_geometric.transforms as T
from torch_geometric.nn import GCNConv
from torch_geometric.utils import train_test_split_edges

# dataset = 'Cora'
# path = osp.join(osp.dirname(osp.realpath(__file__)), '..', 'data', dataset) 
# dataset = Planetoid(path, dataset, transform=T.NormalizeFeatures())
dataset = Planetoid('dataset','Cora',transform=T.NormalizeFeatures())
data = dataset[0]
data.train_mask = data.val_mask = data.test_mask = data.y = None
data = train_test_split_edges(data)
print(data)


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Net(dataset.num_features, 64).to(device)
data = data.to(device)
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01)



# 训练
def train(data):
	model.train()

	# 负采样
	neg_edge_index = negative_sampling(edge_index = data.train_pos_edge_index, # 使得该函数只对训练集中不存在边的节点采样
										num_nodes = data.num_nodes,
										num_neg_samples = data.train_pos_edge_index.size(1))
	


	optimizer.zero_grad()
	
	# 节点表征学习
	z = model.encode(data.x, data.train_pos_edge_index) 
	# 有无边的概率计算
	link_logits = model.decode(z, data.train_pos_edge_index, neg_edge_index)
	# 真实边情况[0,1],调用get_link_labels
	link_labels = get_link_labels(data.train_pos_edge_index, neg_edge_index).to(device)
	# 损失计算
	loss = F.binary_cross_entropy_with_logits(link_logits, link_labels)
	# 反向求导
	loss.backward()
	# 迭代
	optimizer.step()

	return loss

# 生成正负样本边的标记
def get_link_labels(pos_edge_index, neg_edge_index):
	num_links = pos_edge_index.size(1) + neg_edge_index.size(1)
	link_labels = torch.zeros(num_links,dtype=torch.float) # 向量
	link_labels[:pos_edge_index.size(1)] = 1
	return link labels
									

# 测试
@torch.no_grad()
def test(data):
    model.eval()

	# 计算所有的节点表征
    z = model.encode(data.x, data.train_pos_edge_index)

	results = []
	for prefix in ['val','test']:
		# 正负edge_index
		pos_edge_index = data[f'{prefix}_pos_edge_index']
		neg_edge_index = data[f'{prefix}_neg_edge_index']
		# 有无边的概率预测
		link_logits = model.decode(z, pos_edge_index, neg_edge_index)
		link_probs = link_logits.sigmoid()
		# 真实情况
		link_labels = get_link_labels(pos_edge_index, neg_edge_index)
		# 存入准确率
		results.append(roc_auc_score(link_labels.cpu(),link_probs.cpu()))
	return results
# 训练验证与测试
best_val_auc = test_auc = 0
for epoch in range(1, 101):
    loss = train(data)
    val_auc, tmp_test_auc = test(data) # 训练一次计算一次验证、测试准确率
    if val_auc > best_val_auc:
        best_val = val_auc
        test_auc = tmp_test_auc
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}, Val: {val_auc:.4f}, '
          f'Test: {test_auc:.4f}') # 03d,不足3位前面补0,大于3位照常输出
# 输出所有节点间书否有边的预测
z = model.encode(data.x, data.train_pos_edge_index)
final_edge_index = model.decode_all(z)

参考资料:
Sequential官网文档
边预测任务实践中的代码

猜你喜欢

转载自blog.csdn.net/weixin_44133327/article/details/118284643