更新时间:2022-12-02 10:38:53 来源:极悦 浏览1497次
Java实现消息队列的方法有哪些?极悦小编来告诉大家。
消息队列是MQ是一种系统间相互协作的通信机制
Broker:消息处理中心,负责消息的接收、存储、转发等;
Producer:消息生产者,负责产生和发送消息到消息处理中心;
Consumer:消息消费者,负责从消息处理中心获取消息,并进行相应的处理。
其结构如下所示:
(1)消息处理中心
作为消息处理中心,至少有一个数据容器来保存接收到的消息。这里采用java中队列(Queue)的一个子类ArrayBockingQueue来实现。
如下是消息处理中心Broker的实现:
import java.util.concurrent.ArrayBlockingQueue;
public class Broker {
private final static int MAX_SIZE = 3;
private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);
public static void produce(String msg){
if(messageQueue.offer(msg)){
System.out.println("成功向消息处理中心投递消息: " + msg + ",当前缓存的消息数量是:"+ messageQueue.size());
} else{
System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
}
System.out.println("==============================");
}
public static String consume(){
String msg = messageQueue.poll();
if(msg != null){
System.out.println("已经消费消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());
} else {
System.out.println("消息处理中心内没有消息可供消费!");
}
System.out.println("==============================");
return msg;
}
}
有了消息处理中心类后,需要将该类的功能暴露出去,这样别人才能够用它来发送和接收消息。我们定义了BrokerServer类用来对外提供Broker类的服务。
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class BrokerServer implements Runnable{
public static int SERVICE_PORT = 9999;
private final Socket socket;
public BrokerServer(Socket socket){
this.socket = socket;
}
@Override
public void run(){
try(
BufferedReader in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream()))
{
while (true){
String str = in.readLine();
if (str == null){
continue;
}
System.out.println("接收到原始数据: " + str);
if (str.equals("CONSUME")){
String message = Broker.consume();
out.println(message);
out.flush();
}else {
Broker.produce(str);
}
}
} catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception{
ServerSocket server = new ServerSocket(SERVICE_PORT);
while(true){
BrokerServer brokerServer = new BrokerServer(server.accept());
new Thread(brokerServer).start();
}
}
}
在java中设计服务其功能的软件一般少不了套接字(Socket)和线程(Thread),需要通过线程的方式将应用启动起来,而服务器和应用的客户端需要用Socket进行网络通信。
(2)客户端访问
有了消息处理中心服务器后,自然需要相应客户端来与之通信,来发送和接收消息。
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
public class MyClient {
public static void produce(String message) throws Exception{
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
try(
PrintWriter out = new PrintWriter(socket.getOutputStream())
){
out.println(message);
out.flush();
}
}
public static String consume() throws Exception{
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
try(
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())
){
out.println("CONSUME");
out.flush();
String message = in.readLine();
return message;
}
}
}
以上是通用的客户端访问代码,接下来分别看一个生产消息和消费消息的示例:
生产消息:
public class ProduceClient {
public static void main(String[] args) throws Exception{
MyClient client = new MyClient();
client.produce("hello World.");
}
}
消费消息:
public class ConsumeClient {
public static void main(String[] args) throws Exception{
MyClient client = new MyClient();
String message = client.consume();
System.out.println("获得的消息为: " + message);
}
}
(3)运行效果
开启BrokerServer服务
生产消息:ProduceClient
消费消息:ConsumeClient
以上就是极悦小编介绍的"Java实现消息队列的简单方法",希望对大家有帮助,想了解更多可查看Java教程。极悦在线学习教程,针对没有任何Java基础的读者学习,让你从入门到精通,主要介绍了一些Java基础的核心知识,让同学们更好更方便的学习和了解Java编程,感兴趣的同学可以关注一下。
0基础 0学费 15天面授
Java就业班有基础 直达就业
业余时间 高薪转行
Java在职加薪班工作1~3年,加薪神器
工作3~5年,晋升架构
提交申请后,顾问老师会电话与您沟通安排学习