<span style="font-family: Arial, Helvetica, sans-serif;">import java.util.ArrayList;</span>
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.IntWritable;
public class ArrayTest {
public static void main(String[] args) {
long start = System.currentTimeMillis() ;
ArrayTest blt = new ArrayTest() ;
List<IntWritable> messages = new ArrayList<IntWritable>();
for(int i =0 ;i<10000; i++) {
messages.add(new IntWritable(i)) ;
}
// RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
// ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
// .newCachedThreadPool();
// executor.setMaximumPoolSize(64);
// executor.setRejectedExecutionHandler(retryHandler);
//
// for (IntWritable i : messages) {
// executor.execute(blt.new Worker(i));
// }
//
// executor.shutdown();
// try {
// executor.awaitTermination(60, TimeUnit.SECONDS);
// } catch (Exception e) {
// e.printStackTrace() ;
// }
ExecutorService pool = Executors.newCachedThreadPool(); // 并发测试
CompletionService<Boolean> exchangeResult = new ExecutorCompletionService<Boolean>(
pool);
int destSize = 0;
for(IntWritable i : messages) {
exchangeResult.submit(blt.new Worker(i));
destSize++;
}
int count = 0;
while (count < destSize) {
Future<Boolean> f = exchangeResult.poll();
if (f == null)
continue;
count++ ;
}
try {
pool.awaitTermination(60, TimeUnit.SECONDS);
} catch (Throwable e) {
e.printStackTrace();
}
System.out.println("Last " + (System.currentTimeMillis() - start) + " ms");
}
class Worker implements Callable<Boolean> {
IntWritable msg;
public Worker(IntWritable msg) {
this.msg = msg;
}
@Override
public Boolean call() throws Exception {
try {
int sum = msg.get() ;
} catch (Exception e) {
e.printStackTrace() ;
}
return true;
}
}
// class Worker implements Runnable {
// IntWritable msg;
//
// public Worker(IntWritable msg) {
// this.msg = msg;
// }
//
// @Override
// public void run() {
// try {
// int sum = msg.get() ;
// } catch (Exception e) {
// e.printStackTrace() ;
// }
// }
// }
}
class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace() ;
}
executor.execute(r);
}
}
第一种:Last 60186 ms
第二种:
package concurrencyTest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.IntWritable;
public class ArrayTest {
public static void main(String[] args) {
long start = System.currentTimeMillis() ;
ArrayTest blt = new ArrayTest() ;
List<IntWritable> messages = new ArrayList<IntWritable>();
for(int i =0 ;i<10000; i++) {
messages.add(new IntWritable(i)) ;
}
RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
.newCachedThreadPool();
executor.setMaximumPoolSize(64);
executor.setRejectedExecutionHandler(retryHandler);
for (IntWritable i : messages) {
executor.execute(blt.new Worker(i));
}
executor.shutdown();
try {
executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace() ;
}
// ExecutorService pool = Executors.newCachedThreadPool(); // 并发测试
// CompletionService<Boolean> exchangeResult = new ExecutorCompletionService<Boolean>(
// pool);
// int destSize = 0;
// for(IntWritable i : messages) {
// exchangeResult.submit(blt.new Worker(i));
// destSize++;
// }
//
// int count = 0;
// while (count < destSize) {
// Future<Boolean> f = exchangeResult.poll();
// if (f == null)
// continue;
// count++ ;
// }
// try {
// pool.awaitTermination(60, TimeUnit.SECONDS);
// } catch (Throwable e) {
// e.printStackTrace();
// }
System.out.println("Last " + (System.currentTimeMillis() - start) + " ms");
}
// class Worker implements Callable<Boolean> {
// IntWritable msg;
//
// public Worker(IntWritable msg) {
// this.msg = msg;
// }
//
// @Override
// public Boolean call() throws Exception {
// try {
// int sum = msg.get() ;
// } catch (Exception e) {
// e.printStackTrace() ;
// }
// return true;
// }
// }
class Worker implements Runnable {
IntWritable msg;
public Worker(IntWritable msg) {
this.msg = msg;
}
@Override
public void run() {
try {
int sum = msg.get() ;
} catch (Exception e) {
e.printStackTrace() ;
}
}
}
}
class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace() ;
}
executor.execute(r);
}
}
Last 271 ms
可能是前一种写法有问题,导致性能很差,但是奇怪没有报错!
第一种写法,即使用Callable<Boolean>在线程数较少时(<100),没有问题,但是线程数较大,如100,000,则会由于资源不足而阻塞,性能急剧降低。
第二种写法则不存在此问题。
分享到:
相关推荐
JavaExecutor并发框架.pdf
Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建在高并发及大数据量是非常消耗资源的,因为java提供了线程池。这篇文章主要介绍下并发包下的Executor接口,Executor接口虽然作为一个非常旧的接口...
详细介绍java并发编程相关知识: 基础知识 并发与并行 Java并发演进历史 Java并发模型 线程模型 存储模型 JVM同步原语 volatile CAS 线程安全 保护“共享数据” 低级并发工具 原子变量 锁...
大数据组件-监控-spark-driver/executor性能的prometheus-grafana模板插件
用户应用new SparkContext后,集群就会为在Worker上分配executor,但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over...
Executor: 一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类,一般来说,...
并发编程之Executor线程池原理与源码解读.pdf
2.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 3.newScheduledThreadPool 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。 4....
非常好的Java并发框架Executor图例,结构清晰,继承关系清楚。
1、Executor框架简介 从JDK5开始,工作单元和执行机制隔离开来,工作单元包括Runnable和Callable,执行机制由Executor提供。 调用关系:Java线程一对一映射到本地操作系统的系统线程,当多线程程序分解若干...
1.1 并发简史 1.2 线程的优势 1.2.1 发挥多处理器的强大能力 1.2.2 建模的简单性 1.2.3 异步事件的简化处理 1.2.4 响应更灵敏的用户界面 1.3 线程带来的风险 1.3.1 安全性问题 1.3.2 活跃性问题 1.3.3 ...
hadoop自带的Container-executor在配置yarn-kerberos时存在问题,以及在配置cgroup时需要把container-executor.cfg的上级目录拥有者均改为root,带来不便。 所以需要重新编译Container-executor,这边提供重新编译好...
高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4 高并发编程第三阶段13讲 一个JNI程序的编写,通过Java去调用C,C++程序.mp4 高并发编程第三阶段14讲 Unsafe中的方法使用,一半是...
高效率 快捷操作
cr是一个以最大并发运行您任务的job executor
随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序取得了显著的进步,提高了Java虚拟机的性能,提高了并发类的可伸缩性,并加入了丰富的新并发构建块。在本书中,这些便利...
mybatis中的sqlsession--executor实现 mybatis中的sqlsession--executor实现
基于Android开发的Executor线程池示例
该文档详细记录了Executor框架结构、使用示意图、ThreadPoolExecutor使用示例、线程池原理分析、几种常见线程池(FixedThreadPool、SingleThreadExecutor、CachedThreadPool)的详解以及线程池大小确定等内容