package com.mwc.util;import java.util.*;

/**
 * A pool of threads attached to a queue of 
 * runnable references.
 * @author Matthew W. Coan (May 26, 2002)
 */public class ThreadPool {
   private int _min;
   private int _max;
   private long _timeOut;
   private int _maxQueue;   private LinkedList _threadList = new LinkedList();   private LinkedList _queue = new LinkedList();
   private boolean _stopped = false;
   private Object _stopLock = new Object();      private void _debug(String msg) {
      if(false)         System.out.println("["+Thread.currentThread().getName()+"]"+msg);   }

   /**
    * This class makes the thread pools status information
    * available.
    */   public class Stat {
      public int min;
      public int max;      public long sleepTime;
      public long timeOut;
      public int queueMax;      public int queueSize;
      public boolean stopped;      public int poolSize;
      
      private Stat() {}      
      public String toString() {         StringBuffer buf = new StringBuffer();         buf.append("min="+min+"\n");
         buf.append("max="+max+"\n");         buf.append("timeOut="+timeOut+"\n");
         buf.append("queueMax="+queueMax+"\n");
         buf.append("queueSize="+queueSize+"\n");
         buf.append("stopped="+stopped+"\n");
         buf.append("poolSize="+poolSize+"\n");         return buf.toString();
      }   }
   
   /**
    * A worker thread in the pool.
    */   class PoolThread extends Thread {      private long _waitingFlag = -1;
      private Object _waitingLock = new Object();
      private Object _waitOn = new Object();      /**
       * Return the waiting flag value.
       * @return -1 if not waiting, 
       *         the time it started waiting otherwise.
       */
      public long isWaiting() {
         synchronized(_waitingLock) {
            return _waitingFlag;
         }      }
         
      /**
       * Get and run tasks from the queue until stoped.  
       * If there are no tasks in the queue wait for timeOut
       * ms.  If timeOut is reached and the current size 
       * of the pool is greater then the min pool size
       * this extra thread is removed from the queue.
       */      public void run() {
         try {            _debug("PoolThread.run()");
            Runnable task;
            boolean timeOutReached;
            int qSize;
            long temp;            long totalWait = 0;
                           while(true) {               synchronized(_threadList) {
                  synchronized(_stopLock) {
                     if(_stopped) {                        _debug("STOP");                        _removeMe();
                        return;
                     }
                  }               }               task = null;
               temp = 0;
               synchronized(_queue) {
                  qSize = _queue.size();                  if(qSize != 0) {
                     task = (Runnable)_queue.get(0);                     _queue.remove(0);                     timeOutReached = false;                     _queue.notifyAll();                  }               }               timeOutReached = false;               _debug("QUEEU_SIZE="+qSize);
               if(qSize == 0) {                  try {
                     _debug("WAITING");System.out.println("["+Thread.currentThread().getName()+"]WAITING");                     synchronized(_waitingLock) {
                        temp = _waitingFlag = new Date().getTime();
                     }
                     synchronized(_threadList) {                        _threadList.notify();
                     }
                     synchronized(_waitOn) {
                        if(_timeOut < 0) 
                           _waitOn.wait();
                        else                           _waitOn.wait(Math.abs(_timeOut - totalWait));
                     }System.out.println("["+Thread.currentThread().getName()+"]WAKEUP");                     timeOutReached = true;
                  }                  catch(InterruptedException ie) {                     synchronized(_threadList) {
                        _debug("Interrupted");                        _removeMe();
                        return;
                     }
                  }                  finally {                     totalWait += (new Date().getTime() - temp);                     synchronized(_waitingLock) {
                        _waitingFlag = -1;
                     }                  }
               }                                       if(timeOutReached) {
                  synchronized(_threadList) {
                     _debug("WAITING FOR QUEUE LOCK 4");                     synchronized(_queue) {
                        _debug("GOT QUEUE LOCK 4");                        if((totalWait < _timeOut) 
                           || _queue.size() > 0 
                           || _threadList.size() <= _min) {                           _debug("RUN AGAIN");
                           continue;                        }
                     }                     _debug("TIMEOUT");                     _removeMe();
                     return;
                  }
               }
               else {
                  totalWait = 0;                  _debug("RUNNING TASK");
                  task.run();
                  _debug("TASK DONE");               }
            }
         }
         catch(Throwable throwable) {            throwable.printStackTrace();
         }      }
   }
   
   /**
    * Remove the current thread from the thread list.
    */
   private void _removeMe() {
      if(_threadList.size() > _min) {         _debug("REMOVING THREAD " + Thread.currentThread().getName());         Iterator it = _threadList.iterator();         Thread t;
         while(it.hasNext()) {            t = (Thread)it.next();            if(t == Thread.currentThread())               it.remove();
         }         _threadList.notify();      }   }
   
   /**
    * Construct a thread pool.
    * @param min the min number of threads to keep in the pool
    * @param max the max number of threads to keep in the pool
    * @param timeOut the max amout of time in milliseconds a 
    *        thread can spend waiting before it is removed from the pool.
    * @param maxQueue the max size of the queue
    */
   public ThreadPool(int min, int max, long timeOut, int maxQueue) {      _min = min;      _max = max;      _timeOut = timeOut;
      _maxQueue = maxQueue;
      PoolThread pt;      for(int i = 0; i < _min; i++) {         pt = new PoolThread();
         _threadList.add(pt);         pt.start();
      }   }
       /**
    * Put a task in the queue to be run by a worker thread. 
    * @param task a java.lang.Runnable object that needs to be executied
    */
   public void postTaskForExecution(Runnable task) {
      _debug("ThreadPool.postTaskForExecution(Runnable task)");
      _debug("WAITING FOR QUEUE LOCK 1");      synchronized(_queue) {
         _debug("GOT QUEUE LOCK");         while(_queue.size() >= _maxQueue) {            try {
               _debug("WAITING FOR QUEUE TO GET SMALLER");               _queue.wait();            }            catch(InterruptedException ie) {
               ;            }
         }
         _queue.add(task);         _queue.notifyAll();         _debug("TASK IN QUEUE");
      }         
      synchronized(_threadList) {         _debug("GOT THREAD LIST LOCK");
         PoolThread pt;
         long time,t;
         int index;
         boolean doNotify;         while(true) {            _debug("LOOP");
            synchronized(_stopLock) {
               if(_stopped)
                  return;
            }
            _debug("PAST STOP LOCK");            index = -1;            time = -1;            // Find thread that has been waiting the longest
            for(int i = 0; i < _threadList.size(); i++) {               pt = (PoolThread)_threadList.get(i);               t = pt.isWaiting();
               if(((time == -1) || (t < time)) && t != -1) {
                  time = t;
                  index = i;               }
            }            _debug("index="+index);
            if(index != -1) {
               pt = (PoolThread)_threadList.get(index);
               _debug("WAITING FOR QUEUE LOCK 2");
               doNotify = false;               synchronized(_queue) {
                  _debug("QUEUE LOCK GOT");                  if(_queue.size() > 0) {
                     _debug("NOTIFY POOLED THREAD");                     doNotify = true;
                  }
               }               if(doNotify) {
                  while(pt.isWaiting() != -1) {System.out.println("["+Thread.currentThread().getName()+"]NOTIFY("+pt.getName()+")");                     synchronized(pt._waitOn) {                        pt._waitOn.notify();                     }                  }
               }
               _debug("RETURN");               return;
            }                        if(_threadList.size() >= _max)               return;
            else {               boolean isDone = false;               _debug("GET QUEUE LOCK NEXT");
               synchronized(_queue) {                  _debug("GOT QUEUE LOCK AGAIN");
                  if(_queue.size() == 0)
                     isDone = true;
               }
               if(!isDone) {                  pt = new PoolThread();
                  _debug("CREATING AND STARTING NEW POOLED THREAD " + pt.getName());
                  _threadList.add(pt);
                  pt.start();
               }
               else
                  _debug("TASK ALREADY RAN");
               _debug("RETURN");               return;
            }
         }      }
   }
         /**
    * Stop the thread pool (cancels all worker threads).
    */
   public void stop() {      synchronized(_stopLock) {         _stopped = true;      }      synchronized(_queue) {         _queue.notifyAll();      }
   }
      /**
    * Get the current thread pool stats.
    * @return a ThreadPool.Stat object containing the current stats.
    */
   public Stat getStats() {      Stat ret = new Stat();      ret.max = _max;      ret.min = _min;      ret.timeOut = _timeOut;      ret.queueMax = _maxQueue;
      synchronized(_threadList) {
         ret.poolSize = _threadList.size();
      }      synchronized(_stopLock) {         ret.stopped = _stopped;      }
      synchronized(_queue) {         ret.queueSize = _queue.size();
      }      return ret;
   }}