ZooKeeper - Java 操作
ZKUtils.java
package test; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.KeeperState; public class ZKUtils { public static ZooKeeper openZk() throws IOException, InterruptedException { //连接zookeeper成功的标志 final CountDownLatch connectedSignal = new CountDownLatch(1); ZooKeeper zk = new ZooKeeper(Test.connectString, Test.sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { if(KeeperState.SyncConnected.equals(event.getState())) { //连接成功则打开当前进程 connectedSignal.countDown(); } } }); //对CountDownLatch对象调用await()方法后,当前线程会堵塞等待,直到对象的计数器为0(调用对象的countDown()方法减1) //堵塞当前进程 connectedSignal.await(); return zk; } }
Test.java
package test; import java.io.IOException; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback.VoidCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; public class Test { public static final String connectString = "hadoop1:2181"; public static final int sessionTimeout = 5000; public static void main(String[] args) throws Exception { //创建path // createZnode("/b"); //列出子znode // listChildren("/"); // delete("/b"); // deleteAsynchronous("/b"); // watch(); //强制客户端连接的服务区跟领导者进行同步,以更新指定path的状态,只能是异步调用 sync(); /* * 验证 */ // auth1(); // auth2(); } /** * 创建znode * @param path * @throws IOException * @throws InterruptedException * @throws KeeperException */ private static void createZnode(String path) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = ZKUtils.openZk(); String createdPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("创建" + createdPath + "成功"); // 创建znode, 数据是hello, ACL(访问控制列表)是完全放开的列表, 短暂类型的znode(session断开后,znode将被zookeeper服务器删除) // String createdPath = zk.create(path, "hello".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // 这里打印的是 // System.out.println(createdPath); // 顺序的短暂znode // createdPath = zk.create(path, "hello".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 这里打印的是path000000000N, 路径名+10位的序列号 // System.out.println(createdPath); } /** * 列出子znode * @param parent * @throws IOException * @throws 
InterruptedException * @throws KeeperException */ private static void listChildren(String parent) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = ZKUtils.openZk(); Stat state = zk.exists(parent, false); if(state == null) { return; } List<String> children = zk.getChildren(parent, false); for (String child : children) { System.out.println(child); } } /** * 删除znode及其子znode * @param path * @throws IOException * @throws InterruptedException * @throws KeeperException */ private static void delete(String path) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = ZKUtils.openZk(); Stat state = zk.exists(path, false); if(state == null) { return; } System.out.println(state.getVersion()); List<String> children = zk.getChildren(path, false); for (String child : children) { delete(path + "/" + child); } //需要指定path和version, version为-1则取消版本号验证 zk.delete(path, -1); } /** * 异步操作 * zookeeper同时提供同步、异步两个版本的API,业务对读取效率没影响的情况下选择哪个方式都可以. * @param path * @throws IOException * @throws InterruptedException * @throws KeeperException */ private static void deleteAsynchronous(String path) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = ZKUtils.openZk(); Stat state = zk.exists(path, false); if(state == null) { return; } List<String> children = zk.getChildren(path, false); for (String child : children) { delete(path + "/" + child); } //需要指定path和version, version为-1则取消版本号验证 // zk.delete(path, -1); zk.delete(path, -1, new VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { System.out.println("异步删除操作执行完毕 , rc: " + rc + ", path: " + path); } }, null); //等待一会, 否则主线程直接结束了就看不到异步线程的输出结果了 Thread.sleep(2000); } /** * 监听事件 * @throws IOException * @throws InterruptedException * @throws KeeperException */ private static void watch() throws IOException, InterruptedException, KeeperException { final CountDownLatch singal = new CountDownLatch(1); ZooKeeper zk = new ZooKeeper("hadoop1:2181", 5000, new 
Watcher() { @Override public void process(WatchedEvent event) { System.out.println("接受到了一个事件:" + event); if(Watcher.Event.KeeperState.SyncConnected.equals(event.getState())) { singal.countDown(); } } }); singal.await(); //exists、getData、getChildren操作可以设置监控 //判断"/b"是否存在并对其进行监控, 使用创建zookeeper时的watcher处理 zk.exists("/b", true); //使用指定的watcher处理 // zk.exists("/b", new Watcher()); Thread.sleep(Long.MAX_VALUE); } /** * 强制客户端连接的服务器跟领导者进行同步,以更新指定znode的状态 * 只能异步调用 * @throws IOException * @throws InterruptedException * @throws KeeperException */ private static void sync() throws IOException, InterruptedException, KeeperException { ZooKeeper zk = ZKUtils.openZk(); zk.sync("/a", new VoidCallback(){ @Override public void processResult(int rc, String path, Object ctx) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("同步完毕"); } }, null); System.out.println("here"); Stat stat = zk.exists("/a", false); byte[] data = zk.getData("/a", false, stat); System.out.println("data: " + new String(data)); Thread.sleep(5000); } /** * 使用自定义ACL创建znode * @throws IOException * @throws InterruptedException * @throws KeeperException * @throws NoSuchAlgorithmException */ private static void auth1() throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { List<ACL> acls = new ArrayList<ACL>(); //用户名密码验证方式 Id id = new Id("digest", DigestAuthenticationProvider.generateDigest("lisg:123456")); acls.add(new ACL(Perms.READ, id)); ZooKeeper zk = ZKUtils.openZk(); zk.create("/c", "test".getBytes(), acls, CreateMode.PERSISTENT); } /** * ACL验证 * @throws IOException * @throws InterruptedException * @throws KeeperException */ private static void auth2() throws IOException, InterruptedException, KeeperException { ZooKeeper zk = ZKUtils.openZk(); Stat stat = zk.exists("/c", false); if(stat == null) { return; } List<ACL> cacls = zk.getACL("/c", stat); System.out.println("/c的ACL列表是:" + cacls); //KeeperErrorCode = NoAuth for /c 异常 
zk.addAuthInfo("digest", "lisg:123456".getBytes()); byte[] data = zk.getData("/c", false, stat); System.out.println("data: " + new String(data)); } }
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。