Redis学习(二)redis API的使用

1、通用命令

这里先简单的列出6个常用的命令,之后逐步的学习更多的命令;

命令名 功能 用法  示例
keys 遍历key

keys [pattern]

生产环境不推荐使用

keys *  1537325301674010.png

dbsize
计算key的总数

dbsize

可以在线上使用

1537325651874645.png
exists
检查key是否存在

exists key

可以在线上使用 

1537325812209989.png
del 删除指定key-value del key,可以删除多个
1537326389630969.png
expire、ttl、persist key过期命令

#key在seconds秒后过期

expir

e key seconds 

#查看key剩余的过期时间

ttl key

#去掉key的过期时间

persist key

1537326307872919.png
type 返回key的类型 type key 1537327573348376.png

2、字符串

2.1  字符串键值结构

key为字符串类型,值可以为字符串、整型、二进制、json、xml等;(值不能大于512MB)

2.2 字符串使用场景

缓存、计数器、分布式锁等

2.3 API

API 功能描述 时间复杂度
get key 获取key对应的value o(1)
set key value 设置key-value o(1)
del key 删除key-value o(1)
incr key  key自增1,如果key不存在,自增后get(key)=1 o(1)
decr key key自减1,如果key不存在,自减后get(key)=-1 o(1)
incrby key k key自增k,如果key不存在,自增后get(key)=k o(1)
decrby key k key自减k,如果key不存在,自减后get(key)=-k o(1)
set key value 不管key是否存在,都设置 o(1)
setnx key value key不存在,才设置 (理解为添加) o(1)
set key value xx key存在,才设置    (理解为更新) o(1)
mget key1 key2 key3… 批量获取key,原子操作 o(n)
mset key1 value1 key2 value2 key3 value3 批量设置key-value o(n)
getset key newvalue set key newvalue并返回旧的value  o(1)
append key value
将value追加到旧的value o(1)
strlen key 返回字符串的长度(注意中文) o(1)
incrybyfloat key 3.5
增加key对应的值3.5 o(1)
getrange key start end 获取字符串指定下标所有的值 o(1)
setrange key index value 设置指定下标所有对应的值 o(1)

3、HASH

3.1  哈希键值结构

key是字符串类型,value分为field和value即属性和value;需要注意field不能相同,value可以相同

例如:

image.png

3.2 API

API 功能描述 时间复杂度
hget key field
获取hash key对应的field的value o(1)
hset key field value 设置hash key对应field的value o(1)
hdel key field 删除hash key对应field的value o(1)
hexists key field 判断hash key是否有field o(1)
hlen key 获取hash key field的数量 o(1)
hmget key field1 field2 … fieldN 批量获取hash key的一批field对应的值 o(n)
hmset key field1 value1 field2 value2 …fieldN valueN 批量设置hash key的一批field value o(n)
hgetall key
返回hash key对应所有的field和value o(n)
hvals key 返回hash key对应所有的field的value o(n)
hkeys key 返回hash key对应所有field o(n)
hsetnx key field value
设置hash key对应field的value(如果field已经存在,则失败) o(1)
hincrby key field intCounter hash key对应的field的value自增intCounter o(1)
hincrbyfloat key field floatCounter
hincrby浮点数版 o(1)

4、列表(List)

4.1 列表结构

 image.png

4.2特点

有序、可以重复、左右两边插入弹出

4.3 API

API 功能描述 时间复杂度
rpush key value1 value2 … valueN 从列表右端插入值(1-N个) o(1~n)
lpush key value1 value2 … valueN 从列表左端插入值(1-N个) o(1~n)
linsert key before|after value newValue
在list指定的值前|后插入newValue
o(n)
lpop key
从列表左侧弹出一个item
o(1)
rpop key
从列表右侧弹出一个item o(1)
lrem key count value

根据count值,从列表删除所有value相等的项

(1)count>0,从左到右,删除最多count个value相等的项

(2)count<0,从右到左,删除最多Math.abs(count)个value相等的项

(3)count=0,删除所有value相等的项

o(n)
ltrim key start end 按照索引范围修剪列表 o(n)
lrange key start end(包含end) 获取列表指定索引范围所有item o(n)
lindex key index
获取列表指定索引的item o(n)
llen key
获取列表长度 o(1)
lset key index newValue 设置列表指定索引值为newValue o(n)
blpop key timeout
lpop阻塞版本,timeout是阻塞超时时间,timeout=0为永远不阻塞 o(1)
brpop key timeout
rpop阻塞版本,timeout是阻塞超时时间,timeout=0为永远不阻塞 o(1)

4.4 列表应用

时间线功能

小技巧:

  1. LRUSH + LPOP = Stack     实现栈

  2. LPUSH + RPOP = Queue    实现队列

  3. LPUSH + LTRIM = Capped Collection 有固定数量的列表

  4. LPUSH + BRPOP = Message Queue 消息队列

5、集合

5.1 集合结构

image.png

5.2 集合特点

无序、无重复、支持集合间操作

5.3 集合内API

API 功能描述 时间复杂度
sadd key element 向集合key添加element(如果element已经存在,添加失败) o(1)
srem key element
将集合key中的element移除掉 o(1)
scard key
计算集合大小
sismember key element 判断元素是否在集合中
srandmember key count 从集合中随机取出count个元素
smembers key 取出集合中的所有元素
spop key 从集合中随机弹出一个元素

5.4 集合内操作应用

微博抽奖系统、微博点赞、用户标签

5.5 集合间API

API 功能描述 时间复杂度
sdiff
差集
sinter 交集
sunion 并集
sdiff|sinter|suion + store destkey ..
将差集、交集、并集结果保存在destkey中

5.6 集合间操作应用

社交app中的共同关注功能

集合简单的使用:

1.SADD = Tagging 标签

2.SPOP/SRANDMEMBER = Random item 随机数

3.SADD + SINTER = Social Graph 社交相关应用

6、有序集合

6.1 有序集合结构

image.png

按照score来指定顺

6.2 有序集合API

API 功能描述 时间复杂度
zadd key score element(score可以重复)
添加score和element o(longN)
zrem key element(可以删除多个)
删除元素
o(1)
zscore key element
返回元素的分数 o(1)
zincrby key increScore element
增加或减少元素分数 o(1)
zcard key 返回元素的总个数 o(1)
zrange key start end[WITHSCORES]
返回指定索引范围内的升序元素[分值] o(log(n)+m)
zrangebyscore key minScore maxScore[WITHSCORES]
返回指定分数范围内的升序元素[分值] o(log(n)+m)
zcount key minScore maxScore
返回有序集合内在指定分数范围内的个数 o(log(n)+m)
zremrangebyrank key start end 删除指定排名内的升序元素 o(log(n)+m)
zremrangebyscore key minScore maxScore 删除指定分数内的升序元素 o(log(n)+m)
zrank key 获取升序排名
zrevrank
获取降序排名
zrevrange key start end[WITHSCORES] 返回指定索引范围内的降序元素[分值]
zrevrangebyscore key minScore maxScore[WITHSCORES] 返回指定分数范围内的降序元素[分值]
zinterstore
交集运算存储
zunionstore 并集运算存储

6.3 有序集合应用

排行榜

Redis学习(一)redis简介与安装

1、何为Redis

Redis是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。                                                             — 百度百科

说白了,就是很牛逼的存储系统。

2、Redis的特性

  • 速度快 

数据存储在内存中、redis使用C语言编写、单线程的线程模型
  • 持久化

对数据的更新异步保存到磁盘
  • 多种数据结构 

Strings、HashTables、Linked Lists、Sets、Sorted Sets、BitMaps、HyperLogLog、GEO
  • 支持多种编程语言

Java、PHP、Python、Ruby、Lua、node.js等
  • 功能丰富

发布订阅、Lua脚本、事务、pipeline
  • 简单

源码少、不依赖外部库、单线程模型
  • 主从复制

主服务器数据可以复制到从服务器上
  • 高可用、分布式

Redis-Sentinel(v2.8)支持高可用、Redis-Cluster(v3.0)支持分布式

3、Redis典型应用场景

缓存系统、计数器、消息队列系统、排行榜、社交网络、实时系统

4、Redis的安装

#下载redis
wget  
#解压
tar -xzf redis-3.0.7.tar.gz
#建立软连接
ln -s redis-3.0.7 redis
#进入目录
cd redis
#编译和安装
make && make install

image.png

5、Redis的启动方式

最简启动:redis-server(使用了默认配置)

动态参数启动:redis-server –port 6380

配置文件启动:redis-server configPath (推荐)

      常用配置:daemonize — 是否是守护进程(no|yes) 建议使用yes

                     port  — Redis对外端口号

                     logfile — Redis系统日志

                     dir — Redis工作目录(日志文件及持久化文件存放的目录)

最简启动示例:

image.png

动态参数启动示例:

image.png

配置文件启动示例:

/opt/soft/redis目录下新建config目录,用来存放配置文件,然后复制一份默认的配置文件redis.conf 到 config目录下,为了方便查看和修改  使用  cat redis.conf | grep -v "#" | grep -v "^$" > redis-6381.conf  去掉注释和空格重定向为新的配置文件,一般都是用端口号来命名不同的配置文件,所以新的配置文件叫 redis-6381.conf  然后就可以修改配置和删除不需要的配置;这里配置文件仅有如下四个,以后的配置在之后的学习中逐渐的了解和学习。

image.png

修改完配置后,运行命令启动: redis-server config/redis-6381.conf   这样就启动成功了,详情看图:

image.png

6、Redis客户端连接

redis-cli -h ip(ip地址) -p(端口号) port

image.png

以上就是学习redis的第一步。。。坚持下去我们必定成功!!!

Zookeeper学习(五)Apache Curator客户端的使用

1、curator与原生客户端之间的差异

原生api的不足:

  • 超时重连,不支持自动,需要手动操作

  • watch注册一次后会失效

  • 不支持递归创建节点

Apache Curator的优势:

  • Apache的开源项目

  • 解决watcher的注册一次就失效

  • Api更加简单易用

  • 提供更多解决方案并且实现简单

  • 提供常用的ZooKeeper工具类

2、Apache Curator的使用

会话连接与关闭:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    //声明一个客户端
    public CuratorFramework client = null;
    //客户端ip
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

curator连接zookeeper的策略大致有5种,上面的一种是推荐使用的,下面列出所有的策略:

      /**
       * curator链接zookeeper的策略:ExponentialBackoffRetry
       * baseSleepTimeMs:初始sleep的时间
       * maxRetries:最大重试次数
       * maxSleepMs:最大重试时间
       */
     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
      
      /**
       * curator链接zookeeper的策略:RetryNTimes
       * n:重试的次数
       * sleepMsBetweenRetries:每次重试间隔的时间
       */
      RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
      
      /**
       * curator链接zookeeper的策略:RetryOneTime
       * sleepMsBetweenRetry:每次重试间隔的时间
       */
     RetryPolicy retryPolicy2 = new RetryOneTime(3000);
      
      /**
       * 永远重试,不推荐使用
       */
     RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
      
      /**
       * curator链接zookeeper的策略:RetryUntilElapsed
       * maxElapsedTimeMs:最大重试时间
       * sleepMsBetweenRetries:每次重试间隔
       * 重试时间超过maxElapsedTimeMs后,就不再重试
       */
     RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

然后运行代码,连接成功后关闭。image.png

zookeeper命名空间及创建节点:

命名空间:

上面构造函数中的namespace就是命名空间,创建命名空间后,后续的操作都将在该命名空间下进行。

节点的增删改查:

添加节点:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super/wjy";
        byte[] data = "wjy329".getBytes();
        curatorConnect.client.create().creatingParentsIfNeeded()
         .withMode(CreateMode.PERSISTENT)
         .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
         .forPath(nodePath, data);

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

上面注释中写了创建节点的方法,运行后,我们在服务器中可以看到创建的命名空间和节点:

image.png
修改节点:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

     
        String nodePath = "/super/wjy";

        // 更新节点数据
        byte[] newData = "update".getBytes();
        curatorConnect.client.setData().withVersion(0).forPath(nodePath, newData);

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

同样,修改的代码也在注释中写明了,这里需要注意的就是版本号,运行程序后:

image.png

删除节点:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super/wjy";

        // 删除节点
        curatorConnect.client.delete()
              .guaranteed()                // 如果删除失败,那么在后端还是继续会删除,直到成功
              .deletingChildrenIfNeeded()  // 如果有子节点,就删除
              .withVersion(1)
              .forPath(nodePath);

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

删除节点的部分也通过注释注明,删除的方法也应注意版本号,利用此方法删除的是指定路径下所有的子节点包括路径中的最后一个父节点,例如 节点为 /school/class/student/xiaowang   指定删除路径为 /school/class ,那么运行程序后,节点为:/school ,其余节点都被删掉,并且命名空间是不会被删掉的,这个可以自行验证。

image.png

读取节点数据:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super";

        //读取节点数据
      Stat stat = new Stat();
      byte[] data = curatorConnect.client.getData().storingStatIn(stat).forPath(nodePath);
      System.out.println("节点" + nodePath + "的数据为: " + new String(data));
      System.out.println("该节点的版本号为: " + stat.getVersion());

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

运行程序:

image.png

读取子节点:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.util.List;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super";

        // 查询子节点
      List<String> childNodes = curatorConnect.client.getChildren()
                                 .forPath(nodePath);
      System.out.println("开始打印子节点:");
      for (String s : childNodes) {
         System.out.println(s);
      }


        // 判断节点是否存在,如果不存在则为空
//    Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc");
//    System.out.println(statExist);

        Thread.sleep(3000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

可以看到控制台输出子节点:

image.png

上述代码还有一个能判断节点是否存在的代码,自行运行看效果吧。

使用watcher:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.util.List;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super/aaa";

        // watcher 事件  当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
        curatorConnect.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
//      curatorConnect.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);

        Thread.sleep(100000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

上述我们看到有两个watcher的方式,下面都给出两个wathcer的实现类,只演示第一种。

package com.wjy329.curatordemo.curator;

import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;

public class MyCuratorWatcher implements CuratorWatcher {

   @Override
   public void process(WatchedEvent event) throws Exception {
      System.out.println("触发watcher,节点路径为:" + event.getPath());
   }

}


=============================================

package com.wjy329.curatordemo.curator;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class MyWatcher implements Watcher {

   @Override
   public void process(WatchedEvent event) {
      System.out.println("触发watcher,节点路径为:" + event.getPath());
   }

}

运行程序后,控制台连接成功后开始等待触发,注意线程睡眠时间稍微设置长一些,然后在服务器中作出相应的修改操作,就会触发watcher事件。

image.png

image.png

一次注册N次监听:

上面代码虽然实现了watcher的监听,但是发现是一次性的,在触发监听完成后就会销毁,下面我们再来做一个永久监听的操作:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.util.List;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super/aaa";

// 为节点添加watcher
        // NodeCache: 监听数据节点的变更,会触发事件
      final NodeCache nodeCache = new NodeCache(curatorConnect.client, nodePath);
      // buildInitial : 初始化的时候获取node的值并且缓存,不加true或者为false,则初始化不缓存数据
      nodeCache.start(true);
      if (nodeCache.getCurrentData() != null) {
         System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
      } else {
         System.out.println("节点初始化数据为空...");
      }
      nodeCache.getListenable().addListener(new NodeCacheListener() {
         public void nodeChanged() throws Exception {
            if (nodeCache.getCurrentData() == null) {
               System.out.println("空");
               return;
            }
            String data = new String(nodeCache.getCurrentData().getData());
            System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
         }
      });


        Thread.sleep(100000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

}

image.png

image.png

子节点监听:

package com.wjy329.curatordemo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.util.List;


/**
 * @Author wjy329
 * @Time 2018/9/16上午11:02
 * @description curator会话连接
 */

public class CuratorConnect {
    public CuratorFramework client = null;
    public static final String zkServerPath = "172.16.106.130:2181";

    /**
     * 实例化zk客户端
     */
    public CuratorConnect() {
        /**
         * curator链接zookeeper的策略:RetryNTimes
         * n:重试的次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    /**
     *
     * @Description: 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

        //创建节点
        String nodePath = "/super";

        // 为子节点添加watcher
        // PathChildrenCache: 监听数据节点的增删改,会触发事件
        String childNodePathCache =  nodePath;
        // cacheData: 设置缓存节点的数据状态
        final PathChildrenCache childrenCache = new PathChildrenCache(curatorConnect.client, childNodePathCache, true);
        /**
         * StartMode: 初始化方式
         * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
         * NORMAL:异步初始化
         * BUILD_INITIAL_CACHE:同步初始化
         */
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        List<ChildData> childDataList = childrenCache.getCurrentData();
        System.out.println("当前数据节点的子节点数据列表:");
        for (ChildData cd : childDataList) {
            String childData = new String(cd.getData());
            System.out.println(childData);
        }

        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
                    System.out.println("子节点初始化ok...");
                }

                else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
                    String path = event.getData().getPath();
                    if (path.equals(ADD_PATH)) {
                        System.out.println("添加子节点:" + event.getData().getPath());
                        System.out.println("子节点数据:" + new String(event.getData().getData()));
                    } else if (path.equals("/super/imooc/e")) {
                        System.out.println("添加不正确...");
                    }

                }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
                    System.out.println("删除子节点:" + event.getData().getPath());
                }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                    System.out.println("修改子节点路径:" + event.getData().getPath());
                    System.out.println("修改子节点数据:" + new String(event.getData().getData()));
                }
            }
        });


        Thread.sleep(100000);

        curatorConnect.closeZKClient();
        boolean isZkCuratorStarted2 = curatorConnect.client.isStarted();
        System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }
    public final static String ADD_PATH = "/super/imooc/d";

}

这部分代码的演示其实是差不多的,这就不过多说明。

扩展部分:Zookeeper学习之Apache Curator使用实例-统一更新N台机器的节点配置

ACL相关操作:

package com.wjy329.curatordemo.curator;


import com.wjy329.curatordemo.utils.AclUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

import java.util.ArrayList;
import java.util.List;

public class CuratorAcl {

   public CuratorFramework client = null;
   public static final String zkServerPath = "172.16.106.130:2181";

   public CuratorAcl() {
      RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
      client = CuratorFrameworkFactory.builder().authorization("digest", "imooc1:123456".getBytes())
            .connectString(zkServerPath)
            .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
            .namespace("workspace").build();
      client.start();
   }
   
   public void closeZKClient() {
      if (client != null) {
         this.client.close();
      }
   }
   
   public static void main(String[] args) throws Exception {
      // 实例化
      CuratorAcl cto = new CuratorAcl();
      boolean isZkCuratorStarted = cto.client.isStarted();
      System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
      
      String nodePath = "/acl/father";
      
      List<ACL> acls = new ArrayList<ACL>();
      Id wjy1 = new Id("digest", AclUtils.getDigestUserPwd("wjy1:123456"));
      Id wjy2 = new Id("digest", AclUtils.getDigestUserPwd("wjy2:123456"));
      acls.add(new ACL(Perms.ALL, wjy1));
      acls.add(new ACL(Perms.READ, wjy2));
      acls.add(new ACL(Perms.DELETE | Perms.CREATE, wjy2));

      // 创建节点
      byte[] data = "wjy329".getBytes();
      cto.client.create().creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            .withACL(acls, true)
            .forPath(nodePath, data);

      //也可以使用这种方式设置权限,如果节点已经有权限需要认证后才能设置权限
      //cto.client.setACL().withACL(acls).forPath("/curatorNode");

      
      cto.closeZKClient();
      boolean isZkCuratorStarted2 = cto.client.isStarted();
      System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
   }
   
}

acl部分就直接上相关代码了,就不进行演示了。

Zookeeper学习之Apache Curator使用实例-统一更新N台机器的节点配置

统一配置管理,对集群中的任意一台机器修改,其他机器也会自动的修改相关的配置。

这里采用3台机器演示:分别新建3个文件,client1.java、client2.java、client3.java,这3个文件除了地址不一样外,其他部分都一样,请自行修改;

package com.wjy329.curatordemo.curator.checkConfig;


import com.wjy329.curatordemo.utils.JsonUtils;
import com.wjy329.curatordemo.utils.RedisConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryNTimes;

import java.util.concurrent.CountDownLatch;

public class Client1 {

   public CuratorFramework client = null;
   public static final String zkServerPath = "172.16.106.130:2181";

   public Client1() {
      RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
      client = CuratorFrameworkFactory.builder()
            .connectString(zkServerPath)
            .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
            .namespace("workspace").build();
      client.start();
   }
   
   public void closeZKClient() {
      if (client != null) {
         this.client.close();
      }
   }
   
// public final static String CONFIG_NODE = "/super/imooc/redis-config";
   public final static String CONFIG_NODE_PATH = "/super/wjy";
   public final static String SUB_PATH = "/redis-config";
   public static CountDownLatch countDown = new CountDownLatch(1);
   
   public static void main(String[] args) throws Exception {
      Client1 cto = new Client1();
      System.out.println("client1 启动成功...");
      
      final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
      childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
      
      // 添加监听事件
      childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
         public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            // 监听节点变化
            if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
               String configNodePath = event.getData().getPath();
               if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
                  System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);
                  
                  // 读取节点数据
                  String jsonConfig = new String(event.getData().getData());
                  System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);
                  
                  // 从json转换配置
                  RedisConfig redisConfig = null;
                  if (StringUtils.isNotBlank(jsonConfig)) {
                     redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
                  }
                  
                  // 配置不为空则进行相应操作
                  if (redisConfig != null) {
                     String type = redisConfig.getType();
                     String url = redisConfig.getUrl();
                     String remark = redisConfig.getRemark();
                     // 判断事件
                     if (type.equals("add")) {
                        System.out.println("监听到新增的配置,准备下载...");
                        // ... 连接ftp服务器,根据url找到相应的配置
                        Thread.sleep(500);
                        System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
                        // ... 下载配置到你指定的目录
                        Thread.sleep(1000);
                        System.out.println("下载成功,已经添加到项目中");
                        // ... 拷贝文件到项目目录
                     } else if (type.equals("update")) {
                        System.out.println("监听到更新的配置,准备下载...");
                        // ... 连接ftp服务器,根据url找到相应的配置
                        Thread.sleep(500);
                        System.out.println("开始下载配置文件,下载路径为<" + url + ">");
                        // ... 下载配置到你指定的目录
                        Thread.sleep(1000);
                        System.out.println("下载成功...");
                        System.out.println("删除项目中原配置文件...");
                        Thread.sleep(100);
                        // ... 删除原文件
                        System.out.println("拷贝配置文件到项目目录...");
                        // ... 拷贝文件到项目目录
                     } else if (type.equals("delete")) {
                        System.out.println("监听到需要删除配置");
                        System.out.println("删除项目中原配置文件...");
                     }
                     
                     // TODO 视情况统一重启服务
                  }
               }
            }
         }
      });
      
      countDown.await();
      
      cto.closeZKClient();
   }
   
}

列出项目中的工具类:

JSON工具类:

package com.wjy329.curatordemo.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;

/**
 * 
 * @Title: JsonUtils.java
 * @Package com.lee.utils
 * @Description: JSON/对象转换类
 * Copyright: Copyright (c) 2016
 * Company:Nathan.Lee.Salvatore
 * 
 * @author leechenxiang
 * @date 2016年4月29日 下午11:05:03
 * @version V1.0
 */
public class JsonUtils {

    // 定义jackson对象
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * 将对象转换成json字符串。
     * <p>Title: pojoToJson</p>
     * <p>Description: </p>
     * @param data
     * @return
     */
    public static String objectToJson(Object data) {
       try {
         String string = MAPPER.writeValueAsString(data);
         return string;
      } catch (JsonProcessingException e) {
         e.printStackTrace();
      }
       return null;
    }
    
    /**
     * 将json结果集转化为对象
     * 
     * @param jsonData json数据
     * @param clazz 对象中的object类型
     * @return
     */
    public static <T> T jsonToPojo(String jsonData, Class<T> beanType) {
        try {
            T t = MAPPER.readValue(jsonData, beanType);
            return t;
        } catch (Exception e) {
           e.printStackTrace();
        }
        return null;
    }
    
    /**
     * 将json数据转换成pojo对象list
     * <p>Title: jsonToList</p>
     * <p>Description: </p>
     * @param jsonData
     * @param beanType
     * @return
     */
    public static <T>List<T> jsonToList(String jsonData, Class<T> beanType) {
       JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
       try {
          List<T> list = MAPPER.readValue(jsonData, javaType);
          return list;
      } catch (Exception e) {
         e.printStackTrace();
      }
       
       return null;
    }
    
}

模拟redis的配置:

package com.wjy329.curatordemo.utils;

public class RedisConfig {

   private String type;   // add 新增配置    update 更新配置    delete 删除配置
   private String url;       // 如果是add或update,则提供下载地址
   private String remark; // 备注
   
   public String getType() {
      return type;
   }
   public void setType(String type) {
      this.type = type;
   }
   public String getUrl() {
      return url;
   }
   public void setUrl(String url) {
      this.url = url;
   }
   public String getRemark() {
      return remark;
   }
   public void setRemark(String remark) {
      this.remark = remark;
   }
}

修改的配置文件:

{"type":"add","url":"ftp://192.168.10.123/config/redis.xml","remark":"add"}

{"type":"update","url":"ftp://192.168.10.123/config/redis.xml","remark":"update"}
{"type":"delete","url":"","remark":"delete"}

上述代码都准备完成后,启动client1.java、client2.java、client3.java这三个客户端。

image.png

然后对其中任意一台进行修改:

image.png

image.pngimage.png

image.png


这时我们观察到三个控制台都相应的更新完成。

Maven 更换国内仓库

默认的仓库在国外,下载速度真的好low,国内也有很多的镜像仓库来供我们下载,这里目前先提供阿里云,之后再补充其他。

修改配置:

$ cd  $M2_HOME/conf/
$ sudo vim settings.xml

然后找到相应的位置,添加下面内容保存即可:

<mirror>
    <id>nexus-aliyun</id>
    <mirrorOf>central</mirrorOf>
    <name>Nexus aliyun</name>
    <url></url>
</mirror>

.....省略内容......

<repositories>
  <repository>
    <id>nexus-aliyun</id>
    <name>Nexus aliyun</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public</url>
  </repository>
</repositories>

保存退出后,重新获取依赖,速度飞快!!!

Zookeeper学习(四)Zookeeper原生Java API 客户端开发

1、客户端与zookeeper服务端的连接

首先我们解压下载的zookeeper的tar文件,然后把如下图的几个jar包都添加到项目中。

image.png

项目结构图如下:

image.png

先附上日志的配置文件:

log4j.xml:

<?xml version="1.0" encoding="GB2312" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">


    <appender name="com.wjy329.console" class="org.apache.log4j.ConsoleAppender">
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%c %d{ISO8601}-- %p -- %m%n" />
        </layout>
    </appender>

    <appender name="com.wjy329.all" class="org.apache.log4j.RollingFileAppender">
        <!-- 设置通道ID:org.zblog.all和输出方式:org.apache.log4j.RollingFileAppender -->
        <param name="File" value="/Users/wjy329/desktop/zklog/all.output.log"/><!-- 设置File参数:日志输出文件名 -->
        <param name="Append" value="false"/><!-- 设置是否在重新启动服务时,在原有日志的基础添加新日志 -->
        <param name="MaxBackupIndex" value="10"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%p (%c:%L)- %m%n"/><!-- 设置输出文件项目和格式 -->
        </layout>
    </appender>
    <appender name="com.wjy329.wjy" class="org.apache.log4j.RollingFileAppender">
    <param name="File" value="/Users/wjy329/desktop/zklog/wjy.output.log"/>
    <param name="Append" value="true"/>
    <param name="MaxFileSize" value="10240"/> <!-- 设置文件大小 -->
    <param name="MaxBackupIndex" value="10"/>
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%p (%c:%L)- %m%n"/>
    </layout>
</appender>


    <logger name="wjy.log"> <!-- 设置域名限制 -->
        <level value="info"/><!-- 设置级别 -->
        <appender-ref ref="com.wjy329.wjy"/><!-- 与前面的通道id相对应 -->
    </logger>
    <!-- 根logger的设置-->
    <root>
        <level value="INFO" />
        <appender-ref ref="com.wjy329.console" />
        <appender-ref ref="com.wjy329.all" />
    </root>
</log4j:configuration>

ZKConnect.java:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**

 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-12 15:00
 **/
public class ZKConnect implements Watcher {
    final static Logger log = LoggerFactory.getLogger(ZKConnect.class);
    //zookeeper集群的ip
    private static final String zkServerPath="172.16.106.130:2181,172.16.106.131:2181,172.16.106.132:2181";
    //超时时间
    private static final Integer timeout = 5000;

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,new ZKConnect());
        log.info("客户端开始连接zookeeper服务器...");
        log.info("连接状态:"+zk.getState());
        new Thread().sleep(2000);

        log.info("连接状态:"+zk.getState());
    }

    @Override
    public void process(WatchedEvent event) {
        log.info("接受到watch通知:"+event);
    }
}

上面就可以连接zookeeper服务端,接下来我们运行,可以看到控制台输出相关信息:

image.png2、zookeeper会话重连

直接上代码:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**

 * @description: 会话重连
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 10:42
 **/
public class ZKConnectSessionWatcher implements Watcher {

    final static Logger log = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;

    public static void main(String[] args) throws Exception{
        ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,new ZKConnectSessionWatcher());
        log.warn("客户端开始连接zookeeper服务器...");
        log.warn("连接状态:{}",zk.getState());
        new Thread().sleep(1000);
        log.warn("连接状态:{}",zk.getState());
        //获取session id
        long sessionId = zk.getSessionId();
        //获取session 密码
        byte[] sessionPassword = zk.getSessionPasswd();
        System.out.println("此时的sessionID为"+sessionId);
        new Thread().sleep(200);

        //开始会话重连
        log.warn("开始会话重连...");

        ZooKeeper zkSession = new ZooKeeper(zkServerPath,timeout,
                            new ZKConnectSessionWatcher(),sessionId,sessionPassword);
        log.warn("重新连接状态zkSession:{}",zkSession.getState());
        new Thread().sleep(1000);
        log.warn("重新连接状态zkSession:{}",zkSession.getState());
        System.out.println("此时的sessionID为"+zkSession.getSessionId());
    }

    @Override
    public void process(WatchedEvent event) {
        log.warn("接受到watch通知:"+event);
    }
}

上面运行后,我们可以看运行结果:

image.png3、节点的增删改查

  • 同步方式创建节点:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;

import java.io.IOException;
import java.util.List;

/**
 * @description:  节点操作
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 14:52
 **/
public class ZKNodeOperator implements Watcher{

    private ZooKeeper zooKeeper = null;

    private static final String zkServerPath = "172.16.106.130:2181";
    private static final Integer timeout = 5000;

    public ZKNodeOperator(){}

    public ZKNodeOperator(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeOperator());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try {
                    zooKeeper.close();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
        //创建zookeeper节点
        zkServer.createZKNode("/testnode","testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * @description  创建zookeeper节点
     * @param path 创建的路径
     * @param data 存储数据的byte[]
     * @param acls 控制权限策略
     * @author wjy329
     * @date 2018/9/13
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            /**
             * acl:控制权限策略:Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *                  CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode: 节点类型:是一个枚举
             *              PERSISTENT:持久节点
             *              PERSISTENT_SEQUENTIAL:持久顺序节点
             *              EPHEMERAL:临时节点
             *              EPHEMERAL_SEQUENTIAL:临时顺序节点
             * */
            result = zooKeeper.create(path,data,acls, CreateMode.EPHEMERAL);

            System.out.println("创建节点:\t"+result+"\t成功...");
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {

    }
}

运行上述代码,创建了一个临时节点,当我们在创建完后,执行ls / 会发现节点已经创建,再次 ls /发现节点不见了,这也是zookeeper的心跳机制导致的,因为主程序运行后,会话结束,心跳机制检测到会话已经结束,那么临时节点也就消失。

image.png

image.png

  • 异步方式创建节点:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;

import java.io.IOException;
import java.util.List;

/**
 * @description:  节点操作
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 14:52
 **/
public class ZKNodeOperator implements Watcher{

    private ZooKeeper zooKeeper = null;

    private static final String zkServerPath = "172.16.106.130:2181";
    private static final Integer timeout = 5000;

    public ZKNodeOperator(){}

    public ZKNodeOperator(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeOperator());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try {
                    zooKeeper.close();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
        //创建zookeeper节点
        zkServer.createZKNode("/testTwo","testTwo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * @description  创建zookeeper节点
     * @param path 创建的路径
     * @param data 存储数据的byte[]
     * @param acls 控制权限策略
     * @author wjy329
     * @date 2018/9/13
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            /**
             * acl:控制权限策略:Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *                  CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode: 节点类型:是一个枚举
             *              PERSISTENT:持久节点
             *              PERSISTENT_SEQUENTIAL:持久顺序节点
             *              EPHEMERAL:临时节点
             *              EPHEMERAL_SEQUENTIAL:临时顺序节点
             * */
            //result = zooKeeper.create(path,data,acls, CreateMode.EPHEMERAL);
            String ctx = "{'create':'success'}";
            zooKeeper.create(path,data,acls,CreateMode.PERSISTENT,new CreateCallBack(),ctx);
            System.out.println("创建节点:\t"+result+"\t成功...");
            new Thread().sleep(2000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {

    }
}

回调函数CreateCallBack.java:

import org.apache.zookeeper.AsyncCallback;

/**
 * @Author wjy329
 * @Time 2018/9/13下午3:43
 * @description 异步创建节点回调函数
 */

public class CreateCallBack implements AsyncCallback.StringCallback{
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        System.out.println("创建节点:"+path);
        System.out.println(ctx);
    }
}

运行程序,控制台显示相关信息:

image.png

由于异步创建了永久节点,所以这个节点永久存在image.png

  • 修改节点

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

/**
 * @description:  节点操作
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 14:52
 **/
public class ZKNodeOperator implements Watcher{

    private ZooKeeper zooKeeper = null;

    private static final String zkServerPath = "172.16.106.130:2181";
    private static final Integer timeout = 5000;

    public ZKNodeOperator(){}

    public ZKNodeOperator(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeOperator());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try {
                    zooKeeper.close();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
        //创建zookeeper节点
        //zkServer.createZKNode("/testTwo","testTwo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        //修改节点信息
        //第一个参数为 修改的节点路径,第二个参数 修改的数据 第三个参数 版本号
        Stat status = zkServer.getZooKeeper().setData("/testTwo","wjy329".getBytes(),0);
        System.out.println("输出的结果是"+status.getVersion());
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * @description  创建zookeeper节点
     * @param path 创建的路径
     * @param data 存储数据的byte[]
     * @param acls 控制权限策略
     * @author wjy329
     * @date 2018/9/13
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            /**
             * acl:控制权限策略:Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *                  CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode: 节点类型:是一个枚举
             *              PERSISTENT:持久节点
             *              PERSISTENT_SEQUENTIAL:持久顺序节点
             *              EPHEMERAL:临时节点
             *              EPHEMERAL_SEQUENTIAL:临时顺序节点
             * */
            //result = zooKeeper.create(path,data,acls, CreateMode.EPHEMERAL);
            String ctx = "{'create':'success'}";
            zooKeeper.create(path,data,acls,CreateMode.PERSISTENT,new CreateCallBack(),ctx);
            System.out.println("创建节点:\t"+result+"\t成功...");
            new Thread().sleep(2000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {

    }
}

修改的代码在main()函数中,注意查看。这里特别注意第三个参数,版本号,一定要与当前版本号一致,然后Stat为返回的节点信息,可以获取到一些节点的信息。这里返回修改完成的版本号。

image.png

image.png

  • 同步删除节点

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

/**
 * @description:  节点操作
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 14:52
 **/
public class ZKNodeOperator implements Watcher{

    private ZooKeeper zooKeeper = null;

    private static final String zkServerPath = "172.16.106.130:2181";
    private static final Integer timeout = 5000;

    public ZKNodeOperator(){}

    public ZKNodeOperator(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeOperator());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try {
                    zooKeeper.close();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
        //创建zookeeper节点
        //zkServer.createZKNode("/testTwo","testTwo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        //修改节点信息
        //第一个参数为 修改的节点路径,第二个参数 修改的数据 第三个参数 版本号
        //Stat status = zkServer.getZooKeeper().setData("/testTwo","wjy329".getBytes(),0);
        //删除节点信息,第一个参数是路径,第二个参数是版本号
        zkServer.getZooKeeper().delete("/testTwo",1);
        System.out.println("删除成功");
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * @description  创建zookeeper节点
     * @param path 创建的路径
     * @param data 存储数据的byte[]
     * @param acls 控制权限策略
     * @author wjy329
     * @date 2018/9/13
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            /**
             * acl:控制权限策略:Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *                  CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode: 节点类型:是一个枚举
             *              PERSISTENT:持久节点
             *              PERSISTENT_SEQUENTIAL:持久顺序节点
             *              EPHEMERAL:临时节点
             *              EPHEMERAL_SEQUENTIAL:临时顺序节点
             * */
            //result = zooKeeper.create(path,data,acls, CreateMode.EPHEMERAL);
            String ctx = "{'create':'success'}";
            zooKeeper.create(path,data,acls,CreateMode.PERSISTENT,new CreateCallBack(),ctx);
            System.out.println("创建节点:\t"+result+"\t成功...");
            new Thread().sleep(2000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {

    }
}

同样,我们也需要看main()方法中的删除即可

image.png

  • 异步删除节点

由于同步的删除没有任何返回状态,一般情况下,我们采用异步的方式。

异步删除回调函数:

import org.apache.zookeeper.AsyncCallback;

/**

 * @description: 异步删除回调函数
 *
 * @author: wjy329
 * @return:
 * @create: 2018-09-13 19:23
 **/
public class DeleteCallBack implements AsyncCallback.VoidCallback{
    @Override
    public void processResult(int rc, String path, Object ctx) {
        System.out.println("删除节点"+path);
        System.out.println((String) ctx);
    }
}

异步删除:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

/**
 * @description:  节点操作
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 14:52
 **/
public class ZKNodeOperator implements Watcher{

    private ZooKeeper zooKeeper = null;

    private static final String zkServerPath = "172.16.106.130:2181";
    private static final Integer timeout = 5000;

    public ZKNodeOperator(){}

    public ZKNodeOperator(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeOperator());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try {
                    zooKeeper.close();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
        //创建zookeeper节点
        //zkServer.createZKNode("/testTwo","testTwo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        //修改节点信息
        //第一个参数为 修改的节点路径,第二个参数 修改的数据 第三个参数 版本号
        //Stat status = zkServer.getZooKeeper().setData("/testTwo","wjy329".getBytes(),0);
        //删除节点信息,第一个参数是路径,第二个参数是版本号
        String ctx = "{'delete':'success'}";
        zkServer.getZooKeeper().delete("/testTwo",0,new DeleteCallBack(),ctx);
        Thread.sleep(2000);
        System.out.println("删除成功");
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * @description  创建zookeeper节点
     * @param path 创建的路径
     * @param data 存储数据的byte[]
     * @param acls 控制权限策略
     * @author wjy329
     * @date 2018/9/13
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            /**
             * acl:控制权限策略:Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *                  CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode: 节点类型:是一个枚举
             *              PERSISTENT:持久节点
             *              PERSISTENT_SEQUENTIAL:持久顺序节点
             *              EPHEMERAL:临时节点
             *              EPHEMERAL_SEQUENTIAL:临时顺序节点
             * */
            //result = zooKeeper.create(path,data,acls, CreateMode.EPHEMERAL);
            String ctx = "{'create':'success'}";
            zooKeeper.create(path,data,acls,CreateMode.PERSISTENT,new CreateCallBack(),ctx);
            System.out.println("创建节点:\t"+result+"\t成功...");
            new Thread().sleep(2000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {

    }
}

同样,也是主要看main()函数中的方法

image.png

  • 节点查询

在看节点查询之前,建议先阅读《Zookeeper学习之CountDownLatch》 

获取节点数据:

package ZKGetNode;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author wjy329
 * @Time 2018/9/14上午9:42
 * @description
 */

public class ZKGetNodeData implements Watcher{
    private ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;
    private static Stat stat = new Stat();

    public ZKGetNodeData(){}

    public ZKGetNodeData(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKGetNodeData());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try{
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
        /**
         * 参数:path:节点路径
         *  watch:true/false 注册一个watch事件
         *  stat:状态
         * */
        byte[] resByte = zkServer.getZooKeeper().getData("/test",true,stat);
        String result = new String(resByte);
        System.out.println("当前值:"+result);
        countDownLatch.await();
    }

    @Override
    public void process(WatchedEvent event) {
        try{
            if(event.getType() == Event.EventType.NodeDataChanged){
                ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
                byte[] resByte = zkServer.getZooKeeper().getData("/test",false,stat);
                String result = new String(resByte);
                System.out.println("更改后的值:"+result);
                System.out.println("版本号变化dversion:"+stat.getVersion());
                countDownLatch.countDown();
            }else if(event.getType() == Event.EventType.NodeCreated){

            }else if(event.getType() == Event.EventType.NodeChildrenChanged){

            }else if(event.getType() == Event.EventType.NodeDeleted){

            }
        }catch (KeeperException e){
            e.printStackTrace();
        }catch (InterruptedException e1){
            e1.printStackTrace();
        }
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public static String getZkServerPath() {
        return zkServerPath;
    }

    public static Integer getTimeout() {
        return timeout;
    }

    public static Stat getStat() {
        return stat;
    }

    public static void setStat(Stat stat) {
        ZKGetNodeData.stat = stat;
    }

    public static CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }

    public static void setCountDownLatch(CountDownLatch countDownLatch) {
        ZKGetNodeData.countDownLatch = countDownLatch;
    }
}

上述代码就是查看节点的演示,上面由于使用了CountDownLatch,所以运行程序后,线程还会再等待节点的相应操作,等到计数器减为0后任务结束。

现在控制台中查看/test节点的数据为  329

image.png

然后运行程序,控制台输出:

image.png

然后我们在控制台作出相应的事件,修改下节点的数据:

image.png

此时控制台为:

image.png

获取子节点数据:

package ZKGetNode;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Author wjy329
 * @Time 2018/9/14上午10:29
 * @description 获取子节点的数据
 */

public class ZKGetChildrenList implements Watcher{
    private ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;

    public ZKGetChildrenList(){}

    public ZKGetChildrenList(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKGetChildrenList());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try{
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
        /**
         * 参数:
         * path:父节点路径
         * watch:true或者false,注册一个watch事件
         * */
        List<String> strChildList = zkServer.getZooKeeper().getChildren("/test",true);
        for(String s : strChildList){
            System.out.println(s);
        }
        //异步调用
        String ctx = "{'callback':'ChildrenCallback'}";
        zkServer.getZooKeeper().getChildren("/test",true,new ChildrenCallback(),ctx);
        //zkServer.getZooKeeper().getChildren("/test",true,new Children2Callback(),ctx);

        countDownLatch.await();
    }
    @Override
    public void process(WatchedEvent event) {
        try{
            if(event.getType() == Event.EventType.NodeChildrenChanged){
                System.out.println("NodeChildrenChanged");
                ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
                List<String> strChildList = zkServer.getZooKeeper().getChildren(event.getPath(),false);
                for(String s : strChildList){
                    System.out.println(s);
                }
                countDownLatch.countDown();
            }else if(event.getType() == Event.EventType.NodeCreated){
                System.out.println("NodeCreated");
            }else if(event.getType() == Event.EventType.NodeDataChanged){
                System.out.println("NodeDataChanged");
            }else if(event.getType() == Event.EventType.NodeDeleted){
                System.out.println("NodeDeleted");
            }
        }catch (KeeperException e){
            e.printStackTrace();
        }catch (InterruptedException e1){
            e1.printStackTrace();
        }

    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public static String getZkServerPath() {
        return zkServerPath;
    }

    public static Integer getTimeout() {
        return timeout;
    }
}

从上面我们看到异步的方式有两种,第二种比第一种多了一个参数,能获取到最后的一些状态,这里仅给出第一种的结果,第二种自行演示。

ChildrenCallback.java:

package ZKGetNode;/**
 * @Author wjy329
 * @Time 2018/9/14上午10:45
 * @description
 */

import org.apache.zookeeper.AsyncCallback;

import java.util.List;

/**

 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-14 10:45
 **/
public class ChildrenCallback implements AsyncCallback.ChildrenCallback {
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        for(String s : children){
            System.out.println(s);
        }
        System.out.println("ChildrenCallback:"+path);
        System.out.println((String) ctx);
    }
}

Children2Callback.java:

package ZKGetNode;/**
 * @Author wjy329
 * @Time 2018/9/14上午10:45
 * @description
 */

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.data.Stat;

import java.util.List;

/**

 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-14 10:45
 **/
public class Children2Callback implements AsyncCallback.Children2Callback {
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        for(String s : children){
            System.out.println(s);
        }
        System.out.println("ChildrenCallback:"+path);
        System.out.println((String) ctx);
        System.out.println(stat.toString());
    }
}

命令行先查看子节点:image.png

然后看控制台输出:

image.png

至于watcher的监听事件,自己尝试吧。

  • 判断节点是否存在:

package ZKGetNode;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author wjy329
 * @Time 2018/9/14下午7:41
 * @description 判断节点是否存在
 */

public class ZKNodeExist implements Watcher{
    private ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;

    public ZKNodeExist(){}

    public ZKNodeExist(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeExist());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try{
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        ZKNodeExist zkServer = new ZKNodeExist(zkServerPath);

        Stat stat = zkServer.getZooKeeper().exists("/test",true);
        if(stat != null){
            System.out.println("查询的节点版本:"+stat.getVersion());
        }else{
            System.out.println("该节点不存在。。。");
        }

        countDownLatch.await();
    }

    @Override
    public void process(WatchedEvent event) {
        if(event.getType() == Event.EventType.NodeCreated){
            System.out.println("节点创建");
            countDownLatch.countDown();
        }else if(event.getType() == Event.EventType.NodeDataChanged){
            System.out.println("节点数据被改变");
            countDownLatch.countDown();
        }else if(event.getType() == Event.EventType.NodeDeleted){
            System.out.println("节点被删除");
            countDownLatch.countDown();
        }
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public static String getZkServerPath() {
        return zkServerPath;
    }

    public static Integer getTimeout() {
        return timeout;
    }
}

运行后,我们再改变下数据以供watcher监听到。

image.png

image.png

  • ACL相关

默认匿名权限

package ZKAcl;

import ZKGetNode.ZKNodeExist;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;

import java.io.IOException;
import java.util.List;

/**
 * @Author wjy329
 * @Time 2018/9/14下午8:58
 * @description zookeeper操作节点acl
 */

public class ZKNodeACL implements Watcher{
    private ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;

    public ZKNodeACL(){}

    public ZKNodeACL(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeACL());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try{
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            result = zooKeeper.create(path,data,acls, CreateMode.PERSISTENT);
            System.out.println("创建节点:\t"+result+"\t成功...");
        }catch (KeeperException e){
            e.printStackTrace();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ZKNodeACL zkServer = new ZKNodeACL(zkServerPath);
        //acl任何人都可以访问
        zkServer.createZKNode("/aclnode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);

    }

    @Override
    public void process(WatchedEvent event) {

    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public static String getZkServerPath() {
        return zkServerPath;
    }

    public static Integer getTimeout() {
        return timeout;
    }
}

运行后,创建节点:

image.png

自定义用户权限:

由于用户需要加密,先附上工具类:

package ZKAcl;

import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

/**
 * @Author wjy329
 * @Time 2018/9/14下午9:16
 * @description
 */

public class AclUtils {
    public static String getDigestUserPwd(String id)throws Exception{
        return DigestAuthenticationProvider.generateDigest(id);
    }

}

然后再附上主要的代码:

package ZKAcl;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author wjy329
 * @Time 2018/9/14下午9:17
 * @description 自定义用户权限
 */


public class ZKNodeACL2 implements Watcher{
    private ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "172.16.106.130:2181";
    public static final Integer timeout = 5000;

    public ZKNodeACL2(){}

    public ZKNodeACL2(String connectString){
        try{
            zooKeeper = new ZooKeeper(connectString,timeout,new ZKNodeACL2());
        }catch (IOException e){
            e.printStackTrace();
            if(zooKeeper != null){
                try{
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    public void createZKNode(String path, byte[] data, List<ACL> acls){
        String result = "";
        try{
            result = zooKeeper.create(path,data,acls, CreateMode.PERSISTENT);
            System.out.println("创建节点:\t"+result+"\t成功...");
        }catch (KeeperException e){
            e.printStackTrace();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        ZKNodeACL2 zkServer = new ZKNodeACL2(zkServerPath);
        //acl任何人都可以访问
        //zkServer.createZKNode("/aclnode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        //自定义用户认证访问
        List<ACL> acls = new ArrayList<>();
        Id user1 = new Id("digest",AclUtils.getDigestUserPwd("user1:123456"));
        Id user2 = new Id("digest",AclUtils.getDigestUserPwd("user2:123456"));
        acls.add(new ACL(ZooDefs.Perms.ALL,user1));
        acls.add(new ACL(ZooDefs.Perms.READ,user2));
        acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE,user2));

        zkServer.createZKNode("/aclnode/testdigest","test".getBytes(),acls);
        //注册用户必须通过登录才能操作节点
        zkServer.getZooKeeper().addAuthInfo("digest","user1:123456".getBytes());
        zkServer.createZKNode("/aclnode/testdigest/childtest","child".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL);
    }

    @Override
    public void process(WatchedEvent event) {

    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public static String getZkServerPath() {
        return zkServerPath;
    }

    public static Integer getTimeout() {
        return timeout;
    }
}

上面的代码也很好理解,就是新建两个用户,分别授予不同的权限,然后对节点进行操作,更多效果自行测试。

image.png

以上部分就是基本的客户端操作了,都是一样的思路,很简单喽,自己实践就会明白了。

Zookeeper学习之CountDownLatch

在这里先介绍一下CountDownLatch,因为Zookeeper的节点查询和分布式锁等需要到这个计数器。它允许一个或多个线程一直等待,直到其他线程执行完后再执行;例如:一个系统等待参数加载、资源加载等任务都完成后才允许用户登录。

CountDownLatch是一个计数器,多用于线程,可以暂停也可以继续。

CountDownLatch一般有两个方法 .await()  .countDown()

主线程必须在启用其他线程之后立即调用.await()方法,这样主线程的操作就会在这个方法上阻塞,等待其他线程完成各自的任务;其他任务完成后必须通过.countDown()来通知任务完成,当count减为0后,主线程就可以通过.await()恢复执行自己的方法。

这里我以慕课网的例子来演示:

实例:监控调度系统,当监控调度中心收到所有的调度分区返回的成功信息后,然后通知发车。

目录结构:

image.png

DangeCenter.java:

package CountDownLatchTest;

import java.util.concurrent.CountDownLatch;

/**
 * @description: 抽象类,用来演示调度中心,统一检查
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 20:46
 **/
public abstract class DangerCenter implements Runnable{

    //计数器
    private CountDownLatch countDown;
    //调度站
    private String station;
    //检查标志ok
    private boolean ok;

    public DangerCenter(CountDownLatch countDown,String station){
        this.countDown = countDown;
        this.station = station;
        this.ok = false;
    }

    @Override
    public void run() {
        try{
            check();
            ok = true;
        }catch (Exception e){
            e.printStackTrace();
            ok = false;
        }finally {
            if(countDown != null){
                countDown.countDown();
            }
        }
    }

    /**
    * @description 检查车况
    * @param
    * @return
    * @author wjy329
    * @Date 2018/9/13
    */
    public abstract void check();

    public CountDownLatch getCountDown() {
        return countDown;
    }

    public void setCountDown(CountDownLatch countDown) {
        this.countDown = countDown;
    }

    public String getStation() {
        return station;
    }

    public void setStation(String station) {
        this.station = station;
    }

    public boolean isOk() {
        return ok;
    }

    public void setOk(boolean ok) {
        this.ok = ok;
    }
}

StationW.java:

package CountDownLatchTest;

import java.util.concurrent.CountDownLatch;

/**
 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 20:58
 **/
public class StationW extends DangerCenter{
    public StationW(CountDownLatch countDown) {
        super(countDown, "W调度站");
    }

    @Override
    public void check() {
        System.out.println("正在检查["+this.getStation()+"]...");
        try{
            Thread.sleep(2000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }

        System.out.println("检查["+this.getStation()+"]完毕,可以调度");
    }
}

StationJ.java:

package CountDownLatchTest;

import java.util.concurrent.CountDownLatch;

/**
 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 20:58
 **/
public class StationJ extends DangerCenter{
    public StationJ(CountDownLatch countDown) {
        super(countDown, "J调度站");
    }

    @Override
    public void check() {
        System.out.println("正在检查["+this.getStation()+"]...");
        try{
            Thread.sleep(1000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }

        System.out.println("检查["+this.getStation()+"]完毕,可以调度");
    }
}

StationY.java:

package CountDownLatchTest;

import java.util.concurrent.CountDownLatch;

/**
 * @description:
 *
 * @author: wjy329
 * @param:
 * @return:
 * @create: 2018-09-13 20:58
 **/
public class StationY extends DangerCenter{
    public StationY(CountDownLatch countDown) {
        super(countDown, "Y调度站");
    }

    @Override
    public void check() {
        System.out.println("正在检查["+this.getStation()+"]...");
        try{
            Thread.sleep(1500);
        }catch (InterruptedException e){
            e.printStackTrace();
        }

        System.out.println("检查["+this.getStation()+"]完毕,可以调度");
    }
}

CheckStartUp.java:

package CountDownLatchTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * @Author wjy329
 * @Time 2018/9/13下午9:04
 * @description 监控中心总部
 */

public class CheckStartUp {
    private static List<DangerCenter> stationList;
    private static CountDownLatch countDown;

    public CheckStartUp(){}

    public static boolean checkAllStations() throws Exception{
        //初始化3个调度站
        countDown = new CountDownLatch(3);

        //把所有站点添加到list
        stationList = new ArrayList<>();
        stationList.add(new StationW(countDown));
        stationList.add(new StationJ(countDown));
        stationList.add(new StationY(countDown));

        Executor executor = Executors.newFixedThreadPool(stationList.size());

        for(DangerCenter center : stationList){
            executor.execute(center);
        }

        //等待线程执行完毕
        countDown.await();

        for(DangerCenter center : stationList){
            if(!center.isOk()){
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        boolean result = CheckStartUp.checkAllStations();
        System.out.println("监控中心对所有站点的检查结果为:"+ result);
    }
}

运行结果:

image.png

Zookeeper学习(三)Zookeeper的集群安装

准备工作:3台虚拟机,3台虚拟机中都得安装zookeeper(这不是废话么。。。)

image.png

在第一台虚拟机中,执行  /usr/local/zookeeper/conf  然后vim zoo.cfg 按下图修改配置

image.png

然后执行cd /usr/local/zookeeper/dataDir 新建一个myid文件,vim myid myid中的内容为数字 1

其他两台的zoo.cfg配置一样,就是myid分别设置为2,3即可

然后分别在zookeeper的bin目录中启动服务即可 ./zkServer.sh start

image.png

然后分别查看zookeeper的状态 执行 ./zkServer.sh status  可以看到有一台机器的leader,其余两台为follower

image.png

image.pngimage.png

在第二台机器中,执行 ./zkCli.sh -server 172.16.106.131:2181 然后查看节点,发现只有一个节点zookeeper,然后我们新建一个test节点,设置值为329,然后在其他两台机器中查看节点,发现也会有刚在创建的值为329的test节点

image.png

image.png

到此位置,zookeeper的集群搭建就基本完成了。怎么样,是不是贼简单。赶紧去试试吧。

Zookeeper学习(二)Zookeeper基本特征、命令行和权限

1、Zookeeper常用命令行操作

通过./zkCli.sh 打开zookeeper的客户端进行命令行后台

ls与ls2命令

    ls :查看某一目录下有哪些详细文件目录

    ls2 : 可以文件和状态信息

get与stat命令

    get : 获取当前节点的数据

    stat :查看当前节点的状态信息

create命令

    用来创建子节点 

    create path data 创建持久化节点

    create -e path data 创建临时节点

    create -s path data 创建顺序节点

set命令

    用来修改子节点

    set path data [version]

    set /path new-data  将path子节点的值变为new-data

    [version] 指定最新的版本号进行修改,加上版本号其实就是加锁

delete命令

    用来删除节点

    delete path [version]

    不加版本号会直接删除节点,加上版本号是判断锁操作

2、Zookeeper特性

     -session的基本原理

  • 客户端与服务端之间的连接存在会话

  • 每个会话都会可以设置一个超时时间

  • 心跳结束,session则过期

  • Session过期,则临时节点znode会被抛弃

  • 心跳机制:客户端向服务端的ping包请求

    Zookeeper特性

    -watcher机制

  • 针对每个节点的操作,都会有一个监督者—watcher

  • 当监控的某个对象(znode)发生了变化,则触发watcher事件

  • zookeeper中的watcher是一次性的,触发后立即销毁

  • 父节点,子节点 增删改都能够触发其watcher

  • 针对不同类型的操作,触发的watcher事件也不同:

        1.(子)节点创建事件

        2.(子)节点删除事件

        3.(子)节点数据变化事件

3、Watcher命令行学习

  • 通过get path [watch] 设置watcher

  • 父节点 增 删 改 操作触发watcher

  • 子节点 增 删 改 操作触发watcher

4、Watcher事件类型

父节点watch事件:

  • 创建父节点触发:NodeCreated

  • 修改父节点数据触发:NodeDataChanged

  • 删除父节点触发:NodeDeleted

image.png

image.pngimage.png

子节点watcher事件:

ls为父节点设置watcher,创建子节点触发:NodeChildrenChanged

ls为父节点设置watcher,删除子节点触发:NodeChildrenChanged

ls为父节点设置watcher,修改子节点不触发事件(需要把子节点当成父节点才能触发watcher事件)

image.pngimage.pngimage.pngimage.png

watcher使用场景

  • 统一资源配置

ACL权限控制

  • 针对节点可以设置相关读写操作等权限,目的是为了保障数据安全性

  • 权限permissions可以指定不同的权限范围以及角色

ACL命令行

  • getAcl:获取某个节点的acl权限信息

  • setAcl:设置某个节点的acl权限信息

  • addauth:输入认证授权信息,注册时输入明文密码(登录)但是在zk的系统里,密码是以加密的形式存在的

ACL的构成

  • zookeeper的acl通过[scheme:id:permissions]来构成权限列表

        scheme:代表采用的某种权限机制

            world:world下只有一个id,即只有一个用户,也就是anyone,那么组合的写法就是world:anyone:[permissions]

            auth:代表认证登录,需要注册用户有权限就可以,形式为auth:user:password:[permissions]

            digest:需要对密码加密才能访问,组合形式为digest:username:BASE64(SHA1(password)):[permissions]

            简而言之,auth与digest的区别就是,前者明文,后者密文;setAcl/path auth:lee:lee:cdrwa与setAcl/path                                                  digest:lee.BASE64(SHA1(password))cdrwa是等价的,在通过adduth digest lee:lee后都能操作指定节点的权限

        id:代表允许访问的用户

            ip:当设置为ip指定的ip地址,此时限制ip进行访问,比如ip:192.168.1.1[permissions]

            super:代表超级管理员,拥有所有的权限

        permissions:权限组合字符串-缩写crdwa

            CREATE:创建子节点

            READ:获取节点/子节点

            WRITE:设置节点数据

            DELETE:删除子节点

            ADMIN:设置权限

ACL命令行学习

  • world:anyone:cdrwa

image.png

  • digest:user:BASE64(SHA1(pwd)):cdrwa

        addauth digest user:pwd

image.png

  • ip:192.168.1.1:cdrwa

image.png

  • super超级管理员

修改/usr/local/zookeeper/bin目录下的zkServer.sh 添加如下的语句:用户名和密码自定义,密码是BASE64加密后的

"-Dzookeeper.DigestAuthenticationProvider.superDigest=wjy:xQJmxLMiHGwaqBvst5y6rkB6HQs="

image.png修改完保存后,然后执行./zkServer.sh restart重启 zkServer.sh,启动./zkCli.sh

image.png

ACL的常用使用场景

  • 开发/测试环境分离,开发者无权操作测试库的节点,只能看

  • 生产环境上控制指定ip的服务可以访问相关节点,防止混乱

zookeeper四字命令

  • zookeeper可以通过它自身提供的简写命令来和服务器进行交互

  • 需要使用到nc命令,安装:yum install nc

  • echo [commond]| nc [ip] [port]

官方文档:http://zookeeper.apache.org/doc/r3.4.13/zookeeperAdmin.html#sc_zkCommands

image.png

仅演示一个命令,其他的参考文档吧。

Zookeeper学习(一)简介、安装和配置

ZooKeeper 是一个开源的分布式协调服务,由雅虎创建,是 Google Chubby 的开源实现。
分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协
调/通知、集群管理、Master 选举、配置维护,名字服务、分布式同步、分布式锁和分布式队列
等功能。

一、简介

  • 中间件,提供协调服务

  • 作用于分布式系统,发挥其优势,可以为大数据服务

  • 支持java,提供java和c语言的客户端api

二、安装Zookeeper

安装zookeeper之前需要安装jdk,关于jdk的安装,之前的博客有介绍: CentOS之Java安装 

下面是我的linux虚拟机中的java的版本image.png

单机Zooleeper安装:

下载页面:https://archive.apache.org/dist/zookeeper/

这里选择3.4.11的版本:

image.png

将下载下来的tar包上传到服务器中,一般上传文件到/home 目录,然后执行tar -zxvf zookeeper-3.4.11.tar.gz 解压,之后执行

mv zookeeper-3.4.11 zookeeper重命名为zookeeper,然后执行mv zookeeper /usr/local  复制移动文件到指定目录

接下来就是配置环境变量,vim /etc/profile  添加下图中画线的文本

image.png配置Zookeeper,进入/usr/local/zookeeper/conf 复制一份官方的配置文件,并重命名为zoo.cfg,然后我们对zoo.cfg进行修改,先看下面几个配置的意义。

tickTime:用于计算的时间单元。比如session超时:N*tickTime

initLimit:用于集群,允许从节点连接并同步到master节点的初始化连接时间,以tickTime的倍数来表示

syncLimit:用于集群,master主节点与从节点之间发送消息,请求和应答时间长度。(心跳机制)

dataDir:必须配置

dataLogDir:日志目录,如果不配置会和dataDir公用

clientPort:连接服务器的端口,默认2181

image.png

然后vim zoo.cfg,下图就是我们修改之后的配置文件:

image.png

然后相应的在路径新建dataDirdataLogDir两个数据目录,这样单机的zookeeper就配置完成了。

配置完成后,进入到 /usr/local/zookeeper/bin 目录下,执行启动服务命令,开始我们执行./zkServer.sh 会提示我们添加一些参数,如下所示,参数的意思也很直观。  

执行 ./zkServer.sh start 来启动服务。 到此,zookeeper的单机的安装和配置就基本完成了。image.png

Zookeeper的作用体现

1、master节点选举,主节点挂了以后,从节点就会接手工作,并且保证这个节点是唯一的,这也是所谓的首脑模式,从而保证我们的集群是高可用的。

2、统一配置文件管理,即只需要部署一台服务器,则可以把相同的配置文件同步更新到其他所有服务器,此操作在云计算中用的特别多。

3、发布与订阅,类似消息队列MQ,dubbo发布者把数据存在znode上,订阅者会读取这个数据。

4、提供分布式锁,分布式环境中不同进程之间争夺资源,类似于多线程中的锁。

5、集群管理,集群中保证数据的强一致性。