{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "

研究生《深度学习》课程
实验报告

\n", "
\n", "
课程名称:深度学习 M502019B
\n", "
实验题目:Pytorch基本操作实验
\n", "
学号:25120323
\n", "
姓名:柯劲帆
\n", "
授课老师:原继东
\n", "
报告日期:2025年8月4日
\n", "
" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Pytorch version: 2.7.1+cu118\n", "CUDA version: 11.8\n", "CUDA device count: 1\n", "CUDA device name: NVIDIA TITAN Xp\n", "CUDA device capability: (6, 1)\n", "CUDA device memory: 11.90 GB\n", "CPU count: 4\n" ] } ], "source": [ "import numpy as np\n", "import torch\n", "from torch.autograd import Variable\n", "from torch.utils.data import Dataset, DataLoader, Subset, random_split\n", "from torch import nn\n", "from torchvision import datasets, transforms\n", "from multiprocessing import cpu_count\n", "from sklearn.model_selection import KFold\n", "import matplotlib.pyplot as plt\n", "from tqdm.notebook import tqdm\n", "import pandas as pd\n", "from typing import Literal, Union, Optional\n", "\n", "print('Pytorch version:',torch.__version__)\n", "if not torch.cuda.is_available():\n", " print('CUDA is_available:', torch.cuda.is_available())\n", "else:\n", " print('CUDA version:', torch.version.cuda)\n", " print('CUDA device count:', torch.cuda.device_count())\n", " print('CUDA device name:', torch.cuda.get_device_name())\n", " print('CUDA device capability:', torch.cuda.get_device_capability())\n", " print('CUDA device memory:', f'{torch.cuda.get_device_properties(0).total_memory/1024/1024/1024:.2f}', 'GB')\n", "print('CPU count:', cpu_count())\n", "\n", "device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n", "seed = 42\n", "np.random.seed(seed)\n", "torch.manual_seed(seed)\n", "torch.cuda.manual_seed(seed)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# 任务一\n", "**手动实现前馈神经网络解决上述回归、二分类、多分类任务。**\n", "- 从训练时间、预测精度、Loss变化等角度分析实验结果(最好使用图表展示)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "torch.cuda.empty_cache()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "首先生成数据集。\n", "\n", "一共有3个数据集:\n", "\n", "1. 回归任务数据集。\n", " - 生成单个数据集。\n", " - 数据集的大小为$10000$且训练集大小为$7000$,测试集大小为$3000$。\n", " - 数据集的样本特征维度$p$为$500$,且服从如下的高维线性函数:$y = 0.028 + \\sum_{i=1}^{p}0.0056 x_i + \\epsilon $。\n", "2. 二分类任务数据集。\n", " - 共生成两个数据集。\n", " - 两个数据集的大小均为$10000$且训练集大小为$7000$,测试集大小为$3000$。\n", " - 两个数据集的样本特征$x$的维度均为$200$,且分别服从均值互为相反数且方差相同的正态分布。\n", " - 两个数据集的样本标签分别为$0$和$1$。\n", "3. 
MNIST手写体数据集。\n", "    - 该数据集包含$60,000$个用于训练的图像样本和$10,000$个用于测试的图像样本。\n", "    - 图像是固定大小($28\\times 28$像素),其标签为$0$到$9$。 " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "实现回归任务数据集。" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "训练数据集大小:7000,测试数据集大小:3000\n", "训练数据集的第1对数据:\n", "输入x[0]第1个特征维度数据x[0][0]:-0.25091975927352905\n", "标签y[0]:tensor([-1.0123])\n" ] } ], "source": [ "class My_Regression_Dataset(Dataset):\n", "    def __init__(self, train=True, num_features=500):\n", "        data_size = (7000 if train else 3000)\n", "        x = np.random.uniform(low=-1, high=1, size=(data_size, num_features))\n", "        noise = np.random.normal(loc=0, scale=1, size=(data_size, 1))\n", "        # 按题目给定的高维线性函数 y = 0.028 + sum(0.0056 * x_i) + eps 生成标签\n", "        y = 0.028 + 0.0056 * x.sum(axis=1, keepdims=True) + noise\n", "        self.inputs = torch.tensor(x, dtype=torch.float32)\n", "        self.labels = torch.tensor(y, dtype=torch.float32)\n", "\n", "    def __len__(self):\n", "        return self.inputs.shape[0]\n", "\n", "    def __getitem__(self, index):\n", "        return self.inputs[index], self.labels[index]\n", "\n", "\n", "# 测试,并为后面的训练创建变量\n", "train_regression_dataset = My_Regression_Dataset(train=True)\n", "test_regression_dataset = My_Regression_Dataset(train=False)\n", "print(\n", "    f\"训练数据集大小:{len(train_regression_dataset)},\"\n", "    f\"测试数据集大小:{len(test_regression_dataset)}\"\n", ")\n", "x0, y0 = train_regression_dataset[0]\n", "print(\"训练数据集的第1对数据:\")\n", "print(f\"输入x[0]第1个特征维度数据x[0][0]:{x0[0]}\")\n", "print(f\"标签y[0]:{y0}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "实现二分类任务数据集。" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "训练数据集大小:14000,测试数据集大小:6000\n", "训练数据集的第1对数据:\n", "x[0]第1个特征维度数据x[0][0] = -0.4228987991809845\n", "y[0] = tensor([0.])\n", "训练数据集的第7001对数据:\n", "x[7000]第1个特征维度数据x[7000][0] = 0.4295016825199127\n", "y[7000] = tensor([1.])\n" ] } ], "source": [ "class My_BinaryCLS_Dataset(Dataset):\n", "    def __init__(self, train=True, num_features=200):\n", "        num_samples = (7000 if train else 3000)\n", "        # 两类样本分别服从均值互为相反数、方差相同的正态分布,标签分别为0和1\n", "        x_1 = np.random.normal(loc=-0.5, scale=0.2, size=(num_samples, num_features))\n", "        x_2 = np.random.normal(loc=0.5, scale=0.2, size=(num_samples, num_features))\n", "        labels_1, labels_2 = np.zeros((num_samples, 1)), np.ones((num_samples, 1))\n", "        self.inputs = torch.tensor(np.concatenate((x_1, x_2), axis=0), dtype=torch.float32)\n", "        self.labels = torch.tensor(np.concatenate((labels_1, labels_2), axis=0), dtype=torch.float32)\n", "\n", "    def __len__(self):\n", "        return self.inputs.shape[0]\n", "\n", "    def __getitem__(self, index):\n", "        return self.inputs[index], self.labels[index]\n", "\n", "\n", "# 测试,并为后面的训练创建变量\n", "train_binarycls_dataset = My_BinaryCLS_Dataset(train=True)\n", "test_binarycls_dataset = My_BinaryCLS_Dataset(train=False)\n", "\n", "print(\n", "    f\"训练数据集大小:{len(train_binarycls_dataset)},\"\n", "    f\"测试数据集大小:{len(test_binarycls_dataset)}\"\n", ")\n", "x0, y0 = train_binarycls_dataset[0]\n", "print(\"训练数据集的第1对数据:\")\n", "print(f\"x[0]第1个特征维度数据x[0][0] = {x0[0]}\")\n", "print(f\"y[0] = {y0}\")\n", "\n", "x7000, y7000 = train_binarycls_dataset[7000]\n", "print(\"训练数据集的第7001对数据:\")\n", "print(f\"x[7000]第1个特征维度数据x[7000][0] = {x7000[0]}\")\n", "print(f\"y[7000] = {y7000}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "使用MNIST数据集。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "调用`torchvision.datasets.MNIST()`,获取数据集。" ] }, { "cell_type": "code", "execution_count": 
5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "训练数据集大小:60000,测试数据集大小:10000\n", "A Train Sample:\n", "\n" ] }, { "data": { "image/png": "iVBORw0KGgoAAAANSUhEUgAAAIcAAACdCAYAAACeqmv3AAAAOnRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjEwLjMsIGh0dHBzOi8vbWF0cGxvdGxpYi5vcmcvZiW1igAAAAlwSFlzAAAPYQAAD2EBqD+naQAACYZJREFUeJzt3WtIFG8bBvB73zTXzIzKA4htmGZZykoHQxS1AxYaaAlhSEUlEQoSZCcolUgLsyIUC6KDJBWIlRlURFpEsSYdoIOdSEgJ28y0Mg31+X94cfGZ8XbXVt3dvH7gh2ua2XmUq/HZcWdGI4QQBDCA/9l6AGC/UA5goRzAQjmAhXIAC+UAFsoBLJQDWCgHsP7pcjQ0NJBGo6EjR44M22vW1NSQRqOhmpqaYXtNe2V35Th37hxpNBqqq6uz9VBGRE5ODmk0GtWXVqu19dBUnGw9gLGqpKSEJk6caMrjxo2z4WgGhnLYSHJyMk2bNs3WwxiU3f1ascSfP39o//79NH/+fPLw8CA3NzeKioqi6upqdptjx46RTqcjV1dXio6OphcvXqjWqa+vp+TkZJoyZQpptVpasGABVVZWmh1PR0cH1dfX09evXy3+HoQQ1N7eTvb8R3GHLEd7ezudPn2aYmJi6PDhw5STk0NGo5Hi4uLo2bNnqvVLS0vpxIkTlJ6eTnv27KEXL17QkiVLqLm52bTOy5cvafHixfT69WvavXs3FRYWkpubGyUmJtKVK1cGHU9tbS3NmTOHioqKLP4e/P39ycPDg9zd3Sk1NVUai90Qdubs2bOCiMTjx4/Zdbq7u0VXV5e0rLW1VXh7e4tNmzaZln38+FEQkXB1dRWNjY2m5QaDQRCR2L59u2nZ0qVLRUhIiOjs7DQt6+3tFRERESIwMNC0rLq6WhCRqK6uVi3Lzs42+/0dP35cZGRkiLKyMlFeXi4yMzOFk5OTCAwMFG1tbWa3H00OWY7+enp6REtLizAajSI+Pl7o9XrTv/WVIyUlRbVdeHi4CAoKEkII0dLSIjQajThw4IAwGo3SV25uriAiU7kGKoe1ysrKBBGJ/Pz8YXvN4eCQv1aIiM6fP0+hoaGk1Wpp6tSp5OnpSTdu3KC2tjbVuoGBgapls2bNooaGBiIiev/+PQkhaN++feTp6Sl9ZWdnExHRly9fRux7WbduHfn4+NCdO3dGbB9/wyHfrVy4cIE2btxIiYmJlJWVRV5eXjRu3DjKz8+nDx8+DPn1ent7iYhox44dFBcXN+A6AQEBVo3ZHD8/P/r27duI7mOoHLIc5eXl5O/vTxUVFaTRaEzL+/6XK71790617O3btzRjxgwi+v/kkIjI2dmZli1bNvwDNkMIQQ0NDRQWFjbq+x6MQ/5a6TthJPq9DTQYDPTo0aMB17969So1NTWZcm1tLRkMBlq5ciUREXl5eVFMTAydOnWKPn/+rNreaDQOOp6hvJUd6LVKSkrIaDTSihUrzG4/muz2yHHmzBm6efOmanlmZiYlJCRQRUUFJSUlUXx8PH38+JFOnjxJwcHB9PPnT9U2AQEBFBkZSdu2baOuri46fvw4TZ06lXbu3Glap7i4mCIjIykkJITS0tLI39+fmpub6dGjR9TY2EjPnz9nx1pbW0uxsbGUnZ1NOTk5g35fOp2O1q5dSyEhIaTVaunBgwd06dIl0uv1tHXrVst/QKPBxhNilb53K9zXp0+fRG9vr8jLyxM6nU64uLiIsLAwUVVVJTZs2CB0Op3ptfrerRQUFIjCwkLh5+cnXFxcRFRUlHj+/Llq3x8+fBDr168XPj4+wtnZWfj6+oqEhARRXl5uWsfat7JbtmwRwcHBwt3dXTg7O4uAgACxa9cu0d7ebs2PbURohLDjU3RgUw4554DRgXIAC+UAFsoBLJQDWCgHsFAOYFl8hrT/3zDA8VlyegtHDmChHMBCOYCFcgAL5QAWygEslANYKAewUA5goRzAQjmAhXIAC+UAFsoBLJQDWCgHsFAOYKEcwEI5gGW3V9mPFOX9Pj08PIa0fUZGhpQnTJigWicoKEjK6enpUlbeUTklJUXKnZ2dUj506JCUc3NzLRuslXDkABbKASyUA1gONeeYPn26lMePHy/liIgI1TaRkZFSnjx5spTXrFkzPIPrp7GxUconTpyQclJSkpR//PghZeVdhO7duzeMo7McjhzAQjmAhXIAy+J7gtniWlm9Xi/lu3fvSnmo5yhGQt8NbvvbtGmTlAe6w2F/yttbtra2SvnNmzd/OToerpUFq6AcwEI5gGXXc44pU6ZI2WAwSLnvnuXDSbmP79+/Szk2NlbKf/78Ub2GPcyFzMGcA6yCcgAL5QCWXf9tRflwmqysLCknJCRI+enTp6rXUP5dQ0n5wMDly5dL+devX1KeO3eulDMzMwd9fUeGIwewUA5goRzAsuvzHOZMmjRJysrPRRARnTp1SsqbN2+WcmpqqpQvXrw4TKOzbzjPAVZBOYCFcgAL5QCWXZ8EM6e9vd3sOgM9vry/tLQ0KV++fFnKA32YZ6zAkQNYKAewUA5gOfRJMEu4ublJ+fr161KOjo6Wct/z7fvcvn17ZAZmYzgJBlZBOYCFcgDrn59zKM2cOVPKT548kbLyA8XV1dVSrqurk3JxcbFqHxb+SG0Kcw6wCsoBLJQDWGNuzqGkvJHK2bNnpezu7j7o9nv37lUtKy0tlbLyQml7gDkHWAXlABbKAawxP+dQmjdvnpSPHj0q5aVLl5p9DeWHmg8ePCjlpqamvxzd8MGcA6yCcgAL5QAW5hxmKG9qu2rVKikrz4sQqX9WyhvdKS/WtgXMOcAqKAewUA5gYc5hpa6uLtUyJyf5cqDu7m4px8XFSbmmpmbYx2UO5hxgFZQDWCgHsFAOYDn0hdQjITQ0VMrJyclSXrhwoZSVk8+BvHr1Ssr379//y9GNLhw5gIVyAAvlANaYm3MonxatfML06tWrpezj4zPkffT09EhZ+QFjR7khDI4cwEI5gIVyAOufmnMMND9ISUmRsnKOMWPGDKv2qbywmkj9geLKykqr9mErOHIAC+UAFsoBLIeac3h7e0s5ODhYykVFRaptZs+ebdU+lU+LLCgokPK1a9dU2zjKeQxzcOQAFsoBLJQDWHY151A+gVp5QbJer5fycDyR+uHDh1IuLCyU8q1bt6T8+/dvq/fpKHDkABbKASyUA1ijOucIDw+XsvIJ04sWLZKyr6+v1fvs6OiQsvIJ1Xl5eVJWPoF6LMORA1goB7BQDmCN6pxDeUNYZTZHef1HVVWVlJUXLBOpz1sob3wPPBw5gIVyAAvlABbKASzc2WeMwp19wCooB7BQDmChHMBCOYCFcgAL5QAWygEslANYKAewUA5gWfxhHwv/BA
P/EBw5gIVyAAvlABbKASyUA1goB7BQDmChHMBCOYD1H8F52RZgXt0eAAAAAElFTkSuQmCC", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "{'Image Type': , 'Image Shape': torch.Size([1, 28, 28]), 'Label Type': , 'Label Value': 5}\n" ] } ], "source": [ "transform = transforms.Compose(\n", " [\n", " transforms.ToTensor(),\n", " transforms.Normalize((0.5,), (0.5,)),\n", " ]\n", ")\n", "\n", "train_mnist_dataset = datasets.MNIST(root=\"dataset\", train=True, transform=transform, download=True)\n", "test_mnist_dataset = datasets.MNIST(root=\"dataset\", train=False, transform=transform, download=True)\n", "print(\n", " f\"训练数据集大小:{len(train_mnist_dataset)},\"\n", " f\"测试数据集大小:{len(test_mnist_dataset)}\"\n", ")\n", "\n", "image, label = train_mnist_dataset[0]\n", "sample = {\n", " 'Image Type': type(image),\n", " 'Image Shape': image.shape,\n", " 'Label Type': type(label),\n", " 'Label Value': label\n", "}\n", "print('A Train Sample:\\n')\n", "plt.figure(figsize=(1.5, 1.5))\n", "plt.imshow(image.squeeze(0), cmap='gray')\n", "plt.title(f\"Label: {label}\")\n", "plt.axis('off')\n", "plt.show()\n", "print(sample)\n", "\n", "image_width, image_height = 28, 28\n", "num_classes = 10" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "接下来手动实现前馈神经网络并训练。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "首先手动实现一些工具和基本模型层。这些工具都在前一个实验中实现并测试过,在此就不再分析其原理和具体实现步骤,也不在此重新测试。" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# 手动实现torch.nn.functional.softmax\n", "def my_softmax(x: torch.Tensor, dim: int):\n", " max_x = torch.max(x, dim=dim, keepdim=True).values\n", " exp_x = torch.exp(x - max_x)\n", " return exp_x / torch.sum(exp_x, dim=dim, keepdim=True)\n", "\n", "\n", "# 手动实现torch.nn.Linear\n", "class My_Linear:\n", " def __init__(self, in_features: int, out_features: int):\n", " self.weight = torch.randn((out_features, in_features), requires_grad=True, dtype=torch.float32)\n", " self.bias = torch.zeros(1, requires_grad=True, dtype=torch.float32)\n", " self.params = [self.weight, self.bias]\n", "\n", " def __call__(self, x: torch.Tensor):\n", " return self.forward(x)\n", "\n", " def forward(self, x: torch.Tensor):\n", " x = torch.matmul(x, self.weight.T) + self.bias\n", " return x\n", "\n", " def to(self, device: str):\n", " for param in self.params:\n", " param.data = param.data.to(device=device)\n", " return self\n", "\n", " def parameters(self):\n", " return self.params\n", "\n", " \n", "# 手动实现torch.nn.Flatten\n", "class My_Flatten:\n", " def __call__(self, x: torch.Tensor):\n", " x = x.view(x.shape[0], -1)\n", " return x\n", "\n", " \n", "# 手动实现torch.nn.ReLU\n", "class My_ReLU():\n", " def __call__(self, x: torch.Tensor):\n", " return torch.where(x > 0, x, torch.zeros_like(x))\n", "\n", "\n", "# 手动实现torch.nn.LeakyReLU\n", "class My_LeakyReLU():\n", " def __init__(self, negative_slope=0.01):\n", " self.negative_slope = negative_slope\n", " \n", " def __call__(self, x: torch.Tensor):\n", " return torch.where(x > 0, x, x * self.negative_slope)\n", "\n", "\n", "# 手动实现torch.nn.Sigmoid\n", "class My_Sigmoid():\n", " def __call__(self, x: torch.Tensor):\n", " z = torch.exp(-x.abs())\n", " return torch.where(x >= 0, 1 / (1 + z), z / (1 + z))\n", "\n", "\n", "# 手动实现torch.nn.Softmax\n", "class My_Softmax:\n", " def __init__(self, dim: int):\n", " self.dim = dim\n", " def __call__(self, x: torch.Tensor):\n", " max_x = torch.max(x, dim=self.dim, keepdim=True).values\n", " exp_x = torch.exp(x - max_x)\n", " return exp_x / torch.sum(exp_x, dim=self.dim, keepdim=True)\n", "\n", "\n", 
"# 手动实现torch.nn.MSELoss\n", "class My_MSELoss: \n", " def __call__(self, prediction: torch.Tensor, target: torch.Tensor):\n", " loss = torch.mean(torch.square(prediction - target))\n", " return loss\n", "\n", "\n", "# 手动实现torch.nn.BCELoss\n", "class My_BCELoss:\n", " def __call__(self, prediction: torch.Tensor, target: torch.Tensor):\n", " eps = 1e-9\n", " loss = -torch.mean(target * torch.log(prediction + eps) + (1 - target) * torch.log(1 - prediction + eps))\n", " return loss\n", "\n", "\n", "# 手动实现torch.nn.CrossEntropyLoss\n", "class My_CrossEntropyLoss:\n", " def __call__(\n", " self, \n", " x: torch.Tensor, \n", " targets: torch.Tensor, \n", " reduction: Literal[\"mean\", \"sum\"] = \"mean\"\n", " ):\n", " max_x = torch.max(x, dim=1, keepdim=True).values\n", " exp_x = torch.exp(x - max_x)\n", " log_probs = x - max_x - torch.log(torch.sum(exp_x, dim=1, keepdim=True))\n", " \n", " if len(x.shape) == len(targets.shape) + 1:\n", " nll_loss = -log_probs.gather(1, targets.unsqueeze(-1)).squeeze()\n", " else:\n", " nll_loss = -torch.sum(targets * log_probs, dim=1)\n", " \n", " if reduction == \"mean\": \n", " return torch.mean(nll_loss)\n", " else: \n", " return torch.sum(nll_loss)\n", "\n", "\n", "# 手动实现torch.optim.SGD\n", "class My_Optimizer:\n", " def __init__(self, params: list[torch.Tensor], lr: float):\n", " self.params = list(params)\n", " self.lr = lr\n", "\n", " def step(self):\n", " for param in self.params:\n", " if param.grad is not None:\n", " param.data = param.data - self.lr * param.grad.data\n", "\n", " def zero_grad(self):\n", " for param in self.params:\n", " if param.grad is not None:\n", " param.grad.data.zero_()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "接下来实现Regression回归的Trainer,包括训练流程、测试和画图。\n", "\n", "训练Regression回归模型,进行如下步骤:\n", "1. 定义模型、数据集、损失函数、优化器和其他超参数\n", "2. 训练\n", " 1. 从训练dataloader中获取批量数据\n", " 2. 传入模型\n", " 3. 使用损失函数计算与ground_truth的损失\n", " 4. 使用优化器进行反向传播\n", " 5. 循环以上步骤\n", "3. 验证及测试\n", " 1. 从验证或测试dataloader中获取批量数据\n", " 2. 传入模型,验证时需要将模型输出与ground_truth进行比较得计算loss\n", " 3. 将预测值与ground_truth进行比较,得出正确率\n", " 4. 
在整个验证或测试集上统计平均误差,从而分析训练效果" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "class RegressionTrainer():\n", "    def __init__(\n", "        self,\n", "        model,\n", "        train_dataset: Union[Dataset, DataLoader],\n", "        eval_dataset: Union[Dataset, DataLoader],\n", "        optimizer: Literal['torch', 'manual'],\n", "        criterion: Literal['torch', 'manual'],\n", "        learning_rate: float,\n", "        num_epochs: int,\n", "        batch_size: int,\n", "        test_dataset: Optional[Union[Dataset, DataLoader]] = None,\n", "        plot: bool = True, \n", "        print_test_result: bool = True,\n", "        return_curves: bool = False,\n", "        log_epoch: int = 1\n", "    ):\n", "        self.model = model\n", "        self.learning_rate = learning_rate\n", "        self.num_epochs = num_epochs\n", "        self.batch_size = batch_size\n", "        self.plot = plot\n", "        self.print_test_result = print_test_result\n", "        self.return_curves = return_curves\n", "        self.log_epoch = log_epoch\n", "\n", "        if isinstance(train_dataset, Dataset):\n", "            self.train_dataloader = DataLoader(\n", "                dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=cpu_count()\n", "            )\n", "        else:\n", "            self.train_dataloader = train_dataset\n", "        if isinstance(eval_dataset, Dataset):\n", "            self.eval_dataloader = DataLoader(\n", "                dataset=eval_dataset, batch_size=batch_size, shuffle=True, num_workers=cpu_count()\n", "            )\n", "        else:\n", "            self.eval_dataloader = eval_dataset\n", "        if isinstance(test_dataset, Dataset):\n", "            self.test_dataloader = DataLoader(\n", "                dataset=test_dataset, batch_size=batch_size, shuffle=True, num_workers=cpu_count()\n", "            )\n", "        else:\n", "            self.test_dataloader = test_dataset\n", "\n", "        if optimizer == 'torch':\n", "            self.optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)\n", "        else:\n", "            self.optimizer = My_Optimizer(model.parameters(), lr=learning_rate)\n", "\n", "        if criterion == 'torch':\n", "            self.criterion = nn.MSELoss()\n", "        else:\n", "            self.criterion = My_MSELoss()\n", "\n", "    def train(self):\n", "        train_loss_curve = []\n", "        eval_loss_curve = []\n", "        eval_error_curve = []\n", "        step = 0\n", "        total_train_steps = self.num_epochs * len(self.train_dataloader)\n", "        with tqdm(total=total_train_steps) as pbar:\n", "            for epoch in range(self.num_epochs):\n", "                total_train_loss = 0\n", "                for x, targets in self.train_dataloader:\n", "                    x = x.to(device=device, dtype=torch.float32)\n", "                    targets = targets.to(device=device, dtype=torch.float32)\n", "\n", "                    self.optimizer.zero_grad()\n", "                    output = self.model(x)\n", "                    loss = self.criterion(output, targets)\n", "                    total_train_loss += loss.item()\n", "                    train_loss_curve.append(loss.item())\n", "\n", "                    loss.backward()\n", "                    self.optimizer.step()\n", "                    step += 1\n", "                    pbar.update(1)\n", "\n", "                avg_eval_loss, avg_eval_error = self.eval()\n", "                eval_loss_curve.append(avg_eval_loss)\n", "                eval_error_curve.append(avg_eval_error)\n", "                if self.log_epoch > 0 and (epoch + 1) % self.log_epoch == 0:\n", "                    log_info = {\n", "                        'Epoch': f'{epoch + 1}/{self.num_epochs}',\n", "                        'Total Train Loss': f'{total_train_loss:.2f}',\n", "                        # 平均验证loss乘以每epoch的训练步数,便于与Total Train Loss在同一尺度下对比\n", "                        'Scaled Total Valid Loss': f'{avg_eval_loss * len(self.train_dataloader):.2f}',\n", "                        'Avg Valid Error': f'{avg_eval_error:.2f}'\n", "                    }\n", "                    print(log_info)\n", "\n", "        return_info = {}\n", "        if self.test_dataloader:\n", "            test_error = self.test()\n", "            if self.print_test_result:\n", "                print('Avg Test Error:', f'{test_error:.2f}')\n", "            return_info['test_error'] = test_error\n", "        if self.plot:\n", "            self.plot_results(train_loss_curve, eval_loss_curve, eval_error_curve)\n", "        if 
self.return_curves:\n", " curves = {\n", " 'train_loss_curve': train_loss_curve,\n", " 'eval_loss_curve': eval_loss_curve,\n", " 'eval_error_curve': eval_error_curve\n", " }\n", " return_info['curves'] = curves\n", " return return_info\n", "\n", " def eval(self):\n", " total_eval_loss = 0\n", " total_eval_error = 0\n", " total_eval_samples = 0\n", " with torch.inference_mode():\n", " for x, targets in self.eval_dataloader:\n", " x = x.to(device=device, dtype=torch.float32)\n", " targets = targets.to(device=device, dtype=torch.float32)\n", " output = self.model(x)\n", " loss = self.criterion(output, targets)\n", " total_eval_loss += loss.item()\n", " total_eval_error += torch.square(output - targets).sum().item()\n", " total_eval_samples += targets.numel()\n", " avg_eval_loss = total_eval_loss / len(self.eval_dataloader)\n", " avg_eval_error = total_eval_error / total_eval_samples\n", " return avg_eval_loss, avg_eval_error\n", "\n", " def test(self):\n", " total_test_error = 0\n", " total_test_samples = 0\n", " with torch.inference_mode():\n", " for x, targets in self.test_dataloader:\n", " x = x.to(device=device, dtype=torch.float32)\n", " targets = targets.to(device=device, dtype=torch.float32)\n", " output = self.model(x)\n", " total_test_error += torch.square(output - targets).sum().item()\n", " total_test_samples += targets.numel()\n", " avg_test_error = total_test_error / total_test_samples\n", " return avg_test_error\n", " \n", " def plot_results(self, train_loss_curve, eval_loss_curve, eval_error_curve):\n", " fig, axes = plt.subplots(1, 2, figsize=(10, 4))\n", " \n", " axes[0].plot(train_loss_curve, label='Training Loss', color='blue')\n", " axes[0].plot(\n", " np.linspace(len(self.train_dataloader), len(train_loss_curve), len(eval_loss_curve), endpoint=True),\n", " eval_loss_curve, label='Validation Loss', color='orange'\n", " )\n", " axes[0].set_xlabel('Step')\n", " axes[0].set_ylabel('Loss')\n", " axes[0].set_title('Loss Curve')\n", " axes[0].legend()\n", " axes[0].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", " \n", " axes[1].plot(eval_error_curve, label='Validation Error', color='red', marker='o')\n", " axes[1].set_xlabel('Epoch')\n", " axes[1].set_ylabel('Error')\n", " axes[1].set_title('Validation Error Curve')\n", " axes[1].legend()\n", " axes[1].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", " \n", " plt.tight_layout()\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "手动构建回归任务的模型。" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "class Model_1_1:\n", " def __init__(self, input_features=500, output_features=1):\n", " self.linear = My_Linear(in_features=input_features, out_features=output_features)\n", " self.params = self.linear.params\n", "\n", " def __call__(self, x):\n", " return self.forward(x)\n", "\n", " def forward(self, x):\n", " x = self.linear(x)\n", " return x\n", "\n", " def to(self, device: str):\n", " for param in self.params:\n", " param.data = param.data.to(device=device)\n", " return self\n", "\n", " def parameters(self):\n", " return self.params\n", " \n", " def train(self):\n", " for param in self.params:\n", " param.requires_grad = True\n", " \n", " def eval(self):\n", " for param in self.params:\n", " param.requires_grad = False" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "开始训练。" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": 
"8d341fbe13fc4c4bbb254738ed3a3210", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/70 [00:00" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "hyper_params = {\n", " 'learning_rate': 1.0e-1,\n", " 'num_epochs': 10,\n", " 'batch_size': 1024,\n", "}\n", "\n", "model = Model_1_1().to(device)\n", "\n", "trainer = RegressionTrainer(\n", " model=model, \n", " train_dataset=train_regression_dataset, eval_dataset=test_regression_dataset,\n", " optimizer='manual', criterion='manual', **hyper_params\n", ")\n", "_ = trainer.train()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "接下来实现二分类任务的Trainer,包括训练流程、测试和画图。\n", "\n", "训练二分类任务模型,进行如下步骤:\n", "1. 定义模型、数据集、损失函数、优化器和其他超参数\n", "2. 训练\n", " 1. 从训练dataloader中获取批量数据\n", " 2. 传入模型\n", " 3. 使用损失函数计算与ground_truth的损失\n", " 4. 使用优化器进行反向传播\n", " 5. 循环以上步骤\n", "3. 验证及测试\n", " 1. 从验证或测试dataloader中获取批量数据\n", " 2. 传入模型,验证时需要将模型输出与ground_truth进行比较得计算loss\n", " 3. 将预测值与ground_truth进行比较,得出正确率\n", " 4. 对整个训练集统计正确率,从而分析训练效果" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "class BinaryCLSTrainer():\n", " def __init__(\n", " self,\n", " model,\n", " train_dataset: Union[Dataset, DataLoader],\n", " eval_dataset: Union[Dataset, DataLoader],\n", " optimizer: Literal['torch', 'manual'],\n", " criterion: Literal['torch', 'manual'],\n", " learning_rate: float,\n", " num_epochs: int,\n", " batch_size: int,\n", " test_dataset: Union[Dataset, DataLoader] = None,\n", " plot: bool = True, \n", " print_test_result: bool = True,\n", " return_curves: bool = False,\n", " log_epoch: int = 1\n", " ):\n", " self.model = model\n", " self.learning_rate = learning_rate\n", " self.num_epochs = num_epochs\n", " self.batch_size = batch_size\n", " self.plot = plot\n", " self.print_test_result = print_test_result\n", " self.return_curves = return_curves\n", " self.log_epoch = log_epoch\n", "\n", " if isinstance(train_dataset, Dataset):\n", " self.train_dataloader = DataLoader(\n", " dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=cpu_count()\n", " )\n", " else:\n", " self.train_dataloader = train_dataset\n", " if isinstance(eval_dataset, Dataset):\n", " self.eval_dataloader = DataLoader(\n", " dataset=eval_dataset, batch_size=batch_size, shuffle=True, num_workers=cpu_count()\n", " )\n", " else:\n", " self.eval_dataloader = eval_dataset\n", " if isinstance(test_dataset, Dataset):\n", " self.test_dataloader = DataLoader(\n", " dataset=test_dataset, batch_size=batch_size, shuffle=True, num_workers=cpu_count()\n", " )\n", " else:\n", " self.test_dataloader = test_dataset\n", "\n", " if optimizer == 'torch':\n", " self.optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)\n", " else:\n", " self.optimizer = My_Optimizer(model.parameters(), lr=learning_rate)\n", "\n", " if criterion == 'torch':\n", " self.criterion = nn.BCELoss()\n", " else:\n", " self.criterion = My_BCELoss()\n", "\n", " def train(self):\n", " train_loss_curve = []\n", " eval_loss_curve = []\n", " eval_acc_curve = []\n", " step = 0\n", " total_train_steps = self.num_epochs * len(self.train_dataloader)\n", " with tqdm(total=total_train_steps) as pbar:\n", " for epoch in range(self.num_epochs):\n", " total_train_loss = 0\n", " for x, targets in self.train_dataloader:\n", " x = x.to(device=device, dtype=torch.float32)\n", " targets = targets.to(device=device, dtype=torch.float32)\n", "\n", " self.optimizer.zero_grad()\n", " output = self.model(x)\n", " loss = self.criterion(output, targets)\n", " 
total_train_loss += loss.item()\n", " train_loss_curve.append(loss.item())\n", " \n", " loss.backward()\n", " self.optimizer.step()\n", "\n", " step += 1\n", " pbar.update(1)\n", "\n", " avg_eval_loss, avg_eval_acc = self.eval()\n", " eval_loss_curve.append(avg_eval_loss)\n", " eval_acc_curve.append(avg_eval_acc)\n", " if self.log_epoch > 0 and (epoch + 1) % self.log_epoch == 0:\n", " log_info = {\n", " 'Epoch': f'{epoch + 1}/{self.num_epochs}',\n", " 'Total Train Loss': f'{total_train_loss:.2f}',\n", " 'Scaled Total Valid Loss': f'{avg_eval_loss * len(self.train_dataloader):.2f}',\n", " 'Avg Valid Acc': f'{avg_eval_acc:.2%}'\n", " }\n", " print(log_info)\n", "\n", " return_info = {}\n", " if self.test_dataloader:\n", " test_acc = self.test()\n", " if self.print_test_result:\n", " print('Avg Test Acc:', f'{test_acc:.2%}')\n", " return_info['test_acc'] = test_acc\n", " if self.plot:\n", " self.plot_results(train_loss_curve, eval_loss_curve, eval_acc_curve)\n", " if self.return_curves:\n", " curves = {\n", " 'train_loss_curve': train_loss_curve,\n", " 'eval_loss_curve': eval_loss_curve,\n", " 'eval_acc_curve': eval_acc_curve\n", " }\n", " return_info['curves'] = curves\n", " return return_info\n", "\n", " def eval(self):\n", " total_eval_loss = 0\n", " total_eval_acc = 0\n", " total_eval_samples = 0\n", " with torch.inference_mode():\n", " for x, targets in self.eval_dataloader:\n", " x = x.to(device=device, dtype=torch.float32)\n", " targets = targets.to(device=device, dtype=torch.float32)\n", " output = self.model(x)\n", " loss = self.criterion(output, targets)\n", " total_eval_loss += loss.item()\n", " preds = (output >= 0.5).float()\n", " total_eval_acc += (preds == targets.to(dtype=torch.long)).float().sum().item()\n", " total_eval_samples += targets.numel()\n", " avg_eval_loss = total_eval_loss / len(self.eval_dataloader)\n", " avg_eval_acc = total_eval_acc / total_eval_samples\n", " return avg_eval_loss, avg_eval_acc\n", "\n", " def test(self):\n", " total_test_acc = 0\n", " total_test_samples = 0\n", " with torch.inference_mode():\n", " for x, targets in self.test_dataloader:\n", " x = x.to(device=device, dtype=torch.float32)\n", " targets = targets.to(device=device, dtype=torch.long)\n", " output = self.model(x)\n", " preds = (output >= 0.5).float()\n", " total_test_acc += (preds == targets).float().sum().item()\n", " total_test_samples += targets.numel()\n", " avg_test_acc = total_test_acc / total_test_samples\n", " return avg_test_acc\n", " \n", " def plot_results(self, train_loss_curve, eval_loss_curve, eval_acc_curve):\n", " fig, axes = plt.subplots(1, 2, figsize=(10, 4))\n", " \n", " axes[0].plot(train_loss_curve, label='Training Loss', color='blue')\n", " axes[0].plot(\n", " np.linspace(len(self.train_dataloader), len(train_loss_curve), len(eval_loss_curve), endpoint=True),\n", " eval_loss_curve, label='Validation Loss', color='orange'\n", " )\n", " axes[0].set_xlabel('Step')\n", " axes[0].set_ylabel('Loss')\n", " axes[0].set_title('Loss Curve')\n", " axes[0].legend()\n", " axes[0].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", " \n", " axes[1].plot(eval_acc_curve, label='Validation Accuracy', color='green', marker='o')\n", " axes[1].set_xlabel('Epoch')\n", " axes[1].set_ylabel('Accuracy')\n", " axes[1].set_title('Validation Accuracy Curve')\n", " axes[1].legend()\n", " axes[1].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", " \n", " plt.tight_layout()\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "手动构建二分类任务的模型。" ] }, { 
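"cell_type": "markdown", "metadata": {}, "source": [ "(编者补充)下面的模型为单个线性层接Sigmoid,配合BCE损失。一个等价且数值上更稳定的假设性写法是:让模型直接输出logits,用`nn.BCEWithLogitsLoss`在损失内部合并Sigmoid与BCE的计算,从而避免log(0)(以下变量名仅作示意,并非本实验实际使用的实现):\n", "\n", "```python\n", "import torch\n", "from torch import nn\n", "\n", "logits_model = nn.Linear(200, 1)    # 直接输出logits,不接Sigmoid\n", "criterion = nn.BCEWithLogitsLoss()  # 内部等价于Sigmoid + BCE,数值更稳定\n", "\n", "x = torch.randn(8, 200)\n", "targets = torch.randint(0, 2, (8, 1)).float()\n", "loss = criterion(logits_model(x), targets)\n", "preds = (torch.sigmoid(logits_model(x)) >= 0.5).float()  # 推理时再经Sigmoid并取阈值\n", "```\n" ] }, { 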
"cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "class Model_1_2:\n", " def __init__(self, input_features=200):\n", " self.fc = My_Linear(in_features=input_features, out_features=1)\n", " self.sigmoid = My_Sigmoid()\n", " self.params = self.fc.parameters()\n", "\n", " def __call__(self, x):\n", " return self.forward(x)\n", "\n", " def forward(self, x):\n", " x = self.fc(x)\n", " x = self.sigmoid(x)\n", " return x\n", "\n", " def to(self, device: str):\n", " for param in self.params:\n", " param.data = param.data.to(device=device)\n", " return self\n", "\n", " def parameters(self):\n", " return self.params\n", " \n", " def train(self):\n", " for param in self.params:\n", " param.requires_grad = True\n", " \n", " def eval(self):\n", " for param in self.params:\n", " param.requires_grad = False" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "训练并测试上述二分类模型。" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "0ddf036c37e040188f27cb5bab5ea3aa", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/140 [00:00" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "hyper_params = {\n", " 'learning_rate': 5.0e-2,\n", " 'num_epochs': 10,\n", " 'batch_size': 1024,\n", "}\n", "\n", "model = Model_1_2().to(device)\n", "\n", "trainer = BinaryCLSTrainer(\n", " model=model, \n", " train_dataset=train_binarycls_dataset, eval_dataset=test_binarycls_dataset,\n", " optimizer='manual', criterion='manual', **hyper_params\n", ")\n", "_ = trainer.train()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "接下来实现多分类任务的Trainer,包括训练流程、测试和画图。\n", "\n", "训练多分类任务模型,与二分类任务大部分一致,仅修改损失函数,以及对数据类型做适配。" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "class MultiCLSTrainer():\n", " def __init__(\n", " self,\n", " model,\n", " train_dataset: Union[Dataset, DataLoader],\n", " eval_dataset: Union[Dataset, DataLoader],\n", " optimizer,\n", " criterion: Literal['torch', 'manual'],\n", " learning_rate: float,\n", " num_epochs: int,\n", " batch_size: int,\n", " weight_decay: float = 0.0,\n", " test_dataset: Union[Dataset, DataLoader] = None,\n", " plot: bool = True, \n", " print_test_result: bool = True,\n", " return_curves: bool = False,\n", " log_epoch: int = 1\n", " ):\n", " self.model = model\n", " self.learning_rate = learning_rate\n", " self.num_epochs = num_epochs\n", " self.batch_size = batch_size\n", " self.plot = plot\n", " self.print_test_result = print_test_result\n", " self.return_curves = return_curves\n", " self.log_epoch = log_epoch\n", "\n", " if isinstance(train_dataset, Dataset):\n", " self.train_dataloader = DataLoader(\n", " dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=cpu_count()\n", " )\n", " else:\n", " self.train_dataloader = train_dataset\n", " if isinstance(eval_dataset, Dataset):\n", " self.eval_dataloader = DataLoader(\n", " dataset=eval_dataset, batch_size=batch_size, shuffle=True, num_workers=cpu_count()\n", " )\n", " else:\n", " self.eval_dataloader = eval_dataset\n", " if isinstance(test_dataset, Dataset):\n", " self.test_dataloader = DataLoader(\n", " dataset=test_dataset, batch_size=batch_size, shuffle=True, num_workers=cpu_count()\n", " )\n", " else:\n", " self.test_dataloader = test_dataset\n", "\n", " if isinstance(optimizer, str):\n", " if optimizer == 'torch':\n", " self.optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, 
weight_decay=weight_decay)\n", " else:\n", " self.optimizer = My_Optimizer(model.parameters(), lr=learning_rate)\n", " else:\n", " self.optimizer = optimizer(model.parameters(), lr=learning_rate, weight_decay=weight_decay)\n", "\n", " if criterion == 'torch':\n", " self.criterion = nn.CrossEntropyLoss()\n", " self.softmax = nn.Softmax(dim=1)\n", " else:\n", " self.criterion = My_CrossEntropyLoss()\n", " self.softmax = My_Softmax(dim=1)\n", "\n", " def train(self):\n", " train_loss_curve = []\n", " eval_loss_curve = []\n", " eval_acc_curve = []\n", " step = 0\n", " total_train_steps = self.num_epochs * len(self.train_dataloader)\n", " with tqdm(total=total_train_steps) as pbar:\n", " for epoch in range(self.num_epochs):\n", " total_train_loss = 0\n", " for x, targets in self.train_dataloader:\n", " x = x.to(device=device, dtype=torch.float32)\n", " targets = targets.to(device=device, dtype=torch.long)\n", "\n", " self.optimizer.zero_grad()\n", " output = self.model(x)\n", " loss = self.criterion(output, targets)\n", " total_train_loss += loss.item()\n", " train_loss_curve.append(loss.item())\n", " \n", " loss.backward()\n", " self.optimizer.step()\n", " step += 1\n", " pbar.update(1)\n", "\n", " avg_eval_loss, avg_eval_acc = self.eval()\n", " eval_loss_curve.append(avg_eval_loss)\n", " eval_acc_curve.append(avg_eval_acc)\n", " if self.log_epoch > 0 and (epoch + 1) % self.log_epoch == 0:\n", " log_info = {\n", " 'Epoch': f'{epoch + 1}/{self.num_epochs}',\n", " 'Total Train Loss': f'{total_train_loss:.2f}',\n", " 'Scaled Total Valid Loss': f'{avg_eval_loss * len(self.train_dataloader):.2f}',\n", " 'Avg Valid Acc': f'{avg_eval_acc:.2%}'\n", " }\n", " print(log_info)\n", "\n", " return_info = {}\n", " if self.test_dataloader:\n", " test_acc = self.test()\n", " if self.print_test_result:\n", " print('Avg Test Acc:', f'{test_acc:.2%}')\n", " return_info['test_acc'] = test_acc\n", " if self.plot:\n", " self.plot_results(train_loss_curve, eval_loss_curve, eval_acc_curve)\n", " if self.return_curves:\n", " curves = {\n", " 'train_loss_curve': train_loss_curve,\n", " 'eval_loss_curve': eval_loss_curve,\n", " 'eval_acc_curve': eval_acc_curve\n", " }\n", " return_info['curves'] = curves\n", " return return_info\n", "\n", " def eval(self):\n", " total_eval_loss = 0\n", " total_eval_acc = 0\n", " total_eval_samples = 0\n", " with torch.inference_mode():\n", " for x, targets in self.eval_dataloader:\n", " x = x.to(device=device, dtype=torch.float32)\n", " targets = targets.to(device=device, dtype=torch.long)\n", " output = self.model(x)\n", " loss = self.criterion(output, targets)\n", " total_eval_loss += loss.item()\n", " preds = self.softmax(output).argmax(dim=1)\n", " total_eval_acc += (preds == targets).float().sum().item()\n", " total_eval_samples += targets.numel()\n", " avg_eval_loss = total_eval_loss / len(self.eval_dataloader)\n", " avg_eval_acc = total_eval_acc / total_eval_samples\n", " return avg_eval_loss, avg_eval_acc\n", "\n", " def test(self):\n", " total_test_acc = 0\n", " total_test_samples = 0\n", " with torch.inference_mode():\n", " for x, targets in self.test_dataloader:\n", " x = x.to(device=device, dtype=torch.float32)\n", " targets = targets.to(device=device, dtype=torch.long)\n", " output = self.model(x)\n", " preds = self.softmax(output).argmax(dim=1)\n", " total_test_acc += (preds == targets).float().sum().item()\n", " total_test_samples += targets.numel()\n", " avg_test_acc = total_test_acc / total_test_samples\n", " return avg_test_acc\n", " \n", " def plot_results(self, 
train_loss_curve, eval_loss_curve, eval_acc_curve):\n", " fig, axes = plt.subplots(1, 2, figsize=(10, 4))\n", " \n", " axes[0].plot(train_loss_curve, label='Training Loss', color='blue')\n", " axes[0].plot(\n", " np.linspace(len(self.train_dataloader), len(train_loss_curve), len(eval_loss_curve), endpoint=True),\n", " eval_loss_curve, label='Validation Loss', color='orange'\n", " )\n", " axes[0].set_xlabel('Step')\n", " axes[0].set_ylabel('Loss')\n", " axes[0].set_title('Loss Curve')\n", " axes[0].legend()\n", " axes[0].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", " \n", " axes[1].plot(eval_acc_curve, label='Validation Accuracy', color='green', marker='o')\n", " axes[1].set_xlabel('Epoch')\n", " axes[1].set_ylabel('Accuracy')\n", " axes[1].set_title('Validation Accuracy Curve')\n", " axes[1].legend()\n", " axes[1].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", " \n", " plt.tight_layout()\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "手动构建MNIST多分类任务的模型。" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "class Model_1_3:\n", " def __init__(self, input_features=784, num_classes=10):\n", " self.flatten = My_Flatten()\n", " self.linear = My_Linear(in_features=input_features, out_features=num_classes)\n", " self.params = self.linear.params\n", "\n", " def __call__(self, x: torch.Tensor):\n", " return self.forward(x)\n", "\n", " def forward(self, x: torch.Tensor):\n", " x = self.flatten(x)\n", " x = self.linear(x)\n", " return x\n", "\n", " def to(self, device: str):\n", " for param in self.params:\n", " param.data = param.data.to(device=device)\n", " return self\n", "\n", " def parameters(self):\n", " return self.params\n", " \n", " def train(self):\n", " for param in self.params:\n", " param.requires_grad = True\n", " \n", " def eval(self):\n", " for param in self.params:\n", " param.requires_grad = False" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "训练并测试上述MNIST多分类模型。" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "45fbb247fc7a45bb9acfdab217c6a235", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/590 [00:00" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "hyper_params = {\n", " 'learning_rate': 6.0e-2,\n", " 'num_epochs': 10,\n", " 'batch_size': 1024,\n", "}\n", "\n", "model = Model_1_3(\n", " input_features=image_width * image_height, \n", " num_classes=num_classes\n", ").to(device)\n", "\n", "trainer = MultiCLSTrainer(\n", " model=model, \n", " train_dataset=train_mnist_dataset, eval_dataset=test_mnist_dataset, \n", " optimizer='manual', criterion='manual', **hyper_params\n", ")\n", "_ = trainer.train()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# 任务二\n", "**利用torch.nn实现前馈神经网络解决上述回归、二分类、多分类任务。**\n", "- 从训练时间、预测精度、Loss变化等角度分析实验结果(最好使用图表展示)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "torch.cuda.empty_cache()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "使用`torch.nn`构建回归任务的模型。" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "class Model_2_1(nn.Module):\n", " def __init__(self, input_features=500):\n", " super().__init__()\n", " self.linear = nn.Linear(in_features=input_features, out_features=1)\n", " self.sigmoid = nn.Sigmoid()\n", "\n", " def forward(self, x):\n", " x = self.linear(x)\n", 
" x = self.sigmoid(x)\n", " return x" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "训练并测试上述回归模型。" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "891aac516c0a425192093fab78c3e7d7", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/70 [00:00" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "hyper_params = {\n", " 'learning_rate': 1.0e-1,\n", " 'num_epochs': 10,\n", " 'batch_size': 1024,\n", "}\n", "\n", "model = Model_2_1().to(device)\n", "\n", "trainer = RegressionTrainer(\n", " model=model, \n", " train_dataset=train_regression_dataset, eval_dataset=test_regression_dataset,\n", " optimizer='torch', criterion='torch', **hyper_params\n", ")\n", "_ = trainer.train()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "使用`torch.nn`构建二分类任务的模型。" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "class Model_2_2(nn.Module):\n", " def __init__(self, input_features=200):\n", " super().__init__()\n", " self.fc = nn.Linear(in_features=input_features, out_features=1)\n", " self.sigmoid = nn.Sigmoid()\n", "\n", " def forward(self, x):\n", " x = self.fc(x)\n", " x = self.sigmoid(x)\n", " return x" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "训练并测试上述二分类模型。" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "35b427ce904541dfb6156882068e21ce", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/140 [00:00" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "hyper_params = {\n", " 'learning_rate': 5.0e-4,\n", " 'num_epochs': 10,\n", " 'batch_size': 1024,\n", "}\n", "\n", "model = Model_2_2().to(device)\n", "\n", "trainer = BinaryCLSTrainer(\n", " model=model, \n", " train_dataset=train_binarycls_dataset, eval_dataset=test_binarycls_dataset,\n", " optimizer='torch', criterion='torch', **hyper_params\n", ")\n", "_ = trainer.train()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "使用`torch.nn`构建MNIST多分类任务的模型。" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "class Model_2_3(nn.Module):\n", " def __init__(self, input_features=784, num_classes=10):\n", " super().__init__()\n", " self.flatten = nn.Flatten()\n", " self.linear = nn.Linear(in_features=input_features, out_features=num_classes)\n", "\n", " def forward(self, x: torch.Tensor):\n", " x = self.flatten(x)\n", " x = self.linear(x)\n", " return x" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "训练并测试上述MNIST多分类模型。" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "f5758a4202d0422bad77b31145b7f80b", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/590 [00:00" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "hyper_params = {\n", " 'learning_rate': 6.0e-2,\n", " 'num_epochs': 10,\n", " 'batch_size': 1024,\n", "}\n", "\n", "model = Model_2_3(\n", " input_features=image_width * image_height, \n", " num_classes=num_classes\n", ").to(device)\n", "\n", "trainer = MultiCLSTrainer(\n", " model=model, \n", " train_dataset=train_mnist_dataset, eval_dataset=test_mnist_dataset, \n", " optimizer='torch', criterion='torch', **hyper_params\n", ")\n", "_ = trainer.train()" ] }, { "attachments": {}, "cell_type": 
"markdown", "metadata": {}, "source": [ "# 任务三\n", "**在多分类任务中使用至少三种不同的激活函数。**\n", "- 使用不同的激活函数,进行对比实验并分析实验结果\n" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "torch.cuda.empty_cache()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "接下来定义4个模型,分别使用`nn.ReLU()`、`nn.Sigmoid()`、`nn.Tanh()`和`nn.LeakyReLU()`的实例作为激活函数。\n", "\n", "分别训练和测试。并将损失曲线和正确率曲线分别画在一个图内以进行比较4种激活函数的效果。" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "以ReLU为激活函数的模型开始训练:\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "efd5837ab4d34678acf767b988a8fd3a", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/590 [00:00" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "class Model_3(nn.Module):\n", " def __init__(\n", " self, \n", " activate_fn: Literal['ReLU', 'Sigmoid', 'Tanh', 'LeakyReLU'],\n", " input_features: int = 784, \n", " num_classes: int = 10\n", " ):\n", " super().__init__()\n", " self.flatten = nn.Flatten()\n", " self.fc1 = nn.Linear(in_features=input_features, out_features=1024)\n", " self.fc2 = nn.Linear(in_features=1024, out_features=256)\n", " self.fc3 = nn.Linear(in_features=256, out_features=num_classes)\n", " if activate_fn == 'ReLU':\n", " self.activate_fn = nn.ReLU()\n", " elif activate_fn == 'Sigmoid':\n", " self.activate_fn = nn.Sigmoid()\n", " elif activate_fn == 'Tanh':\n", " self.activate_fn = nn.Tanh()\n", " elif activate_fn == 'LeakyReLU':\n", " self.activate_fn = nn.LeakyReLU()\n", "\n", " def forward(self, x: torch.Tensor):\n", " x = self.flatten(x)\n", " x = self.fc1(x)\n", " x = self.activate_fn(x)\n", "\n", " x = self.fc2(x)\n", " x = self.activate_fn(x)\n", "\n", " x = self.fc3(x)\n", " x = self.activate_fn(x)\n", " return x\n", " \n", "models = {\n", " fn: Model_3(fn, input_features=image_width * image_height, num_classes=num_classes).to(device) \n", " for fn in ['ReLU', 'Sigmoid', 'Tanh', 'LeakyReLU']\n", "}\n", "plot_colors = {'ReLU': 'blue', 'Sigmoid': 'green', 'Tanh': 'orange', 'LeakyReLU': 'purple'}\n", "\n", "fig, axes = plt.subplots(1, 2, figsize=(7, 3.5))\n", "\n", "axes[0].set_xlabel('Epoch')\n", "axes[0].set_ylabel('Loss')\n", "axes[0].set_title('Validation Loss Curve')\n", "axes[0].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", "axes[1].set_xlabel('Epoch')\n", "axes[1].set_ylabel('Accuracy')\n", "axes[1].set_title('Validation Accuracy Curve')\n", "axes[1].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", "\n", "hyper_params = {\n", " 'learning_rate': 6.0e-2,\n", " 'num_epochs': 10,\n", " 'batch_size': 1024,\n", "}\n", "\n", "for fn_name, model in models.items():\n", " print(f\"以{fn_name}为激活函数的模型开始训练:\")\n", " trainer = MultiCLSTrainer(\n", " model=model, \n", " train_dataset=train_mnist_dataset, eval_dataset=test_mnist_dataset, \n", " optimizer='torch', criterion='torch', **hyper_params, \n", " plot=False, return_curves=True, log_epoch=10\n", " )\n", " curves = trainer.train()['curves']\n", "\n", " axes[0].plot(\n", " range(1, len(curves['eval_loss_curve']) + 1), curves['eval_loss_curve'], \n", " label=fn_name, color=plot_colors[fn_name]\n", " )\n", " axes[1].plot(\n", " range(1, len(curves['eval_acc_curve']) + 1), curves['eval_acc_curve'], \n", " label=fn_name, color=plot_colors[fn_name]\n", " )\n", "\n", "axes[0].legend()\n", "axes[1].legend()\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ 
"在性能表现上,激活函数为`Sigmoid`的模型训练过程中损失下降速度非常慢,可见发生了梯度消失,这验证了`Sigmoid`非常容易出现梯度消失的问题。\n", "\n", "激活函数为`ReLU`的模型比较不稳定,有时会出现神经元死亡过多(值为$0$或接近$0$)的情况。\n", "\n", "`Tanh`以及`LeakyReLU`的表现相对优秀。\n", "\n", "在用时上,各激活函数的模型训练用时相近。" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# 任务四\n", "**对多分类任务中的模型评估隐藏层层数和隐藏单元个数对实验结果的影响。**\n", "- 使用不同的隐藏层层数和隐藏单元个数,进行对比实验并分析实验结果\n" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "torch.cuda.empty_cache()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "接下来定义6个模型,隐藏层层数和隐藏单元个数分别如下:\n", "\n", "| 模型编号 | hidden_layer (层数) | hidden_size (隐藏单元个数) |\n", "|:--------:|:-------------------:|:--------------------------:|\n", "| 1 | 2 | 64 |\n", "| 2 | 2 | 1024 |\n", "| 3 | 4 | 64 |\n", "| 4 | 4 | 1024 |\n", "| 5 | 8 | 64 |\n", "| 6 | 8 | 1024 |\n", "\n", "\n", "\n", "分别训练和测试。并将损失曲线和正确率曲线分别画在一个图内以进行比较6个模型的效果。" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "模型1(隐藏层层数为2,隐藏单元个数为64)开始训练:\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "137bb123d4664678b57c3a4c2e7ea977", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/590 [00:00" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "class Model_4(nn.Module):\n", " def __init__(\n", " self, \n", " hidden_size: int,\n", " num_hidden_layer: int,\n", " input_features: int = 784, \n", " num_classes: int = 10\n", " ):\n", " super().__init__()\n", " self.net = nn.Sequential(\n", " nn.Flatten(),\n", " nn.Linear(in_features=input_features, out_features=hidden_size),\n", " nn.LeakyReLU()\n", " )\n", " for i in range(num_hidden_layer - 1):\n", " self.net.append(nn.Linear(in_features=hidden_size, out_features=hidden_size))\n", " self.net.append(nn.LeakyReLU())\n", " self.net.append(nn.Linear(in_features=hidden_size, out_features=num_classes))\n", " \n", " def forward(self, x: torch.Tensor):\n", " return self.net(x)\n", " \n", "model_arch_params = [\n", " {'num_hidden_layer': 2, 'hidden_size': 64},\n", " {'num_hidden_layer': 2, 'hidden_size': 1024},\n", " {'num_hidden_layer': 4, 'hidden_size': 64},\n", " {'num_hidden_layer': 4, 'hidden_size': 1024},\n", " {'num_hidden_layer': 8, 'hidden_size': 64},\n", " {'num_hidden_layer': 8, 'hidden_size': 1024},\n", "]\n", "plot_colors = ['blue', 'blue', 'green', 'green', 'orange', 'orange']\n", "plot_linestyles = ['solid', 'dashed', 'solid', 'dashed', 'solid', 'dashed']\n", "\n", "fig, axes = plt.subplots(1, 2, figsize=(7, 3.5))\n", "\n", "axes[0].set_xlabel('Epoch')\n", "axes[0].set_ylabel('Loss')\n", "axes[0].set_title('Validation Loss Curve')\n", "axes[0].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", "axes[1].set_xlabel('Epoch')\n", "axes[1].set_ylabel('Accuracy')\n", "axes[1].set_title('Validation Accuracy Curve')\n", "axes[1].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", "\n", "hyper_params = {\n", " 'learning_rate': 6.0e-2,\n", " 'num_epochs': 10,\n", " 'batch_size': 1024,\n", "}\n", "\n", "for index, arch_param in enumerate(model_arch_params):\n", " num_hidden_layer, hidden_size = arch_param['num_hidden_layer'], arch_param['hidden_size']\n", " model = Model_4(**arch_param, input_features=image_width * image_height, num_classes=num_classes).to(device)\n", " \n", " print(f\"模型{index + 1}(隐藏层层数为{num_hidden_layer},隐藏单元个数为{hidden_size})开始训练:\")\n", " trainer = MultiCLSTrainer(\n", " model=model, \n", " train_dataset=train_mnist_dataset, 
eval_dataset=test_mnist_dataset, \n", " optimizer='torch', criterion='torch', **hyper_params, \n", " plot=False, return_curves=True, log_epoch=10\n", " )\n", " curves = trainer.train()['curves']\n", "\n", " axes[0].plot(\n", " range(1, len(curves['eval_loss_curve']) + 1), curves['eval_loss_curve'], color=plot_colors[index],\n", " label=f\"({num_hidden_layer}, {hidden_size})\", linestyle=plot_linestyles[index]\n", " )\n", " axes[1].plot(\n", " range(1, len(curves['eval_acc_curve']) + 1), curves['eval_acc_curve'], color=plot_colors[index], \n", " label=f\"({num_hidden_layer}, {hidden_size})\", linestyle=plot_linestyles[index]\n", " )\n", "\n", "axes[0].legend()\n", "axes[1].legend()\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "在最终性能表现上,前4个模型相差无几,正确率都能达到$89\\%$左右。在训练过程中,深层网络的收敛速度慢。当模型层数增加到4以上时,模型无法正常收敛。\n", "\n", "以上两个现象可分析得知:深层网络容易出现梯度消失的问题。需要采用残差网络等结构优化。\n", "\n", "在用时上,由于模型较小,数据集也较小,GPU算力较高,用时基本没有差别。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 任务五\n", "\n", "**在多分类任务实验中分别手动实现和用torch.nn实现dropout**\n", "\n", "- 探究不同丢弃率对实验结果的影响(可用loss曲线进行展示)\n" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [], "source": [ "torch.cuda.empty_cache()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "手动实现Dropout。\n", "\n", "由于需要判断此时传入的张量是否是训练状态,需要继承`nn.Module`来获取`self.training`,否则需要手动传入training参数。这里采取前者。" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "输入:\n", "tensor([[ 1., 2., 3., 4., 5.],\n", " [ 6., 7., 8., 9., 10.]])\n", "My_Dropout输出:\n", "tensor([[ 2., 0., 0., 0., 0.],\n", " [ 0., 14., 0., 0., 0.]])\n", "nn.Dropout输出:\n", "tensor([[ 0., 4., 6., 8., 0.],\n", " [12., 14., 0., 18., 0.]])\n" ] } ], "source": [ "class My_Dropout(nn.Module):\n", " def __init__(self, p=0.5):\n", " super().__init__()\n", " self.p = p\n", " self.mask = None\n", "\n", " def forward(self, x: torch.Tensor):\n", " if self.training:\n", " mask = (torch.rand(x.shape, device=x.device) > self.p).to(x.dtype)\n", " return x * mask / (1 - self.p)\n", " else:\n", " return x\n", " \n", "\n", "# 测试\n", "my_dropout = My_Dropout(p=0.5)\n", "nn_dropout = nn.Dropout(p=0.5)\n", "x = torch.tensor([[1.0, 2.0, 3.0, 4.0, 5.0],\n", " [6.0, 7.0, 8.0, 9.0, 10.0]])\n", "print(f\"输入:\\n{x}\")\n", "output_my_dropout = my_dropout(x)\n", "output_nn_dropout = nn_dropout(x)\n", "print(f\"My_Dropout输出:\\n{output_my_dropout}\")\n", "print(f\"nn.Dropout输出:\\n{output_nn_dropout}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "使用手动实现的Dropout进行多分类任务训练。" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "9dbe15ae148c48f1b3b63f5fc56aaa98", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/590 [00:00" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "class Model_5_1():\n", " def __init__(\n", " self, \n", " dropout_p: float = 0.5,\n", " input_features: int = 784, \n", " num_classes: int = 10\n", " ):\n", " self.flatten = My_Flatten()\n", " self.fc1 = My_Linear(in_features=input_features, out_features=1024)\n", " self.leakyrelu = My_LeakyReLU()\n", " self.dropout = My_Dropout(p=dropout_p)\n", " self.fc2 = My_Linear(in_features=1024, out_features=num_classes)\n", " self.params = self.fc1.params + self.fc2.params\n", "\n", " def __call__(self, x: torch.Tensor):\n", " return self.forward(x)\n", "\n", " def forward(self, 
x: torch.Tensor):\n", " x = self.flatten(x)\n", " x = self.dropout(self.leakyrelu(self.fc1(x)))\n", " x = self.fc2(x)\n", " return x\n", "\n", " def to(self, device: str):\n", " for param in self.params:\n", " param.data = param.data.to(device=device)\n", " return self\n", "\n", " def parameters(self):\n", " return self.params\n", " \n", " def train(self):\n", " for param in self.params:\n", " param.requires_grad = True\n", " \n", " def eval(self):\n", " for param in self.params:\n", " param.requires_grad = False\n", "\n", "\n", "hyper_params = {\n", " 'learning_rate': 6.0e-2,\n", " 'num_epochs': 10,\n", " 'batch_size': 1024,\n", "}\n", "\n", "model = Model_5_1(\n", " input_features=image_width * image_height, \n", " num_classes=num_classes\n", ").to(device)\n", "\n", "trainer = MultiCLSTrainer(\n", " model=model, \n", " train_dataset=train_mnist_dataset, eval_dataset=test_mnist_dataset, \n", " optimizer='manual', criterion='manual', **hyper_params\n", ")\n", "_ = trainer.train()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "探究不同丢弃率对实验结果的影响。" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "模型1(丢弃率为0.0)开始训练:\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "00396f5e8fd24b02ac36d992f605f531", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/590 [00:00" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "class Model_5_2(nn.Module):\n", " def __init__(\n", " self, \n", " dropout_p: float = None,\n", " input_features: int = 784, \n", " num_classes: int = 10\n", " ):\n", " super().__init__()\n", " self.net = nn.Sequential(\n", " nn.Flatten(),\n", " nn.Linear(in_features=input_features, out_features=4096),\n", " nn.LeakyReLU(),\n", " nn.Dropout(p=dropout_p),\n", " nn.Linear(in_features=4096, out_features=4096),\n", " nn.LeakyReLU(),\n", " nn.Dropout(p=dropout_p),\n", " nn.Linear(in_features=4096, out_features=4096),\n", " nn.LeakyReLU(),\n", " nn.Dropout(p=dropout_p),\n", " nn.Linear(in_features=4096, out_features=4096),\n", " nn.LeakyReLU(),\n", " nn.Dropout(p=dropout_p),\n", " nn.Linear(in_features=4096, out_features=num_classes)\n", " )\n", "\n", " def forward(self, x: torch.Tensor):\n", " return self.net(x)\n", " \n", "dropout_ratios = [0.0, 0.2, 0.5, 0.9]\n", "plot_colors = ['blue', 'green', 'orange', 'purple']\n", "\n", "fig, axes = plt.subplots(1, 2, figsize=(7, 3.5))\n", "\n", "axes[0].set_xlabel('Epoch')\n", "axes[0].set_ylabel('Loss')\n", "axes[0].set_title('Validation Loss Curve')\n", "axes[0].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", "axes[1].set_xlabel('Epoch')\n", "axes[1].set_ylabel('Accuracy')\n", "axes[1].set_title('Validation Accuracy Curve')\n", "axes[1].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", "\n", "hyper_params = {\n", " 'learning_rate': 6.0e-2,\n", " 'num_epochs': 10,\n", " 'batch_size': 1024,\n", "}\n", "\n", "for index, dropout_ratio in enumerate(dropout_ratios):\n", " model = Model_5_2(dropout_p=dropout_ratio, input_features=image_width * image_height, num_classes=num_classes).to(device)\n", " \n", " print(f\"模型{index + 1}(丢弃率为{dropout_ratio})开始训练:\")\n", " trainer = MultiCLSTrainer(\n", " model=model, \n", " train_dataset=train_mnist_dataset, eval_dataset=test_mnist_dataset, \n", " optimizer='torch', criterion='torch', **hyper_params, \n", " plot=False, return_curves=True, log_epoch=10\n", " )\n", " curves = trainer.train()['curves']\n", "\n", " 
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Task 6\n", "\n", "**Implement $L_2$ regularization in the multi-class classification experiment, both by hand and with torch.nn**\n", "\n", "- Investigate the effect of the penalty weight on the results (loss curves may be used for illustration)\n" ] },
{ "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "torch.cuda.empty_cache()" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "$L_2$ regularization is also known as weight decay. PyTorch implements it by passing the $L_2$ coefficient to the optimizer as the `weight_decay` argument; the decay is then applied inside `step()`. The principle is as follows.\n", "\n",
"The $L_2$-regularized loss is\n", "\n", "$$\n", "L = L_0 + \\frac{\\lambda}{2} \\sum_{w}w^2\n", "$$\n", "\n",
"where $L$ is the regularized loss, $L_0$ is the original loss computed by the loss function, $\\lambda$ is the $L_2$ coefficient / penalty weight (the optimizer's `weight_decay` argument), and the sum runs over the weights only; biases are conventionally not regularized.\n", "\n",
"Backpropagation gives, for each individual weight $w$ and bias $b$:\n", "\n", "$$\n", "\\begin{align}\n", "\\frac{\\partial L}{\\partial w} & = \\frac{\\partial L_0}{\\partial w} + \\lambda w \\\\\\\\\n", "\\frac{\\partial L}{\\partial b} & = \\frac{\\partial L_0}{\\partial b}\n", "\\end{align}\n", "$$\n", "\n",
"so the parameter updates become:\n", "\n", "$$\n", "\\begin{align}\n", "w: & = w - \\eta \\left( \\frac{\\partial L_0}{\\partial w} + \\lambda w \\right) = (1 - \\eta \\lambda)w - \\eta \\frac{\\partial L_0}{\\partial w} \\\\\\\\\n", "b: & = b - \\eta \\frac{\\partial L_0}{\\partial b}\n", "\\end{align}\n", "$$\n", "\n",
"where $\\eta$ is the learning rate. (Some textbooks write the penalty as $\\frac{\\lambda}{2n}\\sum_{w}w^2$ with $n$ the number of training samples, which merely rescales $\\lambda$ by $1/n$; PyTorch's `weight_decay` adds $\\lambda w$ to the gradient directly, and that is the form verified below.)\n", "\n",
"Implementing $L_2$ regularization by hand inside the optimizer therefore looks like this:" ] },
{ "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Gradient of params1:\n", " tensor([[2., 2.]])\n", "Gradient of params2:\n", " tensor([[2., 2.]])\n", "params1 after the L_2-regularized My_SGD step:\n", " tensor([[-0.0500,  0.9000]])\n", "params2 after the L_2-regularized torch.optim.SGD step:\n", " tensor([[-0.0500,  0.9000]])\n" ] } ], "source": [
"class My_SGD:\n", "    def __init__(self, params: list[torch.Tensor], lr: float, weight_decay=0.0):\n", "        self.params = params\n", "        self.lr = lr\n", "        self.weight_decay = weight_decay\n", "\n",
"    def step(self):\n", "        with torch.no_grad():\n", "            for param in self.params:\n", "                if param.grad is not None:\n", "                    if len(param.data.shape) > 1:\n", "                        # Weight matrix: fold the decay into the gradient,\n", "                        # w := w - lr * (grad + weight_decay * w).\n", "                        param.data = param.data - self.lr * (param.grad + self.weight_decay * param.data)\n", "                    else:\n", "                        # 1-D parameter (bias): no decay, matching the math above.\n", "                        # (torch.optim.SGD decays biases as well; the test below\n", "                        # uses a 2-D parameter, so the two still agree.)\n", "                        param.data = param.data - self.lr * param.grad\n", "\n",
"    def zero_grad(self):\n", "        for param in self.params:\n", "            if param.grad is not None:\n", "                param.grad.data = torch.zeros_like(param.grad.data)\n", "\n", "\n",
"# Test: both optimizers should produce the identical update\n", "params1 = torch.tensor([[1., 2.]], requires_grad=True)\n", "params2 = torch.tensor([[1., 2.]], requires_grad=True)\n", "\n",
"my_sgd = My_SGD(params=[params1], lr=0.5, weight_decay=0.1)\n", "optim_sgd = torch.optim.SGD(params=[params2], lr=0.5, weight_decay=0.1)\n", "my_sgd.zero_grad()\n", "optim_sgd.zero_grad()\n", "\n",
"loss1 = 2 * params1.sum()  # the partial derivative w.r.t. every entry is 2\n", "loss2 = 2 * params2.sum()\n", "loss1.backward()\n", "loss2.backward()\n",
"print(\"Gradient of params1:\\n\", params1.grad.data)\n", "print(\"Gradient of params2:\\n\", params2.grad.data)\n", "\n",
"my_sgd.step()\n", "optim_sgd.step()\n",
"# Expected result: w - lr * grad - lr * weight_decay * w\n", "# w[0] = 1 - 0.5 * 2 - 0.5 * 0.1 * 1 = -0.0500\n", "# w[1] = 2 - 0.5 * 2 - 0.5 * 0.1 * 2 =  0.9000\n",
"print(\"params1 after the L_2-regularized My_SGD step:\\n\", params1.data)\n",
"print(\"params2 after the L_2-regularized torch.optim.SGD step:\\n\", params2.data)" ] },
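{ "cell_type": "markdown", "metadata": {}, "source": [ "A cross-check (a sketch added for illustration): for plain SGD, folding $\\lambda w$ into the optimizer is equivalent to adding the penalty $\\frac{\\lambda}{2}\\sum_{w}w^2$ to the loss itself and differentiating. The throwaway parameter `params3` below is hypothetical and used only for this check." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"# Equivalence check (sketch): implement L_2 by adding the penalty to the loss\n", "# instead of using weight_decay; plain SGD should produce the same update.\n",
"params3 = torch.tensor([[1., 2.]], requires_grad=True)  # hypothetical parameter\n",
"plain_sgd = torch.optim.SGD(params=[params3], lr=0.5)  # note: no weight_decay here\n",
"plain_sgd.zero_grad()\n", "\n",
"weight_decay = 0.1\n",
"loss3 = 2 * params3.sum() + weight_decay / 2 * (params3 ** 2).sum()  # L_0 + lambda/2 * sum(w^2)\n",
"loss3.backward()  # gradient: 2 + 0.1 * w\n", "plain_sgd.step()\n", "\n",
"# Expected: tensor([[-0.0500,  0.9000]]), identical to the two optimizers above.\n",
"print(\"Update with the penalty added to the loss:\\n\", params3.data)" ] },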
{ "cell_type": "markdown", "metadata": {}, "source": [ "Train the multi-class model with the hand-written $L_2$-regularized optimizer." ] },
{ "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "3e2d7d0e1bee4e9198ddbac0c330bfdf", "version_major": 2, "version_minor": 0 }, "text/plain": [ "  0%|          | 0/590 [00:00<?, ?it/s]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [
"hyper_params = {\n", "    'learning_rate': 6.0e-2,\n", "    'num_epochs': 10,\n", "    'batch_size': 1024,\n", "    'weight_decay': 0.1\n", "}\n", "\n",
"model = Model_5_1(\n", "    dropout_p=0.0,\n", "    input_features=image_width * image_height, \n", "    num_classes=num_classes\n", ").to(device)\n", "\n",
"trainer = MultiCLSTrainer(\n", "    model=model, \n", "    train_dataset=train_mnist_dataset, eval_dataset=test_mnist_dataset, \n", "    optimizer=My_SGD, criterion='manual', **hyper_params\n", ")\n", "_ = trainer.train()" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Investigate the effect of the penalty weight on the results." ] },
{ "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Model 1 (weight decay 0.0) training:\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "6b3d5ec1bdba4ed7ba4cc88234ed38d7", "version_major": 2, "version_minor": 0 }, "text/plain": [ "  0%|          | 0/590 [00:00<?, ?it/s]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [
"weight_decays = [0.0, 0.1, 0.5, 0.9]\n", "plot_colors = ['blue', 'green', 'orange', 'purple']\n", "\n",
"fig, axes = plt.subplots(1, 2, figsize=(7, 3.5))\n", "\n",
"axes[0].set_xlabel('Epoch')\n", "axes[0].set_ylabel('Loss')\n", "axes[0].set_title('Validation Loss Curve')\n", "axes[0].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", "axes[1].set_xlabel('Epoch')\n", "axes[1].set_ylabel('Accuracy')\n", "axes[1].set_title('Validation Accuracy Curve')\n", "axes[1].grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", "\n",
"hyper_params_list = [\n", "    {\n", "        'learning_rate': 6.0e-2, 'num_epochs': 10, 'batch_size': 1024,\n", "        'weight_decay': weight_decay\n", "    } \n", "    for weight_decay in weight_decays\n", "]\n", "\n",
"for index, hyper_params in enumerate(hyper_params_list):\n", "    model = Model_5_1(dropout_p=0.0, input_features=image_width * image_height, num_classes=num_classes).to(device)\n", "\n", "    print(f\"Model {index + 1} (weight decay {hyper_params['weight_decay']}) training:\")\n", "    trainer = MultiCLSTrainer(\n", "        model=model, \n", "        train_dataset=train_mnist_dataset, eval_dataset=test_mnist_dataset, \n", "        optimizer='torch', criterion='torch', **hyper_params, \n", "        plot=False, return_curves=True, log_epoch=10\n", "    )\n", "    curves = trainer.train()['curves']\n", "\n",
"    axes[0].plot(\n", "        range(1, len(curves['eval_loss_curve']) + 1), curves['eval_loss_curve'],\n", "        label=f\"weight_decay={hyper_params['weight_decay']}\", color=plot_colors[index]\n", "    )\n",
"    axes[1].plot(\n", "        range(1, len(curves['eval_acc_curve']) + 1), curves['eval_acc_curve'], \n", "        label=f\"weight_decay={hyper_params['weight_decay']}\", color=plot_colors[index]\n", "    )\n", "\n",
"axes[0].legend()\n", "axes[1].legend()\n", "plt.tight_layout()\n", "plt.show()" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Task 7\n", "\n", "**For the regression, binary-classification, and multi-class tasks, take the best-performing model from the experiments above and evaluate it with 10-fold cross-validation**\n", "\n", "- Besides the final results, the per-fold results must also be reported in a table\n" ] },
{ "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [], "source": [ "torch.cuda.empty_cache()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Build the $k$-fold splits. The original training set is shuffled and partitioned into $k$ folds; round $i$ trains on the other $k-1$ folds and validates on fold $i$, a $k-1:1$ train/validation ratio. The original test set is kept aside and used only for the final test evaluation." ] },
{ "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [], "source": [
"class KFoldDatasetSplitter:\n", "    def __init__(self, dataset, k_folds: int=10, shuffle=True):\n", "        self.dataset = dataset\n", "        self.k_folds = k_folds\n", "        self.shuffle = shuffle\n", "\n",
"        # sklearn forbids a random_state when shuffle=False, hence the guard.\n", "        self.kfold = KFold(n_splits=k_folds, shuffle=shuffle, random_state=seed if shuffle else None)\n", "        self.indices = list(range(len(dataset)))\n", "        self.splits = list(self.kfold.split(self.indices))\n", "\n",
"    def get_fold(self, fold_idx: int):\n", "        # Returns the (train, validation) subsets of the given fold; with k folds,\n", "        # each round uses (k-1)/k of the data for training and 1/k for validation.\n", "        assert 0 <= fold_idx < self.k_folds, \"fold_idx out of range\"\n", "\n",
"        train_idx, val_idx = self.splits[fold_idx]\n", "        train_subset = Subset(self.dataset, train_idx)\n", "        val_subset = Subset(self.dataset, val_idx)\n", "\n", "        return train_subset, val_subset\n", "\n",
"    def __len__(self):\n", "        return self.k_folds" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Regression task." ] },
{ "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Training on fold 1:\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "0641e17cd94442ccbd2c7afdac1e8a07", "version_major": 2, "version_minor": 0 }, "text/plain": [ "  0%|          | 0/70 [00:00<?, ?it/s]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [
"k_fold_splitter = KFoldDatasetSplitter(train_regression_dataset, k_folds=10, shuffle=True)\n", "\n",
"hyper_params = {\n", "    'learning_rate': 1.0e-1,\n", "    'num_epochs': 10,\n", "    'batch_size': 1024,\n", "}\n", "\n",
"eval_errors = []\n", "test_errors = []\n", "\n",
"for index in range(len(k_fold_splitter)):\n", "    model = Model_2_1().to(device)\n", "    train_dataset, eval_dataset = k_fold_splitter.get_fold(fold_idx=index)\n", "    print(f\"Training on fold {index + 1}:\")\n", "    trainer = RegressionTrainer(\n", "        model=model, \n", "        train_dataset=train_dataset, eval_dataset=eval_dataset, test_dataset=test_regression_dataset,\n", "        optimizer='torch', criterion='torch', **hyper_params,\n", "        plot=False, print_test_result=False, return_curves=True, log_epoch=0\n", "    )\n", "    train_result = trainer.train()\n", "    eval_errors.append(train_result['curves']['eval_error_curve'][-1])\n", "    test_errors.append(train_result['test_error'])\n", "\n",
"fig, ax = plt.subplots(figsize=(7, 3.5))\n", "\n",
"fold_indices = list(range(1, len(k_fold_splitter) + 1))\n", "bar_width = 0.35\n", "x = np.arange(len(fold_indices))\n", "\n",
"val_bars = ax.bar(x - bar_width / 2, eval_errors, width=bar_width, label='Validation Error', color='blue')\n", "test_bars = ax.bar(x + bar_width / 2, test_errors, width=bar_width, label='Test Error', color='green')\n", "\n",
"ax.set_xlabel('Fold Index')\n", "ax.set_ylabel('Error')\n", "ax.set_title('Validation vs Test Error per Fold')\n", "ax.set_xticks(x)\n", "ax.set_xticklabels(fold_indices)\n", "ax.legend()\n", "ax.grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", "\n",
"for bar in val_bars:\n", "    height = bar.get_height()\n", "    ax.text(bar.get_x() + bar.get_width() / 2, height + 0.01, f'{height:.2f}', ha='center', va='bottom', fontsize=6)\n",
"for bar in test_bars:\n", "    height = bar.get_height()\n", "    ax.text(bar.get_x() + bar.get_width() / 2, height + 0.01, f'{height:.2f}', ha='center', va='bottom', fontsize=6)\n", "\n",
"plt.tight_layout()\n", "plt.show()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Binary classification task." ] },
{ "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Training on fold 1:\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "c57b0a4859784e3280f53bf5f4164c33", "version_major": 2, "version_minor": 0 }, "text/plain": [ "  0%|          | 0/130 [00:00<?, ?it/s]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [
"k_fold_splitter = KFoldDatasetSplitter(train_binarycls_dataset, k_folds=10, shuffle=True)\n", "\n",
"hyper_params = {\n", "    'learning_rate': 5.0e-4,\n", "    'num_epochs': 10,\n", "    'batch_size': 1024,\n", "}\n", "\n",
"eval_accs = []\n", "test_accs = []\n", "\n",
"for index in range(len(k_fold_splitter)):\n", "    model = Model_2_2().to(device)\n", "    train_dataset, eval_dataset = k_fold_splitter.get_fold(fold_idx=index)\n", "    print(f\"Training on fold {index + 1}:\")\n", "    trainer = BinaryCLSTrainer(\n", "        model=model, \n", "        train_dataset=train_dataset, eval_dataset=eval_dataset, test_dataset=test_binarycls_dataset,\n", "        optimizer='torch', criterion='torch', **hyper_params,\n", "        plot=False, print_test_result=False, return_curves=True, log_epoch=0\n", "    )\n", "    train_result = trainer.train()\n", "    eval_accs.append(train_result['curves']['eval_acc_curve'][-1])\n", "    test_accs.append(train_result['test_acc'])\n", "\n",
"fig, ax = plt.subplots(figsize=(7, 3.5))\n", "\n",
"fold_indices = list(range(1, len(k_fold_splitter) + 1))\n", "bar_width = 0.35\n", "x = np.arange(len(fold_indices))\n", "\n",
"val_bars = ax.bar(x - bar_width / 2, eval_accs, width=bar_width, label='Validation Accuracy', color='blue')\n", "test_bars = ax.bar(x + bar_width / 2, test_accs, width=bar_width, label='Test Accuracy', color='green')\n", "\n",
"ax.set_xlabel('Fold Index')\n", "ax.set_ylabel('Accuracy')\n", "ax.set_title('Validation vs Test Accuracy per Fold')\n", "ax.set_xticks(x)\n", "ax.set_xticklabels(fold_indices)\n", "ax.legend()\n", "ax.grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", "\n",
"for bar in val_bars:\n", "    height = bar.get_height()\n", "    ax.text(bar.get_x() + bar.get_width() / 2, height + 0.01, f'{height:.1%}', ha='center', va='bottom', fontsize=6)\n",
"for bar in test_bars:\n", "    height = bar.get_height()\n", "    ax.text(bar.get_x() + bar.get_width() / 2, height + 0.01, f'{height:.1%}', ha='center', va='bottom', fontsize=6)\n", "\n",
"plt.tight_layout()\n", "plt.show()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Multi-class classification task." ] },
{ "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Training on fold 1:\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "f652aaa3f4b4473d9bbbf665dc40f192", "version_major": 2, "version_minor": 0 }, "text/plain": [ "  0%|          | 0/530 [00:00<?, ?it/s]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [
"k_fold_splitter = KFoldDatasetSplitter(train_mnist_dataset, k_folds=10, shuffle=True)\n", "\n",
"hyper_params = {\n", "    'learning_rate': 6.0e-2,\n", "    'num_epochs': 10,\n", "    'batch_size': 1024,\n", "}\n", "\n",
"eval_accs = []\n", "test_accs = []\n", "\n",
"for index in range(len(k_fold_splitter)):\n", "    model = Model_4(\n", "        num_hidden_layer=4, hidden_size=1024, \n", "        input_features=image_width * image_height, num_classes=num_classes\n", "    ).to(device)\n", "    train_dataset, eval_dataset = k_fold_splitter.get_fold(fold_idx=index)\n", "    print(f\"Training on fold {index + 1}:\")\n", "    trainer = MultiCLSTrainer(\n", "        model=model, \n", "        train_dataset=train_dataset, eval_dataset=eval_dataset, test_dataset=test_mnist_dataset,\n", "        optimizer='torch', criterion='torch', **hyper_params,\n", "        plot=False, print_test_result=False, return_curves=True, log_epoch=0\n", "    )\n", "    train_result = trainer.train()\n", "    eval_accs.append(train_result['curves']['eval_acc_curve'][-1])\n", "    test_accs.append(train_result['test_acc'])\n", "\n",
"fig, ax = plt.subplots(figsize=(7, 3.5))\n", "\n",
"fold_indices = list(range(1, len(k_fold_splitter) + 1))\n", "bar_width = 0.35\n", "x = np.arange(len(fold_indices))\n", "\n",
"val_bars = ax.bar(x - bar_width / 2, eval_accs, width=bar_width, label='Validation Accuracy', color='blue')\n", "test_bars = ax.bar(x + bar_width / 2, test_accs, width=bar_width, label='Test Accuracy', color='green')\n", "\n",
"ax.set_xlabel('Fold Index')\n", "ax.set_ylabel('Accuracy')\n", "ax.set_title('Validation vs Test Accuracy per Fold')\n", "ax.set_xticks(x)\n", "ax.set_xticklabels(fold_indices)\n", "ax.legend()\n", "ax.grid(True, linestyle='--', linewidth=0.5, alpha=0.6)\n", "\n",
"for bar in val_bars:\n", "    height = bar.get_height()\n", "    ax.text(bar.get_x() + bar.get_width() / 2, height + 0.01, f'{height:.1%}', ha='center', va='bottom', fontsize=5.5)\n",
"for bar in test_bars:\n", "    height = bar.get_height()\n", "    ax.text(bar.get_x() + bar.get_width() / 2, height + 0.01, f'{height:.1%}', ha='center', va='bottom', fontsize=5.5)\n", "\n",
"plt.tight_layout()\n", "plt.show()" ] }
], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.13" } }, "nbformat": 4, "nbformat_minor": 4 }