Pytorch实现扩散模型【DDPM代码解读篇1】

2024-05-05 14:36

本文主要是介绍Pytorch实现扩散模型【DDPM代码解读篇1】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本篇内容属于对DDPM 原理-代码 项目的解读。

具体内容参考一篇推文,里面对DDPM讲解相对细致:

扩散模型的原理及实现(Pytorch)

下面主要是对其中源码的细致注解,帮助有需要的朋友更好理解代码。

目录

ConvNext块

 正弦时间戳嵌入

时间多层感知器

注意力

整合


ConvNext块

class ConvNextBlock(nn.Module):def __init__(self,in_channels,out_channels,mult=2,  # 输出通道数相对于输入通道数的倍数time_embedding_dim=None,  # 表示时间嵌入的维度norm=True,  # 是否使用归一化层group=8,  # 卷积操作的分组数量,默认为8):super().__init__()# 多层感知机(MLP),用于处理时间嵌入。# 如果time_embedding_dim不为None,则创建一个包含GELU激活函数和线性层的序列;否则为None。self.mlp = (nn.Sequential(nn.GELU(), nn.Linear(time_embedding_dim, in_channels))if time_embedding_dimelse None)# in_conv是一个输入卷积层,对输入进行卷积操作。self.in_conv = nn.Conv2d(in_channels, in_channels, 7, padding=3, groups=in_channels)# block是一个序列模块,包含一系列卷积操作和归一化层。这里使用了GELU作为激活函数。self.block = nn.Sequential(nn.GroupNorm(1, in_channels) if norm else nn.Identity(),nn.Conv2d(in_channels, out_channels * mult, 3, padding=1),nn.GELU(),nn.GroupNorm(1, out_channels * mult),nn.Conv2d(out_channels * mult, out_channels, 3, padding=1),)# residual_conv是一个残差连接的卷积层,用于调整输入和输出的通道数,# 如果输入通道数和输出通道数不同,则使用1x1卷积进行调整;否则为恒等映射。self.residual_conv = (nn.Conv2d(in_channels, out_channels, 1)if in_channels != out_channelselse nn.Identity())def forward(self, x, time_embedding=None):h = self.in_conv(x)  # 首先对输入x进行输入卷积操作。# 如果mlp不为None且time_embedding不为None,则对时间嵌入进行处理并与输入相加。if self.mlp is not None and time_embedding is not None:assert self.mlp is not None, "MLP is None"h = h + rearrange(self.mlp(time_embedding), "b c -> b c 1 1")  # 然后将处理后的特征输入到块中进行卷积操作。h = self.block(h)return h + self.residual_conv(x)  # 最后将卷积结果与输入进行残差连接,并返回。

 正弦时间戳嵌入

# 通常用于为序列数据添加位置信息。
class SinusoidalPosEmb(nn.Module):def __init__(self, dim, theta=10000):super().__init__()self.dim = dim  # 位置编码的维度。self.theta = theta  # theta是用于计算位置编码的参数,默认值为10000。def forward(self, x):device = x.device  # 首先获取输入x的设备信息。half_dim = self.dim // 2  # 然后计算位置编码的维度一半的值half_dimemb = math.log(self.theta) / (half_dim - 1)emb = torch.exp(torch.arange(half_dim, device=device) * -emb)  # 生成位置编码矩阵emb,其中每一行对应一个位置的编码,使用正弦和余弦函数计算。emb = x[:, None] * emb[None, :]emb = torch.cat((emb.sin(), emb.cos()), dim=-1)return emb# DownSample & UpSample 上下采样
class DownSample(nn.Module):def __init__(self, dim, dim_out=None):super().__init__()self.net = nn.Sequential(# 用于对输入进行重新排列,将2x2的空间块转换为通道数的维度,从而将空间维度减小为原来的四分之一。'''Rearrange层用于对输入的张量进行重新排列,将其从四维张量(batch size、通道数、高度、宽度)转换为新的形状b:表示batch size,保持不变。c:表示通道数,保持不变。(h p1)和(w p2):表示对高度和宽度进行的操作。p1和p2是两个额外的参数,用于指定在高度和宽度上的扩展倍数。这意味着将输入的高度和宽度分别扩展为原来的p1倍和p2倍。b (c p1 p2) h w:表示输出张量的形状,其中通道数乘以高度和宽度。这样做的效果是将原始的空间维度拼接到通道维度后面,使得输出的张量变为三维(batch size、新的通道数、高度、宽度)。'''Rearrange("b c (h p1) (w p2) -> b (c p1 p2) h w", p1=2, p2=2),# 接着是一个1x1的卷积层,用于将输入通道数变换为dim_out或者保持不变nn.Conv2d(dim * 4, default(dim_out, dim), 1),)def forward(self, x):return self.net(x)  # 将输入通过Sequential网络进行前向传播,返回处理后的结果。class Upsample(nn.Module):def __init__(self, dim, dim_out=None):super().__init__()self.net = nn.Sequential(nn.Upsample(scale_factor=2, mode="nearest"),  # 用于对输入进行上采样,采用最近邻插值的方式,并将图像沿着两个维度放大两倍。nn.Conv2d(dim, dim_out or dim, kernel_size=3, padding=1), # 接着是一个3x3的卷积层,用于将输入通道数变换为dim_out或者保持不变)def forward(self, x):return self.net(x)

时间多层感知器

sinu_pos_emb = SinusoidalPosEmb(dim, theta=10000)  # 用于生成正弦和余弦位置编码time_dim = dim * 4  # 四倍维度,通常用于增加时间信息的表示能力time_mlp = nn.Sequential(sinu_pos_emb,  # 将输入的时间信息进行正弦和余弦位置编码nn.Linear(dim, time_dim),  # 一个线性层,将输入维度dim映射为time_dim,以增加时间信息的表示能力。nn.GELU(),  # 激活函数,用于引入非线性。nn.Linear(time_dim, time_dim),  # 另一个线性层,将time_dim映射回time_dim,以保持输出维度不变。)

注意力

class BlockAttention(nn.Module):# gate_in_channel:门输入的通道数, residual_in_channel:残差输入的通道数, scale_factor:尺度因子,用于初始化门和残差卷积层的权重。def __init__(self, gate_in_channel, residual_in_channel, scale_factor):super().__init__()self.gate_conv = nn.Conv2d(gate_in_channel, gate_in_channel, kernel_size=1, stride=1)self.residual_conv = nn.Conv2d(residual_in_channel, gate_in_channel, kernel_size=1, stride=1)self.in_conv = nn.Conv2d(gate_in_channel, 1, kernel_size=1, stride=1)  # 输入卷积层,将门和残差的输出进行卷积处理,将结果映射为范围在0到1之间的值self.relu = nn.ReLU()self.sigmoid = nn.Sigmoid()# 前向传播方法接受两个张量作为输入:x表示残差输入,g表示门输入。def forward(self, x: torch.Tensor, g: torch.Tensor) -> torch.Tensor:in_attention = self.relu(self.gate_conv(g) + self.residual_conv(x))in_attention = self.in_conv(in_attention)in_attention = self.sigmoid(in_attention)return in_attention * x

整合

将前面讨论的所有块(不包括注意力块)整合到一个Unet中。
每个块都包含两个残差连接,而不是一个。
这个修改是为了解决潜在的过度拟合问题。

# 这个模块实现了一个双重残差 U 型网络,用于图像处理任务,如图像去噪、超分辨率等。
class TwoResUNet(nn.Module):def __init__(self,dim,init_dim=None,out_dim=None,dim_mults=(1, 2, 4, 8),channels=3,sinusoidal_pos_emb_theta=10000,convnext_block_groups=8,  # 卷积块中的分组数):super().__init__()self.channels = channelsinput_channels = channels# init_dim 不为 None,则返回 init_dim;否则返回 dim。这样做的目的是提供了一种灵活的方式,允许用户在初始化模型时选择是否指定初始维度,如果未指定,则使用输入的维度作为初始维度。self.init_dim = default(init_dim, dim)self.init_conv = nn.Conv2d(input_channels, self.init_dim, 7, padding=3)dims = [self.init_dim, *map(lambda m: dim * m, dim_mults)]'''使用 map 函数对 dim_mults 中的每个值 m 进行操作,将其乘以 dim。这样可以得到一个新的列表,其中的每个值都是 dim 与 dim_mults 中的相应值相乘得到的结果。* 运算符用于解包操作,将 map 函数生成的结果解包成单独的元素。'''in_out = list(zip(dims[:-1], dims[1:]))# 使用 zip 函数将 dims 中的相邻两个元素组合成一个元组。这样可以得到一个列表,其中每个元素都是一个包含相邻两个阶段的输入通道数和输出通道数的元组。sinu_pos_emb = SinusoidalPosEmb(dim, theta=sinusoidal_pos_emb_theta)time_dim = dim * 4self.time_mlp = nn.Sequential(sinu_pos_emb,nn.Linear(dim, time_dim),nn.GELU(),nn.Linear(time_dim, time_dim),)self.downs = nn.ModuleList([])self.ups = nn.ModuleList([])num_resolutions = len(in_out)  # 计算了图像的分辨率数,存储在 num_resolutions 中# 下面的循环用于创建下采样部分(downs):for ind, (dim_in, dim_out) in enumerate(in_out):is_last = ind >= (num_resolutions - 1)self.downs.append(nn.ModuleList([ConvNextBlock(in_channels=dim_in,out_channels=dim_in,time_embedding_dim=time_dim,group=convnext_block_groups,),ConvNextBlock(in_channels=dim_in,out_channels=dim_in,time_embedding_dim=time_dim,group=convnext_block_groups,),DownSample(dim_in, dim_out)if not is_lastelse nn.Conv2d(dim_in, dim_out, 3, padding=1),]))# 创建了中间残差块 mid_block1 和 mid_block2:mid_dim = dims[-1]  # 通常会将最后一个阶段的输出通道数作为中间残差块的输入通道数self.mid_block1 = ConvNextBlock(mid_dim, mid_dim, time_embedding_dim=time_dim)self.mid_block2 = ConvNextBlock(mid_dim, mid_dim, time_embedding_dim=time_dim)# 下面的循环用于创建上采样部分(ups):for ind, (dim_in, dim_out) in enumerate(reversed(in_out)):is_last = ind == (len(in_out) - 1)is_first = ind == 0self.ups.append(nn.ModuleList([ConvNextBlock(in_channels=dim_out + dim_in,out_channels=dim_out,time_embedding_dim=time_dim,group=convnext_block_groups,),ConvNextBlock(in_channels=dim_out + dim_in,out_channels=dim_out,time_embedding_dim=time_dim,group=convnext_block_groups,),Upsample(dim_out, dim_in)if not is_lastelse nn.Conv2d(dim_out, dim_in, 3, padding=1)]))default_out_dim = channelsself.out_dim = default(out_dim, default_out_dim)# 创建了最终的残差块 final_res_block 和输出卷积层 final_conv:self.final_res_block = ConvNextBlock(dim * 2, dim, time_embedding_dim=time_dim)self.final_conv = nn.Conv2d(dim, self.out_dim, 1)def forward(self, x, time):b, _, h, w = x.shapex = self.init_conv(x)  # 对输入张量进行初始卷积操作r = x.clone()  # 克隆 xt = self.time_mlp(time)  # 使用时间多层感知器处理时间信息unet_stack = []  # 创建一个空列表 unet_stack,用于存放下采样阶段的特征。# 对每个下采样模块进行操作:先执行两个卷积块,然后执行下采样,并将特征存储在 unet_stack 中。for down1, down2, downsample in self.downs:x = down1(x, t)unet_stack.append(x)x = down2(x, t)unet_stack.append(x)x = downsample(x)# 中间残差块x = self.mid_block1(x, t)x = self.mid_block2(x, t)# 对每个上采样模块进行操作:从 unet_stack 中取出特征,与当前特征拼接后执行两个卷积块,然后执行上采样。for up1, up2, upsample in self.ups:x = torch.cat((x, unet_stack.pop()), dim=1)x = up1(x, t)x = torch.cat((x, unet_stack.pop()), dim=1)x = up2(x, t)x = upsample(x)# 将初始特征 r 与最终的特征拼接后,执行最终的残差块和输出卷积层,得到最终的输出。x = torch.cat((x, r), dim=1)x = self.final_res_block(x, t)return self.final_conv(x)

 Life is a journey. We pursue love and light with purity.

你的 “三连” 是小曦持续更新的动力!
下期将推出
扩散的代码实现,零距离解读扩散是如何实现的。

这篇关于Pytorch实现扩散模型【DDPM代码解读篇1】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/961951

相关文章

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

IDEA中新建/切换Git分支的实现步骤

《IDEA中新建/切换Git分支的实现步骤》本文主要介绍了IDEA中新建/切换Git分支的实现步骤,通过菜单创建新分支并选择是否切换,创建后在Git详情或右键Checkout中切换分支,感兴趣的可以了... 前提:项目已被Git托管1、点击上方栏Git->NewBrancjsh...2、输入新的分支的

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

解读GC日志中的各项指标用法

《解读GC日志中的各项指标用法》:本文主要介绍GC日志中的各项指标用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、基础 GC 日志格式(以 G1 为例)1. Minor GC 日志2. Full GC 日志二、关键指标解析1. GC 类型与触发原因2. 堆

Java设计模式---迭代器模式(Iterator)解读

《Java设计模式---迭代器模式(Iterator)解读》:本文主要介绍Java设计模式---迭代器模式(Iterator),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录1、迭代器(Iterator)1.1、结构1.2、常用方法1.3、本质1、解耦集合与遍历逻辑2、统一

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

java实现docker镜像上传到harbor仓库的方式

《java实现docker镜像上传到harbor仓库的方式》:本文主要介绍java实现docker镜像上传到harbor仓库的方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 前 言2. 编写工具类2.1 引入依赖包2.2 使用当前服务器的docker环境推送镜像2.2

C++20管道运算符的实现示例

《C++20管道运算符的实现示例》本文简要介绍C++20管道运算符的使用与实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录标准库的管道运算符使用自己实现类似的管道运算符我们不打算介绍太多,因为它实际属于c++20最为重要的

Java easyExcel实现导入多sheet的Excel

《JavaeasyExcel实现导入多sheet的Excel》这篇文章主要为大家详细介绍了如何使用JavaeasyExcel实现导入多sheet的Excel,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录1.官网2.Excel样式3.代码1.官网easyExcel官网2.Excel样式3.代码