• 高并发 -- 多级缓存


     

     JVM 进程缓存

     

     

    1. @SpringBootTest
    2. public class CaffeineTest {
    3. @Test
    4. void test() {
    5. //构建cache对象
    6. Cache cache = Caffeine.newBuilder().build();
    7. //存数据
    8. cache.put("gf", "lb");
    9. //取数据
    10. String gf = cache.getIfPresent("gf");
    11. System.out.println(gf);
    12. //取取数据,如果未命中,查询数据库
    13. String defaultGF = cache.get("defaultGF", key -> {
    14. //根据key,去查询数据库
    15. return "hana";
    16. });
    17. System.out.println(defaultGF);
    18. }
    19. @Test
    20. void maxSize() throws InterruptedException {
    21. //构建cache对象,最大存储数量为1,如果存入多个,不会立刻删除
    22. Cache cache = Caffeine.newBuilder().maximumSize(1).build();
    23. cache.put("gf1", "gf1");
    24. cache.put("gf2", "gf2");
    25. cache.put("gf3", "gf3");
    26. Thread.sleep(10L);
    27. System.out.println(cache.getIfPresent("gf1"));
    28. System.out.println(cache.getIfPresent("gf2"));
    29. System.out.println(cache.getIfPresent("gf3"));
    30. }
    31. @Test
    32. void expire() throws InterruptedException {
    33. //构建cache对象,写入后多长时间失效
    34. Cache cache = Caffeine.newBuilder().expireAfterWrite(1L, TimeUnit.SECONDS).build();
    35. cache.put("gf1", "gf1");
    36. System.out.println(cache.getIfPresent("gf1"));
    37. Thread.sleep(1200L);
    38. System.out.println(cache.getIfPresent("gf1"));
    39. }
    40. }

     实现进程缓存

    1. @Configuration
    2. public class CaffeineConfig {
    3. @Bean
    4. public Cache userCache(){
    5. return Caffeine.newBuilder()
    6. .initialCapacity(100)
    7. .maximumSize(10_000)
    8. .build();
    9. }
    10. @Bean
    11. public Cache blogCache(){
    12. return Caffeine.newBuilder()
    13. .initialCapacity(100)
    14. .maximumSize(10_000)
    15. .build();
    16. }
    17. }
    18. @Component
    19. public class CaffeineService {
    20. @Autowired
    21. private Cache userCache;
    22. @Autowired
    23. private Cache blogCache;
    24. public User getUser(Long id) {
    25. //当缓存中没有,则查询数据库,查到结果放入缓存。减少数据库交互,提供并发量
    26. return userCache.get(id, key -> {
    27. //select * from user where id = key
    28. return new User("lb", 32);
    29. });
    30. }
    31. public Blog getBlog(Long id) {
    32. return blogCache.get(id, key -> {
    33. //select * from blog where id = key
    34. return new Blog();
    35. });
    36. }
    37. }

    Lua 语言

    Lua语言可以在nginx中开发使用的语言

     

     

     

     

     

    多级缓存

    openResty

     安装

    1.安装开发库

    首先要安装OpenResty的依赖开发库,执行命令:

    yum install -y pcre-devel openssl-devel gcc --skip-broken

    2.安装OpenResty仓库

    你可以在你的 CentOS 系统中添加 openresty 仓库,这样就可以便于未来安装或更新我们的软件包(通过 yum check-update 命令)。运行下面的命令就可以添加我们的仓库:

    yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo

    如果提示说命令不存在,则运行:

    yum install -y yum-utils 

    然后再重复上面的命令

    3.安装OpenResty

    然后就可以像下面这样安装软件包,比如 openresty

    yum install -y openresty

    4.安装opm工具

    opm是OpenResty的一个管理工具,可以帮助我们安装一个第三方的Lua模块。

    如果你想安装命令行工具 opm,那么可以像下面这样安装 openresty-opm 包:

    yum install -y openresty-opm

    5.目录结构

    默认情况下,OpenResty安装的目录是:/usr/local/openresty

     看到里面的nginx目录了吗,OpenResty就是在Nginx基础上集成了一些Lua模块。

    6.配置nginx的环境变量

    打开配置文件:

    vi /etc/profile

    在最下面加入两行:

    1. export NGINX_HOME=/usr/local/openresty/nginx
    2. export PATH=${NGINX_HOME}/sbin:$PATH

    NGINX_HOME:后面是OpenResty安装目录下的nginx的目录

    然后让配置生效:

    source /etc/profile

    启动和运行

    OpenResty底层是基于Nginx的,查看OpenResty目录的nginx目录,结构与windows中安装的nginx基本一致:

     所以运行方式与nginx基本一致:

    1. # 启动nginx
    2. nginx
    3. # 重新加载配置
    4. nginx -s reload
    5. # 停止
    6. nginx -s stop

    nginx的默认配置文件注释太多,影响后续编辑,这里将nginx.conf中的注释部分删除,保留有效部分。

    修改/usr/local/openresty/nginx/conf/nginx.conf文件,内容如下:

    1. #user nobody;
    2. worker_processes 1;
    3. error_log logs/error.log;
    4. events {
    5. worker_connections 1024;
    6. }
    7. http {
    8. include mime.types;
    9. default_type application/octet-stream;
    10. sendfile on;
    11. keepalive_timeout 65;
    12. server {
    13. listen 8081;
    14. server_name localhost;
    15. location / {
    16. root html;
    17. index index.html index.htm;
    18. }
    19. error_page 500 502 503 504 /50x.html;
    20. location = /50x.html {
    21. root html;
    22. }
    23. }
    24. }

    在Linux的控制台输入命令以启动nginx

    nginx
    

    然后访问页面:http://192.168.150.101:8081,注意ip地址替换为你自己的虚拟机IP:

     请注意这里的nginx作用是下图中本地缓存nginx,不是反向代理的nginx

    openResty使用

    在反向代理的Nginx服务上已经配置 /api 的url将路由到 openResty的Nginx服务上

     openResty配置

    1.修改nginx.comf文件

    1. #lua 模块
    2. lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    3. #c模块
    4. lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
    1. location /api/item {
    2. default_type application/json;
    3. content_by_lua_file lua/item.lua;
    4. }

    2.编写item.lua文件

    ngx.say('{"id":10001,"name":"Phone"}')

     3.openResty 获取请求参数

     

    1. local id = ngx.var[1]
    2. -- ..是lua中的拼接符号
    3. ngx.say('{"id":' .. id .. ',"name":"Phone"}')

    OpenResty发送请求到一台JVM 服务器

     

     

     实现:

    nginx.conf

    1. location /item {
    2. proxy_pass http://192.168.150.1:8081;
    3. }

     封装http请求工具类

     common.lua

    1. -- 封装函数,发送http请求,并解析响应
    2. local function read_http(path, params)
    3. local resp = ngx.location.capture(path,{
    4. method = ngx.HTTP_GET,
    5. args = params,
    6. })
    7. if not resp then
    8. -- 记录错误信息,返回404
    9. ngx.log(ngx.ERR, "http not found, path: ", path , ", args: ", args)
    10. ngx.exit(404)
    11. end
    12. return resp.body
    13. end
    14. -- 将方法导出
    15. local _M = {
    16. read_http = read_http
    17. }
    18. return _M

    其中会用到cjson工具库

     lua/item.lua

    1. --导入common函数库
    2. local common = require('commnon')
    3. local read_http = common.read_http
    4. --导入cjson函数库,
    5. local cjson = require('cjson')
    6. --获取路径参数
    7. local id = ngx.var[1]
    8. --查询商品信息 nil代表无参
    9. local itemJSON = read_http("/item/" .. id, nil)
    10. --查询库存信息
    11. local stockJSON = read_http("/item/stock/" .. id, nil)
    12. --转化为lua的table类型
    13. --JSON转table则需要cjson工具包
    14. local item = cjson.decode(itemJSON)
    15. local stock = cjson.decode(stockJSON)
    16. --组合数据 商品的库存(stock)和销量(sold),存储在stock对象中
    17. item.stock = stock.stock
    18. item.sold = stock.sold
    19. --返回结果
    20. ngx.say(cjson.encode(item))

    OpenResty发送请求到多台JVM 服务器(负载均衡)

     但是如果存在多台JVM服务器时,缓存只存在上一次接受请求的JVM服务器上,其他的JVM中没有JVM缓存。如果JVM服务器数量很多,并且负载均衡的策略为轮训时,则依旧会造成对数据库的大量访问。则可以对request的uri进行哈希算法,让同一个id永远只会路由到同一台JVM服务器上使其JVM缓存永远生效.

    修改/usr/local/openresty/nginx/conf/nginx.conf文件,内容如下:

    1. #user nobody;
    2. worker_processes 1;
    3. error_log logs/error.log;
    4. events {
    5. worker_connections 1024;
    6. }
    7. http {
    8. include mime.types;
    9. default_type application/octet-stream;
    10. sendfile on;
    11. keepalive_timeout 65;
    12. #lua 模块
    13. lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    14. #c模块
    15. lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
    16. upstream tomcat-cluster {
    17. hash $request_uri;
    18. server 192.168.150.1:8081;
    19. server 192.168.150.1:8082;
    20. }
    21. server {
    22. listen 8081;
    23. server_name localhost;
    24. location /item {
    25. proxy_pass http://tomcat-cluster;
    26. }
    27. location ~ /api/(\d+) {
    28. default_type application/json;
    29. content_by_lua_file lua/item.lua;
    30. }
    31. location / {
    32. root html;
    33. index index.html index.htm;
    34. }
    35. error_page 500 502 503 504 /50x.html;
    36. location = /50x.html {
    37. root html;
    38. }
    39. }
    40. }

    OpenResty 请求Redis获取数据

     

     

    1. @Component
    2. public class RedisHandler implements InitializingBean {
    3. @Autowired
    4. StringRedisTemplate stringRedisTemplate;
    5. @Override
    6. public void afterPropertiesSet() throws Exception {
    7. //初始化缓存
    8. //查询热点数据存储到Redis中使用json
    9. //查询热点数据的库存和销量到Redis中使用json
    10. }
    11. }

    下图中导入的Redis工具类"resty.redis"表示在lua文件夹下的resty文件夹下的redis.lua文件 

     

     common.lua

    1. --导入redis
    2. local redis = require('resty.redis')
    3. --初始化redis
    4. local red = redis:new()
    5. red:set_timeouts(1000,1000,1000)
    6. -- 关闭redis连接的工具方法,其实是放入连接池
    7. local function close_redis(red)
    8. local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
    9. local pool_size = 100 --连接池大小
    10. local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    11. if not ok then
    12. ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
    13. end
    14. end
    15. -- 查询redis的方法 ip和port是redis地址,key是查询的key
    16. local function read_redis(ip, port, key)
    17. -- 获取一个连接
    18. local ok, err = red:connect(ip, port)
    19. if not ok then
    20. ngx.log(ngx.ERR, "连接redis失败 : ", err)
    21. return nil
    22. end
    23. -- 查询redis
    24. local resp, err = red:get(key)
    25. -- 查询失败处理
    26. if not resp then
    27. ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
    28. end
    29. --得到的数据为空处理
    30. if resp == ngx.null then
    31. resp = nil
    32. ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
    33. end
    34. close_redis(red)
    35. return resp
    36. end
    37. -- 封装函数,发送http请求,并解析响应
    38. local function read_http(path, params)
    39. local resp = ngx.location.capture(path,{
    40. method = ngx.HTTP_GET,
    41. args = params,
    42. })
    43. if not resp then
    44. -- 记录错误信息,返回404
    45. ngx.log(ngx.ERR, "http not found, path: ", path , ", args: ", args)
    46. ngx.exit(404)
    47. end
    48. return resp.body
    49. end
    50. -- 将方法导出
    51. local _M = {
    52. read_http = read_http,
    53. read_redis = read_redis
    54. }
    55. return _M

    则修改lua/item.lua的逻辑为先查询Redis,未查到在查JVM服务器

    1. --导入common函数库
    2. local common = require('commnon')
    3. local read_http = common.read_http
    4. local read_redis = common.read_redis
    5. --导入cjson函数库,
    6. local cjson = require('cjson')
    7. local function read_data(key, path, params)
    8. --查询redis
    9. local resp = read_redis('192.168.99.100', 6379, key)
    10. --判断查询结果
    11. if not resp then
    12. ngx.log('redis查询失败,尝试查询http,key:', key)
    13. resp = read_http(path, params)
    14. end
    15. return resp
    16. end
    17. --获取路径参数
    18. local id = ngx.var[1]
    19. --查询商品信息 nil代表无参
    20. local itemJSON = read_data('item:id:' .. id, "/item/" .. id, nil)
    21. --查询库存信息
    22. local stockJSON = read_data('item:stock:id:' .. id, "/item/stock/" .. id, nil)
    23. --转化为lua的table类型
    24. --JSON转table则需要cjson工具包
    25. local item = cjson.decode(itemJSON)
    26. local stock = cjson.decode(stockJSON)
    27. --组合数据 商品的库存(stock)和销量(sold),存储在stock对象中
    28. item.stock = stock.stock
    29. item.sold = stock.sold
    30. --返回结果
    31. ngx.say(cjson.encode(item))

    OpenResty 本地缓存

     

     修改/usr/local/openresty/nginx/conf/nginx.conf文件,内容如下:

    1. #user nobody;
    2. worker_processes 1;
    3. error_log logs/error.log;
    4. events {
    5. worker_connections 1024;
    6. }
    7. http {
    8. include mime.types;
    9. default_type application/octet-stream;
    10. sendfile on;
    11. keepalive_timeout 65;
    12. #lua 模块
    13. lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    14. #c模块
    15. lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
    16. #添加共享词典,本地缓存
    17. lua_shared_dict item_cache 150m;
    18. upstream tomcat-cluster {
    19. hash $request_uri;
    20. server 192.168.150.1:8081;
    21. server 192.168.150.1:8082;
    22. }
    23. server {
    24. listen 8081;
    25. server_name localhost;
    26. location /item {
    27. proxy_pass http://tomcat-cluster;
    28. }
    29. location ~ /api/(\d+) {
    30. default_type application/json;
    31. content_by_lua_file lua/item.lua;
    32. }
    33. location / {
    34. root html;
    35. index index.html index.htm;
    36. }
    37. error_page 500 502 503 504 /50x.html;
    38. location = /50x.html {
    39. root html;
    40. }
    41. }
    42. }

    item.lua

    1. --导入common函数库
    2. local common = require('commnon')
    3. local read_http = common.read_http
    4. local read_redis = common.read_redis
    5. --导入cjson函数库,
    6. local cjson = require('cjson')
    7. --导入共享词典,本地缓存
    8. local item_cache = ngx.shared.item_cache
    9. local function read_data(key, path, params, expire)
    10. --查询本地缓存
    11. local resp = item_cache:get(key);
    12. if not resp then
    13. ngx.log(ngx.ERR, '本地缓存查询失败,尝试查询redis , key:', key)
    14. --查询redis
    15. resp = read_redis('192.168.99.100', 6379, key)
    16. --判断查询结果
    17. if not resp then
    18. ngx.log(ngx.ERR, 'redis查询失败,尝试查询http , key:', key)
    19. resp = read_http(path, params)
    20. end
    21. --查询成功,把数据写入本地缓存
    22. item_cache:set(key, resp, expire)
    23. end
    24. return resp
    25. end
    26. --获取路径参数
    27. local id = ngx.var[1]
    28. --查询商品信息 nil代表无参
    29. local itemJSON = read_data('item:id:' .. id, "/item/" .. id, nil, 1800)
    30. --查询库存信息
    31. local stockJSON = read_data('item:stock:id:' .. id, "/item/stock/" .. id, nil, 60)
    32. --转化为lua的table类型
    33. --JSON转table则需要cjson工具包
    34. local item = cjson.decode(itemJSON)
    35. local stock = cjson.decode(stockJSON)
    36. --组合数据 商品的库存(stock)和销量(sold),存储在stock对象中
    37. item.stock = stock.stock
    38. item.sold = stock.sold
    39. --返回结果
    40. ngx.say(cjson.encode(item))

     缓存同步

     

     

    Canal

     

    canal

    优点:,对代码没有任何耦合性,直接监控mysql的binary log在触发时间,异步MQ依然会存在少许的藕合。

    缺点:需要Mysql数据库开启主从

     安装 、启动

    1.开启Mysql主从

    Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。

    这里以之前用Docker运行的mysql为例:

    1.1开启binlog

    打开mysql容器挂载的日志文件,我的在/tmp/mysql/conf目录:

    修改文件:

    vi /tmp/mysql/conf/my.cnf

     添加内容:

    1. log-bin=/var/lib/mysql/mysql-bin
    2. binlog-do-db=heima

    配置解读:

    • log-bin=/var/lib/mysql/mysql-bin:设置binary log文件的存放地址和文件名,叫做mysql-bin

    • binlog-do-db=heima:指定对哪个database记录binary log events,这里记录heima这个库

    最终效果:

    1. skip-name-resolve
    2. character_set_server=utf8
    3. datadir=/var/lib/mysql
    4. server-id=1000
    5. log-bin=/var/lib/mysql/mysql-bin
    6. binlog-do-db=heima

    1.2设置用户权限

    接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对test这个库的操作权限.

    1. create user canal@'%' IDENTIFIED by 'canal';
    2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
    3. FLUSH PRIVILEGES;

    重启mysql容器即可

    docker restart mysql

    测试设置是否成功:在mysql控制台,或者Navicat中,输入命令:

    show master status;

     2.安装Canal

    2.1创建网络

    我们需要创建一个网络,将MySQL、Canal、MQ放到同一个Docker网络中:

    docker network create heima

    让mysql加入这个网络:

    docker network connect heima mysql

    2.2安装Canal

    提前准备好的canal的镜像压缩包 canal.tar

    docker load -i canal.tar

    2.3运行Canal容器

    然后运行命令创建Canal容器:

    1. docker run -p 11111:11111 --name canal \
    2. -e canal.destinations=heima \
    3. -e canal.instance.master.address=mysql:3306 \
    4. -e canal.instance.dbUsername=canal \
    5. -e canal.instance.dbPassword=canal \
    6. -e canal.instance.connectionCharset=UTF-8 \
    7. -e canal.instance.tsdb.enable=true \
    8. -e canal.instance.gtidon=false \
    9. -e canal.instance.filter.regex=heima\\..* \
    10. --network heima \
    11. -d canal/canal-server:v1.1.5

    说明:

    • -p 11111:11111:这是canal的默认监听端口

    • -e canal.instance.master.address=mysql:3306:数据库地址和端口,如果不知道mysql容器地址,可以通过docker inspect 容器id来查看

    • -e canal.instance.dbUsername=canal:数据库用户名

    • -e canal.instance.dbPassword=canal :数据库密码

    • -e canal.instance.filter.regex=:要监听的表名称

     表名称监听支持的语法:

    1. mysql 数据解析关注的表,Perl正则表达式.
    2. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
    3. 常见例子:
    4. 1. 所有表:.* or .*\\..*
    5. 2. canal schema下所有表: canal\\..*
    6. 3. canal下的以canal打头的表:canal\\.canal.*
    7. 4. canal schema下的一张表:canal.test1
    8. 5. 多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2

    2.4查看canal运行状态

    1.Canal容器启动日志

    docker logs -f canal

     2.查看canal是否和mysql建立连接

    进入canal容器

    docker exec -it canal bash

    查看canal日志

    tail -f canal-server/logs/canal/canal.log
    

     查看canal和数据库相关的日志

    tail -f canal-server/logs/heima/heima.log
    

     Canal客户端应用

     

     要和启动canal时配置的desitinations一致

     

     @Id,@Transient,@Column 使用spring的注解就可以

    1. <dependency>
    2. <groupId>top.javatoolgroupId>
    3. <artifactId>canal-spring-boot-starterartifactId>
    4. <version>1.2.1-RELEASEversion>
    5. dependency>
    6. canal:
    7. destination: heima
    8. server: 192.168.150.101:11111

    1. import com.baomidou.mybatisplus.annotation.IdType;
    2. import com.baomidou.mybatisplus.annotation.TableField;
    3. import com.baomidou.mybatisplus.annotation.TableId;
    4. import com.baomidou.mybatisplus.annotation.TableName;
    5. import lombok.Data;
    6. import org.springframework.data.annotation.Id;
    7. import org.springframework.data.annotation.Transient;
    8. import javax.persistence.Column;
    9. import java.util.Date;
    10. @Data
    11. @TableName("tb_item")
    12. public class Item {
    13. @TableId(type = IdType.AUTO)
    14. @Id
    15. private Long id;//商品id
    16. @Column(name = "name")
    17. private String name;//商品名称
    18. private String title;//商品标题
    19. private Long price;//价格(分)
    20. private String image;//商品图片
    21. private String category;//分类名称
    22. private String brand;//品牌名称
    23. private String spec;//规格
    24. private Integer status;//商品状态 1-正常,2-下架
    25. private Date createTime;//创建时间
    26. private Date updateTime;//更新时间
    27. @TableField(exist = false)
    28. @Transient
    29. private Integer stock;
    30. @TableField(exist = false)
    31. @Transient
    32. private Integer sold;
    33. }
    34. @CanalTable("tb_item")
    35. @Component
    36. public class ItemHandler implements EntryHandler {
    37. @Autowired
    38. private RedisHandler redisHandler;
    39. @Autowired
    40. private Cache itemCache;
    41. @Override
    42. public void insert(Item item) {
    43. // 写数据到JVM进程缓存
    44. itemCache.put(item.getId(), item);
    45. // 写数据到redis
    46. redisHandler.saveItem(item);
    47. }
    48. @Override
    49. public void update(Item before, Item after) {
    50. // 写数据到JVM进程缓存
    51. itemCache.put(after.getId(), after);
    52. // 写数据到redis
    53. redisHandler.saveItem(after);
    54. }
    55. @Override
    56. public void delete(Item item) {
    57. // 删除数据到JVM进程缓存
    58. itemCache.invalidate(item.getId());
    59. // 删除数据到redis
    60. redisHandler.deleteItemById(item.getId());
    61. }
    62. }

    总结

     

     

  • 相关阅读:
    使用开源库libyuv中替换开源汇编接口,解决汇编接口中的崩溃问题
    将本地jar包手动添加到Maven仓库依赖处理
    国际结算名词解释
    缓存的设计
    sci论文写法
    羊城杯2023 部分wp
    音视频实战---音视频解码
    有损压缩与无损压缩
    SpringCloud Alibaba 整合Sentinel的基本使用
    一个基于.NET7的开源DNS服务 DnsServer 的部署使用经验分享
  • 原文地址:https://blog.csdn.net/qq_33753147/article/details/126548191