在电商项目中商品分类往往显示在首页,是一个热点数据,它具有查多改少的特点,因此添加缓存是非常合适的,而缓存我们常用redis, 先在在缓存中去获取数据,如果获取不到,就去数据库里面查询,并且把查询到的结果放到缓存中。
考虑到不同人群所用到的语言和redis工具的用法均不相同,本文的代码都是一些伪代码,运行不起来的......
- public List
getCategoryNames(){ - String categoryNamesJson = redis.Template.opsForValue().get("CategoryName");
- // No cache
- if(StringUtils.isEmpty(categoryNamesJson)){
- List
allCategoryNames = getCategoryNameInDB(); - redisTemplate.opsForValue().set("CategoryName",
- JSONObject.toJSONString(allCategoryNames ), 1, TimeUnit.DAYS);
- return allCategoryNames;
- }
- List
result = JSONObject.parseObject(categoryNamesJson, new TypeReference>);
- return result;
- }
这段代码在单线程的情况下没有问题,而在多线程的环境下,多个线程同时判断StringUtils.isEmpty(categoryNameJson); 为true, 同时会有多个线程去查询数据库然后写到缓存中,这就失去了缓存的效果。我们可以使用JVM提供的synchronized关键字来控制并发。
- public List
getCategoryNames(){ - String categoryNamesJson = redis.Template.opsForValue().get("CategoryName");
- // No cache
- if(StringUtils.isEmpty(categoryNamesJson)){
- synchronized (this) {
- categoryNamesJson = redis.Template.opsForValue().get("CategoryName");
- if(StringUtils.isEmpty(categoryNamesJson)){
- List
allCategoryNames = getCategoryNameInDB(); - // add cache if db has data
- if(allCategoryNames != null && allCategoryNames.length > 0){
- redisTemplate.opsForValue().set("CategoryName",
- JSONObject.toJSONString(allCategoryNames ), 1, TimeUnit.DAYS);
- }
- return allCategoryNames;
- }
- }
- }
- List
result = JSONObject.parseObject(categoryNamesJson, new TypeReference>);
- return result;
- }
这是一个双重检查,这样我们能够保证多线程环境下只有一个线程会去查库。
我们的方法目前还是有所缺陷
单机环境通过这种方式可以设置缓存,但是并发量较大的网站是不会使用单机部署的,我们会有多个机器,通过网关负载平衡到各个机器,这个时候就出现了新的问题。
比如有三台机器,那么上面的代码就会有三个锁,同时三个进程上的某一个线程获取到了锁,这就会查数据库3次,如果我们只想让缓存存在一份,就必须使用到分布式锁。分布式锁就是可以串行化多个进程之内的线程。
暂时先不对比性能,我们的目的就是多进程下只有一个线程能向数据库中添加数据。
我们先来最简单的实现一下
- public List
getCategoryNames(){ - String categoryNamesJson = redis.Template.opsForValue().get("CategoryName");
- // No cache
- if(StringUtils.isEmpty(categoryNamesJson)){
- String lock = redis.Template.opsForValue().get("Lock");
- // Success get lock
- if(StringUtils.isEmpty(lock)){
- redis.Template.opsForValue().set("Lock", 1);
- categoryNamesJson = redis.Template.opsForValue().get("CategoryName");
- if(StringUtils.isEmpty(categoryNamesJson)){
- List
allCategoryNames = getCategoryNameInDB(); - // add cache if db has data
- if(allCategoryNames != null && allCategoryNames.length > 0){
- redisTemplate.opsForValue().set("CategoryName",
- JSONObject.toJSONString(allCategoryNames ), 1, TimeUnit.DAYS);
- }
- // Unlock
- redis.Template.opsForValue().remove("Lock");
- return allCategoryNames;
- }
- }else{
- // Get lock fail, wait for get lock again.
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return getCategoryNames();
- }
- }
- List
result = JSONObject.parseObject(categoryNamesJson, new TypeReference>);
- return result;
- }
判断redis中是否有lock来完成加锁操作,如果有lock证明无法获取到锁,过一段时间继续去获取锁,代码的问题也很明显,当多个线程同时走到String lock = redis.Template.opsForValue().get("Lock"); 全都判断正确,然后去查询缓存,然后又都过了,这时还会有多个请求发送到数据库,不满足预期。
现在缓存的问题主要变成了锁的问题,为了保持代码简短,我们把去掉与锁无关的代码。
- public List<string> getCategoryNamesWithRedisLock(){
- String lock = redis.Template.opsForValue().get("Lock");
- // Success get lock
- if(StringUtils.isEmpty(lock)){
- redis.Template.opsForValue().set("Lock", 1);
- List<string> names = getCategoryNamesImpl();
- redis.Template.opsForValue().remove("Lock");
- return names;
- }else{
- // Get lock fail, wait for get lock again.
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return getCategoryNamesWithRedisLock();
- }
- }
这时候我们会发现,如果是这种情况,先从redis获取一下锁,然后判断锁是否存在的代码都是不可行的,我们至少需要让获取和判断是原子性的,如果我们可以编写一个伪单机版redis代码,我们会提供一个方法,如果没有这个键才让插入,如果有就禁止插入,返回false。注意这里是单机版的伪redis。这里使用了阻塞的锁,这里用非阻塞的锁会更好。
- Map<String,String> totalMap = new HashMap<String,String>()
- public boolean setNx(string key, string value){
- Lock(this){
- if(totalMap.contains(key)){
- return false;
- }else{
- totalMap.add(key, value);
- return true;
- }
- }
- }
redis当然也为我们提供了这种功能。来自redis官网。

尝试去添加一个键值对,如果键存在直接返回false, 如果设置成功返回true。
使用了SetNx的代码如下
- public List
getCategoryNamesWithRedisLock(){ - // Set nx
- Boolean getLockSuccess = redis.Template.opsForValue().setIfAbsent("Lock", 1);
- // Success get lock
- if(getLockSuccess){
- try{
- List
names = getCategoryNamesImpl(); - }finally {
- // unlock
- redis.Template.opsForValue().remove("Lock");
- }
- return names;
- }else{
- // Get lock fail, wait for get lock again.
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return getCategoryNamesWithRedisLock();
- }
- }
这样,即便是在多台服务器上多个线程同时运行setIfAbsent这段代码,最终也只有一个线程会获取到getLockSuccess为true,在没有意外的情况下,当前代码就实现了分布式锁的功能。
那么下面就是意外情况了
针对上面三种情况,提出两种解决方向
如果选用第一种解决方式需要用到redis的 expire命令

简单点的做法就是跑业务的时候开一个线程去调用expire方法给锁续期,简单点来,用个中断标识符就可以。
- private static volatile boolean isInterrupt = false;
- public List
getCategoryNamesWithRedisLock(){ - // Set nx
- Boolean getLockSuccess = redis.Template.opsForValue().setIfAbsent("Lock", 1,30, TimeUnit.SECONDS);
- // Success get lock
- if(getLockSuccess){
- try{
- isInterrupt = false;
- new Thread(() -> {
- while(!isInterrupt){
- try{
- Thread.sleep(2000);
- }catch(InterruptedException ex){
- Log.error(ex);
- }
- redisTemplate.expire("lock",10,TimeUnit.SECONDS);
- }
- }).start();
- List
names = getCategoryNamesImpl(); - }finally {
- // unlock
- redis.Template.opsForValue().remove("Lock");
- isInterrupt = true;
- }
- return names;
- }else{
- // Get lock fail, wait for get lock again.
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return getCategoryNamesWithRedisLock();
- }
- }
如果业务再跑,续期也会再跑,断电了有过期时间,锁也会释放,正常业务结束,续期线程也就结束了。
再说第二种不续期的,我们先谈论一下这个策略的合理性,比如我们设置30秒的过期时间,如果这个接口30秒都没有完成,就说明这个接口有问题。但是如果是调用别人的接口,就不能确保了。因此是否续期还是要看具体的业务场景。
不续期核心问题就是避免释放掉别人的锁,这个很容易想到给Lock的值存储一个线程的唯一标识,最后进行比对,如果是当前线程的标识就说明是当前线程的锁,才会释放,但是,这里又出现了先取值,后比较的操作,这会造成多线程问题,所以我们还是需要原子性的操作,这次没有命令了,不过redis提供了lua的方式来进行原子操作。
- String lockValue = redisTemplate.opsForValue().get("lock");
- // At this moment, another thread get lock and set Lock uuid(another), still delete another thread lock.
- if(uuid.equals(lockValue)){
- redisTemplate.delete("lock");
- }
用了Lua的代码。
- public List
getCategoryNamesWithRedisLock(){ - String uuid = UUID.randomUUID().toString();
- // Set nx
- Boolean getLockSuccess = redis.Template.opsForValue().setIfAbsent("Lock", uuid,30, TimeUnit.SECONDS);
- // Success get lock
- if(getLockSuccess){
- try{
- List
names = getCategoryNamesImpl(); - }finally {
- // Atomicity unlock
- String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
- Long lock1 = redisTemplate.execute(new DefaultRedisScript
(script, Long.class) , Arrays.asList("lock"), uuid); - }
- return names;
- }else{
- // Get lock fail, wait for get lock again.
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return getCategoryNamesWithRedisLock();
- }
- }
最后发起一个思考,集群环境下的缓存是否有必要使用分布式锁?
如果使用synchronized 本地所,三台机器下最多也就是向数据库查询了3下,10台机器就是10下,这对数据库来说没什么压力可言。我们引入分布式锁代码变得复杂,而且性能不好,如果因为编程错误造成死锁,整个集群都会受影响,风险也比较大,分布式锁还是要在特定环境下做好权衡在使用。
感谢您看到这里。