Java Zookeeper分布式分片算法源码分析

这篇文章主要介绍了Java Zookeeper分布式分片算法源码分析的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java Zookeeper分布式分片算法源码分析文章都会有所收获,下面我们一起来看看吧。

    背景

    公司的一个服务需要做类似于分片的逻辑,一开始服务基于传统部署方式通过本地配置文件配置的方式就可以指定该机器服务的分片内容如:0,1,2,3,随着系统的升级迭代,该服务进行了容器化部署,所以原来基于本地配置文件各自配置分片数据的方式就不适用了,原来的部署方式使得服务是有状态,是一种非云原生的方式,所以该服务要重新设计实现一套分布式服务分片逻辑。

    技术方案

    分布式协调中间件

    要实现分布式服务分片的能力,需要有一个分布式中间件,如:RedisMysqlZookeeper等等都可以,我们选用Zookeeper

    基于Zookeeper的技术方案

    使用Zookeeper主要是基于Zookeeper的临时节点和节点变化监听机制,具体的技术设计如下:

    服务注册目录设计

    Zookeeper的数据存储结构类似于目录,服务注册后的目录类似如下结构:

    解释下该目录结构,首先/xxxx/xxxx/sharding是区别于其他业务的的目录,该目录节点是持久的,service是服务目录,标识一个服务,该节点也是持久的,ip1ip2是该服务注册到Zookeeper的机器列表节点,该节点是临时节点。

    /xxxx/xxxx/sharding/service/ip1
    -----|----|--------|-------/ip2

    服务分片处理流程
    • 服务启动,创建CuratorFramework客户端,设置客户端连接状态监听;

    • Zookeeper注册该机器的信息,这里设计简单,机器信息就是ip地址;

    • 注册机器信息后,从Zookeeper获取所有注册信息;

    • 根据Zookeeper获取的所有注册机器信息根据分片算法进行分片计算。

    编码实现

    ZookeeperConfig

    Zookeeper的配置信息

    @Data
    public class ZookeeperConfig {
        /**
         * zk集群地址
         */
        private String zkAddress;
        /**
         * 注册服务目录
         */
        private String nodePath;
        /**
         * 分片的服务名
         */
        private String serviceName;
        /**
         * 分片总数
         */
        private Integer shardingCount;
        public ZookeeperConfig(String zkAddress, String nodePath, String serviceName, Integer shardingCount) {
            this.zkAddress = zkAddress;
            this.nodePath = nodePath;
            this.serviceName = "/" + serviceName;
            this.shardingCount = shardingCount;
        }
        /**
         * 等待重试的间隔时间的初始值.
         * 单位毫秒.
         */
        private int baseSleepTimeMilliseconds = 1000;
        /**
         * 等待重试的间隔时间的最大值.
         * 单位毫秒.
         */
        private int maxSleepTimeMilliseconds = 3000;
        /**
         * 最大重试次数.
         */
        private int maxRetries = 3;
        /**
         * 会话超时时间.
         * 单位毫秒.
         */
        private int sessionTimeoutMilliseconds;
        /**
         * 连接超时时间.
         * 单位毫秒.
         */
        private int connectionTimeoutMilliseconds;
    }

    InstanceInfo注册机器

    @AllArgsConstructor
    @EqualsAndHashCode()
    public class InstanceInfo {
        private String ip;
        public String getInstance() {
            return ip;
        }
    }

    ZookeeperShardingService分片服务

    @Slf4j
    public class ZookeeperShardingService {
        public final Map<String, List<Integer>> caches = new HashMap<>(16);
        private final CuratorFramework client;
        private final ZookeeperConfig zkConfig;
        private final ShardingStrategy shardingStrategy;
        private final InstanceInfo instanceInfo;
        private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
        public ZookeeperShardingService(ZookeeperConfig zkConfig, ShardingStrategy shardingStrategy) {
            this.zkConfig = zkConfig;
            log.info("开始初始化zk, ip列表是: {}.", zkConfig.getZkAddress());
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(zkConfig.getZkAddress())
                    .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()));
            if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
                builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
            }
            if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
                builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
            }
            this.shardingStrategy = shardingStrategy;
            HostInfo host = new HostInfo();
            this.instanceInfo = new InstanceInfo(host.getAddress());
            client = builder.build();
            client.getConnectionStateListenable().addListener(new ConnectionListener());
            client.start();
            try {
                COUNT_DOWN_LATCH.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 注册服务节点监听
            registerPathChildListener(zkConfig.getNodePath() + zkConfig.getServiceName(), new ChildrenPathListener());
            try {
                if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
                    client.close();
                    throw new KeeperException.OperationTimeoutException();
                }
            } catch (final Exception ex) {
                ex.printStackTrace();
                throw new RuntimeException(ex);
            }
        }
        /**
         * 子节点监听器
         * @param nodePath 主节点
         * @param listener 监听器
         */
        private void registerPathChildListener(String nodePath, PathChildrenCacheListener listener) {
            try {
                // 1. 创建一个PathChildrenCache
                PathChildrenCache pathChildrenCache = new PathChildrenCache(client, nodePath, true);
                // 2. 添加目录监听器
                pathChildrenCache.getListenable().addListener(listener);
                // 3. 启动监听器
                pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            } catch (Exception e) {
                log.error("注册子目录监听器出现异常,nodePath:{}",nodePath,e);
                throw new RuntimeException(e);
            }
        }
        /**
         * 服务启动,注册zk节点
         * @throws Exception 异常
         */
        private void zkOp() throws Exception {
            // 是否存在ruubypay-sharding主节点
            if (null == client.checkExists().forPath(zkConfig.getNodePath())) {
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath(), Hashing.sha1().hashString("sharding", Charsets.UTF_8).toString().getBytes());
            }
            // 是否存服务主节点
            if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName())) {
                // 创建服务主节点
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
            }
            // 检查是否存在临时节点
            if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance())) {
                System.out.println(zkConfig.getNodePath() + zkConfig.getServiceName() +  "/" + instanceInfo.getInstance());
                // 创建临时节点
                client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(zkConfig.getNodePath() + zkConfig.getServiceName() +
                        "/" + instanceInfo.getInstance(), zkConfig.getShardingCount().toString().getBytes(StandardCharsets.UTF_8));
            }
            shardingFromZk();
        }
        /**
         * 从zk获取机器列表并进行分片
         * @throws Exception 异常
         */
        private void shardingFromZk() throws Exception {
            // 从 serviceName 节点下获取所有Ip列表
            final GetChildrenBuilder childrenBuilder = client.getChildren();
            final List<String> instanceList = childrenBuilder.watched().forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
            List<InstanceInfo> res = new ArrayList<>();
            instanceList.forEach(s -> {
                res.add(new InstanceInfo(s));
            });
            Map<InstanceInfo, List<Integer>> shardingResult = shardingStrategy.sharding(res, zkConfig.getShardingCount());
            // 先清一遍缓存
            caches.clear();
            shardingResult.forEach((k, v) -> {
                caches.put(k.getInstance().split("-")[0], v);
            });
        }
        /**
         * zk连接监听
         */
        private class ConnectionListener implements ConnectionStateListener {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if (newState == ConnectionState.CONNECTED || newState == ConnectionState.LOST || newState == ConnectionState.RECONNECTED) {
                    try {
                        zkOp();
                    } catch (Exception e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    } finally {
                        COUNT_DOWN_LATCH.countDown();
                    }
                }
            }
        }
        /**
         * 子节点监听
         */
        private class ChildrenPathListener implements PathChildrenCacheListener {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
                PathChildrenCacheEvent.Type type = event.getType();
                if (PathChildrenCacheEvent.Type.CHILD_ADDED == type || PathChildrenCacheEvent.Type.CHILD_REMOVED == type) {
                    try {
                        shardingFromZk();
                    } catch (Exception e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    }
                }
            }
        }
    }

    分片算法

    采用平均分配的算法

    public interface ShardingStrategy {
        Map<InstanceInfo, List<Integer>> sharding(final List<InstanceInfo> list, Integer shardingCount);
    }
    public class AverageAllocationShardingStrategy implements ShardingStrategy {
        @Override
        public Map<InstanceInfo, List<Integer>> sharding(List<InstanceInfo> list, Integer shardingCount) {
            if (list.isEmpty()) {
                return null;
            }
            Map<InstanceInfo, List<Integer>> result = shardingAliquot(list, shardingCount);
            addAliquant(list, shardingCount, result);
            return result;
        }
        private Map<InstanceInfo, List<Integer>> shardingAliquot(final List<InstanceInfo> instanceInfos, final int shardingTotalCount) {
            Map<InstanceInfo, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
            int itemCountPerSharding = shardingTotalCount / instanceInfos.size();
            int count = 0;
            for (InstanceInfo each : instanceInfos) {
                List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
                for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                    shardingItems.add(i);
                }
                result.put(each, shardingItems);
                count++;
            }
            return result;
        }
        private void addAliquant(final List<InstanceInfo> instanceInfos, final int shardingTotalCount, final Map<InstanceInfo, List<Integer>> shardingResults) {
            int aliquant = shardingTotalCount % instanceInfos.size();
            int count = 0;
            for (Map.Entry<InstanceInfo, List<Integer>> entry : shardingResults.entrySet()) {
                if (count < aliquant) {
                    entry.getValue().add(shardingTotalCount / instanceInfos.size() * instanceInfos.size() + count);
                }
                count++;
            }
        }
    }

    关于“Java Zookeeper分布式分片算法源码分析”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“Java Zookeeper分布式分片算法源码分析”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注编程之家行业资讯频道。

    版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

    相关推荐


    摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 目录 连接 连接池产生原因 连接池实现原理 小结 TEMPERANCE:Eat not to dullness;drink not to elevation.节制
    摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 一个优秀的工程师和一个普通的工程师的区别,不是满天飞的架构图,他的功底体现在所写的每一行代码上。-- 毕玄 1. 命名风格 【书摘】类名用 UpperCamelC
    今天犯了个错:“接口变动,伤筋动骨,除非你确定只有你一个人在用”。哪怕只是throw了一个新的Exception。哈哈,这是我犯的错误。一、接口和抽象类类,即一个对象。先抽象类,就是抽象出类的基础部分,即抽象基类(抽象类)。官方定义让人费解,但是记忆方法是也不错的 —包含抽象方法的类叫做抽象类。接口
    Writer :BYSocket(泥沙砖瓦浆木匠)微 博:BYSocket豆 瓣:BYSocketFaceBook:BYSocketTwitter :BYSocket一、引子文件,作为常见的数据源。关于操作文件的字节流就是 —FileInputStream&amp;FileOutputStream。
    作者:泥沙砖瓦浆木匠网站:http://blog.csdn.net/jeffli1993个人签名:打算起手不凡写出鸿篇巨作的人,往往坚持不了完成第一章节。交流QQ群:【编程之美 365234583】http://qm.qq.com/cgi-bin/qm/qr?k=FhFAoaWwjP29_Aonqz
    本文目录 线程与多线程 线程的运行与创建 线程的状态 1 线程与多线程 线程是什么? 线程(Thread)是一个对象(Object)。用来干什么?Java 线程(也称 JVM 线程)是 Java 进程内允许多个同时进行的任务。该进程内并发的任务成为线程(Thread),一个进程里至少一个线程。 Ja
    Writer :BYSocket(泥沙砖瓦浆木匠)微 博:BYSocket豆 瓣:BYSocketFaceBook:BYSocketTwitter :BYSocket在面向对象编程中,编程人员应该在意“资源”。比如?1String hello = &quot;hello&quot;; 在代码中,我们
    摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 这是泥瓦匠的第103篇原创 《程序兵法:Java String 源码的排序算法(一)》 文章工程:* JDK 1.8* 工程名:algorithm-core-le
    摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 目录 一、父子类变量名相同会咋样? 有个小故事,今天群里面有个人问下面如图输出什么? 我回答:60。但这是错的,答案结果是 40 。我知错能改,然后说了下父子类变
    作者:泥瓦匠 出处:https://www.bysocket.com/2021-10-26/mac-create-files-from-the-root-directory.html Mac 操作系统挺适合开发者进行写代码,最近碰到了一个问题,问题是如何在 macOS 根目录创建文件夹。不同的 ma
    作者:李强强上一篇,泥瓦匠基础地讲了下Java I/O : Bit Operation 位运算。这一讲,泥瓦匠带你走进Java中的进制详解。一、引子在Java世界里,99%的工作都是处理这高层。那么二进制,字节码这些会在哪里用到呢?自问自答:在跨平台的时候,就凸显神功了。比如说文件读写,数据通信,还
    1 线程中断 1.1 什么是线程中断? 线程中断是线程的标志位属性。而不是真正终止线程,和线程的状态无关。线程中断过程表示一个运行中的线程,通过其他线程调用了该线程的 方法,使得该线程中断标志位属性改变。 深入思考下,线程中断不是去中断了线程,恰恰是用来通知该线程应该被中断了。具体是一个标志位属性,
    Writer:BYSocket(泥沙砖瓦浆木匠)微博:BYSocket豆瓣:BYSocketReprint it anywhere u want需求 项目在设计表的时候,要处理并发多的一些数据,类似订单号不能重复,要保持唯一。原本以为来个时间戳,精确到毫秒应该不错了。后来觉得是错了,测试环境下很多一
    纯技术交流群 每日推荐 - 技术干货推送 跟着泥瓦匠,一起问答交流 扫一扫,我邀请你入群 纯技术交流群 每日推荐 - 技术干货推送 跟着泥瓦匠,一起问答交流 扫一扫,我邀请你入群 加微信:bysocket01
    Writer:BYSocket(泥沙砖瓦浆木匠)微博:BYSocket豆瓣:BYSocketReprint it anywhere u want.文章Points:1、介绍RESTful架构风格2、Spring配置CXF3、三层初设计,实现WebService接口层4、撰写HTTPClient 客户
    Writer :BYSocket(泥沙砖瓦浆木匠)什么是回调?今天傻傻地截了张图问了下,然后被陈大牛回答道“就一个回调…”。此时千万个草泥马飞奔而过(逃哈哈,看着源码,享受着这种回调在代码上的作用,真是美哉。不妨总结总结。一、什么是回调回调,回调。要先有调用,才有调用者和被调用者之间的回调。所以在百
    Writer :BYSocket(泥沙砖瓦浆木匠)一、什么大小端?大小端在计算机业界,Endian表示数据在存储器中的存放顺序。百度百科如下叙述之:大端模式,是指数据的高字节保存在内存的低地址中,而数据的低字节保存在内存的高地址中,这样的存储模式有点儿类似于把数据当作字符串顺序处理:地址由小向大增加
    What is a programming language? Before introducing compilation and decompilation, let&#39;s briefly introduce the Programming Language. Programming la
    Writer :BYSocket(泥沙砖瓦浆木匠)微 博:BYSocket豆 瓣:BYSocketFaceBook:BYSocketTwitter :BYSocket泥瓦匠喜欢Java,文章总是扯扯Java。 I/O 基础,就是二进制,也就是Bit。一、Bit与二进制什么是Bit(位)呢?位是CPU
    Writer:BYSocket(泥沙砖瓦浆木匠)微博:BYSocket豆瓣:BYSocket一、前言 泥瓦匠最近被项目搞的天昏地暗。发现有些要给自己一些目标,关于技术的目标:专注很重要。专注Java 基础 + H5(学习) 其他操作系统,算法,数据结构当成课外书博览。有时候,就是那样你越是专注方面越