博客
关于我
电商项目——消息队列——第十一章——中篇
阅读量:326 次
发布时间:2019-03-04

本文共 25607 字,大约阅读时间需要 85 分钟。

文章目录

1:RabbitMQ简介

  • 概述

    在这里插入图片描述
    我们来讲一下队列和主题中的点对点式和发布订阅式的概念
    在这里插入图片描述

  • 消息代理

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

  • 分布式中的使用场景:

  1. 异步处理

    假设我们注册用户信息到数据库,分为3步,第一步,我们先将注册的信息保存到数据库中花费50ms,第二步,发送邮件告诉注册者注册成功花费50ms,第三步,还有发送通知短信告诉注册者又花费50ms,这是一个同步模式,用户注册完到用户看到注册完的提示信息后,我们花费了150ms(就是第一张图);其实我们可以把发送邮件告诉注册者注册成功和送通知短信告诉注册者启动一个异步任务,这样就可以大大缩短花费时间(第二张图);但是实际上我们连异步都不要,我们可以将在注册到数据库上成功的消息保存在消息队列里面,至此我们直接给用户返回(消息队列的存取速度很快),这样我们实际只要花费55ms,其他服务在去消息队列中取消息
    在这里插入图片描述

  2. 应用解耦

    我们以下一个订单为例,我们下了一个订单,库存要执行出库操作,比如下订单的方法api传入三个参数,调库存方法存入五个参数,如果我们的库存服务一升级,订单服务就要修改源代码重新部署,我们就可以引入消息队列,我们给订单系统下完订单以后,我们就可以给消息队列中存入消息,库存消息不关系库存的接口是什么样,只需要实时的订阅消息队列里面的内容,只要有内容库存系统就可以实时收到消息,然后就可以得到什么样的消息,减去什么样的订单;我们下完订单以后,库存系统无需再知道订单系统的接口就可以完成操作了,这就是应用解耦;
    在这里插入图片描述

  3. 流量监控(流量削峰)

    对于一些秒杀商品来说,瞬间流量会非常大,如果一百万个请求同时进入业务代码可呢会导致机器宕机,我们可以将大并发的请求进来存取到消息队列中,后台的业务处理就不着急调用,后台根据它的能力来进行消费
    在这里插入图片描述

2:RabbitMQ工作流程

RabbitMQ概念

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3:RabbitMQ安装

在这里插入图片描述

#  rabbitmq安装命令docker run --name rabbitmq \ -p 5671:5671 -p 5672:5672 \ -p 4369:4369 -p 25672:25672 \ -p 15671:15671 \ -p 15672:15672 \ rabbitmq:management

安装完以后,我们就可以使用虚拟机ip+15672(web管理后台端口)来打开rabbitmq的图形化界面

在这里插入图片描述

4:Exchange类型

前面我们安装好了rabbitmq,现在我们就来测试rabbitmq的使用,我们来看下Exchanges和Queues是怎么工作的

我们先来看一下RabbitMQ的运行机制
在这里插入图片描述
在这里插入图片描述
Exchange的四种类型介绍
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • 发消息是发给交换机,监听消息是来监听队列
  • 我们接下来就可以在整个rabbitmq的图形化界面进行测试
    创建一个交换机
    在这里插入图片描述
    创建一个队列
    在这里插入图片描述
    交换机与队列进行绑定
    在这里插入图片描述

5:Direct-Exchange

在这里插入图片描述

我们来创建出上面的三个交换机和四个队列来进行演示
大家可以根据上面三种交换机的介绍自行测试

6:Fanout-Exchange

大家可以根据上面三种交换机的介绍自行测试

7:Topic-Exchange

大家可以根据上面三种交换机的介绍自行测试

8:SpringBoot整合RabbitMQ

SpringBoot整合RabbitMQ三步曲

在这里插入图片描述
第一步:在mall-order中导入如下依赖

org.springframework.boot
spring-boot-starter-amqp
/** * 使用RabbitMQ * 1:引入amqp场景:RabbitAutoConfiguration就会自动生效 * 2:RabbitAutoConfiguration就会自动生效给容器中自动配置了 * CachingConnectionFactory  RabbitTemplateConfigurer  RabbitTemplate RabbitMessagingTemplate amqpAdmin * // @Bean public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, 上面代码可以知道我们要去application.yml种进行配置所有的属性都是在下面的类中绑定 @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties { 3:给配置文件中配置spring.rabbitmq信息 *  4:@EnableRabbit :这种注解用到了非常多开启相关功能 */

我们接下来就在mall-order中的test下进行测试

第二步:给配置文件中配置spring.rabbitmq信息(我配置在了nacos config)

spring.rabbitmq.host=192.168.56.10spring.rabbitmq.port=5672spring.rabbitmq.virtual-host=\

第三步:添加@EnableRabbit :这种注解用到了非常多开启相关功能

@EnableRabbit@EnableDiscoveryClient@SpringBootApplicationpublic class MallOrderApplication {   	public static void main(String[] args) {   		SpringApplication.run(MallOrderApplication.class, args);	}}

9:AmqpAdmin使用

创建一个交换机

@Test	void createExchange() {   		DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);		amqpAdmin.declareExchange(directExchange);		log.info("交换机:hello-java-exchange【】创建成功");	}
交换机:hello-java-exchange【】创建成功

在这里插入图片描述

创建一个队列

@Test	void createQueue() {   //true if we are declaring an exclusive queue (the queue will only be used by the declarer's	 * connection)		Queue queue = new Queue("hello-java-queue", true, false, false);		amqpAdmin.declareQueue(queue);		log.info("队列:hello-java-queue【】创建成功");	}
队列:hello-java-queue【】创建成功

在这里插入图片描述

创建一个交换机和队列的绑定关系

在这里插入图片描述

@Test	void createBinding() {   		Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello-java-queue", null);		amqpAdmin.declareBinding(binding);		log.info("绑定关系:hello-java-binding【】创建成功");	}
绑定关系:hello-java-binding【】创建成功

在这里插入图片描述

大家可以去看amqpAdmin的源码就可以自己动手写出来啦

10:RabbitTemplate使用

发送消息

@Test	void sendMessageTest(){   		rabbitTemplate.convertAndSend("hello-java-exchange","hello-java-queue","消息发送成功给hello-java-queue");		log.info("发送消息成功");	}
发送消息成功

在这里插入图片描述

在这里插入图片描述
如果没有配置如下的config,发送消息给rabbitmq是序列化的格式
在这里插入图片描述

要想完成发送消息给rabbitmq是json格式,我们必须做如下配置

package com.atstudying.mall.order.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @author Mr.zhneg * @create 2020-11-20-1:28 */@Configurationpublic class MyRabbitConfig {       /**     * 发送消息可以转化为json     * @return     */    @Bean    public MessageConverter messageConverter(){           return new Jackson2JsonMessageConverter();    }}

在这里插入图片描述

11:RabbitListener&RabbitHandler接受消息

监听消息队列里面的消息

mall-order
OrderItemServiceImpl.java

@RabbitListener(queues="hello-java-queue")    /**     * 参数可以写以下类型     * 1:Message message:原生消息详细信息。头+体     * 2:T
<发送消息的类型>
OrderReturnReasonEntity contents * 3:Channel channel:当前传输数据的通道 */ //Object可以获取到我们接受到的消息 public void receiveMessage(Object message){ System.out.println("接受到的消息。。内容:"+message+"==>类型"+message.getClass()); }

在这里插入图片描述

Queue:可以让很多人来监听。只要收到消息,队列删除消息,而且只可以有一个收到此消息
1)订单服务启动多个:同一个消息,只可以有一个客户端收到
2)只有一个消息完全处理完,方法运行结束,我们就可以收到下一个消息

@RabbitListener:类+方法上(监听哪些队列即可)  @RabbitHandler:标在方法上(重载区分不同的消息)

下面给出一个示例来说明 @RabbitListener:类+方法上(监听哪些队列即可)和@RabbitHandler:标在方法上(重载区分不同的消息)的用法

发送消息给消息队列的代码

@Test	void sendMessageTest(){   		OrderEntity orderEntity=new OrderEntity();		orderEntity.setOrderSn("dd");		orderEntity.setReceiverRegion("ddd");		OrderItemEntity orderItemEntity=new OrderItemEntity();		orderItemEntity.setSkuQuantity(2);		orderItemEntity.setSpuBrand("牛逼");		for (int i=0;i<10;i++){   			if (i%2==0){   				rabbitTemplate.convertAndSend("hello-java-exchange","hello-java-queue",orderItemEntity);			}			rabbitTemplate.convertAndSend("hello-java-exchange","hello-java-queue",orderEntity);		}		log.info("发送消息成功");	}
发送消息成功

监听消息队列,并根据有效负载类型准确地选择一个方法( @RabbitHandler

@RabbitListener(queues="hello-java-queue")@Service("orderItemService")public class OrderItemServiceImpl extends ServiceImpl
implements OrderItemService { @RabbitHandler public void receiveMessage1(OrderEntity message){ System.out.println("111"); System.out.println("接受到的消息。。内容:"+message+"==>类型"+message.getClass()); } @RabbitHandler public void receiveMessage2(OrderItemEntity message){ System.out.println("222"); System.out.println("接受到的消息。。内容:"+message+"==>类型"+message.getClass()); }
222接受到的消息。。内容:OrderItemEntity(id=null, orderId=null, orderSn=null, spuId=null, spuName=null, spuPic=null, spuBrand=牛逼, categoryId=null, skuId=null, skuName=null, skuPic=null, skuPrice=null, skuQuantity=2, skuAttrsVals=null, promotionAmount=null, couponAmount=null, integrationAmount=null, realAmount=null, giftIntegration=null, giftGrowth=null)==>类型class com.atstudying.mall.order.entity.OrderItemEntity111接受到的消息。。内容:OrderEntity(id=null, memberId=null, orderSn=dd, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=ddd, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)==>类型class com.atstudying.mall.order.entity.OrderEntity111接受到的消息。。内容:OrderEntity(id=null, memberId=null, orderSn=dd, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=ddd, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)==>类型class com.atstudying.mall.order.entity.OrderEntity222接受到的消息。。内容:OrderItemEntity(id=null, orderId=null, orderSn=null, spuId=null, spuName=null, spuPic=null, spuBrand=牛逼, categoryId=null, skuId=null, skuName=null, skuPic=null, skuPrice=null, skuQuantity=2, skuAttrsVals=null, promotionAmount=null, couponAmount=null, integrationAmount=null, realAmount=null, giftIntegration=null, giftGrowth=null)==>类型class com.atstudying.mall.order.entity.OrderItemEntity111接受到的消息。。内容:OrderEntity(id=null, memberId=null, orderSn=dd, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=ddd, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)==>类型class com.atstudying.mall.order.entity.OrderEntity111接受到的消息。。内容:OrderEntity(id=null, memberId=null, orderSn=dd, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=ddd, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)==>类型class com.atstudying.mall.order.entity.OrderEntity222接受到的消息。。内容:OrderItemEntity(id=null, orderId=null, orderSn=null, spuId=null, spuName=null, spuPic=null, spuBrand=牛逼, categoryId=null, skuId=null, skuName=null, skuPic=null, skuPrice=null, skuQuantity=2, skuAttrsVals=null, promotionAmount=null, couponAmount=null, integrationAmount=null, realAmount=null, giftIntegration=null, giftGrowth=null)==>类型class com.atstudying.mall.order.entity.OrderItemEntity111接受到的消息。。内容:OrderEntity(id=null, memberId=null, orderSn=dd, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=ddd, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)==>类型class com.atstudying.mall.order.entity.OrderEntity

12:可靠投递-发送端确认

RabbitMQ消息确认机制-可靠抵达

  • 消息确认机制就是为了保证我们消息的可靠抵达;在分布式系统中有非常多的微服务,他们都要连接上我们消息队列的服务器,并且去监听队列,可呢会由于网络抖动的原因,包括MQ的服务器宕机,已经发消息的生产者,和监听消息的消费者的宕机都可呢导致消息丢失;在一定的关键环节都要保证我们的消息不丢失,比如订单消息发送出去该要扣库存的,加积分的这些消息千万不可以丢(经济问题)我们指定的路由键有问题,或者我在投递到queue的时候,队列被其他客户端删除
  • 可靠抵达分为两种模式处理,一种是发送端的publisher模式,一种是消费端的ack机制
    在这里插入图片描述
    在这里插入图片描述
    下面就是可靠消息抵达交换机的ConfirmCallback模式演示
    在mall-order中的application.yml进行配置
#开启发送端消息抵达交换机的确认spring.rabbitmq.publisher-confirm-type=correlated

编写如下代码

MyRabbitConfig

@Configurationpublic class MyRabbitConfig {     /**     * 定制RabbitTemplate     * 1:服务器收到消息就回调     *  1:spring.rabbitmq.publisher-confirm-type=correlated     *  2:设置确认回调setConfirmCallback     */    @PostConstruct//对象创建完以后,执行这个方法    public void initRabbitTemplate(){           //设置确认回调        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {               /**             *1:只要消息抵达Broker就ack=true             * @param correlationData:当前消息唯一的关联数据(这个是消息的唯一id             * @param ack 消息是否成功收到             * @param cause 失败原因             */            @Override            public void confirm(CorrelationData correlationData, boolean ack, String cause) {                   System.out.println("CorrelationData:"+correlationData+"===>ack:"+ack+"==>cause:"+cause);            }        });    }

我们还是使用上一节的测试代码进行测试

测试

CorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:null2020-11-20 11:32:26.833  INFO 6112 --- [           main] c.a.m.order.MallOrderApplicationTests    : 发送消息成功CorrelationData:null===>ack:true==>cause:null2020-11-20 11:32:26.888  INFO 6112 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.111接受到的消息。。内容:OrderEntity(id=null, memberId=null, orderSn=dd, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=ddd, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)==>类型class com.atstudying.mall.order.entity.OrderEntity222接受到的消息。。内容:OrderItemEntity(id=null, orderId=null, orderSn=null, spuId=null, spuName=null, spuPic=null, spuBrand=牛逼, categoryId=null, skuId=null, skuName=null, skuPic=null, skuPrice=null, skuQuantity=2, skuAttrsVals=null, promotionAmount=null, couponAmount=null, integrationAmount=null, realAmount=null, giftIntegration=null, giftGrowth=null)==>类型class com.atstudying.mall.order.entity.OrderItemEntity222接受到的消息。。内容:OrderItemEntity(id=null, orderId=null, orderSn=null, spuId=null, spuName=null, spuPic=null, spuBrand=牛逼, categoryId=null, skuId=null, skuName=null, skuPic=null, skuPrice=null, skuQuantity=2, skuAttrsVals=null, promotionAmount=null, couponAmount=null, integrationAmount=null, realAmount=null, giftIntegration=null, giftGrowth=null)==>类型class com.atstudying.mall.order.entity.OrderItemEntity111接受到的消息。。内容:OrderEntity(id=null, memberId=null, orderSn=dd, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=ddd, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)==>类型class com.atstudying.mall.order.entity.OrderEntity111接受到的消息。。内容:OrderEntity(id=null, memberId=null, orderSn=dd, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=ddd, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)==>类型class com.atstudying.mall.order.entity.OrderEntity111接受到的消息。。内容:OrderEntity(id=null, memberId=null, orderSn=dd, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=ddd, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)==>类型class com.atstudying.mall.order.entity.OrderEntity111

下面就是可靠消息抵达交换机的ConfirmCallback模式演示

在这里插入图片描述

在mall-order中的application.yml进行配置

# 开启发送端消息抵达队列的确认spring.rabbitmq.publisher-returns=true#只要抵达队列,以异步发送优先回调我们这个returnsspring.rabbitmq.template.mandatory=true

编写如下代码

MyRabbitConfig

/**     * 定制RabbitTemplate     * 1:服务器收到消息就回调     *     1:spring.rabbitmq.publisher-confirm-type=correlated     *    2:设置确认回调setConfirmCallback     *  2:消息抵达队列进行回调     *    1:spring.rabbitmq.publisher-returns=true     #只要抵达队列,以异步发送优先回调我们这个returns     spring.rabbitmq.template.mandatory=true           2:设置确回调setReturnCallback     */    @PostConstruct//对象创建完以后,执行这个方法    public void initRabbitTemplate(){                  rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {               /**             * 只要消息没有投递给指定队列,就触发这失败回调             * @param message 投递失败的消息信息             * @param replyCode 回复状态码             * @param replyText 回复的文本内容             * @param exchange 当时的交换机             * @param routingKey 当时的路由键             */            @Override            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {               }        });    }

我们修改上一小节的测试代码:路由键变成hello-java11-queue看看是否会触发ReturnCallback

@Test	void sendMessageTest(){   		OrderEntity orderEntity=new OrderEntity();		orderEntity.setOrderSn("dd");		orderEntity.setReceiverRegion("ddd");		OrderItemEntity orderItemEntity=new OrderItemEntity();		orderItemEntity.setSkuQuantity(2);		orderItemEntity.setSpuBrand("牛逼");		for (int i=0;i<10;i++){   			if (i%2==0){   				rabbitTemplate.convertAndSend("hello-java-exchange","hello-java-queue",orderItemEntity);			}			rabbitTemplate.convertAndSend("hello-java-exchange","hello-java11-queue",orderEntity);		}		log.info("发送消息成功");	}

测试:

CorrelationData:null===>ack:true==>cause:nullMessage:(Body:'{"id":null,"memberId":null,"orderSn":"dd","couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":null,"billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":"ddd","receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={__TypeId__=com.atstudying.mall.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])===>replyCode:312==>replyText:NO_ROUTE===>exchangehello-java-exchange==>routingKeyhello-java11-queueCorrelationData:null===>ack:true==>cause:nullMessage:(Body:'{"id":null,"memberId":null,"orderSn":"dd","couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":null,"billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":"ddd","receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={__TypeId__=com.atstudying.mall.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])===>replyCode:312==>replyText:NO_ROUTE===>exchangehello-java-exchange==>routingKeyhello-java11-queueCorrelationData:null===>ack:true==>cause:nullCorrelationData:null===>ack:true==>cause:null2020-11-20 19:07:38.783  INFO 33564 --- [           main] c.a.m.order.MallOrderApplicationTests    : 发送消息成功CorrelationData:null===>ack:true==>cause:null//..

我们的发送消息成功会出现在如图的位置,就是我们如下配置起作用的效果

#只要抵达队列,以异步发送优先回调我们这个returns
spring.rabbitmq.template.mandatory=true

13:可靠投递-消费端确认

在这里插入图片描述

  • Ack消息确认机制:一旦我们的消费者从队列里面获取消息,他就会告诉我们mq服务器,我们已经收到货物了,可以叫mq服务器进行删除

我们开启了手动确认模式,如果我们没有确认的消息都不会进行处理;开启了手动模式,只要监听队列的消费端没有接收(ack),我们未接收的消息不会丢失,而是会从unacked状态到ready状态,等到下一次消费端再次进行消费

#手动确认接收消息spring.rabbitmq.listener.simple.acknowledge-mode=manual

在这里插入图片描述

在这里插入图片描述

3:消费端确认(保证每一个消息被正确消费,此时才可以broker删除这个消息)                  1:默认是自动确认的,只要消息接收到,消费端会自动确认,服务器就会移除这个消息     #手动确认接收消息     spring.rabbitmq.listener.simple.acknowledge-mode=manual                 问题:我们(消费端)可以收到很多消息,自动回复给服务器ack,只有一个消息处理成功了,宕机了,发送消息丢失                  解决:手动模式:我们必须要手动确认接收消息,防止消息丢失(只要我们没有明确告诉mq,货物被签收(unacked)。即使                Consumer宕机。消息也不会丢失,会重新变为Ready,下一次有新的consumer连接进行就会发给它             2:如何签收     channel.basicAck(deliveryTag,false);签收:业务成功完成就应该签收     channel.basicNack(deliveryTag,false,false);拒签:业务失败,拒签

代码演示:

mall-order
生产者发送代码

@GetMapping ("/test")    public String test(){        OrderEntity orderEntity=new OrderEntity();        orderEntity.setOrderSn("dd");        orderEntity.setReceiverRegion("ddd");        OrderItemEntity orderItemEntity=new OrderItemEntity();        orderItemEntity.setSkuQuantity(2);        orderItemEntity.setSpuBrand("牛逼");        for (int i=0;i<10;i++){            if (i%2==0){                rabbitTemplate.convertAndSend("hello-java-exchange","hello-java-queue",orderItemEntity);            }            rabbitTemplate.convertAndSend("hello-java-exchange","hello-java11-queue",orderEntity);        }        log.info("发送消息成功");        return "ok";    }

消费端监听队列,获取消息代码

@RabbitHandler    public void receiveMessage2(OrderItemEntity content, Message message, com.rabbitmq.client.Channel channel) throws IOException {           System.out.println("222");        //消息头属性信息        MessageProperties messageProperties = message.getMessageProperties();        //channel内按顺序自增        long deliveryTag = message.getMessageProperties().getDeliveryTag();        System.out.println("deliveryTag:"+deliveryTag);        System.out.println("接受到的消息。。内容:"+content+"==>类型"+content.getClass());        //签收货物,非批量模式        try {               if (deliveryTag%2==0){                   channel.basicAck(deliveryTag,false);                System.out.println("签收了货物:"+deliveryTag);            }else {                   channel.basicNack(deliveryTag,false,false);                System.out.println("没有签收了货物:"+deliveryTag);            }        }catch (Exception e){               //网络中断        }    }
222deliveryTag:1接受到的消息。。内容:OrderItemEntity(id=null, orderId=null, orderSn=null, spuId=null, spuName=null, spuPic=null, spuBrand=牛逼, categoryId=null, skuId=null, skuName=null, skuPic=null, skuPrice=null, skuQuantity=2, skuAttrsVals=null, promotionAmount=null, couponAmount=null, integrationAmount=null, realAmount=null, giftIntegration=null, giftGrowth=null)==>类型class com.atstudying.mall.order.entity.OrderItemEntity没有签收了货物:1222deliveryTag:2接受到的消息。。内容:OrderItemEntity(id=null, orderId=null, orderSn=null, spuId=null, spuName=null, spuPic=null, spuBrand=牛逼, categoryId=null, skuId=null, skuName=null, skuPic=null, skuPrice=null, skuQuantity=2, skuAttrsVals=null, promotionAmount=null, couponAmount=null, integrationAmount=null, realAmount=null, giftIntegration=null, giftGrowth=null)==>类型class com.atstudying.mall.order.entity.OrderItemEntity签收了货物:2

结合消费端的确认消息机制和发送端的确认消息机制,就可以保证消息最终一定发出去,和消息最终一定接受到

转载地址:http://dkhq.baihongyu.com/

你可能感兴趣的文章
MySQL: Host '127.0.0.1' is not allowed to connect to this MySQL server
查看>>
Mysql: 对换(替换)两条记录的同一个字段值
查看>>
mysql:Can‘t connect to local MySQL server through socket ‘/var/run/mysqld/mysqld.sock‘解决方法
查看>>
MYSQL:基础——3N范式的表结构设计
查看>>
MYSQL:基础——触发器
查看>>
Mysql:连接报错“closing inbound before receiving peer‘s close_notify”
查看>>
mysqlbinlog报错unknown variable ‘default-character-set=utf8mb4‘
查看>>
mysqldump 参数--lock-tables浅析
查看>>
mysqldump 导出中文乱码
查看>>
mysqldump 导出数据库中每张表的前n条
查看>>
mysqldump: Got error: 1044: Access denied for user ‘xx’@’xx’ to database ‘xx’ when using LOCK TABLES
查看>>
Mysqldump参数大全(参数来源于mysql5.5.19源码)
查看>>
mysqldump备份时忽略某些表
查看>>
mysqldump实现数据备份及灾难恢复
查看>>
mysqldump数据库备份无法进行操作只能查询 --single-transaction
查看>>
mysqldump的一些用法
查看>>
mysqli
查看>>
MySQLIntegrityConstraintViolationException异常处理
查看>>
mysqlreport分析工具详解
查看>>
MySQLSyntaxErrorException: Unknown error 1146和SQLSyntaxErrorException: Unknown error 1146
查看>>