说说我的原因,项目中有一个通知表,用于个人工作通知到,全局广播通知,上线一个月出现了50w条数据,然后这些数据在业务上有需要保留不能物理删除。数据量多了,压测结果不是很理想,需要改造
说说我的思路,我准备将数据存到redis缓存中,看看了sql和代码逻辑,发现了两个问题,一,sql呢是根据两个字段进行排序,一个是scope,消息的权重一个是创建时间,redis是可以达到排序的要求,但这样多字段排序还是没办法达到的(可能本人较菜没找到具体方式);二,是需要实现分页的。那就需要动动脑筋了。
排序,肯定想到的就是redis的有序列表,它去重,且有序,在添加的时候有权重参数传入。接下来说说我的解决思路
我的方案有两
第一个,我可以在添加消息后获取消息对象在数据库的id,并将这个id存入zset 权重就给这个id,id是递增的,最新添加的id大于之前的也就说存到redis中权重大,可以根据这个取数据实现分页效果,找到对应的id,不要查表,不要查表,不要查表,问题的根据是连接数据库和查表,我们查出来id是为了在redis中查询数据,回到我们的添加,在添加id的同时也将这个添加对象放到redis,并设置一个过期时间,个人建议可以使用7天。redis的zset可以存放40亿个成员(来源于菜鸟教程)
也就是可以不用考虑zset中的成员量,根据权重范围取得id,再拿id去取对应的数据,取到了皆大欢喜,取不到拿id去数据库查询。整体思路就是这样。
方案二,直接将对象存到zset中,并将scope和创建时间格式化成数字,二者用字符串加起来再转换成double,并设置为权重,同时保持zset800条数据,因为是小程序的下拉分页,800条的话会有80次分页,应该够用了,如果超过80次分页,再zset是查不到数据的,这是会请求数据库,并且不会将查到的数据放到redis中,因为请求量不会很大,
添加的时候会先判断zset的size,超过800 会倒序取出权重最低的并将这些删除,然后再添加。
我采用的是第二种。准备必要代码
package com.example.demo.config;
import com.alibaba.fastjson.parser.ParserConfig;
import com.example.demo.tools.FastJson2JsonRedisSerializer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import java.time.Duration;
@Component
public class RedisConfig {
RedisConfig(){
//打开autotype功能,需要强转的类一次添加其后
// ParserConfig.getGlobalInstance()
// .addAccept("com.example.demo.entity.dto.MessageDto");
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
}
@Bean
public RedisTemplate<Object,Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
serializer.setObjectMapper(mapper);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(serializer);
// Hash的key也采用StringRedisSerializer的序列化方式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600)) //设置数据过期时间600秒
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
以上代码的注入redisTemplate供我们使用,接下来封装一个工具类来使用这个,
(后来补充:后来jackson出现无法序列化LocalDateTime,还有反序列化失败的问题,搞了我一下午要晕了,后来我直接换了fastjson,没有这个包的去pom里面加以一下,所以上面的redisconfig是我更新的,这个还需要一个文件也补充上)
依赖
<dependency>
<groupId>com.alibabagroupId>
<artifactId>fastjsonartifactId>
<version>1.2.83version>
dependency>
package com.example.demo.tools;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import com.alibaba.fastjson.parser.ParserConfig;
import org.springframework.util.Assert;
import java.nio.charset.Charset;
/**
* FastJson2JsonRedisSerializer
* Redis使用FastJson序列化
* by zhengkai.blog.csdn.net
*/
public class FastJson2JsonRedisSerializer<T> implements RedisSerializer<T> {
public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
private ObjectMapper objectMapper = new ObjectMapper();
private Class<T> clazz;
public FastJson2JsonRedisSerializer(Class<T> clazz) {
super();
this.clazz = clazz;
}
public byte[] serialize(T t) throws SerializationException {
if (t == null) {
return new byte[0];
}
return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);
}
public T deserialize(byte[] bytes) throws SerializationException {
if (bytes == null || bytes.length <= 0) {
return null;
}
String str = new String(bytes, DEFAULT_CHARSET);
return (T) JSON.parseObject(str, clazz);
}
public void setObjectMapper(ObjectMapper objectMapper) {
Assert.notNull(objectMapper, "'objectMapper' must not be null");
this.objectMapper = objectMapper;
}
}
package com.example.demo.tools;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
public class RedisTools {
public final RedisTemplate redisTemplate;
public RedisTools(@Qualifier(value = "redisTemplate") RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public <T> T getDataByKey(String key) {
ValueOperations<String, T> operation = redisTemplate.opsForValue();
return operation.get(key);
}
public <T> T getDataByKey(String key, String hKey) {
HashOperations<String, String, T> operation = redisTemplate.opsForHash();
return operation.get(key, hKey);
}
public <T> void setData(String key, T value) {
ValueOperations<String, T> operation = redisTemplate.opsForValue();
operation.set(key, value);
}
public <T> void setDataAndExpire(String key, T value, Integer integer, TimeUnit timeUnit) {
ValueOperations<String, T> operation = redisTemplate.opsForValue();
operation.set(key, value, integer, timeUnit);
}
public <T> void setData(String key, String hKey, T value) {
HashOperations<String, String, T> operation = redisTemplate.opsForHash();
operation.put(key, hKey, value);
}
/**
* 不一定有 设置存活时间
*/
public <T> void setZsetData(String key, T value, double scope) {
ZSetOperations<String, T> operation = redisTemplate.opsForZSet();
operation.add(key, value, scope);
}
/**
*
* 获取存储数据 默认根据scope倒序
* */
public <T> Set<T> getZsetData(String key, long start,long end ,boolean type) {
ZSetOperations<String, T> operation = redisTemplate.opsForZSet();
if (type) {
//倒序
Set<T> set = operation.reverseRange(key,start,end);
return set;
}
//正序
Set<T> set = operation.range(key,start,end);
return set;
}
/**
*
* 获取存储数据的size,用于保持size条数固定,超过可以删除等
* */
public <T> long getZsetSize(String key) {
ZSetOperations<String, T> operation = redisTemplate.opsForZSet();
return operation.size(key);
}
/**
* 删除zset中的对象数据
* */
public <T> long removeZsetByObject(String key,T... values) {
ZSetOperations<String, T> operation = redisTemplate.opsForZSet();
return operation.remove(key,values);
}
/**
* 删除redis中的数据,根据key
* */
public <T> Boolean delete(String key){
Boolean result = redisTemplate.delete(key);
return result;
}
/**
* 删除redis中的数据,根据key和hkeys
* */
public <T> Long delete(String key,String... hKeys){
long result = redisTemplate.opsForHash().delete(key,hKeys);
return result;
}
public <T> Long ttl(String key){
long times = redisTemplate.getExpire(key);
return times;
}
/**
* 返回储存在给定键的 HyperLogLog 的近似基数,如果键不存在,那么返回 0
* */
public <T> Long hyperLogLog(String... keys){
long index = redisTemplate.opsForHyperLogLog().size(keys);
return index;
}
/**
* */
public <T> Long addHyperLogLog(String key,T... values){
long index = redisTemplate.opsForHyperLogLog().add(key,values);
return index;
}
}
准备代码交代完毕,接下来是逻辑代码了
package com.example.demo.controller;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.RandomUtil;
import com.example.demo.entity.Message;
import com.example.demo.entity.dto.MessageDto;
import com.example.demo.service.MessageService;
import com.example.demo.tools.RedisTools;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.format.DateTimeFormatter;
import java.util.*;
@RestController
@Slf4j
public class TestController {
@Autowired
RedisTools redisTools;
@Autowired
MessageService messageService;
@GetMapping("getkey")
public Set get(long pageIndex, long pageSize) {
//pageIndex 从1开始,不要像之前可以传0,或者也可以学习mybatis做一个偏移量设计
long start = (pageIndex - 1) * pageSize;
long end = (pageSize * pageIndex) - 1;
Set<MessageDto> set = redisTools.getZsetData(MESSAGE_KEY, start, end, Boolean.TRUE);
log.info("size:{}", set.size());
return set;
}
private final static String MESSAGE_KEY = "MESSAGE_KEY";
@GetMapping("insert")
public void insert() throws InterruptedException, JsonProcessingException {
for (int i = 0; i < 20; i++) {
Message message = new Message();
message.setMsgTitle("测试标题");
message.setMsgContext("测试内容");
String score = RandomUtil.randomNumbers(1);
message.setScope(Convert.toInt(score));
message.setSendPeo(1L);
message.setReceivePeo(1L);
//这里使用的是mybatis-plus的ar模式添加,就是添加自己判断
message.insert();
//我的业务场景没有批量添加所以时间不会相同,这个防止相同后面scope相同排序出问题
Thread.sleep(1000L);
Long tempScope;
//这里是创建时间是mybatis-plus字段的添加@TableField("create_time", fill = FieldFill.INSERT) 这个注解,并写自动注入添加上值的
//相关注入可以去mybatis-plus官网查看
String timeStr = message.getCreateTime().format(DateTimeFormatter.ofPattern("yyMMddHHmmss"));
log.info("timeStr:{}", timeStr);
// 如果你是批量添加,创建时间精确到秒相同,也可以用id作为后面你拼接的scope,id需要保持位数相同,
// 推荐使用功能雪花算法,可以百度下这个mybatis-plus怎么使用,这里就不再多说
if (message.getScope() == 0) {
tempScope = Convert.toLong(timeStr);
} else {
String str = message.getScope().toString() + timeStr;
log.info("str:{}", str);
tempScope = Convert.toLong(message.getScope().toString() + timeStr);
}
log.info("tempScope:{},scopr:{}", tempScope, message.getScope());
double scope = tempScope / 10000;
log.info("scope:{}", scope);
MessageDto dto = new MessageDto();
BeanUtil.copyProperties(message, dto);
redisTools.setZsetData(MESSAGE_KEY, dto, scope);
}
Long size = redisTools.getZsetSize(MESSAGE_KEY);
log.info("zset长度为:{}",size);
if (size > 20) {
Set<MessageDto> set = redisTools.getZsetData(MESSAGE_KEY, 0, (size - 21), Boolean.FALSE);
for (MessageDto dto : set) {
Long flag = redisTools.removeZsetByObject(MESSAGE_KEY,dto);
log.info(flag.toString());
}
size = redisTools.getZsetSize(MESSAGE_KEY);
log.info("size:{}", size);
}
}
@GetMapping("remove")
public void remove() {
redisTools.redisTemplate.delete(MESSAGE_KEY);
}
}
总的来说代码就是这样,最后把表和实体类展示一下,至于相关依赖,service等文件,就不放了,后面可能会考虑创建项目放gitee供参考, 我这是kotlin文件 你参考下该成Java 或者直接使用也行
CREATE TABLE `message` (
`id` bigint(20) NOT NULL,
`message_title` varchar(50) DEFAULT NULL,
`message_context` varchar(200) DEFAULT NULL,
`send_peo` bigint(20) DEFAULT NULL,
`receive_peo` bigint(20) DEFAULT NULL,
`scope` int(11) DEFAULT '0',
`del_flag` tinyint(4) DEFAULT '0',
`create_time` datetime DEFAULT NULL,
`creator` tinyint(4) DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
package com.example.demo.entity
import com.baomidou.mybatisplus.annotation.*
import com.baomidou.mybatisplus.extension.activerecord.Model
import com.fasterxml.jackson.annotation.JsonFormat
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer
import lombok.Data
import java.time.LocalDateTime
@Data
@TableName("message")
class Message : Model<Message>() {
/**
* id
* */
@TableId(value = "id", type = IdType.ASSIGN_ID)
var id: Long? = null;
/**
* 消息标题
* */
@TableField("message_title")
var msgTitle: String? = null;
/**
* 消息主体
* */
@TableField("message_context")
var msgContext: String? = null
/**
* 发送人
* */
@TableField("send_peo")
var sendPeo: Long? = null;
/**
* 接收人
* */
@TableField("receive_peo")
var receivePeo: Long? = null;
/**
* 权重
* */
@TableField("scope")
var scope: Int? = null;
/**
* 逻辑删除标志
* */
@TableField("del_flag")
var delFalg: Int? = null;
/**
* 创建时间
* */
@TableField("create_time", fill = FieldFill.INSERT)
@JsonDeserialize(using = LocalDateTimeDeserializer::class)
@JsonSerialize(using = LocalDateTimeSerializer::class)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
var createTime: LocalDateTime? = null;
/**
* 创建人
* */
@TableField("creator")
var creator: Long? = null;
/**
* 修改时间
* */
@TableField("update_time",fill = FieldFill.INSERT_UPDATE)
@JsonDeserialize(using = LocalDateTimeDeserializer::class)
@JsonSerialize(using = LocalDateTimeSerializer::class)
var updateTime: LocalDateTime? = null
}
package com.example.demo.entity.dto
import com.fasterxml.jackson.annotation.JsonFormat
import java.io.Serializable
import java.time.LocalDateTime
class MessageDto : Serializable {
/**
* id
* */
var id: Long? = null;
/**
* 消息标题
* */
var msgTitle: String? = null;
/**
* 消息主体
* */
var msgContext: String? = null
/**
* 发送人
* */
var sendPeo: Long? = null;
/**
* 接收人
* */
var receivePeo: Long? = null;
/**
* 权重
* */
var scope: Int? = null;
/**
* 创建时间
* */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
var createTime: LocalDateTime? = null;
}
接下来我给大家演示一下最终效果
先调用一个删除接口
再调用两次添加接口,两次是为了验证zset里面数据固定长度
第一次结束
第二次结束,长度正常,接下来就是看我们的顺序是否和slq查询顺序一样就行了
调用getkey获取数据
复制数据获得id
至此完成redis实现mysql多字段排序功能。觉得有用的点个赞吧,小白我谢谢你了。