前言
本文基于 Chainer 实现 EfficientNetV2 网络结构:参照 PyTorch 版本的结构组织方式构建对应的 Chainer 版本,并计算 EfficientNetV2 的参数量。
模型中用到的 SiLU 激活函数以及 drop_connect(DropPath)也将基于 Chainer 实现。
代码实现
class SiLU(chainer.Chain):
    """SiLU activation (also known as Swish) packaged as a parameter-free chain.

    Calling an instance returns ``x * sigmoid(x)`` computed element-wise.
    """

    def __init__(self):
        super().__init__()

    def __call__(self, x):
        """Return ``x`` gated by its own sigmoid."""
        gate = F.sigmoid(x)
        return x * gate
def drop_connect(inputs, p):
    """Randomly zero whole samples of a batch (drop-connect / drop-path).

    Outside of training mode (``configuration.config.train`` is false) the
    input is returned untouched.  In training mode one random value is drawn
    per sample, turned into a 0/1 mask of shape ``[batch, 1, 1, 1]`` and
    multiplied into ``inputs / (1 - p)``.

    Args:
        inputs: Batch to process; the mask shape assumes a 4-D input
            (presumably ``(N, C, H, W)`` -- confirm against callers).
        p: Drop probability.  ``p == 1`` makes ``1 - p`` zero and therefore
            divides by zero; callers in this file only pass values ``> 0``.

    Returns:
        ``inputs`` itself when not training, otherwise the rescaled and
        masked batch.
    """
    if configuration.config.train:
        xp = backend.get_array_module(inputs)
        keep_prob = 1 - p
        noise = xp.random.uniform(size=[inputs.shape[0], 1, 1, 1])
        # floor(keep_prob + U[0, 1)) is 1 with probability keep_prob, else 0.
        mask = xp.floor(keep_prob + noise)
        return (inputs / keep_prob) * mask
    return inputs
class ConvBNAct(chainer.Chain):
    """Bias-free 2-D convolution, then normalization, then an optional activation.

    The sub-layers are registered as ``conv``, ``bn`` and (when present)
    ``act``; ``self.layers`` keeps them as ``(name, layer)`` pairs in
    execution order.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        kernel_size: Convolution kernel size; padding is ``(kernel_size - 1) // 2``.
        stride: Convolution stride.
        groups: Number of convolution groups.
        norm_layer: Callable building the normalization layer from the channel
            count.  ``None`` selects ``BatchNormalization``.
        activation_layer: Activation applied last.  When omitted a new
            ``SiLU`` is created for this instance; pass ``None`` to disable
            the activation altogether.
    """

    # Sentinel for "argument omitted".  ``None`` cannot be used because it
    # already means "no activation", and a ``SiLU()`` default would be built
    # once at definition time and shared by every instance.
    _DEFAULT_ACTIVATION = object()

    def __init__(self, in_planes: int, out_planes: int, kernel_size: int = 3,
                 stride: int = 1, groups: int = 1, norm_layer=None,
                 activation_layer=_DEFAULT_ACTIVATION):
        super(ConvBNAct, self).__init__()
        if activation_layer is self._DEFAULT_ACTIVATION:
            activation_layer = SiLU()
        if norm_layer is None:
            norm_layer = BatchNormalization
        padding = (kernel_size - 1) // 2

        self.layers = [
            ('conv', L.Convolution2D(in_channels=in_planes,
                                     out_channels=out_planes,
                                     ksize=kernel_size,
                                     stride=stride,
                                     pad=padding,
                                     groups=groups,
                                     nobias=True)),
            ('bn', norm_layer(out_planes)),
        ]
        if activation_layer is not None:
            self.layers.append(('act', activation_layer))

        # Register the layers as children so their parameters are tracked.
        with self.init_scope():
            for name, layer in self.layers:
                setattr(self, name, layer)

    def forward(self, x):
        """Apply conv, normalization and (if configured) the activation to ``x``."""
        # Look layers up by attribute so that a copied chain uses its own
        # children rather than the ones captured in ``self.layers``.
        for name, _ in self.layers:
            x = getattr(self, name)(x)
        return x
class SqueezeExcite(chainer.Chain):
    """Squeeze-and-Excitation channel gate.

    The input is averaged over its spatial axes, passed through a 1x1
    reduce convolution, SiLU, a 1x1 expand convolution and a sigmoid, and the
    resulting per-channel gate is multiplied back onto the input.

    Args:
        input_c: Channel count of the enclosing block's input; the squeezed
            width is ``int(input_c * se_ratio)``.
        expand_c: Channel count of the tensor this module receives and returns.
        se_ratio: Squeeze ratio applied to ``input_c``.
    """

    def __init__(self, input_c: int, expand_c: int, se_ratio: float = 0.25):
        super(SqueezeExcite, self).__init__()
        squeeze_c = int(input_c * se_ratio)
        # Names starting with '_' are parameter-free callables and are not
        # registered as child links.
        self.layers = [
            ('conv_reduce', L.Convolution2D(expand_c, squeeze_c, 1)),
            ('ac1', SiLU()),
            ('conv_expand', L.Convolution2D(squeeze_c, expand_c, 1)),
            ('_ac2', F.sigmoid),
        ]
        with self.init_scope():
            for name, layer in self.layers:
                if not name.startswith('_'):
                    setattr(self, name, layer)

    def forward(self, x):
        """Return ``x`` scaled by its per-channel excitation gate."""
        # Squeeze: global average over the spatial axes, kept as
        # (N, C, 1, 1) so the 1x1 convolutions still receive a 4-D input.
        scale = F.mean(x, axis=(2, 3), keepdims=True)
        for name, fn in self.layers:
            if name.startswith('_'):
                scale = fn(scale)
            else:
                scale = getattr(self, name)(scale)
        # Broadcast explicitly so the product does not rely on implicit
        # broadcasting support in Variable arithmetic.
        return F.broadcast_to(scale, x.shape) * x
class MBConv(chainer.Chain):
    """MBConv block: 1x1 expansion, depth-wise conv, optional SE, 1x1 projection.

    A residual shortcut is used only when ``stride == 1`` and
    ``input_c == out_c``; drop-connect is applied to the residual branch only
    when that shortcut exists and ``drop_rate > 0``.

    Args:
        kernel_size: Kernel size of the depth-wise convolution.
        input_c: Input channel count.
        out_c: Output channel count.
        expand_ratio: Channel multiplier of the expansion conv; must not be 1.
        stride: Stride of the depth-wise convolution, 1 or 2.
        se_ratio: Squeeze ratio; the SE module is added only when ``> 0``.
        drop_rate: Drop-connect probability for the residual branch.
        norm_layer: Callable building a normalization layer from a channel count.

    Raises:
        ValueError: If ``stride`` is neither 1 nor 2.
    """

    def __init__(self, kernel_size: int, input_c: int, out_c: int,
                 expand_ratio: int, stride: int, se_ratio: float,
                 drop_rate: float, norm_layer):
        super(MBConv, self).__init__()
        if stride not in [1, 2]:
            raise ValueError("illegal stride value.")
        # EfficientNetV2 never uses MBConv with expansion 1, so the
        # point-wise expansion conv is always present.
        assert expand_ratio != 1

        self.has_shortcut = (stride == 1 and input_c == out_c)
        act = SiLU()
        hidden_c = input_c * expand_ratio

        stages = [
            # Point-wise expansion
            ('expand_conv', ConvBNAct(input_c, hidden_c, kernel_size=1,
                                      norm_layer=norm_layer,
                                      activation_layer=act)),
            # Depth-wise convolution (one group per channel)
            ('dwconv', ConvBNAct(hidden_c, hidden_c, kernel_size=kernel_size,
                                 stride=stride, groups=hidden_c,
                                 norm_layer=norm_layer,
                                 activation_layer=act)),
        ]
        if se_ratio > 0:
            stages.append(('se', SqueezeExcite(input_c, hidden_c, se_ratio)))
        # Point-wise linear projection (no activation)
        stages.append(('project_conv', ConvBNAct(hidden_c, out_planes=out_c,
                                                 kernel_size=1,
                                                 norm_layer=norm_layer,
                                                 activation_layer=None)))
        self.layers = stages

        self.out_channels = out_c
        self.drop_rate = drop_rate
        # Drop-connect only makes sense on a residual branch.
        use_drop = self.has_shortcut and drop_rate > 0
        self.dropout = drop_rate if use_drop else None

        with self.init_scope():
            for name, layer in self.layers:
                if not name.startswith('_'):
                    setattr(self, name, layer)

    def forward(self, x):
        """Run the block and add the shortcut when one is available."""
        shortcut = x
        for name, fn in self.layers:
            if name.startswith('_'):
                x = fn.apply((x,))[0]
            else:
                x = getattr(self, name)(x)
        if not self.has_shortcut:
            return x
        if self.dropout is not None:
            x = drop_connect(x, self.dropout)
        x += shortcut
        return x
class FusedMBConv(chainer.Chain):
    """Fused-MBConv block: one regular conv replaces expansion + depth-wise conv.

    With ``expand_ratio != 1`` the block is a ``kernel_size`` expansion conv
    followed by a 1x1 projection without activation.  With
    ``expand_ratio == 1`` it is a single ``kernel_size`` conv that keeps its
    activation.  A residual shortcut is used only when ``stride == 1`` and
    ``input_c == out_c``; drop-connect is applied only when that shortcut
    exists and ``drop_rate > 0``.

    Args:
        kernel_size: Kernel size of the main convolution.
        input_c: Input channel count.
        out_c: Output channel count.
        expand_ratio: Channel multiplier of the expansion conv.
        stride: Stride of the main convolution, asserted to be 1 or 2.
        se_ratio: Asserted to be 0 (this block has no SE module).
        drop_rate: Drop-connect probability for the residual branch.
        norm_layer: Callable building a normalization layer from a channel count.
    """

    def __init__(self, kernel_size: int, input_c: int, out_c: int,
                 expand_ratio: int, stride: int, se_ratio: float,
                 drop_rate: float, norm_layer):
        super(FusedMBConv, self).__init__()
        assert stride in [1, 2]
        assert se_ratio == 0

        self.has_shortcut = stride == 1 and input_c == out_c
        self.drop_rate = drop_rate
        self.has_expansion = expand_ratio != 1

        act = SiLU()  # alias Swish
        hidden_c = input_c * expand_ratio

        if self.has_expansion:
            # Expansion convolution followed by a linear 1x1 projection.
            self.layers = [
                ('expand_conv', ConvBNAct(input_c, hidden_c,
                                          kernel_size=kernel_size,
                                          stride=stride,
                                          norm_layer=norm_layer,
                                          activation_layer=act)),
                ('project_conv', ConvBNAct(hidden_c, out_c, kernel_size=1,
                                           norm_layer=norm_layer,
                                           activation_layer=None)),
            ]
        else:
            # Only project_conv exists; it does the spatial conv itself.
            self.layers = [
                ('project_conv', ConvBNAct(input_c, out_c,
                                           kernel_size=kernel_size,
                                           stride=stride,
                                           norm_layer=norm_layer,
                                           activation_layer=act)),
            ]

        self.out_channels = out_c
        # Drop-connect only makes sense on a residual branch.
        use_drop = self.has_shortcut and drop_rate > 0
        self.dropout = drop_rate if use_drop else None

        with self.init_scope():
            for name, layer in self.layers:
                if not name.startswith('_'):
                    setattr(self, name, layer)

    def forward(self, x):
        """Run the block and add the shortcut when one is available."""
        short_cut = x
        for name, fn in self.layers:
            if name.startswith('_'):
                x = fn.apply((x,))[0]
            else:
                x = getattr(self, name)(x)
        if not self.has_shortcut:
            return x
        if self.dropout is not None:
            x = drop_connect(x, self.dropout)
        x += short_cut
        return x
class EfficientNet_V2(chainer.Chain):
    """EfficientNetV2 classifier built from FusedMBConv / MBConv stages.

    The network is: stem conv -> stacked blocks described by ``model_cnf`` ->
    1x1 head conv -> global average pooling -> optional dropout -> linear
    classifier.  In training mode ``forward`` returns the raw classifier
    output; otherwise it returns the softmax of it.

    Each ``model_cnf`` row has 8 entries, read by position as
    ``[repeats, kernel_size, stride, expand_ratio, input_c, out_c,
    block_type, se_ratio]`` where ``block_type == 0`` selects ``FusedMBConv``
    and any other value selects ``MBConv``.

    Args:
        model_name: Key into ``cfgs``.
        channels: Number of input image channels.
        batch_size: Unused; accepted only for backward compatibility.  The
            head no longer depends on a fixed batch size.
        image_size: Stored as ``self.image_size``; the pooling no longer
            depends on it, so other input sizes work as well.
        num_classes: Width of the final linear layer.
        **kwargs: ``verbose=True`` prints ``name, input_shape, output_shape``
            for every layer during ``forward``.  Other keys are ignored.
    """

    cfgs = {
        'efficientnetv2_s': {
            'model_cnf': [[2, 3, 1, 1, 24, 24, 0, 0],
                          [4, 3, 2, 4, 24, 48, 0, 0],
                          [4, 3, 2, 4, 48, 64, 0, 0],
                          [6, 3, 2, 4, 64, 128, 1, 0.25],
                          [9, 3, 1, 6, 128, 160, 1, 0.25],
                          [15, 3, 2, 6, 160, 256, 1, 0.25]],
            'dropout_rate': 0.2,
            'drop_connect_rate': 0.2,
            'num_features': 1280,
            # 'image_size' entries are not read by this class.
            'image_size': {
                'train_size': 300,
                'val_size': 384
            }
        },
        'efficientnetv2_m': {
            'model_cnf': [[3, 3, 1, 1, 24, 24, 0, 0],
                          [5, 3, 2, 4, 24, 48, 0, 0],
                          [5, 3, 2, 4, 48, 80, 0, 0],
                          [7, 3, 2, 4, 80, 160, 1, 0.25],
                          [14, 3, 1, 6, 160, 176, 1, 0.25],
                          [18, 3, 2, 6, 176, 304, 1, 0.25],
                          [5, 3, 1, 6, 304, 512, 1, 0.25]],
            'dropout_rate': 0.3,
            'drop_connect_rate': 0.2,
            'num_features': 1280,
            'image_size': {
                'train_size': 384,
                'val_size': 480
            }
        },
        'efficientnetv2_l': {
            'model_cnf': [[4, 3, 1, 1, 32, 32, 0, 0],
                          [7, 3, 2, 4, 32, 64, 0, 0],
                          [7, 3, 2, 4, 64, 96, 0, 0],
                          [10, 3, 2, 4, 96, 192, 1, 0.25],
                          [19, 3, 1, 6, 192, 224, 1, 0.25],
                          [25, 3, 2, 6, 224, 384, 1, 0.25],
                          [7, 3, 1, 6, 384, 640, 1, 0.25]],
            'dropout_rate': 0.4,
            'drop_connect_rate': 0.2,
            'num_features': 1280,
            'image_size': {
                'train_size': 384,
                'val_size': 480
            }
        }
    }

    def __init__(self, model_name='efficientnetv2_s', channels=3, batch_size=4, image_size=224,
                 num_classes: int = 1000, **kwargs):
        super(EfficientNet_V2, self).__init__()
        cfg = self.cfgs[model_name]
        model_cnf = cfg['model_cnf']
        for cnf in model_cnf:
            assert len(cnf) == 8

        self.image_size = image_size
        # Per-layer shape tracing is opt-in so forward() stays silent by default.
        self.verbose = bool(kwargs.get('verbose', False))

        norm_layer = partial(BatchNormalization, eps=1e-3)
        num_features = cfg['num_features']
        dropout_rate = cfg['dropout_rate']
        drop_connect_rate = cfg['drop_connect_rate']

        # Names starting with '_' are parameter-free callables invoked as
        # ``fn(x)``; every other entry is registered as a child link.
        self.layers = []

        stem_filter_num = model_cnf[0][4]
        self.layers.append(('stem', ConvBNAct(channels, stem_filter_num,
                                              kernel_size=3, stride=2,
                                              norm_layer=norm_layer)))

        total_blocks = sum(cnf[0] for cnf in model_cnf)
        block_id = 0
        for cnf in model_cnf:
            repeats = cnf[0]
            op = FusedMBConv if cnf[-2] == 0 else MBConv
            for i in range(repeats):
                first = i == 0
                block = op(kernel_size=cnf[1],
                           # Only the first block of a stage changes the
                           # channel count and may use a stride.
                           input_c=cnf[4] if first else cnf[5],
                           out_c=cnf[5],
                           expand_ratio=cnf[3],
                           stride=cnf[2] if first else 1,
                           se_ratio=cnf[-1],
                           # Drop-connect rate grows linearly with depth.
                           drop_rate=drop_connect_rate * block_id / total_blocks,
                           norm_layer=norm_layer)
                self.layers.append(('block_{0}'.format(block_id), block))
                block_id += 1

        head_input_c = model_cnf[-1][-3]
        self.layers.append(('project_conv', ConvBNAct(head_input_c, num_features,
                                                      kernel_size=1,
                                                      norm_layer=norm_layer)))
        # Global average pooling as a mean over the spatial axes: the result
        # is already (N, C), independent of batch size and input resolution.
        self.layers.append(('_avgpool', partial(F.mean, axis=(2, 3))))
        if dropout_rate > 0:
            # F.dropout draws a new mask per call and is a no-op outside
            # training mode, unlike a reused Dropout function node.
            self.layers.append(('_dropout1', partial(F.dropout, ratio=dropout_rate)))
        self.layers.append(('fc', L.Linear(num_features, num_classes)))

        with self.init_scope():
            for name, layer in self.layers:
                if not name.startswith('_'):
                    setattr(self, name, layer)

    def forward(self, x):
        """Return raw class scores in training mode, softmax probabilities otherwise."""
        for name, fn in self.layers:
            in_shape = x.shape
            if name.startswith('_'):
                x = fn(x)
            else:
                x = getattr(self, name)(x)
            if self.verbose:
                print(name, in_shape, x.shape)
        if chainer.config.train:
            return x
        return F.softmax(x)
注意:上面的类就是 EfficientNetV2 的完整实现。网络的前向传播区分了训练和测试两种模式:
训练时直接返回 x(未经 softmax 的原始输出),测试时再经过 softmax 得到各类别的概率。
调用方式
if __name__ == '__main__':
    # Smoke test: build the default model, report its parameter count and
    # run one training-mode forward pass with a cross-entropy loss.
    batch_size = 4
    n_channels = 3
    image_size = 224
    num_classes = 123

    model = EfficientNet_V2(
        num_classes=num_classes,
        channels=n_channels,
        image_size=image_size,
        batch_size=batch_size,
    )
    print('参数量', model.count_params())

    # Random float32 images and int32 labels.
    input_shape = (batch_size, n_channels, image_size, image_size)
    x = np.random.rand(*input_shape).astype(np.float32)
    t = np.random.randint(0, num_classes, size=(batch_size,)).astype(np.int32)

    with chainer.using_config('train', True):
        y1 = model(x)
        loss1 = F.softmax_cross_entropy(y1, t)