在并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题。通常,我们以synchronized 、Lock
来使用它(单机情况)
我们来看一个案例:
- @Autowired
- RedisTemplate<String,String> redisTemplate;
-
- String maotai = "maotai20210321001";//茅台商品编号
-
- @PostConstruct
- public void init(){
- //此处模拟向缓存中存入商品库存操作
- redisTemplate.opsForValue().set(maotai,"100");
- }
-
-
- @GetMapping("/get/maotai2")
- public String seckillMaotai2() {
- synchronized (this) {
- Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
- //如果还有库存
- if (count > 0) {
- //抢到了茅台,库存减一
- redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
- //后续操作 do something
- log.info("我抢到茅台了!");
- return "ok";
- }else {
- return "no";
- }
- }
- }
- 复制代码
问题分析:
实现思路:
锁的实现主要基于redis的SETNX
命令:
SETNX key value
将 key 的值设为 value ,当且仅当 key 不存在。
若给定的 key 已经存在,则 SETNX 不做任何动作。
SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
返回值: 设置成功,返回 1 设置失败,返回 0
使用SETNX
完成同步锁的流程及事项如下:
SETNX
命令获取锁,若返回0(key已存在,锁已存在)则获取失败,反之获取成功SETNX
命令总是返回0而进入死锁状态,需要为该key设置一个“合理”的过期时间DEL
命令将锁数据删除实现代码版本1:
- @GetMapping("/get/maotai3")
- public String seckillMaotai3() {
-
- //获取锁
- Boolean islock = redisTemplate.opsForValue().setIfAbsent(lockey, "1");
- if (islock) {
- //设置锁的过期时间
- redisTemplate.expire(lockey,5, TimeUnit.SECONDS);
- try {
- Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
- //如果还有库存
- if (count > 0) {
- //抢到了茅台,库存减一
- redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
- //后续操作 do something
- log.info("我抢到茅台了!");
- return "ok";
- }else {
- return "no";
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- //释放锁
- redisTemplate.delete(lockey);
- }
- }
- return "dont get lock";
- }
- 复制代码
问题分析:
错误解锁问题解决:
- @GetMapping("/get/maotai4")
- public String seckillMaotai4() {
- String requestid = UUID.randomUUID().toString() + Thread.currentThread().getId();
- /*String locklua ="" +
- "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then redis.call('expire',KEYS[1],ARGV[2]) ; return true " +
- "else return false " +
- "end";
- Boolean islock = redisTemplate.execute(new RedisCallback<Boolean>() {
- @Override
- public Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {
- Boolean eval = redisConnection.eval(
- locklua.getBytes(),
- ReturnType.BOOLEAN,
- 1,
- lockey.getBytes(),
- requestid.getBytes(),
- "5".getBytes()
- );
- return eval;
- }
- });*/
- //获取锁
- Boolean islock = redisTemplate.opsForValue().setIfAbsent(lockey,requestid,5,TimeUnit.SECONDS);
- if (islock) {
- try {
- Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
- //如果还有库存
- if (count > 0) {
- //抢到了茅台,库存减一
- redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
- //后续操作 do something
- log.info("我抢到茅台了!");
- return "ok";
- }else {
- return "no";
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- //释放锁
- //判断是自己的锁才能去释放 这种操作不是原子性的
- /*String id = redisTemplate.opsForValue().get(lockey);
- if (id !=null && id.equals(requestid)) {
- redisTemplate.delete(lockey);
- }*/
- String unlocklua = "" +
- "if redis.call('get',KEYS[1]) == ARGV[1] then redis.call('del',KEYS[1]) ; return true " +
- "else return false " +
- "end";
- redisTemplate.execute(new RedisCallback<Boolean>() {
- @Override
- public Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {
- Boolean eval = redisConnection.eval(
- unlocklua.getBytes(),
- ReturnType.BOOLEAN,
- 1,
- lockey.getBytes(),
- requestid.getBytes()
- );
- return eval;
- }
- });
- }
- }
- return "dont get lock";
- }
- 复制代码
锁续期/锁续命
- /**
- * 3,锁续期/锁续命
- * 拿到锁之后执行业务,业务的执行时间超过了锁的过期时间
- *
- * 如何做?
- * 给拿到锁的线程创建一个守护线程(看门狗),守护线程定时/延迟 判断拿到锁的线程是否还继续持有锁,如果持有则为其续期
- *
- */
- //模拟一下守护线程为其续期
- ScheduledExecutorService executorService;//创建守护线程池
- ConcurrentSkipListSet<String> set = new ConcurrentSkipListSet<String>();//队列
-
- @PostConstruct
- public void init2(){
- executorService = Executors.newScheduledThreadPool(1);
-
- //编写续期的lua
- String expirrenew = "" +
- "if redis.call('get',KEYS[1]) == ARGV[1] then redis.call('expire',KEYS[1],ARGV[2]) ; return true " +
- "else return false " +
- "end";
-
- executorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- Iterator<String> iterator = set.iterator();
- while (iterator.hasNext()) {
- String rquestid = iterator.next();
-
- redisTemplate.execute(new RedisCallback<Boolean>() {
- @Override
- public Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {
- Boolean eval = false;
- try {
- eval = redisConnection.eval(
- expirrenew.getBytes(),
- ReturnType.BOOLEAN,
- 1,
- lockey.getBytes(),
- rquestid.getBytes(),
- "5".getBytes()
- );
- } catch (Exception e) {
- log.error("锁续期失败,{}",e.getMessage());
- }
- return eval;
- }
- });
-
- }
- }
- },0,1,TimeUnit.SECONDS);
- }
-
- @GetMapping("/get/maotai5")
- public String seckillMaotai5() {
- String requestid = UUID.randomUUID().toString() + Thread.currentThread().getId();
- //获取锁
- Boolean islock = redisTemplate.opsForValue().setIfAbsent(lockey,requestid,5,TimeUnit.SECONDS);
- if (islock) {
- //获取锁成功后让守护线程为其续期
- set.add(requestid);
- try {
- Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
- //如果还有库存
- if (count > 0) {
- //抢到了茅台,库存减一
- redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
- //后续操作 do something
- //seckillMaotai5();
- //模拟业务超时
- TimeUnit.SECONDS.sleep(10);
- log.info("我抢到茅台了!");
- return "ok";
- }else {
- return "no";
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- //解除锁续期
- set.remove(requestid);
- //释放锁
- String unlocklua = "" +
- "if redis.call('get',KEYS[1]) == ARGV[1] then redis.call('del',KEYS[1]) ; return true " +
- "else return false " +
- "end";
- redisTemplate.execute(new RedisCallback<Boolean>() {
- @Override
- public Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {
- Boolean eval = redisConnection.eval(
- unlocklua.getBytes(),
- ReturnType.BOOLEAN,
- 1,
- lockey.getBytes(),
- requestid.getBytes()
- );
- return eval;
- }
- });
- }
- }
- return "dont get lock";
- }
- 复制代码
锁的可重入/阻塞锁(redisson)
- /**
- *
- * 4,如何支持可重入
- * 重入次数/过期时间
- * 获取
- * 获取
- * 获取
- *
- * 释放
- * 释放
- * 释放
- *
- * 基于本地实现
- * 还是基于redis但是更换了数据类型,采用hash类型来实现
- * key field value
- * 锁key 请求id 重入次数
- * 用lua实现
- *
- *
- * 5,阻塞/非阻塞的问题:现在的锁是非阻塞的,一旦获取不到锁直接返回了
- * 如何做一个阻塞锁呢?
- * 获取不到就等待锁的释放,直到获取到锁或者等待超时
- * 1:基于客户端轮询的方案
- * 2:基于redis的发布/订阅方案
- *
- *
- * 有没有好的实现呢?
- * Redisson
- *
- */
- @Value("${spring.redis.host}")
- String host;
- @Value("${spring.redis.port}")
- String port;
-
- @Bean
- public RedissonClient redissonClient() {
- Config config = new Config();
- config.useSingleServer().setAddress("redis://"+host+":"+port);
- return Redisson.create(config);
- }
-
- @Autowired
- RedissonClient redissonClient;
-
-
- @GetMapping("/get/maotai6")
- public String seckillMaotai6() {
- //要去获取锁
- RLock lock = redissonClient.getLock(lockey);
- lock.lock();
- try {
- Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
- //如果还有库存
- if (count > 0) {
- //抢到了茅台,库存减一
- redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
- //后续操作 do something
- log.info("我抢到茅台了!");
- return "ok";
- }else {
- return "no";
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();;
- }
- return "";
- }
- 复制代码
概述
Redisson内置了一系列的分布式对象,分布式集合,分布式锁,分布式服务等诸多功能特性,是一款基于Redis实现,拥有一系列分布式系统功能特性的工具包,是实现分布式系统架构中缓存中间件的最佳选择。
实现
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson</artifactId>
- <version>3.8.2</version>
- </dependency>
- 复制代码
- @Bean
- public RedissonClient redissonClient() {
- Config config = new Config();
- config.useSingleServer().setAddress("redis://"+host+":"+port);
- return Redisson.create(config);
- }
-
- @Autowired
- RedissonClient redissonClient;
-
-
- @GetMapping("/get/maotai6")
- public String seckillMaotai6() {
- //要去获取锁
- RLock lock = redissonClient.getLock(lockey);
- lock.lock();
- try {
- Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
- //如果还有库存
- if (count > 0) {
- //抢到了茅台,库存减一
- redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
- //后续操作 do something
- log.info("我抢到茅台了!");
- return "ok";
- }else {
- return "no";
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();;
- }
- return "";
- }
- 复制代码
源码剖析
- 1,加锁的(是否支持重入)
- 2,锁续期的
- 3,阻塞获取
- 4,释放
- 复制代码
- /**
- * 源码如下:
- * 1,加锁
- * <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
- * internalLockLeaseTime = unit.toMillis(leaseTime);
- *
- * return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
- * #如果锁key不存在
- * "if (redis.call('exists', KEYS[1]) == 0) then " +
- * #设置锁key,field是唯一标识,value是重入次数
- * "redis.call('hset', KEYS[1], ARGV[2], 1); " +
- * #设置锁key的过期时间 默认30s
- * "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- * "return nil; " +
- * "end; " +
- * #如果锁key存在
- * "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
- * #重入次数+1
- * "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- * #重置过期时间
- * "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- * "return nil; " +
- * "end; " +
- * "return redis.call('pttl', KEYS[1]);",
- * Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
- * }
- *
- * 2,锁续期
- * private void scheduleExpirationRenewal(final long threadId) {
- * if (expirationRenewalMap.containsKey(getEntryName())) {
- * return;
- * }
- *
- * Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- * @Override
- * public void run(Timeout timeout) throws Exception {
- * //续期函数的真正实现
- * RFuture<Boolean> future = renewExpirationAsync(threadId);
- *
- * future.addListener(new FutureListener<Boolean>() {
- * @Override
- * public void operationComplete(Future<Boolean> future) throws Exception {
- * expirationRenewalMap.remove(getEntryName());
- * if (!future.isSuccess()) {
- * log.error("Can't update lock " + getName() + " expiration", future.cause());
- * return;
- * }
- *
- * if (future.getNow()) {
- * // reschedule itself 再次调用自己,最终形成的结果就是每隔10秒续期一次
- * scheduleExpirationRenewal(threadId);
- * }
- * }
- * });
- * }
- * // internalLockLeaseTime=30 * 1000 即30秒
- * }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); //30/3=10秒后异步执行续期函数
- *
- * if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
- * task.cancel();
- * }
- * }
- *
- * 续期的lua脚本:判断key,field存在则重置过期时间
- * protected RFuture<Boolean> renewExpirationAsync(long threadId) {
- * return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- * "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
- * "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- * "return 1; " +
- * "end; " +
- * "return 0;",
- * Collections.<Object>singletonList(getName()),
- * internalLockLeaseTime, getLockName(threadId));
- * }
- *
- *
- *
- * 4,阻塞锁实现
- * public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
- * long threadId = Thread.currentThread().getId();
- * Long ttl = tryAcquire(leaseTime, unit, threadId);
- * // lock acquired
- * if (ttl == null) {
- * return;
- * }
- * //如果没有获取到锁,则订阅:redisson_lock__channel:{key} 频道
- * RFuture<RedissonLockEntry> future = subscribe(threadId);
- * commandExecutor.syncSubscription(future);
- *
- * try {
- * while (true) {
- * //尝试再获取一次
- * ttl = tryAcquire(leaseTime, unit, threadId);
- * // lock acquired
- * if (ttl == null) {
- * break;
- * }
- *
- * // waiting for message 阻塞等待锁订阅频道的消息,一旦锁被释放,就会得到信号通知,继续尝试获取锁
- * if (ttl >= 0) {
- * getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- * } else {
- * getEntry(threadId).getLatch().acquire();
- * }
- * }
- * } finally {
- * //获取到锁后取消订阅
- * unsubscribe(future, threadId);
- * }
- * // get(lockAsync(leaseTime, unit));
- * }
- *
- *
- * 5,解锁
- * protected RFuture<Boolean> unlockInnerAsync(long threadId) {
- * return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- * //key已经不存在了,则向redisson_lock__channel:{key}频道发布锁释放消息
- * "if (redis.call('exists', KEYS[1]) == 0) then " +
- * "redis.call('publish', KEYS[2], ARGV[1]); " +
- * "return 1; " +
- * "end;" +
- * // hash 中的field 不存在时直接返回,
- * "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
- * "return nil;" +
- * "end; " +
- * "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
- * //重入次数-1后如果还大于0,延长过期时间
- * "if (counter > 0) then " +
- * "redis.call('pexpire', KEYS[1], ARGV[2]); " +
- * "return 0; " +
- * "else " +
- * //重入次数-1后如果归0,则删除key,并向redisson_lock__channel:{key}频道发布锁释放消息
- * "redis.call('del', KEYS[1]); " +
- * "redis.call('publish', KEYS[2], ARGV[1]); " +
- * "return 1; "+
- * "end; " +
- * "return nil;",
- * Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
- *
- * }
- */
- 复制代码
引言:
问题1:什么是Redis缓存穿透?缓存穿透如何解决?
问题2:如何在海量元素中(例如 10 亿无序、不定长、不重复)快速判断一个元素是否存在?
布隆过滤器(英语:Bloom Filter)是 1970 年由Burton Howard Bloom提出的,是一种空间效率高的概率型数据结构。
本质上其实就是一个很长的二进制向量和一系列随机映射函数。专门用来检测集合中是否存在特定的元素
产生的契机
回想一下,我们平常在检测集合中是否存在某元素时,都会采用比较的方法。考虑以下情况:
总而言之,当集合中元素的数量极多时,不仅查找会变得很慢,而且占用的空间也会大到无法想象。BF就是解决这个矛盾的利器。
数据结构&设计思想
BF是由一个长度为m比特的位数组(bit array) 与k个哈希函数(hash function) 组成的数据结构。位数组均初始化为0,所有哈希函数都可以分别把输入数据尽量均匀地散列。
基于BitMap:
如果我们要映射一个值到布隆过滤器中,我们需要使用多个不同的哈希函数生成多个哈希值, 并对每个生成的哈希值指向的 bit 位,设置为1
例:
当要插入一个元素时,将其数据分别输入k个哈希函数,产生k个哈希值。以哈希值作为位数组中的下标,将所有k个对应的比特置为1。
当要查询(即判断是否存在)一个元素时,同样将其数据输入哈希函数,然后检查对应的k个比特。如果有任意一个比特为0,表明该元素一定不在集合中。如果所有比特均为1,表明该集合有(较大的)可能性在集合中。为什么不是一定在集合中呢?因为一个比特被置为1有可能会受到其他元素的影响,这就是所谓“假阳性”(false positive)。相对地,“假阴性”(false negative)在BF中是绝不会出现的。
误判率问题分析
哈希函数有以下两个特点:
布隆过滤器的误判是指多个输入经过哈希之后在相同的bit位置1了,这样就无法判断究竟是哪个输入产生的,因此误判的根源在于相同的 bit 位被多次映射且置 1。
不支持删除
hash碰撞这种情况也造成了布隆过滤器的删除问题,传统的布隆过滤器并不支持删除操作,因为布隆过滤器的每一个 bit 并不是独占的,很有可能多个元素共享了某一位。如果我们直接删除这一位的话,会影响其他的元素。
如何选择哈希函数个数和布隆过滤器长度
很显然,过小的布隆过滤器很快所有的 bit 位均为 1,那么查询任何值都会返回“可能存在”,起不到过滤的目的了。布隆过滤器的长度会直接影响误报率,布隆过滤器越长其误报率越小。
另外,哈希函数的个数也需要权衡,个数越多则布隆过滤器 bit 位置位 1 的速度越快,且布隆过滤器的效率越低;但是如果太少的话,那我们的误报率会变高。
如何选择适合业务的 k 和 m 值呢,这里直接贴一个公式:
第一种方式:Guava
1、引入Guava pom配置
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>29.0-jre</version>
- </dependency>
- 复制代码
2、代码实现
- public class BloomFilterTest {
-
- @Test
- public void test1() {
- BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), size, fpp);
- // 插入10万样本数据
- for (int i = 0; i < size; i++) {
- bloomFilter.put(i);
- }
-
- // 用另外十万测试数据,测试误判率
- int count = 0;
- for (int i = capacity; i < size + 100000; i++) {
- if (bloomFilter.mightContain(i)) {
- count++;
- System.out.println(i + "误判了");
- }
- }
- System.out.println("总共的误判数:" + count);
- }
- }
- }
- 复制代码
运行结果:
10万数据里有947个误判,约等于0.01%,也就是代码里设置的误判率:fpp = 0.01
代码分析:
核心BloomFilter.create
方法:
- @VisibleForTesting
- static <T> BloomFilter<T> create(
- Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
- 。。。。
- }
- 复制代码
这里有四个参数:
funnel
:数据类型(通常是调用Funnels工具类中的)expectedInsertions
:指望插入的值的个数fpp
:误判率(默认值为0.03)strategy
:哈希算法咱们重点讲一下fpp
参数
fpp误判率
情景一:fpp = 0.01
情景二:fpp = 0.03
(默认参数)
总结
fpp
参数进行调节第二种方式:Redisson
上面使用Guava实现的布隆过滤器是把数据放在了本地内存中。分布式的场景中就不合适了,没法共享内存
还能够用Redis来实现布隆过滤器,这里使用Redis封装好的客户端工具Redisson
pom配置:
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson-spring-boot-starter</artifactId>
- <version>3.13.4</version>
- </dependency>
- 复制代码
java代码:
- public class RedissonBloomFilter {
-
- public static void main(String[] args) {
- Config config = new Config();
- config.useSingleServer().setAddress("redis://127.0.0.1:6379");
- config.useSingleServer().setPassword("1234");
- //构造Redisson
- RedissonClient redisson = Redisson.create(config);
-
- RBloomFilter<String> bloomFilter = redisson.getBloomFilter("phoneList");
- //初始化布隆过滤器:预计元素为100000000L,偏差率为3%
- bloomFilter.tryInit(100000000L,0.03);
- //将号码10086插入到布隆过滤器中
- bloomFilter.add("10086");
-
- //判断下面号码是否在布隆过滤器中
- //输出false
- System.out.println(bloomFilter.contains("123456"));
- //输出true
- System.out.println(bloomFilter.contains("10086"));
- }
- }
- 复制代码
1.1.1 简介
cluster是redis官方提供的集群方案,功能确实强大(在线扩容,缩容等等),除了官方的cluster,业界有很多三方的缓存代理中间件,比如: predixy, codis, redis-cerberus,squirrel ,cellar act。Twemproxy是使用最广泛、同时也是redis官方所认可的实现方案。
- Twemproxy(又称为nutcracker)由Twitter开源。是一个轻量级的Redis和Memcached代理,主要用来减少对后端缓存服务器的连接数。
- 复制代码
特点:
memcached时代可以称王称霸,但随着redis自身发展,尤其高版本cluster出现,已逐渐被弱化
优点:
简单可靠,具备生产级别应用能力
减少了redis连接数,降低redis连接成本,cluster的所有节点之间都需要互相建立连接。
除了redis,Twemproxy可以对Memcached 协议做代理,在缓存界是个通用性的解决方案。
缺点:
和cluster相比,性能有一定的损失(twitter测试约20%)
自身也会成为一个单点,所以,做双活很重要!
它只是一个代理转发,底层的主从切换等依然靠redis自身的主从和哨兵或cluster。这一点上cluster已经完虐它
1.1.2 下载与部署
- yum install -y autoconf automake libtool
- yum remove -y autoconf
-
- export twemproxy_path=/opt/redis/latest/twemproxy/
-
- mkdir -p $twemproxy_path
-
- cd $twemproxy_path
- wget ftp://ftp.gnu.org/gnu/autoconf/autoconf-2.69.tar.gz
- tar -zxvf autoconf-2.69.tar.gz
- cd autoconf-2.69
- .configure --prefix=/usr
- make && make install
-
- cd $twemproxy_path
- wget http://ftp.gnu.org/gnu/automake/automake-1.14.tar.gz
- tar -zxvf automake-1.14.tar.gz
- cd automake-1.14
- .bootstrap.sh
- .configure --prefix=/usr
- make && make install
-
- cd $twemproxy_path
- wget http://ftp.gnu.org/gnu/libtool/libtool-2.4.2.tar.gz
- tar -zxvf libtool-2.4.2.tar.gz
- cd libtool-2.4.2
- .configure --prefix=/usr
- make && make install
-
- cd $twemproxy_path
- wget https://github.com/twitter/twemproxy/archive/v0.4.1.tar.gz
- tar -zxvf v0.4.1.tar.gz
- cd twemproxy-0.4.1
- .configure --prefix=/usr
- make && make install
-
- #编译完,启动文件在src目录中。
- 复制代码
1.1.3 配置与启动
1)先准备好两台redis
- #将redis.conf拷贝一份,注意以下配置项
- #后台启动
- daemonize yes
- #bind这一行一定要注释掉!允许外部ip连接,否则将来用redis-cli连接操作命令的时候会报一个错误:
- #Error: Connection reset by peer
- #bind 127.0.0.1 -::1
-
- #启动两个实例,在8081和8082端口
- [root@iZ8vb3a9qxofwannyywl6zZ twemproxy]# pwd
- /opt/redis/latest/twemproxy
- [root@iZ8vb3a9qxofwannyywl6zZ twemproxy]# ..src/redis-server redis.conf --port 8081
- [root@iZ8vb3a9qxofwannyywl6zZ twemproxy]# ..src/redis-server redis.conf --port 8082
-
-
- #确认服务启动成功
- [root@iZ8vb3a9qxofwannyywl6zZ twemproxy]# ps aux | grep redis-server
- root 18209 0.1 0.0 162492 2680 ? Ssl 13:35 0:00 ..src/redis-server 127.0.0.1:8081
- root 18217 0.0 0.0 162492 2688 ? Ssl 13:35 0:00 ..src/redis-server 127.0.0.1:8082
- 复制代码
2)配置twemproxy
- #将yml文件拷贝一份,test.yml,并修改内容为自己的redis地址
- [root@iZ8vb3a9qxofwannyywl6zZ conf]# pwd
- /opt/redis/latest/twemproxy/twemproxy-0.4.1/conf
- [root@iZ8vb3a9qxofwannyywl6zZ conf]# cp nutcracker.yml test.yml
- 复制代码
- #test.yml文件说明
- alpha: #标记,如果多组,就alpha,beta……往上加,参考 nutcracker.yml 样本
- listen: 127.0.0.1:22121 # 这组集群暴露的端口,将来连这个
- hash: fnv1a_64 #hash散列算法
- distribution: modula #分片算法,这里用取模方式,一共三种,后面详细介绍
- auto_eject_hosts: true #自动摘除故障节点
- redis: true #是不是redis,false则表示memcache
- server_retry_timeout: 2000 #每隔2秒判断故障节点是否正常,如果正常则放回一致性hash环
- server_failure_limit: 3 #多少次无响应,就从一致性hash环中摘除
- #redis实例列表,一定要加别名,否则宕机后更换机器,分片就不一样了
- #加了别名后,将用别名做分片节点名,否则用ip加端口权重,一旦ip变更会重新迁移
- servers:
- - 127.0.0.1:8081:1 redis-1
- - 127.0.0.1:8082:1 redis-2
-
- 复制代码
- #启动:-d后台启动,-c指定启动文件
- [root@iZ8vb3a9qxofwannyywl6zZ conf]# ..src/nutcracker -d -c test.yml
- [root@iZ8vb3a9qxofwannyywl6zZ conf]# ps aux | grep nutcracker
- root 14601 0.0 0.0 1136 248 pts/0 Ssl+ 09:25 0:00 /usr/sbin/nutcracker -c /opt/nutcracker.yml
- 复制代码
3)连接与验证
- #连接非常的简单,用redis-cli和直连redis一样
-
- #首先在twemproxy上设置多个key,均成功
- [root@iZ8vb3a9qxofwannyywl6zZ ~]# redis-cli -p 22121
- 127.0.0.1:22121> set a a
- OK
- 127.0.0.1:22121> set b b
- OK
- 127.0.0.1:22121> set c c
- OK
- 127.0.0.1:22121> set d d
- OK
-
- #先连redis-1 , 取到ac, bd取不到
- [root@iZ8vb3a9qxofwannyywl6zZ ~]# redis-cli -p 8081
- 127.0.0.1:8081> get a
- "a"
- 127.0.0.1:8081> get b
- (nil)
- 127.0.0.1:8081> get c
- "c"
- 127.0.0.1:8081> get d
- (nil)
- 127.0.0.1:8081>
-
-
- #再连redis-2 , 发现ac不存在,bd在这里,验证集群分片成功!
- [root@iZ8vb3a9qxofwannyywl6zZ ~]# redis-cli -p 8082
- 127.0.0.1:8082> get a
- (nil)
- 127.0.0.1:8082> get b
- "b"
- 127.0.0.1:8082> get c
- (nil)
- 127.0.0.1:8082> get d
- "d"
- 复制代码
1.1.4 分片策略
1)读写原理
写入时,twemproxy将多个对应的key计算hash值路由到对应的后端redis机器。
而要在redis集群中查询对应的key/value时,twemproxy同样计算hash值从对应的后端redis收集过来,然后拼接起来返回给用户。
2)策略
后台的redis或memcached集群可以通过以下几种算法进行key/value的分配(distribution属性):
答:
Redis6.0 引入多线程主要是为了提高网络 IO 读写性能(Redis 的瓶颈并不在 CPU,而在内存和网络。)
虽然,Redis6.0 引入了多线程,但是 Redis 的多线程只是在网络数据的读写这类耗时操作上使用了, 执行命令仍然是单线程顺序执行。因此,你也不需要担心线程安全问题。
流程简述如下:
该设计有如下特点:
1、IO 线程要么同时在读 socket,要么同时在写,不会同时读或写
2、IO 线程只负责读写 socket 解析命令,不负责命令处理
Redis6.0 的多线程默认是禁用的,只使用主线程
如需开启需要修改 redis 配置文件 redis.conf
:
- io-threads-do-reads yes
- 复制代码
开启多线程后,还需要设置线程数,否则是不生效的。同样需要修改 redis 配置文件 redis.conf
:
- io-threads 4 #官网建议4核的机器建议设置为2或3个线程,8核的建议设置为6个线程
- 复制代码
概念:缓存穿透是指缓存和数据库中都没有的数据,而用户不断发起请求,造成数据库的压力倍增的情况
例:发起为id值为 -1 的数据或 id 为特别大不存在的数据
解决方案:
(1)接口层增加校验,比如用户鉴权校验,参数做校验 比如:id 做基础校验,id <=0的直接拦截
(2)对于像ID为负数的非法请求直接过滤掉,采用布隆过滤器(Bloom Filter)
(3)针对在数据库中找不到记录的,我们仍然将该空数据存入缓存中,当然一般会设置一个较短的过期时间
概念:缓存服务器宕机或者大量缓存集中某个时间段失效,导致请求全部去到数据库,造成数据库压力倍增的情况,这个是针对多个key而言
解决:
(1)实现缓存高可用,通过redis cluster将热点数据均匀分布在不同的Redis库中也能避免全部失效的问题
(2)批量往Redis存数据的时候,把每个Key的失效时间都加个随机值
- setRedis(Key,value,time + Math.random() * 10000);
- 复制代码
概念:redis过期后的一瞬间,有大量用户请求同一个缓存数据,导致这些请求都去请求数据库,造成数据库压力倍增的情,针对一个key而言
缓存击穿与缓存雪崩的区别是这里针对的是某一热门key缓存,而雪崩针对的是大量缓存的集中失效
解决方案
● 设置热点数据永远不过期。
● 使用互斥锁(mutex key)
好了,今天就先码到这里了,接着去搬砖了,茅台没抢着,倒是学习了不少技术,也是收获颇丰
希望大家不要吝啬你的小手,给狂野君点个赞,你的认可,是我最大的动力
以后,会持续为大家送上干货的,欢迎大家关注,以免迷路
以下是往期Redis的干货,欢迎收藏
- 伙伴们有兴趣想了解内容和更多相关学习资料的请点赞收藏+评论转发+关注我,
- 后面会有很多干货。我有一些面试题、架构、设计类资料可以说是程序员面试必备!
- 所有资料都整理到网盘了,需要的话欢迎下载!私信我回复【999】即可免费获取
作者:博学谷狂野架构师
链接:https://juejin.cn/post/7098957369224200200
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。