需要源码和数据集请点赞关注收藏后评论区留言私信~~~
视觉问答(VQA)是一种同时设计计算机视觉和自然语言处理的学习任务。简单来说,VQA就是对给定的图片进行问答,一个VQA系统以一张图片和一个关于这张图片形式自由,开放式的自然语言问题作为输入,生成一条自然语言答案作为输出,视觉问题系统综合运用到了目前的计算机视觉和自然语言处理的技术,并设计模型设计,实验,以及可视化。
VQA问题的一种典型模型是联合嵌入模型,这种方法首先学习视觉与自然语言的两个不同模态特征在一个共同的特征空间的嵌入表示,然后根据这种嵌入表示产生回答。
这里使用VQA2.0数据集进行训练和验证,VQA2.0是一个公认有难度,并且语言验证得到了有效控制的数据集
本次使用到的图片为MSCOCO数据集中train2014子集和val2014子集,图片可以在官网下载
本次用到的图像特征是由目标检测网络Faster-RCNN检测并生成的,可评论区留言私信博主要
确保安装好PyTorch,然后在程序目录下运行pip install -r requirements.txt安装其他依赖项
1:FCnet模块
FCnet即一系列的全连接层,各个层的输入输出大小在模块构建时给出,这个模块默认使其中的全连接层具有bias,并以ReLU作为激活函数 并使用weight normalization
2:SimpleClassifier模块
它的作用是:在视觉问答系统的末端,根据融合的特征得到最终答案
3:问题嵌入模块
在联合嵌入模型中,需要使用RNN将输入的问题编码成向量,LSTM和GRU使两种代表性的RNN,由于实践中GRU与LSTM表现相近且占用显存较少,所以这里选用GRU
4:词嵌入
要获得问题句子的嵌入表示,首先应该获得词嵌入表示,每一个词需要用一个唯一的数字表示
baseline代码如下
- import torch
- import torch.nn as nn
- from lib.module import topdown_attention
- from lib.module.language_model import WordEmbedding, QuestionEmbedding
- from lib.module.classifier import SimpleClassifier
- from lib.module.fc import FCNet
-
- class Baseline(nn.Module):
-
- def __init__(self, w_emb, q_emb, v_att, q_net, v_net, classifer, need_internals=False):
- super(Baseline, self).__init__()
-
- self.need_internals = need_internals
-
- self.w_emb = w_emb
- self.q_emb = q_emb
- self.v_att = v_att
- self.q_net = q_net
- self.v_net = v_net
- self.classifier = classifer
-
- def forward(self, q_tokens, ent_features):
-
- w_emb = self.w_emb(q_tokens)
- q_emb = self.q_emb(w_emb)
-
- att = self.v_att(q_emb, ent_features) # [ B, n_ent, 1 ]
- v_emb = (att * ent_features).sum(1) # [ B, hid_dim ]
-
- internals = [att.squeeze()] if self.need_internals else None
-
- q_repr = self.q_net(q_emb)
- v_repr = self.v_net(v_emb)
- joint_repr = q_repr * v_repr
- logits = self.classifier(joint_repr)
-
- return logits, internals
-
- @classmethod
- def build_from_config(cls, cfg, dataset, need_internals):
- w_emb = WordEmbedding(dataset.word_dict.n_tokens, cfg.lm.word_emb_dim, 0.0)
- q_emb = QuestionEmbedding(cfg.lm.word_emb_dim, cfg.hid_dim, cfg.lm.n_layers, cfg.lm.bidirectional, cfg.lm.dropout, cfg.lm.rnn_type)
- q_dim = cfg.hid_dim
- att_cls = topdown_attention.classes[cfg.topdown_att.type]
- v_att = att_cls(1, q_dim, cfg.ent_dim, cfg.topdown_att.hid_dim, cfg.topdown_att.dropout)
- q_net = FCNet([q_dim, cfg.hid_dim])
- v_net = FCNet([cfg.ent_dim, cfg.hid_dim])
- classifier = SimpleClassifier(cfg.hid_dim, cfg.mlp.hid_dim, dataset.ans_dict.n_tokens, cfg.mlp.dropout)
- return cls(w_emb, q_emb, v_att, q_net, v_net, classifier, need_internals)
数据集目录如下

读取了之前训练好的模型之后,使用数据为配置文件中的val,程序运行完成后结果可视化如下
机器对于给出的图片会输出对于的问答结果


部分代码如下
训练类
- import os
- import time
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- from torch.optim.lr_scheduler import LambdaLR
- from torch.nn.utils import clip_grad_norm_
- from bisect import bisect
- from tqdm import tqdm
-
- def bce_with_logits(logits, labels):
- assert logits.dim() == 2
- loss = F.binary_cross_entropy_with_logits(logits, labels)
- loss *= labels.size(1) # multiply by number of QAs
- return loss
-
- def sce_with_logits(logits, labels):
- assert logits.dim() == 2
- loss = F.cross_entropy(logits, labels.nonzero()[:, 1])
- loss *= labels.size(1)
- return loss
-
- def compute_score_with_logits(logits, labels):
- with torch.no_grad():
- logits = torch.max(logits, 1)[1] # argmax
- one_hots = torch.zeros(*labels.size()).cuda()
- one_hots.scatter_(1, logits.view(-1, 1), 1)
- scores = (one_hots * labels)
- return scores
-
- def lr_schedule_func_builder(cfg):
- def func(step_idx):
- if step_idx <= cfg.train.warmup_steps:
- alpha = float(step_idx) / float(cfg.train.warmup_steps)
- return cfg.train.warmup_factor * (1. - alpha) + alpha
- else:
- idx = bisect(cfg.train.lr_steps, step_idx)
- return pow(cfg.train.lr_ratio, idx)
- return func
-
- def train(model, cfg, train_loader, val_loader, n_epochs, val_freq, out_dir):
-
- os.makedirs(out_dir, exist_ok=True)
- optim = torch.optim.Adamax(model.parameters(), **cfg.train.optim)
-
- n_train_batches = len(train_loader)
-
- train_score = 0.0
- loss_fn = bce_with_logits if cfg.model.loss == "logistic" else sce_with_logits
-
- for epoch in range(n_epochs):
-
- epoch_loss = 0.0
-
- tic_0 = time.time()
-
- for i, data in enumerate(train_loader):
-
- tic_1 = time.time()
-
- q_tokens = data[2].cuda()
- a_targets = data[3].cuda()
- v_features = [_.cuda() for _ in data[4:]]
-
- tic_2 = time.time()
-
- optim.zero_grad()
- logits, _ = model(q_tokens, *v_features)
- loss = loss_fn(logits, a_targets)
-
- tic_3 = time.time()
-
- loss.backward()
- if cfg.train.clip_grad: clip_grad_norm_(model.parameters(), cfg.train.max_grad_norm)
- optim.step()
-
- tic_4 = time.time()
-
- batch_score = compute_score_with_logits(logits, a_targets).sum()
- epoch_loss += float(loss.data.item() * logits.size(0))
- train_score += float(batch_score)
-
- del loss
-
- logstr = "epoch %2d batch %4d/%4d | ^ %4dms | => %4dms | <= %4dms" % \
- (epoch + 1, i + 1, n_train_batches, 1000*(tic_2-tic_0), 1000*(tic_3-tic_2), 1000*(tic_4-tic_3))
- print("%-80s" % logstr, end="\r")
-
- tic_0 = time.time()
-
- epoch_loss /= len(train_loader.dataset)
- train_score = 100 * train_score / len(train_loader.dataset)
-
- logstr = "epoch %2d | train_loss: %5.2f train_score: %5.2f" % (epoch + 1, epoch_loss, train_score)
- if (epoch + 1) % val_freq == 0:
- model.eval()
- val_score, upper_bound = validate(model, val_loader)
- model.train()
- logstr += " | val_score: %5.2f (%5.2f)" % (100 * val_score, 100 * upper_bound)
- print("%-80s" % logstr)
-
- model_path = os.path.join(out_dir, 'model_%d.pth' % (epoch + 1))
- torch.save(model.state_dict(), model_path)
-
- def validate(model, loader):
-
- score = 0
- upper_bound = 0
- n_qas = 0
-
- with torch.no_grad():
-
- for i, data in enumerate(loader):
-
- q_tokens = data[2].cuda()
- a_targets = data[3].cuda()
- v_features = [_.cuda() for _ in data[4:]]
-
- logits, _ = model(q_tokens, *v_features)
- batch_score = compute_score_with_logits(logits, a_targets)
- score += batch_score.sum()
-
- upper_bound += (a_targets.max(1)[0]).sum()
- n_qas += logits.size(0)
-
- logstr = "val batch %5d/%5d" % (i + 1, len(loader))
- print("%-80s" % logstr, end='\r')
-
- score = score / n_qas
- upper_bound = upper_bound / n_qas
- return score, upper_bound
infer类
- import os
- import time
- import json
- import torch
- import cv2
- import shutil
- import numpy as np
- import torch.nn as nn
- import torch.nn.functional as F
- from tqdm import tqdm
-
- colors = [ (175, 84, 65), (68, 194, 246), (136, 147, 65), (92, 192, 151) ]
-
- def attention_map(im, boxes, atts, p=0.8, bgc=1.0, compress=0.85, box_color=(65, 81, 226)):
-
- height, width, channel = im.shape
- im = im / 255.0
-
- att_map = np.zeros([height, width])
- boxes = boxes.astype(np.int)
- for box, att in zip(boxes, atts):
- x1, y1, x2, y2 = box
- roi = att_map[y1:y2, x1:x2]
- roi[roi < att] = att
-
- att_map /= att_map.max()
- att_map = att_map ** p
- att_map = att_map * compress + (1-compress)
-
- att_map = cv2.resize(att_map, (int(width/16), int(height/16)))
- att_map = cv2.resize(att_map, (width, height))
-
- att_map = np.expand_dims(att_map, axis=2)
-
- bg = np.ones_like(att_map) * bgc
- att_im = im * att_map + bg * (1-att_map)
-
- att_im = (att_im * 255).astype(np.uint8)
-
- center = np.argmax(atts)
- x1, y1, x2, y2 = boxes[center]
- cv2.rectangle(att_im, (x1, y1), (x2, y2), box_color, 5)
-
- return att_im
-
- def infer_visualize(model, args, cfg, ans_dict, loader):
-
- _, ckpt = os.path.split(args.checkpoint)
- ckpt, _ = os.path.splitext(ckpt)
- out_dir = os.path.join(args.out_dir, "%s_%s_%s_visualization" % (args.cfg_name, ckpt, args.data))
- os.makedirs(out_dir, exist_ok=True)
- model.eval()
-
- questions_path = cfg.data[args.data].composition[0].q_jsons[0]
- questions = json.load(open(questions_path))
-
- pbar = tqdm(total=args.n_batches * loader.batch_size)
-
- with torch.no_grad():
-
- for i, data in enumerate(loader):
-
- if i == args.n_batches: break
-
- question_ids = data[0]
- image_ids = data[1]
- q_tokens = data[2].cuda()
- obj_featuers = data[4].cuda()
- batch_boxes = data[5].numpy()
-
- logits, internals = model(q_tokens, obj_featuers)
- topdown_atts = internals[0]
- topdown_atts = topdown_atts.data.cpu().numpy()
- _, predictions = logits.max(dim=1)
-
- for idx in range(len(question_ids)):
-
- question_id = question_ids[idx]
- image_id = image_ids[idx]
- boxes = batch_boxes[idx]
- answer = ans_dict.idx2ans[predictions[idx]]
- q_entry = questions[question_id]
- topdown_att = topdown_atts[idx]
- question = q_entry["question"]
- gts = list(q_entry["answers"].items())
- gts = sorted(gts, reverse=True, key=lambda x: x[1])
- gt = gts[0][0]
-
- q_out_dir = os.path.join(out_dir, question_id)
- os.makedirs(q_out_dir, exist_ok=True)
-
- q_str = question + "\n" + "gt: %s\n" % gt + "answer: %s\n" % answer
- with open(os.path.join(q_out_dir, "qa.txt"), "w") as f: f.write(q_str)
-
- image_path = os.path.join(args.images_dir, "%s.jpg" % image_id)
- shutil.copy(image_path, os.path.join(q_out_dir, "original.jpg"))
-
- im = cv2.imread(image_path)
- att_map = attention_map(im.copy(), boxes, topdown_att)
- cv2.imwrite(os.path.join(q_out_dir, "topdown_att.jpg"), att_map)
-
- pbar.update(1)
创作不易 觉得有帮助请点赞关注收藏~~~