// Watcher whose callback body is empty: events delivered to it are discarded.
Watcher watcher = new Watcher() {
public void process(WatchEvent event) {}
};
// Opens the ZooKeeper client handle using the no-op watcher above.
// NOTE(review): 3000 is presumably the session timeout in milliseconds and `hosts` the
// connect string — confirm against the ZooKeeper constructor of the client version in use.
ZooKeeper zk = new ZooKeeper(hosts, 3000, watcher);
/** Monitor that enter() and leave() synchronize on and wait on; final so the lock can never be swapped out. */
final Object notifyObject = new Object();
/** Path of the barrier node; participant nodes are created beneath it. */
String root;
/** Number of child nodes under {@code root} that enter() waits for. */
int size;

/**
 * Creates a barrier handle and makes sure the barrier node exists.
 *
 * @param zk   ZooKeeper client handle used for all barrier operations
 * @param name path of the barrier node (stored as {@code root})
 * @param size number of children of the barrier node that {@code enter()} waits for
 * @throws KeeperException      propagated from the ZooKeeper create call, except for the
 *                              "node already exists" case, which is tolerated
 * @throws InterruptedException propagated from the ZooKeeper create call
 */
Barrier(ZooKeeper zk, String name, int size) throws KeeperException, InterruptedException {
    this.zk = zk;
    this.root = name;
    this.size = size;
    // Make sure the barrier node exists
    try {
        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
    } catch (NodeExistsException ignored) {
        // Deliberately ignored: the barrier node is already present (e.g. another
        // participant created it first), which is exactly the state we want.
    }
}
// Usage sketch (pseudo-code: the statements below carry no semicolons).
// Presumably `b` is a Barrier instance — confirm against the caller.
// enter() is called before the shared work and leave() after it.
b.enter()
/** work with everyone **/
b.leave()
/**
* Join barrier
* @return
* @throws KeeperException
* @throws InterruptedException */
boolean enter() throws KeeperException, InterruptedException {
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);
while (true) {
synchronized (notifyObject) {
ArrayList
public void process(Event e) { notifyObject.notifyAll(); }
});
if (list.size() < size) {
notifyObject.wait();
} else {
return true;
}
}
}
}
/**
* Wait until all reach barrier
* @return
* @throws KeeperException
* @throws InterruptedException */
boolean leave() throws KeeperException, InterruptedException {
zk.delete(root + "/" + name, 0);
while (true) {
synchronized (notifyObject) {
ArrayList
public void process(Event e) { notifyObject.notifyAll(); }
});
if (list.size() > 0) {
notifyObject.wait();
} else {
return true;
}
}
}
}