ZMQ [java]

发布时间 2023-03-28 00:59:24作者: 王文初

java中ZMQ的用法

简介

ZMQ是基于C语言实现的消息队列, 可用TCP或UDP实现. JeroMQ是其java实现, 首先导入maven:

 <!-- https://mvnrepository.com/artifact/org.zeromq/jeromq -->
<dependency>
     <groupId>org.zeromq</groupId>
     <artifactId>jeromq</artifactId>
     <version>0.5.3</version>
</dependency>

使用

请求-响应模式

该模式由客户端和服务端组成, 必须是问了再答, 答了再问

  • 客户端: 发送 -> 接收 --> 发送 --> ...
  • 服务端: 接收 --> 发送 --> 接收 --> ...

服务端

服务端监听8888的tcp连接

public class Response {
    public static void main(String args[]) throws InterruptedException {
        ZMQ.Context context = ZMQ.context(1);     //I/O线程上下文的数量为1
        ZMQ.Socket socket = context.socket(ZMQ.REP);         //ZMQ.REP表示这是一个reponse类型的socket
        socket.bind("tcp://*:8888");                 //绑定到8888端口
        while (true) {
            byte[] request = socket.recv();
            if (new String(request).equals("END"))
                break;
            System.out.println("Response recv:\t" + new String(request));
            String response = "I got it";
            Thread.sleep(3000);
            socket.send(response.getBytes());
        }
        //关闭
        socket.close();
        context.term();
    }
}

客户端

public class Request {
    public static void main(String args[]) {
        ZMQ.Context context = ZMQ.context(1);     //I/O线程上下文的数量为1
        ZMQ.Socket socket = context.socket(ZMQ.REQ);         //ZMQ.REQ表示这是一个request类型的socket
        socket.connect("tcp://127.0.0.1:8888");      //连接到8888端口
        for (int i = 0; i < 10; i++) {
            long now = System.currentTimeMillis();
            String request = "hello, time is " + now;
            socket.send(request.getBytes());
            byte[] response = socket.recv();
            System.out.println("Request recv:\t" + new String(response));
        }
        socket.send("END".getBytes());

        //关闭
        socket.close();
        context.term();
    }
}

发布-订阅模式

该服务器一直发, 客户端订阅后就可以接受服务器消息了.

服务端(发布者)

服务端只需发送以Time和Order开头的消息

public class Server {
    public static void main(String[] args) {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket publisher = context.createSocket(ZMQ.PUB); //publish类型
            publisher.bind("tcp://*:5555");

            Random random = new Random();
            while (true) {
                String update;
                //随机将update赋值为Time: 或Order: 开头的值
                if (random.nextInt(10) <= 5)
                    update = "Time: " + System.currentTimeMillis();
                else
                    update = "Order: " + System.currentTimeMillis();
                publisher.send(update); //发送
                System.out.println("SEND:[" + update + "]");
            }
        }
    }
}

客户端(订阅者)

客户端只订阅以Time开头的

public class Client {
    public static void main(String args[]) {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket subscriber = context.createSocket(ZMQ.SUB); //subscribe类型
            subscriber.connect("tcp://localhost:5555");

            subscriber.subscribe("Time:".getBytes()); //只订阅Time: 开头的信息

            for (int i = 0; i < 1000; i++) {
                System.out.println(subscriber.recvStr()); //recvStr直接返回String,内部调用了recv,将byte数组转化为String
            }
        }

    }
}

​​

集群超时模拟

public class ApiConstant {
    public static final String CLUSTER_INIT_API = "INIT_CLUSTER";
    public static final String TDI_INIT_API = "INIT_TDI_API";
    public static final String END_API = "END_API";
}
package com.wang.zmq;

import org.zeromq.ZMQ;

import java.util.Date;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.wang.zmq.ApiConstant.END_API;

/**
 * ClusterImpl : TODO
 *
 * @author : Wangwenchu
 * @since : 2023/3/27
 */
public class ClusterImpl {

    static Boolean isClusterWorking = false;
    static Boolean isTdiWorking = false;
    private static ZMQ.Socket socket;
    private ThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(10);
    private Object lock = new Object();

    private AtomicBoolean shouldSend = new AtomicBoolean(true);

    private int count = 0;

    public static void main(String[] args) {
        ClusterImpl cluster = new ClusterImpl();
        cluster.init();
        cluster.initCluster();
    }

    public void init() {
        ZMQ.Context context = ZMQ.context(1);     //I/O线程上下文的数量为1
        socket = context.socket(ZMQ.REQ);         //ZMQ.REQ表示这是一个request类型的socket
        socket.connect("tcp://127.0.0.1:8888");      //连接到8888端口
        pollMsg();
    }

    private void initCluster() {
        threadPoolExecutor.execute(() -> {
            if (shouldSend.get()) {
                String api = ApiConstant.CLUSTER_INIT_API;
                try {
                    Thread.sleep(2000L);
                    socket.send(api);
                    shouldSend.set(false);
                    System.out.println("send cluster init:" + new Date());
                    long now = System.currentTimeMillis();
                    while (true) {
                        // System.out.println("I am wait for you 1");
                        synchronized (lock) {
                            lock.wait(10L);
                        }
                        //   Thread.sleep(10L);
                        // System.out.println("I am wait for you 2");
                        if (isClusterWorking) {
                            System.out.println("cluster is working");
                            break;
                        }
                        //System.out.println("I am wait for you 3");
                        if (System.currentTimeMillis() - now >= 10000L) {
                            System.out.println("start cluster time out");
                            break;
                        }
                        // System.out.println("I am wait for you 4");
                    }

                    Thread.sleep(3000L);
                    if (shouldSend.get()) {
                        socket.send(END_API);
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

    }

    public void initTdi() {
        threadPoolExecutor.execute(() -> {
            String api = ApiConstant.TDI_INIT_API;
            try {
                Thread.sleep(2000L);
                System.out.println("send tdi init:" + new Date());
                socket.send(api);
                long now = System.currentTimeMillis();
                while (true) {
                    synchronized (lock) {
                        this.wait(10L);
                    }

                    if (isTdiWorking) {
                        System.out.println("tdi is working");
                    }
                    if (System.currentTimeMillis() - now >= 3000L) {
                        System.out.println("start tdi time out");
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

        });
    }

    public void pollMsg() {
        threadPoolExecutor.execute(() -> {
            while (true) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

                if (shouldSend.get()) {
                    continue;
                }
                count++;
                System.out.println("poll msg count: " + count);
                byte[] response = socket.recv();

                String str = new String(response);
                System.out.println("recv msg: " + str + "\t" + new Date());
                if (str.startsWith("CLUSTER_INIT_ACK")) {
                    isClusterWorking = true;
                } else if (str.startsWith("TDI_INIT_ACK")) {
                    isTdiWorking = true;
                }
                shouldSend.set(true);
            }

        });
    }
}
package com.wang.zmq;

import org.zeromq.ZMQ;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.wang.zmq.ApiConstant.END_API;

/**
 * Server : TODO
 *
 * @author : Wangwenchu
 * @since : 2023/3/27
 */
public class Server {

    private ThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(10);

    private ZMQ.Context context;

    private ZMQ.Socket socket;

    private String reply;

    private AtomicBoolean shouldRecv = new AtomicBoolean(true);

    private int count = 0;

    public static void main(String[] args) {
        Server server = new Server();
        server.init();
        server.pollInMsg();
    }

    public void init() {
        context = ZMQ.context(1);     //I/O线程上下文的数量为1
        socket = context.socket(ZMQ.REP);         //ZMQ.REP表示这是一个reponse类型的socket
        socket.bind("tcp://*:8888");                 //绑定到8888端口
    }

    public void pollInMsg() {
        threadPoolExecutor.execute(() -> {

            while (true) {
                if (!shouldRecv.get()) {
                    continue;
                }
                count++;
                System.out.println("receive loop count" + count);
                byte[] request = socket.recv();

                String str = new String(request);
                if (str != null && str.length() > 0) {
                    System.out.println("Receive api: " + str);
                    if (str.equals(ApiConstant.CLUSTER_INIT_API)) {
                        try {
                            Thread.sleep(1000L);
                            reply = "CLUSTER_INIT_ACK";
                            socket.send(reply.getBytes());
                            System.out.println("reply with: " + reply);
                            shouldRecv.set(true);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }

                    } else if (str.equals(ApiConstant.TDI_INIT_API)) {
                        try {
                            Thread.sleep(2000L);
                            reply = "TDI_INIT_ACK";
                            socket.send(reply.getBytes());
                            System.out.println("reply with: " + reply);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    } else if (str.equals(END_API)) {
                        System.out.println("exe shutdown");
                        break;
                    }
                    shouldRecv.set(false);
                }

                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

        });
    }
}