多线程案例(单例、阻塞队列、生消模型、定时器)

一、单例模式

单例模式是校招中最常考的设计模式之一.

1.1 概念

设计模式好比象棋中的 “棋谱”. 红方当头炮, 黑方马来跳. 针对红方的一些走法, 黑方应招的时候有一些固定的套路. 按照套路来走局势就不会吃亏.
软件开发中也有很多常见的 “问题场景”. 针对这些问题场景, 大佬们总结出了一些固定的套路. 按照这个套路来实现代码, 也不会吃亏.
单例模式能保证某个类在程序中只存在唯一一份实例, 而不会创建出多个实例(借助语法,强行限制不能创建多个实例,避免程序员不小心出错), 比如 JDBC 中的 DataSource 实例就只需要一个

单例模式具体的实现方式, 分成 “饿汉” 和 “懒汉” 两种.

1.2 饿汉模式

类加载的同时, 创建实例.

【代码实现】

class Singleton{
    private static Singleton instance=new Singleton();
    private Singleton(){

    }
    public static Singleton getInstance(){
        return instance;
    }
}

在这里插入图片描述


饿汉模式在多线程的环境下也是线程安全的,因为在多线程中调用getInstance方法时,只涉及多线程的读,故是线程安全的

1.3 懒汉模式

类加载的时候不创建实例. 第一次使用的时候才创建实例.
懒这个词,在计算机中往往意味着性能比较高
懒汉这种模式在计算机中很常见,例如现在有个10G的文件存储在硬盘中,通过编辑器打开这个文件

  • 把10G文件都读取到内存中,读取完毕之后再允许用户进行查看和修改【饿汉】
  • 只读取一页(当前屏幕能显出的范围),随着用户翻页,继续读后续内容,用户没翻到这一页,就先不去读【懒汉】

对比之下,懒汉形式的效率更高;还有我们生活中的微信朋友圈的图片,在你点开之前显示的都是缩略图,只有在你点开某个图片后才会真正加载原图

【线程不安全版】

class SingletonLazy{
    private static SingletonLazy instance=null;
    private SingletonLazy(){};
    public static SingletonLazy getInstance(){
        if(instance==null){
            instance=new SingletonLazy();
        }
        return instance;
    }
}

在这里插入图片描述


线程安全问题发生在首次创建实例时. 如果在多个线程中同时调用 getInstance 方法,多个线程都发现instance为null,就可能导致创建出多个实例.一旦实例已经创建好了, 后面再多线程环境调用 getInstance 就不再有线程安全问题了(不再修改instance了)

【线程安全版】

class SingletonLazy{
    private static volatile SingletonLazy instance=null;
    private SingletonLazy(){};
    public static SingletonLazy getInstance(){
        if(instance==null) {
            synchronized (SingletonLazy.class) {
                if (instance == null) {
                    instance = new SingletonLazy();
                }
            }
        }
        return instance;
    }
}

在这里插入图片描述

但在网上对于volatile在此处起的作用还有些争议,有的人还认为此处volatile的作用是禁止指令的重排序

在这里插入图片描述

【举例理解双重if起到的作用】

(1) 有三个线程, 开始执行 getInstance , 通过外层的 if (instance == null)
知道了实例还没有创建的消息. 于是开始竞争同一把锁

在这里插入图片描述


(2) 其中线程1 率先获取到锁, 此时线程1 通过里层的 if (instance == null) 进一步确认实例是否已经创建.
如果没创建, 就把这个实例创建出来.

在这里插入图片描述


(3)当线程1 释放锁之后, 线程2 和 线程3 也拿到锁, 也通过里层的 if (instance == null)
来确认实例是否已经创建, 发现实例已经创建出来了, 就不再创建了.

在这里插入图片描述


(4) 后续的线程, 不必加锁, 直接就通过外层 if (instance == null) 就知道实例已经创建了, 从而不再尝试获取锁了.
降低了开销.

在这里插入图片描述

二、阻塞队列

2.1 概念

阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.
阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:

  • 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素
  • 当队列空的时候, 继续出队列也会阻塞(本质上就是修改了线程的状态,让线程的PCB在内核中暂时不参与调度), 直到有其他线程往队列中插入元素.

【补充】:

  1. 阻塞队列是线程安全的,因为内部内置了锁和同步机制能保证线程安全
  2. 无锁队列:线程安全的队列,实现内部没有使用锁,更高效,但会消耗更多的CPU资源
  3. 消息队列:在队列中涵盖多种不同"类型"的元素,取元素的时候可以按照某个类型来取,做到针对该类型的"先进先出" (甚至会把消息队列作为服务器,单独部署,例如kafka,rocketmq等)

阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.

2.2 生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
阻塞队列起到的作用:
(1) 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力

比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求, 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放 到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求. 这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮.

(2) 阻塞队列也能使生产者和消费者之间解耦.

比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”. 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的).

2.3 标准库中的阻塞队列

在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可。

  • BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
  • put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.

【代码示例:生产者消费者模型】

package thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Test1 {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue=new LinkedBlockingQueue<>();
        Thread customer=new Thread(()->{
            while (true){
                try {
                   int val= queue.take();
                    System.out.println("消费元素:"+val);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        customer.start();
        Thread producer=new Thread(()->{
            int n=0;
           while(true){
               System.out.println("生产元素:"+n);
               try {
                   queue.put(n);
                   n++;
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }

           }
        });
        producer.start();
    }
}

执行结果:

在这里插入图片描述

2.4 阻塞队列模拟实现

  • 基于数组实现队列
  • 提供两种核心方法:put和take
  • 使用 synchronized 进行加锁控制.
package thread;

public class MyBlockingQueue {
    private int[] items=new int[1000];
    //队列的头的位置
    private int head=0;
    //队列尾的位置
    private int tail=0;
    //队列中元素的个数
    private volatile int size=0;
  public void put(int val) throws InterruptedException {
      synchronized (this){
        // 此处最好使用 while
          // 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
       // 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了
        // 就只能继续等待
          while(size==items.length){
         this.wait();
          }
          items[tail]=val;
          tail++;
          if(tail == items.length){
              tail=0;
          }
          size++;
          this.notify();
      }

  }
  public Integer take() throws InterruptedException {
      int ret=0;
      synchronized (this){
         while(size==0){
            this.wait();
          }
          ret=items[head];
          head++;
          if(head==items.length){
              head=0;
          }
          size--;
          this.notify();
      }
      return ret;
  }
}

在这里插入图片描述

【注意】:

put函数中的this.wait和this.notify take函数中的this.wait和this.notify
这几个this都指的是同一个对象,因为只有对同一个队列进行put和take才会造成阻塞

【代码示例:生产者消费者模型】

   public static void main(String[] args) {
        MyBlockingQueue queue=new MyBlockingQueue();
        Thread customer=new Thread(()->{
            while(true){
                int val= 0;
                try {
                    val = queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费了:"+val);
            }
        });
        customer.start();
        Thread producer=new Thread(()->{
            int n=0;
            while (true){
                try {
                    queue.put(n);
                    System.out.println("生产了:"+n);
                    n++;
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producer.start();
    }

执行结果:

在这里插入图片描述

三、定时器

3.1 概念

定时器也是软件开发中的一个重要组件. 类似于一个 “闹钟”. 达到一个设定的时间之后, 就执行某个指定好的代码.

在这里插入图片描述

定时器是一种实际开发中非常常用的组件.
比如网络通信中, 如果对方 500ms 内没有返回数据, 则断开连接尝试重连.
比如一个 Map, 希望里面的某个 key 在 3s 之后过期(自动删除).
类似于这样的场景就需要用到定时器.

3.2 标准库中的定时器

  • 标准库中提供了一个 Timer 类(java.util包下). Timer 类的核心方法为 schedule
  • schedule 包含两个参数. 第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后执行 (单位为毫秒).
package thread;

import java.util.Timer;
import java.util.TimerTask;

public class Test2 {
    public static void main(String[] args) {
        Timer timer=new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello timer");
            }
        },5000);
    }
}

执行结果:

在这里插入图片描述

【补充】:

TimerTask 类实现了Runnable接口

在这里插入图片描述

【定时器和sleep的区别】:

使用sleep会把当前的线程给阻塞掉
使用定时器,当前线程不会阻塞

【使用定时器】


import java.util.Timer; import java.util.TimerTask;

public class Test2 {
    public static void main(String[] args) throws InterruptedException {
        Timer timer=new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello timer");
            }
        },5000);
        while (true){
            System.out.println("main");
            Thread.sleep(1000);
        }
    } }

执行结果:

在这里插入图片描述

【 使用sleep】


import java.util.Timer; import java.util.TimerTask;

public class Test2 {
    public static void main(String[] args) throws InterruptedException {
        Thread.sleep(5000);
        System.out.println("hello timer");
        while (true){
            System.out.println("main");
            Thread.sleep(1000);
        }
    } }

执行结果:

在这里插入图片描述

3.3 模拟实现定时器

定时器的构成:

  • 一个带优先级的阻塞队列

为啥要带优先级呢?
因为阻塞队列中的任务都有各自的执行时刻 (delay). 最先执行的任务一定是 delay 最小的. 使用带优先级的队列就可以高效的把这个 delay 最小的任务找出来

  • 队列中的每个元素是一个Task对象
  • Task中带有时间属性
  • 同时还要有一个worker线程,线程一直扫描队首元素,看队首元素是否需要执行

(1) Timer 类提供的核心接口为 schedule, 用于注册一个任务, 并指定这个任务多长时间后执行.

public class MyTimer {
     //command 要执行的任务
    //after 该任务多久后开始执行
    public void schedule(Runnable command,long after){

    }
}

(2) Task 类用于描述一个任务. 里面包含一个 Runnable 对象和一个 time(毫秒时
间戳)这个对象需要放到 优先队列 中. 因此需要实现 Comparable 接口.

class Task implements Comparable<Task>{
    private  Runnable command;
    private long time;
    public Task(Runnable command,long after){
        this.command=command;
        //此处记录的是绝对的时间戳
        this.time=System.currentTimeMillis()+after;
    }
    //执行任务的方法,直接在内部调用Runnable的run方法即可
    public void run(){
        command.run();
    }

    @Override
    public int compareTo(Task o) {
        //时间小的排在前面
        return (int)(this.time-o.time);
    }
    public long getTime() {
        return time;
    }

}

(3)Timer 实例中, 通过 PriorityBlockingQueue 来组织若干个 Task 对象.
通过 schedule 来往队列中插入一个个 Task 对象.

public class MyTimer {
    private PriorityBlockingQueue<Task> queue=new PriorityBlockingQueue<>();

    //command 要执行的任务
    //after 该任务多久后开始执行
    public void schedule(Runnable command,long after){
     Task task=new Task(command, after);
     queue.put(task);
    }
}

(4) Timer 类中存在一个 worker 线程, 一直不停的扫描队首元素, 看看是否能执行这个任务.

package thread;

import javax.rmi.ssl.SslRMIClientSocketFactory;
import java.util.concurrent.PriorityBlockingQueue;

class Task implements Comparable<Task>{
    private  Runnable command;
    private long time;
    public Task(Runnable command,long after){
        this.command=command;
        //此处记录的是绝对的时间戳
        this.time=System.currentTimeMillis()+after;
    }
    //执行任务的方法,直接在内部调用Runnable的run方法即可
    public void run(){
        command.run();
    }

    @Override
    public int compareTo(Task o) {
        //时间小的排在前面
        return (int)(this.time-o.time);
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }
}
public class MyTimer {
    private PriorityBlockingQueue<Task> queue=new PriorityBlockingQueue<>();

    //command 要执行的任务
    //after 该任务多久后开始执行
    public void schedule(Runnable command,long after){
     Task task=new Task(command, after);
         queue.put(task);
         queue.notify();
    }
    public  MyTimer(){
        Thread worker=new Thread(()->{
            while(true){
                //循环过程中获取队首元素
                //判断队首元素是否可以执行
                try {
                    Task task=queue.take();
                    long curTime= System.currentTimeMillis();
                    if(task.getTime()>curTime){
                        //还没到应该执行此任务的时间
                         queue.put(task);
                    }else{
                      task.run();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        worker.start();
    }
}

【bug分析】:当前的代码存在一个严重的问题,就是while(true)转的太快了,造成了无意义的CPU浪费

比如第一个任务设定的是 1 min 之后执行某个逻辑. 但是这里的 while (true) 会导致每秒钟访问队首元素几万次. 而当前距离任务执行的时间还有很久呢.

【bug处理】:使用wait和notify

(5) 引入locker对象,借助该对象的 wait / notify 来解决 while (true) 的忙等问题

package thread;

import javax.rmi.ssl.SslRMIClientSocketFactory;
import java.util.concurrent.PriorityBlockingQueue;

class Task implements Comparable<Task>{
    private  Runnable command;
    private long time;
    public Task(Runnable command,long after){
        this.command=command;
        //此处记录的是绝对的时间戳
        this.time=System.currentTimeMillis()+after;
    }
    //执行任务的方法,直接在内部调用Runnable的run方法即可
    public void run(){
        command.run();
    }

    @Override
    public int compareTo(Task o) {
        //时间小的排在前面
        return (int)(this.time-o.time);
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }
}
public class MyTimer {
    private Object locker=new Object();
    private PriorityBlockingQueue<Task> queue=new PriorityBlockingQueue<>();

    //command 要执行的任务
    //after 该任务多久后开始执行
    public void schedule(Runnable command,long after){
     Task task=new Task(command, after);
         queue.put(task);
        //每次有新任务到来的时候唤醒一下 worker 线程. (因为新插入的任务可能
       // 是需要马上执行的).
         synchronized (locker){
             locker.notify();
         }
    }
    public  MyTimer(){
        Thread worker=new Thread(()->{
            while(true){
                //循环过程中获取队首元素
                //判断队首元素是否可以执行
                try {
                    Task task=queue.take();
                    long curTime= System.currentTimeMillis();
                    if(task.getTime()>curTime){
                        //还没到应该执行此任务的时间
                         queue.put(task);
                         synchronized (locker){
                             locker.wait(task.getTime()-curTime);
                         }
                    }else{
                      task.run();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        worker.start();
    }
    
}

【bug分析】:即使我们引入了wait和notify解决了CPU忙等的问题,但该代码仍然存在bug

在这里插入图片描述


【解决办法】:扩大锁的范围

package thread;

import javax.rmi.ssl.SslRMIClientSocketFactory;
import java.util.concurrent.PriorityBlockingQueue;

class Task implements Comparable<Task>{
    private  Runnable command;
    private long time;
    public Task(Runnable command,long after){
        this.command=command;
        //此处记录的是绝对的时间戳
        this.time=System.currentTimeMillis()+after;
    }
    //执行任务的方法,直接在内部调用Runnable的run方法即可
    public void run(){
        command.run();
    }

    @Override
    public int compareTo(Task o) {
        //时间小的排在前面
        return (int)(this.time-o.time);
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }
}
public class MyTimer {
    private Object locker=new Object();
    private PriorityBlockingQueue<Task> queue=new PriorityBlockingQueue<>();

    //command 要执行的任务
    //after 该任务多久后开始执行
    public void schedule(Runnable command,long after){
     Task task=new Task(command, after);
        synchronized (locker){
        //这里的put操作也要放在锁中,如果将put放在锁的外面,仍有可能出现上面的
        //bug(在取完队首元素后,还没等到执行wait就有新任务插入)
         queue.put(task);
         //每次有新任务到来的时候唤醒一下 worker 线程. (因为新插入的任务可能
         // 是需要马上执行的).
            locker.notify();
         }
    }
    public  MyTimer(){
        Thread worker=new Thread(()->{
            while(true){
                //循环过程中获取队首元素
                //判断队首元素是否可以执行
                try {
                    synchronized (locker) {
                        Task task = queue.take();
                        long curTime = System.currentTimeMillis();
                        if (task.getTime() > curTime) {
                            //还没到应该执行此任务的时间
                            queue.put(task);

                            locker.wait(task.getTime() - curTime);

                        } else {
                            task.run();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        worker.start();
    }
}

上述的代码仍存在一个问题,就是可能出现"死等"

在这里插入图片描述


【完整代码】:

package thread;

import javax.rmi.ssl.SslRMIClientSocketFactory;
import java.util.concurrent.PriorityBlockingQueue;

class Task implements Comparable<Task>{
    private  Runnable command;
    private long time;
    public Task(Runnable command,long after){
        this.command=command;
        //此处记录的是绝对的时间戳
        this.time=System.currentTimeMillis()+after;
    }
    //执行任务的方法,直接在内部调用Runnable的run方法即可
    public void run(){
        command.run();
    }

    @Override
    public int compareTo(Task o) {
        //时间小的排在前面
        return (int)(this.time-o.time);
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }
}
public class MyTimer {
    private Object locker=new Object();
    private PriorityBlockingQueue<Task> queue=new PriorityBlockingQueue<>();

    //command 要执行的任务
    //after 该任务多久后开始执行
    public void schedule(Runnable command,long after){
     Task task=new Task(command, after);
        synchronized (locker){
         queue.put(task);
         //每次有新任务到来的时候唤醒一下 worker 线程. (因为新插入的任务可能
         // 是需要马上执行的).
            locker.notify();
         }
    }
    public  MyTimer(){
        Thread worker=new Thread(()->{
            while(true){
                //循环过程中获取队首元素
                //判断队首元素是否可以执行
                try {
                    synchronized (locker) {
                        while (queue.isEmpty()){
                            locker.wait();
                        }
                        Task task = queue.take();
                        long curTime = System.currentTimeMillis();
                        if (task.getTime() > curTime) {
                            //还没到应该执行此任务的时间
                            queue.put(task);
                            locker.wait(task.getTime() - curTime);
                        } else {
                            task.run();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        worker.start();
    }

}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340