PointPillar combined with CenterPoint is a mature, proven 3D object detection solution for autonomous driving; for object tracking we use TuSimple's SimpleTrack. This series walks through the principles of the three models and the key parts of their code, split into three parts: Part 1 (PointPillar), Part 2 (CenterPoint), and Part 3 (SimpleTrack).
PointPillar original paper:
In the autonomous driving industry, PointPillar is a mature, widely deployed solution. Before Transformer-based methods it was used everywhere, offering both accuracy and speed, and its excellent real-time CUDA inference performance has made it an industry standard. Its main idea is to divide the point cloud into pillars, encode each pillar with a PointNet-style encoder to obtain a pseudo image, then extract features from the pseudo image with a mature 2D CNN and pass them to the downstream detection head.
To understand a model, start from its inputs and outputs: the input is a point cloud and the output is the set of detected objects. In between sit the three main components: the Pillar Feature Net, the 2D CNN backbone, and the detection head (SSD). Each is described below.
The Pillar Feature Net encodes the unordered point cloud into pillars and produces an image-like pseudo image. It is called a pseudo image (C, H, W) because its channel count is not 3 but C. Once the pseudo image is available, a mature 2D image backbone can extract features from it for the downstream detection task.
The four steps are explained in detail below:
We choose a pillar size of 0.2 m × 0.2 m and partition the point cloud range x: [0, 150], y: [-25, 25], z: [-1, 8] (in meters) into a BEV grid map of [750, 250, 1]. Because point clouds are sparse, more than 90% of the cells are empty and only a small fraction of pillars actually contain points, so the maximum number of non-empty pillars is usually capped at 12000 (out of 187500 cells in total).
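As a quick sanity check, the grid dimensions follow directly from the range and the pillar size. A minimal sketch (the variable names here are illustrative, not from the repository):

import numpy as np

point_cloud_range = np.array([0.0, -25.0, -1.0, 150.0, 25.0, 8.0])  # x_min, y_min, z_min, x_max, y_max, z_max
voxel_size = np.array([0.2, 0.2, 9.0])  # one pillar spans the full z range

grid_size = np.round((point_cloud_range[3:] - point_cloud_range[:3]) / voxel_size).astype(int)
print(grid_size)         # [750 250   1]
print(grid_size.prod())  # 187500 cells in total, of which at most ~12000 non-empty pillars are kept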
Record the index of every pillar; it is needed later when scattering back to the BEV grid. Now each pillar is processed independently. Every point in a pillar originally has 4 dimensions: x, y, z, r (reflectance, normalized). These are augmented: xc, yc, zc are the offsets of the point from the mean of all points in the pillar, and xp, yp are the offsets of the point from the pillar center. A point thus goes from 4 to 9 dimensions: D = (x, y, z, r, xc, yc, zc, xp, yp). (In the OpenPCDet implementation below, the offset to the pillar center also includes z, so the encoder input is actually 10-dimensional.) N is the number of points kept per pillar, typically 32: a pillar with fewer points is zero-padded, and from a pillar with more points 32 are sampled. With P non-empty pillars we obtain a 3D tensor [D, P, N].
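The per-pillar padding/sampling happens in the voxel generator during preprocessing rather than in PillarVFE below; a minimal sketch of the idea (the function and variable names are hypothetical):

import numpy as np

def pad_or_sample(points_in_pillar, max_points=32):
    # Fix the number of points per pillar: randomly sample if too many, zero-pad if too few.
    num, dim = points_in_pillar.shape
    if num >= max_points:
        choice = np.random.choice(num, max_points, replace=False)
        return points_in_pillar[choice]
    padded = np.zeros((max_points, dim), dtype=points_in_pillar.dtype)
    padded[:num] = points_in_pillar
    return padded

pillar = np.random.rand(5, 4)       # 5 points with (x, y, z, r)
print(pad_or_sample(pillar).shape)  # (32, 4), the last 27 rows are zeros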
The resulting 3D tensor [D, P, N] is encoded pillar by pillar with a PointNet-style fully connected layer, giving a new tensor [C, P, N]. A max-pooling over the N points of each pillar then reduces it to a 2D tensor [C, P].
The code for the three steps above (a, b, c) is in the class PillarVFE:
import torch
from torch._C import _is_tracing
import torch.nn as nn
import torch.nn.functional as F
from .vfe_template import VFETemplate
class PFNLayer(nn.Module):
def __init__(self,
in_channels,
out_channels,
use_norm=True,
last_layer=False):
super().__init__()
self.last_vfe = last_layer
self.use_norm = use_norm
if not self.last_vfe:
out_channels = out_channels // 2
if self.use_norm:
self.linear = nn.Linear(in_channels, out_channels, bias=False)
self.norm = nn.BatchNorm1d(out_channels, eps=1e-3, momentum=0.01)
else:
self.linear = nn.Linear(in_channels, out_channels, bias=True)
#self.part = torch.tensor([50000])
def forward(self, inputs):
# comment out for tracing
# if inputs.shape[0] > self.part:
# # nn.Linear performs randomly when batch size is too large
# num_parts = inputs.shape[0] // self.part
# part_linear_out = [self.linear(inputs[num_part*self.part:(num_part+1)*self.part])
# for num_part in range(num_parts+1)]
# x = torch.cat(part_linear_out, dim=0)
# else:
# x = self.linear(inputs)
x = self.linear(inputs) # input: [16000, 32, 10]
torch.backends.cudnn.enabled = False
x = self.norm(x.permute(0, 2, 1)).permute(0, 2, 1) if self.use_norm else x
torch.backends.cudnn.enabled = True
x = F.relu(x) # [16000, 32, 64]
x_max = torch.max(x, dim=1, keepdim=True)[0] # [16000, 1, 64]
if self.last_vfe:
return x_max
else:
x_repeat = x_max.repeat(1, inputs.shape[1], 1)
x_concatenated = torch.cat([x, x_repeat], dim=2)
return x_concatenated
class PillarVFE(VFETemplate):
def __init__(self, model_cfg, num_point_features, voxel_size, point_cloud_range, **kwargs):
super().__init__(model_cfg=model_cfg)
self.is_tracing = kwargs.get('is_tracing', False)
self.use_norm = self.model_cfg.USE_NORM
self.with_distance = self.model_cfg.WITH_DISTANCE
self.use_absolute_xyz = self.model_cfg.USE_ABSLOTE_XYZ
num_point_features += 6 if self.use_absolute_xyz else 3
if self.with_distance:
num_point_features += 1
self.num_filters = self.model_cfg.NUM_FILTERS
assert len(self.num_filters) > 0
num_filters = [num_point_features] + list(self.num_filters)
pfn_layers = []
for i in range(len(num_filters) - 1):
in_filters = num_filters[i]
out_filters = num_filters[i + 1]
pfn_layers.append(
PFNLayer(in_filters, out_filters, self.use_norm, last_layer=(i >= len(num_filters) - 2))
)
self.pfn_layers = nn.ModuleList(pfn_layers)
self.voxel_x = voxel_size[0]
self.voxel_y = voxel_size[1]
self.voxel_z = voxel_size[2]
self.x_offset = self.voxel_x / 2 + point_cloud_range[0]
self.y_offset = self.voxel_y / 2 + point_cloud_range[1]
self.z_offset = self.voxel_z / 2 + point_cloud_range[2]
def get_output_feature_dim(self):
return self.num_filters[-1]
def get_paddings_indicator(self, actual_num, max_num, axis=0):
actual_num = torch.unsqueeze(actual_num, axis + 1)
max_num_shape = [1] * len(actual_num.shape)
max_num_shape[axis + 1] = -1
max_num = torch.arange(max_num, dtype=torch.int, device=actual_num.device).view(max_num_shape)
paddings_indicator = actual_num.int() > max_num
return paddings_indicator
def forward(self, batch_dict, **kwargs):
if self.is_tracing:
features = batch_dict['voxels']
else:
# voxel_features: [16000, 32, 4], voxel_num_points: [16000], coords: [16000, 4]
voxel_features, voxel_num_points, coords = batch_dict['voxels'], batch_dict['voxel_num_points'], batch_dict['voxel_coords']
points_mean = voxel_features[:, :, :3].sum(dim=1, keepdim=True) / voxel_num_points.type_as(voxel_features).view(-1, 1, 1)
f_cluster = voxel_features[:, :, :3] - points_mean
# voxel_coords: [num_voxels, 4], ordered (batch_idx, z, y, x)
f_center = torch.zeros_like(voxel_features[:, :, :3]) # f_center: [16000, 32, 3]
f_center[:, :, 0] = voxel_features[:, :, 0] - (coords[:, 3].to(voxel_features.dtype).unsqueeze(1) * self.voxel_x + self.x_offset)
f_center[:, :, 1] = voxel_features[:, :, 1] - (coords[:, 2].to(voxel_features.dtype).unsqueeze(1) * self.voxel_y + self.y_offset)
f_center[:, :, 2] = voxel_features[:, :, 2] - (coords[:, 1].to(voxel_features.dtype).unsqueeze(1) * self.voxel_z + self.z_offset)
if self.use_absolute_xyz:
features = [voxel_features, f_cluster, f_center]
else:
features = [voxel_features[..., 3:], f_cluster, f_center]
if self.with_distance:
points_dist = torch.norm(voxel_features[:, :, :3], 2, 2, keepdim=True)
features.append(points_dist)
features = torch.cat(features, dim=-1) # features: [16000, 32, 10]
voxel_count = features.shape[1] # 32
mask = self.get_paddings_indicator(voxel_num_points, voxel_count, axis=0)
mask = torch.unsqueeze(mask, -1).type_as(voxel_features)
features *= mask
for pfn in self.pfn_layers:
features = pfn(features) # features: [16000, 1, 64]
features = features.squeeze(dim=1) # features: [16000, 64]
batch_dict['pillar_features'] = features
return batch_dict
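Two quick checks of the pieces above, with illustrative sizes: the PFNLayer shape flow, and the padding mask produced by get_paddings_indicator:

# PFNLayer as the final layer: max-pooling over the 32 points of each pillar.
layer = PFNLayer(in_channels=10, out_channels=64, use_norm=True, last_layer=True)
x = torch.rand(100, 32, 10)  # (pillars, points per pillar, decorated point features)
print(layer(x).shape)        # torch.Size([100, 1, 64])

# get_paddings_indicator marks which of the max_num slots in each pillar hold real points.
# It touches no instance state, so it can be called unbound for a quick look.
actual_num = torch.tensor([2, 3])  # number of real (non-padded) points in two pillars
mask = PillarVFE.get_paddings_indicator(None, actual_num, max_num=4)
print(mask)
# tensor([[ True,  True, False, False],
#         [ True,  True,  True, False]])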
Each pillar now carries a C-dimensional feature vector. Using the pillar indices recorded in step (a), the pillars are scattered back onto the original BEV grid map, giving a pseudo image [C, H, W].
PointPillarScatter
import torch.nn as nn
from pcdet.utils import common_utils
class PointPillarScatter(nn.Module):
def __init__(self, model_cfg, grid_size, **kwargs):
super().__init__()
self.is_tracing = kwargs.get('is_tracing', False)
self.model_cfg = model_cfg
self.num_bev_features = self.model_cfg.NUM_BEV_FEATURES
self.nx, self.ny, self.nz = grid_size
assert self.nz == 1
def forward(self, batch_dict, **kwargs):
pillar_features, coords = batch_dict['pillar_features'], batch_dict['voxel_coords'] # pillar_features: [16000, 64], coords: [16000, 4]
new_grid_size = [self.nz, self.ny, self.nx] # new_grid_size: [1, 256, 656]
batch_dict['spatial_features'] = common_utils.pillarScatterToBEV(pillar_features, coords, new_grid_size, self.num_bev_features, self.is_tracing)
batch_dict.pop("pillar_features") # for tensorrt
return batch_dict
common_utils.pillarScatterToBEV
import torch

def pillarScatterToBEV(features: torch.Tensor, coords: torch.Tensor, grid_size: list, num_bev_features: int, is_tracing=False):
'''
scatter pillar feature to bev feature
coords: bzyx
grid_size: zyx
'''
nz, ny, nx = grid_size
assert nz==1
if not is_tracing:
batch_spatial_features = []
batch_size = coords[:, 0].max().int().item() + 1
for batch_idx in range(batch_size):
spatial_feature = torch.zeros( # spatial_feature: [64, 167936(656*256)]
(num_bev_features, nz * nx * ny), # num_bev_features: 64
dtype=features.dtype,
device=features.device)
batch_mask = coords[:, 0] == batch_idx
this_coords = coords[batch_mask, :] # this_coords: [16000, 4]
indices = this_coords[:, 1] + this_coords[:, 2] * nx + this_coords[:, 3] # flatten (z, y, x) into a linear BEV index: z + y * nx + x
indices = indices.long() # indices: [16000]
pillars = features[batch_mask, :].t() # pillars: [64, 16000]
spatial_feature[:, indices] = pillars # scatter the pillar features back into the BEV grid; spatial_feature: [64, 167936]
batch_spatial_features.append(spatial_feature)
batch_spatial_features = torch.stack(batch_spatial_features, 0) # batch_spatial_features: [1, 64, 167936]
batch_spatial_features = batch_spatial_features.view(batch_size, num_bev_features*nz, ny, nx) # batch_spatial_features: [1, 64, 256, 656]
else:
# to avoid introduce NonZero op into onnx model
batch_size = 1
batch_spatial_features = torch.zeros(
(num_bev_features, nz * ny * nx),
dtype=features.dtype,
device=features.device)
this_coords = coords
indices = this_coords[:, 1] + this_coords[:, 2] * nx + this_coords[:, 3]
indices = indices.long()
batch_spatial_features[:, indices] = features.t()
batch_spatial_features = batch_spatial_features.view(1, num_bev_features*nz, ny, nx)
return batch_spatial_features
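A tiny end-to-end check of the scatter with made-up shapes:

features = torch.arange(8, dtype=torch.float32).view(2, 4)  # 2 pillars, C = 4
coords = torch.tensor([[0, 0, 1, 2],   # (batch, z, y, x) of pillar 0
                       [0, 0, 2, 4]])  # (batch, z, y, x) of pillar 1
bev = pillarScatterToBEV(features, coords, grid_size=[1, 3, 5], num_bev_features=4)
print(bev.shape)        # torch.Size([1, 4, 3, 5])
print(bev[0, :, 1, 2])  # tensor([0., 1., 2., 3.]): pillar 0 landed at (y=1, x=2)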
The 2D backbone's job is to extract and encode features from the BEV map so that the downstream network has richer features to work with. It actually plays more of a Neck role, fusing features at different scales to improve the model's generalization.
The backbone is a stack of blocks Block(S, L, F): S is the stride, L is the number of 3 × 3 2D convolutions in the block, and F is the number of output channels. Each block doubles the number of channels and halves the feature-map resolution, and every convolution is followed by BatchNorm and ReLU.
Each block's output is then upsampled with a transposed convolution (deconvolution) so that all feature maps end up with the same size [H/2, W/2] and 2C channels. Finally the three feature maps are concatenated along the channel dimension, producing the detection head's input of shape [6C, H/2, W/2].
import numpy as np
import torch
import torch.nn as nn
class BaseBEVBackbone(nn.Module):
def __init__(self, model_cfg, input_channels, **kwargs):
super().__init__()
self.model_cfg = model_cfg
if self.model_cfg.get('LAYER_NUMS', None) is not None:
assert len(self.model_cfg.LAYER_NUMS) == len(self.model_cfg.LAYER_STRIDES) == len(self.model_cfg.NUM_FILTERS)
layer_nums = self.model_cfg.LAYER_NUMS
layer_strides = self.model_cfg.LAYER_STRIDES
num_filters = self.model_cfg.NUM_FILTERS
else:
layer_nums = layer_strides = num_filters = []
if self.model_cfg.get('UPSAMPLE_STRIDES', None) is not None:
assert len(self.model_cfg.UPSAMPLE_STRIDES) == len(self.model_cfg.NUM_UPSAMPLE_FILTERS)
num_upsample_filters = self.model_cfg.NUM_UPSAMPLE_FILTERS
upsample_strides = self.model_cfg.UPSAMPLE_STRIDES
else:
upsample_strides = num_upsample_filters = []
num_levels = len(layer_nums)
c_in_list = [input_channels, *num_filters[:-1]]
self.blocks = nn.ModuleList()
self.deblocks = nn.ModuleList()
for idx in range(num_levels):
cur_layers = [
nn.ZeroPad2d(1),
nn.Conv2d(
c_in_list[idx], num_filters[idx], kernel_size=3,
stride=layer_strides[idx], padding=0, bias=False
),
nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
]
for k in range(layer_nums[idx]):
cur_layers.extend([
nn.Conv2d(num_filters[idx], num_filters[idx], kernel_size=3, padding=1, bias=False),
nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
])
self.blocks.append(nn.Sequential(*cur_layers))
if len(upsample_strides) > 0:
stride = upsample_strides[idx]
if stride >= 1:
self.deblocks.append(nn.Sequential(
nn.ConvTranspose2d(
num_filters[idx], num_upsample_filters[idx],
upsample_strides[idx],
stride=upsample_strides[idx], bias=False
),
nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
))
else:
stride = int(np.round(1 / stride))  # np.int was removed in recent NumPy; use the built-in int
self.deblocks.append(nn.Sequential(
nn.Conv2d(
num_filters[idx], num_upsample_filters[idx],
stride,
stride=stride, bias=False
),
nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
))
c_in = sum(num_upsample_filters)
if len(upsample_strides) > num_levels:
self.deblocks.append(nn.Sequential(
nn.ConvTranspose2d(c_in, c_in, upsample_strides[-1], stride=upsample_strides[-1], bias=False),
nn.BatchNorm2d(c_in, eps=1e-3, momentum=0.01),
nn.ReLU(),
))
self.num_bev_features = c_in
def forward(self, data_dict):
"""
Args:
data_dict:
spatial_features
Returns:
"""
spatial_features = data_dict['spatial_features'] # spatial_features: [1, 64, 256, 656]
ups = []
x = spatial_features
for i in range(len(self.blocks)): # len(self.blocks): 3
x = self.blocks[i](x) # each block: one ZeroPad2d + strided Conv2d, then layer_nums Conv2d layers, each followed by BN and ReLU
if len(self.deblocks) > 0: #len(self.deblocks): 3
ups.append(self.deblocks[i](x)) # each deblock: one ConvTranspose2d + BN + ReLU
else:
ups.append(x)
if len(ups) > 1: # ups holds the 3 upsampled feature maps, each [1, 128(2C), 128(H/2), 328(W/2)]
x = torch.cat(ups, dim=1) # x: [1, 384(6C), 128, 328]
elif len(ups) == 1:
x = ups[0]
if len(self.deblocks) > len(self.blocks):
x = self.deblocks[-1](x)
data_dict['spatial_features_2d'] = x
data_dict.pop("spatial_features") # for tensorrt
return data_dict
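To sanity-check the shape flow end to end, here is a minimal usage sketch; the config values are illustrative (chosen to match the shapes in the comments above) and easydict is assumed to be available:

import torch
from easydict import EasyDict

cfg = EasyDict({
    'LAYER_NUMS': [3, 5, 5],
    'LAYER_STRIDES': [2, 2, 2],
    'NUM_FILTERS': [64, 128, 256],
    'UPSAMPLE_STRIDES': [1, 2, 4],
    'NUM_UPSAMPLE_FILTERS': [128, 128, 128],
})
backbone = BaseBEVBackbone(cfg, input_channels=64)
data = {'spatial_features': torch.zeros(1, 64, 256, 656)}  # the pseudo image from the scatter step
out = backbone(data)
print(out['spatial_features_2d'].shape)  # torch.Size([1, 384, 128, 328]): 6C channels at H/2 x W/2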
Since this project uses the CenterPoint head for downstream detection, only the principle of the SSD head is covered here; the actual head code is discussed in Part 2 (CenterPoint).
The core ideas of PointPillar have already been covered above. To make the network trainable end to end, PointPillar uses SSD as its detection head. SSD regresses against anchors, predicting each anchor's class, position, size, and heading. As in SSD, ground truth boxes and anchors are matched with 2D IoU only: the box height does not take part in the matching but is included in the regression.
A short digression: anchor design differs between point clouds and images. An image is a perspective projection, so an object's apparent size changes strongly with distance, its shape changes with viewing angle, and different classes have different sizes and aspect ratios. Anchors of different sizes are therefore needed to cover objects at different ranges and of different classes.
For example: 3 aspect ratios (1:1, 2:1, 1:2), each at 3 scales, give 9 anchors per location to cover objects of different classes, sizes, and aspect ratios.
Point clouds, however, are processed in bird's-eye view here. If a front/range view were used, anchor design would resemble the image case, but BEV anchor design is different: in BEV, object size and aspect ratio are invariant and correspond to real-world metric sizes, so instances of the same class vary little. Anchors are therefore designed per class, e.g. a vehicle anchor and a pedestrian anchor, usually one at 0° and one at 90°. More anchors generally give better accuracy at higher compute cost. Anchors are 3D, with length, width, and height, but the height is ignored during matching: if the IoU between an anchor and a ground-truth box is above the threshold the anchor is positive, otherwise it is negative.
In BEV, different classes differ greatly in size, which strongly affects which anchors come out positive or negative. The IoU threshold is therefore set higher for vehicles and lower for pedestrians, so that positive samples are easier to find for small objects.
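A minimal sketch of per-class matching with axis-aligned BEV IoU; the anchor sizes and thresholds below are illustrative values in the spirit of the PointPillars paper, and the helper names are mine:

def bev_iou(box_a, box_b):
    # Axis-aligned 2D IoU between BEV boxes given as (cx, cy, l, w); rotation is ignored in this sketch.
    ax1, ay1 = box_a[0] - box_a[2] / 2, box_a[1] - box_a[3] / 2
    ax2, ay2 = box_a[0] + box_a[2] / 2, box_a[1] + box_a[3] / 2
    bx1, by1 = box_b[0] - box_b[2] / 2, box_b[1] - box_b[3] / 2
    bx2, by2 = box_b[0] + box_b[2] / 2, box_b[1] + box_b[3] / 2
    iw = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    ih = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = iw * ih
    union = box_a[2] * box_a[3] + box_b[2] * box_b[3] - inter
    return inter / union if union > 0 else 0.0

# Per-class anchor sizes (l, w) and IoU thresholds: vehicles use a higher threshold, pedestrians a lower one.
anchor_cfg = {
    'car':        {'size': (3.9, 1.6), 'pos_iou': 0.6, 'neg_iou': 0.45},
    'pedestrian': {'size': (0.8, 0.6), 'pos_iou': 0.5, 'neg_iou': 0.35},
}

gt = (10.0, 2.0, 3.8, 1.7)      # a ground-truth car box (cx, cy, l, w)
anchor = (10.5, 2.0, 3.9, 1.6)  # a 0-degree car anchor near the gt
iou = bev_iou(gt, anchor)
cfg = anchor_cfg['car']
label = 'positive' if iou >= cfg['pos_iou'] else ('negative' if iou < cfg['neg_iou'] else 'ignored')
print(round(iou, 3), label)     # 0.73 positive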
PointPillar uses the same loss function as SECOND. With gt denoting the ground truth and a the anchor, the network regresses the residuals between gt and anchor, with the center terms normalized by the anchor diagonal d^a. The localization loss is the sum of SmoothL1 losses over the 7 residuals, and classification uses a focal loss; the formulas are spelled out below.
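Since the formula figure from the original post is not reproduced here, these are the residual and loss definitions from the SECOND/PointPillars papers:

\Delta x = \frac{x^{gt} - x^{a}}{d^{a}}, \quad \Delta y = \frac{y^{gt} - y^{a}}{d^{a}}, \quad \Delta z = \frac{z^{gt} - z^{a}}{h^{a}}, \quad d^{a} = \sqrt{(w^{a})^{2} + (l^{a})^{2}}

\Delta w = \log\frac{w^{gt}}{w^{a}}, \quad \Delta l = \log\frac{l^{gt}}{l^{a}}, \quad \Delta h = \log\frac{h^{gt}}{h^{a}}, \quad \Delta\theta = \sin(\theta^{gt} - \theta^{a})

\mathcal{L}_{loc} = \sum_{b \in (x, y, z, w, l, h, \theta)} \mathrm{SmoothL1}(\Delta b), \qquad \mathcal{L}_{cls} = -\alpha_{a}(1 - p^{a})^{\gamma} \log p^{a}

\mathcal{L} = \frac{1}{N_{pos}} \left( \beta_{loc} \mathcal{L}_{loc} + \beta_{cls} \mathcal{L}_{cls} + \beta_{dir} \mathcal{L}_{dir} \right)

The papers' default settings are \alpha = 0.25, \gamma = 2, \beta_{loc} = 2, \beta_{cls} = 1, \beta_{dir} = 0.2; L_dir is a softmax loss over a discretized heading direction that resolves the sign ambiguity introduced by the sine residual.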
Judging by benchmark numbers, PointPillar scores better than the voxel-based SECOND, but in our actual project SECOND with 3D sparse convolutions worked better. NVIDIA has also open-sourced a 3D sparse convolution (spconv) library that supports real-time inference. I discussed this with my lead; in his experience, for pedestrians, given limited lidar point cloud quality, PointPillar still tends to do better. For engineering questions like this, only hands-on experiments give the final answer.