001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: *
017: */
018:
019: /////////////////////////////////////////
020: ////////
021: //////// This code is mostly unused at present
022: //////// it seems that only notifyListeners()
023: //////// is used.
024: ////////
025: //////// However, it does look useful.
026: //////// And it may one day be used...
027: ////////
028: /////////////////////////////////////////
029: package org.apache.jmeter.threads;
030:
031: import java.util.Iterator;
032: import java.util.List;
033:
034: import org.apache.commons.collections.Buffer;
035: import org.apache.commons.collections.BufferUtils;
036: import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
037: import org.apache.jmeter.samplers.SampleEvent;
038: import org.apache.jmeter.samplers.SampleListener;
039: import org.apache.jmeter.testbeans.TestBeanHelper;
040: import org.apache.jmeter.testelement.TestElement;
041: import org.apache.jorphan.logging.LoggingManager;
042: import org.apache.log.Logger;
043:
044: /**
045: * The <code>ListenerNotifier</code> thread is responsible for performing
046: * asynchronous notifications that a sample has occurred. Each time a sample
047: * occurs, the <code>addLast</code> method should be called to add the sample
048: * and its list of listeners to the notification queue. This thread will then
049: * notify those listeners asynchronously at some future time.
050: * <p>
051: * In the current implementation, the notifications will be made in batches,
052: * with 2 seconds between the beginning of successive batches. If the notifier
053: * thread starts to get behind, the priority of the thread will be increased in
054: * an attempt to help it to keep up.
055: *
056: * @see org.apache.jmeter.samplers.SampleListener
057: *
058: * @version $Revision: 493779 $
059: */
060: public class ListenerNotifier {
061: private static Logger log = LoggingManager.getLoggerForClass();
062:
063: /**
064: * The number of milliseconds between batches of notifications.
065: */
066: private static final int SLEEP_TIME = 2000;
067:
068: /**
069: * Indicates whether or not this thread should remain running. The thread
070: * will continue running after this field is set to false until the next
071: * batch of notifications has been completed and the notification queue is
072: * empty.
073: */
074: private boolean running = true;
075:
076: /**
077: * Indicates whether or not this thread has stopped. No further
078: * notifications will be performed.
079: */
080: private boolean isStopped = true;
081:
082: /**
083: * The queue containing the notifications to be performed. Each notification
084: * consists of a pair of entries in this queue. The first is the
085: * {@link org.apache.jmeter.samplers.SampleEvent SampleEvent} representing
086: * the sample. The second is a List of
087: * {@link org.apache.jmeter.samplers.SampleListener SampleListener}s which
088: * should be notified.
089: */
090: private Buffer listenerEvents = BufferUtils
091: .synchronizedBuffer(new UnboundedFifoBuffer());
092:
093: /**
094: * Stops the ListenerNotifier thread. The thread will continue processing
095: * any events remaining in the notification queue before it actually stops,
096: * but this method will return immediately.
097: */
098: public void stop() {
099: running = false;
100: }
101:
102: /**
103: * Indicates whether or not the thread has stopped. This will not return
104: * true until the <code>stop</code> method has been called and any
105: * remaining notifications in the queue have been completed.
106: *
107: * @return true if the ListenerNotifier has completely stopped, false
108: * otherwise
109: */
110: public boolean isStopped() {
111: return isStopped;
112: }
113:
114: /**
115: * Process the events in the notification queue until the thread has been
116: * told to stop and the notification queue is empty.
117: * <p>
118: * In the current implementation, this method will iterate continually until
119: * the thread is told to stop. In each iteration it will process any
120: * notifications that are in the queue at the beginning of the iteration,
121: * and then will sleep until it is time to start the next batch. As long as
122: * the thread is keeping up, each batch should start 2 seconds after the
123: * beginning of the last batch. This exact behavior is subject to change.
124: */
125: public void run() {
126: boolean isMaximumPriority = false;
127: int normalCount = 0;
128:
129: while (running) {
130: long startTime = System.currentTimeMillis();
131: processNotifications();
132: long sleep = SLEEP_TIME
133: - (System.currentTimeMillis() - startTime);
134:
135: // If the thread has been told to stop then we shouldn't sleep
136: if (!running) {
137: break;
138: }
139:
140: if (sleep < 0) {
141: isMaximumPriority = true;
142: normalCount = 0;
143: if (log.isInfoEnabled()) {
144: log
145: .info("ListenerNotifier exceeded maximum "
146: + "notification time by "
147: + (-sleep) + "ms");
148: }
149: boostPriority();
150: } else {
151: normalCount++;
152:
153: // If there have been three consecutive iterations since the
154: // last iteration which took too long to execute, return the
155: // thread to normal priority.
156: if (isMaximumPriority && normalCount >= 3) {
157: isMaximumPriority = false;
158: unboostPriority();
159: }
160:
161: if (log.isDebugEnabled()) {
162: log.debug("ListenerNotifier sleeping for " + sleep
163: + "ms");
164: }
165:
166: try {
167: Thread.sleep(sleep);
168: } catch (InterruptedException e) {
169: }
170: }
171: }
172:
173: // Make sure that all pending notifications are processed before
174: // actually ending the thread.
175: processNotifications();
176: isStopped = true;
177: }
178:
179: /**
180: * Process all of the pending notifications. Only the samples which are in
181: * the queue when this method is called will be processed. Any samples added
182: * between the time when this method is called and when it exits are saved
183: * for the next batch.
184: */
185: private void processNotifications() {
186: int listenerEventsSize = listenerEvents.size();
187: if (log.isDebugEnabled()) {
188: log.debug("ListenerNotifier: processing "
189: + listenerEventsSize + " events");
190: }
191:
192: while (listenerEventsSize > 0) {
193: // Since this is a FIFO and this is the only place we remove
194: // from it (only from a single thread) we don't have to remove
195: // these two items in one atomic operation. Each individual
196: // remove is atomic (because we use a synchronized buffer),
197: // which is necessary since the buffer can be accessed from
198: // other threads (to add things to the buffer).
199: SampleEvent res = (SampleEvent) listenerEvents.remove();
200: List listeners = (List) listenerEvents.remove();
201:
202: notifyListeners(res, listeners);
203:
204: listenerEventsSize -= 2;
205: }
206: }
207:
208: /**
209: * Boost the priority of the current thread to maximum priority. If the
210: * thread is already at maximum priority then this will have no effect.
211: */
212: private void boostPriority() {
213: if (Thread.currentThread().getPriority() != Thread.MAX_PRIORITY) {
214: log
215: .info("ListenerNotifier: Boosting thread priority to maximum.");
216: Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
217: }
218: }
219:
220: /**
221: * Return the priority of the current thread to normal. If the thread is
222: * already at normal priority then this will have no effect.
223: */
224: private void unboostPriority() {
225: if (Thread.currentThread().getPriority() != Thread.NORM_PRIORITY) {
226: log
227: .info("ListenerNotifier: Returning thread priority to normal.");
228: Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
229: }
230: }
231:
232: /**
233: * Notify a list of listeners that a sample has occurred.
234: *
235: * @param res
236: * the sample event that has occurred. Must be non-null.
237: * @param listeners
238: * a list of the listeners which should be notified. This list
239: * must not be null and must contain only SampleListener
240: * elements.
241: */
242: public void notifyListeners(SampleEvent res, List listeners) {
243: Iterator iter = listeners.iterator();
244: while (iter.hasNext()) {
245: try {
246: SampleListener sampleListener = ((SampleListener) iter
247: .next());
248: TestBeanHelper.prepare((TestElement) sampleListener);
249: sampleListener.sampleOccurred(res);
250: } catch (RuntimeException e) {
251: log.error("Detected problem in Listener: ", e);
252: log.info("Continuing to process further listeners");
253: }
254: }
255: }
256:
257: /**
258: * Add a new sample event to the notification queue. The notification will
259: * be performed asynchronously and this method will return immediately.
260: *
261: * @param item
262: * the sample event that has occurred. Must be non-null.
263: * @param listeners
264: * a list of the listeners which should be notified. This list
265: * must not be null and must contain only SampleListener
266: * elements.
267: */
268: public void addLast(SampleEvent item, List listeners) {
269: // Must use explicit synchronization here so that the item and
270: // listeners are added together atomically
271: synchronized (listenerEvents) {
272: listenerEvents.add(item);
273: listenerEvents.add(listeners);
274: }
275: }
276: }
|