本系列文章用于介绍Zookeeper的应用场景以及它的代码实现方式。Zookeeper作为一个大型分布式系统的可靠协调系统,可以为分布式应用提供一致性服务。它的应用除了咱们常见的注册中心外,还有多种应用场景,本文就探究它作为数据发布/订阅服务的应用方式,以及给出一些个人理解的示例代码。
事实上Zookeeper只适合保存少量的数据信息,本文仅简单介绍如何用Zookeeper去实现数据发布/订阅,像消息队列常见的问题(比如消费失败重试机制、消息丢失、事务消息等)也不再考虑范围内。
数据发布/订阅
数据发布/订阅(Publish/Subscribe)系统,即所谓的配置中心,顾名思义就是发布者将数据发布到
ZooKeeper的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置
信息的集中式管理和数据的动态更新。
发布/订阅系统一般有两种设计模式,分别是推(Push)模式和拉(Pull)模式。在推模式中,服务端
主动将数据更新发送给所有订阅的客户端;而拉模式则是由客户端主动发起请求来获取最新数据,通常
客户端都采用定时进行轮询拉取的方式。
代码实现
我们这里采用的是推拉相结合的方式:客户端向服务端注册自己需要关注的节点,一旦该节点的数据
发生变更,那么服务端就会向相应的客户端发送Watcher事件通知,客户端接收到这个消息通知之后,
根据获取最新消息的URL主动到服务端获取最新的数据。
本文配套代码地址: zookeeper-pub-sub
公共类的实现
ZkUtils类
public class ZkUtils {
private static final String ZK_PUB_SUB_ROOT = "/zk_pub_sub";
private static final String ZK_PUB_SUB_ROOT2 = ZK_PUB_SUB_ROOT + "/";
private static final ZkClient ZK_CLIENT = new ZkClient(ZKConfig.zkHost);
private static final Set<String> topicSet = new HashSet<>();
/**
* 推送消息,
*
* @param topic 消息的主题
* @param pullUrl 拉取的地址
*/
public static void pub(String topic, String pullUrl) {
createEphemeral(topic, pullUrl);
}
/**
* 订阅消息
*
* @param topic 消息的主题
* @param listener 获取到消息之后的处理事件
*/
public static void sub(String topic, SubListener listener) {
ZK_CLIENT.subscribeDataChanges(ZK_PUB_SUB_ROOT2 + topic, listener);
}
public static void deleteRecursive(String path) {
ZK_CLIENT.deleteRecursive(ZK_PUB_SUB_ROOT2 + path);
}
private static void createEphemeral(String path, String value) {
check();
if (!topicSet.contains(path)) {
topicSet.add(path);
ZK_CLIENT.createEphemeral(ZK_PUB_SUB_ROOT2 + path, value);
} else {
ZK_CLIENT.writeData(ZK_PUB_SUB_ROOT2 + path, value);
}
}
private static void check() {
if (!ZK_CLIENT.exists(ZK_PUB_SUB_ROOT)) {
ZK_CLIENT.createPersistent(ZK_PUB_SUB_ROOT, true);
}
}
}
本类十分简单,封装了ZkClient的初始化,然后分别定义pub和sub两个核心方法。
当需要推送消息(pub)时需要传入推送的topic和消息获取地址(比如http://xxx.com/pull/getLeast?_id=1);其具体的实现是 在ZooKeeper的 /zk_pub_sub
节点下生成一个临时节点 传入的topic值
,然后将拉取的地址写入该节点。
当消费方订阅消息( sub)时需要传入订阅的topic和获取到消息后的处理事件。处理事件由抽象类来描述,它继承于ZkClient包下的节点值变更监听接口IZkDataListener类。
SubListener类
SubListener
public abstract class SubListener implements IZkDataListener {
@Override
public void handleDataChange(String dataPath, Object pullUrl) throws Exception {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder(URI.create(pullUrl.toString())).GET().build();
CompletableFuture<Void> voidCompletableFuture = client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::body)
// 当获取到HTTP响应值之后,调用具体业务进行处理
.thenAccept(t -> {
afterReceived(t);
});
}
@Override
public void handleDataDeleted(String s) throws Exception {
// 空实现
}
/**
* 收到msg消息后的处理
*
* @param latestMsg
*/
public abstract void afterReceived(String latestMsg);
}
此类是抽象类,当监听到值变更之后,直接根据pullUrl进行HTTP请求,获取到响应值之后再进行具体的业务处理afterReceived(String latestMsg)方法。
使用方式
推送端使用方式:
public class Publisher {
private String topic;
public Publisher(String topic) {
this.topic = topic;
}
public void destroy() {
ZkUtils.deleteRecursive(topic);
}
public void send(String msg) {
// 模拟将消息存放到本地PullController的messages集合中
PullController.messages.addLast(msg);
// 设置每次请求均含有不同的UUID
ZkUtils.pub(topic, PullController.PULL_MSG_URL + "?_id=" + (UUID.randomUUID().toString()));
}
}
每个Publisher需要绑定一个topic,然后先模拟将消息存入到集合中,这里仅为演示,所以直接在PullController中缓存了最新的消息,可以使用DB等方法存储。然后再修改ZK的节点值,将pull地址更新,为了使每次更新值不同,这里添加了一个uuid。
PullController只需要定义一个http的接口,等待客户端的主动请求即可,这里演示每次从容器获取最新的msg数据。
@RestController
@RequestMapping("pull")
public class PullController {
// 模拟消息存放位置,可以从数据库中查询得到
public static final LinkedList<String> messages = new LinkedList<>();
public static final String PULL_MSG_URL = "http://localhost:7777/pull/getLast";
@RequestMapping("getLast")
public String getLast() {
return messages.getLast();
}
}
订阅端使用方式:
十分简单,只需要开启zk的订阅sub方法,然后定义收到消息后的处理动作afterReceived即可。
public class App {
private static String topic = "topic1";
public static void main(String[] args) throws InterruptedException {
ZkUtils.sub(topic, new SubListener() {
@Override
public void afterReceived(String latestMsg) {
System.out.println(latestMsg);
}
});
// 休息10分钟,保证能持续获取消息
TimeUnit.MINUTES.sleep(10);
}
}