当前位置: 首页 > news >正文

石家庄网站建设.神鹿网络百度推广登陆首页

石家庄网站建设.神鹿网络,百度推广登陆首页,python 做网站 案例,福田大型商城网站建设前言 前面荔枝梳理了RabbitMQ中的普通队列、交换机以及相关的知识,在这篇文章中荔枝将会梳理RabbitMQ的一个重要的队列 —— 死信队列,主要了解消息流转到死信队列的三种的方式以及相应的实现demo。希望能帮助到有需要的小伙伴~~~ 文章目录 前言 死信队…

前言

        前面荔枝梳理了RabbitMQ中的普通队列、交换机以及相关的知识,在这篇文章中荔枝将会梳理RabbitMQ的一个重要的队列 —— 死信队列,主要了解消息流转到死信队列的三种的方式以及相应的实现demo。希望能帮助到有需要的小伙伴~~~


文章目录

前言

死信队列

1 基本概念 

2 设置消息时间TTL过期的死信队列

3 队列达到最大长度发生死信 

4 消息被拒引发死信

总结


死信队列

1 基本概念 

     死信就是无法被消费的消息,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

死信具有一定的延迟性,它可以作为延迟消息来处理。

死信出现的原因:

  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)
  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.I 

2 设置消息时间TTL过期的死信队列

首先我们在消费者Consumer1中声明普通交换机、死信交换机、普通队列和死信队列之间的关系,同时在声明之后令Consumer1拒收消息,在RabbitMQ中观察消息生产者发出消息的流转情况。

设置死信队列的消费者1

        在死信队列中我们设置了普通交换机、死信交换机、普通队列和死信队列。同时在正常队列中通过channel信道对象中的queueDeclare方法中的一个Map类型的参数,设置了死信交换机和普通交换机之间的关系,配置好TTL、RoutingKey并声明其死信交换机。

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer {//普通交换机名称public static final String NORMAL_EXCHANGE = "normal";//死信交换机名称public static final String DEAD_EXCHANGE = "dead";//普通队列的名称public static final String NORMAL_QUEUE = "normalQueue";//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);/*** 声明普通队列和死信队列*///创建一个hashmap对象来配置连接死信队列的参数Map<String, Object> arguments = new HashMap<>();//设置过期时间arguments.put("x-message-ttl",10000);//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","dead1");//声明普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定队列和交换机channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");channel.queueBind(DEAD_QUEUE,DEAD_QUEUE,"dead");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());};//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});}
}

需要注意的是,这里在正常队列中设置过期时间TTL一般不太常用,我们通常会在publish处设置消息的TTL,因此这里arguments对象有关 "x-message-ttl" 参数的配置可以注释掉。

实际处理消息的消费者2

在处理死信队列消息的消费者处,我们只需要设置消费者接收消息是来自死信队列即可。 

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer2 {//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();System.out.println("等待接收消息");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("Consumer2接收到的信息:"+new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());};//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});}
}

​​​​生产者

在这里我们借助AMQP. BasicProperties对象的build方法来设置相应的死信TTL。

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;public class Publish {public static final String NORMAL_EXCHANGE = "normal";public static final String NORMAL_QUEUE = "normalQueue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//在Consumer已经声明过交换机了,所以在这里不能声明//死信消息,设置TTLAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 11; i++) {String message = "info"+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,message.getBytes());}}
}

未运行Consumer2前我们看到普通队列在我们设置的TTL:10s之后将消息流转到死信队列中。

最后启动Consumer2后确实也收到了死信队列中的消息

3 队列达到最大长度发生死信 

在这一部分中我们需要注释掉之前在生产者中设置的消息的TTL,同时在消费者1中开启正常队列的最大消息堆积容量。 

arguments.put("x-max-length",6);

 这样子我们就可以模拟队列达到最大长度后产生死信的情况了。

4 消息被拒引发死信

        要想开启消费者拒收消息的功能,首先需要在消息接收的basicConsumer方法中关闭自动应答,同时自行设置手动应答的逻辑。在下面接收消息的回调函数中,在basicAck中设置应答,在basicReject实现消息拒收。

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer {//普通交换机名称public static final String NORMAL_EXCHANGE = "normal";//死信交换机名称public static final String DEAD_EXCHANGE = "dead";//普通队列的名称public static final String NORMAL_QUEUE = "normalQueue";//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);/*** 声明普通队列和死信队列*///创建一个hashmap对象来配置连接死信队列的参数Map<String, Object> arguments = new HashMap<>();//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","dead1");//声明普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定队列和交换机channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead1");System.out.println("等待接收消息》》》》》》》》》》》");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{String msg = new String(message.getBody(),"UTF-8");if (msg.equals("info5")){System.out.println("Consumer1接收的消息是:"+msg+":此消息是被拒绝的");//这里第二个参数设置了是否要将拒收的消息塞回原队列channel.basicReject(message.getEnvelope().getDeliveryTag(), false);}else {System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));//成功应答,这里设置不批量操作channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};//开启手动应答//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,false,deliverCallback,(consumerTag)->{});}
}

总结

        时间过期、消息被拒、队列容量限制这三个机制会引发消息被转发死信队列,那么死信队列除了在这三种情况下继续保存消息之外,还有什么作用呢?下一篇文章荔枝会梳理延时队列,相信看完下一篇文章大家能有所收获~

今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~

如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!

如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!

http://www.rdtb.cn/news/21700.html

相关文章:

  • 襄阳营销型网站建设如何开发网站
  • 网站 做购物车在哪里做推广效果好
  • 网页设置快捷键高端网站优化公司
  • 做教育招生网站国内做网站的公司
  • wordpress笑话页面模板青岛网站建设优化
  • 什么公司做网站最好朋友圈广告投放
  • 网站建设合同报价seop
  • Wordpress 学校网站网络运营培训班多少钱
  • 宁波高端网站设计公司百度客服系统
  • wordpress代码生成器百度网盘优化
  • 做日本外贸网站设计怎么建网站赚钱
  • 做网站推销手表seo高效优化
  • 湘潭网站优化怎么让关键词快速排名首页
  • 小程序开发平台多少钱惠州seo管理
  • 榆林做网站的公司软件开发培训机构
  • 做名宿比较好的网站全自动引流推广软件免费
  • 国外政府网站建设网络营销的基本功能
  • 做dota2菠菜网站提升seo搜索排名
  • 建筑效果图网站推荐seo外包是什么
  • 阿里云esc 可以做几个网站bt磁力种子搜索引擎
  • 网站导航栏隐藏部分怎么做百度广告联盟网站
  • 10个产品设计成功案例seo搜索优化推广
  • 天门建设局官方网站网络营销案例ppt
  • 设计网站 f百度入驻商家
  • 南部 网站 建设电商怎么做?如何从零开始学做电商赚钱
  • 建独立的网站网络推广计划方案
  • 重庆二级站seo整站优化排名广告
  • 西安专业网站建设价格上海公布最新情况
  • 做中医诊所网站外链管理
  • 莆田网站建设百度帐号登录入口