本系列文章用于介绍Zookeeper的应用场景以及它的代码实现方式。Zookeeper作为一个大型分布式系统的可靠协调系统,可以为分布式应用提供一致性服务。它的应用除了咱们常见的注册中心外,还有多种应用场景,本文就探究它作为数据发布/订阅服务的应用方式,以及给出一些个人理解的示例代码。

事实上Zookeeper只适合保存少量的数据信息,本文仅简单介绍如何用Zookeeper去实现数据发布/订阅,像消息队列常见的问题(比如消费失败重试机制、消息丢失、事务消息等)也不再考虑范围内。

数据发布/订阅

​ 数据发布/订阅(Publish/Subscribe)系统,即所谓的配置中心,顾名思义就是发布者将数据发布到
ZooKeeper的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置
信息的集中式管理和数据的动态更新。

​ 发布/订阅系统一般有两种设计模式,分别是推(Push)模式和拉(Pull)模式。在推模式中,服务端
主动将数据更新发送给所有订阅的客户端;而拉模式则是由客户端主动发起请求来获取最新数据,通常
客户端都采用定时进行轮询拉取的方式。

代码实现

​ 我们这里采用的是推拉相结合的方式:客户端向服务端注册自己需要关注的节点,一旦该节点的数据
发生变更,那么服务端就会向相应的客户端发送Watcher事件通知,客户端接收到这个消息通知之后,
根据获取最新消息的URL主动到服务端获取最新的数据。

​ 本文配套代码地址: zookeeper-pub-sub

公共类的实现

ZkUtils类

public class ZkUtils {

    private static final String ZK_PUB_SUB_ROOT = "/zk_pub_sub";
    private static final String ZK_PUB_SUB_ROOT2 = ZK_PUB_SUB_ROOT + "/";

    private static final ZkClient ZK_CLIENT = new ZkClient(ZKConfig.zkHost);
    private static final Set<String> topicSet = new HashSet<>();

    /**
     * 推送消息,
     *
     * @param topic   消息的主题
     * @param pullUrl 拉取的地址
     */
    public static void pub(String topic, String pullUrl) {
        createEphemeral(topic, pullUrl);
    }

    /**
     * 订阅消息
     *
     * @param topic    消息的主题
     * @param listener 获取到消息之后的处理事件
     */
    public static void sub(String topic, SubListener listener) {
        ZK_CLIENT.subscribeDataChanges(ZK_PUB_SUB_ROOT2 + topic, listener);
    }

    public static void deleteRecursive(String path) {
        ZK_CLIENT.deleteRecursive(ZK_PUB_SUB_ROOT2 + path);
    }

    private static void createEphemeral(String path, String value) {
        check();
        if (!topicSet.contains(path)) {
            topicSet.add(path);
            ZK_CLIENT.createEphemeral(ZK_PUB_SUB_ROOT2 + path, value);
        } else {
            ZK_CLIENT.writeData(ZK_PUB_SUB_ROOT2 + path, value);
        }
    }

    private static void check() {
        if (!ZK_CLIENT.exists(ZK_PUB_SUB_ROOT)) {
            ZK_CLIENT.createPersistent(ZK_PUB_SUB_ROOT, true);
        }
    }
}

​ 本类十分简单,封装了ZkClient的初始化,然后分别定义pub和sub两个核心方法。

当需要推送消息(pub)时需要传入推送的topic和消息获取地址(比如http://xxx.com/pull/getLeast?_id=1);其具体的实现是 在ZooKeeper的 /zk_pub_sub 节点下生成一个临时节点 传入的topic值,然后将拉取的地址写入该节点。

​ 当消费方订阅消息( sub)时需要传入订阅的topic和获取到消息后的处理事件。处理事件由抽象类来描述,它继承于ZkClient包下的节点值变更监听接口IZkDataListener类。

SubListener类

SubListener

public abstract class SubListener implements IZkDataListener {

    @Override
    public void handleDataChange(String dataPath, Object pullUrl) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(pullUrl.toString())).GET().build();

        CompletableFuture<Void> voidCompletableFuture = client.sendAsync(request, HttpResponse.BodyHandlers.ofString())

                .thenApply(HttpResponse::body)
                // 当获取到HTTP响应值之后,调用具体业务进行处理
                .thenAccept(t -> {
                    afterReceived(t);
                });
    }

    @Override
    public void handleDataDeleted(String s) throws Exception {
        // 空实现
    }

    /**
     * 收到msg消息后的处理
     *
     * @param latestMsg
     */
    public abstract void afterReceived(String latestMsg);
}

​ 此类是抽象类,当监听到值变更之后,直接根据pullUrl进行HTTP请求,获取到响应值之后再进行具体的业务处理afterReceived(String latestMsg)方法。

使用方式

推送端使用方式:

public class Publisher {

    private String topic;

    public Publisher(String topic) {
        this.topic = topic;
    }

    public void destroy() {
        ZkUtils.deleteRecursive(topic);
    }

    public void send(String msg) {
         // 模拟将消息存放到本地PullController的messages集合中
        PullController.messages.addLast(msg);
        // 设置每次请求均含有不同的UUID
        ZkUtils.pub(topic, PullController.PULL_MSG_URL + "?_id=" + (UUID.randomUUID().toString()));
    }
}

​ 每个Publisher需要绑定一个topic,然后先模拟将消息存入到集合中,这里仅为演示,所以直接在PullController中缓存了最新的消息,可以使用DB等方法存储。然后再修改ZK的节点值,将pull地址更新,为了使每次更新值不同,这里添加了一个uuid。

​ PullController只需要定义一个http的接口,等待客户端的主动请求即可,这里演示每次从容器获取最新的msg数据。

@RestController
@RequestMapping("pull")
public class PullController {

    // 模拟消息存放位置,可以从数据库中查询得到
    public static final LinkedList<String> messages = new LinkedList<>();
    public static final String PULL_MSG_URL = "http://localhost:7777/pull/getLast";

    @RequestMapping("getLast")
    public String getLast() {
        return messages.getLast();
    }

}

订阅端使用方式:

​ 十分简单,只需要开启zk的订阅sub方法,然后定义收到消息后的处理动作afterReceived即可。

public class App {

    private static String topic = "topic1";

    public static void main(String[] args) throws InterruptedException {

        ZkUtils.sub(topic, new SubListener() {
            @Override
            public void afterReceived(String latestMsg) {
                System.out.println(latestMsg);
            }
        });

        // 休息10分钟,保证能持续获取消息
        TimeUnit.MINUTES.sleep(10);
    }
}