面试官问:“在项目中用过多线程吗?”你就把这个案例讲给他听!

在面试当中,有时候会问到你在项目中用过多线程么?

对于普通的应届生或者工作时间不长的初级开发 ???—— crud仔流下了没有技术的眼泪。

流下了没技术的眼泪_流下_眼泪_技术表情

博主这里整理了项目中用到了多线程的一个简单的实例,希望能对你有所启发。

多线程开发实例

应用背景

应用的背景非常简单,博主做的项目是一个审核类的项目,审核的数据需要推送给第三方监管系统,这只是一个很简单的对接,但是存在一个问题。

我们需要推送的数据大概三十万条,但是第三方监管提供的接口只支持单条推送(别问为什么不支持批量,问就是没过)。可以估算一下,三十万条数据,一条数据按3秒算,大概需要250(为什么恰好会是这个数)个小时。

所以就考虑到引入多线程来进行并发操作,降低数据推送的时间,提高数据推送的实时性。

业务示例

设计要点

防止重复

我们推送给第三方的数据肯定是不能重复推送的,必须要有一个机制保证各个线程推送数据的隔离。

这里有两个思路:

    1. 将所有数据取到集合(内存)中,然后进行切割,每个线程推送不同段的数据
    1. 利用 数据库分页的方式,每个线程取 [start,limit] 区间的数据推送,我们需要保证start的一致性

这里采用了第二种方式,因为考虑到可能数据量后续会继续增加,把所有数据都加载到内存中,可能会有比较大的内存占用。

失败机制

我们还得考虑到线程推送数据失败的情况。

如果是自己的系统,我们可以把多线程调用的方法抽出来加一个事务,一个线程异常,整体回滚。

但是是和第三方的对接,我们都没法做事务的,所以,我们采用了直接在数据库记录失败状态的方法,可以在后面用其它方式处理失败的数据。

线程池选择

在实际使用中,我们肯定是要用到线程池来管理线程,关于线程池,我们常用 ThreadPoolExecutor提供的线程池服务,SpringBoot中同样也提供了线程池异步的方式,虽然SprignBoot异步可能更方便一点,但是使用ThreadPoolExecutor更加直观地控制线程池,所以我们直接使用ThreadPoolExecutor构造方法创建线程池。

大概的技术设计示意图:

设计示意图

核心代码

上面叭叭了一堆,到了show you code的环节了。我将项目里的代码抽取出来,简化出了一个示例。

核心代码如下:

/**
 * @Author 三分恶
 * @Date 2021/3/5
 * @Description
 */
@Service
public class PushProcessServiceImpl implements PushProcessService {
    @Autowired
    private PushUtil pushUtil;
    @Autowired
    private PushProcessMapper pushProcessMapper;

    private final static Logger logger = LoggerFactory.getLogger(PushProcessServiceImpl.class);

    //每个线程每次查询的条数
    private static final Integer LIMIT = 5000;
    //起的线程数
    private static final Integer THREAD_NUM = 5;
    //创建线程池
    ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM,THREAD_NUM * 2,TimeUnit.SECONDS,new LinkedBlockingQueue<>(100));

    @Override
    public void pushData() throws ExecutionException,InterruptedException {
        //计数器,需要保证线程安全
        int count = 0;
        //未推送数据总数
        Integer total = pushProcessMapper.countPushRecordsByState(0);
        logger.info("未推送数据条数:{}",total);
        //计算需要多少轮
        int num = total / (LIMIT * THREAD_NUM) + 1;
        logger.info("要经过的轮数:{}",num);
        //统计总共推送成功的数据条数
        int totalSuccessCount = 0;
        for (int i = 0; i < num; i++) {
            //接收线程返回结果
            List<Future<Integer>> futureList = new ArrayList<>(32);
            //起THREAD_NUM个线程并行查询更新库,加锁
            for (int j = 0; j < THREAD_NUM; j++) {
                synchronized (PushProcessServiceImpl.class) {
                    int start = count * LIMIT;
                    count++;
                    //提交线程,用数据起始位置标识线程
                    Future<Integer> future = pool.submit(new PushDataTask(start,LIMIT,start));
                    //先不取值,防止阻塞,放进集合
                    futureList.add(future);
                }
            }
            //统计本轮推送成功数据
            for (Future f : futureList) {
                totalSuccessCount = totalSuccessCount + (int) f.get();
            }
        }
        //更新推送标志
        pushProcessMapper.updateAllState(1);
        logger.info("推送数据完成,需推送数据:{},推送成功:{}",total,totalSuccessCount);
    }

    /**
     * 推送数据线程类
     */
    class PushDataTask implements Callable<Integer> {
        int start;
        int limit;
        int threadNo;   //线程编号

        PushDataTask(int start,int limit,int threadNo) {
            this.start = start;
            this.limit = limit;
            this.threadNo = threadNo;
        }

        @Override
        public Integer call() throws Exception {
            int count = 0;
            //推送的数据
            List<PushProcess> pushProcessList = pushProcessMapper.findPushRecordsByStateLimit(0,start,limit);
            if (CollectionUtils.isEmpty(pushProcessList)) {
                return count;
            }
            logger.info("线程{}开始推送数据",threadNo);
            for (PushProcess process : pushProcessList) {
                boolean isSuccess = pushUtil.sendRecord(process);
                if (isSuccess) {   //推送成功
                    //更新推送标识
                    pushProcessMapper.updateFlagById(process.getId(),1);
                    count++;
                } else {  //推送失败
                    pushProcessMapper.updateFlagById(process.getId(),2);
                }
            }
            logger.info("线程{}推送成功{}条",threadNo,count);
            return count;
        }
    }
}

代码很长,我们简单说一下关键的地方:

  • 线程创建:线程内部类选择了实现Callable接口,这样方便获取线程任务执行的结果,在示例里用于统计线程推送成功的数量
 class PushDataTask implements Callable<Integer> {
  • 使用 ThreadPoolExecutor 创建线程池,
  //创建线程池
      ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM,new LinkedBlockingQueue<>(100));

主要构造参数如下:

​ - corePoolSize:线程核心参数选择了5

​ - maximumPoolSize:最大线程数选择了核心线程数2倍数

​ - keepAliveTime:非核心闲置线程存活时间直接置为0

​ - unit:非核心线程保持存活的时间选择了 TimeUnit.SECONDS 秒

​ - workQueue:线程池等待队列,使用 容量初始为100的 LinkedBlockingQueue阻塞队列

这里还有没写出来的线程池拒绝策略,采用了默认AbortPolicy:直接丢弃任务,抛出异常。

  • 使用 synchronized 来保证线程安全,保证计数器的增加是有序的
  synchronized (PushProcessServiceImpl.class) {
  • 使用集合来接收线程的运行结果,防止阻塞
List<Future<Integer>> futureList = new ArrayList<>(32);

好了,主要的代码和简单的解析就到这里了。

关于这个简单的demo,这里只是简单地做推送数据处理。考虑一下,这个实例是不是可以用在你项目的某些地方。例如监管系统的数据校验、审计系统的数据统计、电商系统的数据分析等等,只要是有大量数据处理的地方,都可以把这个例子结合到你的项目里,这样你就有了多线程开发的经验。

完整代码仓库地址在文章底部

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


在 Java 语言中,提高程序的执行效率有两种实现方法,一个是使用线程、另一个是使用线程池。而在生产环境下,我们通常会采用后者。为什么会这样呢?今天我们就来聊聊线程池的优点,以及池化技术及其应用。 1.池化技术 池化技术指的是提前准备一些资源,在需要时可以重复使用这些预先准备的资源。 池化技术的优点
在 Java 中停止线程的实现方法有以下 3 种: 自定义中断标识符,停止线程。 使用线程中断方法 interrupt 停止线程。 使用 stop 停止线程。 其中 stop 方法为 @Deprecated 修饰的过期方法,也就是不推荐使用的过期方法,因为 stop 方法会直接停止线程,这样就没有给
在多线程编程中,wait 方法是让当前线程进入休眠状态,直到另一个线程调用了 notify 或 notifyAll 方法之后,才能继续恢复执行。而在 Java 中,wait 和 notify/notifyAll 有着一套自己的使用格式要求,也就是在使用 wait 和 notify(notifyAll
在 Java 语言中,并发编程都是通过创建线程池来实现的,而线程池的创建方式也有很多种,每种线程池的创建方式都对应了不同的使用场景,总体来说线程池的创建可以分为以下两类: 通过 ThreadPoolExecutor 手动创建线程池。 通过 Executors 执行器自动创建线程池。 而以上两类创建线
sleep 方法和 wait 方法都是用来将线程进入休眠状态的,并且 sleep 和 wait 方法都可以响应 interrupt 中断,也就是线程在休眠的过程中,如果收到中断信号,都可以进行响应,并抛出 InterruptedException 异常。那 sleep 和 wait 的区别都有哪些呢
在 Java 中,线程的创建方法有 7 种,分为以下 3 大类: 继承 Thread 类的方式,它有 2 种实现方法。 实现 Runnable 接口的方式,它有 3 种实现方法。 实现 Callable 接口的方式,它有 2 种实现方法。 接下来我们一个一个来看。 1.继承Thread类 继承 Th
所谓的线程池的 7 大参数是指,在使用 ThreadPoolExecutor 创建线程池时所设置的 7 个参数,如以下源码所示: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
在 Java 语言中,线程分为两类:用户线程和守护线程,默认情况下我们创建的线程或线程池都是用户线程,所以用户线程也被称之为普通线程。 想要查看线程到底是用户线程还是守护线程,可以通过 Thread.isDaemon() 方法来判断,如果返回的结果是 true 则为守护线程,反之则为用户线程。 我们
聊到线程池就一定会聊到线程池的执行流程,也就是当有一个任务进入线程池之后,线程池是如何执行的?我们今天就来聊聊这个话题。线程池是如何执行的?线程池的拒绝策略有哪些? 线程池执行流程 想要真正的了解线程池的执行流程,就得先从线程池的执行方法 execute() 说起,execute() 实现源码如下:
单例模式是面试中的常客了,它的常见写法有 4 种:饿汉模式、懒汉模式、静态内部类和枚举,接下来我们一一来看。 1.饿汉模式 饿汉模式也叫预加载模式,它是在类加载时直接创建并初始化单例对象,所以它并不存在线程安全的问题。它是依靠 ClassLoader 类机制,在程序启动时只加载一次,因此不存在线程安
线程安全是指某个方法或某段代码,在多线程中能够正确的执行,不会出现数据不一致或数据污染的情况,我们把这样的程序称之为线程安全的,反之则为非线程安全的。在 Java 中,解决线程安全问题有以下 3 种手段: 使用线程安全类,比如 AtomicInteger。 加锁排队执行 使用 synchronize
在 Java 语言中,保证线程安全性的主要手段是加锁,而 Java 中的锁主要有两种:synchronized 和 Lock,我们今天重点来看一下 synchronized 的几种用法。 用法简介 使用 synchronized 无需手动执行加锁和释放锁的操作,我们只需要声明 synchronize
在 Java 语言中,有两个线程池可以执行定时任务:ScheduledThreadPool 和 SingleThreadScheduledExecutor,其中 SingleThreadScheduledExecutor 可以看做是 ScheduledThreadPool 的单线程版本,它的用法和
从公平的角度来说,Java 中的锁总共可分为两类:公平锁和非公平锁。但公平锁和非公平锁有哪些区别?孰优孰劣呢?在 Java 中的应用场景又有哪些呢?接下来我们一起来看。 正文 公平锁:每个线程获取锁的顺序是按照线程访问锁的先后顺序获取的,最前面的线程总是最先获取到锁。 非公平锁:每个线程获取锁的顺序
单例模式的实现方法有很多种,如饿汉模式、懒汉模式、静态内部类和枚举等,当面试官问到“为什么单例模式一定要加 volatile?”时,那么他指的是为什么懒汉模式中的私有变量要加 volatile? 懒汉模式指的是对象的创建是懒加载的方式,并不是在程序启动时就创建对象,而是第一次被真正使用时才创建对象。
读写锁(Readers-Writer Lock)顾名思义是一把锁分为两部分:读锁和写锁,其中读锁允许多个线程同时获得,因为读操作本身是线程安全的,而写锁则是互斥锁,不允许多个线程同时获得写锁,并且写操作和读操作也是互斥的。总结来说,读写锁的特点是:读读不互斥、读写互斥、写写互斥。 1.读写锁使用 在
很多场景下,我们需要等待线程池的所有任务都执行完,然后再进行下一步操作。对于线程 Thread 来说,很好实现,加一个 join 方法就解决了,然而对于线程池的判断就比较麻烦了。 我们本文提供 4 种判断线程池任务是否执行完的方法: 使用 isTerminated 方法判断。 使用 getCompl
在 Java 中,线程池的状态和线程的状态是完全不同的,线程有 6 种状态:NEW:初始化状态、RUNNABLE:可运行/运行状态、BLOCKED:阻塞状态、WAITING:无时限等待状态、TIMED_WAITING:有时限等待状态和 TERMINATED:终止状态。而线程池的状态有以下 5 种:
volatile 是 Java 并发编程的重要组成部分,也是常见的面试题之一,它的主要作用有两个:保证内存的可见性和禁止指令重排序。下面我们具体来看这两个功能。 内存可见性 说到内存可见性问题就不得不提 Java 内存模型,Java 内存模型(Java Memory Model)简称为 JMM,主要
1.第一范式 第一范式规定表中的每个列都应该是不可分割的最小单元。比如以下表中的 address 字段就不是不可分割的最小单元,如下图所示: 其中 address 还可以拆分为国家和城市,如下图所示: 这样改造之后,上面的表就满足第一范式了。 2.第二范式 第二范式是在满足第一范式的基础上,规定表中