`
cloudeagle_bupt
  • 浏览: 538967 次
文章分类
社区版块
存档分类
最新评论

Executor并发性能对比

阅读更多
<span style="font-family: Arial, Helvetica, sans-serif;">import java.util.ArrayList;</span>
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.IntWritable;

public class ArrayTest {

	public static void main(String[] args) {
		long start = System.currentTimeMillis() ;
		ArrayTest blt = new ArrayTest() ;
		List<IntWritable> messages = new ArrayList<IntWritable>();
		
		for(int i =0 ;i<10000; i++) {
			 messages.add(new IntWritable(i)) ;
		}
//		RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();		
//	    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
//	            .newCachedThreadPool();
//	    executor.setMaximumPoolSize(64);
//	    executor.setRejectedExecutionHandler(retryHandler);
//	    
//	     for (IntWritable i : messages) {
//	        executor.execute(blt.new Worker(i));
//	      }
//
//	      executor.shutdown();
//	      try {
//	        executor.awaitTermination(60, TimeUnit.SECONDS);
//	      } catch (Exception e) {
//	        e.printStackTrace() ;
//	      }
		
		ExecutorService pool = Executors.newCachedThreadPool(); // 并发测试
		CompletionService<Boolean> exchangeResult = new ExecutorCompletionService<Boolean>(
				pool);
		int destSize = 0;
		for(IntWritable i : messages) {
			exchangeResult.submit(blt.new Worker(i));
			destSize++;
		}
		
		int count = 0;
		while (count < destSize) {
			Future<Boolean> f = exchangeResult.poll();
			if (f == null)
				continue;
			count++ ;
		}
	    try {
			pool.awaitTermination(60, TimeUnit.SECONDS);
		} catch (Throwable e) {
			e.printStackTrace();
		}
		System.out.println("Last " + (System.currentTimeMillis() - start) + " ms");
	}

	  class Worker implements  Callable<Boolean> {
			IntWritable msg;

			public Worker(IntWritable msg) {
				this.msg = msg;
			}
			
			@Override
			public Boolean call() throws Exception {
				try {
					 int sum = msg.get() ;
				} catch (Exception e) {
					e.printStackTrace() ;
				}
				return true;
			}
		}
	
//	  class Worker implements  Runnable {
//			IntWritable msg;
//
//			public Worker(IntWritable msg) {
//				this.msg = msg;
//			}
//
//			@Override
//			public void run() {
//				try {
//					 int sum = msg.get() ;
//				} catch (Exception e) {
//					e.printStackTrace() ;
//				}
//			}
//		}
	  
}

class RetryRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace() ;
      }
      executor.execute(r);
    }
}



第一种:Last 60186 ms


第二种:


package concurrencyTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.IntWritable;

public class ArrayTest {

	public static void main(String[] args) {
		long start = System.currentTimeMillis() ;
		ArrayTest blt = new ArrayTest() ;
		List<IntWritable> messages = new ArrayList<IntWritable>();
		
		for(int i =0 ;i<10000; i++) {
			 messages.add(new IntWritable(i)) ;
		}
		RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();		
	    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
	            .newCachedThreadPool();
	    executor.setMaximumPoolSize(64);
	    executor.setRejectedExecutionHandler(retryHandler);
	    
	     for (IntWritable i : messages) {
	        executor.execute(blt.new Worker(i));
	      }

	      executor.shutdown();
	      try {
	        executor.awaitTermination(60, TimeUnit.SECONDS);
	      } catch (Exception e) {
	        e.printStackTrace() ;
	      }
		
//		ExecutorService pool = Executors.newCachedThreadPool(); // 并发测试
//		CompletionService<Boolean> exchangeResult = new ExecutorCompletionService<Boolean>(
//				pool);
//		int destSize = 0;
//		for(IntWritable i : messages) {
//			exchangeResult.submit(blt.new Worker(i));
//			destSize++;
//		}
//		
//		int count = 0;
//		while (count < destSize) {
//			Future<Boolean> f = exchangeResult.poll();
//			if (f == null)
//				continue;
//			count++ ;
//		}
//	    try {
//			pool.awaitTermination(60, TimeUnit.SECONDS);
//		} catch (Throwable e) {
//			e.printStackTrace();
//		}
		System.out.println("Last " + (System.currentTimeMillis() - start) + " ms");
	}

//	  class Worker implements  Callable<Boolean> {
//			IntWritable msg;
//
//			public Worker(IntWritable msg) {
//				this.msg = msg;
//			}
//			
//			@Override
//			public Boolean call() throws Exception {
//				try {
//					 int sum = msg.get() ;
//				} catch (Exception e) {
//					e.printStackTrace() ;
//				}
//				return true;
//			}
//		}
	
	  class Worker implements  Runnable {
			IntWritable msg;

			public Worker(IntWritable msg) {
				this.msg = msg;
			}

			@Override
			public void run() {
				try {
					 int sum = msg.get() ;
				} catch (Exception e) {
					e.printStackTrace() ;
				}
			}
		}
	  
}

class RetryRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace() ;
      }
      executor.execute(r);
    }
}


Last 271 ms

可能是前一种写法有问题,导致性能很差,但是奇怪没有报错!

第一种写法,即使用Callable<Boolean>在线程数较少时(<100),没有问题,但是线程数较大,如100,000,则会由于资源不足而阻塞,性能急剧降低。
第二种写法则不存在此问题。


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics