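[ZooKeeper Programming] - Handling State Changes

The example below shows how to watch a node and how to receive and handle its state changes.

Watching and Handling Events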


To watch for events, first implement the Watcher interface:

public interface Watcher {
    void process(org.apache.zookeeper.WatchedEvent watchedEvent);

    static interface Event {
        ...
    }
}

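Then create the client with ZooKeeper(String connectString, int sessionTimeout, Watcher watcher), which sets the Watcher at the same time. Whenever an event occurs, Watcher.process(WatchedEvent watchedEvent) is called to handle it. Such events include connection, session expiry, disconnection, node changes, and child node changes.

Code example: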

@Override
public void process(WatchedEvent event) {
    String path = event.getPath();
    if (event.getType() == Event.EventType.None) {
        // We are being told that the state of the
        // connection has changed
        switch (event.getState()) {
        case SyncConnected:
            // In this particular example we don't need to do anything
            // here - watches are automatically re-registered with
            // server and any watches triggered while the client was
            // disconnected will be delivered (in order of course)
            break;
        case Expired:
            // It's all over
            dead = true;
            listener.closing(KeeperException.Code.SessionExpired);
            break;
        }
    } else {
        if (path != null && path.equals(znode)) {
            // Something has changed on the node, let's find out
            zk.exists(znode, true, this, null);
        }
    }
    if (chainedWatcher != null) {
        chainedWatcher.process(event);
    }
}
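
The handler above calls exists(znode, true, ...) again on every node event because a standard ZooKeeper watch is a one-time trigger: once it fires it is gone and has to be set again. The sketch below models that behaviour with plain Java collections only. OneShotWatchRegistry, CountingWatcher and the path "/app/config" are hypothetical names invented for illustration; none of them are part of the ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory model of one-time-trigger watches (not ZooKeeper API). */
class OneShotWatchRegistry {
    // path -> watchers waiting for the next change on that path
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    /** Registers a watcher that will fire at most once for the given path. */
    void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    /** Simulates a change: removes every watcher on the path, then fires it. */
    void change(String path) {
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            for (Consumer<String> watcher : fired) {
                watcher.accept(path);
            }
        }
    }
}

class OneShotWatchDemo {
    /** Counts its notifications and optionally sets the watch again. */
    static final class CountingWatcher implements Consumer<String> {
        private final OneShotWatchRegistry registry;
        private final boolean reRegister;
        int notifications;

        CountingWatcher(OneShotWatchRegistry registry, boolean reRegister) {
            this.registry = registry;
            this.reRegister = reRegister;
        }

        @Override
        public void accept(String path) {
            notifications++;
            if (reRegister) {
                // Same idea as calling exists(znode, true, ...) inside process()
                registry.watch(path, this);
            }
        }
    }

    /** Applies three changes and returns how many notifications arrived. */
    static int countNotifications(boolean reRegister) {
        OneShotWatchRegistry registry = new OneShotWatchRegistry();
        CountingWatcher watcher = new CountingWatcher(registry, reRegister);
        registry.watch("/app/config", watcher);
        for (int i = 0; i < 3; i++) {
            registry.change("/app/config");
        }
        return watcher.notifications;
    }
}
```

Without re-registration only the first of the three changes is seen; with it, all three are.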

StatCallback


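The code above calls exists(), which checks the node's status and, when a watcher is supplied, leaves a watch on the node so that later changes can be handled. Its source is as follows: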


public void exists(final String path, Watcher watcher,
        StatCallback cb, Object ctx)
{
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new ExistsWatchRegistration(watcher, clientPath);
    }

    final String serverPath = prependChroot(clientPath);

    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.exists);
    ExistsRequest request = new ExistsRequest();
    request.setPath(serverPath);
    request.setWatch(watcher != null);
    SetDataResponse response = new SetDataResponse();
    cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
            clientPath, serverPath, ctx, wcb);
}

public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
    exists(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
}

API docs for the synchronous exists(), which returns the Stat directly:
org.apache.zookeeper.ZooKeeper
public Stat exists(String path,
boolean watch)
throws KeeperException,
InterruptedException

Return the stat of the node of the given path. Return null if no such a node exists.
If the watch is true and the call is successful (no exception is thrown), a watch will be left on the node with the given path. The watch will be triggered by a successful operation that creates/delete the node or sets the data on the node.
Parameters:
path - the node path
watch - whether need to watch this node
Returns:
the stat of the node of the given path; return null if no such a node exists.
Throws:
KeeperException - If the server signals an error
InterruptedException - If the server transaction is interrupted.


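As shown above, the asynchronous exists() requires a StatCallback implementation, which is where the node's state is handled. The org.apache.zookeeper.AsyncCallback.StatCallback interface is defined as follows: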

public interface AsyncCallback {
    interface StatCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx, Stat stat);
    }

    interface DataCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx, byte data[],
                Stat stat);
    }

    interface ACLCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx,
                List<ACL> acl, Stat stat);
    }

    interface ChildrenCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx,
                List<String> children);
    }

    interface Children2Callback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx,
                List<String> children, Stat stat);
    }

    interface StringCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx, String name);
    }

    interface VoidCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx);
    }
}
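
All of these callbacks share one shape: the call returns immediately, and the outcome arrives later as an integer result code together with the path and the caller's own ctx object. The sketch below imitates that shape with the standard library only. FakeAsyncClient, ExistsCallback and AsyncCallbackDemo are hypothetical stand-ins rather than ZooKeeper classes; the two constants reuse the numeric values of ZooKeeper's OK (0) and NONODE (-101) result codes.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for an asynchronous client (not the ZooKeeper API). */
class FakeAsyncClient implements AutoCloseable {
    static final int OK = 0;          // same value as ZooKeeper's OK code
    static final int NO_NODE = -101;  // same value as ZooKeeper's NONODE code

    /** Same shape as StatCallback, minus the Stat argument. */
    interface ExistsCallback {
        void processResult(int rc, String path, Object ctx);
    }

    private final Set<String> nodes = ConcurrentHashMap.newKeySet();
    // A single worker thread plays the role of the client's event thread.
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();

    void create(String path) {
        nodes.add(path);
    }

    /** Returns immediately; the callback runs later on the event thread. */
    void exists(String path, ExistsCallback cb, Object ctx) {
        eventThread.submit(() ->
                cb.processResult(nodes.contains(path) ? OK : NO_NODE, path, ctx));
    }

    @Override
    public void close() {
        eventThread.shutdown();
    }
}

class AsyncCallbackDemo {
    /** Issues one asynchronous exists() and waits for its result code. */
    static int existsRc(String created, String queried) {
        try (FakeAsyncClient client = new FakeAsyncClient()) {
            if (created != null) {
                client.create(created);
            }
            CountDownLatch done = new CountDownLatch(1);
            int[] rc = new int[1];
            // The latch travels as ctx and comes back untouched in the callback.
            client.exists(queried, (code, path, ctx) -> {
                rc[0] = code;
                ((CountDownLatch) ctx).countDown();
            }, done);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback was not invoked");
            }
            return rc[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The ctx argument is useful for exactly this kind of correlation: the caller passes any object in and gets the same object back in the callback.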

Example implementation:


@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
    boolean exists;
    switch (rc) {
    case Code.Ok:
        exists = true;
        break;
    case Code.NoNode:
        exists = false;
        break;
    case Code.SessionExpired:
    case Code.NoAuth:
        dead = true;
        listener.closing(rc);
        return;
    default:
        // Retry errors
        zk.exists(znode, true, this, null);
        return;
    }

    byte b[] = null;
    if (exists) {
        try {
            b = zk.getData(znode, false, null);
        } catch (KeeperException e) {
            // We don't need to worry about recovering now. The watch
            // callbacks will kick off any exception handling
            e.printStackTrace();
        } catch (InterruptedException e) {
            return;
        }
    }
    if ((b == null && b != prevData)
            || (b != null && !Arrays.equals(prevData, b))) {
        // TODO: handle the changed data; the full DataMonitor source
        // below calls listener.exists(b) and updates prevData here
    }
}
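
The final condition in processResult() decides whether the data differs from what was seen last time, with null standing for "the node does not exist". Pulled out into a helper it is easier to reason about. This is only a sketch: ChangeCheck and dataChanged are hypothetical names that do not appear in the ZooKeeper example.

```java
import java.util.Arrays;

/** Stand-alone version of the change check used in processResult(). */
class ChangeCheck {
    /**
     * Returns true when current differs from previous.
     * A null array means "the node does not exist".
     */
    static boolean dataChanged(byte[] previous, byte[] current) {
        return (current == null && current != previous)
                || (current != null && !Arrays.equals(previous, current));
    }

    /** Shorter form with the same result, since Arrays.equals accepts nulls. */
    static boolean dataChangedSimplified(byte[] previous, byte[] current) {
        return !Arrays.equals(previous, current);
    }
}
```

Arrays.equals treats two null arrays as equal and a null and a non-null array as different, so both methods agree on every combination of inputs.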

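The call hierarchy of processResult() is as follows:

It shows that processResult() is invoked when one of the following asynchronous calls completes. Each call takes its own callback type and delivers the result of that call through it; notifications that a node has changed arrive separately, through Watcher.process():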

ZooKeeper.create(String, byte[], List<ACL>, CreateMode, StringCallback, Object)
ZooKeeper.delete(String, int, VoidCallback, Object)
ZooKeeper.exists(String, Watcher, StatCallback, Object)
and so on...

Inside processResult() we can then check the result code and handle the node accordingly.

Example Source Code


Finally, here is the example source code from the official site. Running it gives a much better feel for how everything fits together:

DataMonitor

Mainly responsible for event handling in process(WatchedEvent event) and for state-change handling in processResult(int rc, String path, Object ctx, Stat stat).

import java.util.Arrays;

import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class DataMonitor implements Watcher, StatCallback {

    ZooKeeper zk;

    String znode;

    Watcher chainedWatcher;

    boolean dead;

    DataMonitorListener listener;

    byte prevData[];

    public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
            DataMonitorListener listener) {
        this.zk = zk;
        this.znode = znode;
        this.chainedWatcher = chainedWatcher;
        this.listener = listener;
        // Get things started by checking if the node exists. We are going
        // to be completely event driven
        zk.exists(znode, true, this, null);
    }

    /**
     * Other classes use the DataMonitor by implementing this method
     */
    public interface DataMonitorListener {
        /**
         * The existence status of the node has changed.
         */
        void exists(byte data[]);

        /**
         * The ZooKeeper session is no longer valid.
         *
         * @param rc
         *                the ZooKeeper reason code
         */
        void closing(int rc);
    }

    @Override
    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // We are being told that the state of the
            // connection has changed
            switch (event.getState()) {
            case SyncConnected:
                // In this particular example we don't need to do anything
                // here - watches are automatically re-registered with
                // server and any watches triggered while the client was
                // disconnected will be delivered (in order of course)
                break;
            case Expired:
                // It's all over
                dead = true;
                listener.closing(KeeperException.Code.SessionExpired);
                break;
            }
        } else {
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.exists(znode, true, this, null);
            }
        }
        if (chainedWatcher != null) {
            chainedWatcher.process(event);
        }
    }

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        boolean exists;
        switch (rc) {
        case Code.Ok:
            exists = true;
            break;
        case Code.NoNode:
            exists = false;
            break;
        case Code.SessionExpired:
        case Code.NoAuth:
            dead = true;
            listener.closing(rc);
            return;
        default:
            // Retry errors
            zk.exists(znode, true, this, null);
            return;
        }

        byte b[] = null;
        if (exists) {
            try {
                b = zk.getData(znode, false, null);
            } catch (KeeperException e) {
                // We don't need to worry about recovering now. The watch
                // callbacks will kick off any exception handling
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        }
        if ((b == null && b != prevData)
                || (b != null && !Arrays.equals(prevData, b))) {
            listener.exists(b);
            prevData = b;
        }
    }
}

Executor

Starts and initializes the program, acts as the root Watcher, and manages resources:

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class Executor
    implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
    String znode;

    DataMonitor dm;

    ZooKeeper zk;

    String filename;

    String exec[];

    Process child;

    public Executor(String hostPort, String znode, String filename,
            String exec[]) throws KeeperException, IOException {
        this.filename = filename;
        this.exec = exec;
        zk = new ZooKeeper(hostPort, 3000, this);
        dm = new DataMonitor(zk, znode, null, this);
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        if (args.length < 4) {
            System.err
                    .println("USAGE: Executor hostPort znode filename program [args ...]");
            System.exit(2);
        }
        String hostPort = args[0];
        String znode = args[1];
        String filename = args[2];
        String exec[] = new String[args.length - 3];
        System.arraycopy(args, 3, exec, 0, exec.length);
        try {
            new Executor(hostPort, znode, filename, exec).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /***************************************************************************
     * We do not process any events ourselves, we just need to forward them on.
     *
     * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)
     */
    @Override
    public void process(WatchedEvent event) {
        dm.process(event);
    }

    public void run() {
        try {
            synchronized (this) {
                while (!dm.dead) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    public void closing(int rc) {
        synchronized (this) {
            notifyAll();
        }
    }

    static class StreamWriter extends Thread {
        OutputStream os;

        InputStream is;

        StreamWriter(InputStream is, OutputStream os) {
            this.is = is;
            this.os = os;
            start();
        }

        public void run() {
            byte b[] = new byte[80];
            int rc;
            try {
                while ((rc = is.read(b)) > 0) {
                    os.write(b, 0, rc);
                }
            } catch (IOException e) {
            }

        }
    }

    @Override
    public void exists(byte[] data) {
        if (data == null) {
            if (child != null) {
                System.out.println("Killing process");
                child.destroy();
                try {
                    child.waitFor();
                } catch (InterruptedException e) {
                }
            }
            child = null;
        } else {
            if (child != null) {
                System.out.println("Stopping child");
                child.destroy();
                try {
                    child.waitFor();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                FileOutputStream fos = new FileOutputStream(filename);
                fos.write(data);
                fos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                System.out.println("Starting child");
                child = Runtime.getRuntime().exec(exec);
                new StreamWriter(child.getInputStream(), System.out);
                new StreamWriter(child.getErrorStream(), System.err);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
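
Executor.run() keeps the main thread parked in wait() until closing() calls notifyAll() once the DataMonitor has set its dead flag. The same handshake can be sketched in isolation as below. ShutdownGate and ShutdownGateDemo are hypothetical names, and unlike the original this sketch sets the flag inside the same synchronized method that notifies, which is an assumption made to keep the example small.

```java
/** Hypothetical minimal version of the run()/closing() handshake in Executor. */
class ShutdownGate {
    private boolean dead; // guarded by "this"

    /** Blocks the calling thread until markDead() has been called. */
    synchronized void awaitDead() throws InterruptedException {
        while (!dead) { // the loop guards against spurious wakeups
            wait();
        }
    }

    /** Sets the flag and wakes every thread waiting in awaitDead(). */
    synchronized void markDead() {
        dead = true;
        notifyAll();
    }
}

class ShutdownGateDemo {
    /** Starts a waiting thread, signals it, and reports whether it finished. */
    static boolean waiterFinishes() {
        ShutdownGate gate = new ShutdownGate();
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitDead();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.markDead();
        try {
            waiter.join(5000); // give the waiter up to five seconds to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !waiter.isAlive();
    }
}
```

Checking the flag in a loop matters in both versions: if the flag is already set before the waiter starts, the loop body is skipped and the thread never blocks.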

References


ZooKeeper Programmer's Guide
https://zookeeper.apache.org/doc/r3.4.6/javaExample.html