rss· 投稿· 设为首页· 加入收藏· 繁體版
当前位置: 火魔网 » 程序开发 » Java基础

Java线程池应用

在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因。比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同样符合这一思想。

目前,一些著名的大公司都特别看好这项技术,并早已经在他们的产品中应用该技术。比如IBM的WebSphere,IONA的Orbix 2000在SUN的 Jini中,Microsoft的MTS(Microsoft Transaction Server 2.0),COM+等。

现在您是否也想在服务器程序应用该项技术?

一、简介
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,


BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略

一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。

当一个任务通过execute(Runnable)方法欲添加到线程池时:

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。

也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue

handler有四个选择:
ThreadPoolExecutor.AbortPolicy()
抛出java.util.concurrent.RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy()
重试添加当前的任务,他会自动重复调用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy()
抛弃旧的任务
ThreadPoolExecutor.DiscardPolicy()
抛弃当前的任务

二、一般用法举例
//------------------------------------------------------------

Java代码
  • import java.util.concurrent.ArrayBlockingQueue;   
  • import java.util.concurrent.ThreadPoolExecutor;   
  • import java.util.concurrent.TimeUnit;   
  •   
  • public class TestThreadPool2 {   
  •     private static int produceTaskSleepTime = 2;   
  •        
  •     private static int produceTaskMaxNumber = 10;   
  •   
  •     public static void main(String[] args) {   
  •   
  •     //构造一个线程池   
  •     ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,   
  •     TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),   
  •     new ThreadPoolExecutor.DiscardOldestPolicy());   
  •   
  •     for(int i=1;i<=produceTaskMaxNumber;i++){   
  •     try {   
  •     //产生一个任务,并将其加入到线程池   
  •     String task = "task@ " + i;   
  •     System.out.println("put " + task);   
  •     threadPool.execute(new ThreadPoolTask(task));   
  •   
  •     //便于观察,等待一段时间   
  •     Thread.sleep(produceTaskSleepTime);   
  •     } catch (Exception e) {   
  •     e.printStackTrace();   
  •     }   
  •     }   
  •     }   
  • }   
  •   
  • import java.io.Serializable;   
  •   
  • public class ThreadPoolTask implements Runnable, Serializable {   
  •   
  •     private static final long serialVersionUID = 0;   
  •     private static int consumeTaskSleepTime = 2000;   
  •     //保存任务所需要的数据   
  •     private Object threadPoolTaskData;   
  •   
  •     ThreadPoolTask(Object tasks){   
  •     this.threadPoolTaskData = tasks;   
  •     }   
  •     public void run(){   
  •     //处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句   
  •     System.out.println(Thread.currentThread().getName());      
  •     System.out.println("start .."+threadPoolTaskData);   
  •        
  •     try {   
  •     ////便于观察,等待一段时间   
  •     Thread.sleep(consumeTaskSleepTime);   
  •     } catch (Exception e) {   
  •     e.printStackTrace();   
  •     }   
  •     threadPoolTaskData = null;   
  •     }   
  •        
  •     public Object getTask(){   
  •     return this.threadPoolTaskData;   
  •     }   
  •     }   
  • 运行结果:   
  • put task@ 1  
  • pool-1-thread-1  
  • start ..task@ 1  
  • put task@ 2  
  • pool-1-thread-2  
  • start ..task@ 2  
  • put task@ 3  
  • put task@ 4  
  • put task@ 5  
  • put task@ 6  
  • pool-1-thread-3  
  • start ..task@ 3  
  • put task@ 7  
  • pool-1-thread-4  
  • start ..task@ 4  
  • put task@ 8  
  • put task@ 9  
  • put task@ 10  
  • pool-1-thread-1  
  • start ..task@ 8  
  • pool-1-thread-2  
  • start ..task@ 9  
  • pool-1-thread-3  
  • start ..task@ 10  
  • 线程池主要是用来 处理多个请求时,减少资源消耗,提高应用性能。

            下面的代码是来自于 孙卫琴:<<Java网络编程精解>> 中线程池实现源码, 代码结构简单清晰,对于理解线程池,wait(),notify()方法都有有很好的帮助.

            view plaincopy to clipboardprint?
    import java.util.LinkedList;  
    public class ThreadPool extends ThreadGroup {  
        private boolean isClosed = false; // 线程池是否关闭  
        private LinkedList<Runnable> workQueue; // 表示工作队列  
        private static int threadPoolID; // 表示线程池ID  
        private int threadID; // 表示工作线程ID  
        public ThreadPool(int poolSize) { // poolSize指定线程池中的工作线程数目  
            super("ThreadPool-" + (threadPoolID++));  
            setDaemon(true);  
            workQueue = new LinkedList<Runnable>(); // 创建工作队列  
            for (int i = 0; i < poolSize; i++)  
                new WorkThread().start(); // 创建并启动工作线程  
        }  
        /** 向工作队列中加入一个新任务,由工作线程去执行该任务 */ 
        public synchronized void execute(Runnable task) {  
            if (isClosed) { // 线程池被关则抛出IllegalStateException异常  
                throw new IllegalStateException();  
            }  
            if (task != null) {  
                workQueue.add(task);  
                notify(); // 唤醒正在getTask()方法中等待任务的工作线程  
            }  
        }  
        /** 从工作队列中取出一个任务,工作线程会调用此方法 */ 
        protected synchronized Runnable getTask() throws InterruptedException {  
            while (workQueue.size() == 0) {  
                if (isClosed) {  
                    return null;  
                }  
                wait(); // 如果工作队列中没有任务,就等待任务  
            }  
            return workQueue.removeFirst();  
        }  
        /** 关闭线程池 */ 
        public synchronized void close() {  
            if (!isClosed) {  
                isClosed = true;  
                workQueue.clear();   
                interrupt();   
            }  
        }  
        /** 等待工作线程把所有任务执行完 */ 
        public void join() {  
            synchronized (this) {  
                isClosed = true;  
                notifyAll(); // 唤醒还在getTask()方法中等待任务的工作线程  
            }  
            Thread[] threads = new Thread[activeCount()];  
            // enumerate()方法继承自ThreadGroup类,获得线程组中当前所有活着的工作线程  
            int count = enumerate(threads);  
            for (int i = 0; i < count; i++) { // 等待所有工作线程运行结束  
                try {  
                    threads[i].join(); // 等待工作线程运行结束  
                } catch (InterruptedException ex) {  
                }  
            }  
        }  
        /** 内部类:工作线程 */ 
        private class WorkThread extends Thread {  
            public WorkThread() {  
                super(ThreadPool.this, "WorkThread-" + (threadID++));  
            }  
            public void run() {  
                while (!isInterrupted()) { // isInterrupted()方法继承自Thread类,判断线程是否被中断  
                    Runnable task = null;  
                    try {  
                        task = getTask();  
                    } catch (InterruptedException ex) {  
                    }  
                    if (task == null)  
                        return;  
                    try {   
                        task.run();  
                    } catch (Throwable t) {  
                        t.printStackTrace();  
                    }  
                }  
            }  
        }  
    }  
     
     
     
     
    // Test  
    public class ThreadPoolTester {  
        public static void main(String[] args) {  
            int numTasks = 10;  
            int poolSize = 6;  
            ThreadPool threadPool = new ThreadPool(poolSize); // 创建线程池  
            for (int i = 0; i < numTasks; i++) {  
                threadPool.execute(createTask(i));  
            }  
            threadPool.join();  
        }  
        private static Runnable createTask(final int taskID) {  
            return new Runnable() {  
                public void run() {  
                    try {  
                        Thread.sleep(5000);  
                    } catch (InterruptedException ex) {  
                    }  
                    // Your Task.....  
                }  
            };  
        }  

    顶一下
    (0)
    踩一下
    (0)