个人主页:阿Q说代码
‍♂️作者简介:公众号阿Q说代码作者(期待你的重视)、infoQ签约作者、CSDN后端领域新星创作者
技术方向:专心于后端技术栈共享:JVM、数据库、中间件、微服务、Spring全家桶

哈喽大家好,我是阿Q!

最近不是正好在研讨 canal 嘛,刚巧前两天看了一篇关于解决缓存与数据库一致性问题的文章,里边说到了一种解决方案是结合 canal 来操作的,所以阿Q就想趁热打铁,手动来完成一下。

架构

文中说到的思维是:

  • 选用先更新数据库,后删去缓存的方法来解决并发引发的一致性问题;
  • 选用异步重试的方法来确保“更新数据库、删去缓存”这两步都能履行成功;
  • 能够选用订阅变更日志的方法来铲除 Redis 中的缓存;

根据这种思维,阿Q脑海中搭建了以下架构

实现缓存和数据库一致性方案实战:mysql+canal+rabbitmq+redis

  • APP 从 Redis 中查询信息,将数据的更新写入 MySQL 数据库中;
  • Canal 向 MySQL 发送 dump 协议,接纳 binlog 推送的数据;
  • Canal 将接纳到的数据投递给 MQ 音讯行列;
  • MQ 音讯行列消费音讯,同时删去 Redis 中对应数据的缓存;

环境预备

这篇文章中有 mysql 的装置教程:mysql 装置

这篇文章中有 canal 的装置教程以及对 mysql 的相关装备:canal装置

考虑到咱们服务器之前装置过 RabbitMQ ,所以咱们就用 RabbitMQ 来充任音讯行列吧。

Canal 装备

修正 conf/canal.properties 装备

# 指定形式
canal.serverMode = rabbitMQ
# 指定实例,多个实例运用逗号分隔: canal.destinations = example1,example2
canal.destinations = example 
# rabbitmq 服务端 ip
rabbitmq.host = 127.0.0.1
# rabbitmq 虚拟主机 
rabbitmq.virtual.host = / 
# rabbitmq 交换机  
rabbitmq.exchange = xxx
# rabbitmq 用户名
rabbitmq.username = xxx
# rabbitmq 暗码
rabbitmq.password = xxx
rabbitmq.deliveryMode =

修正实例装备文件 conf/example/instance.properties

#装备 slaveId,自界说,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10 
# 数据库地址:装备自己的ip和端口
canal.instance.master.address=ip:port 
# 数据库用户名和暗码 
canal.instance.dbUsername=xxx 
canal.instance.dbPassword=xxx
# 指定库和表
canal.instance.filter.regex=.*\\..*    // 这儿的 .* 表示 canal.instance.master.address 下面的所有数据库
# mq config
# rabbitmq 的 routing key
canal.mq.topic=xxx

然后重启 canal 服务。

这篇文章中有 RabbitMQ 的装置教程:RabbitMQ装置

这篇文章中有 Redis 的装置教程:Redis装置

数据库

建表语句

CREATE TABLE `product_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `price` decimal(10,4) DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  `update_date` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8

数据初始化

INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(1, '从你的全世界路过', 14.0000, '2020-11-21 21:26:12', '2021-03-27 22:17:39');
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(2, '乔布斯传', 25.0000, '2020-11-21 21:26:42', '2021-03-27 22:17:42');
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(3, 'java开发', 87.0000, '2021-03-27 22:43:31', '2021-03-27 22:43:34');

实战

项目引入的依赖比较多,为了不占用过多的篇幅,大家能够在后台回复“canal”获取项目源码!

MySQL 和 Redis 的相关装备在此不再赘述,有不明白的能够私聊阿Q

RabbitMQ 装备

@Configuration
public class RabbitMQConfig {
    public static final String CANAL_QUEUE = "canal_queue";//行列
    public static final String DIRECT_EXCHANGE = "canal";//交换机,要与canal中装备的相同
    public static final String ROUTING_KEY = "routingkey";//routing-key,要与canal中装备的相同
    /**
     * 界说行列
     **/
    @Bean
    public Queue canalQueue(){
        return new Queue(CANAL_QUEUE,true);
    }
    /**
     * 界说直连交换机
     **/
    @Bean
    public DirectExchange directExchange(){
       return new DirectExchange(DIRECT_EXCHANGE);
    }
    /**
     * 行列和交换机绑定
     **/
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(canalQueue()).to(directExchange()).with(ROUTING_KEY);
    }
}

商品信息入缓存

/**
 * 获取商品信息:
 * 先从缓存中查,假如不存在再去数据库中查,然后将数据保存到缓存中
 * @param productInfoId
 * @return
 */
@Override
public ProductInfo findProductInfo(Long productInfoId) {
	//1.从缓存中获取商品信息
	Object object = redisTemplate.opsForValue().get(REDIS_PRODUCT_KEY + productInfoId);
	if(ObjectUtil.isNotEmpty(object)){
		return (ProductInfo)object;
	}
	//2.假如缓存中不存在,从数据库获取信息
	ProductInfo productInfo = this.baseMapper.selectById(productInfoId);
	if(productInfo != null){
		//3.将商品信息缓存
		redisTemplate.opsForValue().set(REDIS_PRODUCT_KEY+productInfoId, productInfo,
				REDIS_PRODUCT_KEY_EXPIRE, TimeUnit.SECONDS);
		return productInfo;
	}
	return null;
}

履行方法后,查看 Redis 客户端是否有数据存入

实现缓存和数据库一致性方案实战:mysql+canal+rabbitmq+redis

更新数据入MQ

/**
 * 更新商品信息
 * @param productInfo
 * @return
 */
@PostMapping("/update")
public AjaxResult update(@RequestBody ProductInfo productInfo){
	productInfoService.updateById(productInfo);
	return AjaxResult.success();
}

当我履行完 update 方法的时候,去RabbitMQ Management 查看,发现并没有音讯进入行列。

问题描述

经过排查之后我在服务器中 canal 下的 /usr/local/logs/example/example.log 文件里发现了问题所在。

实现缓存和数据库一致性方案实战:mysql+canal+rabbitmq+redis

原因就是meta.dat中保存的位点信息和数据库的位点信息不一致导致 canal 抓取不到数据库的动作。

所以我找到 canal 的 conf/example/instance.properties 实例装备文件,发现没有将canal.instance.master.address=127.0.0.1:3306 设置成自己的数据库地址。

实现缓存和数据库一致性方案实战:mysql+canal+rabbitmq+redis

解决方案

  • 先停止 canal 服务的运行;
  • 删去meta.dat文件;
  • 再重启 canal,问题解决;

再次履行 update 方法,会发现 RabbitMQ Management中已经有咱们想要的数据了。

实现缓存和数据库一致性方案实战:mysql+canal+rabbitmq+redis

MQ接纳数据

编写 RabbitMQ 消费代码的逻辑

@RabbitListener(queues = "canal_queue")//监听行列名称
public void getMsg(Message message, Channel channel, String msg) throws IOException {
	long deliveryTag = message.getMessageProperties().getDeliveryTag();
	try {
		log.info("消费的行列音讯来自:" + message.getMessageProperties().getConsumerQueue());
		//删去reids中对应的key
		ProductInfoDetail productInfoDetail = JSON.parseObject(msg, ProductInfoDetail.class);
		log.info("库名:"+ productInfoDetail.getDatabase());
		log.info("表名: "+ productInfoDetail.getTable());
		if(productInfoDetail!=null && productInfoDetail.getData()!=null){
			List<ProductInfo> data = productInfoDetail.getData();
			ProductInfo productInfo = data.get(0);
			if(productInfo!=null){
				Long id = productInfo.getId();
				redisTemplate.delete(REDIS_PRODUCT_KEY+id);
				channel.basicAck(deliveryTag, true);
				return;
			}
		}
		channel.basicReject(deliveryTag ,true);
		return;
	}catch (Exception e){
		channel.basicReject(deliveryTag,false);
		e.printStackTrace();
	}
}

当咱们再次调用 update接口时,控制台会打印以下信息

实现缓存和数据库一致性方案实战:mysql+canal+rabbitmq+redis

从图中打印的信息能够看出就是咱们的库和表以及音讯行列,Redis 客户端中缓存的信息也被删去了。

拓宽

看到这,你必定会问:RabbitMQ 是阅后即焚的机制,它承认音讯被顾客消费后会立刻删去,假如此时咱们的事务还没有跑完,没来的及删去 Redis 中的缓存就宕机了,岂不是缓存一直都得不到更新了吗?

首要咱们要清晰的是 RabbitMQ 是经过顾客回执来承认顾客是否成功处理音讯的,即顾客获取音讯后,应该向 RabbitMQ 发送 ACK 回执,表明自己已经处理音讯了。

为了不让上述问题呈现,顾客回来 ACK 回执的时机就显得非常重要了, 而 SpringAMQP 也为咱们供给了三种可选的承认形式:

  • manual:手动 ack,需求在事务代码结束后,调用 api 发送 ack;
  • auto:自动 ack ,由 spring 监测 listener 代码是否呈现反常,没有反常则回来 ack,抛出反常则回来 nack;
  • none:封闭 ack,MQ 假定顾客获取音讯后会成功处理,因此音讯投递后当即被删去;

由此可知在 none 形式下音讯投递最不可靠,可能会丢掉音讯;在默许的 auto 形式下假如呈现服务器宕机的状况也是会丢掉音讯的,本次实战中,阿Q为了防止音讯丢掉选用的是 manual 这种形式,装备信息如下:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #开启手动承认

所以在代码中也就呈现了

//用于必定承认
channel.basicAck(deliveryTag, true);
//用于否定承认
channel.basicReject(deliveryTag ,true);

当然此种形式尽管不会丢掉音讯,可是会导致功率变低。

今天的内容到这儿就结束了,从速动手体验一下吧!后台回复“canal”获取项目源码。

跪求一键三连,期望靓仔在评论区打出老铁666,鼓励一下阿Q。

好看的皮囊千人一面,有趣的灵魂万里挑一,让咱们在冷漠的城市里相互温暖,我是阿Q,咱们下期再见!

推荐阅览

实战:画了几张图,总算把OAuth2搞清楚了

重磅出击,20张图带你彻底了解ReentrantLock加锁解锁的原理

领导看了我写的封闭超时订单,让我出门左转!

看了搭档写的代码,我竟然开始默默的仿照了。。。

实战篇:断点续传?文件秒传?手撸大文件上传

参阅文章:

mp.weixin.qq.com/s?__biz=MzI…

本文正在参加「金石计划」