JAVA之并发编程(一) 2019-02-17 程序之旅 暂无评论 917 次阅读 过了一个年,突然回到公司不知道该做些什么,老板也没有直接给任务,现在就等着分配任务。感觉好久没有进行学习,万一老板一个心情不爽给我分配一些莫名的任务,自己傻眼不会做那就尴尬了。在没有被问到之前先提升自己的知识储备才是真的,所以学习一些基础知识,虽然和现在所做的事情没有多少关系,不过这些东西迟早都会接触的,还不如早点认识。 ------ 先开始java中的并发编程,目前只在大量消息队列的时候会涉及到一点,使用线程锁来进行并发编程,没有很好的发挥java的编程优势,在这需要好好的深究一下, 该如何更好的使用java的并发编程工具。 首先我们需要知道cpu的核心数和线程数与java中进程的线程其实是两码事,例如A进程中有10个线程,这19个线程是cpu线程进行RR调度,时间片轮转机制进行上下文切换来实现A进程中10个线程的并发运行,时间片足够短就会产生一种10个线程同时进行计算运行的错觉。其次就是并行和并发这两个概念,并行:同一时刻,可以同时处理事情的能力。并发:与单位时间相关,在单位时间内可以处理事情的能力。简单的说就是我们中学食堂,并行就是多个领菜窗口同时进行分配菜,并发就是一个窗口在1分钟内给两个人进行分配菜,其时间单位就1分钟。 使用并发编程有很多的明显好处,充分利用cpu的资源,加快操作响应速度,对数据模块化、异步化处理等等,自行体会,但同时也会到来一些问题,例如在线程共享资源时会存在资源抢占的冲突,导致死锁,启动太多的线程会加大硬件压力。在出现这些问题之前,我们应该规范的做好多线程的使用,从而安全的有效的规范的使用线程,提高对代码的质量。 java中线程的基本启动方式; - Thread - Runnable - Callable 线程的中断方法: - interrupt():中断一个线程,并不是强行关闭这个线程,只是跟这个线程打个招呼,将线程的中断标志位置为true,线程是否中断,由线程本身决定。 - isInterrupted() 判定当前线程是否处于中断状态。 - interrupted() 判定当前线程是否处于中断状态,同时中断标志位改为false。 方法里如果抛出InterruptedException,线程的中断标志位会被复位成false,如果确实是需要中断线程,要求我们自己在catch语句块里再次调用interrupt()。 - join方法:线程A,执行了线程B的join方法,线程A必须要等待B执行完成了以后,线程A才能继续自己的工作 ------ java中有许多线程并发的工具类,例如fork-join、countdownlatch、cyclicbarrier等等。在我所做过的项目中使用过countdownlatch这个工具,这个工具也是编程中最常用的工具之一。 fork-join,简单的说就是分而治之,规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解。就是在必要的情况下,将一个大业务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行join汇总。每一个任务由一个线程进行处理,这样就能达到cpu的高效利用,同时也避免任务之间出现死锁或资源竞争的现象。 ------ 举个栗子,需求和分析: - 有批量的数据需要处理 - 对处理数据进行筛选 需要做的 - 使用fork-join对数据进行分份进行处理 - 要求打印出处理的时间 - 对比正常处理数据的时间 生成模拟数据 ```java package fork_join; import java.util.Random; /** * 生成整形数组 */ public class MakeArray { // 数组长度 public static final int ARRAY_LENGTH = 100000000; public static int[] makeArray() { Random r = new Random(); int[] result = new int[ARRAY_LENGTH]; for (int i=0; i { // 处理的数据分成10份 private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10; private int[] src; // 表示我们要实际统计的数组 private int fromIndex; // 开始统计的标 private int toIndex; // 统计到哪里结束的下标 public SumTask(int[] src, int fromIndex, int toIndex) { this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; } @Override protected Integer compute() { if (toIndex - fromIndex < THRESHOLD) { int count = 0; for (int i=fromIndex; i<=toIndex;i++) { count = count + src[i]; } return count; } else { int mid = (fromIndex + toIndex) /2 ; SumTask left = new SumTask(src, fromIndex, mid); SumTask right = new SumTask(src, mid + 1, toIndex); invokeAll(left,right); return left.join() + right.join(); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); int[] src = MakeArray.makeArray(); SumTask innerFind = new SumTask(src, 0, src.length-1); long start = System.currentTimeMillis(); pool.invoke(innerFind); // 同步调用 System.out.println("Task is Running ...."); System.out.println("The count is "+ innerFind.join() + "spend time:"+(System.currentTimeMillis()-start) + " ms"); } } ``` #### 使用fork-join处理文件 对比,和正常处理数据的时间上优势不是很明显,特别是在需要处理的数据量比较少的时候,fork-join的架构来进行数据处理适得其反,时间上会比正常处理数据的单线程程序要长,这是因为java虚拟机在启动和调用线程的时间是比较长,从而导致线程并发处理会比单线程处理数据还要耗时间。在我们日常的任务中,如果你所处理的数据是百万级数据,那么fork-join的架构就能突显优势。上面的是同步用法来进行数据的返回,这次使用异步的用法来尽心数据的处理,例子如下: 需求分析: - 便利指定目录(含子目录)寻找指定类型文件 ``` package fork_join; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; public class FindDirsFiles extends RecursiveAction { // 当前任务需要搜索的目录 private File path; public FindDirsFiles(File path) { this.path = path; } public static void main(String[] args) throws InterruptedException { // 用一个ForkJoinPool实力调用总任务 ForkJoinPool pool = new ForkJoinPool(); FindDirsFiles task = new FindDirsFiles(new File("D:/")); pool.execute(task); // 异步调用 System.out.println("Task is Running......"); Thread.sleep(1000); int otherWork = 0; for(int i=0;i<100;i++){ otherWork = otherWork+i; } System.out.println("Main Thread done sth......,otherWork="+otherWork); task.join();//阻塞的方法 System.out.println("Task end"); } @Override protected void compute() { List subTasks = new ArrayList<>(); File[] files = path.listFiles(); if (files!= null) { for (File file:files) { if (file.isDirectory()) { subTasks.add(new FindDirsFiles(file)); } else { if (file.getAbsolutePath().endsWith("txt")){ System.out.println("文件:"+file.getAbsolutePath()); } } } if (!subTasks.isEmpty()) { for (FindDirsFiles subTask : invokeAll(subTasks)) { subTask.join(); // 等待子任务执行完成 } } } } } ``` ![fork-join3.png](https://mufeng-blog.oss-cn-beijing.aliyuncs.com/usr/uploads/2019/03/3085322418.png) 如图,查找目录文件与主线程中的计算没有同步运行。 打赏: 微信, 支付宝 标签: 并发编程, java 本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。