阻塞队列的应用及简单实现一个阻塞队列

目录

前言

一、阻塞队列

二、生产者消费者模型

三、生产者消费者模型的应用

四、自己实现一个BlockingQueue【简单版本】

区分的方案有两种

方案一:(此方案会在数据结构模块更新的时候写)

方案二:(更推荐方案二)

总结


前言

实际工作中,队列这个数据结构比栈重要的多,用到的机会也多很多

实际使用的队列,不一定是简单的先进先出的队列,而可能是更复杂的队列

例如:

优先队列,需要注意的是,很多人对优先队列的概念很懵,这个要重视起来

消息队列,队列里的数据带有一定的“信息”,出队列的时候不是单纯的先进先出,而是按照分类,指定某个类先来的元素先出...

阻塞队列,是线程安全的队列,如果当前队列为空,尝试出队列,就会产生阻塞,一直阻塞到队列不为空,如果当前队列满了,尝试进队列,也会产生阻塞,一直阻塞到队列不满为止

无锁队列,线程安全的队列,但不是通过锁的方式保证线程安全

一、阻塞队列

Java标准库中内置了一个BlockingQueue这样的类来实现阻塞队列的功能,用法和普通队列很相似:入队列和出队列

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadDemo22 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue=new LinkedBlockingQueue<>();
        //put带有阻塞功能,offer没有,所以涉及到阻塞功能的时候用put
        //queue.offer("hello");
        queue.put("hello");
        //take功能是取出队首第一个元素
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}

执行结果为

可以在jconsole.exe上看出此程序在第十五行阻塞了,正处于阻塞等待状态

 由于它可以自动扩容,所以它插进元素的时候没有满的说法,不过我们可以自己指定插的个数,满了就不能插了

BlockingQueue<String> queue=new LinkedBlockingQueue<>(10);

二、生产者消费者模型

通过学习阻塞队列之后,我们可以用来实现生产者消费者模型

首先我们需要理解什么是生产者和消费者?

我们可以用一个例子来理解

我们过年包饺子的就用到了这个模型

我们包饺子需要有人擀饺子皮,需要有人包饺子,此时我们有两种做法

第一种,每个人都擀饺子皮和包饺子,自己干自己的,但是擀面杖只有一个

第二种,一个人专门负责擀饺子皮,其他人负责包饺子

我们应该都会选择第二种吧,因为第二种比第一种效率高太多了

此时擀饺子皮的人就是生产者,而包饺子的人是消费者

除了这两者,生产者和消费者还需要一个重要的角色:交易场所,也就是放饺子皮的地方

在计算机中,生产者是一组线程,消费者是另一组线程,交易场所就是这个阻塞队列

三、生产者消费者模型的应用

生产者消费者模型在服务器开发中是非常常用非常有用的一种编程手段

最大的用途:

1.解耦合

我们要理解它,也是需要一个模型

假设有两台服务器A和B,服务器A要传输一定的数据给服务器B

如果直接传输,此时就要求,要么服务器A向服务器B推送数据,要么就是服务器B从服务器A拉取数据,这都是需要服务器A和服务器B直接交互的;

未来如果需要扩展一个服务器C,也让服务器A给服务器C传输数据,这个时候改动就比较复杂,这个时候就认为A和B的耦合度就比较高

但是此时引入生产者消费者模型的话就会变得简单

服务器A给服务器C传输数据通过一个阻塞队列,服务器A只知道要把数据传到阻塞队列中,不知道数据要传往哪个服务器,而服务器C也只知道从阻塞队列中往出拿数据,但并不知道数据从哪个服务器来的,此时要拓展的话,让服务器A给服务器C传输数据,只需要让服务器C从阻塞队列中拿数据就行了

2.削峰填谷

我们要理解它,还是需要一个模型

我们都知道三峡大坝

汛期,如果没有大坝,下游的水会很大,可能就会有水灾

旱期,如果没有大坝,下游的水会很少,可能就会有旱灾

如果有了大坝

汛期,关闸蓄水,让水按照一定的速率往下流,避免水太大发生水灾

旱期,开闸放水,让水也按照一定的速率往下流,避免太缺水发生旱灾

而大坝就相当于是阻塞队列

我们可以借助BlockingQueue写一个简单的生产者消费者模型

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadDemo23 {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue=new LinkedBlockingQueue<>();
        Thread customer=new Thread(){
            @Override
            public void run() {
                while(true){
                    try {
                        Integer value=queue.take();
                        System.out.println("消费元素:"+value);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        };
        customer.start();
        Thread producer=new Thread(){
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    System.out.println("生产元素:"+i);
                    try {
                        queue.put(i);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        };
        producer.start();
    }
}

执行结果为

我们这里需要重点研究一下BlockingQueue代码内部是如何实现的,尤其是线程安全和阻塞等待,这里的操作就涉及到前面学过的synchronized、wait、notify了。

接下来写一个简单版本的BlockingQueue

四、自己实现一个BlockingQueue【简单版本】

循环队列的基本原理

针对循环队列,还有个重要问题,就是如何区分队列空还是满?当head和tail重合时,我们是区分不了的,具体情况是如果是一直插入元素导致的head和tail重合,此时就是队列满了,如果是一直删除元素导致head和tail重合,那么就是队列空了,所以我们必须想办法区分清楚。

区分的方案有两种

方案一:(此方案会在数据结构模块更新的时候写)

浪费一个空间,用head和tail重合表示空,tail+1和head重合表示满

方案二:(更推荐方案二

专门设一个变量size记录当前元素的个数

入队列,就是size++

出队列,就是size--

size为0就是空,size为数组最大长度就是满

代码为

public class ThreadDemo24 {
    static class BlockingQueue{
        //要想实现一个阻塞队列,首先要实现一个普通队列
        //用数组实现一个队列,也就是一个循环队列
        private int[] items=new int[10];
        private int head=0;
        private int tail=0;
        private int size=0;
        private Object locker=new Object();

        //put方法用来入队列
        public void put(int item) throws InterruptedException {
            synchronized (locker){
                //入队列就把新的元素放到tail位置上
                //这里要首先考虑队列满的情况
                //此处的条件最好写作while,而不是if
                //如果是有多个线程阻塞等待的时候,万一同时唤醒了多个线程
                //就有可能出现第一个元素放入元素之后,第二个元素要放就又满了的情况
                //虽然当前take的代码中使用的是notify,一次只唤醒一个等待的线程,用if也不算错
                //但是用while更好一些
                //使用while的意思是,保证wait被唤醒的时候能再确认一次队列确实不满
                while (size == items.length) {
                    //执行阻塞操作
                    locker.wait();
                }
                items[tail]=item;
                tail++;
                //这里还要考虑tail越不越界的问题
                //如果tail==items.length,那么就让tail等于0
                if (tail == items.length) {
                    tail=0;
                }
                size++;
                locker.notify();
            }

        }
        //take方法用来出队列
        public int take() throws InterruptedException {
            int value=0;
            synchronized (locker){
                //首先要考虑的是如果队列为空,那么再尝试取元素,那么就要阻塞
                //这里用while也是类似的原理
                //保证wait被唤醒的时候能再确认一次队列确实不为空
                while (size == 0) {
                    locker.wait();
                }

                value=items[head];
                head++;
                //此时仍需判断head越不越界的问题
                //如果head==items.length,那么就让tail等于0
                if (head == items.length) {
                    head=0;
                }
                size--;
                //此处的notify用来唤醒put中的wait
                locker.notify();
            }
            return value;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        /*BlockingQueue queue=new BlockingQueue();
        queue.put(1);
        queue.put(2);
        queue.put(3);
        queue.put(4);
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());*/
        BlockingQueue queue=new BlockingQueue();
        //消费者
        Thread consumer=new Thread(){
            @Override
            public void run() {
                while(true){
                    try {
                        System.out.println("消费元素:"+queue.take());
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        };
        consumer.start();
        Thread producer=new Thread(){
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    System.out.println("生产元素:"+i);
                    try {
                        queue.put(i);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        };
        producer.start();
    }
}

运行结果为(一部分)

总结

put和take都可能会出现阻塞的情况,由于这两个代码中的阻塞条件是对立的,所以不会同时触发

wait

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340