1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
| import re import torch import torch.nn as nn import torch.nn.functional as F import torch.utils.checkpoint as cp from collections import OrderedDict from utils import load_state_dict_from_url from torch import Tensor from torch.jit.annotations import List
__all__ = ['DenseNet', 'densenet121', 'densenet169', 'densenet201', 'densenet161']
# 对应模型预训练下载的地址 model_urls = { 'densenet121': 'https://download.pytorch.org/models/densenet121-a639ec97.pth', 'densenet169': 'https://download.pytorch.org/models/densenet169-b2777c0a.pth', 'densenet201': 'https://download.pytorch.org/models/densenet201-c1103571.pth', 'densenet161': 'https://download.pytorch.org/models/densenet161-8d451a50.pth', }
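# Step 1: implement the non-linear transformation: BN-ReLU-Conv(1x1)-BN-ReLU-Conv(3x3).
# Channel count through the layer: l*k -> bn_size*k -> k
# Constructor arguments:
#   num_input_features  number of input channels
#   growth_rate         number of output channels (k)
#   bn_size             bottleneck multiplier: the 1x1 conv first maps the l*k input
#                       channels to bn_size*k (4k with the default bn_size=4), then the
#                       3x3 conv maps them to k, so overall the layer reduces dimensionality
#   drop_rate           dropout probability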
class _DenseLayer(nn.Module):
    """One dense layer: BN-ReLU-Conv(1x1) bottleneck followed by BN-ReLU-Conv(3x3).

    Args:
        num_input_features: channels of the concatenated input features.
        growth_rate: channels produced by this layer (``k``).
        bn_size: bottleneck multiplier; the 1x1 conv outputs
            ``bn_size * growth_rate`` channels.
        drop_rate: dropout probability applied to the layer output
            (no dropout when it is 0).
        memory_efficient: if True, the concat + bottleneck part is run under
            ``torch.utils.checkpoint`` whenever an input requires grad.
    """

    def __init__(self, num_input_features, growth_rate, bn_size, drop_rate, memory_efficient=False):
        super(_DenseLayer, self).__init__()
        self.add_module('norm1', nn.BatchNorm2d(num_input_features))
        self.add_module('relu1', nn.ReLU(inplace=True))
        # The 1x1 conv brings the channel count to bn_size * growth_rate.
        self.add_module('conv1', nn.Conv2d(num_input_features, bn_size * growth_rate,
                                           kernel_size=1, stride=1, bias=False))
        self.add_module('norm2', nn.BatchNorm2d(bn_size * growth_rate))
        self.add_module('relu2', nn.ReLU(inplace=True))
        # The 3x3 conv (padding=1 keeps the spatial size) outputs growth_rate channels.
        self.add_module('conv2', nn.Conv2d(bn_size * growth_rate, growth_rate,
                                           kernel_size=3, stride=1, padding=1, bias=False))
        self.drop_rate = float(drop_rate)
        self.memory_efficient = memory_efficient

    def bn_function(self, inputs):
        # type: (List[Tensor]) -> Tensor
        """Concatenate ``inputs`` along dim 1 and apply norm1 -> relu1 -> conv1."""
        concated_features = torch.cat(inputs, 1)
        bottleneck_output = self.conv1(self.relu1(self.norm1(concated_features)))  # noqa: T484
        return bottleneck_output

    def any_requires_grad(self, input):
        # type: (List[Tensor]) -> bool
        """Return True if at least one tensor in ``input`` requires grad."""
        # Explicit loop (instead of any(<generator>)) keeps the method simple
        # enough for TorchScript, which does not support generator expressions.
        for tensor in input:
            if tensor.requires_grad:
                return True
        return False

    @torch.jit.unused  # noqa: T484
    def call_checkpoint_bottleneck(self, input):
        # type: (List[Tensor]) -> Tensor
        """Run ``bn_function`` under activation checkpointing.

        The bottleneck activations are recomputed during backward instead of
        being stored, trading compute for memory.
        """
        def closure(*inputs):
            return self.bn_function(inputs)

        # use_reentrant is passed explicitly because recent PyTorch releases
        # warn when it is omitted; it needs a torch version that accepts it.
        return cp.checkpoint(closure, *input, use_reentrant=False)

    # torchscript does not yet support *args, so we overload method
    # allowing it to take either a List[Tensor] or single Tensor
    def forward(self, input):  # noqa: F811
        """Compute the new features for this layer.

        Args:
            input: a single Tensor or a list of Tensors; a list is
                concatenated along dim 1 before the bottleneck.

        Returns:
            The output of conv2, with dropout applied when ``drop_rate > 0``.

        Raises:
            Exception: if the checkpointed path is requested while scripting.
        """
        if isinstance(input, Tensor):
            prev_features = [input]
        else:
            prev_features = input

        if self.memory_efficient and self.any_requires_grad(prev_features):
            if torch.jit.is_scripting():
                raise Exception("Memory Efficient not supported in JIT")

            bottleneck_output = self.call_checkpoint_bottleneck(prev_features)
        else:
            bottleneck_output = self.bn_function(prev_features)

        new_features = self.conv2(self.relu2(self.norm2(bottleneck_output)))
        if self.drop_rate > 0:
            new_features = F.dropout(new_features, p=self.drop_rate,
                                     training=self.training)
        return new_features
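# Step 3: build _DenseBlock out of the non-linear transformation (_DenseLayer).
# Constructor arguments:
#   num_layers          number of feature-extracting "layers" inside the block
# The remaining arguments are the ones the non-linear transformation needs:
#   num_input_features  k0 + k*(l-1)
#   bn_size             4 by default
#   growth_rate         k
#   drop_rate           dropout probability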
class _DenseBlock(nn.ModuleDict):
    """Stack of dense layers in which every layer sees all earlier feature maps.

    Args:
        num_layers: number of dense layers to create.
        num_input_features: channels entering the block.
        bn_size, growth_rate, drop_rate, memory_efficient: forwarded to each
            ``_DenseLayer``.
    """
    _version = 2

    def __init__(self, num_layers, num_input_features, bn_size, growth_rate, drop_rate, memory_efficient=False):
        super(_DenseBlock, self).__init__()
        for idx in range(num_layers):
            # Layer idx receives the block input plus the growth_rate channels
            # produced by each of the idx layers before it.
            in_channels = num_input_features + idx * growth_rate
            self.add_module(
                'denselayer%d' % (idx + 1),
                _DenseLayer(
                    in_channels,
                    growth_rate=growth_rate,
                    bn_size=bn_size,
                    drop_rate=drop_rate,
                    memory_efficient=memory_efficient,
                ),
            )

    def forward(self, init_features):
        """Return the block input concatenated (dim 1) with every layer's output."""
        collected = [init_features]
        for dense_layer in self.values():
            # Each layer is handed the running list of all features so far.
            collected.append(dense_layer(collected))
        return torch.cat(collected, 1)
# Step 2: the _Transition that connects two dense blocks.
# Constructor arguments:
#   num_input_features   output channels of the previous dense block, (l-1)*k + k0
#   num_output_features  input channels of the next dense block
class _Transition(nn.Sequential):
    """Transition layer between two adjacent dense blocks.

    Applies BatchNorm -> ReLU -> 1x1 Conv -> 2x2 average pooling, so it shrinks
    in two directions: channels go to ``num_output_features`` and the feature
    map is halved by the pooling (``ceil_mode=True`` rounds odd sizes up).
    """

    def __init__(self, num_input_features, num_output_features):
        super(_Transition, self).__init__()
        stages = (
            ('norm', nn.BatchNorm2d(num_input_features)),
            ('relu', nn.ReLU(inplace=True)),
            ('conv', nn.Conv2d(num_input_features, num_output_features,
                               kernel_size=1, stride=1, bias=False)),
            ('pool', nn.AvgPool2d(kernel_size=2, stride=2, ceil_mode=True)),
        )
        for label, stage in stages:
            self.add_module(label, stage)
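# Step 4: build DenseNet out of _DenseBlock and _Transition.
# Constructor arguments:
#   growth_rate         channels (k) produced by every layer inside a dense block
#   block_config        number of "layers" (num_layers) in each dense block
#   num_init_features   output channels of the initial convolution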
class DenseNet(nn.Module):
    r"""Densenet-BC model class, based on
    `"Densely Connected Convolutional Networks" <https://arxiv.org/pdf/1608.06993.pdf>`_

    Args:
        growth_rate (int) - number of filters each dense layer adds (`k` in the paper)
        block_config (sequence of ints) - number of layers in each dense block
        num_init_features (int) - number of filters learned by the first convolution
        bn_size (int) - bottleneck multiplier
          (the bottleneck layer has bn_size * k features)
        drop_rate (float) - dropout rate after each dense layer
        num_classes (int) - number of classification classes
        memory_efficient (bool) - If True, uses checkpointing. Much more memory efficient,
          but slower. Default: *False*. See `"paper" <https://arxiv.org/pdf/1707.06990.pdf>`_
    """

    def __init__(self, growth_rate=32, block_config=(6, 12, 24, 16),
                 num_init_features=64, bn_size=4, drop_rate=0, num_classes=1000, memory_efficient=False):

        super(DenseNet, self).__init__()

        # Stem, independent of the dense blocks:
        # 7x7 conv (stride 2) -> BN -> ReLU -> 3x3 max-pool (stride 2).
        # It outputs num_init_features channels.
        stem = OrderedDict()
        stem['conv0'] = nn.Conv2d(3, num_init_features, kernel_size=7, stride=2,
                                  padding=3, bias=False)
        stem['norm0'] = nn.BatchNorm2d(num_init_features)
        stem['relu0'] = nn.ReLU(inplace=True)
        stem['pool0'] = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.features = nn.Sequential(stem)

        # One dense block per entry of block_config, appended after the stem.
        channels = num_init_features
        last_index = len(block_config) - 1
        for index, num_layers in enumerate(block_config):
            stage = index + 1
            dense_block = _DenseBlock(
                num_layers=num_layers,
                num_input_features=channels,
                bn_size=bn_size,
                growth_rate=growth_rate,
                drop_rate=drop_rate,
                memory_efficient=memory_efficient
            )
            self.features.add_module('denseblock%d' % stage, dense_block)
            # Every layer of the block contributed growth_rate channels.
            channels += num_layers * growth_rate

            if index == last_index:
                # No transition after the final dense block.
                continue

            # The transition halves the channel count before the next block.
            halved = channels // 2
            self.features.add_module(
                'transition%d' % stage,
                _Transition(num_input_features=channels, num_output_features=halved),
            )
            channels = halved

        # Final batch norm
        self.features.add_module('norm5', nn.BatchNorm2d(channels))

        # Linear classifier on top of the pooled features
        self.classifier = nn.Linear(channels, num_classes)

        # Weight initialisation (official init from the torch repo).
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight)
            elif isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.Linear):
                nn.init.constant_(module.bias, 0)

    def forward(self, x):
        """Forward pass: features -> ReLU -> global average pool -> flatten -> classifier."""
        activated = F.relu(self.features(x), inplace=True)
        pooled = F.adaptive_avg_pool2d(activated, (1, 1))
        return self.classifier(torch.flatten(pooled, 1))
def _load_state_dict(model, model_url, progress):
    """Fetch a checkpoint from ``model_url`` and load it into ``model``.

    '.'s are no longer allowed in module names, but earlier _DenseLayer
    versions (and the checkpoints in ``model_urls``) use keys such as
    'norm.1', 'relu.1', 'conv.1', 'norm.2', 'relu.2', 'conv.2'. Those keys are
    renamed to the dot-free form ('norm1', 'conv2', ...) before loading.

    Args:
        model: module whose ``load_state_dict`` receives the remapped weights.
        model_url: checkpoint location handed to ``load_state_dict_from_url``.
        progress: forwarded to ``load_state_dict_from_url`` as ``progress``.
    """
    legacy_key = re.compile(
        r'^(.*denselayer\d+\.(?:norm|relu|conv))\.((?:[12])\.(?:weight|bias|running_mean|running_var))$')

    state_dict = load_state_dict_from_url(model_url, progress=progress)
    # Iterate over a snapshot of the keys because the dict is mutated below.
    for old_key in list(state_dict.keys()):
        match = legacy_key.match(old_key)
        if match is None:
            continue
        prefix, suffix = match.groups()
        # Joining the two groups drops the dot between e.g. 'norm' and '1'.
        state_dict[prefix + suffix] = state_dict.pop(old_key)
    model.load_state_dict(state_dict)
def _densenet(arch, growth_rate, block_config, num_init_features, pretrained, progress,
              **kwargs):
    """Build a DenseNet and optionally load its pretrained weights.

    Args:
        arch: key into ``model_urls`` selecting the checkpoint to load.
        growth_rate, block_config, num_init_features: passed positionally to
            ``DenseNet``.
        pretrained: if truthy, load the weights found at ``model_urls[arch]``.
        progress: forwarded to ``_load_state_dict``.
        **kwargs: extra keyword arguments for ``DenseNet``.

    Returns:
        The constructed ``DenseNet`` instance.
    """
    model = DenseNet(growth_rate, block_config, num_init_features, **kwargs)
    if not pretrained:
        return model
    _load_state_dict(model, model_urls[arch], progress)
    return model
def densenet121(pretrained=False, progress=True, **kwargs):
    r"""Build the Densenet-121 model from
    `"Densely Connected Convolutional Networks" <https://arxiv.org/pdf/1608.06993.pdf>`_

    Uses growth rate 32, block configuration (6, 12, 24, 16) and 64 initial features.

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
        memory_efficient (bool) - If True, uses checkpointing. Much more memory efficient,
          but slower. Default: *False*. See `"paper" <https://arxiv.org/pdf/1707.06990.pdf>`_
    """
    return _densenet(
        arch='densenet121',
        growth_rate=32,
        block_config=(6, 12, 24, 16),
        num_init_features=64,
        pretrained=pretrained,
        progress=progress,
        **kwargs
    )
def densenet161(pretrained=False, progress=True, **kwargs):
    r"""Build the Densenet-161 model from
    `"Densely Connected Convolutional Networks" <https://arxiv.org/pdf/1608.06993.pdf>`_

    Uses growth rate 48, block configuration (6, 12, 36, 24) and 96 initial features.

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
        memory_efficient (bool) - If True, uses checkpointing. Much more memory efficient,
          but slower. Default: *False*. See `"paper" <https://arxiv.org/pdf/1707.06990.pdf>`_
    """
    return _densenet(
        arch='densenet161',
        growth_rate=48,
        block_config=(6, 12, 36, 24),
        num_init_features=96,
        pretrained=pretrained,
        progress=progress,
        **kwargs
    )
def densenet169(pretrained=False, progress=True, **kwargs):
    r"""Build the Densenet-169 model from
    `"Densely Connected Convolutional Networks" <https://arxiv.org/pdf/1608.06993.pdf>`_

    Uses growth rate 32, block configuration (6, 12, 32, 32) and 64 initial features.

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
        memory_efficient (bool) - If True, uses checkpointing. Much more memory efficient,
          but slower. Default: *False*. See `"paper" <https://arxiv.org/pdf/1707.06990.pdf>`_
    """
    return _densenet(
        arch='densenet169',
        growth_rate=32,
        block_config=(6, 12, 32, 32),
        num_init_features=64,
        pretrained=pretrained,
        progress=progress,
        **kwargs
    )
def densenet201(pretrained=False, progress=True, **kwargs):
    r"""Build the Densenet-201 model from
    `"Densely Connected Convolutional Networks" <https://arxiv.org/pdf/1608.06993.pdf>`_

    Uses growth rate 32, block configuration (6, 12, 48, 32) and 64 initial features.

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
        memory_efficient (bool) - If True, uses checkpointing. Much more memory efficient,
          but slower. Default: *False*. See `"paper" <https://arxiv.org/pdf/1707.06990.pdf>`_
    """
    return _densenet(
        arch='densenet201',
        growth_rate=32,
        block_config=(6, 12, 48, 32),
        num_init_features=64,
        pretrained=pretrained,
        progress=progress,
        **kwargs
    )
if __name__ == '__main__':
    # Available names: 'DenseNet', 'densenet121', 'densenet169', 'densenet201', 'densenet161'
    # Example: build a DenseNet-169 without pretrained weights, grab its
    # state dict, and print the module structure.
    model = densenet169()
    weights = model.state_dict()
    print(model)
|