ZooKeeper Example

The following code excerpt shows how to use ZooKeeper to implement a "barrier." A barrier separates a process into two logical halves. Multiple machines running in coordination with one another will all perform the first half of the process. No machine can begin the second half of the process until every machine has completed the first half. The barrier sits between these two halves. As machines reach the barrier, they wait until every machine has reached it; then all of them are released to begin the second half. A distributed barrier implementation written for ZooKeeper follows:

// Default watcher handed to the ZooKeeper client below. Its callback body is
// empty, so every event delivered to it is discarded.
// NOTE(review): the callback parameter type here is `WatchEvent`, while the
// watchers in enter()/leave() declare `Event` — confirm which type the Watcher
// interface of the ZooKeeper version in use actually expects.
Watcher watcher = new Watcher() {
public void process(WatchEvent event) {}
};

// Client handle built from `hosts`, the value 3000, and the default watcher.
// NOTE(review): 3000 is presumably the session timeout in milliseconds and
// `hosts` a connect string defined elsewhere — confirm against the ZooKeeper
// constructor documentation.
ZooKeeper zk = new ZooKeeper(hosts, 3000, watcher);

// Monitor that enter() and leave() wait on, and that their child watchers
// notify. Declared final so the lock identity can never change between a
// wait() and the notifyAll() that is meant to wake it.
final Object notifyObject = new Object();
// Path of the barrier's parent znode; assigned from the constructor's `name` argument.
String root;
// Number of children of `root` that enter() waits for before returning.
int size;

/**
 * Builds a barrier on top of an existing ZooKeeper client.
 *
 * <p>Stores the client, the parent znode path and the participant count, then
 * tries to create the parent znode with an empty payload. If the znode is
 * already there the resulting {@code NodeExistsException} is ignored, so any
 * number of processes can construct a barrier on the same path.
 *
 * @param zk   ZooKeeper client used for all barrier operations
 * @param name path of the barrier's parent znode (stored as {@code root})
 * @param size number of participants the barrier waits for
 * @throws KeeperException      propagated from {@code zk.create}, except for the
 *                              "node already exists" case
 * @throws InterruptedException propagated from {@code zk.create}
 */
Barrier(ZooKeeper zk, String name, int size) throws KeeperException, InterruptedException {
    this.size = size;
    this.root = name;
    this.zk = zk;

    try {
        // Empty data, open ACL, create flags 0.
        this.zk.create(this.root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
    } catch (NodeExistsException alreadyCreated) {
        // Deliberately ignored: the barrier node already exists, which is
        // exactly the state this constructor is trying to reach.
    }
}


// Usage sketch: `b` is presumably a Barrier built with the constructor above.
// enter() returns once enough participants have joined; leave() returns once
// the barrier's parent znode has no children left.
b.enter();
// work with everyone
b.leave();



/**
 * Joins the barrier and blocks until enough participants have joined.
 *
 * <p>Creates the child znode {@code root + "/" + name} with the
 * {@code EPHEMERAL} create flag, then repeatedly lists the children of
 * {@code root} (leaving a watch on every read) and waits on
 * {@code notifyObject} while fewer than {@code size} children exist.
 *
 * @return {@code true} once {@code root} has at least {@code size} children
 * @throws KeeperException      propagated from the ZooKeeper calls
 * @throws InterruptedException if the thread is interrupted while waiting, or
 *                              propagated from the ZooKeeper calls
 */
boolean enter() throws KeeperException, InterruptedException {
    // NOTE(review): `name` is not declared in the visible excerpt; it must be an
    // identifier that is unique per participant (e.g. the host name) — confirm.
    zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);

    // One watcher instance is reused for every re-check of the children.
    // NOTE(review): the parameter type `Event` must match the Watcher interface
    // of the ZooKeeper version in use — confirm.
    Watcher childWatcher = new Watcher() {
        public void process(Event e) {
            // notifyAll() may only be called by the owner of the monitor;
            // calling it unsynchronized throws IllegalMonitorStateException.
            synchronized (notifyObject) {
                notifyObject.notifyAll();
            }
        }
    };

    while (true) {
        // The watch is registered while the monitor is held, so the callback
        // cannot run its notifyAll() until wait() has released the monitor;
        // a notification can therefore not be lost between check and wait.
        synchronized (notifyObject) {
            if (zk.getChildren(root, childWatcher).size() >= size) {
                return true;
            }
            notifyObject.wait();
        }
    }
}

/**
 * Leaves the barrier and blocks until every participant has left.
 *
 * <p>Deletes the child znode {@code root + "/" + name}, then repeatedly lists
 * the children of {@code root} (leaving a watch on every read) and waits on
 * {@code notifyObject} while any child remains.
 *
 * @return {@code true} once {@code root} has no children
 * @throws KeeperException      propagated from the ZooKeeper calls
 * @throws InterruptedException if the thread is interrupted while waiting, or
 *                              propagated from the ZooKeeper calls
 */
boolean leave() throws KeeperException, InterruptedException {
    // NOTE(review): `name` is not declared in the visible excerpt; it must be the
    // same identifier that was used to create this participant's znode — confirm.
    // The second argument (0) is presumably the expected znode version.
    zk.delete(root + "/" + name, 0);

    // One watcher instance is reused for every re-check of the children.
    // NOTE(review): the parameter type `Event` must match the Watcher interface
    // of the ZooKeeper version in use — confirm.
    Watcher childWatcher = new Watcher() {
        public void process(Event e) {
            // notifyAll() may only be called by the owner of the monitor;
            // calling it unsynchronized throws IllegalMonitorStateException.
            synchronized (notifyObject) {
                notifyObject.notifyAll();
            }
        }
    };

    while (true) {
        // The watch is registered while the monitor is held, so the callback
        // cannot run its notifyAll() until wait() has released the monitor;
        // a notification can therefore not be lost between check and wait.
        synchronized (notifyObject) {
            if (zk.getChildren(root, childWatcher).isEmpty()) {
                return true;
            }
            notifyObject.wait();
        }
    }
}