java中ZMQ的用法
简介
ZMQ是基于C语言实现的消息队列, 可用TCP或UDP实现. JeroMQ是其java实现, 首先导入maven:
<!-- https://mvnrepository.com/artifact/org.zeromq/jeromq -->
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.5.3</version>
</dependency>
使用
请求-响应模式
该模式由客户端和服务端组成, 必须是问了再答, 答了再问
- 客户端: 发送 -> 接收 --> 发送 --> ...
- 服务端: 接收 --> 发送 --> 接收 --> ...
服务端
服务端监听8888的tcp连接
public class Response {
public static void main(String args[]) throws InterruptedException {
ZMQ.Context context = ZMQ.context(1); //I/O线程上下文的数量为1
ZMQ.Socket socket = context.socket(ZMQ.REP); //ZMQ.REP表示这是一个reponse类型的socket
socket.bind("tcp://*:8888"); //绑定到8888端口
while (true) {
byte[] request = socket.recv();
if (new String(request).equals("END"))
break;
System.out.println("Response recv:\t" + new String(request));
String response = "I got it";
Thread.sleep(3000);
socket.send(response.getBytes());
}
//关闭
socket.close();
context.term();
}
}
客户端
public class Request {
public static void main(String args[]) {
ZMQ.Context context = ZMQ.context(1); //I/O线程上下文的数量为1
ZMQ.Socket socket = context.socket(ZMQ.REQ); //ZMQ.REQ表示这是一个request类型的socket
socket.connect("tcp://127.0.0.1:8888"); //连接到8888端口
for (int i = 0; i < 10; i++) {
long now = System.currentTimeMillis();
String request = "hello, time is " + now;
socket.send(request.getBytes());
byte[] response = socket.recv();
System.out.println("Request recv:\t" + new String(response));
}
socket.send("END".getBytes());
//关闭
socket.close();
context.term();
}
}
发布-订阅模式
该服务器一直发, 客户端订阅后就可以接受服务器消息了.
服务端(发布者)
服务端只需发送以Time和Order开头的消息
public class Server {
public static void main(String[] args) {
try (ZContext context = new ZContext()) {
ZMQ.Socket publisher = context.createSocket(ZMQ.PUB); //publish类型
publisher.bind("tcp://*:5555");
Random random = new Random();
while (true) {
String update;
//随机将update赋值为Time: 或Order: 开头的值
if (random.nextInt(10) <= 5)
update = "Time: " + System.currentTimeMillis();
else
update = "Order: " + System.currentTimeMillis();
publisher.send(update); //发送
System.out.println("SEND:[" + update + "]");
}
}
}
}
客户端(订阅者)
客户端只订阅以Time开头的
public class Client {
public static void main(String args[]) {
try (ZContext context = new ZContext()) {
ZMQ.Socket subscriber = context.createSocket(ZMQ.SUB); //subscribe类型
subscriber.connect("tcp://localhost:5555");
subscriber.subscribe("Time:".getBytes()); //只订阅Time: 开头的信息
for (int i = 0; i < 1000; i++) {
System.out.println(subscriber.recvStr()); //recvStr直接返回String,内部调用了recv,将byte数组转化为String
}
}
}
}
集群超时模拟
public class ApiConstant {
public static final String CLUSTER_INIT_API = "INIT_CLUSTER";
public static final String TDI_INIT_API = "INIT_TDI_API";
public static final String END_API = "END_API";
}
package com.wang.zmq;
import org.zeromq.ZMQ;
import java.util.Date;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.wang.zmq.ApiConstant.END_API;
/**
* ClusterImpl : TODO
*
* @author : Wangwenchu
* @since : 2023/3/27
*/
public class ClusterImpl {
static Boolean isClusterWorking = false;
static Boolean isTdiWorking = false;
private static ZMQ.Socket socket;
private ThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(10);
private Object lock = new Object();
private AtomicBoolean shouldSend = new AtomicBoolean(true);
private int count = 0;
public static void main(String[] args) {
ClusterImpl cluster = new ClusterImpl();
cluster.init();
cluster.initCluster();
}
public void init() {
ZMQ.Context context = ZMQ.context(1); //I/O线程上下文的数量为1
socket = context.socket(ZMQ.REQ); //ZMQ.REQ表示这是一个request类型的socket
socket.connect("tcp://127.0.0.1:8888"); //连接到8888端口
pollMsg();
}
private void initCluster() {
threadPoolExecutor.execute(() -> {
if (shouldSend.get()) {
String api = ApiConstant.CLUSTER_INIT_API;
try {
Thread.sleep(2000L);
socket.send(api);
shouldSend.set(false);
System.out.println("send cluster init:" + new Date());
long now = System.currentTimeMillis();
while (true) {
// System.out.println("I am wait for you 1");
synchronized (lock) {
lock.wait(10L);
}
// Thread.sleep(10L);
// System.out.println("I am wait for you 2");
if (isClusterWorking) {
System.out.println("cluster is working");
break;
}
//System.out.println("I am wait for you 3");
if (System.currentTimeMillis() - now >= 10000L) {
System.out.println("start cluster time out");
break;
}
// System.out.println("I am wait for you 4");
}
Thread.sleep(3000L);
if (shouldSend.get()) {
socket.send(END_API);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
public void initTdi() {
threadPoolExecutor.execute(() -> {
String api = ApiConstant.TDI_INIT_API;
try {
Thread.sleep(2000L);
System.out.println("send tdi init:" + new Date());
socket.send(api);
long now = System.currentTimeMillis();
while (true) {
synchronized (lock) {
this.wait(10L);
}
if (isTdiWorking) {
System.out.println("tdi is working");
}
if (System.currentTimeMillis() - now >= 3000L) {
System.out.println("start tdi time out");
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
public void pollMsg() {
threadPoolExecutor.execute(() -> {
while (true) {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (shouldSend.get()) {
continue;
}
count++;
System.out.println("poll msg count: " + count);
byte[] response = socket.recv();
String str = new String(response);
System.out.println("recv msg: " + str + "\t" + new Date());
if (str.startsWith("CLUSTER_INIT_ACK")) {
isClusterWorking = true;
} else if (str.startsWith("TDI_INIT_ACK")) {
isTdiWorking = true;
}
shouldSend.set(true);
}
});
}
}
package com.wang.zmq;
import org.zeromq.ZMQ;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.wang.zmq.ApiConstant.END_API;
/**
* Server : TODO
*
* @author : Wangwenchu
* @since : 2023/3/27
*/
public class Server {
private ThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(10);
private ZMQ.Context context;
private ZMQ.Socket socket;
private String reply;
private AtomicBoolean shouldRecv = new AtomicBoolean(true);
private int count = 0;
public static void main(String[] args) {
Server server = new Server();
server.init();
server.pollInMsg();
}
public void init() {
context = ZMQ.context(1); //I/O线程上下文的数量为1
socket = context.socket(ZMQ.REP); //ZMQ.REP表示这是一个reponse类型的socket
socket.bind("tcp://*:8888"); //绑定到8888端口
}
public void pollInMsg() {
threadPoolExecutor.execute(() -> {
while (true) {
if (!shouldRecv.get()) {
continue;
}
count++;
System.out.println("receive loop count" + count);
byte[] request = socket.recv();
String str = new String(request);
if (str != null && str.length() > 0) {
System.out.println("Receive api: " + str);
if (str.equals(ApiConstant.CLUSTER_INIT_API)) {
try {
Thread.sleep(1000L);
reply = "CLUSTER_INIT_ACK";
socket.send(reply.getBytes());
System.out.println("reply with: " + reply);
shouldRecv.set(true);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else if (str.equals(ApiConstant.TDI_INIT_API)) {
try {
Thread.sleep(2000L);
reply = "TDI_INIT_ACK";
socket.send(reply.getBytes());
System.out.println("reply with: " + reply);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else if (str.equals(END_API)) {
System.out.println("exe shutdown");
break;
}
shouldRecv.set(false);
}
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
}