你好,我是黄鸿波。

上节课我们讲了关于YouTubeDNN的召回模型,接下来,我们来看看如何用代码来实现它。

我们在做YouTubeDNN的时候,要把代码分成两个步骤,第一个步骤是对数据的清洗和处理,第二个步骤是搭建模型然后把数据放进去进行训练和预测。

数据的清洗和处理

先来讲数据部分。

按照YouTubeDNN论文来看,输入的数据是用户的信息、视频的ID序列、用户搜索的特征和一些地理信息等其他信息。到了基于文章内容的信息流产品中,就变成了用户ID、年龄、性别、城市、阅读的时间戳再加上视频的ID。我们把这些内容可以组合成YouTubeDNN需要的内容,最后处理成需要的Embedding。

由于前面没有太多的用户浏览数据,所以我先造了一批数据,数据集我会放到GitHub上(后续更新),数据的形式如下。

接下来我们就把这批数据处理成YouTubeDNN需要的形式。首先在recommendation-class项目中的utils目录下建立一个preprocess.py文件,作为处理数据的文件。

我们要处理这一批数据,需要下面五个步骤。

  1. 加载数据集。
  2. 处理数据特征。
  3. 特征转化为模型输入。
  4. 模型的搭建和训练。
  5. 模型评估。

在正式写代码之前,需要安装几个库,如下。

1
2
3
4
deepctr
deepmatch
tensorflow==2.2
pandas

我们可以使用pip install加上库名来安装它们,也可以把它们放在一个叫requirements.txt的文件中,使用pip install -r进行安装。

安装完成之后,我们来写preprocess.py的代码。为了能够让你看得更明白,我在函数里加了一些注释,先上代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from tqdm import tqdm
import numpy as np
import random
from tensorflow.python.keras.preprocessing.sequence import pad_sequences
 
def gen_data_set(data, negsample=0):
    data.sort_values("timestamp", inplace=True)  #是否用排序后的数据集替换原来的数据,这里是替换
    item_ids = data['item_id'].unique()    #item需要进行去重
 
    train_set = list()
    test_set = list()
    for reviewrID, hist in tqdm(data.groupby('user_id')):   #评价过,  历史记录
        pos_list = hist['item_id'].tolist()
        rating_list = hist['rating'].tolist()
 
        if negsample > 0:    #负样本
            candidate_set = list(set(item_ids) - set(pos_list))   #去掉用户看过的item项目
            neg_list = np.random.choice(candidate_set, size=len(pos_list) * negsample, replace=True)  #随机选择负采样样本
        for i in range(1, len(pos_list)):
            if i != len(pos_list) - 1:
                train_set.append((reviewrID, hist[::-1], pos_list[i], 1, len(hist[:: -1]), rating_list[i]))  #训练集和测试集划分  [::-1]从后玩前数
                for negi in range(negsample):
                    train_set.append((reviewrID, hist[::-1], neg_list[i * negsample + negi], 0, len(hist[::-1])))
            else:
                test_set.append((reviewrID, hist[::-1], pos_list[i], 1, len(hist[::-1]), rating_list[i]))
 
    random.shuffle(train_set)     #打乱数据集
    random.shuffle(test_set)
    return train_set, test_set
 
def gen_model_input(train_set, user_profile, seq_max_len):
    train_uid = np.array([line[0] for line in train_set])
    train_seq = [line[1] for line in train_set]
    train_iid = np.array([line[2] for line in train_set])
    train_label = np.array([line[3] for line in train_set])
    train_hist_len = np.array([line[4] for line in train_set])
 
    """
    pad_sequences数据预处理
    sequences:浮点数或整数构成的两层嵌套列表
    maxlen:None或整数,为序列的最大长度。大于此长度的序列将被截短,小于此长度的序列将在后部填0.
    dtype:返回的numpy array的数据类型
    padding:‘pre’或‘post’,确定当需要补0时,在序列的起始还是结尾补`
    truncating:‘pre’或‘post’,确定当需要截断序列时,从起始还是结尾截断
    value:浮点数,此值将在填充时代替默认的填充值0
    """
    train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post', truncating='post', value=0)
    train_model_input = {"user_id": train_uid, "item_id": train_iid, "hist_item_id": train_seq_pad,
                         "hist_len": train_hist_len}
    for key in {"gender", "age", "city"}:
        train_model_input[key] = user_profile.loc[train_model_input['user_id']][key].values   #训练模型的关键字
 
return train_model_input, train_label

这段代码主要用于生成训练集和测试集以及模型的输入。它看起来有点长,我来分别解释一下。

gen_data_set()函数接受一个数据集(data)和一个负采样(negsample)参数,返回一个训练集列表和一个测试集列表。该函数首先将数据集根据时间戳排序,然后从每一个用户的历史记录中选取正样本和负样本,并将它们保存到训练集和测试集中。

gen_model_input()函数接受一个训练集列表、用户画像信息和序列最大长度参数,返回训练模型的输入和标签。该函数将训练集列表拆分成train_uid、train_seq、train_iid、train_label和train_hist_len五部分。

  • train_uid和train_iid为用户ID和物品ID。
  • train_seq为历史交互序列。
  • train_label为正负样本标签。
  • train_hist_len为历史交互序列的长度。

此外,它对历史交互序列进行了填充处理(pad_sequences),并且将用户画像信息加入到训练模型的关键字中。最终,该函数返回训练模型的输入和标签。

在gen_data_set()函数中,首先使用data.sort_values(“timestamp”, inplace=True)函数将数据集按照时间戳排序,这是为了保证数据按照时间顺序排列,便于后续处理。接下来使用data[‘item_id’].unique()函数获取数据集中所有不重复的物品ID。因为后续需要筛选出用户未曾购买过的物品,要先获取数据集中所有的物品ID以便后续处理。

接下来使用groupby()函数将用户ID(user_id)相同的数据分组。对于每一组数据,将其分成正样本和负样本。其中正样本为用户已经购买过的物品,负样本为用户未购买过的其他物品。如果negsample参数大于0,则需要进行负采样。随机选取一些未曾购买过的物品作为负样本,并将它们保存到训练集列表中。最后,将正负样本数据以及其他信息(如历史交互序列、用户ID和历史交互序列的长度)保存到训练集列表和测试集列表中。

在gen_model_input()函数中,首先将训练集列表拆分成5个列表,分别保存用户ID、物品ID、历史交互序列、正负样本标签和历史交互序列长度。然后使用pad_sequences()函数对历史交互序列进行填充处理,将其变成长度相同的序列。最后,将用户画像信息加入到训练模型的关键字中,返回训练模型的输入和标签。

搭建模型进行训练和预测

当数据处理完成后,接下来就可以来做YouTubeDNN的模型部分了,我们在recall目录下新建一个文件,名字叫YouTubeDNN,然后编写如下代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from models.recall.preprocess import gen_data_set, gen_model_input
from deepctr.feature_column import SparseFeat, VarLenSparseFeat
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.models import Model
import tensorflow as tf
from deepmatch.models import *
from deepmatch.utils import recall_N
from deepmatch.utils import sampledsoftmaxloss
import numpy as np
from tqdm import tqdm
 
 
class YoutubeModel(object):
    def __init__(self, embedding_dim=32):
        self.SEQ_LEN = 50
        self.embedding_dim = embedding_dim
        self.user_feature_columns = None
        self.item_feature_columns = None
 
    def training_set_construct(self):
        # 加载数据
        data = pd.read_csv('../../data/read_history.csv')
        # 负采样个数
        negsample = 0
        # 特征编码
        features = ["user_id", "item_id", "gender", "age", "city"]
        features_max_idx = {}
        for feature in features:
            lbe = LabelEncoder()
            data[feature] = lbe.fit_transform(data[feature]) + 1
            features_max_idx[feature] = data[feature].max() + 1
 
        # 抽取用户、物品特征
        user_info = data[["user_id", "gender", "age", "city"]].drop_duplicates('user_id')  # 去重操作
        item_info = data[["item_id"]].drop_duplicates('item_id')
        user_info.set_index("user_id", inplace=True)
 
        # 构建输入数据
        train_set, test_set = gen_data_set(data, negsample)
        # 转化为模型的输入
        train_model_input, train_label = gen_model_input(train_set, user_info, self.SEQ_LEN)
        test_model_input, test_label = gen_model_input(test_set, user_info, self.SEQ_LEN)
        # 用户端特征输入
        self.user_feature_columns = [SparseFeat('user_id', features_max_idx['user_id'], 16),
                                     SparseFeat('gender', features_max_idx['gender'], 16),
                                     SparseFeat('age', features_max_idx['age'], 16),
                                     SparseFeat('city', features_max_idx['city'], 16),
                                     VarLenSparseFeat(SparseFeat('hist_item_id', features_max_idx['item_id'],
                                                                 self.embedding_dim, embedding_name='item_id'),
                                                      self.SEQ_LEN, 'mean', 'hist_len')
                                     ]
        # 物品端的特征输入
        self.item_feature_columns = [SparseFeat('item_id', features_max_idx['item_id'], self.embedding_dim)]
 
        return train_model_input, train_label, test_model_input, test_label, train_set, test_set, user_info, item_info
 
    def training_model(self, train_model_input, train_label):
        K.set_learning_phase(True)
        if tf.__version__ >= '2.0.0':
            tf.compat.v1.disable_eager_execution()
        # 定义模型
        model = YouTubeDNN(self.user_feature_columns, self.item_feature_columns, num_sampled=100,
                           user_dnn_hidden_units=(128, 64, self.embedding_dim))
        model.compile(optimizer="adam", loss=sampledsoftmaxloss)
        # 保存训练过程中的数据
        model.fit(train_model_input, train_label, batch_size=512, epochs=20, verbose=1, validation_split=0.0,)
        return model
 
    def extract_embedding_layer(self, model, test_model_input, item_info):
        all_item_model_input = {"item_id": item_info['item_id'].values, }
        # 获取用户、item的embedding_layer
        user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding)
        item_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding)
 
        user_embs = user_embedding_model.predict(test_model_input, batch_size=2 ** 12)
        item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12)
        print(user_embs.shape)
        print(item_embs.shape)
        return user_embs, item_embs
 
    def eval(self, user_embs, item_embs, test_model_input, item_info, test_set):
        test_true_label = {line[0]: line[2] for line in test_set}
        index = faiss.IndexFlagIP(self.embedding_dim)
        index.add(item_embs)
        D, I = index.search(np.ascontiguousarray(user_embs), 50)
        s = []
        hit = 0
 
        # 统计预测结果
        for i, uid in tqdm(enumerate(test_model_input['user_id'])):
            try:
                pred = [item_info['item_id'].value[x] for x in I[i]]
                recall_score = recall_N(test_true_label[uid], pred, N=50)
                s.append(recall_score)
                if test_true_label[uid] in pred:
                    hit += 1
            except:
                print(i)
 
        # 计算召回率和命中率
        recall = np.mean(s)
        hit_rate = hit / len(test_model_input['user_id'])
 
        return recall, hit_rate
 
    def scheduler(self):
        # 构建训练集、测试集
        train_model_input, train_label, test_model_input, test_label, \
        train_set, test_set, user_info, item_info = self.training_set_construct()
        #
        self.training_model(train_model_input, train_label)
 
        # 获取用户、item的layer
        # user_embs, item_embs = self.extract_embedding_layer(model, test_model_input, item_info)
        # # 评估模型
        # recall, hit_rate = self.eval(user_embs, item_embs, test_model_input, item_info, test_set)
        # print(recall, hit_rate)
 
 
if __name__ == '__main__':
    model = YoutubeModel()
    model.scheduler()

我来详细地解释下这段代码。首先根据导入的模块,可以看出这段代码主要使用了下面表格里的几个工具和库。

首先我们使用下面的代码加载数据。

1
data = pd.read_csv('../../data/read_history.csv')

这行代码使用Pandas库来读取CSV格式的历史阅读记录数据文件,将其存储到data这个DataFrame对象中。

然后我们对数据进行特征编码。

1
2
3
4
5
6
features = ["user_id", "item_id", "gender", "age", "city"]
features_max_idx = {}
for feature in features:
lbe = LabelEncoder()
data[feature] = lbe.fit_transform(data[feature]) + 1
features_max_idx[feature] = data[feature].max() + 1

这段代码使用sklearn.preprocessing.LabelEncoder对原始数据的几个特征进行编码,将连续或离散的特征转化为整数类型。这里编码的特征包括user_id、item_id、gender、age、city。将特征编码后,将最大索引值保存到features_max_idx字典中。

接下来,我们使用下面的代码来构建了数据集。

1
train_set, test_set = gen_data_set(data, negsample)

这行代码使用gen_data_set函数将原始数据划分为训练集和测试集,同时进行负采样操作。该函数的输入参数为原始数据和负采样个数。输出结果为经过负采样后的训练集和测试集。

然后我们就可以调用之前的gen_model_input函数将训练集和测试集转化为模型的输入格式,包括训练集/测试集的用户ID、历史物品ID序列、历史物品ID序列的长度和待预测物品ID。这些数据会作为训练模型的输入。

1
2
train_model_input, train_label = gen_model_input(train_set, user_info, self.SEQ_LEN) 
test_model_input, test_label = gen_model_input(test_set, user_info, self.SEQ_LEN)

接着,我们使用deepctr库中的SparseFeat和VarLenSparseFeat函数,分别构建了用户和物品的特征输入。其中SparseFeat表示离散特征,VarLenSparseFeat表示变长特征。具体地,用户特征输入由4个离散特征和一个变长特征(历史物品ID序列)组成,物品特征输入只有一个离散特征(物品ID)。

1
2
3
4
5
6
7
8
9
10
11
# 用户端特征输入
self.user_feature_columns = [SparseFeat('user_id', features_max_idx['user_id'], 16),
SparseFeat('gender', features_max_idx['gender'], 16),
SparseFeat('age', features_max_idx['age'], 16),
SparseFeat('city', features_max_idx['city'], 16),
VarLenSparseFeat(SparseFeat('hist_item_id', features_max_idx['item_id'],
self.embedding_dim, embedding_name='item_id'),
self.SEQ_LEN, 'mean', 'hist_len')
]
# 物品端的特征输入
self.item_feature_columns = [SparseFeat('item_id', features_max_idx['item_id'], self.embedding_dim)]

然后我们使用deepmatch库构建了含有DNN的YouTube推荐模型。该模型的输入由上一步定义的用户和物品特征输入组成,其中num_sampled表示分类器使用的采样点的数目。在模型构建和编译后,使用fit函数进行训练。

1
2
3
4
5
6
# 定义模型
model = YouTubeDNN(self.user_feature_columns, self.item_feature_columns, num_sampled=100,
user_dnn_hidden_units=(128, 64, self.embedding_dim))
model.compile(optimizer="adam", loss=sampledsoftmaxloss)
# 保存训练过程中的数据
model.fit(train_model_input, train_label, batch_size=512, epochs=20, verbose=1, validation_split=0.0,)

最后,利用训练好的模型提取用户和物品的Embedding Layer,以便后续计算召回率和命中率。具体地,使用Model函数将模型的输入和它的用户/物品Embedding层关联起来,然后调用predict函数计算得到预测结果。

1
2
user_embs = user_embedding_model.predict(test_model_input, batch_size=2 ** 12) 
item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12)

实际上,到这里整个数据处理和训练部分的代码就已经结束了,接下来,就是要做召回率和命中率的计算。在这个部分,我们利用Faiss库计算用户和物品Embedding Layer之间的近邻关系,并根据预测的物品列表计算召回率和命中率。具体来说就是根据用户ID索引到对应的Embedding向量,然后在物品Embedding向量集合中搜索近邻,得到预测的物品列表。最后,根据预测的物品列表和真实的物品ID,计算召回率和命中率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def eval(self, user_embs, item_embs, test_model_input, item_info, test_set):
   test_true_label = {line[0]: line[2] for line in test_set}
   index = faiss.IndexFlagIP(self.embedding_dim)
   index.add(item_embs)
   D, I = index.search(np.ascontiguousarray(user_embs), 50)
   s = []
   hit = 0

   # 统计预测结果
   for i, uid in tqdm(enumerate(test_model_input['user_id'])):
       try:
           pred = [item_info['item_id'].value[x] for x in I[i]]
           recall_score = recall_N(test_true_label[uid], pred, N=50)
           s.append(recall_score)
           if test_true_label[uid] in pred:
               hit += 1
       except:
           print(i)

   # 计算召回率和命中率
   recall = np.mean(s)
   hit_rate = hit / len(test_model_input['user_id'])

   return recall, hit_rate
 整个流程实际上到这里就结束了,那么最后,我们使用一个scheduler函数将它们串起来:
def scheduler(self):
   # 构建训练集、测试集
   train_model_input, train_label, test_model_input, test_label, \
   train_set, test_set, user_info, item_info = self.training_set_construct()
   #
   self.training_model(train_model_input, train_label)

   # 获取用户、item的layer
   # user_embs, item_embs = self.extract_embedding_layer(model, test_model_input, item_info)
   # # 评估模型
   # recall, hit_rate = self.eval(user_embs, item_embs, test_model_input, item_info, test_set)
   # print(recall, hit_rate)
 

这里有一点需要注意,Faiss库目前在Windows上无法使用,必须在Linux上才行。因此,在最后的Schedule阶段,我将这段代码进行了注释。

整个YouTubeDNN的召回层训练和预测到这里就结束了。

总结

到这里,今天的课程就讲完了,接下来我们来对今天的课程做一个简单的总结,学完本节课你应该知道下面三大要点。

  1. 在YouTubeDNN中,数据处理会经过加载数据集、处理数据特征、特征转化为模型输入、模型的搭建和训练、模型评估这5个部分。
  2. YouTubeDNN模型通过将用户历史行为序列嵌入到低维向量空间中,来学习用户和物品之间的关系。它的输入包括用户历史行为序列以及物品ID,输出包括用户和物品的嵌入向量以及它们之间的相似度得分。
  3. 熟悉使用Python来搭建一整套YouTubeDNN模型代码。

课后题

本节课学完了,我来给你留两道课后题。

  1. 实现本节课的代码。
  2. 根据我们前面的知识,自动生成数据集。

欢迎你在留言区与我交流讨论,如果这节课对你有帮助,也欢迎你推荐给朋友一起学习。