实战指南:构建多任务多标签深度学习模型

通过MovieLens数据集,学习构建和部署多任务多标签深度学习模型。

原文标题:深度学习实战:手把手教你构建多任务、多标签模型

原文作者:数据派THU

冷月清谈:

本文详细介绍了如何基于流行的MovieLens数据集构建一个多任务多标签深度学习模型。文章分为六个主要部分:环境设置、数据准备、模型定义、训练循环、推理和使用Ray Serve进行模型部署。首先,安装必要的库,并准备MovieLens数据集,包含用户、电影及评分信息,通过定义MovieLensDataset类进行数据预处理和分割。接着,创建一个多任务模型,使用稀疏嵌入技术来处理两个任务,预测用户对电影的评分和是否喜欢该电影。训练过程中采用均方误差和二元交叉熵损失函数,并使用TensorBoard监控训练过程。最后,通过Ray Serve将模型部署为API服务,使得模型可以轻松访问和使用。整个过程详细且实用,适合希望实践深度学习的读者。

怜星夜思:

1、多任务学习模型的优势和劣势是什么?
2、如何选择适合的超参数?
3、Ray Serve在模型部署中的实际应用效果如何?

原文内容

来源:DeepHub IMBA

本文约5100字,建议阅读10+分钟

本文将基于流行的 MovieLens 数据集,使用稀疏特征来创建一个多任务多标签模型,并逐步介绍整个过程。


多任务多标签模型是现代机器学习中的基础架构,这个任务在概念上很简单 -训练一个模型同时预测多个任务的多个输出。

在本文中,我们将基于流行的 MovieLens 数据集,使用稀疏特征来创建一个多任务多标签模型,并逐步介绍整个过程。所以本文将涵盖数据准备、模型构建、训练循环、模型诊断,最后使用 Ray Serve 部署模型的全部流程。

1. 设置环境

在深入代码之前,请确保安装了必要的库(以下不是详尽列表):

pip install pandas scikit-learn torch ray[serve] matplotlib requests tensorboard

我们在这里使用的数据集足够小,所以可以使用 CPU 进行训练。

2. 准备数据集

我们将从创建用于处理 MovieLens 数据集的下载、预处理的类开始,然后将数据分割为训练集和测试集。

MovieLens数据集包含有关用户、电影及其评分的信息,我们将用它来预测评分(回归任务)和用户是否喜欢这部电影(二元分类任务)。

import os  
 import pandas as pd  
 from sklearn.model_selection import train_test_split  
 from sklearn.preprocessing import LabelEncoder  
 import torch  
 from torch.utils.data import Dataset, DataLoader  
 import zipfile  
 import io  
 import requests  
   
 class MovieLensDataset(Dataset):  
   
     def __init__(self, dataset_version="small", data_dir="data"):  
         print("Initializing MovieLensDataset...")  
         if not os.path.exists(data_dir):  
             os.makedirs(data_dir)  
           
         if dataset_version == "small":  
             url = "https://files.grouplens.org/datasets/movielens/ml-latest-small.zip"  
             local_zip_path = os.path.join(data_dir, "ml-latest-small.zip")  
             file_path = 'ml-latest-small/ratings.csv'  
             parquet_path = os.path.join(data_dir, "ml-latest-small.parquet")  
         elif dataset_version == "full":  
             url = "https://files.grouplens.org/datasets/movielens/ml-latest.zip"  
             local_zip_path = os.path.join(data_dir, "ml-latest.zip")  
             file_path = 'ml-latest/ratings.csv'  
             parquet_path = os.path.join(data_dir, "ml-latest.parquet")  
         else:  
             raise ValueError("Invalid dataset_version. Choose 'small' or 'full'.")  
           
         if os.path.exists(parquet_path):  
             print(f"Loading dataset from {parquet_path}...")  
             movielens = pd.read_parquet(parquet_path)  
         else:  
             if not os.path.exists(local_zip_path):  
                 print(f"Downloading {dataset_version} dataset from {url}...")  
                 response = requests.get(url)  
                 with open(local_zip_path, "wb") as f:  
                     f.write(response.content)  
               
             with zipfile.ZipFile(local_zip_path, "r") as z:  
                 with z.open(file_path) as f:  
                     movielens = pd.read_csv(f, usecols=['userId', 'movieId', 'rating'], low_memory=True)  
             movielens.to_parquet(parquet_path, index=False)  
         movielens['liked'] = (movielens['rating'] >= 4).astype(int)  
         self.user_encoder = LabelEncoder()  
         self.movie_encoder = LabelEncoder()  
         movielens['user'] = self.user_encoder.fit_transform(movielens['userId'])  
         movielens['movie'] = self.movie_encoder.fit_transform(movielens['movieId'])  
         self.train_df, self.test_df = train_test_split(movielens, test_size=0.2, random_state=42)  
       
     def get_data(self, split="train"):  
         if split == "train":  
             data = self.train_df  
         elif split == "test":  
             data = self.test_df  
         else:  
             raise ValueError("Invalid split. Choose 'train' or 'test'.")  
           
         dense_features = torch.tensor(data[['user', 'movie']].values, dtype=torch.long)  
         labels = torch.tensor(data[['rating', 'liked']].values, dtype=torch.float32)  
           
         return dense_features, labels  
       
     def get_encoders(self):  
         return self.user_encoder, self.movie_encoder


定义了 MovieLensDataset,就可以将训练集和评估集加载到内存中。

 # Example usage with a single dataset object  
 print("Creating MovieLens dataset...")  
 # Feel free to use dataset_version="full" if you are using  
 # a GPU  
 dataset = MovieLensDataset(dataset_version="small")  
   
 print("Getting training data...")  
 train_dense_features, train_labels = dataset.get_data(split="train")  
 print("Getting testing data...")  
 test_dense_features, test_labels = dataset.get_data(split="test")  
 # Create DataLoader for training and testing  
 train_loader = DataLoader(torch.utils.data.TensorDataset(train_dense_features, train_labels), batch_size=64, shuffle=True)  
 test_loader = DataLoader(torch.utils.data.TensorDataset(test_dense_features, test_labels), batch_size=64, shuffle=False)  
 print("Accessing encoders...")  
 user_encoder, movie_encoder = dataset.get_encoders()  
 print("Setup complete.")

3. 定义多任务多标签模型

我们将定义一个基本的 PyTorch 模型,处理两个任务:预测评分(回归)和用户是否喜欢这部电影(二元分类)。

模型使用稀疏嵌入来表示用户和电影,并有共享层,这些共享层会输入到两个单独的输出层。

通过在任务之间共享一些层,并为每个特定任务的输出设置单独的层,该模型利用了共享表示,同时仍然针对每个任务定制其预测。

 from torch import nn  
   
 class MultiTaskMovieLensModel(nn.Module):  
     def __init__(self, n_users, n_movies, embedding_size, hidden_size):  
         super(MultiTaskMovieLensModel, self).__init__()  
         self.user_embedding = nn.Embedding(n_users, embedding_size)  
         self.movie_embedding = nn.Embedding(n_movies, embedding_size)  
         self.shared_layer = nn.Linear(embedding_size * 2, hidden_size)  
         self.shared_activation = nn.ReLU()  
         self.task1_fc = nn.Linear(hidden_size, 1)  
         self.task2_fc = nn.Linear(hidden_size, 1)  
         self.task2_activation = nn.Sigmoid()  
   
     def forward(self, x):  
         user = x[:, 0]  
         movie = x[:, 1]  
         user_embed = self.user_embedding(user)  
         movie_embed = self.movie_embedding(movie)  
         combined = torch.cat((user_embed, movie_embed), dim=1)  
         shared_out = self.shared_activation(self.shared_layer(combined))  
         rating_out = self.task1_fc(shared_out)  
         liked_out = self.task2_fc(shared_out)  
         liked_out = self.task2_activation(liked_out)  
         return rating_out, liked_out


输入 (x) :


  • 输入 x 预期是一个 2D 张量,其中每行包含一个用户 ID 和一个电影 ID。

用户和电影嵌入 :

  • user = x[:, 0]: 从第一列提取用户 ID。
  • movie = x[:, 1]: 从第二列提取电影 ID。
  • user_embed 和 movie_embed 是对应这些 ID 的嵌入。

连接 :

  • combined = torch.cat((user_embed, movie_embed), dim=1): 沿特征维度连接用户和电影嵌入。

共享层 :

  • shared_out = self.shared_activation(self.shared_layer(combined)): 将组合的嵌入通过共享的全连接层和激活函数。

任务特定输出:

  • rating_out = self.task1_fc(shared_out): 从第一个任务特定层输出预测评分。
  • liked_out = self.task2_fc(shared_out): 输出用户是否喜欢电影的原始分数。
  • liked_out = self.task2_activation(liked_out): 原始分数通过 sigmoid 函数转换为概率。

返回 :

模型返回两个输出:

  • rating_out: 预测的评分(回归输出)。
  • liked_out: 用户喜欢电影的概率(分类输出)。


4. 训练循环

首先,用一些任意选择的超参数(嵌入维度和隐藏层中的神经元数量)实例化我们的模型。对于回归任务将使用均方误差损失,对于分类任务,将使用二元交叉熵。

我们可以通过它们的初始值来归一化两个损失,以确保它们都大致处于相似的尺度(这里也可以使用不确定性加权来归一化损失)

然后将使用数据加载器训练模型,并跟踪两个任务的损失。损失将被绘制成图表,以可视化模型在评估集上随时间的学习和泛化情况。

  import torch.optim as optim  
 import matplotlib.pyplot as plt  
   
 # Check if GPU is available  
 device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  
 print(f"Using device: {device}")  
 embedding_size = 16  
 hidden_size = 32  
 n_users = len(dataset.get_encoders()[0].classes_)  
 n_movies = len(dataset.get_encoders()[1].classes_)  
 model = MultiTaskMovieLensModel(n_users, n_movies, embedding_size, hidden_size).to(device)  
 criterion_rating = nn.MSELoss()  
 criterion_liked = nn.BCELoss()  
 optimizer = optim.Adam(model.parameters(), lr=0.001)  
 train_rating_losses, train_liked_losses = [], []  
 eval_rating_losses, eval_liked_losses = [], []  
 epochs = 10  
   
 # used for loss normalization  
 initial_loss_rating = None  
 initial_loss_liked = None  
   
 for epoch in range(epochs):  
     model.train()  
     running_loss_rating = 0.0  
     running_loss_liked = 0.0  
       
     for dense_features, labels in train_loader:  
         optimizer.zero_grad()  
         dense_features = dense_features.to(device)  
         labels = labels.to(device)  
           
         rating_pred, liked_pred = model(dense_features)  
         rating_target = labels[:, 0].unsqueeze(1)  
         liked_target = labels[:, 1].unsqueeze(1)  
           
         loss_rating = criterion_rating(rating_pred, rating_target)  
         loss_liked = criterion_liked(liked_pred, liked_target)  
   
         # Set initial losses  
         if initial_loss_rating is None:  
             initial_loss_rating = loss_rating.item()  
         if initial_loss_liked is None:  
             initial_loss_liked = loss_liked.item()  
           
         # Normalize losses  
         loss = (loss_rating / initial_loss_rating) + (loss_liked / initial_loss_liked)  
           
         loss.backward()  
         optimizer.step()  
           
         running_loss_rating += loss_rating.item()  
         running_loss_liked += loss_liked.item()  
       
     train_rating_losses.append(running_loss_rating / len(train_loader))  
     train_liked_losses.append(running_loss_liked / len(train_loader))  
       
     model.eval()  
     eval_loss_rating = 0.0  
     eval_loss_liked = 0.0  
       
     with torch.no_grad():  
         for dense_features, labels in test_loader:  
             dense_features = dense_features.to(device)  
             labels = labels.to(device)  
               
             rating_pred, liked_pred = model(dense_features)  
             rating_target = labels[:, 0].unsqueeze(1)  
             liked_target = labels[:, 1].unsqueeze(1)  
               
             loss_rating = criterion_rating(rating_pred, rating_target)  
             loss_liked = criterion_liked(liked_pred, liked_target)  
               
             eval_loss_rating += loss_rating.item()  
             eval_loss_liked += loss_liked.item()  
       
     eval_rating_losses.append(eval_loss_rating / len(test_loader))  
     eval_liked_losses.append(eval_loss_liked / len(test_loader))  
     print(f'Epoch {epoch+1}, Train Rating Loss: {train_rating_losses[-1]}, Train Liked Loss: {train_liked_losses[-1]}, Eval Rating Loss: {eval_rating_losses[-1]}, Eval Liked Loss: {eval_liked_losses[-1]}')  
 # Plotting losses  
 plt.figure(figsize=(14, 6))  
 plt.subplot(1, 2, 1)  
 plt.plot(train_rating_losses, label='Train Rating Loss')  
 plt.plot(eval_rating_losses, label='Eval Rating Loss')  
 plt.xlabel('Epoch')  
 plt.ylabel('Loss')  
 plt.title('Rating Loss')  
 plt.legend()  
 plt.subplot(1, 2, 2)  
 plt.plot(train_liked_losses, label='Train Liked Loss')  
 plt.plot(eval_liked_losses, label='Eval Liked Loss')  
 plt.xlabel('Epoch')  
 plt.ylabel('Loss')  
 plt.title('Liked Loss')  
 plt.legend()  
 plt.tight_layout()  
 plt.show()

还可以通过利用 Tensorboard 监控训练的过程:

from torch.utils.tensorboard import SummaryWriter  
 # Check if GPU is available  
 device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  
 print(f"Using device: {device}")  
 # Model and Training Setup  
 embedding_size = 16  
 hidden_size = 32  
 n_users = len(user_encoder.classes_)  
 n_movies = len(movie_encoder.classes_)  
 model = MultiTaskMovieLensModel(n_users, n_movies, embedding_size, hidden_size).to(device)  
 criterion_rating = nn.MSELoss()  
 criterion_liked = nn.BCELoss()  
 optimizer = optim.Adam(model.parameters(), lr=0.001)  
 epochs = 10  
   
 # used for loss normalization  
 initial_loss_rating = None  
 initial_loss_liked = None  
   
 # TensorBoard setup  
 writer = SummaryWriter(log_dir='runs/multitask_movie_lens')  
   
 # Training Loop with TensorBoard Logging  
 for epoch in range(epochs):  
     model.train()  
     running_loss_rating = 0.0  
     running_loss_liked = 0.0  
     for batch_idx, (dense_features, labels) in enumerate(train_loader):  
         # Move data to GPU  
         dense_features = dense_features.to(device)  
         labels = labels.to(device)  
           
         optimizer.zero_grad()  
           
         rating_pred, liked_pred = model(dense_features)  
         rating_target = labels[:, 0].unsqueeze(1)  
         liked_target = labels[:, 1].unsqueeze(1)  
           
         loss_rating = criterion_rating(rating_pred, rating_target)  
         loss_liked = criterion_liked(liked_pred, liked_target)  
   
         # Set initial losses  
         if initial_loss_rating is None:  
             initial_loss_rating = loss_rating.item()  
         if initial_loss_liked is None:  
             initial_loss_liked = loss_liked.item()  
           
         # Normalize losses  
         loss = (loss_rating / initial_loss_rating) + (loss_liked / initial_loss_liked)  
           
         loss.backward()  
         optimizer.step()  
           
         running_loss_rating += loss_rating.item()  
         running_loss_liked += loss_liked.item()  
           
         # Log loss to TensorBoard  
         writer.add_scalar('Loss/Train_Rating', loss_rating.item(), epoch * len(train_loader) + batch_idx)  
         writer.add_scalar('Loss/Train_Liked', loss_liked.item(), epoch * len(train_loader) + batch_idx)  
       
     print(f'Epoch {epoch+1}/{epochs}, Train Rating Loss: {running_loss_rating / len(train_loader)}, Train Liked Loss: {running_loss_liked / len(train_loader)}')  
       
     # Evaluate on the test set  
     model.eval()  
     eval_loss_rating = 0.0  
     eval_loss_liked = 0.0  
     with torch.no_grad():  
         for dense_features, labels in test_loader:  
             # Move data to GPU  
             dense_features = dense_features.to(device)  
             labels = labels.to(device)  
               
             rating_pred, liked_pred = model(dense_features)  
             rating_target = labels[:, 0].unsqueeze(1)  
             liked_target = labels[:, 1].unsqueeze(1)  
               
             loss_rating = criterion_rating(rating_pred, rating_target)  
             loss_liked = criterion_liked(liked_pred, liked_target)  
             eval_loss_rating += loss_rating.item()  
             eval_loss_liked += loss_liked.item()  
       
     eval_loss_avg_rating = eval_loss_rating / len(test_loader)  
     eval_loss_avg_liked = eval_loss_liked / len(test_loader)  
     print(f'Epoch {epoch+1}/{epochs}, Eval Rating Loss: {eval_loss_avg_rating}, Eval Liked Loss: {eval_loss_avg_liked}')  
       
     # Log evaluation loss to TensorBoard  
     writer.add_scalar('Loss/Eval_Rating', eval_loss_avg_rating, epoch)  
     writer.add_scalar('Loss/Eval_Liked', eval_loss_avg_liked, epoch)  
 # Close the TensorBoard writer  
 writer.close()

我们在同一目录下运行 TensorBoard 来启动服务器,并在网络浏览器中检查训练和评估曲线。在以下 bash 命令中,将 runs/mutlitask_movie_lens 替换为包含事件文件(日志)的目录路径。

(base) $ tensorboard --logdir=runs/multitask_movie_lens
TensorFlow installation not found - running with reduced feature set.

运行结果如下:

NOTE: Using experimental fast data loading logic. To disable, pass
"--load_fast=false" and report issues on GitHub. More details:
<https://github.com/tensorflow/tensorboard/issues/4784> Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all
TensorBoard 2.12.0 at <http://localhost:6006/> (Press CTRL+C to quit)

Tensorboard 损失曲线视图如上所示。

5. 推理

在训练完成后要使用 torch.save 函数将模型保存到磁盘。这个函数允许你保存模型的状态字典,其中包含模型的所有参数和缓冲区。保存的文件通常使用 .pth 或 .pt 扩展名。

import torch  torch.save(model.state_dict(), "model.pth")

状态字典包含所有模型参数(权重和偏置),当想要将模型加载回代码中时,可以使用以下步骤:

# Initialize the model (make sure the architecture matches the saved model)
model = MultiTaskMovieLensModel(n_users, n_movies, embedding_size, hidden_size)

Load the saved state dictionary into the model

model.load_state_dict(torch.load(“model.pth”))

Set the model to evaluation mode (important for inference)

model.eval()

为了在一些未见过的数据上评估模型,可以对单个用户-电影对进行预测,并将它们与实际值进行比较。

def predict_and_compare(user_id, movie_id, model, user_encoder, movie_encoder, train_dataset, test_dataset):
user_idx = user_encoder.transform([user_id])[0]
movie_idx = movie_encoder.transform([movie_id])[0]
example_user = torch.tensor([[user_idx]], dtype=torch.long)
example_movie = torch.tensor([[movie_idx]], dtype=torch.long)
example_dense_features = torch.cat((example_user, example_movie), dim=1)
model.eval()
with torch.no_grad():
rating_pred, liked_pred = model(example_dense_features)
predicted_rating = rating_pred.item()
predicted_liked = liked_pred.item()
actual_row = train_dataset.data[(train_dataset.data['userId'] == user_id) & (train_dataset.data['movieId'] == movie_id)]
if actual_row.empty:
actual_row = test_dataset.data[(test_dataset.data['userId'] == user_id) & (test_dataset.data['movieId'] == movie_id)]
if not actual_row.empty:
actual_rating = actual_row['rating'].values[0]
actual_liked = actual_row['liked'].values[0]
return {
'User ID': user_id,
'Movie ID': movie_id,
'Predicted Rating': round(predicted_rating, 2),
'Actual Rating': actual_rating,
'Predicted Liked': 'Yes' if predicted_liked >= 0.5 else 'No',
'Actual Liked': 'Yes' if actual_liked == 1 else 'No'
}
else:
return None
example_pairs = test_dataset.data.sample(n=5)
results = []
for _, row in example_pairs.iterrows():
user_id = row['userId']
movie_id = row['movieId']
result = predict_and_compare(user_id, movie_id, model, user_encoder, movie_encoder, train_dataset, test_dataset)
if result:
results.append(result)
results_df = pd.DataFrame(results)
results_df.head()


6. 使用 Ray Serve 部署模型

最后就是将模型部署为一个服务,使其可以通过 API 访问,这里使用使用 Ray Serve。

使用 Ray Serve是因为它可以从单机无缝扩展到大型集群,可以处理不断增加的负载。Ray Serve 还集成了 Ray 的仪表板,为监控部署的健康状况、性能和资源使用提供了用户友好的界面。

步骤 1:加载训练好的模型

# Load your trained model (assuming it's saved as 'model.pth')
n_users = 1000  # 示例值,替换为实际用户数
n_movies = 1000  # 示例值,替换为实际电影数
embedding_size = 16
hidden_size = 32
model = MultiTaskMovieLensModel(n_users, n_movies, embedding_size, hidden_size)
model.load_state_dict(torch.load("model.pth"))
model.eval()

步骤 2:定义模型服务类

import ray
from ray import serve
@serve.deployment
class ModelServeDeployment:
def __init__(self, model):
self.model = model
self.model.eval()
async def __call__(self, request):
json_input = await request.json()
user_id = torch.tensor([json_input["user_id"]])
movie_id = torch.tensor([json_input["movie_id"]])
with torch.no_grad():
rating_pred, liked_pred = self.model(user_id, movie_id)
return {
"rating_prediction": rating_pred.item(),
"liked_prediction": liked_pred.item()
}

步骤 3:初始化 Ray 服务器

# 初始化 Ray 和 Ray Serve
ray.init()
serve.start()
# 部署模型
model_deployment = ModelServeDeployment.bind(model)
serve.run(model_deployment)

现在应该能够在 localhost:8265 看到 ray 服务器。

步骤 4:查询模型

最后就是测试 API 了。运行以下代码行时,应该可以看到一个响应,其中包含查询用户和电影的评分和喜欢预测。

import requests

定义服务器地址(Ray Serve 默认为 http://127.0.0.1:8000

url = “http://127.0.0.1:8000/ModelServeDeployment

示例输入

data = {
“user_id”: 123, # 替换为实际用户 ID
“movie_id”: 456 # 替换为实际电影 ID
}

向模型服务器发送 POST 请求

response = requests.post(url, json=data)

打印模型的响应

print(response.json())

图片

就是这样,我们刚刚训练并部署了一个多任务多标签模型!

作者:Cole Diamond

编辑:黄继彦



关于我们

数据派THU作为数据科学类公众号,背靠清华大学大数据研究中心,分享前沿数据科学与大数据技术创新研究动态、持续传播数据科学知识,努力建设数据人才聚集平台、打造中国大数据最强集团军。




新浪微博:@数据派THU

微信视频号:数据派THU

今日头条:数据派THU

多任务学习的一个主要优势在于它能够通过共享表示来提高模型的泛化能力,减少过拟合的风险。然而,劣势在于可能导致任务间的干扰,影响模型的专注度。

我觉得多任务学习的好处主要是可以节省训练成本,就像我们平时做事情时,能同时做几件事总会省时省力,但有的时候,尤其是任务之间相互影响时,可能会适得其反。

多任务学习的优势在于提升模型的学习速度和效果,尤其是在数据量较小的情况下。劣势则是模型设计可能复杂,调试和优化的难度较大。

选择超参数的过程往往是实验性的,建议使用网格搜索或随机搜索以探索不同组合的效果。此外,也可以考虑使用贝叶斯优化等高级方法来自动化超参数调整。

我一般会先参考一些文献中的参数设置,再结合自己的任务不断进行实验调整,最后选出效果最好的组合。

可以尝试使用交叉验证来选择超参数,确保模型的稳定性和泛化能力。关注训练和验证的损失情况,适时进行调整。

Ray Serve在分布式服务方面表现较好,能够在不同节点之间高效处理请求,适合处理大型模型和高并发的情况,实际应用中能显著提高响应速度。

我觉得Ray Serve很方便,能让模型的部署变得简单,而且可以在需要时很容易扩展服务。之前用过,体验不错!

Ray Serve的使用门槛相对较低,易于上手,尤其是在需要快速原型验证的时候,非常适合小团队或个人开发者使用。