1 /* 2 * Copyright (c) 2003, 2012, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package com.sun.corba.se.impl.orbutil.threadpool; 27 28 import java.util.LinkedList; 29 30 import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool; 31 import com.sun.corba.se.spi.orbutil.threadpool.Work; 32 import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue; 33 34 import com.sun.corba.se.impl.orbutil.ORBConstants; 35 import com.sun.corba.se.impl.orbutil.threadpool.ThreadPoolImpl; 36 37 import com.sun.corba.se.spi.monitoring.MonitoringConstants; 38 import com.sun.corba.se.spi.monitoring.MonitoringFactories; 39 import com.sun.corba.se.spi.monitoring.MonitoredObject; 40 import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase; 41 42 public class WorkQueueImpl implements WorkQueue 43 { 44 private ThreadPool workerThreadPool; 45 private LinkedList theWorkQueue = new LinkedList(); 46 private long workItemsAdded = 0; 47 48 // Initialized to 1 to avoid divide by zero in averageTimeInQueue() 49 private long workItemsDequeued = 1; 50 51 private long totalTimeInQueue = 0; 52 53 // Name of the work queue 54 private String name; 55 56 // MonitoredObject for work queue 57 private MonitoredObject workqueueMonitoredObject; 58 59 public WorkQueueImpl() { 60 name=ORBConstants.WORKQUEUE_DEFAULT_NAME; 61 initializeMonitoring(); 62 } 63 64 public WorkQueueImpl(ThreadPool workerThreadPool) { 65 this(workerThreadPool, ORBConstants.WORKQUEUE_DEFAULT_NAME); 66 } 67 68 public WorkQueueImpl(ThreadPool workerThreadPool, String name) { 69 this.workerThreadPool = workerThreadPool; 70 this.name = name; 71 initializeMonitoring(); 72 } 73 74 // Setup monitoring for this workqueue 75 private void initializeMonitoring() { 76 workqueueMonitoredObject = MonitoringFactories. 77 getMonitoredObjectFactory(). 78 createMonitoredObject(name, 79 MonitoringConstants.WORKQUEUE_MONITORING_DESCRIPTION); 80 81 LongMonitoredAttributeBase b1 = new 82 LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED, 83 MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED_DESCRIPTION) { 84 public Object getValue() { 85 return new Long(WorkQueueImpl.this.totalWorkItemsAdded()); 86 } 87 }; 88 workqueueMonitoredObject.addAttribute(b1); 89 LongMonitoredAttributeBase b2 = new 90 LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE, 91 MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE_DESCRIPTION) { 92 public Object getValue() { 93 return new Long(WorkQueueImpl.this.workItemsInQueue()); 94 } 95 }; 96 workqueueMonitoredObject.addAttribute(b2); 97 LongMonitoredAttributeBase b3 = new 98 LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE, 99 MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE_DESCRIPTION) { 100 public Object getValue() { 101 return new Long(WorkQueueImpl.this.averageTimeInQueue()); 102 } 103 }; 104 workqueueMonitoredObject.addAttribute(b3); 105 } 106 107 108 // Package private method to get the monitored object for this 109 // class 110 MonitoredObject getMonitoredObject() { 111 return workqueueMonitoredObject; 112 } 113 114 public synchronized void addWork(Work work) { 115 workItemsAdded++; 116 work.setEnqueueTime(System.currentTimeMillis()); 117 theWorkQueue.addLast(work); 118 ((ThreadPoolImpl)workerThreadPool).notifyForAvailableWork(this); 119 } 120 121 synchronized Work requestWork(long waitTime) throws TimeoutException, InterruptedException 122 { 123 Work workItem; 124 ((ThreadPoolImpl)workerThreadPool).incrementNumberOfAvailableThreads(); 125 126 if (theWorkQueue.size() != 0) { 127 workItem = (Work)theWorkQueue.removeFirst(); 128 totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime(); 129 workItemsDequeued++; 130 ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads(); 131 return workItem; 132 } 133 134 try { 135 136 long remainingWaitTime = waitTime; 137 long finishTime = System.currentTimeMillis() + waitTime; 138 139 do { 140 141 this.wait(remainingWaitTime); 142 143 if (theWorkQueue.size() != 0) { 144 workItem = (Work)theWorkQueue.removeFirst(); 145 totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime(); 146 workItemsDequeued++; 147 ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads(); 148 return workItem; 149 } 150 151 remainingWaitTime = finishTime - System.currentTimeMillis(); 152 153 } while (remainingWaitTime > 0); 154 155 ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads(); 156 throw new TimeoutException(); 157 158 } catch (InterruptedException ie) { 159 ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads(); 160 throw ie; 161 } 162 } 163 164 public void setThreadPool(ThreadPool workerThreadPool) { 165 this.workerThreadPool = workerThreadPool; 166 } 167 168 public ThreadPool getThreadPool() { 169 return workerThreadPool; 170 } 171 172 /** 173 * Returns the total number of Work items added to the Queue. 174 * This method is unsynchronized and only gives a snapshot of the 175 * state when it is called 176 */ 177 public long totalWorkItemsAdded() { 178 return workItemsAdded; 179 } 180 181 /** 182 * Returns the total number of Work items in the Queue to be processed 183 * This method is unsynchronized and only gives a snapshot of the 184 * state when it is called 185 */ 186 public int workItemsInQueue() { 187 return theWorkQueue.size(); 188 } 189 190 public synchronized long averageTimeInQueue() { 191 return (totalTimeInQueue/workItemsDequeued); 192 } 193 194 public String getName() { 195 return name; 196 } 197 } 198 199 // End of file.