接着上篇中没写完的(
http://my.oschina.net/bluesroot/blog/223453),上篇中讲到很多,为完成对一个目录的扫描的频繁的IO操作,我们从单线程到多线程,从CountDownLatch到BlockingQueue,中间不免各种Callable和Future和ExecutorService等等,虽然纷繁,中间有些不免麻烦,但是最终仍紧紧贴着我们的需求和多线程操作这一主题。
随着jdk的版本升级,并发包也随之扩充了不少,上面博文中的API来源于jdk1.6,1.7的推出和新的api的新增,我们有了更安全更方便更高效的方式去解决上面的问题。
1.5的时候,我们更多的使用synchronized,并辅助其他的api的去操作多线程,线程的理解不深刻,这样的做法很多时候是很危险的;1.6到了,官方推荐使用线程池,线程池成熟且方便的为大家省去了很多不必要的麻烦,如今到了1.7的时候了,我们发觉它愈发高效。
最常用的,我们当然能使用ExecutorService来管理并调度线程池中的线程来执行任务。然后问题很分明,设计该线程的时候,我们该如何设置线程的数量呢?多了怕浪费,少了又不高效,好歹有个线程计算公式(线程数=cpu核心数 /(1-阻塞系数),阻塞系数在0-1之间取值),虽然不是绝对正确,却也能有参考。在java7中,针对这一情况,专门加入了针对ExecutorService效率和性能的改进版的fork-join API。
ForkJoinPool类可以根据可用的处理器数量和任务需求,动态的对线程进行管理。(真心高端...)Fork-join使用了work-stealing策略,即本线程在完成自己的任务之后,在发现其他的线程还有活没做完,则主动帮助其他的线程一起干。该策略的使用,不但提升了API的性能,而且有助于提高线程的利用率。(妈妈再也不用担心,我开多少线程大小的线程池了~~)
在这个例子中,为了更好的调度任务,我们提供一些ForkJoinTask的实例来配合ForkJoinPool的函数的使用。我们用ForkJoinTask来创建(fork任务),然后再将主线程join到任务的完成点上。这样就行了。
ForkJoinTask有两个子类,RecursiveAction和RecursiveTask。前者的子类主要用来执行不需要返回值的任务(是不是跟Runnable的说法类似?),而后者的子类,主要用于执行需要返回值的任务。(对了,跟Future的说法类似的)
Fork-join API主要用于处理那些有些规模合理的任务,即如果每个任务所分担的开销都不大(也没有不确定的循环),则该API就可以达到合理的吞吐量。此外,Fork-join的API希望所有的任务都不要有副作用(即不要改变共享状态),并且没有同步或者锁操作,而本例正是如此,所以,这里使用它,能大大提高效率和代码简洁。它的场景有限,大家切不可以为,它能代替谁,它的出现是为了简化和解决某些情况,而并非取代某些老牌的并发API,ExecutorService表示毫无压力..........
废话太多了,我们结合上篇文章中的场景需求,看看如何使用jdk7的新api,优雅且高效的解决该问题,代码如下:
01 |
public class FileSize { |
03 |
private final static ForkJoinPool forkJoinPool = new ForkJoinPool(); |
05 |
private static class FileSizeFinder extends RecursiveTask<Long> { |
08 |
public FileSizeFinder( final File theFile) { |
12 |
@Override public Long compute() { |
17 |
final File[] children = file.listFiles(); |
18 |
if (children != null ) { |
19 |
List<ForkJoinTask<Long>> tasks = |
20 |
new ArrayList<ForkJoinTask<Long>>(); |
21 |
for ( final File child : children) { |
23 |
size += child.length(); |
25 |
tasks.add( new FileSizeFinder(child)); |
29 |
for ( final ForkJoinTask<Long> task : invokeAll(tasks)) { |
39 |
public static void main( final String[] args) { |
40 |
final long start = System.nanoTime(); |
41 |
final long total = forkJoinPool.invoke( |
42 |
new FileSizeFinder( new File( "D:/tools" ))); |
43 |
final long end = System.nanoTime(); |
44 |
System.out.println( "Total Size: " + total); |
45 |
System.out.println( "Time taken: " + (end - start)/ 1 .0e9); |
上述tools文件夹,放了tomcat,maven,spring的包和maven的仓库,各种子目录很多,大小是543M,cpu i5,4g内存,运行结果为:
2 |
Time taken: 0.697063339 |
上述代码中,我们创建了ForkJoinPool实例的引用,用static修饰,即他在整个应用程序中共享。随后定义一个名为FileSizeFinder的静态内部类,它继承RecursiveTask类,并实现了compute(),该方法可以用来作为任务的执行引擎。invokeAll()函数将等待所有的子任务完成之后,才会去执行下一步循环累加操作。在任务被阻塞期间,其执行线程并非什么也不做,而是可以被调度去做其他的任务,最后每个任务都将compute函数结束时返回计算的结果。
本例中,我们递归的将扫描任务进行分解,直到不能拆分。但一般来说,拆分粒度太细会显著增加线程调度的开销,所以我们不建议将问题拆分的过小。
java.until.concurrent包中,定义了很多线程安全的集合类,这个集合类既保证了并发编程环境下的数据安全性,同时也可以被当做同步点来使用。线程安全很重要,但是我们不想为此牺牲太多的性能。后面我们继续探讨concurrent中与并发应用性能息息相关的一些数据结构。
多线程IO操作(fork-join版),布布扣,bubuko.com