548、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ (二)】 2023.02.28

news/2024/4/24 12:49:38/文章来源:https://blog.csdn.net/youyouwuxin1234/article/details/128806952

目录

    • 一、Java 访问 RocketMQ 实例
      • 1.1 引入依赖
      • 1.2 消息生产者
      • 1.3 消息消费者
      • 1.4 启动 Name Server
      • 1.5 启动 Broker
      • 1.6 运行 Consumer
      • 1.7 运行 Producer
    • 二、参考链接

一、Java 访问 RocketMQ 实例

RocketMQ 目前支持 Java、C++、Go 三种语言访问,按惯例以 Java 语言为例看下如何用 RocketMQ 来收发消息的。

1.1 引入依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.2.0</version></dependency>

添加 RocketMQ 客户端访问支持,具体版本和安装的 RocketMQ 版本一致即可。

1.2 消息生产者

package org.study.mq.rocketMQ.java;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {public static void main(String[] args) throws Exception {//创建一个消息生产者,并设置一个消息生产者组DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");//指定 NameServer 地址producer.setNamesrvAddr("localhost:9876");//初始化 Producer,整个应用生命周期内只需要初始化一次producer.start();for (int i = 0; i < 100; i++) {//创建一条消息对象,指定其主题、标签和消息内容Message msg = new Message("topic_example_java" /* 消息主题名 */,"TagA" /* 消息标签 */,("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */);//发送消息并返回结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等producer.shutdown();}
}

示例中用 DefaultMQProducer 类来创建一个消息生产者,通常一个应用创建一个 DefaultMQProducer 对象,所以一般由应用来维护生产者对象,可以其设置为全局对象或者单例。该类构造函数入参 producerGroup 是消息生产者组的名字,无论生产者还是消费者都必须给出 GroupName ,并保证该名字的唯一性,ProducerGroup 发送普通的消息时作用不大,后面介绍分布式事务消息时会用到。
接下来指定 NameServer 地址和调用 start 方法初始化,在整个应用生命周期内只需要调用一次 start 方法。
初始化完成后,调用 send 方法发送消息,示例中只是简单的构造了100条同样的消息发送,其实一个 Producer 对象可以发送多个主题多个标签的消息,消息对象的标签可以为空。send 方法是同步调用,只要不抛异常就标识成功。
最后应用退出时调用 shutdown 方法清理资源、关闭网络连接,从服务器上注销自己,通常建议应用在 JBOSS、Tomcat 等容器的退出钩子里调用 shutdown 方法。

1.3 消息消费者

package org.study.mq.rocketMQ.java;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//创建一个消息消费者,并设置一个消息消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");//指定 NameServer 地址consumer.setNamesrvAddr("localhost:9876");//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅指定 Topic 下的所有消息consumer.subscribe("topic_example_java", "*");//注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {//默认 list 里只有一条消息,可以通过设置参数来批量接收消息if (list != null) {for (MessageExt ext : list) {try {System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 消费者对象在使用之前必须要调用 start 初始化consumer.start();System.out.println("消息消费者已启动");}
}

示例中用 DefaultMQPushConsumer 类来创建一个消息消费者,通生产者一样一个应用一般创建一个 DefaultMQPushConsumer 对象,该对象一般由应用来维护,可以其设置为全局对象或者单例。该类构造函数入参 consumerGroup 是消息消费者组的名字,需要保证该名字的唯一性。
接下来指定 NameServer 地址和设置消费者应用程序第一次启动时从队列头部开始消费还是队列尾部开始消费。
接着调用 subscribe 方法给消费者对象订阅指定主题下的消息,该方法第一个参数是主题名,第二个擦书是标签名,示例表示订阅了主题名 topic_example_java 下所有标签的消息。
最主要的是注册消息监听器才能消费消息,示例中用的是 Consumer Push 的方式,即设置监听器回调的方式消费消息,默认监听回调方法中 List 里只有一条消息,可以通过设置参数来批量接收消息。
最后调用 start 方法初始化,在整个应用生命周期内只需要调用一次 start 方法。

1.4 启动 Name Server

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

RocketMQ 核心的四大组件中 Name Server 和 Broker 都是由 RocketMQ 安装包提供的,所以要启动这两个应用才能提供消息服务。首先启动 Name Server,先确保你的机器中已经安装了与 RocketMQ 相匹配的 JDK ,并设置了环境变量 JAVA_HOME ,然后在 RocketMQ 的安装目录下执行 bin 目录下的 mqnamesrv ,默认会将该命令的执行情况输出到当前目录的 nohup.out 文件,最后跟踪日志文件查看 Name Server 的实际运行情况。

1.5 启动 Broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

同样也要确保你的机器中已经安装了与 RocketMQ 相匹配的 JDK ,并设置了环境变量 JAVA_HOME ,然后在 RocketMQ 的安装目录下执行 bin 目录下的 mqbroker ,默认会将该命令的执行情况输出到当前目录的 nohup.out 文件,最后跟踪日志文件查看 Broker 的实际运行情况。

1.6 运行 Consumer

先运行 Consumer 类,这样当生产者发送消息的时候能在消费者后端看到消息记录。配置没问题的话会看到在控制台打印出消息消费者已启动

1.7 运行 Producer

最后运行 Producer 类,在 Consumer 的控制台能看到接收的消息
在这里插入图片描述

二、参考链接

[01] 消息队列之 RocketMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_75566.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA社区版环境配置和插件安装

一、Java环境安装 1.1 下载openjdk环境安装包 可以进华为镜像站进行下载。参考链接&#xff1a; Index of openjdk-local https://repo.huaweicloud.com/openjdk/ 1.2 配置Java环境 解压缩openjdk到任意路径&#xff0c;建议路径不要有中文。然后把路径的bin文件&#xff0…

CSO面对面丨中核华辉刘博:应对大型央国企数字化转型道路上必须攻克的安全难题

“极致”&#xff0c;一直是大型央国企网络安全工作建设追求的目标。随着我国数字化转型全面走深向实&#xff0c;网络安全风险、数据管理、层出不穷的网络攻击&#xff0c;为各领域大型央国企数字化转型带来了更多的挑战。如何充分发挥优势、携手各方构筑网络安全屏障、提升安…

LeetCode 79. 单词搜索

LeetCode 79. 单词搜索 难度&#xff1a;middle\color{orange}{middle}middle 题目描述 给定一个 mxnm x nmxn 二维字符网格 boardboardboard 和一个字符串单词 wordwordword 。如果 wordwordword 存在于网格中&#xff0c;返回 truetruetrue &#xff1b;否则&#xff0c;返…

Leetcode19. 删除链表的倒数第n个结点

一、题目描述&#xff1a; 给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], n 2输出&#xff1a;[1,2,3,5] 示例 2&#xff1a; 输入&#xff1a;head [1], n 1输出&#x…

JSP的分页

分页在读取数据库里的数据需要用&#xff0c;在以后数据库肯定还会有很多数据&#xff0c;一个页面装不下&#xff0c;所以需要分页功能。数据库查询的分页语句是“SELECT * FROM emp LIMIT 0, 5;”这里0是指起始行&#xff0c;5是查询5行&#xff0c;第二页起始行就是5&#x…

通过python技术获取甲流分布数据

近期&#xff0c;多地学校出现因甲流导致的班级停课&#xff0c;儿科甲流患者就诊量呈数倍增长。此轮甲流为何如此严重&#xff1f;感染甲流之后会出现哪些症状&#xff1f; 经过专家的介绍甲流之所以这么严重有这些原因导致的。一、疫情完全放开后很多孩子不戴口罩了&#x…

Odoo | Webserivce | 5分钟学会【JSONRPC】接口开发 - 换USERID(进阶)

文章目录JSONRPC - 换取USERID简述换取USERID1. 代码示例2. 换取结果JSONRPC - 换取USERID 简述 从Odoo JSONRPC 接口入门篇&#xff0c;可以发现我们直接传入了USERID&#xff0c;这只是为了方便快速测试。 其实按照常规流程&#xff0c;应该通过【用户名USERNAME】和【用户…

【办公类-19-02】Python批量制作word文本框的名字小标签,用A4word打印(植物角、家长会、值日生)

背景需求&#xff1a; 2月28日去小班带班&#xff0c;看到班主任制作了一些小手印花束作为家长会的家长座位提示&#xff0c;上面贴着“”圆形白色的幼儿名字贴”。 我立刻想起了制作的过程——在word中插入文本框&#xff0c;然后复制无数个文本框&#xff0c;摆好位置&#…

MyBatis学习笔记(八) —— 字段名和属性不一致的情况下,如何处理映射关系

EmpMapper.java /** * 根据id查询员工信息 * param empId * return */ Emp getEmpByEmpId(Param("empId") Integer empId);EmpMapper.xml <?xml version"1.0" encoding"UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//D…

day22_IO

今日内容 上课同步视频:CuteN饕餮的个人空间_哔哩哔哩_bilibili 同步笔记沐沐霸的博客_CSDN博客-Java2301 零、 复习昨日 一、作业 二、缓冲流 三、字符流 四、缓冲字符流 五、匿名内部类 零、 复习昨日 File: 通过路径代表一个文件或目录 方法: 创建型,查找类,判断类,其他 IO …

如何创建出实用的员工手册?

员工手册主要是企业内部的人事制度管理规范&#xff0c;包含企业规章制度和企业文化&#xff0c;同时还起到了展示企业形象、传播企业文化的作用。它既覆盖了企业人力资源管理的各个方面规章制度的主要内容&#xff0c;又因适应企业独特个性的经营发展需要而弥补了规章制度制定…

VIF_Benchmark: All infrare and visible image fusion method in one framework

VIF_Benchmark Github 地址: https://github.com/Linfeng-Tang/VIF_Benchmark 完整Project下载地址&#xff1a;https://download.csdn.net/download/fovever_/87514164 我们把所有主流的基于深度学习的红外和可见光图像融合方法都集成在了这个框架中。 这些方法包括&#xff1…

数据结构六大排序

1.插入排序 1.插入排序 思路&#xff1a; 从第一个元素开始认为是有序的&#xff0c;去一个元素tem从有序序列从后往前扫描&#xff0c;如果该元素大于tem&#xff0c;将该元素一刀下一位&#xff0c;循环步骤3知道找到有序序列中小于等于的元素将tem插入到该元素后&#xff0…

如何防止DNS污染?

对于DNS污染&#xff0c;一般除了使用代理服务器和VPN之类的软件之外&#xff0c;并没有什么其它办法。但是利用我们对DNS污染的了解&#xff0c;还是可以做到不用代理服务器和VPN之类的软件就能解决DNS污染的问题&#xff0c;从而在不使用代理服务器或VPN的情况下访问原本访问…

设计模式系列 - 代理模式及动态代理详解

定义 为其他对象提供一种代理以控制对这个对象的访问。在某些情况下&#xff0c;一个对象不适合或者不能直接引用另一个对象&#xff0c;而代理对象可以在客户端和目标对象之间起到中介的作用。 结构 抽象角色&#xff1a;通过接口或抽象类声明真实角色实现的业务方法。 代…

C++ STL:容器 Container

文章目录1、序列容器1.1、容器共性1.2、vectorvector 结构* vector 扩容原理* vector 迭代器失效1.3、dequedeque 结构deque 迭代器deque 模拟连续空间1.4、listlist 特殊操作list 结构list 迭代器2、关联式容器2.1、容器共性2.2、容器特性3、无序关联式容器3.1、容器共性3.2、…

电子科技大学 高级计算机系统结构 考试回忆

首先题量不算小&#xff0c;因此没有太多时间把题都记出来&#xff0c;但是叙述一下题的类型希望能帮到以后选了这门课大家&#xff0c;在网上确实没有搜到这门课有关考试的任何资料&#xff0c;所以我也没啥参考全凭记忆和老师的PPT结合。复习的时候老师给了大纲&#xff0c;就…

【k8s】Kubernetes的学习(1.k8s概念和架构)

目录 1.首先要知道&#xff0c;Kubernetes为什么简称为k8s? 2.Kubernetes概述 2.1 kubernetes基本介绍 2.2 kubernetes的特性 2.3 kubernetes集群架构组件 2.3.1 Master (主控节点) 2.3.2 node (工作节点) 2.4 k8s核心概念 2.4.1 Pod 2.4.2 controller 2.4.3 Se…

Python自动发周报给老板,到点赶紧跑

嗨害大家好鸭&#xff01;我是小熊猫~ 作为一个社畜人 …勤勤恳恳的打工人&#xff01;&#xff01;&#xff01; 几乎每周都要写周报 没办法只能用点小技术 用python写个小工具 让它来给老板发周报哈哈哈 更多python摸鱼小技巧、基础知识:点击此处跳转文末名片获取 目标细…

收下这份十万商家称赞的开店攻略,带你发家致富!

理想与现实之间的距离&#xff0c;大概就是开店吧&#xff01;总觉得自己投点钱&#xff0c;一两年回本&#xff0c;后面每月轻松赚几万、几十万&#xff1b;结果却发现房租太贵、人工太贵、自己什么都不懂&#xff0c;然后随波逐流的没有特色。其实&#xff0c;细心的朋友会发…