001: /*
002: * hgcommons 7
003: * Hammurapi Group Common Library
004: * Copyright (C) 2003 Hammurapi Group
005: *
006: * This program is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2 of the License, or (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: * URL: http://www.hammurapi.biz/hammurapi-biz/ef/xmenu/hammurapi-group/products/products/hgcommons/index.html
021: * e-Mail: support@hammurapi.biz
022: */
023: package biz.hammurapi.metrics;
024:
025: import java.util.Collection;
026: import java.util.HashMap;
027: import java.util.Iterator;
028: import java.util.LinkedList;
029: import java.util.Map;
030: import java.util.Timer;
031: import java.util.TimerTask;
032:
033: import biz.hammurapi.config.Component;
034: import biz.hammurapi.config.ConfigurationException;
035:
036: /**
037: * Slices metrics.
038: * @author Pavel Vlasov
039: * @version $Revision: 1.3 $
040: */
041: public class SlicingMeasurementConsumer implements MeasurementConsumer,
042: Component {
043:
044: private class SliceEntry {
045: String category;
046: Slice slice;
047:
048: /**
049: * @param category
050: * @param slice
051: */
052: SliceEntry(String category, Slice slice) {
053: super ();
054: this .category = category;
055: this .slice = slice;
056: }
057: }
058:
059: private Map slices = new HashMap();
060: private LinkedList sliceQueue = new LinkedList();
061: private boolean keepMeasurements = false;
062: private long tick = 60000;
063: private int maxQueue = 1000;
064: private SliceConsumer sliceConsumer = new ConsoleSliceConsumer();
065: private Timer timer;
066: private boolean isOwnTimer;
067:
068: protected SliceConsumer getSliceConsumer() {
069: return sliceConsumer;
070: }
071:
072: /**
073: * Creates a new instance with internal timer.
074: * @param tick Slice size in milliseconds
075: * @param keepMeasurements If true individual measurements are reported, only aggregated values otherwise
076: * @param maxQueue Maximum number of slices pending to be consumed. 0 - no limit. If sampling ratio is higher than
077: * consuming ration then excessive slices will be dropped with a notice on console.
078: */
079: public SlicingMeasurementConsumer(long tick,
080: boolean keepMeasurements, int maxQueue,
081: SliceConsumer sliceConsumer) {
082: this (tick, keepMeasurements, maxQueue, sliceConsumer, null);
083: }
084:
085: /**
086: * Creates a new instance with internal timer.
087: * @param tick Slice size in milliseconds
088: * @param keepMeasurements If true individual measurements are reported, only aggregated values otherwise
089: * @param maxQueue Maximum number of slices pending to be consumed. 0 - no limit. If sampling ratio is higher than
090: * consuming ration then excessive slices will be dropped with a notice on console.
091: * @param timer Timer to use for slicing metrics and passing them to slice consumer. If it is null then an internal timer is
092: * created.
093: */
094: public SlicingMeasurementConsumer(long tick,
095: boolean keepMeasurements, int maxQueue,
096: SliceConsumer sliceConsumer, Timer timer) {
097: super ();
098: this .keepMeasurements = keepMeasurements;
099: this .tick = tick;
100: this .maxQueue = maxQueue;
101: this .sliceConsumer = sliceConsumer;
102: this .timer = timer;
103: }
104:
105: /**
106: * Default constructor with default settings.
107: */
108: public SlicingMeasurementConsumer() {
109: super ();
110: }
111:
112: public void addMeasurement(String name, double value, long time) {
113: synchronized (slices) {
114: Slice slice = (Slice) slices.get(name);
115: if (slice == null) {
116: slice = new SimpleSlice(name, keepMeasurements);
117: slices.put(name, slice);
118: }
119: slice.add(value, time);
120:
121: if (slice.getTo() - slice.getFrom() >= tick) {
122: slices.remove(name);
123: addSliceToQueue(null, slice);
124: }
125: }
126: }
127:
128: private Map instances = new HashMap();
129:
130: private class CategorizedConsumer implements MeasurementConsumer,
131: Component {
132:
133: String category;
134: Map slices = new HashMap();
135:
136: /**
137: * @param category
138: */
139: public CategorizedConsumer(String category) {
140: super ();
141: this .category = category;
142: }
143:
144: public void addMeasurement(String name, double value, long time) {
145: synchronized (slices) {
146: Slice slice = (Slice) slices.get(name);
147: if (slice == null) {
148: slice = new SimpleSlice(name, keepMeasurements);
149: slices.put(name, slice);
150: }
151: slice.add(value, time);
152:
153: if (slice.getTo() - slice.getFrom() >= tick) {
154: slices.remove(name);
155: addSliceToQueue(category, slice);
156: }
157: }
158: }
159:
160: public void start() throws ConfigurationException {
161: SlicingMeasurementConsumer.this .start();
162: }
163:
164: public void stop() throws ConfigurationException {
165: SlicingMeasurementConsumer.this .stop();
166: }
167:
168: public void setOwner(Object owner) {
169: // Ignore
170: }
171: }
172:
173: /**
174: * @param category
175: * @return Instance for a category.
176: */
177: public MeasurementConsumer getCategoryInstance(final String category) {
178: synchronized (instances) {
179: MeasurementConsumer ret = (MeasurementConsumer) instances
180: .get(category);
181: if (ret == null) {
182: ret = new CategorizedConsumer(category);
183: instances.put(category, ret);
184: }
185: return ret;
186: }
187: }
188:
189: private int droppedCounter;
190: private long firstDropped;
191: private Thread slicingThread;
192:
193: public void shutdown() {
194: to = System.currentTimeMillis();
195:
196: synchronized (slices) {
197: Iterator it = slices.values().iterator();
198: while (it.hasNext()) {
199: Slice slice = (Slice) it.next();
200: it.remove();
201: addSliceToQueue(null, slice);
202: }
203: }
204:
205: synchronized (instances) {
206: Iterator iit = instances.values().iterator();
207: while (iit.hasNext()) {
208: CategorizedConsumer cConsumer = (CategorizedConsumer) iit
209: .next();
210: synchronized (cConsumer.slices) {
211: Iterator it = cConsumer.slices.values().iterator();
212: while (it.hasNext()) {
213: Slice slice = (Slice) it.next();
214: it.remove();
215: addSliceToQueue(cConsumer.category, slice);
216: }
217: }
218: }
219: }
220:
221: // MeasurementCategoryFactory.unregister(this); - Unregister explicitly!!!
222: addSliceToQueue(null, null);
223: try {
224: slicingThread.join();
225: } catch (InterruptedException e) {
226: throw new MetricsException(e);
227: }
228:
229: // Stop the time is it is our own.
230: if (isOwnTimer) {
231: timer.cancel();
232: }
233: }
234:
235: /**
236: *
237: */
238: private void addSliceToQueue(String category, final Slice slice) {
239: synchronized (sliceQueue) {
240: if (slice != null && maxQueue != 0
241: && sliceQueue.size() > maxQueue) {
242: firstDropped = slice.getTo();
243: droppedCounter++;
244: } else {
245: if (droppedCounter > 0) {
246: sliceQueue.add(new SliceEntry("DroppedSlices",
247: new Slice() {
248:
249: public long getFrom() {
250: return firstDropped;
251: }
252:
253: public long getTo() {
254: return slice.getTo();
255: }
256:
257: public int getNumber() {
258: return droppedCounter;
259: }
260:
261: public double getMin() {
262: return 0;
263: }
264:
265: public double getMax() {
266: return 0;
267: }
268:
269: public double getAvg() {
270: return 0;
271: }
272:
273: public double getTotal() {
274: return 0;
275: }
276:
277: public void add(double value, long time) {
278: throw new UnsupportedOperationException();
279: }
280:
281: public void add(Metric metric) {
282: throw new UnsupportedOperationException();
283: }
284:
285: public Collection getMeasurements() {
286: return null;
287: }
288:
289: public String getName() {
290: return "DROPPED SLICES";
291: }
292:
293: public double getDeviation() {
294: return 0;
295: }
296:
297: }));
298: droppedCounter = 0;
299: }
300:
301: sliceQueue.add(slice == null ? null : new SliceEntry(
302: category, slice));
303: sliceQueue.notifyAll();
304: }
305: }
306: }
307:
308: /**
309: *
310: */
311: private void onTick() {
312: long now = System.currentTimeMillis();
313:
314: synchronized (slices) {
315: Iterator it = slices.values().iterator();
316: while (it.hasNext()) {
317: Slice slice = (Slice) it.next();
318: if (now - slice.getFrom() >= tick) {
319: it.remove();
320: addSliceToQueue(null, slice);
321: }
322: }
323: }
324:
325: synchronized (instances) {
326: Iterator iit = instances.values().iterator();
327: while (iit.hasNext()) {
328: CategorizedConsumer cConsumer = (CategorizedConsumer) iit
329: .next();
330: synchronized (cConsumer.slices) {
331: Iterator it = cConsumer.slices.values().iterator();
332: while (it.hasNext()) {
333: Slice slice = (Slice) it.next();
334: if (now - slice.getFrom() >= tick) {
335: it.remove();
336: addSliceToQueue(cConsumer.category, slice);
337: }
338: }
339: }
340: }
341: }
342:
343: if (sliceConsumer instanceof HousekeepingSliceConsumer) {
344: ((HousekeepingSliceConsumer) sliceConsumer).onTick(now);
345: }
346: }
347:
348: {
349: slicingThread = new Thread() {
350: {
351: setName("Slice queue processor");
352: setDaemon(true);
353: setPriority(Thread.MIN_PRIORITY);
354: start();
355: }
356:
357: private SliceEntry getSliceEntry()
358: throws InterruptedException {
359: synchronized (sliceQueue) {
360: while (sliceQueue.isEmpty()) {
361: sliceQueue.wait();
362: }
363:
364: return (SliceEntry) sliceQueue.removeFirst();
365: }
366: }
367:
368: public void run() {
369: while (true) {
370: try {
371: SliceEntry entry = getSliceEntry();
372: if (entry == null) {
373: if (sliceConsumer instanceof Component) {
374: try {
375: ((Component) sliceConsumer).stop();
376: } catch (ConfigurationException e1) {
377: e1.printStackTrace();
378: }
379: }
380: return;
381: }
382:
383: // Put entry back to queue if slice consumer is unable to consume
384: // and sleep.
385: if (!sliceConsumer.consumeSlice(entry.category,
386: entry.slice)) {
387: synchronized (sliceQueue) {
388: sliceQueue.add(entry);
389: }
390: sleep(tick);
391: }
392: } catch (InterruptedException e) {
393: return;
394: }
395: }
396: }
397: };
398: }
399:
400: private int useCounter;
401:
402: protected long from;
403: protected long to;
404:
405: private TimerTask tickTask;
406:
407: /**
408: * Increments use counter
409: */
410: public void start() throws ConfigurationException {
411: if (timer == null) {
412: timer = new Timer(true);
413: isOwnTimer = true;
414: }
415:
416: tickTask = new TimerTask() {
417: public void run() {
418: onTick();
419: }
420: };
421:
422: timer.scheduleAtFixedRate(tickTask, tick, tick);
423:
424: if (useCounter == 0) {
425: from = System.currentTimeMillis();
426: if (sliceConsumer instanceof Component) {
427: ((Component) sliceConsumer).start();
428: }
429: }
430:
431: ++useCounter;
432: }
433:
434: /**
435: * Decrements use counter and invokes shutdown() when counter==0
436: */
437: public void stop() throws ConfigurationException {
438: if (--useCounter == 0) {
439: shutdown();
440: }
441: }
442:
443: public void setOwner(Object owner) {
444: // Ignore
445: }
446:
447: public int getMaxQueue() {
448: return maxQueue;
449: }
450:
451: public void setMaxQueue(int maxQueue) {
452: this .maxQueue = maxQueue;
453: }
454:
455: public long getTick() {
456: return tick;
457: }
458:
459: public void setTick(long tick) {
460: this .tick = tick;
461: }
462:
463: public void setSliceConsumer(SliceConsumer sliceConsumer) {
464: this .sliceConsumer = sliceConsumer;
465: }
466:
467: public boolean isKeepMeasurements() {
468: return keepMeasurements;
469: }
470:
471: public void setKeepMeasurements(boolean keepMeasurements) {
472: this.keepMeasurements = keepMeasurements;
473: }
474:
475: }
|