1 /*
   2  * Copyright (c) 2003, 2012, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.orbutil.threadpool;
  27 
  28 import java.util.LinkedList;
  29 
  30 import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
  31 import com.sun.corba.se.spi.orbutil.threadpool.Work;
  32 import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;
  33 
  34 import com.sun.corba.se.impl.orbutil.ORBConstants;
  35 import com.sun.corba.se.impl.orbutil.threadpool.ThreadPoolImpl;
  36 
  37 import com.sun.corba.se.spi.monitoring.MonitoringConstants;
  38 import com.sun.corba.se.spi.monitoring.MonitoringFactories;
  39 import com.sun.corba.se.spi.monitoring.MonitoredObject;
  40 import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
  41 
  42 public class WorkQueueImpl implements WorkQueue
  43 {
  44     private ThreadPool workerThreadPool;
  45     private LinkedList theWorkQueue = new LinkedList();
  46     private long workItemsAdded = 0;
  47 
  48     // Initialized to 1 to avoid divide by zero in averageTimeInQueue()
  49     private long workItemsDequeued = 1;
  50 
  51     private long totalTimeInQueue = 0;
  52 
  53     // Name of the work queue
  54     private String name;
  55 
  56     // MonitoredObject for work queue
  57     private MonitoredObject workqueueMonitoredObject;
  58 
  59     public WorkQueueImpl() {
  60         name=ORBConstants.WORKQUEUE_DEFAULT_NAME;
  61         initializeMonitoring();
  62     }
  63 
  64     public WorkQueueImpl(ThreadPool workerThreadPool) {
  65         this(workerThreadPool, ORBConstants.WORKQUEUE_DEFAULT_NAME);
  66     }
  67 
  68     public WorkQueueImpl(ThreadPool workerThreadPool, String name) {
  69         this.workerThreadPool = workerThreadPool;
  70         this.name = name;
  71         initializeMonitoring();
  72     }
  73 
  74     // Setup monitoring for this workqueue
  75     private void initializeMonitoring() {
  76         workqueueMonitoredObject = MonitoringFactories.
  77                             getMonitoredObjectFactory().
  78                             createMonitoredObject(name,
  79                             MonitoringConstants.WORKQUEUE_MONITORING_DESCRIPTION);
  80 
  81         LongMonitoredAttributeBase b1 = new
  82             LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED,
  83                     MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED_DESCRIPTION) {
  84                 public Object getValue() {
  85                     return new Long(WorkQueueImpl.this.totalWorkItemsAdded());
  86                 }
  87             };
  88         workqueueMonitoredObject.addAttribute(b1);
  89         LongMonitoredAttributeBase b2 = new
  90             LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE,
  91                     MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE_DESCRIPTION) {
  92                 public Object getValue() {
  93                     return new Long(WorkQueueImpl.this.workItemsInQueue());
  94                 }
  95             };
  96         workqueueMonitoredObject.addAttribute(b2);
  97         LongMonitoredAttributeBase b3 = new
  98             LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE,
  99                     MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE_DESCRIPTION) {
 100                 public Object getValue() {
 101                     return new Long(WorkQueueImpl.this.averageTimeInQueue());
 102                 }
 103             };
 104         workqueueMonitoredObject.addAttribute(b3);
 105     }
 106 
 107 
 108     // Package private method to get the monitored object for this
 109     // class
 110     MonitoredObject getMonitoredObject() {
 111         return workqueueMonitoredObject;
 112     }
 113 
 114     public synchronized void addWork(Work work) {
 115             workItemsAdded++;
 116             work.setEnqueueTime(System.currentTimeMillis());
 117             theWorkQueue.addLast(work);
 118             ((ThreadPoolImpl)workerThreadPool).notifyForAvailableWork(this);
 119     }
 120 
 121     synchronized Work requestWork(long waitTime) throws TimeoutException, InterruptedException
 122     {
 123         Work workItem;
 124         ((ThreadPoolImpl)workerThreadPool).incrementNumberOfAvailableThreads();
 125 
 126             if (theWorkQueue.size() != 0) {
 127                 workItem = (Work)theWorkQueue.removeFirst();
 128                 totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
 129                 workItemsDequeued++;
 130                 ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads();
 131                 return workItem;
 132             }
 133 
 134             try {
 135 
 136                 long remainingWaitTime = waitTime;
 137                 long finishTime = System.currentTimeMillis() + waitTime;
 138 
 139                 do {
 140 
 141                     this.wait(remainingWaitTime);
 142 
 143                     if (theWorkQueue.size() != 0) {
 144                         workItem = (Work)theWorkQueue.removeFirst();
 145                         totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
 146                         workItemsDequeued++;
 147                         ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads();
 148                         return workItem;
 149                     }
 150 
 151                     remainingWaitTime = finishTime - System.currentTimeMillis();
 152 
 153                 } while (remainingWaitTime > 0);
 154 
 155                 ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads();
 156                 throw new TimeoutException();
 157 
 158             } catch (InterruptedException ie) {
 159                 ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads();
 160                 throw ie;
 161             }
 162     }
 163 
 164     public void setThreadPool(ThreadPool workerThreadPool) {
 165             this.workerThreadPool = workerThreadPool;
 166     }
 167 
 168     public ThreadPool getThreadPool() {
 169             return workerThreadPool;
 170     }
 171 
 172     /**
 173      * Returns the total number of Work items added to the Queue.
 174      * This method is unsynchronized and only gives a snapshot of the
 175      * state when it is called
 176      */
 177     public long totalWorkItemsAdded() {
 178         return workItemsAdded;
 179     }
 180 
 181     /**
 182      * Returns the total number of Work items in the Queue to be processed
 183      * This method is unsynchronized and only gives a snapshot of the
 184      * state when it is called
 185      */
 186     public int workItemsInQueue() {
 187         return theWorkQueue.size();
 188     }
 189 
 190     public synchronized long averageTimeInQueue() {
 191         return (totalTimeInQueue/workItemsDequeued);
 192     }
 193 
 194     public String getName() {
 195         return name;
 196     }
 197 }
 198 
 199 // End of file.