分类:ZooKeeper

Zookeeper学习(五)Apache Curator客户端的使用

1、curator与原生客户端之间的差异

原生api的不足:

  • 超时重连,不支持自动,需要手动操作

  • watch注册一次后会失效

  • 不支持递归创建节点

Apache Curator的优势:

  • Apache的开源项目

  • 解决watcher的注册一次就失效

  • Api更加简单易用

  • 提供更多解决方案并且实现简单

  • 提供常用的ZooKeeper工具类

2、Apache Curator的使用

会话连接与关闭:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    //声明一个客户端
    public CuratorFramework client = null;
    //客户端ip
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

curator连接zookeeper的策略大致有5种,上面的一种是推荐使用的,下面列出所有的策略:

      /**
       * curator链接zookeeper的策略:ExponentialBackoffRetry
       * baseSleepTimeMs:初始sleep的时间
       * maxRetries:最大重试次数
       * maxSleepMs:最大重试时间
       */
     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
      
      /**
       * curator链接zookeeper的策略:RetryNTimes
       * n:重试的次数
       * sleepMsBetweenRetries:每次重试间隔的时间
       */
      RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
      
      /**
       * curator链接zookeeper的策略:RetryOneTime
       * sleepMsBetweenRetry:每次重试间隔的时间
       */
     RetryPolicy retryPolicy2 = new RetryOneTime(3000);
      
      /**
       * 永远重试,不推荐使用
       */
     RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
      
      /**
       * curator链接zookeeper的策略:RetryUntilElapsed
       * maxElapsedTimeMs:最大重试时间
       * sleepMsBetweenRetries:每次重试间隔
       * 重试时间超过maxElapsedTimeMs后,就不再重试
       */
     RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

然后运行代码,连接成功后关闭。image.png

zookeeper命名空间及创建节点:

命名空间:

上面构造函数中的namespace就是命名空间,创建命名空间后,后续的操作都将在该命名空间下进行。

节点的增删改查:

添加节点:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super/wjy";
        byte[] data = "wjy329".getBytes();
        curatorConnect.client.create().creatingParentsIfNeeded()
         .withMode(CreateMode.PERSISTENT)
         .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
         .forPath(nodePath, data);

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

上面注释中写了创建节点的方法,运行后,我们在服务器中可以看到创建的命名空间和节点:

image.png
修改节点:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

     
        String nodePath = "/super/wjy";

        // 更新节点数据
        byte[] newData = "update".getBytes();
        curatorConnect.client.setData().withVersion(0).forPath(nodePath, newData);

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

同样,修改的代码也在注释中写明了,这里需要注意的就是版本号,运行程序后:

image.png

删除节点:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super/wjy";

        // 删除节点
        curatorConnect.client.delete()
              .guaranteed()                // 如果删除失败,那么在后端还是继续会删除,直到成功
              .deletingChildrenIfNeeded()  // 如果有子节点,就删除
              .withVersion(1)
              .forPath(nodePath);

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

删除节点的部分也通过注释注明,删除的方法也应注意版本号,利用此方法删除的是指定路径下所有的子节点包括路径中的最后一个父节点,例如 节点为 /school/class/student/xiaowang   指定删除路径为 /school/class ,那么运行程序后,节点为:/school ,其余节点都被删掉,并且命名空间是不会被删掉的,这个可以自行验证。

image.png

读取节点数据:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super";

        //读取节点数据
      Stat stat = new Stat();
      byte[] data = curatorConnect.client.getData().storingStatIn(stat).forPath(nodePath);
      System.out.println("节点" + nodePath + "的数据为: " + new String(data));
      System.out.println("该节点的版本号为: " + stat.getVersion());

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

运行程序:

image.png

读取子节点:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.util.List;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super";

        // 查询子节点
      List<String> childNodes = curatorConnect.client.getChildren()
                                 .forPath(nodePath);
      System.out.println("开始打印子节点:");
      for (String s : childNodes) {
         System.out.println(s);
      }


        // 判断节点是否存在,如果不存在则为空
//    Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc");
//    System.out.println(statExist);

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

可以看到控制台输出子节点:

image.png

上述代码还有一个能判断节点是否存在的代码,自行运行看效果吧。

使用watcher:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.util.List;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super/aaa";

        // watcher 事件  当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
        curatorConnect.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
//      curatorConnect.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);

        Thread.sleep(100000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

上述我们看到有两个watcher的方式,下面都给出两个wathcer的实现类,只演示第一种。

package com.wjy329.curatordemo.curator;

import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;

public class MyCuratorWatcher implements CuratorWatcher {

   @Override
   public void process(WatchedEvent event) throws Exception {
      System.out.println("触发watcher,节点路径为:" + event.getPath());
   }

}


=============================================

package com.wjy329.curatordemo.curator;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class MyWatcher implements Watcher {

   @Override
   public void process(WatchedEvent event) {
      System.out.println("触发watcher,节点路径为:" + event.getPath());
   }

}

运行程序后,控制台连接成功后开始等待触发,注意线程睡眠时间稍微设置长一些,然后在服务器中作出相应的修改操作,就会触发watcher事件。

image.png

image.png

一次注册N次监听:

上面代码虽然实现了watcher的监听,但是发现是一次性的,在触发监听完成后就会销毁,下面我们再来做一个永久监听的操作:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.util.List;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super/aaa";

// 为节点添加watcher
        // NodeCache: 监听数据节点的变更,会触发事件
      final NodeCache nodeCache = new NodeCache(curatorConnect.client, nodePath);
      // buildInitial : 初始化的时候获取node的值并且缓存,不加true或者为false,则初始化不缓存数据
      nodeCache.start(true);
      if (nodeCache.getCurrentData() != null) {
         System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
      } else {
         System.out.println("节点初始化数据为空...");
      }
      nodeCache.getListenable().addListener(new NodeCacheListener() {
         public void nodeChanged() throws Exception {
            if (nodeCache.getCurrentData() == null) {
               System.out.println("空");
               return;
            }
            String data = new String(nodeCache.getCurrentData().getData());
            System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
         }
      });


        Thread.sleep(100000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

image.png

image.png

子节点监听:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.util.List;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super";

        // 为子节点添加watcher
        // PathChildrenCache: 监听数据节点的增删改,会触发事件
        String childNodePathCache =  nodePath;
        // cacheData: 设置缓存节点的数据状态
        final PathChildrenCache childrenCache = new PathChildrenCache(curatorConnect.client, childNodePathCache, true);
        /**
         * StartMode: 初始化方式
         * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
         * NORMAL:异步初始化
         * BUILD_INITIAL_CACHE:同步初始化
         */
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        List<ChildData> childDataList = childrenCache.getCurrentData();
        System.out.println("当前数据节点的子节点数据列表:");
        for (ChildData cd : childDataList) {
            String childData = new String(cd.getData());
            System.out.println(childData);
        }

        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
                    System.out.println("子节点初始化ok...");
                }

                else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
                    String path = event.getData().getPath();
                    if (path.equals(ADD_PATH)) {
                        System.out.println("添加子节点:" + event.getData().getPath());
                        System.out.println("子节点数据:" + new String(event.getData().getData()));
                    } else if (path.equals("/super/imooc/e")) {
                        System.out.println("添加不正确...");
                    }

                }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
                    System.out.println("删除子节点:" + event.getData().getPath());
                }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                    System.out.println("修改子节点路径:" + event.getData().getPath());
                    System.out.println("修改子节点数据:" + new String(event.getData().getData()));
                }
            }
        });


        Thread.sleep(100000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }
    public final static String ADD_PATH = "/super/imooc/d";

}

这部分代码的演示其实是差不多的,这就不过多说明。

扩展部分:Zookeeper学习之Apache Curator使用实例-统一更新N台机器的节点配置

ACL相关操作:

package com.wjy329.curatordemo.curator;


import com.wjy329.curatordemo.utils.AclUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

import java.util.ArrayList;
import java.util.List;

public class CuratorAcl {

   public CuratorFramework client = null;
   public static final String zkServerPath = "172.16.106.130:2181";

   public CuratorAcl() {
      RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
      client = CuratorFrameworkFactory.builder().authorization("digest", "imooc1:123456".getBytes())
            .connectString(zkServerPath)
            .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
            .namespace("workspace").build();
      client.start();
   }
   
   public void closeZKClient() {
      if (client != null) {
         this.client.close();
      }
   }
   
   public static void main(String[] args) throws Exception {
      // 实例化
      CuratorAcl cto = new CuratorAcl();
      boolean isZkCuratorStarted = cto.client.isStarted();
      System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
      
      String nodePath = "/acl/father";
      
      List<ACL> acls = new ArrayList<ACL>();
      Id wjy1 = new Id("digest", AclUtils.getDigestUserPwd("wjy1:123456"));
      Id wjy2 = new Id("digest", AclUtils.getDigestUserPwd("wjy2:123456"));
      acls.add(new ACL(Perms.ALL, wjy1));
      acls.add(new ACL(Perms.READ, wjy2));
      acls.add(new ACL(Perms.DELETE | Perms.CREATE, wjy2));

      // 创建节点
      byte[] data = "wjy329".getBytes();
      cto.client.create().creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            .withACL(acls, true)
            .forPath(nodePath, data);

      //也可以使用这种方式设置权限,如果节点已经有权限需要认证后才能设置权限
      //cto.client.setACL().withACL(acls).forPath("/curatorNode");

      
      cto.closeZKClient();
      boolean isZkCuratorStarted2 = cto.client.isStarted();
      System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
   }
   
}

acl部分就直接上相关代码了,就不进行演示了。

Zookeeper学习之Apache Curator使用实例-统一更新N台机器的节点配置

统一配置管理,对集群中的任意一台机器修改,其他机器也会自动的修改相关的配置。

这里采用3台机器演示:分别新建3个文件,client1.java、client2.java、client3.java,这3个文件除了地址不一样外,其他部分都一样,请自行修改;

package com.wjy329.curatordemo.curator.checkConfig;


import com.wjy329.curatordemo.utils.JsonUtils;
import com.wjy329.curatordemo.utils.RedisConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryNTimes;

import java.util.concurrent.CountDownLatch;

public class Client1 {

   public CuratorFramework client = null;
   public static final String zkServerPath = "172.16.106.130:2181";

   public Client1() {
      RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
      client = CuratorFrameworkFactory.builder()
            .connectString(zkServerPath)
            .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
            .namespace("workspace").build();
      client.start();
   }
   
   public void closeZKClient() {
      if (client != null) {
         this.client.close();
      }
   }
   
// public final static String CONFIG_NODE = "/super/imooc/redis-config";
   public final static String CONFIG_NODE_PATH = "/super/wjy";
   public final static String SUB_PATH = "/redis-config";
   public static CountDownLatch countDown = new CountDownLatch(1);
   
   public static void main(String[] args) throws Exception {
      Client1 cto = new Client1();
      System.out.println("client1 启动成功...");
      
      final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
      childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
      
      // 添加监听事件
      childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
         public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            // 监听节点变化
            if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
               String configNodePath = event.getData().getPath();
               if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
                  System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);
                  
                  // 读取节点数据
                  String jsonConfig = new String(event.getData().getData());
                  System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);
                  
                  // 从json转换配置
                  RedisConfig redisConfig = null;
                  if (StringUtils.isNotBlank(jsonConfig)) {
                     redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
                  }
                  
                  // 配置不为空则进行相应操作
                  if (redisConfig != null) {
                     String type = redisConfig.getType();
                     String url = redisConfig.getUrl();
                     String remark = redisConfig.getRemark();
                     // 判断事件
                     if (type.equals("add")) {
                        System.out.println("监听到新增的配置,准备下载...");
                        // ... 连接ftp服务器,根据url找到相应的配置
                        Thread.sleep(500);
                        System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
                        // ... 下载配置到你指定的目录
                        Thread.sleep(1000);
                        System.out.println("下载成功,已经添加到项目中");
                        // ... 拷贝文件到项目目录
                     } else if (type.equals("update")) {
                        System.out.println("监听到更新的配置,准备下载...");
                        // ... 连接ftp服务器,根据url找到相应的配置
                        Thread.sleep(500);
                        System.out.println("开始下载配置文件,下载路径为<" + url + ">");
                        // ... 下载配置到你指定的目录
                        Thread.sleep(1000);
                        System.out.println("下载成功...");
                        System.out.println("删除项目中原配置文件...");
                        Thread.sleep(100);
                        // ... 删除原文件
                        System.out.println("拷贝配置文件到项目目录...");
                        // ... 拷贝文件到项目目录
                     } else if (type.equals("delete")) {
                        System.out.println("监听到需要删除配置");
                        System.out.println("删除项目中原配置文件...");
                     }
                     
                     // TODO 视情况统一重启服务
                  }
               }
            }
         }
      });
      
      countDown.await();
      
      cto.closeZKClient();
   }
   
}

列出项目中的工具类:

JSON工具类:

package com.wjy329.curatordemo.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;

/**
 * 
 * @Title: JsonUtils.java
 * @Package com.lee.utils
 * @Description: JSON/对象转换类
 * Copyright: Copyright (c) 2016
 * Company:Nathan.Lee.Salvatore
 * 
 * @author leechenxiang
 * @date 2016年4月29日 下午11:05:03
 * @version V1.0
 */
public class JsonUtils {

    // 定义jackson对象
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * 将对象转换成json字符串。
     * <p>Title: pojoToJson</p>
     * <p>Description: </p>
     * @param data
     * @return
     */
    public static String objectToJson(Object data) {
       try {
         String string = MAPPER.writeValueAsString(data);
         return string;
      } catch (JsonProcessingException e) {
         e.printStackTrace();
      }
       return null;
    }
    
    /**
     * 将json结果集转化为对象
     * 
     * @param jsonData json数据
     * @param clazz 对象中的object类型
     * @return
     */
    public static <T> T jsonToPojo(String jsonData, Class<T> beanType) {
        try {
            T t = MAPPER.readValue(jsonData, beanType);
            return t;
        } catch (Exception e) {
           e.printStackTrace();
        }
        return null;
    }
    
    /**
     * 将json数据转换成pojo对象list
     * <p>Title: jsonToList</p>
     * <p>Description: </p>
     * @param jsonData
     * @param beanType
     * @return
     */
    public static <T>List<T> jsonToList(String jsonData, Class<T> beanType) {
       JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
       try {
          List<T> list = MAPPER.readValue(jsonData, javaType);
          return list;
      } catch (Exception e) {
         e.printStackTrace();
      }
       
       return null;
    }
    
}

模拟redis的配置:

package com.wjy329.curatordemo.utils;

public class RedisConfig {

   private String type;   // add 新增配置    update 更新配置    delete 删除配置
   private String url;       // 如果是add或update,则提供下载地址
   private String remark; // 备注
   
   public String getType() {
      return type;
   }
   public void setType(String type) {
      this.type = type;
   }
   public String getUrl() {
      return url;
   }
   public void setUrl(String url) {
      this.url = url;
   }
   public String getRemark() {
      return remark;
   }
   public void setRemark(String remark) {
      this.remark = remark;
   }
}

修改的配置文件:

{"type":"add","url":"ftp://192.168.10.123/config/redis.xml","remark":"add"}

{"type":"update","url":"ftp://192.168.10.123/config/redis.xml","remark":"update"}
{"type":"delete","url":"","remark":"delete"}

上述代码都准备完成后,启动client1.java、client2.java、client3.java这三个客户端。

image.png

然后对其中任意一台进行修改:

image.png

image.pngimage.png

image.png


这时我们观察到三个控制台都相应的更新完成。

Zookeeper学习(四)Zookeeper原生Java API 客户端开发

1、客户端与zookeeper服务端的连接

首先我们解压下载的zookeeper的tar文件,然后把如下图的几个jar包都添加到项目中。

image.png

项目结构图如下:

image.png

先附上日志的配置文件:

log4j.xml:

<?xml version="1.0" encoding="GB2312" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">


    <appender name="com.wjy329.console" class="org.apache.log4j.ConsoleAppender">
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%c %d{ISO8601}-- %p -- %m%n" />
        </layout>
    </appender>

    <appender name="com.wjy329.all" class="org.apache.log4j.RollingFileAppender">
        <!-- 设置通道ID:org.zblog.all和输出方式:org.apache.log4j.RollingFileAppender -->
        <param name="File" value="/Users/wjy329/desktop/zklog/all.output.log"/><!-- 设置File参数:日志输出文件名 -->
        <param name="Append" value="false"/><!-- 设置是否在重新启动服务时,在原有日志的基础添加新日志 -->
        <param name="MaxBackupIndex" value="10"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%p (%c:%L)- %m%n"/><!-- 设置输出文件项目和格式 -->
        </layout>
    </appender>
    <appender name="com.wjy329.wjy" class="org.apache.log4j.RollingFileAppender">
    <param name="File" value="/Users/wjy329/desktop/zklog/wjy.output.log"/>
    <param name="Append" value="true"/>
    <param name="MaxFileSize" value="10240"/> <!-- 设置文件大小 -->
    <param name="MaxBackupIndex" value="10"/>
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%p (%c:%L)- %m%n"/>
    </layout>
</appender>


    <logger name="wjy.log"> <!-- 设置域名限制 -->
        <level value="info"/><!-- 设置级别 -->
        <appender-ref ref="com.wjy329.wjy"/><!-- 与前面的通道id相对应 -->
    </logger>
    <!-- 根logger的设置-->
    <root>
        <level value="INFO" />
        <appender-ref ref="com.wjy329.console" />
        <appender-ref ref="com.wjy329.all" />
    </root>
</log4j:configuration>

ZKConnect.java:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**

 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-12 15:00
 **/
public class ZKConnect implements Watcher {
    final static Logger log = LoggerFactory.getLogger(ZKConnect.class);
    //zookeeper集群的ip
    private static final String zkServerPath="172.16.106.130:2181,172.16.106.131:2181,172.16.106.132:2181";
    //超时时间
    private static final Integer timeout = 5000;

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,new ZKConnect());
        log.info("客户端开始连接zookeeper服务器...");
        log.info("连接状态:"+zk.getState());
        new Thread().sleep(2000);

        log.info("连接状态:"+zk.getState());
    }

    @Override
    public void process(WatchedEvent event) {
        log.info("接受到watch通知:"+event);
    }
}

上面就可以连接zookeeper服务端,接下来我们运行,可以看到控制台输出相关信息:

image.png2、zookeeper会话重连

直接上代码:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**

 * @description: 会话重连
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 10:42
 **/
public class ZKConnectSessionWatcher implements Watcher {

    final static Logger log = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;

    public static void main(String[] args) throws Exception{
        ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,new ZKConnectSessionWatcher());
        log.warn("客户端开始连接zookeeper服务器...");
        log.warn("连接状态:{}",zk.getState());
        new Thread().sleep(1000);
        log.warn("连接状态:{}",zk.getState());
        //获取session id
        long sessionId = zk.getSessionId();
        //获取session 密码
        byte[] sessionPassword = zk.getSessionPasswd();
        System.out.println("此时的sessionID为"+sessionId);
        new Thread().sleep(200);

        //开始会话重连
        log.warn("开始会话重连...");

        ZooKeeper zkSession = new ZooKeeper(zkServerPath,timeout,
                            new ZKConnectSessionWatcher(),sessionId,sessionPassword);
        log.warn("重新连接状态zkSession:{}",zkSession.getState());
        new Thread().sleep(1000);
        log.warn("重新连接状态zkSession:{}",zkSession.getState());
        System.out.println("此时的sessionID为"+zkSession.getSessionId());
    }

    @Override
    public void process(WatchedEvent event) {
        log.warn("接受到watch通知:"+event);
    }
}

上面运行后,我们可以看运行结果:

image.png3、节点的增删改查

  • 同步方式创建节点:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;

import java.io.IOException;
import java.util.List;

/**
 * @description:  节点操作
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 14:52
 **/
public class ZKNodeOperator implements Watcher{

    private ZooKeeper zooKeeper = null;

    private static final String zkServerPath = "172.16.106.130:2181";
    private static final Integer timeout = 5000;

    public ZKNodeOperator(){}

    public ZKNodeOperator(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeOperator());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try {
                    zooKeeper.close();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
        //创建zookeeper节点
        zkServer.createZKNode("/testnode","testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * @description  创建zookeeper节点
     * @param path 创建的路径
     * @param data 存储数据的byte[]
     * @param acls 控制权限策略
     * @author wjy329
     * @date 2018/9/13
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            /**
             * acl:控制权限策略:Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *                  CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode: 节点类型:是一个枚举
             *              PERSISTENT:持久节点
             *              PERSISTENT_SEQUENTIAL:持久顺序节点
             *              EPHEMERAL:临时节点
             *              EPHEMERAL_SEQUENTIAL:临时顺序节点
             * */
            result = zooKeeper.create(path,data,acls, CreateMode.EPHEMERAL);

            System.out.println("创建节点:\t"+result+"\t成功...");
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {

    }
}

运行上述代码,创建了一个临时节点,当我们在创建完后,执行ls / 会发现节点已经创建,再次 ls /发现节点不见了,这也是zookeeper的心跳机制导致的,因为主程序运行后,会话结束,心跳机制检测到会话已经结束,那么临时节点也就消失。

image.png

image.png

  • 异步方式创建节点:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;

import java.io.IOException;
import java.util.List;

/**
 * @description:  节点操作
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 14:52
 **/
public class ZKNodeOperator implements Watcher{

    private ZooKeeper zooKeeper = null;

    private static final String zkServerPath = "172.16.106.130:2181";
    private static final Integer timeout = 5000;

    public ZKNodeOperator(){}

    public ZKNodeOperator(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeOperator());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try {
                    zooKeeper.close();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
        //创建zookeeper节点
        zkServer.createZKNode("/testTwo","testTwo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * @description  创建zookeeper节点
     * @param path 创建的路径
     * @param data 存储数据的byte[]
     * @param acls 控制权限策略
     * @author wjy329
     * @date 2018/9/13
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            /**
             * acl:控制权限策略:Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *                  CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode: 节点类型:是一个枚举
             *              PERSISTENT:持久节点
             *              PERSISTENT_SEQUENTIAL:持久顺序节点
             *              EPHEMERAL:临时节点
             *              EPHEMERAL_SEQUENTIAL:临时顺序节点
             * */
            //result = zooKeeper.create(path,data,acls, CreateMode.EPHEMERAL);
            String ctx = "{'create':'success'}";
            zooKeeper.create(path,data,acls,CreateMode.PERSISTENT,new CreateCallBack(),ctx);
            System.out.println("创建节点:\t"+result+"\t成功...");
            new Thread().sleep(2000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {

    }
}

回调函数CreateCallBack.java:

import org.apache.zookeeper.AsyncCallback;

/**
 * @Author wjy329
 * @Time 2018/9/13下午3:43
 * @description 异步创建节点回调函数
 */

public class CreateCallBack implements AsyncCallback.StringCallback{
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        System.out.println("创建节点:"+path);
        System.out.println(ctx);
    }
}

运行程序,控制台显示相关信息:

image.png

由于异步创建了永久节点,所以这个节点永久存在image.png

  • 修改节点

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

/**
 * @description:  节点操作
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 14:52
 **/
public class ZKNodeOperator implements Watcher{

    private ZooKeeper zooKeeper = null;

    private static final String zkServerPath = "172.16.106.130:2181";
    private static final Integer timeout = 5000;

    public ZKNodeOperator(){}

    public ZKNodeOperator(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeOperator());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try {
                    zooKeeper.close();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
        //创建zookeeper节点
        //zkServer.createZKNode("/testTwo","testTwo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        //修改节点信息
        //第一个参数为 修改的节点路径,第二个参数 修改的数据 第三个参数 版本号
        Stat status = zkServer.getZooKeeper().setData("/testTwo","wjy329".getBytes(),0);
        System.out.println("输出的结果是"+status.getVersion());
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * @description  创建zookeeper节点
     * @param path 创建的路径
     * @param data 存储数据的byte[]
     * @param acls 控制权限策略
     * @author wjy329
     * @date 2018/9/13
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            /**
             * acl:控制权限策略:Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *                  CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode: 节点类型:是一个枚举
             *              PERSISTENT:持久节点
             *              PERSISTENT_SEQUENTIAL:持久顺序节点
             *              EPHEMERAL:临时节点
             *              EPHEMERAL_SEQUENTIAL:临时顺序节点
             * */
            //result = zooKeeper.create(path,data,acls, CreateMode.EPHEMERAL);
            String ctx = "{'create':'success'}";
            zooKeeper.create(path,data,acls,CreateMode.PERSISTENT,new CreateCallBack(),ctx);
            System.out.println("创建节点:\t"+result+"\t成功...");
            new Thread().sleep(2000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {

    }
}

修改的代码在main()函数中,注意查看。这里特别注意第三个参数,版本号,一定要与当前版本号一致,然后Stat为返回的节点信息,可以获取到一些节点的信息。这里返回修改完成的版本号。

image.png

image.png

  • 同步删除节点

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

/**
 * @description:  节点操作
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 14:52
 **/
public class ZKNodeOperator implements Watcher{

    private ZooKeeper zooKeeper = null;

    private static final String zkServerPath = "172.16.106.130:2181";
    private static final Integer timeout = 5000;

    public ZKNodeOperator(){}

    public ZKNodeOperator(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeOperator());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try {
                    zooKeeper.close();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
        //创建zookeeper节点
        //zkServer.createZKNode("/testTwo","testTwo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        //修改节点信息
        //第一个参数为 修改的节点路径,第二个参数 修改的数据 第三个参数 版本号
        //Stat status = zkServer.getZooKeeper().setData("/testTwo","wjy329".getBytes(),0);
        //删除节点信息,第一个参数是路径,第二个参数是版本号
        zkServer.getZooKeeper().delete("/testTwo",1);
        System.out.println("删除成功");
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * @description  创建zookeeper节点
     * @param path 创建的路径
     * @param data 存储数据的byte[]
     * @param acls 控制权限策略
     * @author wjy329
     * @date 2018/9/13
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            /**
             * acl:控制权限策略:Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *                  CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode: 节点类型:是一个枚举
             *              PERSISTENT:持久节点
             *              PERSISTENT_SEQUENTIAL:持久顺序节点
             *              EPHEMERAL:临时节点
             *              EPHEMERAL_SEQUENTIAL:临时顺序节点
             * */
            //result = zooKeeper.create(path,data,acls, CreateMode.EPHEMERAL);
            String ctx = "{'create':'success'}";
            zooKeeper.create(path,data,acls,CreateMode.PERSISTENT,new CreateCallBack(),ctx);
            System.out.println("创建节点:\t"+result+"\t成功...");
            new Thread().sleep(2000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {

    }
}

同样,我们也需要看main()方法中的删除即可

image.png

  • 异步删除节点

由于同步的删除没有任何返回状态,一般情况下,我们采用异步的方式。

异步删除回调函数:

import org.apache.zookeeper.AsyncCallback;

/**

 * @description: 异步删除回调函数
 *
 * @author: wjy329
 * @return:
 * @create: 2018-09-13 19:23
 **/
public class DeleteCallBack implements AsyncCallback.VoidCallback{
    @Override
    public void processResult(int rc, String path, Object ctx) {
        System.out.println("删除节点"+path);
        System.out.println((String) ctx);
    }
}

异步删除:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

/**
 * @description:  节点操作
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 14:52
 **/
public class ZKNodeOperator implements Watcher{

    private ZooKeeper zooKeeper = null;

    private static final String zkServerPath = "172.16.106.130:2181";
    private static final Integer timeout = 5000;

    public ZKNodeOperator(){}

    public ZKNodeOperator(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeOperator());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try {
                    zooKeeper.close();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
        //创建zookeeper节点
        //zkServer.createZKNode("/testTwo","testTwo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        //修改节点信息
        //第一个参数为 修改的节点路径,第二个参数 修改的数据 第三个参数 版本号
        //Stat status = zkServer.getZooKeeper().setData("/testTwo","wjy329".getBytes(),0);
        //删除节点信息,第一个参数是路径,第二个参数是版本号
        String ctx = "{'delete':'success'}";
        zkServer.getZooKeeper().delete("/testTwo",0,new DeleteCallBack(),ctx);
        Thread.sleep(2000);
        System.out.println("删除成功");
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * @description  创建zookeeper节点
     * @param path 创建的路径
     * @param data 存储数据的byte[]
     * @param acls 控制权限策略
     * @author wjy329
     * @date 2018/9/13
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            /**
             * acl:控制权限策略:Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *                  CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode: 节点类型:是一个枚举
             *              PERSISTENT:持久节点
             *              PERSISTENT_SEQUENTIAL:持久顺序节点
             *              EPHEMERAL:临时节点
             *              EPHEMERAL_SEQUENTIAL:临时顺序节点
             * */
            //result = zooKeeper.create(path,data,acls, CreateMode.EPHEMERAL);
            String ctx = "{'create':'success'}";
            zooKeeper.create(path,data,acls,CreateMode.PERSISTENT,new CreateCallBack(),ctx);
            System.out.println("创建节点:\t"+result+"\t成功...");
            new Thread().sleep(2000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {

    }
}

同样,也是主要看main()函数中的方法

image.png

  • 节点查询

在看节点查询之前,建议先阅读《Zookeeper学习之CountDownLatch》 

获取节点数据:

package ZKGetNode;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author wjy329
 * @Time 2018/9/14上午9:42
 * @description
 */

public class ZKGetNodeData implements Watcher{
    private ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;
    private static Stat stat = new Stat();

    public ZKGetNodeData(){}

    public ZKGetNodeData(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKGetNodeData());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try{
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
        /**
         * 参数:path:节点路径
         *  watch:true/false 注册一个watch事件
         *  stat:状态
         * */
        byte[] resByte = zkServer.getZooKeeper().getData("/test",true,stat);
        String result = new String(resByte);
        System.out.println("当前值:"+result);
        countDownLatch.await();
    }

    @Override
    public void process(WatchedEvent event) {
        try{
            if(event.getType() == Event.EventType.NodeDataChanged){
                ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
                byte[] resByte = zkServer.getZooKeeper().getData("/test",false,stat);
                String result = new String(resByte);
                System.out.println("更改后的值:"+result);
                System.out.println("版本号变化dversion:"+stat.getVersion());
                countDownLatch.countDown();
            }else if(event.getType() == Event.EventType.NodeCreated){

            }else if(event.getType() == Event.EventType.NodeChildrenChanged){

            }else if(event.getType() == Event.EventType.NodeDeleted){

            }
        }catch (KeeperException e){
            e.printStackTrace();
        }catch (InterruptedException e1){
            e1.printStackTrace();
        }
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public static String getZkServerPath() {
        return zkServerPath;
    }

    public static Integer getTimeout() {
        return timeout;
    }

    public static Stat getStat() {
        return stat;
    }

    public static void setStat(Stat stat) {
        ZKGetNodeData.stat = stat;
    }

    public static CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }

    public static void setCountDownLatch(CountDownLatch countDownLatch) {
        ZKGetNodeData.countDownLatch = countDownLatch;
    }
}

上述代码就是查看节点的演示,上面由于使用了CountDownLatch,所以运行程序后,线程还会再等待节点的相应操作,等到计数器减为0后任务结束。

现在控制台中查看/test节点的数据为  329

image.png

然后运行程序,控制台输出:

image.png

然后我们在控制台作出相应的事件,修改下节点的数据:

image.png

此时控制台为:

image.png

获取子节点数据:

package ZKGetNode;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Author wjy329
 * @Time 2018/9/14上午10:29
 * @description 获取子节点的数据
 */

public class ZKGetChildrenList implements Watcher{
    private ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;

    public ZKGetChildrenList(){}

    public ZKGetChildrenList(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKGetChildrenList());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try{
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
        /**
         * 参数:
         * path:父节点路径
         * watch:true或者false,注册一个watch事件
         * */
        List<String> strChildList = zkServer.getZooKeeper().getChildren("/test",true);
        for(String s : strChildList){
            System.out.println(s);
        }
        //异步调用
        String ctx = "{'callback':'ChildrenCallback'}";
        zkServer.getZooKeeper().getChildren("/test",true,new ChildrenCallback(),ctx);
        //zkServer.getZooKeeper().getChildren("/test",true,new Children2Callback(),ctx);

        countDownLatch.await();
    }
    @Override
    public void process(WatchedEvent event) {
        try{
            if(event.getType() == Event.EventType.NodeChildrenChanged){
                System.out.println("NodeChildrenChanged");
                ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
                List<String> strChildList = zkServer.getZooKeeper().getChildren(event.getPath(),false);
                for(String s : strChildList){
                    System.out.println(s);
                }
                countDownLatch.countDown();
            }else if(event.getType() == Event.EventType.NodeCreated){
                System.out.println("NodeCreated");
            }else if(event.getType() == Event.EventType.NodeDataChanged){
                System.out.println("NodeDataChanged");
            }else if(event.getType() == Event.EventType.NodeDeleted){
                System.out.println("NodeDeleted");
            }
        }catch (KeeperException e){
            e.printStackTrace();
        }catch (InterruptedException e1){
            e1.printStackTrace();
        }

    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public static String getZkServerPath() {
        return zkServerPath;
    }

    public static Integer getTimeout() {
        return timeout;
    }
}

从上面我们看到异步的方式有两种,第二种比第一种多了一个参数,能获取到最后的一些状态,这里仅给出第一种的结果,第二种自行演示。

ChildrenCallback.java:

package ZKGetNode;/**
 * @Author wjy329
 * @Time 2018/9/14上午10:45
 * @description
 */

import org.apache.zookeeper.AsyncCallback;

import java.util.List;

/**

 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-14 10:45
 **/
public class ChildrenCallback implements AsyncCallback.ChildrenCallback {
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        for(String s : children){
            System.out.println(s);
        }
        System.out.println("ChildrenCallback:"+path);
        System.out.println((String) ctx);
    }
}

Children2Callback.java:

package ZKGetNode;/**
 * @Author wjy329
 * @Time 2018/9/14上午10:45
 * @description
 */

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.data.Stat;

import java.util.List;

/**

 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-14 10:45
 **/
public class Children2Callback implements AsyncCallback.Children2Callback {
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        for(String s : children){
            System.out.println(s);
        }
        System.out.println("ChildrenCallback:"+path);
        System.out.println((String) ctx);
        System.out.println(stat.toString());
    }
}

命令行先查看子节点:image.png

然后看控制台输出:

image.png

至于watcher的监听事件,自己尝试吧。

  • 判断节点是否存在:

package ZKGetNode;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author wjy329
 * @Time 2018/9/14下午7:41
 * @description 判断节点是否存在
 */

public class ZKNodeExist implements Watcher{
    private ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;

    public ZKNodeExist(){}

    public ZKNodeExist(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeExist());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try{
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        ZKNodeExist zkServer = new ZKNodeExist(zkServerPath);

        Stat stat = zkServer.getZooKeeper().exists("/test",true);
        if(stat != null){
            System.out.println("查询的节点版本:"+stat.getVersion());
        }else{
            System.out.println("该节点不存在。。。");
        }

        countDownLatch.await();
    }

    @Override
    public void process(WatchedEvent event) {
        if(event.getType() == Event.EventType.NodeCreated){
            System.out.println("节点创建");
            countDownLatch.countDown();
        }else if(event.getType() == Event.EventType.NodeDataChanged){
            System.out.println("节点数据被改变");
            countDownLatch.countDown();
        }else if(event.getType() == Event.EventType.NodeDeleted){
            System.out.println("节点被删除");
            countDownLatch.countDown();
        }
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public static String getZkServerPath() {
        return zkServerPath;
    }

    public static Integer getTimeout() {
        return timeout;
    }
}

运行后,我们再改变下数据以供watcher监听到。

image.png

image.png

  • ACL相关

默认匿名权限

package ZKAcl;

import ZKGetNode.ZKNodeExist;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;

import java.io.IOException;
import java.util.List;

/**
 * @Author wjy329
 * @Time 2018/9/14下午8:58
 * @description zookeeper操作节点acl
 */

public class ZKNodeACL implements Watcher{
    private ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;

    public ZKNodeACL(){}

    public ZKNodeACL(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeACL());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try{
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            result = zooKeeper.create(path,data,acls, CreateMode.PERSISTENT);
            System.out.println("创建节点:\t"+result+"\t成功...");
        }catch (KeeperException e){
            e.printStackTrace();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ZKNodeACL zkServer = new ZKNodeACL(zkServerPath);
        //acl任何人都可以访问
        zkServer.createZKNode("/aclnode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);

    }

    @Override
    public void process(WatchedEvent event) {

    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public static String getZkServerPath() {
        return zkServerPath;
    }

    public static Integer getTimeout() {
        return timeout;
    }
}

运行后,创建节点:

image.png

自定义用户权限:

由于用户需要加密,先附上工具类:

package ZKAcl;

import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

/**
 * @Author wjy329
 * @Time 2018/9/14下午9:16
 * @description
 */

public class AclUtils {
    public static String getDigestUserPwd(String id)throws Exception{
        return DigestAuthenticationProvider.generateDigest(id);
    }

}

然后再附上主要的代码:

package ZKAcl;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author wjy329
 * @Time 2018/9/14下午9:17
 * @description 自定义用户权限
 */


public class ZKNodeACL2 implements Watcher{
    private ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;

    public ZKNodeACL2(){}

    public ZKNodeACL2(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeACL2());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try{
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            result = zooKeeper.create(path,data,acls, CreateMode.PERSISTENT);
            System.out.println("创建节点:\t"+result+"\t成功...");
        }catch (KeeperException e){
            e.printStackTrace();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        ZKNodeACL2 zkServer = new ZKNodeACL2(zkServerPath);
        //acl任何人都可以访问
        //zkServer.createZKNode("/aclnode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        //自定义用户认证访问
        List<ACL> acls = new ArrayList<>();
        Id user1 = new Id("digest",AclUtils.getDigestUserPwd("user1:123456"));
        Id user2 = new Id("digest",AclUtils.getDigestUserPwd("user2:123456"));
        acls.add(new ACL(ZooDefs.Perms.ALL,user1));
        acls.add(new ACL(ZooDefs.Perms.READ,user2));
        acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE,user2));

        zkServer.createZKNode("/aclnode/testdigest","test".getBytes(),acls);
        //注册用户必须通过登录才能操作节点
        zkServer.getZooKeeper().addAuthInfo("digest","user1:123456".getBytes());
        zkServer.createZKNode("/aclnode/testdigest/childtest","child".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL);
    }

    @Override
    public void process(WatchedEvent event) {

    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public static String getZkServerPath() {
        return zkServerPath;
    }

    public static Integer getTimeout() {
        return timeout;
    }
}

上面的代码也很好理解,就是新建两个用户,分别授予不同的权限,然后对节点进行操作,更多效果自行测试。

image.png

以上部分就是基本的客户端操作了,都是一样的思路,很简单喽,自己实践就会明白了。

Zookeeper学习之CountDownLatch

在这里先介绍一下CountDownLatch,因为Zookeeper的节点查询和分布式锁等需要到这个计数器。它允许一个或多个线程一直等待,直到其他线程执行完后再执行;例如:一个系统等待参数加载、资源加载等任务都完成后才允许用户登录。

CountDownLatch是一个计数器,多用于线程,可以暂停也可以继续。

CountDownLatch一般有两个方法 .await()  .countDown()

主线程必须在启用其他线程之后立即调用.await()方法,这样主线程的操作就会在这个方法上阻塞,等待其他线程完成各自的任务;其他任务完成后必须通过.countDown()来通知任务完成,当count减为0后,主线程就可以通过.await()恢复执行自己的方法。

这里我以慕课网的例子来演示:

实例:监控调度系统,当监控调度中心收到所有的调度分区返回的成功信息后,然后通知发车。

目录结构:

image.png

DangeCenter.java:

package CountDownLatchTest;

import java.util.concurrent.CountDownLatch;

/**
 * @description: 抽象类,用来演示调度中心,统一检查
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 20:46
 **/
public abstract class DangerCenter implements Runnable{

    //计数器
    private CountDownLatch countDown;
    //调度站
    private String station;
    //检查标志ok
    private boolean ok;

    public DangerCenter(CountDownLatch countDown,String station){
        this.countDown = countDown;
        this.station = station;
        this.ok = false;
    }

    @Override
    public void run() {
        try{
            check();
            ok = true;
        }catch (Exception e){
            e.printStackTrace();
            ok = false;
        }finally {
            if(countDown != null){
                countDown.countDown();
            }
        }
    }

    /**
    * @description 检查车况
    * @param
    * @return
    * @author wjy329
    * @Date 2018/9/13
    */
    public abstract void check();

    public CountDownLatch getCountDown() {
        return countDown;
    }

    public void setCountDown(CountDownLatch countDown) {
        this.countDown = countDown;
    }

    public String getStation() {
        return station;
    }

    public void setStation(String station) {
        this.station = station;
    }

    public boolean isOk() {
        return ok;
    }

    public void setOk(boolean ok) {
        this.ok = ok;
    }
}

StationW.java:

package CountDownLatchTest;

import java.util.concurrent.CountDownLatch;

/**
 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 20:58
 **/
public class StationW extends DangerCenter{
    public StationW(CountDownLatch countDown) {
        super(countDown, "W调度站");
    }

    @Override
    public void check() {
        System.out.println("正在检查["+this.getStation()+"]...");
        try{
            Thread.sleep(2000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }

        System.out.println("检查["+this.getStation()+"]完毕,可以调度");
    }
}

StationJ.java:

package CountDownLatchTest;

import java.util.concurrent.CountDownLatch;

/**
 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 20:58
 **/
public class StationJ extends DangerCenter{
    public StationJ(CountDownLatch countDown) {
        super(countDown, "J调度站");
    }

    @Override
    public void check() {
        System.out.println("正在检查["+this.getStation()+"]...");
        try{
            Thread.sleep(1000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }

        System.out.println("检查["+this.getStation()+"]完毕,可以调度");
    }
}

StationY.java:

package CountDownLatchTest;

import java.util.concurrent.CountDownLatch;

/**
 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 20:58
 **/
public class StationY extends DangerCenter{
    public StationY(CountDownLatch countDown) {
        super(countDown, "Y调度站");
    }

    @Override
    public void check() {
        System.out.println("正在检查["+this.getStation()+"]...");
        try{
            Thread.sleep(1500);
        }catch (InterruptedException e){
            e.printStackTrace();
        }

        System.out.println("检查["+this.getStation()+"]完毕,可以调度");
    }
}

CheckStartUp.java:

package CountDownLatchTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * @Author wjy329
 * @Time 2018/9/13下午9:04
 * @description 监控中心总部
 */

public class CheckStartUp {
    private static List<DangerCenter> stationList;
    private static CountDownLatch countDown;

    public CheckStartUp(){}

    public static boolean checkAllStations() throws Exception{
        //初始化3个调度站
        countDown = new CountDownLatch(3);

        //把所有站点添加到list
        stationList = new ArrayList<>();
        stationList.add(new StationW(countDown));
        stationList.add(new StationJ(countDown));
        stationList.add(new StationY(countDown));

        Executor executor = Executors.newFixedThreadPool(stationList.size());

        for(DangerCenter center : stationList){
            executor.execute(center);
        }

        //等待线程执行完毕
        countDown.await();

        for(DangerCenter center : stationList){
            if(!center.isOk()){
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        boolean result = CheckStartUp.checkAllStations();
        System.out.println("监控中心对所有站点的检查结果为:"+ result);
    }
}

运行结果:

image.png

Zookeeper学习(三)Zookeeper的集群安装

准备工作:3台虚拟机,3台虚拟机中都得安装zookeeper(这不是废话么。。。)

image.png

在第一台虚拟机中,执行  /usr/local/zookeeper/conf  然后vim zoo.cfg 按下图修改配置

image.png

然后执行cd /usr/local/zookeeper/dataDir 新建一个myid文件,vim myid myid中的内容为数字 1

其他两台的zoo.cfg配置一样,就是myid分别设置为2,3即可

然后分别在zookeeper的bin目录中启动服务即可 ./zkServer.sh start

image.png

然后分别查看zookeeper的状态 执行 ./zkServer.sh status  可以看到有一台机器的leader,其余两台为follower

image.png

image.pngimage.png

在第二台机器中,执行 ./zkCli.sh -server 172.16.106.131:2181 然后查看节点,发现只有一个节点zookeeper,然后我们新建一个test节点,设置值为329,然后在其他两台机器中查看节点,发现也会有刚在创建的值为329的test节点

image.png

image.png

到此位置,zookeeper的集群搭建就基本完成了。怎么样,是不是贼简单。赶紧去试试吧。

Zookeeper学习(二)Zookeeper基本特征、命令行和权限

1、Zookeeper常用命令行操作

通过./zkCli.sh 打开zookeeper的客户端进行命令行后台

ls与ls2命令

    ls :查看某一目录下有哪些详细文件目录

    ls2 : 可以文件和状态信息

get与stat命令

    get : 获取当前节点的数据

    stat :查看当前节点的状态信息

create命令

    用来创建子节点 

    create path data 创建持久化节点

    create -e path data 创建临时节点

    create -s path data 创建顺序节点

set命令

    用来修改子节点

    set path data [version]

    set /path new-data  将path子节点的值变为new-data

    [version] 指定最新的版本号进行修改,加上版本号其实就是加锁

delete命令

    用来删除节点

    delete path [version]

    不加版本号会直接删除节点,加上版本号是判断锁操作

2、Zookeeper特性

     -session的基本原理

  • 客户端与服务端之间的连接存在会话

  • 每个会话都会可以设置一个超时时间

  • 心跳结束,session则过期

  • Session过期,则临时节点znode会被抛弃

  • 心跳机制:客户端向服务端的ping包请求

    Zookeeper特性

    -watcher机制

  • 针对每个节点的操作,都会有一个监督者—watcher

  • 当监控的某个对象(znode)发生了变化,则触发watcher事件

  • zookeeper中的watcher是一次性的,触发后立即销毁

  • 父节点,子节点 增删改都能够触发其watcher

  • 针对不同类型的操作,触发的watcher事件也不同:

        1.(子)节点创建事件

        2.(子)节点删除事件

        3.(子)节点数据变化事件

3、Watcher命令行学习

  • 通过get path [watch] 设置watcher

  • 父节点 增 删 改 操作触发watcher

  • 子节点 增 删 改 操作触发watcher

4、Watcher事件类型

父节点watch事件:

  • 创建父节点触发:NodeCreated

  • 修改父节点数据触发:NodeDataChanged

  • 删除父节点触发:NodeDeleted

image.png

image.pngimage.png

子节点watcher事件:

ls为父节点设置watcher,创建子节点触发:NodeChildrenChanged

ls为父节点设置watcher,删除子节点触发:NodeChildrenChanged

ls为父节点设置watcher,修改子节点不触发事件(需要把子节点当成父节点才能触发watcher事件)

image.pngimage.pngimage.pngimage.png

watcher使用场景

  • 统一资源配置

ACL权限控制

  • 针对节点可以设置相关读写操作等权限,目的是为了保障数据安全性

  • 权限permissions可以指定不同的权限范围以及角色

ACL命令行

  • getAcl:获取某个节点的acl权限信息

  • setAcl:设置某个节点的acl权限信息

  • addauth:输入认证授权信息,注册时输入明文密码(登录)但是在zk的系统里,密码是以加密的形式存在的

ACL的构成

  • zookeeper的acl通过[scheme:id:permissions]来构成权限列表

        scheme:代表采用的某种权限机制

            world:world下只有一个id,即只有一个用户,也就是anyone,那么组合的写法就是world:anyone:[permissions]

            auth:代表认证登录,需要注册用户有权限就可以,形式为auth:user:password:[permissions]

            digest:需要对密码加密才能访问,组合形式为digest:username:BASE64(SHA1(password)):[permissions]

            简而言之,auth与digest的区别就是,前者明文,后者密文;setAcl/path auth:lee:lee:cdrwa与setAcl/path                                                  digest:lee.BASE64(SHA1(password))cdrwa是等价的,在通过adduth digest lee:lee后都能操作指定节点的权限

        id:代表允许访问的用户

            ip:当设置为ip指定的ip地址,此时限制ip进行访问,比如ip:192.168.1.1[permissions]

            super:代表超级管理员,拥有所有的权限

        permissions:权限组合字符串-缩写crdwa

            CREATE:创建子节点

            READ:获取节点/子节点

            WRITE:设置节点数据

            DELETE:删除子节点

            ADMIN:设置权限

ACL命令行学习

  • world:anyone:cdrwa

image.png

  • digest:user:BASE64(SHA1(pwd)):cdrwa

        addauth digest user:pwd

image.png

  • ip:192.168.1.1:cdrwa

image.png

  • super超级管理员

修改/usr/local/zookeeper/bin目录下的zkServer.sh 添加如下的语句:用户名和密码自定义,密码是BASE64加密后的

"-Dzookeeper.DigestAuthenticationProvider.superDigest=wjy:xQJmxLMiHGwaqBvst5y6rkB6HQs="

image.png修改完保存后,然后执行./zkServer.sh restart重启 zkServer.sh,启动./zkCli.sh

image.png

ACL的常用使用场景

  • 开发/测试环境分离,开发者无权操作测试库的节点,只能看

  • 生产环境上控制指定ip的服务可以访问相关节点,防止混乱

zookeeper四字命令

  • zookeeper可以通过它自身提供的简写命令来和服务器进行交互

  • 需要使用到nc命令,安装:yum install nc

  • echo [commond]| nc [ip] [port]

官方文档:http://zookeeper.apache.org/doc/r3.4.13/zookeeperAdmin.html#sc_zkCommands

image.png

仅演示一个命令,其他的参考文档吧。

Zookeeper学习(一)简介、安装和配置

ZooKeeper 是一个开源的分布式协调服务,由雅虎创建,是 Google Chubby 的开源实现。
分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协
调/通知、集群管理、Master 选举、配置维护,名字服务、分布式同步、分布式锁和分布式队列
等功能。

一、简介

  • 中间件,提供协调服务

  • 作用于分布式系统,发挥其优势,可以为大数据服务

  • 支持java,提供java和c语言的客户端api

二、安装Zookeeper

安装zookeeper之前需要安装jdk,关于jdk的安装,之前的博客有介绍: CentOS之Java安装 

下面是我的linux虚拟机中的java的版本image.png

单机Zooleeper安装:

下载页面:https://archive.apache.org/dist/zookeeper/

这里选择3.4.11的版本:

image.png

将下载下来的tar包上传到服务器中,一般上传文件到/home 目录,然后执行tar -zxvf zookeeper-3.4.11.tar.gz 解压,之后执行

mv zookeeper-3.4.11 zookeeper重命名为zookeeper,然后执行mv zookeeper /usr/local  复制移动文件到指定目录

接下来就是配置环境变量,vim /etc/profile  添加下图中画线的文本

image.png配置Zookeeper,进入/usr/local/zookeeper/conf 复制一份官方的配置文件,并重命名为zoo.cfg,然后我们对zoo.cfg进行修改,先看下面几个配置的意义。

tickTime:用于计算的时间单元。比如session超时:N*tickTime

initLimit:用于集群,允许从节点连接并同步到master节点的初始化连接时间,以tickTime的倍数来表示

syncLimit:用于集群,master主节点与从节点之间发送消息,请求和应答时间长度。(心跳机制)

dataDir:必须配置

dataLogDir:日志目录,如果不配置会和dataDir公用

clientPort:连接服务器的端口,默认2181

image.png

然后vim zoo.cfg,下图就是我们修改之后的配置文件:

image.png

然后相应的在路径新建dataDirdataLogDir两个数据目录,这样单机的zookeeper就配置完成了。

配置完成后,进入到 /usr/local/zookeeper/bin 目录下,执行启动服务命令,开始我们执行./zkServer.sh 会提示我们添加一些参数,如下所示,参数的意思也很直观。  

执行 ./zkServer.sh start 来启动服务。 到此,zookeeper的单机的安装和配置就基本完成了。image.png

Zookeeper的作用体现

1、master节点选举,主节点挂了以后,从节点就会接手工作,并且保证这个节点是唯一的,这也是所谓的首脑模式,从而保证我们的集群是高可用的。

2、统一配置文件管理,即只需要部署一台服务器,则可以把相同的配置文件同步更新到其他所有服务器,此操作在云计算中用的特别多。

3、发布与订阅,类似消息队列MQ,dubbo发布者把数据存在znode上,订阅者会读取这个数据。

4、提供分布式锁,分布式环境中不同进程之间争夺资源,类似于多线程中的锁。

5、集群管理,集群中保证数据的强一致性。