RabbitMQ常用消息模式

news/2024/4/29 21:35:23/文章来源:https://blog.csdn.net/m0_46979453/article/details/127203667

目录

1、RabitMQ工作队列

2、交换机

3、RabbitMQ Fanout 发布订阅--- Fanout exchange(扇型交换机)

3.1、创建连接代码

3.1、生产者代码

3.2、消费者代码

4、Direct路由模式

4.1、生产者代码

4.2、消费者代码

5、Topic主题模式

5.1、生产者代码

5.2、消费者代码


1、RabitMQ工作队列

        默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。

采用工作队列

        在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。

2、交换机

Exchange:在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中, 在由交换机转发到具体的队列, 队列再将消息以推送或者拉取方式给消费者进行消费。

 生产者将消息发送到Exchange, 由Exchange再路由到一个或多个队列中:

        /Virtual Hosts:区分不同的团队

        交换机exchabge: 路由消息存放在那个队列中 类似于nginx

        队列queue:队列 存放消息

       路由key(RoutingKey): 分发规则,生产者将消息发送给交换机的时候, 会指定RoutingKey指定路由规则。

交换机种类:

        Direct exchange(直连交换机)

        Fanout exchange(扇型交换机)

        Topic exchange(主题交换机)

        Headers exchange(头交换机)

3、RabbitMQ Fanout 发布订阅--- Fanout exchange(扇型交换机)

        生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者。同一条消息,经路由器转发 c1和c2都能收到。

 

原理:

  1. 需要创建两个队列 ,每个队列对应一个消费者;
  2. 队列需要绑定我们交换机;
  3. 生产者投递消息到交换机中,交换机在将消息分配给两个队列中都存放起来;
  4. 消费者从队列中获取这个消息;

3.1、创建连接代码

导入依赖:

        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency>
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 获取连接*/
public class RabbitMQConnection {/*** 创建连接** @return* @throws IOException* @throws TimeoutException*/public static Connection getConnection() throws IOException, TimeoutException {//1.创建connectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();//2.配置HostconnectionFactory.setHost("127.0.0.1");//3.设置PortconnectionFactory.setPort(5672);//4.设置账户和密码connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//5.设置VirtualHostconnectionFactory.setVirtualHost("/rkVirtualHost");return connectionFactory.newConnection();}
}

3.1、生产者代码

import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerFanout {/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {//  创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 通道关联交换机    绑定交换机 将消息发送到该交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);String msg = "rk 学 rabbitmq";//发送消息channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());//关闭连接channel.close();connection.close();}
}

3.2、消费者代码

邮件消费者:

import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "fanout_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {System.out.println("邮件消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列   将交换机和队列绑定起来  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {//从队列中获取消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);}
}

短信消费者:

        和邮件消费者代码一样,唯一不同的就是 QUEUE_NAME 变成了fanout_sms_queue

4、Direct路由模式

        当交换机类型为direct类型时,根据队列绑定的路由建转发到具体的队列中存放消息。

 

4.1、生产者代码

import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerDirect {/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws IOException, TimeoutException {//  创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 通道关联交换机channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);String msg = "rk 6666";//投递的消息的 路由键为 email  所以只有邮件消费者才能收到channel.basicPublish(EXCHANGE_NAME, "email", null, msg.getBytes());channel.close();connection.close();}}

4.2、消费者代码

邮件消费者:

public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "direct_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws IOException, TimeoutException {System.out.println("邮件消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列   关联交换机和队列   路由键为emailchannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);}
}

短信消费者:

只有队列和路由键变了:QUEUE_NAME = "direct_sms_queue";    路由键为: sms

5、Topic主题模式

当交换机类型为topic类型时,根据队列绑定的路由建模糊转发到具体的队列中存放。

#号表示支持匹配多个词;

*号表示只能匹配一个词

        如图所示:生产者的路由键为mayikt.sms发送给交换机的消息会转发给邮件队列

5.1、生产者代码

import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerTopic {/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws IOException, TimeoutException {//  创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 通道关联交换机  设置为topic主题模式channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);String msg = "rk 6666";//发送消息   rk.smschannel.basicPublish(EXCHANGE_NAME, "rk.sms", null, msg.getBytes());//关闭连接channel.close();connection.close();}}

5.2、消费者代码

邮件消费者:

import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "topic_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws IOException, TimeoutException {System.out.println("邮件消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列  绑定路由器和队列 设置路由键为rk.*channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "rk.*");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {//接收队列消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);}
}

短信消费者:

只有队列和路由键变了:QUEUE_NAME = "topic_sms_queue";    路由键为:luo.*

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_398769.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分享两套企业级进销存管理系统源码

▶▶▶▶1&#xff1a;SpringBoot企业级进销存ERP管理系统源码 00189 本系统采用企业级开发标准&#xff0c;使用SpringBoot架构&#xff0c;数据访问层采用Spring Data Jpa&#xff0c;业务控制层采用SpringMvc&#xff0c;安全框架采用Shiro&#xff0c;实现了完整权限系…

风控模型别只会KS、AUC了,来看看其他衡量模型好坏的一些重要指标吧|含实操

当我们训练好一个机器学习模型之后&#xff0c;必然会对模型的综合性能进行评估&#xff0c;针对分类、回归、聚类等不同类型的算法模型&#xff0c;可以采用相关的评价指标&#xff0c;例如分类模型的Accuracy、KS等&#xff1b;回归模型的MAE、MSE等&#xff1b;聚类模型的SS…

Linux下编写C使用的GDB调试器

目录 1.GDB调试器 2.GDB使用 3.实例程序调试 &#xff08;1&#xff09;编写一段C程序 &#xff08;2&#xff09;对C程序进行编译 &#xff08;3&#xff09;调试阶段 ①启动调试 ②查看文件 ③设置断点 ④查看断点情况 ⑤运行代码 ⑥单步运行 ⑦恢复程序 ⑧查看…

数字孪生建筑工程系统开发案例方案,如何选择数孪平台?

据统计&#xff0c;全国建筑业增长值在 GDP 增长中所占比重连续十年保持在 6. 85%以上&#xff0c;其支柱产业的地位依然保持。但是我国建筑业产值利润率已连续五年下滑&#xff0c;部分原因是其生产方式粗放、信息化水平不高、科技创新能力不足等。因此&#xff0c;在发展数字…

java类加载机制解析

一&#xff1a;类加载流程 public class Math {public static final int initData 666;public static User user new User();public int compute(){int a 1;int b 2;return ab;};public static void main(String[] args){Math math new Math();math.compute();} } 当我们…

Mybatis批量插入数据

前言 在很多业务场景中&#xff0c;我们需要批量录入数据。那么意味着我们需要以最高效的方式去实现功能&#xff0c;同时也需要保证软件的便捷性与可维护性&#xff0c;开源字节使用MyBatis foreach标签方式优雅的实现了材料的出入库。源码开放&#xff0c;可前往码云仓库免费…

NR 物理层编码 - slide7 卷积码

前言&#xff1a; 卷积码(n,k,N) 是一种非分组码.与线性分组码的区别: 是一种有记忆的编码方案,n个输出不仅与当前k个输入有关系,也与移位寄存器前N个输入有关系. 发展历史&#xff1a; 1955年 麻省理工的P.Elias 发明 1957年 序列译码法 1963年 门限译码法 1967年 Vi…

MongoDB分片机制

为什么需要分片 应用层实现的手动分片&#xff1a; MongoDB分片组件 mongos路由器负责将应用程序的请求指引到合适的分片上。注意到mongos路由器是在应用程序端实现的&#xff0c;因此分片的配置信息需要保存在另外的服务器上&#xff0c;即配置服务器。mongos通过两阶段提交同…

使用PreparedStatement对数据库的增删改查

目录 介绍 JDBCUtils自定义工具类 增 删 改 查 介绍 可以通过调用 Connection 对象的 preparedStatement() 方法获取PreparedStatement 对象PreparedStatement 接口是 Statement 的子接口&#xff0c;它表示一条预编译过的 SQL 语句PreparedStatement 对象所代表的 SQL 语…

拼搏半个月,刷了 571道Java高频面试题喜提阿里 offer,定级 P7

今年较往年相比面试要难的多&#xff0c;大环境也是对于程序员的要求越来越高&#xff0c;环境是我们无法改变的&#xff0c;我们能改变的只有自己&#xff0c;月初我一好友&#xff0c;努力拼搏一周&#xff0c;刷完了这份阿里 P8 大牛整理的这 571 道 Java 高频面试题笔记&am…

彩色的木棒

一 问题描述 给你一堆木棒。每根棒的每个端点都用一些颜色着色。是否可以将棒对齐成直线&#xff0c;使得接触的端点的颜色具有相同的颜色&#xff1f; 二 输入和输出 1 输入 输入是一系列行&#xff0c;每行包含两个单词&#xff0c;由空格分隔&#xff0c;给出一个木棒的…

SkeyeVSS智慧国土高点视频监控解决方案

随着经济的快速发展、城镇化的快速推进&#xff0c;耕地及矿产资源等不断减少&#xff0c;未批先建、批少用多、私自改变土地用途等各种违法违规用地行为时有发生&#xff0c;在这种情况下&#xff0c;传统的人力巡查工作效率低、执法成本高的弊端进一步凸显。 SkeyeVSS智慧国土…

科技云报道:私有云市场加速洗牌,超云为何异军突起?

科技云报道原创。 近年来在国家相关政策的大力推动下&#xff0c;中国私有云市场发展渐入佳境&#xff0c;一股新的建设高潮汹涌而至。 根据IDC对于2022-2026中国SDS及HCI的市场预测&#xff0c;中国私有云基础架构市场正在从成长阶段迈向成熟阶段&#xff0c;未来3-5年将保持…

自己动手写ls命令——Java版

自己动手写ls命令——Java版 介绍 在前面的文章Linux命令系列之ls——原来最简单的ls这么复杂当中&#xff0c;我们仔细的介绍了关于ls命令的使用和输出结果&#xff0c;在本篇文章当中我们用Java代码自己实现ls命令&#xff0c;更加深入的了解ls命令。 代码实现 文件操作的…

3000字神经网络论文

你遇到了哪些困难和挫折是怎样克服的写下来的作文 我学会了骑自行车人生的道路上&#xff0c;谁都会遇到困难或挫折&#xff0c;就看你敢不敢去挑战它。那一次学自行车&#xff0c;一直让我记忆犹新。一天傍晚&#xff0c;我和爸爸妈妈一起推着车来到体育馆&#xff0c;这次我…

Android同文输入法的使用(开源输入法Trime)

Trime输入法背景源码APP试用下载安装配置部署成功后再一步&#xff1a;学习如何 DIY总结背景 想找一款开源的Android中文输入法&#xff0c;然后发现了这款备受推崇的输入法框架rime。 RIME&#xff0f;中州韵输入法引擎&#xff0c;是一个跨平台的输入法算法框架。 基于这一…

【MySQL】检索数据

每日鸡汤 &#xff1a; —— 若你困于无风之地&#xff0c;我将奏响高空之歌 要和我一起花 10 min 学一会 SQL 嘛&#xff1f; - 当然愿意&#xff0c;我美丽的小姐 &#xff08;封寝期间练就的自言自语能力越来越炉火纯青了~~~&#xff09; 前言&#xff1a; 本实验中所用数据…

Kotlin第二章:kotlin基础

1. 基础数据类型 1. 整数类型 序号类型位宽最小值最大值1Byte8-1281272Short16-32768327673Int32-2,147,483,648 (-2^31)2,147,483,647 (2^31 - 1)4Long64-9,223,372,036,854,775,808 (-2^63)9,223,372,036,854,775,807 (2^63 - 1) val number 100 //默认Int类型 类比java的…

0050 Enum枚举类

/* 枚举是一种特殊的类&#xff0c;里面只包含一组有限的特定对象枚举的两种实现方式1.自定义类实现枚举2.使用enum关键字实现枚举自定义类实现枚举1.构造器私有化2.本类的内部创建一组对象[]3.对外暴露对象&#xff08;为对象添加public final static修饰&#xff09;4.提供g…

第三章 Flink基础理论之内存优化及常见内存报错解决方案

第三章 Flink基础理论之内存优化及常见内存报错解决方案 哇. 1、总体内存模型 1.1、内存模型概述 ​ Flink内存配置分为JobManager内存配置和TaskManager内存配置。 配置项TaskManager配置参数JobManager配置参数Total Flink Memorytaskmanager.memory.flink.sizejobmana…