Modifying the official code
I removed the from official.utils.* code that is inconvenient to reference directly, and added comments at the key processing steps.
One useful little function dug out of official.utils.flags:
def get_gpus_num():
""" 获取GPU个数 """
from tensorflow.python.client import device_lib
local_device_protos = device_lib.list_local_devices()
return sum([1 for d in local_device_protos if d.device_type == "GPU"])
1. Model implementation
NCF model diagram: (figure omitted here)
Reading the code against the model structure diagram, the implementation turns out to be straightforward. NeuMF model code:
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
import constants # pylint: disable=g-bad-import-order
class NeuMF(tf.keras.models.Model):
"""Neural matrix factorization (NeuMF) model for recommendations."""
def __init__(self, num_users, num_items, mf_dim, model_layers, batch_size,
mf_regularization, mlp_reg_layers):
"""Initialize NeuMF model.
Args:
num_users: An integer, the number of users.
num_items: An integer, the number of items.
mf_dim: An integer, the embedding size of Matrix Factorization (MF) model.
model_layers: A list of integers for Multi-Layer Perceptron (MLP) layers.
Note that the first layer is the concatenation of user and item
embeddings. So model_layers[0]//2 is the embedding size for MLP.
batch_size: An integer for the batch size.
mf_regularization: A floating number, the regularization factor for MF
embeddings.
mlp_reg_layers: A list of floating numbers, the regularization factors for
each layer in MLP.
Raises:
ValueError: if the first model layer is not even.
"""
# ["32", "16", "8"]
if model_layers[0] % 2 != 0:
raise ValueError("The first layer size should be multiple of 2!")
# Input variables
user_input = tf.keras.layers.Input(
shape=(1,), dtype=tf.int32, name=constants.USER)
item_input = tf.keras.layers.Input(
shape=(1,), dtype=tf.int32, name=constants.ITEM)
# Initializer for embedding layer
embedding_initializer = tf.keras.initializers.RandomNormal(stddev=0.01)
# Embedding layers of GMF and MLP
# GMF--user Embedding
    # Embed the num_users user index numbers into vectors of size mf_dim
mf_embedding_user = tf.keras.layers.Embedding(
num_users,
mf_dim,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
input_length=1)
# GMF--item Embedding
mf_embedding_item = tf.keras.layers.Embedding(
num_items,
mf_dim,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
input_length=1)
# MLP--user Embedding
# model_layers[0]//2 is the embedding size for MLP
mlp_embedding_user = tf.keras.layers.Embedding(
num_users,
model_layers[0]//2,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[0]),
input_length=1)
# MLP--item Embedding
mlp_embedding_item = tf.keras.layers.Embedding(
num_items,
model_layers[0]//2,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[0]),
input_length=1)
# GMF part
    # The GMF part: the element-wise product of the input user and item vectors,
    # later concatenated with the last MLP layer
# Flatten the embedding vector as latent features in GMF
mf_user_latent = tf.keras.layers.Flatten()(mf_embedding_user(user_input))
mf_item_latent = tf.keras.layers.Flatten()(mf_embedding_item(item_input))
# Element-wise multiply
mf_vector = tf.keras.layers.multiply([mf_user_latent, mf_item_latent])
# MLP part
    # Step 1: concatenate the user and item embeddings into mlp_vector
# Flatten the embedding vector as latent features in MLP
mlp_user_latent = tf.keras.layers.Flatten()(mlp_embedding_user(user_input))
mlp_item_latent = tf.keras.layers.Flatten()(mlp_embedding_item(item_input))
# Concatenation of two latent features
mlp_vector = tf.keras.layers.concatenate([mlp_user_latent, mlp_item_latent])
    # Step 2: feed mlp_vector through the Dense layers, each layer feeding the next
num_layer = len(model_layers) # Number of layers in the MLP
    # model_layers is e.g. [32, 16, 8]
for layer in xrange(1, num_layer):
model_layer = tf.keras.layers.Dense(
model_layers[layer],
kernel_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[layer]),
activation="relu")
mlp_vector = model_layer(mlp_vector)
    # Step 3: concatenate the earlier GMF vector with the last MLP layer
# Concatenate GMF and MLP parts
predict_vector = tf.keras.layers.concatenate([mf_vector, mlp_vector])
# Final prediction layer
    # Finally, feed the concatenated result into a fully connected layer with a
    # single neuron as the output layer, using sigmoid activation and LeCun
    # uniform initialization.
prediction = tf.keras.layers.Dense(
1, activation="sigmoid", kernel_initializer="lecun_uniform",
name=constants.RATING)(predict_vector)
super(NeuMF, self).__init__(
inputs=[user_input, item_input], outputs=prediction)
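For a quick smoke test, the model can be instantiated directly. The hyperparameter values below are only illustrative (user and item counts taken from the ml-1m stats, small layer sizes):

model = NeuMF(num_users=6040, num_items=3706, mf_dim=8,
              model_layers=[32, 16, 8], batch_size=256,
              mf_regularization=0.0, mlp_reg_layers=[0.0, 0.0, 0.0])
model.summary()  # prints the combined GMF + MLP layer structure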
2. Dataset preprocessing
We use the MovieLens dataset.
The ml-1m dataset contains 1,000,209 anonymous ratings of approximately 3,706 movies made by 6,040 users who joined MovieLens in 2000. The format is:
UserID::MovieID::Rating::Timestamp
Ratings are made on a 5-star scale (whole-star ratings only).
ml-20m format:
userId,movieId,rating,timestamp
Ratings are made on a 5-star scale, with half-star increments (0.5 stars - 5.0 stars).
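For reference, reading the two formats with pandas differs only in the separator and header; a small sketch (file paths assumed):

import pandas as pd

# ml-1m: "::"-separated, no header row
ml1m = pd.read_csv("ml-1m/ratings.dat", sep="::", engine="python",
                   names=["user_id", "movie_id", "rating", "timestamp"])
# ml-20m: a regular csv with a header row
ml20m = pd.read_csv("ml-20m/ratings.csv")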
Step 1: Download and convert the data
Download, extract, parse into csv, and write to files for later use.
This produces three csv files. train-ratings and test-ratings both have three columns; the test set holds each user's single most recent record, obtained by leave-one-out.
Both train and test contain only positive examples; the negatives live in a separate test-negative file,
which holds 100 items sampled at random from each user's set of non-interacted items.
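As a toy illustration of the leave-one-out split (hypothetical data; the real code below works row by row, but the semantics are the same):

import pandas as pd

df = pd.DataFrame({
    "user_id":   [0, 0, 0, 1, 1],
    "item_id":   [10, 11, 12, 10, 13],
    "timestamp": [3, 1, 2, 5, 4],
}).sort_values("timestamp")
test = df.groupby("user_id").tail(1)  # each user's most recent rating -> test set
train = df.drop(test.index)           # everything else is training data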
import collections
import os
import sys
import time
import zipfile
# pylint: disable=g-bad-import-order
import numpy as np
import pandas as pd
from six.moves import urllib # pylint: disable=redefined-builtin
from absl import app as absl_app
from absl import flags
import tensorflow as tf
# pylint: enable=g-bad-import-order
import constants
# URL to download dataset
_DATA_URL = "http://files.grouplens.org/datasets/movielens/"
_USER_COLUMN = "user_id"
_ITEM_COLUMN = "item_id"
_RATING_COLUMN = "rating"
_TIMESTAMP_COLUMN = "timestamp"
# The number of negative examples paired with each positive example when
# evaluating. It is set to 100 in the paper.
_NUMBER_NEGATIVES = 100
# In both datasets, each user has at least 20 ratings.
_MIN_NUM_RATINGS = 20
RatingData = collections.namedtuple(
"RatingData", ["items", "users", "ratings", "min_date", "max_date"])
def main(_):
"""Download and extract the data from GroupLens website."""
tf.logging.set_verbosity(tf.logging.INFO)
# make_dir(FLAGS.data_dir)
# Download the zip dataset of ml-1m
data_dir = "movielens-data/"
dataset = "ml-1m"
dataset_zip = dataset + ".zip"
file_path = os.path.join(data_dir, dataset_zip)
if not tf.gfile.Exists(file_path):
def _progress(count, block_size, total_size):
sys.stdout.write("\r>> Downloading {} {:.1f}%".format(
file_path, 100.0 * count * block_size / total_size))
sys.stdout.flush()
file_path, _ = urllib.request.urlretrieve(
_DATA_URL + dataset_zip, file_path, _progress)
statinfo = os.stat(file_path)
# A new line to clear the carriage return from download progress
# tf.logging.info is not applicable here
print()
tf.logging.info(
"Successfully downloaded {} {} bytes".format(
file_path, statinfo.st_size))
  # Extract the zip dataset
if not tf.gfile.Exists(os.path.join(data_dir, dataset)):
zipfile.ZipFile(file_path, "r").extractall(data_dir)
# Preprocess and parse the dataset to csv
train_ratings = dataset + "-" + constants.TRAIN_RATINGS_FILENAME
if not tf.gfile.Exists(os.path.join(data_dir, train_ratings)):
parse_file_to_csv(data_dir, dataset)
The parse_file_to_csv function:
1. Filter to users with at least 20 ratings, and map users and items to 0-based index numbers.
2. Generate the train data, test data, and test negatives via the generate_train_eval_data function.
3. Serialize to csv files. Each csv holds three columns: (user_id, item_id, interaction). Since all_ratings and test_ratings are [user_id, item_id] pairs with interactions, we add a fake_rating column (default value 1) to make up the third column.
Tip: all output data are plain integers.
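Since parse_file_to_csv itself is not listed in this post, here is a condensed sketch of those three steps. It reuses the imports and column constants defined above; the csv writing is abbreviated and the real code differs in detail:

def parse_file_to_csv(data_dir, dataset):
  """Sketch: filter, re-index, split, and serialize the ratings."""
  ratings_file = os.path.join(data_dir, dataset, "ratings.dat")
  df = pd.read_csv(ratings_file, sep="::", engine="python", names=[
      _USER_COLUMN, _ITEM_COLUMN, _RATING_COLUMN, _TIMESTAMP_COLUMN])
  # Step 1: keep users with at least _MIN_NUM_RATINGS ratings,
  # then map users and items to 0-based indices
  df = df.groupby(_USER_COLUMN).filter(lambda x: len(x) >= _MIN_NUM_RATINGS)
  original_users = df[_USER_COLUMN].unique()
  original_items = df[_ITEM_COLUMN].unique()
  df[_USER_COLUMN] = df[_USER_COLUMN].map(
      {u: i for i, u in enumerate(original_users)})
  df[_ITEM_COLUMN] = df[_ITEM_COLUMN].map(
      {it: i for i, it in enumerate(original_items)})
  # Step 2: leave-one-out split plus test negatives
  train, test, test_negs = generate_train_eval_data(
      df, original_users, original_items)
  # Step 3: serialize; add a fake_rating column of 1s as the third column
  pd.DataFrame(train, columns=[_USER_COLUMN, _ITEM_COLUMN]).assign(
      fake_rating=1).to_csv(
          os.path.join(data_dir,
                       dataset + "-" + constants.TRAIN_RATINGS_FILENAME),
          index=False, header=False)
  # ... test ratings and test negatives are written the same way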
The heart of it is the generate_train_eval_data function:
Given all the user-item interaction data, for each user we first sort by timestamp,
then take the most recent interaction record as the test rating (leave-one-out); the rest become training data.
Test negatives are drawn at random from each user's non-interacted items, 100 per user by default (set by _NUMBER_NEGATIVES).
def generate_train_eval_data(df, original_users, original_items):
# ...
# Need to sort before popping to get last item
tf.logging.info("Sorting user_item_map by timestamp...")
df.sort_values(by=_TIMESTAMP_COLUMN, inplace=True)
all_ratings = set(zip(df[_USER_COLUMN], df[_ITEM_COLUMN]))
  # key: userId; value: the list of itemIds the user rated [item1, item2, ..., itemk]
user_to_items = collections.defaultdict(list)
  # Generate the user_item rating matrix for training
t1 = time.time()
row_count = 0
for row in df.itertuples():
user_to_items[getattr(row, _USER_COLUMN)].append(getattr(row, _ITEM_COLUMN))
row_count += 1
if row_count % 50000 == 0:
tf.logging.info("Processing user_to_items row: {}".format(row_count))
tf.logging.info(
"Process {} rows in [{:.1f}]s".format(row_count, time.time() - t1))
# Generate test ratings and test negatives
t2 = time.time()
test_ratings = []
test_negs = []
# Generate the 0-based index for each item, and put it into a set
all_items = set(range(len(original_items)))
for user in range(len(original_users)):
    # Pop each user's last (most recent) item id
test_item = user_to_items[user].pop()
all_ratings.remove((user, test_item)) # Remove the test item
    # Remove each user's interacted items from the full item set to get the negative pool
all_negs = all_items.difference(user_to_items[user])
all_negs = sorted(list(all_negs)) # determinism
    # The user and their most recent item form one test example
test_ratings.append((user, test_item))
    # Randomly draw 100 items from the user's negative pool as test negatives
test_negs.append(list(np.random.choice(all_negs, _NUMBER_NEGATIVES)))
if user % 1000 == 0:
tf.logging.info("Processing user: {}".format(user))
tf.logging.info("Process {} users in {:.1f}s".format(
len(original_users), time.time() - t2))
all_ratings = list(all_ratings) # convert set to list
return all_ratings, test_ratings, test_negs
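One detail worth noting: np.random.choice samples with replacement by default, so a user's 100 test negatives can contain duplicates. If distinct negatives are required, pass replace=False:

test_negs.append(list(np.random.choice(all_negs, _NUMBER_NEGATIVES, replace=False)))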
Step 2: Data preprocessing
The data to load consists of the training data, the test data, and the negatives.
Loading via dataset.data_preprocessing() produces an NCFDataSet data object containing:
- train_data: the set of positive training examples
- num_users: An integer, the number of users in the training dataset.
- num_items: An integer, the number of items in the training dataset.
- num_negatives: An integer, the number of negative instances for each user.
- true_items: the positive items read from the test csv file, used as ground truth when evaluating results
- all_items: each user's 100 negatives plus the one ground-truth item, i.e. all the items to score
- all_test_data: the test data for all users; each user paired with every one of their all_items.
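The NCFDataSet class itself is not shown in this post; a plausible minimal definition is a namedtuple, with the field names inferred from how the object is used in the code below:

import collections

NCFDataSet = collections.namedtuple("NCFDataSet", [
    "train_data", "num_users", "num_items", "num_negatives",
    "eval_true_items", "eval_all_items", "all_eval_data"])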
Main code:
def data_preprocessing(train_fname, test_fname, test_neg_fname, num_negatives):
  # Read the positives into memory; load_data() reads each line as a list of numbers and returns the whole list
train_data = load_data(train_fname)
  # Number of unique users
num_users = len(np.unique(np.array(train_data)[:, 0]))
test_ratings = load_data(test_fname)
test_negatives = load_data(test_neg_fname)
  # Number of items: the union of the item columns (second column) of train and test
num_items = len(
set(np.array(train_data)[:, 1]) | set(np.array(test_ratings)[:, 1]))
# Generate test instances for each user
true_items, all_items = [], []
all_test_data = []
for idx in range(num_users):
items = test_negatives[idx]
rating = test_ratings[idx]
user = rating[0] # User
true_item = rating[1] # Positive item as ground truth
    # All items to score: the 100 negatives plus the one ground-truth positive
items.append(true_item)
users = np.full(len(items), user, dtype=np.int32)
users_items = list(zip(users, items)) # User-item list
true_items.append(true_item) # all ground truth items
all_items.append(items) # All items (including positive and negative items)
all_test_data.extend(users_items) # Generate test dataset
# Create NCFDataset object
ncf_dataset = NCFDataSet(
train_data, num_users, num_items, num_negatives, true_items, all_items,
np.asarray(all_test_data)
)
return ncf_dataset
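dataset.input_fn is not listed either. Judging from how it is called below, in training mode it has to pair each positive with num_negatives random negatives and batch the result, while in eval mode it just feeds all the test user-item pairs. A simplified sketch under those assumptions (the real implementation differs):

def input_fn(training, batch_size, ncf_dataset, repeat=1):
  """Sketch of the input pipeline; not the actual implementation."""
  if training:
    users, items, labels = [], [], []
    for user, item in ncf_dataset.train_data:
      users.append(user)
      items.append(item)
      labels.append(1)
      for _ in range(ncf_dataset.num_negatives):
        users.append(user)
        # Draw a random negative (the real code avoids known positives)
        items.append(np.random.randint(ncf_dataset.num_items))
        labels.append(0)
    features = {constants.USER: np.array(users, dtype=np.int32).reshape(-1, 1),
                constants.ITEM: np.array(items, dtype=np.int32).reshape(-1, 1)}
    data = tf.data.Dataset.from_tensor_slices(
        (features, np.array(labels, dtype=np.int8).reshape(-1, 1)))
    data = data.shuffle(100000).repeat(repeat)
  else:
    eval_data = ncf_dataset.all_eval_data
    data = tf.data.Dataset.from_tensor_slices({
        constants.USER: eval_data[:, 0].reshape(-1, 1).astype(np.int32),
        constants.ITEM: eval_data[:, 1].reshape(-1, 1).astype(np.int32)})
  return data.batch(batch_size)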
Step 3: Training and evaluation
Create the NeuMF model and convert it to an Estimator, then alternate between training and evaluating the model;
training stops once the hit-ratio threshold is reached.
Hit Ratio (HR): sort the items by predicted score in descending order, take the top ten, and count the fraction of users whose ground-truth test item lands in that list.
NDCG: math.log(2) / math.log(ranklist.index(true_item) + 2), i.e. log(2) / log(rank + 2), where rank is the 0-based position of the true item in the top-k list.
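A quick sanity check of the two metrics on made-up values:

import math

ranklist = [50, 3, 77, 12, 9, 31, 8, 64, 2, 99]  # hypothetical top-10 items
true_item = 12                                    # hypothetical ground-truth item
hr = 1 if true_item in ranklist else 0                        # -> 1 (a hit)
ndcg = math.log(2) / math.log(ranklist.index(true_item) + 2)  # index 3 -> log(2)/log(5) ≈ 0.43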
import heapq
import math
import os
# pylint: disable=g-bad-import-order
import numpy as np
from absl import app as absl_app
from absl import flags
import tensorflow as tf
# pylint: enable=g-bad-import-order
import constants
import dataset
import neumf_model
_TOP_K = 10 # Top-k list for evaluation
# keys for evaluation metrics
_HR_KEY = "HR"
_NDCG_KEY = "NDCG"
def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset):
  # Define the input function for prediction
def pred_input_fn():
return dataset.input_fn(
False, per_device_batch_size(batch_size, num_gpus), ncf_dataset)
  # Predict with the estimator
predictions = estimator.predict(input_fn=pred_input_fn)
all_predicted_scores = [p[constants.RATING] for p in predictions]
  # Compute the Hit Ratio
def _get_hr(ranklist, true_item):
return 1 if true_item in ranklist else 0
  # Compute the NDCG score
def _get_ndcg(ranklist, true_item):
if true_item in ranklist:
return math.log(2) / math.log(ranklist.index(true_item) + 2)
return 0
hits, ndcgs = [], []
num_users = len(ncf_dataset.eval_true_items)
  # Reshape the predicted scores so that each user takes one row
predicted_scores_list = np.asarray(
all_predicted_scores).reshape(num_users, -1)
for i in range(num_users):
items = ncf_dataset.eval_all_items[i]
predicted_scores = predicted_scores_list[i]
# Map item and score for each user
map_item_score = {}
for j, item in enumerate(items):
score = predicted_scores[j]
map_item_score[item] = score
# Evaluate top rank list with HR and NDCG
ranklist = heapq.nlargest(_TOP_K, map_item_score, key=map_item_score.get)
true_item = ncf_dataset.eval_true_items[i]
hr = _get_hr(ranklist, true_item)
ndcg = _get_ndcg(ranklist, true_item)
hits.append(hr)
ndcgs.append(ndcg)
# Get average HR and NDCG scores
hr, ndcg = np.array(hits).mean(), np.array(ndcgs).mean()
global_step = estimator.get_variable_value(tf.GraphKeys.GLOBAL_STEP)
eval_results = {
_HR_KEY: hr,
_NDCG_KEY: ndcg,
tf.GraphKeys.GLOBAL_STEP: global_step
}
return eval_results
def convert_keras_to_estimator(keras_model, num_gpus, model_dir):
"""Configure and convert keras model to Estimator.
Args:
keras_model: A Keras model object.
num_gpus: An integer, the number of gpus.
model_dir: A string, the directory to save and restore checkpoints.
Returns:
est_model: The converted Estimator.
"""
flags_learning_rate = 0.001
# TODO(b/79866338): update GradientDescentOptimizer with AdamOptimizer
optimizer = tf.train.GradientDescentOptimizer(
learning_rate=flags_learning_rate)
keras_model.compile(optimizer=optimizer, loss="binary_crossentropy")
if num_gpus == 0:
distribution = tf.contrib.distribute.OneDeviceStrategy("device:CPU:0")
elif num_gpus == 1:
distribution = tf.contrib.distribute.OneDeviceStrategy("device:GPU:0")
else:
distribution = tf.contrib.distribute.MirroredStrategy(num_gpus=num_gpus)
run_config = tf.estimator.RunConfig(train_distribute=distribution)
estimator = tf.keras.estimator.model_to_estimator(
keras_model=keras_model, model_dir=model_dir, config=run_config)
return estimator
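# NOTE: per the TODO in convert_keras_to_estimator, switching to Adam should
# be a one-line change (untested sketch):
#   optimizer = tf.train.AdamOptimizer(learning_rate=flags_learning_rate)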
def get_gpus_num():
""" 获取GPU个数 """
from tensorflow.python.client import device_lib
local_device_protos = device_lib.list_local_devices()
return sum([1 for d in local_device_protos if d.device_type == "GPU"])
def per_device_batch_size(batch_size, num_gpus):
"""For multi-gpu, batch-size must be a multiple of the number of GPUs.
Note that this should eventually be handled by DistributionStrategies
directly. Multi-GPU support is currently experimental, however,
so doing the work here until that feature is in place.
"""
if num_gpus <= 1:
return batch_size
remainder = batch_size % num_gpus
if remainder:
err = ("When running with multiple GPUs, batch size "
"must be a multiple of the number of available GPUs. Found {} "
"GPUs with a batch size of {}; try --batch_size={} instead."
).format(num_gpus, batch_size, batch_size - remainder)
raise ValueError(err)
return int(batch_size / num_gpus)
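# For example, per_device_batch_size(256, 4) returns 64, while
# per_device_batch_size(100, 3) raises a ValueError suggesting --batch_size=99.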
def main(_):
  # Set the parameters manually
# The file name of training and test dataset
flags_data_dir="movielens-data/"
flags_dataset = "ml-1m"
  # The number of negative instances to pair with a positive instance, default=4.
flags_num_neg = 4
# The sizes of hidden layers for MLP, default=["64", "32", "16", "8"]
flags_layers = ["32", "16", "8", "4"]
# name="mlp_regularization", default=["0.", "0.", "0.", "0."],
# "The regularization factor for each MLP layer. See mf_regularization "
flags_mlp_regularization = ["0.", "0.01", "0.01", "0."]
flags_mf_regularization = 0.001
  # The batch size.
flags_batch_size = 256
# The Embedding size of MF model.
flags_num_factors = 8
  # The directory to save model checkpoints.
flags_model_dir = "ncf-model/"
  # Hit-ratio threshold. For the ml-1m dataset, the desired hr_threshold is
  # 0.68, which is the result from the paper; for ml-20m, the threshold can be
  # set to 0.95, which is achieved by the MLPerf implementation.
flags_hr_threshold = 0.68
  # The number of training epochs.
flags_train_epochs = 2
# The number of training epochs to run between evaluations, default=1.
flags_epochs_between_evals = 1
train_fname = os.path.join(
flags_data_dir, flags_dataset + "-" + constants.TRAIN_RATINGS_FILENAME)
test_fname = os.path.join(
flags_data_dir, flags_dataset + "-" + constants.TEST_RATINGS_FILENAME)
neg_fname = os.path.join(
flags_data_dir, flags_dataset + "-" + constants.TEST_NEG_FILENAME)
assert os.path.exists(train_fname), (
"Run data_download.py first to download and extract {} dataset".format(
flags_dataset))
  # ============================
  # Parameters set; now run the data preprocessing
  # ============================
tf.logging.info("Data preprocessing...")
ncf_dataset = dataset.data_preprocessing(
train_fname, test_fname, neg_fname, flags_num_neg)
# Create NeuMF model and convert it to Estimator
tf.logging.info("Creating Estimator from Keras model...")
layers = [int(layer) for layer in flags_layers]
mlp_regularization = [float(reg) for reg in flags_mlp_regularization]
keras_model = neumf_model.NeuMF(
ncf_dataset.num_users, ncf_dataset.num_items, flags_num_factors,
layers, flags_batch_size, flags_mf_regularization,
mlp_regularization)
num_gpus = get_gpus_num()
estimator = convert_keras_to_estimator(keras_model, num_gpus, flags_model_dir)
# Create hooks that log information about the training and metric values
train_hooks = [tf.train.ProfilerHook(save_steps=1000, output_dir="hook-profile/")]
# Training and evaluation cycle
def train_input_fn():
return dataset.input_fn(
True, per_device_batch_size(flags_batch_size, num_gpus),
ncf_dataset, flags_epochs_between_evals)
total_training_cycle = flags_train_epochs // flags_epochs_between_evals
for cycle_index in range(total_training_cycle):
tf.logging.info("Starting a training cycle: {}/{}".format(
cycle_index + 1, total_training_cycle))
# Train the model
estimator.train(input_fn=train_input_fn, hooks=train_hooks)
# Evaluate the model
eval_results = evaluate_model(
estimator, flags_batch_size, num_gpus, ncf_dataset)
# Log the HR and NDCG results.
hr = eval_results[_HR_KEY]
ndcg = eval_results[_NDCG_KEY]
tf.logging.info(
"Iteration {}: HR = {:.4f}, NDCG = {:.4f}".format(
cycle_index + 1, hr, ndcg))
    # If the evaluation (hit-ratio) threshold has been reached
# if model_helpers.past_stop_threshold(flags_hr_threshold, hr):
if hr >= flags_hr_threshold:
tf.logging.info(
"Stop threshold of {} was passed with metric value {}.".format(
flags_hr_threshold, hr))
break
# Clear the session explicitly to avoid session delete error
tf.keras.backend.clear_session()
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
absl_app.run(main)
The printed result:
result: HR = 0.1228, NDCG = 0.0575, Loss for final step: 0.57292986.
With only two epochs of plain gradient descent, the hit ratio is still well below the paper's 0.68 threshold; more epochs (and the Adam optimizer, per the TODO above) would be needed to get there.