
TensorFlow 2.0: Faster RCNN Code Walkthrough (Part 1)

2020-03-06 12:26

Part 1 walks through the Faster RCNN model file, which drives the main execution of the model. This file imports functions from the modules listed below. The backbones, necks, and test_mixins files only build the model structure; they are not complicated (just stacking layers with standard TF syntax, like building blocks), so Parts 2 through 4 reproduce them without much commentary. The remaining four files are covered in later articles.

from detection.models.backbones import resnet
from detection.models.necks import fpn
from detection.models.detectors.test_mixins import RPNTestMixin, BBoxTestMixin
from detection.models.rpn_heads import rpn_head
from detection.models.bbox_heads import bbox_head
from detection.models.roi_extractors import roi_align
from detection.core.bbox import bbox_target
import tensorflow as tf
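
For orientation, the import paths above imply a package layout along these lines (inferred from the imports in this and later parts; the actual repository may differ in detail):

detection/
├── core/
│   └── bbox/              # bbox_target, transforms
├── models/
│   ├── backbones/         # resnet (Part 2)
│   ├── necks/             # fpn (Part 3)
│   ├── detectors/         # test_mixins (Part 4)
│   ├── rpn_heads/         # rpn_head
│   ├── bbox_heads/        # bbox_head
│   └── roi_extractors/    # roi_align
└── utils/                 # misc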

Part 1: Code walkthrough of the Faster RCNN model file

class FasterRCNN(tf.keras.Model, RPNTestMixin, BBoxTestMixin):

    def __init__(self, num_classes, **kwargs):
        super(FasterRCNN, self).__init__(**kwargs)
        self.NUM_CLASSES = num_classes
        # RPN configuration
        # Anchor attributes
        self.ANCHOR_SCALES = (32, 64, 128, 256, 512)
        self.ANCHOR_RATIOS = (0.5, 1, 2)
        self.ANCHOR_FEATURE_STRIDES = (4, 8, 16, 32, 64)
        # Bounding box refinement mean and standard deviation
        self.RPN_TARGET_MEANS = (0., 0., 0., 0.)
        self.RPN_TARGET_STDS = (0.1, 0.1, 0.2, 0.2)
        # RPN training configuration
        self.RPN_BATCH_SIZE = 256
        self.RPN_POS_FRAC = 0.5
        self.RPN_POS_IOU_THR = 0.7
        self.RPN_NEG_IOU_THR = 0.3
        # ROIs kept configuration
        self.RPN_PROPOSAL_COUNT = 2000
        self.RPN_NMS_THRESHOLD = 0.7
        # RCNN configuration
        # Bounding box refinement mean and standard deviation
        self.RCNN_TARGET_MEANS = (0., 0., 0., 0.)
        self.RCNN_TARGET_STDS = (0.1, 0.1, 0.2, 0.2)
        # ROI feature size
        self.POOL_SIZE = (7, 7)

        # RCNN training configuration
        self.RCNN_BATCH_SIZE = 256
        self.RCNN_POS_FRAC = 0.25
        self.RCNN_POS_IOU_THR = 0.5
        self.RCNN_NEG_IOU_THR = 0.5

        # Boxes kept configuration
        self.RCNN_MIN_CONFIDENCE = 0.7
        self.RCNN_NMS_THRESHOLD = 0.3
        self.RCNN_MAX_INSTANCES = 100

        # Target generator for the second stage.
        self.bbox_target = bbox_target.ProposalTarget(
            target_means=self.RCNN_TARGET_MEANS,
            target_stds=self.RCNN_TARGET_STDS,
            num_rcnn_deltas=self.RCNN_BATCH_SIZE,
            positive_fraction=self.RCNN_POS_FRAC,
            pos_iou_thr=self.RCNN_POS_IOU_THR,
            neg_iou_thr=self.RCNN_NEG_IOU_THR)

        # Modules
        self.backbone = resnet.ResNet(
            depth=101,
            name='res_net')

        self.neck = fpn.FPN(
            name='fpn')

        self.rpn_head = rpn_head.RPNHead(
            anchor_scales=self.ANCHOR_SCALES,
            anchor_ratios=self.ANCHOR_RATIOS,
            anchor_feature_strides=self.ANCHOR_FEATURE_STRIDES,
            proposal_count=self.RPN_PROPOSAL_COUNT,
            nms_threshold=self.RPN_NMS_THRESHOLD,
            target_means=self.RPN_TARGET_MEANS,
            target_stds=self.RPN_TARGET_STDS,
            num_rpn_deltas=self.RPN_BATCH_SIZE,
            positive_fraction=self.RPN_POS_FRAC,
            pos_iou_thr=self.RPN_POS_IOU_THR,
            neg_iou_thr=self.RPN_NEG_IOU_THR,
            name='rpn_head')

        self.roi_align = roi_align.PyramidROIAlign(
            pool_shape=self.POOL_SIZE,
            name='pyramid_roi_align')

        self.bbox_head = bbox_head.BBoxHead(
            num_classes=self.NUM_CLASSES,
            pool_size=self.POOL_SIZE,
            target_means=self.RCNN_TARGET_MEANS,
            target_stds=self.RCNN_TARGET_STDS,
            min_confidence=self.RCNN_MIN_CONFIDENCE,
            nms_threshold=self.RCNN_NMS_THRESHOLD,
            max_instances=self.RCNN_MAX_INSTANCES,
            name='b_box_head')
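
The anchor settings above fix how many boxes the RPN scores per image. A quick back-of-the-envelope check (my own snippet, not from the repo) reproduces the 369,303 figure quoted in the comments of call() below: one scale is assigned per pyramid level, so each stride s contributes (1216 // s)**2 anchor positions with len(ANCHOR_RATIOS) = 3 boxes each.

# Sketch: anchor count for a 1216x1216 input (not from the repo).
strides = (4, 8, 16, 32, 64)   # ANCHOR_FEATURE_STRIDES
num_ratios = 3                 # len(ANCHOR_RATIOS); one scale per level
total = sum((1216 // s) ** 2 * num_ratios for s in strides)
print(total)  # 369303 = (304**2 + 152**2 + 76**2 + 38**2 + 19**2) * 3

The call() method below then wires all of these modules together.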
    # training=True runs the model in training mode, fitting its parameters;
    # training=False runs inference, using already-trained weights.
    def call(self, inputs, training=True):
        """
        :param inputs: [1, 1216, 1216, 3], [1, 11], [1, 14, 4], [1, 14]
        :param training: training mode if True, inference mode otherwise
        :return: the four losses (training) or the detections list (inference)
        """
        if training:  # training mode
            imgs, img_metas, gt_boxes, gt_class_ids = inputs
        else:  # inference mode
            imgs, img_metas = inputs

        # 1. Generate the feature maps.
        # The image goes through the ResNet backbone, which outputs four feature maps:
        # [1, 304, 304, 256] => [1, 152, 152, 512] => [1, 76, 76, 1024] => [1, 38, 38, 2048]
        C2, C3, C4, C5 = self.backbone(imgs, training=training)
        # The FPN neck turns them into five 256-channel pyramid levels:
        # [1, 304, 304, 256] <= [1, 152, 152, 256] <= [1, 76, 76, 256] <= [1, 38, 38, 256] => [1, 19, 19, 256]
        P2, P3, P4, P5, P6 = self.neck([C2, C3, C4, C5], training=training)
        rpn_feature_maps = [P2, P3, P4, P5, P6]
        rcnn_feature_maps = [P2, P3, P4, P5]

        # 2. Generate the proposal regions.
        # Concretely: the anchor boxes (three per anchor position on every feature map)
        # go through the RPN, which outputs the coordinates of 2000 proposals.
        # These are normalized coordinates on the padded 1216x1216 image.

        # RPN outputs, covering all anchors across the pyramid levels:
        # [1, 369303, 2], [1, 369303, 2], [1, 369303, 4]
        rpn_class_logits, rpn_probs, rpn_deltas = self.rpn_head(rpn_feature_maps, training=training)
        # Filter down to usable boxes, given as top-left and bottom-right coordinates:
        # [369303, 4] => [215169, 4] (valid) => [6000, 4] (top scores) => [2000, 4] (NMS)
        proposals_list = self.rpn_head.get_proposals(rpn_probs, rpn_deltas, img_metas)

        # 3. Training mode: build the optimization targets for the fully connected head.
        if training:
            # Get the target class labels and target deltas for these proposals.
            rois_list, rcnn_target_matchs_list, rcnn_target_deltas_list = \
                self.bbox_target.build_targets(
                    proposals_list, gt_boxes, gt_class_ids, img_metas)
        else:
            rois_list = proposals_list

        # 4. RoI pooling.
        # Concretely: the proposal coordinates go into RoI Align, which outputs one
        # 7x7x256 feature per proposal.
        # rois_list only contains coordinates; rcnn_feature_maps holds the pyramid features => [192, 7, 7, 256]
        pooled_regions_list = self.roi_align((rois_list, rcnn_feature_maps, img_metas), training=training)

        # 5. Classification and box regression.
        # [192, 81], [192, 81], [192, 81, 4]
        rcnn_class_logits_list, rcnn_probs_list, rcnn_deltas_list = \
            self.bbox_head(pooled_regions_list, training=training)

        # 6. Training mode: compute the RPN classification and box-regression losses,
        #    then the second-stage classification and box-regression losses.
        #    Inference mode: output box locations and classes on the original image.
        if training:
            rpn_class_loss, rpn_bbox_loss = self.rpn_head.loss(
                rpn_class_logits, rpn_deltas, gt_boxes, gt_class_ids, img_metas)

            rcnn_class_loss, rcnn_bbox_loss = self.bbox_head.loss(
                rcnn_class_logits_list, rcnn_deltas_list,
                rcnn_target_matchs_list, rcnn_target_deltas_list)

            return [rpn_class_loss, rpn_bbox_loss,
                    rcnn_class_loss, rcnn_bbox_loss]

        else:
            detections_list = self.bbox_head.get_bboxes(
                rcnn_probs_list, rcnn_deltas_list, rois_list, img_metas)

            return detections_list
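
Since call() returns the four losses in training mode, a training step only has to sum them under a tf.GradientTape. A minimal sketch of mine, not from the repo; num_classes=81 is hypothetical (e.g. COCO's 80 classes plus background, consistent with the [192, 81] shapes in the comments above):

model = FasterRCNN(num_classes=81)
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3, momentum=0.9)

def train_step(imgs, img_metas, gt_boxes, gt_class_ids):
    with tf.GradientTape() as tape:
        # call() returns [rpn_class_loss, rpn_bbox_loss, rcnn_class_loss, rcnn_bbox_loss]
        losses = model((imgs, img_metas, gt_boxes, gt_class_ids), training=True)
        total_loss = tf.add_n(losses)
    grads = tape.gradient(total_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return total_loss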

Part 2: Code walkthrough of the backbones file

'''ResNet model for Keras.

# Reference:
- [Deep Residual Learning for Image Recognition](
https://arxiv.org/abs/1512.03385)

'''
import tensorflow as tf
from tensorflow.keras import layers
class _Bottleneck(tf.keras.Model):
    def __init__(self, filters, block,
                 downsampling=False, stride=1, **kwargs):
        super(_Bottleneck, self).__init__(**kwargs)

        filters1, filters2, filters3 = filters
        conv_name_base = 'res' + block + '_branch'
        bn_name_base   = 'bn'  + block + '_branch'

        self.downsampling = downsampling
        self.stride = stride
        self.out_channel = filters3

        self.conv2a = layers.Conv2D(filters1, (1, 1), strides=(stride, stride),
                                    kernel_initializer='he_normal',
                                    name=conv_name_base + '2a')
        self.bn2a = layers.BatchNormalization(name=bn_name_base + '2a')

        self.conv2b = layers.Conv2D(filters2, (3, 3), padding='same',
                                    kernel_initializer='he_normal',
                                    name=conv_name_base + '2b')
        self.bn2b = layers.BatchNormalization(name=bn_name_base + '2b')

        self.conv2c = layers.Conv2D(filters3, (1, 1),
                                    kernel_initializer='he_normal',
                                    name=conv_name_base + '2c')
        self.bn2c = layers.BatchNormalization(name=bn_name_base + '2c')

        if self.downsampling:
            self.conv_shortcut = layers.Conv2D(filters3, (1, 1), strides=(stride, stride),
                                               kernel_initializer='he_normal',
                                               name=conv_name_base + '1')
            self.bn_shortcut = layers.BatchNormalization(name=bn_name_base + '1')

    def call(self, inputs, training=False):
        x = self.conv2a(inputs)
        x = self.bn2a(x, training=training)
        x = tf.nn.relu(x)

        x = self.conv2b(x)
        x = self.bn2b(x, training=training)
        x = tf.nn.relu(x)

        x = self.conv2c(x)
        x = self.bn2c(x, training=training)

        if self.downsampling:
            shortcut = self.conv_shortcut(inputs)
            shortcut = self.bn_shortcut(shortcut, training=training)
        else:
            shortcut = inputs

        x += shortcut
        x = tf.nn.relu(x)

        return x

    def compute_output_shape(self, input_shape):
        shape = tf.TensorShape(input_shape).as_list()

        shape[1] = shape[1] // self.stride
        shape[2] = shape[2] // self.stride
        shape[-1] = self.out_channel
        return tf.TensorShape(shape)
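
A quick shape check of the block (my own snippet, not from the repo): with downsampling=True and stride=2, the spatial size halves and the shortcut is projected to filters3 channels.

block = _Bottleneck([128, 128, 512], block='demo', downsampling=True, stride=2)
out = block(tf.random.normal((1, 64, 64, 256)))
print(out.shape)  # (1, 32, 32, 512)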

class ResNet(tf.keras.Model):

    def __init__(self, depth, **kwargs):
        super(ResNet, self).__init__(**kwargs)

        if depth not in [50, 101]:
            raise AssertionError('depth must be 50 or 101.')
        self.depth = depth

        self.padding = layers.ZeroPadding2D((3, 3))
        self.conv1 = layers.Conv2D(64, (7, 7),
                                   strides=(2, 2),
                                   kernel_initializer='he_normal',
                                   name='conv1')
        self.bn_conv1 = layers.BatchNormalization(name='bn_conv1')
        self.max_pool = layers.MaxPooling2D((3, 3), strides=(2, 2), padding='same')

        self.res2a = _Bottleneck([64, 64, 256], block='2a',
                                 downsampling=True, stride=1)
        self.res2b = _Bottleneck([64, 64, 256], block='2b')
        self.res2c = _Bottleneck([64, 64, 256], block='2c')

        self.res3a = _Bottleneck([128, 128, 512], block='3a',
                                 downsampling=True, stride=2)
        self.res3b = _Bottleneck([128, 128, 512], block='3b')
        self.res3c = _Bottleneck([128, 128, 512], block='3c')
        self.res3d = _Bottleneck([128, 128, 512], block='3d')

        self.res4a = _Bottleneck([256, 256, 1024], block='4a',
                                 downsampling=True, stride=2)
        self.res4b = _Bottleneck([256, 256, 1024], block='4b')
        self.res4c = _Bottleneck([256, 256, 1024], block='4c')
        self.res4d = _Bottleneck([256, 256, 1024], block='4d')
        self.res4e = _Bottleneck([256, 256, 1024], block='4e')
        self.res4f = _Bottleneck([256, 256, 1024], block='4f')
        if self.depth == 101:
            self.res4g = _Bottleneck([256, 256, 1024], block='4g')
            self.res4h = _Bottleneck([256, 256, 1024], block='4h')
            self.res4i = _Bottleneck([256, 256, 1024], block='4i')
            self.res4j = _Bottleneck([256, 256, 1024], block='4j')
            self.res4k = _Bottleneck([256, 256, 1024], block='4k')
            self.res4l = _Bottleneck([256, 256, 1024], block='4l')
            self.res4m = _Bottleneck([256, 256, 1024], block='4m')
            self.res4n = _Bottleneck([256, 256, 1024], block='4n')
            self.res4o = _Bottleneck([256, 256, 1024], block='4o')
            self.res4p = _Bottleneck([256, 256, 1024], block='4p')
            self.res4q = _Bottleneck([256, 256, 1024], block='4q')
            self.res4r = _Bottleneck([256, 256, 1024], block='4r')
            self.res4s = _Bottleneck([256, 256, 1024], block='4s')
            self.res4t = _Bottleneck([256, 256, 1024], block='4t')
            self.res4u = _Bottleneck([256, 256, 1024], block='4u')
            self.res4v = _Bottleneck([256, 256, 1024], block='4v')
            self.res4w = _Bottleneck([256, 256, 1024], block='4w')

        self.res5a = _Bottleneck([512, 512, 2048], block='5a',
                                 downsampling=True, stride=2)
        self.res5b = _Bottleneck([512, 512, 2048], block='5b')
        self.res5c = _Bottleneck([512, 512, 2048], block='5c')

        self.out_channel = (256, 512, 1024, 2048)

    def call(self, inputs, training=True):
        x = self.padding(inputs)
        x = self.conv1(x)
        x = self.bn_conv1(x, training=training)
        x = tf.nn.relu(x)
        x = self.max_pool(x)

        x = self.res2a(x, training=training)
        x = self.res2b(x, training=training)
        C2 = x = self.res2c(x, training=training)

        x = self.res3a(x, training=training)
        x = self.res3b(x, training=training)
        x = self.res3c(x, training=training)
        C3 = x = self.res3d(x, training=training)

        x = self.res4a(x, training=training)
        x = self.res4b(x, training=training)
        x = self.res4c(x, training=training)
        x = self.res4d(x, training=training)
        x = self.res4e(x, training=training)
        x = self.res4f(x, training=training)
        if self.depth == 101:
            x = self.res4g(x, training=training)
            x = self.res4h(x, training=training)
            x = self.res4i(x, training=training)
            x = self.res4j(x, training=training)
            x = self.res4k(x, training=training)
            x = self.res4l(x, training=training)
            x = self.res4m(x, training=training)
            x = self.res4n(x, training=training)
            x = self.res4o(x, training=training)
            x = self.res4p(x, training=training)
            x = self.res4q(x, training=training)
            x = self.res4r(x, training=training)
            x = self.res4s(x, training=training)
            x = self.res4t(x, training=training)
            x = self.res4u(x, training=training)
            x = self.res4v(x, training=training)
            x = self.res4w(x, training=training)
        C4 = x

        x = self.res5a(x, training=training)
        x = self.res5b(x, training=training)
        C5 = x = self.res5c(x, training=training)

        return (C2, C3, C4, C5)

    def compute_output_shape(self, input_shape):
        shape = tf.TensorShape(input_shape).as_list()
        batch, H, W, C = shape

        C2_shape = tf.TensorShape([batch, H // 4, W // 4, self.out_channel[0]])
        C3_shape = tf.TensorShape([batch, H // 8, W // 8, self.out_channel[1]])
        C4_shape = tf.TensorShape([batch, H // 16, W // 16, self.out_channel[2]])
        C5_shape = tf.TensorShape([batch, H // 32, W // 32, self.out_channel[3]])

        return (C2_shape, C3_shape, C4_shape, C5_shape)
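
Run standalone, the backbone reproduces the C2 to C5 shapes quoted in FasterRCNN.call() in Part 1 (a sketch of mine; the weights are randomly initialized, so only the shapes are meaningful):

backbone = ResNet(depth=101)
C2, C3, C4, C5 = backbone(tf.random.normal((1, 1216, 1216, 3)), training=False)
print(C2.shape)  # (1, 304, 304, 256)
print(C3.shape)  # (1, 152, 152, 512)
print(C4.shape)  # (1, 76, 76, 1024)
print(C5.shape)  # (1, 38, 38, 2048)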

Part 3: Code walkthrough of the necks file

'''
FPN model for Keras.

# Reference:
- [Feature Pyramid Networks for Object Detection](
https://arxiv.org/abs/1612.03144)

'''
import tensorflow as tf
from tensorflow.keras import layers

class FPN(tf.keras.Model):

    def __init__(self, out_channels=256, **kwargs):
        '''
        Feature Pyramid Networks

        Attributes
        ---
        out_channels: int. the channels of pyramid feature maps.
        '''
        super(FPN, self).__init__(**kwargs)

        self.out_channels = out_channels

        self.fpn_c2p2 = layers.Conv2D(out_channels, (1, 1),
                                      kernel_initializer='he_normal', name='fpn_c2p2')
        self.fpn_c3p3 = layers.Conv2D(out_channels, (1, 1),
                                      kernel_initializer='he_normal', name='fpn_c3p3')
        self.fpn_c4p4 = layers.Conv2D(out_channels, (1, 1),
                                      kernel_initializer='he_normal', name='fpn_c4p4')
        self.fpn_c5p5 = layers.Conv2D(out_channels, (1, 1),
                                      kernel_initializer='he_normal', name='fpn_c5p5')

        self.fpn_p3upsampled = layers.UpSampling2D(size=(2, 2), name='fpn_p3upsampled')
        self.fpn_p4upsampled = layers.UpSampling2D(size=(2, 2), name='fpn_p4upsampled')
        self.fpn_p5upsampled = layers.UpSampling2D(size=(2, 2), name='fpn_p5upsampled')

        self.fpn_p2 = layers.Conv2D(out_channels, (3, 3), padding='SAME',
                                    kernel_initializer='he_normal', name='fpn_p2')
        self.fpn_p3 = layers.Conv2D(out_channels, (3, 3), padding='SAME',
                                    kernel_initializer='he_normal', name='fpn_p3')
        self.fpn_p4 = layers.Conv2D(out_channels, (3, 3), padding='SAME',
                                    kernel_initializer='he_normal', name='fpn_p4')
        self.fpn_p5 = layers.Conv2D(out_channels, (3, 3), padding='SAME',
                                    kernel_initializer='he_normal', name='fpn_p5')

        self.fpn_p6 = layers.MaxPooling2D(pool_size=(1, 1), strides=2, name='fpn_p6')

    def call(self, inputs, training=True):
        C2, C3, C4, C5 = inputs

        P5 = self.fpn_c5p5(C5)
        P4 = self.fpn_c4p4(C4) + self.fpn_p5upsampled(P5)
        P3 = self.fpn_c3p3(C3) + self.fpn_p4upsampled(P4)
        P2 = self.fpn_c2p2(C2) + self.fpn_p3upsampled(P3)

        # Attach 3x3 conv to all P layers to get the final feature maps.
        P2 = self.fpn_p2(P2)
        P3 = self.fpn_p3(P3)
        P4 = self.fpn_p4(P4)
        P5 = self.fpn_p5(P5)

        # Subsample from P5 with a stride of 2.
        P6 = self.fpn_p6(P5)

        return [P2, P3, P4, P5, P6]

    def compute_output_shape(self, input_shape):
        C2_shape, C3_shape, C4_shape, C5_shape = input_shape

        C2_shape, C3_shape, C4_shape, C5_shape = \
            C2_shape.as_list(), C3_shape.as_list(), C4_shape.as_list(), C5_shape.as_list()

        C6_shape = [C5_shape[0], (C5_shape[1] + 1) // 2, (C5_shape[2] + 1) // 2, self.out_channels]

        C2_shape[-1] = self.out_channels
        C3_shape[-1] = self.out_channels
        C4_shape[-1] = self.out_channels
        C5_shape[-1] = self.out_channels

        return [tf.TensorShape(C2_shape),
                tf.TensorShape(C3_shape),
                tf.TensorShape(C4_shape),
                tf.TensorShape(C5_shape),
                tf.TensorShape(C6_shape)]

if __name__ == '__main__':

    C2 = tf.random.normal((2, 256, 256,  256))
    C3 = tf.random.normal((2, 128, 128,  512))
    C4 = tf.random.normal((2,  64,  64, 1024))
    C5 = tf.random.normal((2,  32,  32, 2048))

    fpn = FPN()

    P2, P3, P4, P5, P6 = fpn([C2, C3, C4, C5])

    print('P2 shape:', P2.shape.as_list())
    print('P3 shape:', P3.shape.as_list())
    print('P4 shape:', P4.shape.as_list())
    print('P5 shape:', P5.shape.as_list())
    print('P6 shape:', P6.shape.as_list())
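
For the random inputs above, this self-test should print P2 [2, 256, 256, 256], P3 [2, 128, 128, 256], P4 [2, 64, 64, 256], P5 [2, 32, 32, 256] and P6 [2, 16, 16, 256]: every level is projected to out_channels = 256, and P6 is P5 subsampled by a stride of 2.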

Part 4: Code walkthrough of the test_mixins file

import numpy as np
import tensorflow as tf

from detection.core.bbox import transforms
from detection.utils.misc import *

class RPNTestMixin:

    def simple_test_rpn(self, img, img_meta):
        '''
        Args
        ---
        img: np.ndarray. [height, width, channel]
        img_meta: np.ndarray. [11]

        '''
        imgs = tf.Variable(np.expand_dims(img, 0))
        img_metas = tf.Variable(np.expand_dims(img_meta, 0))

        x = self.backbone(imgs, training=False)
        x = self.neck(x, training=False)

        rpn_class_logits, rpn_probs, rpn_deltas = self.rpn_head(x, training=False)

        proposals_list = self.rpn_head.get_proposals(
            rpn_probs, rpn_deltas, img_metas, with_probs=False)

        return proposals_list[0]

class BBoxTestMixin(object):

    def _unmold_detections(self, detections_list, img_metas):
        return [
            self._unmold_single_detection(detections_list[i], img_metas[i])
            for i in range(img_metas.shape[0])
        ]

    def _unmold_single_detection(self, detections, img_meta):
        # Drop padded rows whose class id is 0.
        zero_ix = tf.where(tf.not_equal(detections[:, 4], 0))
        detections = tf.gather_nd(detections, zero_ix)

        # Extract boxes, class_ids, and scores; each detection row is
        # [box (4), class_id, score].
        boxes = detections[:, :4]
        class_ids = tf.cast(detections[:, 4], tf.int32)
        scores = detections[:, 5]

        # Map the boxes back onto the original (unpadded, unscaled) image.
        boxes = transforms.bbox_mapping_back(boxes, img_meta)

        return {'rois': boxes.numpy(),
                'class_ids': class_ids.numpy(),
                'scores': scores.numpy()}

    def simple_test_bboxes(self, img, img_meta, proposals):
        '''
        Args
        ---
        img: np.ndarray. [height, width, channel]
        img_meta: np.ndarray. [11]
        proposals: proposal boxes produced by simple_test_rpn

        '''
        imgs = tf.Variable(np.expand_dims(img, 0))
        img_metas = tf.Variable(np.expand_dims(img_meta, 0))
        rois_list = [tf.Variable(proposals)]

        x = self.backbone(imgs, training=False)
        P2, P3, P4, P5, _ = self.neck(x, training=False)

        rcnn_feature_maps = [P2, P3, P4, P5]

        pooled_regions_list = self.roi_align(
            (rois_list, rcnn_feature_maps, img_metas), training=False)

        rcnn_class_logits_list, rcnn_probs_list, rcnn_deltas_list = \
            self.bbox_head(pooled_regions_list, training=False)

        detections_list = self.bbox_head.get_bboxes(
            rcnn_probs_list, rcnn_deltas_list, rois_list, img_metas)

        return self._unmold_detections(detections_list, img_metas)[0]
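
Put together, the two mixins give FasterRCNN its single-image inference path. A hedged usage sketch of mine (img and img_meta shaped as in the docstrings above; num_classes=81 is hypothetical, and trained weights are assumed to have been loaded):

model = FasterRCNN(num_classes=81)
# ... load trained weights here ...
proposals = model.simple_test_rpn(img, img_meta)           # [n, 4] proposal boxes
res = model.simple_test_bboxes(img, img_meta, proposals)   # {'rois', 'class_ids', 'scores'}
print(res['rois'].shape, res['class_ids'], res['scores'])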