我们知道在计算机处理的处理过程中,CPU运算和内存访问是很快的,而涉及到硬盘访问的数据库或文件操作相对就很慢了。所以在处理较高并发的请求过程中,数据库或文件操作往往会成为处理流程中的性能瓶颈。本文介绍使用缓冲区配合多线程的方法来实现高并发处理能力。
下面我们以springboot作为基础,实现一个简单的http接口,需求是按请求到达服务器端的先后顺序对请求进行编号,并且将请求的编号按序打印在控制台(打印时加入sleep延时以模拟慢处理)。
打印控制台的实现类如下:(writeString
是打印单行字符串,writeStringInBatch
是批量打印多行字符串,用于模拟批量数据库写入,在写入大量数据的时候批量写入数据库往往比循环多次写入单条数据的效率高很多)
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Random;
@Component
public class DataWriter {
public void writeString(String line) {
try {
Thread.sleep(new Random().nextInt(500));
// Thread.sleep(500);
System.out.println(Thread.currentThread().getName()+"->"+line);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void writeStringInBatch(List<String> lines) {
try {
Thread.sleep(new Random().nextInt(500));
// Thread.sleep(500);
for(String l: lines) {
System.out.println(Thread.currentThread().getName() + "->" + l);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
controller层:收到/queue
请求直接调用service层接口
@RestController
public class HelloController {
@Autowired
private HelloService helloService;
@GetMapping("/queue")
public String queue() {
return helloService.writeNormal();
}
}
service层:使用seqNo
来记录每个请求的编号,收到请求时,将seqNo
加1后交给ISeqHandler
处理,因为springboot是多线程处理,因此seqNo
加1时需要使用synchronized
加锁来保证编号不重复。
@Service
public class HelloService {
private long seqNo = 0;
@Qualifier("NormalSeqHandlerImpl")
@Autowired
private ISeqHandler normalHandler;
public synchronized String writeNormal() {
seqNo++;
normalHandler.handleData(seqNo+"");
return "OK";
}
}
@Component("NormalSeqHandlerImpl")
public class NormalSeqHandlerImpl implements ISeqHandler {
@Autowired
private DataWriter dataWriter;
@Override
public void handleData(String line) {
dataWriter.writeString(line);
}
@Override
public void handleDataInBatch(List<String> lines) {
handleDataInBatch(lines);
}
}
接口测试:使用jmeter对接口做性能测试,创建测试项,步骤如下:
启动jmeter后默认有一个TestPlan,右键点击TestPlan添加一个线程组
Number od Threads表示一次测试并发的总线程数,Ramp-up period表示这些线程在几秒内创建完:
右键点击建好的线程组,添加http请求:
右键点击线程组,添加聚合结果查看器:
如果要查看每个http请求返回的结果可以,右键点击线程组,点击Add->Listener->View Results Tree
添加结果树查看器
一个TestPlan下还可以添加多个线程组进行多组不同的测试
创建后点击保存,右键点击某个线程组,点击Start即可开始测试,测试跑完后可以在聚合报告中查看测试情况,我们上面的接口没做任何优化,50个并发请求,平均响应时间达到了6489毫秒:
注意:springboot后台启动后的第一次测试可能需要做一些线程创建操作会比较慢可以多测试几次。
上面的service层所有的处理过程(包括慢的打印操作)都放在synchronized中,所以多线程一点没发挥出来,完全是串行执行。
为了提高性能,引入缓冲区,seqNo编号加1后按顺序放入缓冲区后直接返回(只放在内存中所以很快)。慢操作不能加锁,但是为了保证按编号顺序打印输出,我们引入一个独立的线程作为消费者专门从缓冲区中获取数据进行控制台打印,为了提高打印效率,采用批量打印方式。
@Service
public class HelloService {
private long seqNo = 0;
@Qualifier("NormalSeqHandlerImpl") // 低效处理实现类
@Autowired
private ISeqHandler normalHandler;
@Qualifier("BetterSeqHandlerImpl") // 优化的实现类
@Autowired
private ISeqHandler betterHandler;
public synchronized String writeNormal() {
seqNo++;
normalHandler.handleData(seqNo+"");
return "OK";
}
public synchronized String writeBetter() {
seqNo++;
betterHandler.handleData(seqNo+"");
return "OK";
}
}
@Component("BetterSeqHandlerImpl")
public class BetterSeqHandlerImpl implements ISeqHandler, Runnable {
@Autowired
private DataWriter dataWriter;
private ConcurrentLinkedDeque<String> buffer = new ConcurrentLinkedDeque();
private Thread flushThread;
private boolean isRunning = true;
@PostConstruct // 实例化后启动线程
public void initThread() {
flushThread = new Thread(this);
flushThread.start();
}
@PreDestroy
public void exitThread() {
isRunning = false;
}
@Override
public void handleData(String line) {
buffer.add(line);
}
@Override
public void handleDataInBatch(List<String> lines) {
buffer.addAll(lines);
}
@Override
public void run() {
System.out.println("flushThread is running...");
List<String> lines = new ArrayList<>();
while (isRunning) { // 线程一直运行,从缓冲区拿数据进行处理
if(buffer.isEmpty()) {
try {
Thread.sleep(10); // 这里要加个sleep,不然CPU会100%
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
while(!buffer.isEmpty()) {
lines.add(buffer.poll());
}
dataWriter.writeStringInBatch(lines); // 批量处理
lines.clear();
}
}
System.out.println("flushThread exit.");
}
}
优化后50个并发很快(平均2毫秒):
并发加到2000,依然很快(平均6毫秒):
上面的缓冲区消费使用了一个独立线程来消费缓冲的队列,直接使用线程编程比较简单,但对这个线程的可靠性要求比较高,万一线程崩溃了缓冲区就没有消费者了,springboot处理http请求使用的是线程池,我们也可以利用这个线程池来提高一下可靠性,在处理http请求的线程中,由一个线程来消费缓冲区,注意这里不是固定一个线程,而是设置一个标志,每个处理请求线程将数据放入缓冲后判断一下标识,如果没有消费线程就自己来做一次队列消费批量处理,为了避免在有线程处于消费刷写的情况下,其他线程新加的数据无人处理,我们需要设置一个等待线程,这样能确保队列中的数据不会遗漏,下面用一个新的service来完成这个功能:
@Service
public class DoubleBufferService {
@Autowired
private DataWriter dataWriter;
private DoubleBuffer doubleBuffer = new DoubleBuffer();
private long seqNo = 0;
// isWaiting表示是否有线程处于等待中, isFlushing表示是否有线程在进行消费
private boolean isWaiting = false;
private boolean isFlushing = false;
public String writeNo() {
synchronized(this) {
seqNo++;
doubleBuffer.addData(seqNo + "");
if(isWaiting) {
return "OK";
}
isWaiting = true;
while(isFlushing) {
try {
this.wait(100); // wait期间会释放锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
isWaiting = false;
// flush if there is no thread is flushing
if(doubleBuffer.getCurrentSize() == 0) {
return "OK";
}
doubleBuffer.exchangeBuf();
isFlushing = true;
}
doubleBuffer.flushBuf();
synchronized (this) {
isFlushing = false;
this.notifyAll(); // 唤醒wait的线程
}
return "OK";
}
private class DoubleBuffer {
private LinkedList<String> inBuf = new LinkedList();
private LinkedList<String> outBuf = new LinkedList();
public void addData(String data) {
inBuf.add(data);
}
public void exchangeBuf() {
LinkedList<String> tmp = inBuf;
inBuf = outBuf;
outBuf = tmp;
}
public void flushBuf() {
dataWriter.writeStringInBatch(outBuf);
/*for(String l: outBuf) {
dataWriter.writeString(l);
}*/
outBuf.clear();
}
public int getCurrentSize() {
return inBuf.size();
}
}
}
测试结果: