转载声明:http://macrochen.iteye.com/blog/1366136
博客分类:
Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手. 看完官方的文档之后, 发现Curator主要解决了三类问题: 如果在操作过程中失败, 且这种失败是可重试的, 而且在允许的次数内, Curator将保证操作的最终完成. 另一种使用Callable接口的重试做法: 重试策略 RetryPolicy接口只有一个方法(以前版本有两个方法): public boolean allowRetry(int retryCount, long elapsedTimeMs); 在开始重试之前, allowRetry方法被调用, 其参数将指定当前重试次数, 和操作已消耗时间. 如果允许, 将继续重试, 否则抛出异常. Curator内部实现的几种重试策略: 方法说明:
名称空间(Namespace) 因为一个zk集群会被多个应用共享, 为了避免各个应用的zk patch冲突, Curator Framework内部会给每一个Curator Framework实例分配一个namespace(可选). 这样你在create ZNode的时候都会自动加上这个namespace作为这个node path的root. 使用代码如下: Recipe Curator实现ZooKeeper的所有recipe(除了两段提交) 选举 集群领导选举(leader election) 锁服务 共享锁: 全局同步分布式锁, 同一时间两台机器只有一台能获得同一把锁. 共享读写锁: 用于分布式的读写互斥处理, 同时生成两个锁:一个读锁, 一个写锁, 读锁能被多个应用持有, 而写锁只能一个独占, 当写锁未被持有时, 多个读锁持有者可以同时进行读操作 共享信号量: 在分布式系统中的各个JVM使用同一个zk lock path, 该path将跟一个给定数量的租约(lease)相关联, 然后各个应用根据请求顺序获得对应的lease, 相对来说, 这是最公平的锁服务使用方式. 多共享锁:内部构件多个共享锁(会跟一个znode path关联), 在acquire()过程中, 执行所有共享锁的acquire()方法, 如果中间出现一个失败, 则将释放所有已require的共享锁; 执行release()方法时, 则执行内部多个共享锁的release方法(如果出现失败将忽略) 队列(Queue) 分布式队列:采用持久顺序zk node来实现FIFO队列, 如果有多个消费者, 可以使用LeaderSelector来保证队列的消费者顺序 分布式优先队列: 优先队列的分布式版本 BlockingQueueConsumer: JDK阻塞队列的分布式版本 关卡(Barrier) 分布式关卡:一堆客户端去处理一堆任务, 只有所有的客户端都执行完, 所有客户端才能继续往下处理 双分布式关卡:同时开始, 同时结束 计数器(Counter) 共享计数器:所有客户端监听同一个znode path, 并共享一个最新的integer计数值 分布式AtomicLong(AtomicInteger): AtomicXxx的分布式版本, 先采用乐观锁更新, 若失败再采用互斥锁更新, 可以配置重试策略来处理重试 工具类 Path Cache Path Cache用于监听ZNode的子节点的变化, 当add, update, remove子节点时将改变Path Cache state, 同时返回所有子节点的data和state. Curator中采用了PathChildrenCache类来处理Path Cache, 状态的变化则采用PathChildrenCacheListener来监听. 相关用法参见TestPathChildrenCache测试类 注意: 当zk server的数据发生变化, zk client会出现不一致, 这个需要通过版本号来识别这种状态的变化 Test Server 用来在测试中模拟一个本地进程内ZooKeeper Server. Test Cluster 用来在测试中模拟一个ZooKeeper Server集群 ZKPaths工具类 提供了和ZNode相关的path处理工具方法: Notification事件处理 Curator对ZooKeeper的事件Watcher进行了封装处理, 然后实现了一套监听机制. 提供了几个监听接口用来处理ZooKeeper连接状态的变化 当连接出现异常, 将通过ConnectionStateListener接口进行监听, 并进行相应的处理, 这些状态变化包括:
- 封装ZooKeeper client与ZooKeeper server之间的连接处理;
- 提供了一套Fluent风格的操作API;
- 提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装.
- Client: 是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法.
- Framework: 用来简化ZooKeeper高级功能的使用, 并增加了一些新的功能, 比如管理到ZooKeeper集群的连接, 重试处理
- Recipes: 实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上
- Utilities:各种ZooKeeper的工具类
- Errors: 异常处理, 连接, 恢复等.
- Extensions: recipe扩展
- RetryLoop retryLoop = client.newRetryLoop();
- while ( retryLoop.shouldContinue() )
- {
- try
- {
- // perform your work
- ...
- // it's important to re-get the ZK instance as there may have been an error and the instance was re-created
- ZooKeeper zk = client.getZookeeper();
- retryLoop.markComplete();
- }
- catch ( Exception e )
- {
- retryLoop.takeException(e);
- }
- }
- RetryLoop.callWithRetry(client, new Callable()
- {
- @Override
- public Void call() throws Exception
- {
- // do your work here - it will get retried if needed
- return null;
- }
- });
- ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.
- RetryNTimes:指定最大重试次数的重试策略
- RetryOneTime:仅重试一次
- RetryUntilElapsed:一直重试直到达到规定的时间
- client.create().forPath("/head", new byte[0]);
- client.delete().inBackground().forPath("/head");
- client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
- client.getData().watched().inBackground().forPath("/test");
- create(): 发起一个create操作. 可以组合其他方法 (比如mode 或background) 最后以forPath()方法结尾
- delete(): 发起一个删除操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾
- checkExists(): 发起一个检查ZNode 是否存在的操作. 可以组合其他方法(watch 或background) 最后以forPath()方法结尾
- getData(): 发起一个获取ZNode数据的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾
- setData(): 发起一个设置ZNode数据的操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾
- getChildren(): 发起一个获取ZNode子节点的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾
- inTransaction(): 发起一个ZooKeeper事务. 可以组合create, setData, check, 和/或delete 为一个操作, 然后commit() 提交
CREATE | getResultCode() and getPath() |
DELETE | getResultCode() and getPath() |
EXISTS | getResultCode(), getPath() and getStat() |
GET_DATA | getResultCode(), getPath(), getStat() and getData() |
SET_DATA | getResultCode(), getPath() and getStat() |
CHILDREN | getResultCode(), getPath(), getStat(), getChildren() |
WATCHED | getWatchedEvent() |
- CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
- …
- client.create().forPath("/test", data);
- // node was actually written to: "/MyApp/test"
- getNodeFromPath: 根据给定path获取node name. i.e. "/one/two/three" -> "three"
- mkdirs: 根据给定路径递归创建所有node
- getSortedChildren: 根据给定路径, 返回一个按序列号排序的子节点列表
- makePath: 根据给定的path和子节点名, 创建一个完整path
- EnsurePath ensurePath = new EnsurePath(aFullPathToEnsure);
- ...
- String nodePath = aFullPathToEnsure + "/foo";
- ensurePath.ensure(zk); // first time syncs and creates if needed
- zk.create(nodePath, ...);
- ...
- ensurePath.ensure(zk); // subsequent times are NOPs
- zk.create(nodePath, ...);
- 暂停(SUSPENDED): 当连接丢失, 将暂停所有操作, 直到连接重新建立, 如果在规定时间内无法建立连接, 将触发LOST通知
- 重连(RECONNECTED): 连接丢失, 执行重连时, 将触发该通知
- 丢失(LOST): 连接超时时, 将触发该通知
- fluent interface中文版
- 分布式服务框架 Zookeeper -- 管理分布式环境中的数据