Apache Curator操作zookeeper实现基本增删改查、事务、分布式锁等

        Apache Curator封装了一套高级API 简化zookeeper的操作,提供了对zookeeper基本操作,watch,分布式锁等场景封装

引入Curator包

        需要注意不同Curator兼容不同zookeeper版本,可以去查看下发行版本说明https://cwiki.apache.org/confluence/display/CURATOR/Releases

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.0.0</version>
</dependency>

建立连接

    // 会话超时时间
	private static final int SESSION_TIMEOUT = 10 * 1000;

	// 连接超时时间
	private static final int CONNECTION_TIMEOUT = 3 * 1000;

	// ZooKeeper服务地址
	private static final String CONNECT_ADDR = "ip:2181";

	/**
	 * 建立连接
	 * 
	 * @return
	 */
	public static CuratorFramework createConnection() {
		// 重试策略
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
		// 工厂创建连接
		final CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)
				.connectionTimeoutMs(CONNECTION_TIMEOUT).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy)
				.build();
		// 开启连接
		client.start();
		return client;
	}

基本增删改查

/**
	 * 基本增删改查
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void basicOperation(CuratorFramework client) throws Exception {
		// 创建内容为空的永久节点

		client.create().forPath("/nonData");

		// 创建有内容的永久节点
		client.create().forPath("/permanent", "/data".getBytes());
		client.create().forPath("/permanent/one", "/data".getBytes());
		// 创建永久有序节点
		client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/permanentOrder", "/data".getBytes());

		// 创建临时节点,session过期节点失效
		client.create().withMode(CreateMode.EPHEMERAL).forPath("/temp", "data".getBytes());

		// 创建临时有序节点
		client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/tempOrder", "data".getBytes());
		// 判断节点是否存在
		Stat tempStat = client.checkExists().forPath("/temp");

		System.out.println("临时节点temp是否存在:" + tempStat != null);

		// 获取节点数据
		System.out.println("permanent节点数据:" + new String(client.getData().forPath("/permanent")));
		// 修改节点数据
		client.setData().forPath("/permanent", "updateData".getBytes());
		System.out.println("修改后permanent节点数据:" + new String(client.getData().forPath("/permanent")));
		// 删除节点
		client.delete().forPath("/nonData");
		// 删除节点及子节点
		client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/permanent");

	}

事务

/**
	 * 事务
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void transaction(CuratorFramework client) throws Exception {

		CuratorOp create = client.transactionOp().create().forPath("/test", "data".getBytes());

		CuratorOp delete = client.transactionOp().delete().forPath("/test1");
		// 添加节点与删除不存在节点一个事务提交,发生异常都会回滚
		List<CuratorTransactionResult> results = client.transaction().forOperations(create, delete);

	}

watch监听

        分为三种监听NodeCache监听指定节点数据变化,PathChildrenCache监听指定节点的下一级子节点变化情况,TreeCache功能比较全,监听自身节点数据变化,并且能向下监听Integer.MAX_VALUE级子节点变化。

/**
	 * 三种监听机制NodeCache监听指定节点数据变化,PathChildrenCache监听指定节点的下一级子节点变化情况,
	 * TreeCache功能比较全,监听自身节点数据变化,并且能向下监听Integer.MAX_VALUE级子节点变化
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void treeCacheWatch(CuratorFramework client) throws Exception {
		TreeCache treeCache = new TreeCache(client, "/test");
		treeCache.start();
		treeCache.getListenable().addListener(new TreeCacheListener() {
			public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
				switch (treeCacheEvent.getType()) {
				case CONNECTION_RECONNECTED:
					System.out.println("重新建立连接");
					break;
				case CONNECTION_SUSPENDED:
					System.out.println("连接超时");
					break;
				case CONNECTION_LOST:
					System.out.println("会话已过期");
					break;
				case INITIALIZED:
					System.out.println("初始化");
					break;
				case NODE_ADDED:
					System.out.println("添加节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				case NODE_UPDATED:
					System.out.println("更新节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				case NODE_REMOVED:
					System.out.println("删除节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				}
			}
		});
		while (true) {

		}
	}

分布式锁

        zookeeper分布式锁主要依赖于临时顺序节点,当A,B两个线程获取锁时,假如A线程先发起加锁请求,zookeeper会在加锁的node下创建一个临时顺序节点node-001,然后查询该节点下所有临时顺序节点,看自己是否排在第一位,如果排在第一位可以获取锁,A获取到锁后B线程请求加锁,创建临时顺序节点node-002,然后查询该节点下所有临时顺序节点,看自己是否排在第一位,发现此时排在第二位,此时线程B会对前一个节点加一个监听器,监听节点是否被删除,当A执行完删除节点后,B监听到A节点删除,判断自己是否是第一个节点,如果是会获取到锁。

/**
	 * 分布式锁,使用Curator两步acquire获取锁(),release()释放锁
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void lock(CuratorFramework client, InterProcessMutex interProcessMutex) {
		try {
			String name = Thread.currentThread().getName();
			if (interProcessMutex.acquire(1, TimeUnit.SECONDS)) {
				System.out.println(name + "获取锁成功");
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				interProcessMutex.release();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}

完整demo

package com.shaofei;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class demo {
	
	
	// 会话超时时间
	private static final int SESSION_TIMEOUT = 10 * 1000;

	// 连接超时时间
	private static final int CONNECTION_TIMEOUT = 3 * 1000;

	// ZooKeeper服务地址
	private static final String CONNECT_ADDR = "ip:2181";

	public static void main(String[] args) throws Exception {
		final CuratorFramework client = createConnection();
		basicOperation(client);
		transaction(client);
		treeCacheWatch(client);
		final InterProcessMutex interProcessMutex = new InterProcessMutex(client, "/lock/test"); // 互斥锁
		final InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock/test");// 读写锁
		final InterProcessSemaphoreMutex interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(client,
				"/lock/test"); // 不可重入互斥锁
		List<String> list = new ArrayList<String>();
		list.add("/lock/test");
		list.add("/lock/test1");
		final InterProcessMultiLock interProcessMultiLock = new InterProcessMultiLock(client, list); // 集合锁
		new Thread("thread-1") {
			@Override
			public void run() {
				lock(client, interProcessMutex);
			}
		}.start();
		new Thread("thread-2") {
			@Override
			public void run() {
				lock(client, interProcessMutex);
			}
		}.start();

	}

	/**
	 * 建立连接
	 * 
	 * @return
	 */
	public static CuratorFramework createConnection() {
		// 重试策略
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
		// 工厂创建连接
		final CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)
				.connectionTimeoutMs(CONNECTION_TIMEOUT).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy)
				.build();
		// 开启连接
		client.start();
		return client;
	}

	
	
	/**
	 * 分布式锁,使用Curator两步acquire获取锁(),release()释放锁
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void lock(CuratorFramework client, InterProcessMutex interProcessMutex) {
		try {
			String name = Thread.currentThread().getName();
			if (interProcessMutex.acquire(1, TimeUnit.SECONDS)) {
				System.out.println(name + "获取锁成功");
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				interProcessMutex.release();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}

	/**
	 * 三种监听机制NodeCache监听指定节点数据变化,PathChildrenCache监听指定节点的下一级子节点变化情况,
	 * TreeCache功能比较全,监听自身节点数据变化,并且能向下监听Integer.MAX_VALUE级子节点变化
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void treeCacheWatch(CuratorFramework client) throws Exception {
		TreeCache treeCache = new TreeCache(client, "/test");
		treeCache.start();
		treeCache.getListenable().addListener(new TreeCacheListener() {
			public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
				switch (treeCacheEvent.getType()) {
				case CONNECTION_RECONNECTED:
					System.out.println("重新建立连接");
					break;
				case CONNECTION_SUSPENDED:
					System.out.println("连接超时");
					break;
				case CONNECTION_LOST:
					System.out.println("会话已过期");
					break;
				case INITIALIZED:
					System.out.println("初始化");
					break;
				case NODE_ADDED:
					System.out.println("添加节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				case NODE_UPDATED:
					System.out.println("更新节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				case NODE_REMOVED:
					System.out.println("删除节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				}
			}
		});
		while (true) {

		}
	}

	/**
	 * 事务
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void transaction(CuratorFramework client) throws Exception {

		CuratorOp create = client.transactionOp().create().forPath("/test", "data".getBytes());

		CuratorOp delete = client.transactionOp().delete().forPath("/test1");
		// 添加节点与删除不存在节点一个事务提交,发生异常都会回滚
		List<CuratorTransactionResult> results = client.transaction().forOperations(create, delete);

	}

	/**
	 * 基本增删改查
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void basicOperation(CuratorFramework client) throws Exception {
		// 创建内容为空的永久节点

		client.create().forPath("/nonData");

		// 创建有内容的永久节点
		client.create().forPath("/permanent", "/data".getBytes());
		client.create().forPath("/permanent/one", "/data".getBytes());
		// 创建永久有序节点
		client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/permanentOrder", "/data".getBytes());

		// 创建临时节点,session过期节点失效
		client.create().withMode(CreateMode.EPHEMERAL).forPath("/temp", "data".getBytes());

		// 创建临时有序节点
		client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/tempOrder", "data".getBytes());
		// 判断节点是否存在
		Stat tempStat = client.checkExists().forPath("/temp");

		System.out.println("临时节点temp是否存在:" + tempStat != null);

		// 获取节点数据
		System.out.println("permanent节点数据:" + new String(client.getData().forPath("/permanent")));
		// 修改节点数据
		client.setData().forPath("/permanent", "updateData".getBytes());
		System.out.println("修改后permanent节点数据:" + new String(client.getData().forPath("/permanent")));
		// 删除节点
		client.delete().forPath("/nonData");
		// 删除节点及子节点
		client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/permanent");

	}
}

原文地址:https://blog.csdn.net/shaofei_huai/article/details/119024407

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


#一、什么是ZooKeeper**ZooKeeper是一个分布式服务协调框架**,提供了分布式数据一致性的解决方案,基于ZooKeeper的**数据结构,Watcher,选举机制**等特点,可以**实现数据的发布/订阅,软负载均衡,命名服务,统一配置管理,分布式锁,集群管理**等等。#二、为什么使用ZooKeeperZooKeeper能保证:*
2.ZooKeeper介绍2.1.ZooKeeper由来正式介绍ZooKeeper之前,我们先来看看ZooKeeper的由来,还挺有意思的。下面这段内容摘自《从Paxos到ZooKeeper》第四章第一节,推荐大家阅读一下:ZooKeeper最早起源于雅虎研究院的一个研究小组。在当时,研究人员发现,在雅虎内部很
Zookeeper概述1.ZooKeeper最为主要的使⽤场景,是作为分布式系统的分布式协同服务。2.分布式系统中每台服务器的算力和资源都是有限的,但是我们通过分布式系统组成集群就可以对算力和资源进行无限扩张,但是分布式节点间的协调就成了问题。3.就像我们的开发团队之间的协作一
环境:1.VMware®Workstation12Pro 2.CentOS7 3.zookeeper-3.4.6安装步骤1.下载zookeeper本文使用的zookeeper下载地址如下(大家也可以下载其它版本)链接:https://pan.baidu.com/s/1Ab9F53jNy7upsrYHCacWrw 提取码:jqyn 
###1\.面试官:工作中使用过Zookeeper嘛?你知道它是什么,有什么用途呢?**小菜鸡的我:***有使用过的,使用ZooKeeper作为**dubbo的注册中心**,使用ZooKeeper实现**分布式锁**。*ZooKeeper,它是一个开放源码的**分布式协调服务**,它是一个集群的管理者,它将简单易用的接口提供给用户。*
##2\.ZooKeeper介绍###2.1\.ZooKeeper由来正式介绍ZooKeeper之前,我们先来看看ZooKeeper的由来,还挺有意思的。下面这段内容摘自《从Paxos到ZooKeeper》第四章第一节,推荐大家阅读一下:>ZooKeeper最早起源于雅虎研究院的一个研究小组。在当时,研究人员发现,在雅虎内部很
环境准备:windows:jdk8+nginx+ab压测centos7:redis+zookeeper3.3.6ab下载链接:https://www.apachelounge.com/download/测试思路:windows下使用springboot编写秒杀接口,商品数据存在redis,运行三个服务,利用nginx做代理,使用ab分别测试单机锁,自实现zookeeper分布式锁和Curator
由于只有一台电脑,所以搭建一个伪集群(伪集群就是在一台电脑上模拟搭建集群,走不同端口启动,真实的情况在每台机器上搭建一个zookeeper或者每台机器两个zookeeper等),道理是一样的,只不过要注意别被防火墙或者安全组规则挡住了zookeeper节点间的通信,每个节点直接的网络要是通的。集群数
常用命令总结(linux运行.sh结尾的脚本,window运行.cmd结尾的脚本,一下均为linux运行的,直接将.sh改为.cmd即可):linux环境:1.启动ZK服务:bin/zkServer.shstart2.查看ZK服务状态:bin/zkServer.shstatus3.停止ZK服务:bin/zkServer.shstop4.重启ZK服务:bin/zkServer.shrest
Kubernetes简介Kubernetes(简称K8S,K和S之间有8个字母)是用于自动部署,扩展和管理容器化应用程序的开源系统。它将组成应用程序的容器组合成逻辑单元,以便于管理和服务发现。Kubernetes源自Google15年生产环境的运维经验,同时凝聚了社区的最佳创意和实践。Kubernetes具有如下特性:
###正文ZooKeeper很流行,有个基本的疑问:*ZooKeeper是用来做什么的?*之前没有ZK,为什么会诞生ZK?OK,解答一下上面的疑问:(下面是凭直觉说的)*ZooKeeper是用于简化分布式应用开发的,对开发者屏蔽一些分布式应用开发过程中的底层细节*ZooKeeper对外暴露简单的API,用于支持分
#一、什么是ZooKeeper**ZooKeeper是一个分布式服务协调框架**,提供了分布式数据一致性的解决方案,基于ZooKeeper的**数据结构,Watcher,选举机制**等特点,可以**实现数据的发布/订阅,软负载均衡,命名服务,统一配置管理,分布式锁,集群管理**等等。#二、为什么使用ZooKeeperZooKeeper能保证:*
点赞再看,养成习惯,微信搜索「小大白日志」关注这个搬砖人。文章不定期同步公众号,还有各种一线大厂面试原题、我的学习系列笔记。zoo.cfg即/usr/local/java/zookeeper/conf下的zoo_sample.cfgzoo.cfg内含参数:tickTime、initLimit、syncLimit、dataDir、dataLogDir、clientPort
正文ZooKeeper很流行,有个基本的疑问:ZooKeeper是用来做什么的?之前没有ZK,为什么会诞生ZK?OK,解答一下上面的疑问:(下面是凭直觉说的)ZooKeeper是用于简化分布式应用开发的,对开发者屏蔽一些分布式应用开发过程中的底层细节ZooKeeper对外暴露简单的API,用于支持分布式应用开
#**NO1:说说zookeeper是什么?**ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现(Chubby是不开源的),它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供
正文ZooKeeper很流行,有个基本的疑问:ZooKeeper是用来做什么的?之前没有ZK,为什么会诞生ZK?OK,解答一下上面的疑问:(下面是凭直觉说的)ZooKeeper是用于简化分布式应用开发的,对开发者屏蔽一些分布式应用开发过程中的底层细节ZooKeeper对外暴露简单的API,用于支持分布式应用开
NO1:说说zookeeper是什么?ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现(Chubby是不开源的),它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供
#**NO1:说说zookeeper是什么?**ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现(Chubby是不开源的),它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供
一、什么是ZooKeeperZooKeeper是一个分布式服务协调框架,提供了分布式数据一致性的解决方案,基于ZooKeeper的数据结构,Watcher,选举机制等特点,可以实现数据的发布/订阅,软负载均衡,命名服务,统一配置管理,分布式锁,集群管理等等。二、为什么使用ZooKeeperZooKeeper能保证:更新请求
#**NO1:说说zookeeper是什么?**ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现(Chubby是不开源的),它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供