Jump to content

Search the Community

Showing results for tags 'rabbitmq'.

  • Search By Tags

    Type tags separated by commas.
  • Search By Author

Find results in...

Find results that contain...


Date Created

  • Start

    End


Last Updated

  • Start

    End


Filter by number of...

Joined

  • Start

    End


Group


About Me

Found 1 result

  1. 在application.yml文件中配置web以及rabbitmq的配置信息 spring: rabbitmq: host: 127.0.0.1 port: 5672 username: root password: 9YoIu # 发送者开启 return 确认机制 publisher-returns: true # 发送者开启 confirm 确认机制 publisher-confirm-type: correlated listener.simple: # 设置消费端手动 ack acknowledge-mode: manual # 是否支持重试 retry: enabled: true 在maven中引入依赖: <!--MQ消息队列--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 使用@EnableRabbit注解在入口类上启用MQ @SpringBootApplication @EnableRabbit public class SmsEmailApplication { public static void main(String[] args) { SpringApplication.run(SmsEmailApplication.class, args); } } 新建rabbitmq配置类 @Configuration public class RabbitmqConfig { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } //配置正常业务 例如邮件发送 的队列交换机与通道 public static String Queue1 = "queue_1"; public static String Exchange1 = "exchange_1"; public static String Routing1 = "routing_key_1"; /** * 定义死信队列相关信息 */ public final static String deadQueueName = "dead_queue"; public final static String deadRoutingKey = "dead_routing_key"; public final static String deadExchangeName = "dead_exchange"; /** * 死信队列 交换机标识符 */ public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; /** * 死信队列交换机绑定键标识符 */ public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; /** * 创建死信交换机 */ @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } /** * 创建配置死信队列 * * @return */ @Bean public Queue deadQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } /** * 死信队列与死信交换机绑定 */ @Bean public Binding bindingDeadExchange() { return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(deadRoutingKey); } /** * 队列绑定到死信 * 第一个参数是创建的queue的名字,第二个参数是是否支持持久化 * @return */ @Bean public Queue EmailQueue() { // 将普通队列绑定到死信队列交换机上 Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(Queue1, true, false, false, args); return queue; } /** * 创建交换机 * 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除, * 第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数 * @return */ @Bean public DirectExchange EmailExchange() { return new DirectExchange(Exchange1, true, false); } /** * 绑定 * @param Queue * @param Exchange * @return */ @Bean public Binding bindingFinanceExchange(Queue Queue1, DirectExchange Exchange) { return BindingBuilder.bind(EmailQueue()).to(Exchange1).with(Routing); } } 新建消费者类RabbitReceiver /** 消息队列监听器 * * @param message */ @RabbitListener(queues = "queue_1") @RabbitHandler public void process(JSONObject data, Channel channel, Message message) throws IOException { //消息手动确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //消息重新投递 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); //消息是否重复 if(message.getMessageProperties().getRedelivered()){ //拒绝消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); //拒绝消费消息(丢失消息) 重新投递给死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } /** * 死信 消息队列消费者 * 当无法消费且被投递至死信队列则再次被死信消费 * @param message * @param headers * @param channel * @throws Exception */ @RabbitListener(queues = "dead_queue") @RabbitHandler public void deadProcess(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { // 获取消息Id String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); // // 手动ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手动签收 channel.basicAck(deliveryTag, false); log.error("日志已记录...."); } 投递一条消息 @Resource RabbitTemplate rabbitTemplate; //生成一个随机消息ID public static Message getId (JSONObject data){ return MessageBuilder.withBody(JSON.toJSONString(data).getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8") .setMessageId(UUID.randomUUID()+"") .build(); } /** * 投递消息 * 通过自定义设置消息体给每一个消息设置一个唯一ID,方便失败时候重试 * @param sendData 要发送的数据 * @param async 如果开启此项,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。 返回一个null * 如果禁用此项,交换机会同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才接收另外一个 会造成阻塞 返回一个object * rabbitTemplate.convertAndSend 里接受3个参数 第一个是之前配置的交换机,第二个是配置好的routing_key,第三个是消息体 */ public Object sendSmsEmail(JSONObject sendData,boolean async){ //自定义消息体 必须给消息指定一个UUID,用于失败重试 Message message = getId(sendData); if (async){ rabbitTemplate.convertAndSend("exchange_1","routing_key_1",message); return null; }else{ return rabbitTemplate.convertSendAndReceive("exchange_1","routing_key_1",message); } } 现在我们在浏览器中输入:http://localhost:15672 可以看到一个登录界面 查看队列,features行下的普通交换机有一个 DLX 的标志,就说明已绑定了死信交换机
×
×
  • Create New...