当前位置: 首页 > news >正文

洛阳营销型网站建设品牌网络推广运营公司

洛阳营销型网站建设,品牌网络推广运营公司,广州网站建设网页制作开发,门户网站 建设方案RabbitMQ的DLX 1、RabbitMQ死信队列2、代码示例2.1、队列过期2.1.1、配置类RabbitConfig(关键代码)2.1.2、业务类MessageService2.1.3、配置文件application.yml2.1.4、启动类2.1.5、配置文件2.1.6、测试 2.2、消息过期2.2.1、配置类RabbitConfig2.2.2、…

RabbitMQ的DLX

  • 1、RabbitMQ死信队列
  • 2、代码示例
    • 2.1、队列过期
      • 2.1.1、配置类RabbitConfig(关键代码)
      • 2.1.2、业务类MessageService
      • 2.1.3、配置文件application.yml
      • 2.1.4、启动类
      • 2.1.5、配置文件
      • 2.1.6、测试
    • 2.2、消息过期
      • 2.2.1、配置类RabbitConfig
      • 2.2.2、业务类MessageService(关键代码)
      • 2.2.3、配置文件application.yml
      • 2.2.4、启动类同上
      • 2.2.5、配置文件同上
      • 2.2.6、测试
    • 2.3、队列达到最大长度(先入队的消息会被发送到DLX)
      • 2.3.1、配置类RabbitConfig(关键代码)
      • 2.3.2、业务类MessageService(关键代码)
      • 2.3.3、配置文件application.yml
      • 2.3.4、启动类同上
      • 2.3.5、配置文件pom.xml同上
      • 2.3.6、测试
    • 2.4、消费者拒绝消息不进行重新投递
      • 2.4.1、生产者
        • 2.4.1.1、生产者application.yml
        • 2.4.1.2、生产者发送消息
        • 2.4.1.3、生产者配置类
      • 2.4.2、消费者
        • 2.4.2.1、消费者application.yml 启动手动确认
          • 关键配置
        • 2.4.2.2、消费者接收消息
          • 关键代码
      • 2.4.3、测试

1、RabbitMQ死信队列

RabbitMQ死信队列也有叫 死信交换机、死信邮箱等说法。
DLX: Dead-Letter-Exchange 死信交换器,死信邮箱。
在这里插入图片描述1-2、生产者发送一个消息到正常交换机
2-4、正常交换机接收到消息发送到正常队列
4、正常队列设置了队列过期时间,超时消息会自动删除
4-6、原本过期自动删除的消息发送到了死信交换机
6-8、死信交换机将消息发送到了死信队列

如上情况下一个消息会进入DLX(Dead Letter Exchange)死信交换机。

2、代码示例

2.1、队列过期

2.1.1、配置类RabbitConfig(关键代码)

package com.power.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {@Value("${my.exchangeNormalName}")private String exchangeNormalName;@Value("${my.queueNormalName}")private String queueNormalName;@Value("${my.exchangeDlxName}")private String exchangeDlxName;@Value("${my.queueDlxName}")private String queueDlxName;/*** 正常交换机* @return*/@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}/*** 正常队列* @return*/@Beanpublic Queue normalQueue(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-message-ttl",20000);//设置队列的过期时间为20秒//重点:设置这两个参数arguments.put("x-dead-letter-exchange",exchangeDlxName);    //设置队列的死信交换机arguments.put("x-dead-letter-routing-key","error");//设置死信路由key,要跟死信交换机和死信队列绑定的路由key一致return QueueBuilder.durable(queueNormalName).withArguments(arguments)       //设置队列的过期时间.build();}/*** 正常交换机和正常队列绑定* @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");}/*** 死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}/*** 死信队列* @return*/@Beanpublic Queue dlxQueue(){return QueueBuilder.durable(queueDlxName).build();}/*** 死信交换机和死信队列绑定* @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");}
}

2.1.2、业务类MessageService

package com.power.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class MessageService {@Resourceprivate RabbitTemplate rabbitTemplate;@Beanpublic void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();rabbitTemplate.convertAndSend("exchange.normal.a","order",message);log.info("消息发送完毕,发送时间是:"+new Date());}
}

2.1.3、配置文件application.yml

server:port: 8080
spring:application:name: dlx-test01rabbitmq:host: 你的服务器IPport: 5672username: 你的账号password: 你的密码virtual-host: powermy:exchangeNormalName: exchange.normal.a   #正常交换机queueNormalName: queue.normal.a         #正常队列,没有消费组,设置过期时间exchangeDlxName: exchange.dlx.a         #死信交换机queueDlxName: queue.dlx.a               #死信队列

2.1.4、启动类

package com.power;import com.power.service.MessageService;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;@SpringBootApplication
public class Application implements ApplicationRunner {@Resourceprivate MessageService messageService;public static void main(String[] args) {SpringApplication.run(Application.class);}@Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
}

2.1.5、配置文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.power</groupId><artifactId>rabbit_06_dlx01</artifactId><version>1.0-SNAPSHOT</version><name>rabbit_06_dlx01</name><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.13</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

2.1.6、测试

启动程序,发送消息:
在这里插入图片描述

消息会先被发送到正常队列queue.normal.a中,超时未被消费,
则消息会被发送到死信队列queue.dlx.a 中
在这里插入图片描述

2.2、消息过期

2.2.1、配置类RabbitConfig

package com.power.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {@Value("${my.exchangeNormalName}")private String exchangeNormalName;@Value("${my.queueNormalName}")private String queueNormalName;@Value("${my.exchangeDlxName}")private String exchangeDlxName;@Value("${my.queueDlxName}")private String queueDlxName;/*** 正常交换机* @return*/@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}/*** 正常队列* @return*/@Beanpublic Queue normalQueue(){Map<String, Object> arguments = new HashMap<>();//重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置死信路由key,要跟死信交换机和死信队列绑定的路由key一致arguments.put("x-dead-letter-routing-key","error");return QueueBuilder.durable(queueNormalName).withArguments(arguments)       //设置队列的过期时间.build();}/*** 正常交换机和正常队列绑定* @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");}/*** 死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}/*** 死信队列* @return*/@Beanpublic Queue dlxQueue(){return QueueBuilder.durable(queueDlxName).build();}/*** 死信交换机和死信队列绑定* @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");}
}

2.2.2、业务类MessageService(关键代码)

package com.power.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class MessageService {@Resourceprivate RabbitTemplate rabbitTemplate;@Beanpublic void sendMsg(){try {MessageProperties messageProperties = new MessageProperties();//设置单条消息的过期时间,单位为毫秒,数据类型为字符串messageProperties.setExpiration("20000");Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.normal.02","order",message);}catch (Exception e){e.printStackTrace();log.info("消息发送失败:"+new Date());}log.info("消息发送完毕,发送时间是:"+new Date());}
}

2.2.3、配置文件application.yml

server:port: 8080
spring:application:name: dlx-test01rabbitmq:host: 你的服务器IPport: 5672username: 你的账号password: 你的密码virtual-host: powermy:exchangeNormalName: exchange.normal.02   #正常交换机queueNormalName: queue.normal.02         #正常队列,没有消费组,设置过期时间exchangeDlxName: exchange.dlx.02         #死信交换机queueDlxName: queue.dlx.02               #死信队列

2.2.4、启动类同上

2.2.5、配置文件同上

2.2.6、测试

启动程序发送消息
在这里插入图片描述
登录rabbitmq后台:
消息先进入正常队列queue.normal.02中,超时未消费,在这里插入图片描述
消息超过过期时间,则进入queue.dlx.02死信队列

在这里插入图片描述

2.3、队列达到最大长度(先入队的消息会被发送到DLX)

2.3.1、配置类RabbitConfig(关键代码)

package com.power.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {@Value("${my.exchangeNormalName}")private String exchangeNormalName;@Value("${my.queueNormalName}")private String queueNormalName;@Value("${my.exchangeDlxName}")private String exchangeDlxName;@Value("${my.queueDlxName}")private String queueDlxName;/*** 正常交换机* @return*/@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}/*** 正常队列* @return*/@Beanpublic Queue normalQueue(){Map<String, Object> arguments = new HashMap<>();//设置队列的最大长度arguments.put("x-max-length",5);//重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置死信路由key,要跟死信交换机和死信队列绑定的路由key一致arguments.put("x-dead-letter-routing-key","error");return QueueBuilder.durable(queueNormalName).withArguments(arguments)       //设置队列的参数.build();}/*** 正常交换机和正常队列绑定* @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");}/*** 死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}/*** 死信队列* @return*/@Beanpublic Queue dlxQueue(){return QueueBuilder.durable(queueDlxName).build();}/*** 死信交换机和死信队列绑定* @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");}
}

2.3.2、业务类MessageService(关键代码)

package com.power.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class MessageService {@Resourceprivate RabbitTemplate rabbitTemplate;@Beanpublic void sendMsg(){for (int i = 1; i < 8; i++) {String msg = "hello world "+ i;Message message = MessageBuilder.withBody(msg.getBytes()).build();rabbitTemplate.convertAndSend("exchange.normal.03","order",message);log.info("消息发送完毕,发送时间是:"+new Date());}}
}

2.3.3、配置文件application.yml

server:port: 8080
spring:application:name: dlx-test01rabbitmq:host: 你的服务器IPport: 5672username: 你的账号password: 你的密码virtual-host: powermy:exchangeNormalName: exchange.normal.03   #正常交换机queueNormalName: queue.normal.03         #正常队列,没有消费组,设置过期时间exchangeDlxName: exchange.dlx.03         #死信交换机queueDlxName: queue.dlx.03               #死信队列

2.3.4、启动类同上

2.3.5、配置文件pom.xml同上

2.3.6、测试

启动项目,发送消息
在这里插入图片描述
登录rabbitmq后台:
两条消息进入死信队列
在这里插入图片描述查看消息发现,是前两条消息进入了死信队列,
在这里插入图片描述
在这里插入图片描述

2.4、消费者拒绝消息不进行重新投递

消费者从正常的队列接收消息,但是消费者对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列。

2.4.1、生产者

在这里插入图片描述

2.4.1.1、生产者application.yml
server:port: 8080
spring:application:name: dlx-test04rabbitmq:host: 你的服务器IPport: 5672username: 你的账号password: 你的密码virtual-host: powermy:exchangeNormalName: exchange.normal.04   #正常交换机queueNormalName: queue.normal.04         #正常队列,没有消费组,设置过期时间exchangeDlxName: exchange.dlx.04         #死信交换机queueDlxName: queue.dlx.04               #死信队列
2.4.1.2、生产者发送消息
package com.power.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class MessageService {@Resourceprivate RabbitTemplate rabbitTemplate;@Beanpublic void sendMsg(){String msg = "hello world";Message message = MessageBuilder.withBody(msg.getBytes()).build();rabbitTemplate.convertAndSend("exchange.normal.04","order",message);log.info("消息发送完毕,发送时间是:"+new Date());}
}
2.4.1.3、生产者配置类
package com.power.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {@Value("${my.exchangeNormalName}")private String exchangeNormalName;@Value("${my.queueNormalName}")private String queueNormalName;@Value("${my.exchangeDlxName}")private String exchangeDlxName;@Value("${my.queueDlxName}")private String queueDlxName;/*** 正常交换机* @return*/@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}/*** 正常队列* @return*/@Beanpublic Queue normalQueue(){Map<String, Object> arguments = new HashMap<>();//重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置死信路由key,要跟死信交换机和死信队列绑定的路由key一致arguments.put("x-dead-letter-routing-key","error");return QueueBuilder.durable(queueNormalName).withArguments(arguments)       //设置队列的参数.build();}/*** 正常交换机和正常队列绑定* @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");}/*** 死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}/*** 死信队列* @return*/@Beanpublic Queue dlxQueue(){return QueueBuilder.durable(queueDlxName).build();}/*** 死信交换机和死信队列绑定* @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");}
}

2.4.2、消费者

在这里插入图片描述

2.4.2.1、消费者application.yml 启动手动确认
server:port: 9090
spring:application:name: dlx04-receiverrabbitmq:host: 你的服务器IPport: 5672username: 你的账号password: 你的密码virtual-host: powerlistener:simple:acknowledge-mode: manual
关键配置

在这里插入图片描述

2.4.2.2、消费者接收消息
package com.power.service;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;@Component
@Slf4j
public class MessageReceive {@RabbitListener(queues={"queue.normal.04"})public void receiveMsg(Message message, Channel channel){//获取消息属性MessageProperties messageProperties = message.getMessageProperties();//获取消息的唯一标识,类似学号和身份证号long deliveryTag = messageProperties.getDeliveryTag();try{byte[] body = message.getBody();String msg = new String(body);log.info("监听到的消息是:"+msg+",接收的时间是:"+new Date());//TODO 业务逻辑处理int a=1/0;//消费者的手动确认,false:只确认当前消息,true:批量确认channel.basicAck(deliveryTag,false);}catch (Exception e){log.error("接收者出现问题:{}",e.getMessage());try {//消费者的手动不确认,参数3:是重新入队//不会进入死信队列
//                channel.basicNack(deliveryTag,false,true);//消费者的手动不确认,参数3:false 不重新入队(不重新投递),就会变成死信channel.basicNack(deliveryTag,false,false);}catch (IOException ex){throw new RuntimeException(ex);}}}}
关键代码

在这里插入图片描述

2.4.3、测试

启动生产者:发送消息
在这里插入图片描述
启动消费者:
因业务代码出错,程序处理异常,消息进入死信队列

在这里插入图片描述

在这里插入图片描述

http://www.rdtb.cn/news/20891.html

相关文章:

  • 建设厂招工信息网站体验营销案例
  • w网站制作和推广国际军事最新消息今天
  • 2017电商网站建设背景做关键词推广
  • 腾讯域名购买鼓楼网页seo搜索引擎优化
  • wordpress网站 搬家南京seo网络推广
  • 屏显的企业网站应该怎么做手机百度助手
  • 网站备案到公司网站流量查询平台
  • 贵州中小型营销型网站建设公司seo排名工具外包
  • 衡水网站建设哪家好营销网站建设创意
  • 天津网站建设排名建站系统推荐
  • 天猫网站做链接怎么做googleplay
  • 广州做网站的公免费注册推广网站
  • 浙江省建设网站徐叨法广州seo服务外包
  • 做网站编辑要会什么软文范例800字
  • 福建网站备案百度下载正版
  • 南宁保洁网站建设网络广告投放
  • 网站设计建设服务电商大数据查询平台
  • 销售网站模板免费下载奶茶网络营销策划方案
  • 黄色网站模板网络优化工程师有前途吗
  • 网站后续建设个人开发app去哪里接广告
  • 建网站英语站内搜索引擎
  • 武汉手机网站建设品牌互联网推广平台有哪些
  • 镇江门户网站微信搜一搜seo优化
  • 网络游戏网站网址大全广州短视频代运营
  • 自己的电脑做服务区 网站企业seo排名外包
  • 2021年给我一个网站百度搜索引擎营销案例
  • 河北省 政府网站 建设意见网站流量统计分析的维度包括
  • 前端做网站需要学什么软件磁力狗在线引擎
  • 做网站前台模型要做什么呢seo是什么平台
  • 上海电子通科技网站建设怎么建企业网站