001: /*
002: * Copyright (c) 1998 - 2005 Versant Corporation
003: * All rights reserved. This program and the accompanying materials
004: * are made available under the terms of the Eclipse Public License v1.0
005: * which accompanies this distribution, and is available at
006: * http://www.eclipse.org/legal/epl-v10.html
007: *
008: * Contributors:
009: * Versant Corporation - initial API and implementation
010: */
011: package com.versant.core.metric;
012:
013: import com.versant.core.common.Debug;
014:
015: import java.util.*;
016:
017: import com.versant.core.common.BindingSupportImpl;
018:
019: /**
020: * Ring buffer for performance metric samples. The data is stored in a set
021: * of parallel arrays one for each metric, one for the dates and one for
022: * unique sample ID. All the HasMetric sources are polled at regular
023: * intervals by a background thread.
024: */
025: public final class MetricSnapshotStore implements Runnable {
026:
027: private HasMetrics[] sources;
028: private BaseMetric[] baseMetrics;
029: private List otherMetrics = new ArrayList();
030: private Metric[] all;
031: private int capacity;
032: private Date[] dates; // date for each set of samples
033: private int[] ids; // unique ID for each set of samples
034: private int[][] buf;
035: private int pos; // where the next samples are stored in buf
036: private int count;
037: private boolean locked;
038: private int lastID;
039:
040: private Thread snapshotThread;
041: private boolean run;
042: private int sampleIntervalMs;
043: private long nextSampleTime;
044:
045: public MetricSnapshotStore(int capacity, int sampleIntervalMs) {
046: this .capacity = capacity;
047: this .sampleIntervalMs = sampleIntervalMs;
048: dates = new Date[capacity];
049: ids = new int[capacity];
050: }
051:
052: /**
053: * Add a new source of metrics.
054: */
055: public void addSource(HasMetrics source) {
056: lock();
057: try {
058: if (sources == null) {
059: sources = new HasMetrics[] { source };
060: } else {
061: HasMetrics[] a = new HasMetrics[sources.length + 1];
062: System.arraycopy(sources, 0, a, 0, sources.length);
063: a[sources.length] = source;
064: sources = a;
065: }
066:
067: ArrayList list = new ArrayList();
068: source.addMetrics(list);
069:
070: // extract the base metrics, sort by name, add them + set indexes
071: int n = list.size();
072: ArrayList base = new ArrayList(n);
073: for (int i = 0; i < n; i++) {
074: Object o = list.get(i);
075: if (o instanceof BaseMetric) {
076: base.add(o);
077: }
078: }
079: Collections.sort(base);
080: int firstNewBase;
081: int baseSize = base.size();
082: if (baseMetrics == null || baseMetrics.length == 0) {
083: firstNewBase = 0;
084: baseMetrics = new BaseMetric[baseSize];
085: base.toArray(baseMetrics);
086: } else {
087: firstNewBase = baseMetrics.length;
088: BaseMetric[] a = new BaseMetric[firstNewBase + baseSize];
089: System.arraycopy(baseMetrics, 0, a, 0, firstNewBase);
090: for (int i = 0; i < baseSize; i++) {
091: a[firstNewBase + i] = (BaseMetric) base.get(i);
092: }
093: baseMetrics = a;
094: }
095: for (int i = firstNewBase; i < baseMetrics.length; i++) {
096: baseMetrics[i].setIndex(i);
097: }
098:
099: // add more arrays for samples to buf
100: if (buf == null) {
101: buf = new int[baseMetrics.length][];
102: } else {
103: int[][] a = new int[baseMetrics.length][];
104: System.arraycopy(buf, 0, a, 0, firstNewBase);
105: buf = a;
106: }
107: for (int i = firstNewBase; i < buf.length; i++) {
108: buf[i] = new int[capacity];
109: }
110:
111: // extract the non-base metrics and add them
112: ArrayList other = new ArrayList();
113: for (int i = 0; i < n; i++) {
114: Object o = list.get(i);
115: if (!(o instanceof BaseMetric)) {
116: other.add(o);
117: }
118: }
119: Collections.sort(other);
120: otherMetrics.addAll(other);
121: int otherSize = otherMetrics.size();
122: all = new Metric[baseMetrics.length + otherSize];
123: System
124: .arraycopy(baseMetrics, 0, all, 0,
125: baseMetrics.length);
126: for (int i = 0; i < otherSize; i++) {
127: all[baseMetrics.length + i] = (Metric) otherMetrics
128: .get(i);
129: }
130: } finally {
131: unlock();
132: }
133: }
134:
135: /**
136: * Start our background snapshot thread if not already done.
137: */
138: public void start(String id) {
139: if (snapshotThread == null) {
140: run = true;
141: snapshotThread = new Thread(this , "VOA Metric Store " + id);
142: snapshotThread.setDaemon(true);
143: snapshotThread.start();
144: }
145: }
146:
147: /**
148: * Begin adding a new set of samples and return the index in buf to
149: * modify to update the data. Readers are locked out until endUpdate
150: * is invoked. A unique ID is assigned to the set of samples and the
151: * current Date is recorded. The caller must use getBuf and the index
152: * return by this method to fill in the sample data.
153: * @see #endUpdate()
154: */
155: private int beginUpdate() {
156: lock();
157: ids[pos] = ++lastID;
158: dates[pos] = new Date();
159: return pos;
160: }
161:
162: /**
163: * Finish updating samples.
164: * @see #beginUpdate()
165: */
166: private void endUpdate() {
167: pos = (pos + 1) % capacity;
168: if (count < capacity)
169: count++;
170: unlock();
171: }
172:
173: /**
174: * Return only when we have acquired the lock.
175: */
176: private synchronized void lock() {
177: for (; locked;) {
178: try {
179: wait();
180: } catch (InterruptedException e) {
181: // ignore
182: }
183: }
184: locked = true;
185: }
186:
187: /**
188: * Release the lock so other threads can get access.
189: */
190: private synchronized void unlock() {
191: if (Debug.DEBUG) {
192: if (!locked) {
193: throw BindingSupportImpl.getInstance().internal(
194: "unlock() called with locked == false");
195: }
196: }
197: locked = false;
198: notify();
199: }
200:
201: public int getCapacity() {
202: return capacity;
203: }
204:
205: /**
206: * Set the maximum number of samples to store in the buffer.
207: */
208: public void setCapacity(int max) {
209: lock();
210: try {
211: MetricSnapshotPacket old = getNewSnapshotsImp(0);
212: capacity = max;
213: buf = new int[baseMetrics.length][];
214: for (int i = 0; i < baseMetrics.length; i++)
215: buf[i] = new int[capacity];
216: dates = new Date[capacity];
217: ids = new int[capacity];
218: if (old != null) {
219: int n = old.getSize();
220: if (n > max)
221: n = max;
222: int first = old.getSize() - n;
223: for (int i = 0; i < baseMetrics.length; i++) {
224: System.arraycopy(old.getBuf()[i], first, buf[i], 0,
225: n);
226: }
227: System.arraycopy(old.getDates(), first, dates, 0, n);
228: System.arraycopy(old.getIds(), first, ids, 0, n);
229: count = n;
230: pos = n % capacity;
231: } else {
232: count = 0;
233: pos = 0;
234: }
235: } finally {
236: unlock();
237: }
238: }
239:
240: /**
241: * Get the samples added since the set of samples with unique ID of id.
242: * Returns null if none. Use an ID of 0 to get all samples.
243: */
244: public MetricSnapshotPacket getNewSnapshots(int id) {
245: lock();
246: try {
247: return getNewSnapshotsImp(id);
248: } finally {
249: unlock();
250: }
251: }
252:
253: private MetricSnapshotPacket getNewSnapshotsImp(int id) {
254: if (count < capacity) {
255: int first;
256: for (first = pos - 1; first >= 0; first--) {
257: if (ids[first] == id)
258: break;
259: }
260: first++;
261: int n = pos - first;
262: if (n == 0)
263: return null;
264: int[][] sbuf = new int[baseMetrics.length][];
265: for (int i = 0; i < baseMetrics.length; i++) {
266: sbuf[i] = new int[n];
267: System.arraycopy(buf[i], first, sbuf[i], 0, n);
268: }
269: Date[] dbuf = new Date[n];
270: System.arraycopy(dates, first, dbuf, 0, n);
271: int[] ibuf = new int[n];
272: System.arraycopy(ids, first, ibuf, 0, n);
273: return new MetricSnapshotPacket(dbuf, ibuf, sbuf);
274: } else {
275: if (ids[(pos + capacity - 1) % capacity] == id)
276: return null;
277: int first = pos;
278: int c = capacity;
279: for (; c > 0; first = (first + 1) % capacity, c--) {
280: if (ids[first] == id)
281: break;
282: }
283: first = (first + 1) % capacity;
284: if (first >= pos) {
285: int h1 = capacity - first;
286: int h2 = pos;
287: int n = h1 + h2;
288: int[][] sbuf = new int[baseMetrics.length][];
289: for (int i = 0; i < baseMetrics.length; i++) {
290: sbuf[i] = new int[n];
291: System.arraycopy(buf[i], first, sbuf[i], 0, h1);
292: System.arraycopy(buf[i], 0, sbuf[i], h1, h2);
293: }
294: Date[] dbuf = new Date[n];
295: System.arraycopy(dates, first, dbuf, 0, h1);
296: System.arraycopy(dates, 0, dbuf, h1, h2);
297: int[] ibuf = new int[n];
298: System.arraycopy(ids, first, ibuf, 0, h1);
299: System.arraycopy(ids, 0, ibuf, h1, h2);
300: return new MetricSnapshotPacket(dbuf, ibuf, sbuf);
301: } else {
302: int n = pos - first;
303: int[][] sbuf = new int[baseMetrics.length][];
304: for (int i = 0; i < baseMetrics.length; i++) {
305: sbuf[i] = new int[n];
306: System.arraycopy(buf[i], first, sbuf[i], 0, n);
307: }
308: Date[] dbuf = new Date[n];
309: System.arraycopy(dates, first, dbuf, 0, n);
310: int[] ibuf = new int[n];
311: System.arraycopy(ids, first, ibuf, 0, n);
312: return new MetricSnapshotPacket(dbuf, ibuf, sbuf);
313: }
314: }
315: }
316:
317: /**
318: * Get the most recent performance metric snapshot.
319: */
320: public MetricSnapshotPacket getMostRecentSnapshot(int lastId) {
321: lock();
322: try {
323: if (count == 0)
324: return null;
325: int p = (pos + capacity - 1) % capacity;
326: if (ids[p] == lastId)
327: return null;
328: int[][] sbuf = new int[baseMetrics.length][];
329: for (int i = 0; i < baseMetrics.length; i++) {
330: sbuf[i] = new int[] { buf[i][p] };
331: }
332: return new MetricSnapshotPacket(new Date[] { dates[p] },
333: new int[] { ids[p] }, sbuf);
334: } finally {
335: unlock();
336: }
337: }
338:
339: public int getSampleIntervalMs() {
340: return sampleIntervalMs;
341: }
342:
343: public void setSampleIntervalMs(int sampleIntervalMs) {
344: if (sampleIntervalMs < 100)
345: sampleIntervalMs = 100;
346: int diff = sampleIntervalMs - this .sampleIntervalMs;
347: if (diff != 0) {
348: this .sampleIntervalMs = sampleIntervalMs;
349: nextSampleTime += diff;
350: snapshotThread.interrupt();
351: }
352: ;
353: }
354:
355: public void shutdown() {
356: if (snapshotThread != null) {
357: run = false;
358: snapshotThread.interrupt();
359: snapshotThread = null;
360: }
361: }
362:
363: public void run() {
364: nextSampleTime = System.currentTimeMillis();
365: nextSampleTime += sampleIntervalMs - nextSampleTime
366: % sampleIntervalMs;
367: for (; run;) {
368: long now = System.currentTimeMillis();
369: long diff = nextSampleTime - now;
370: if (diff <= 0) {
371: if (sources != null) {
372: int pos = beginUpdate();
373: try {
374: for (int i = 0; i < sources.length; i++) {
375: sources[i].sampleMetrics(buf, pos);
376: }
377: } finally {
378: endUpdate();
379: }
380: }
381: for (;;) {
382: nextSampleTime += sampleIntervalMs;
383: diff = nextSampleTime - now;
384: if (diff > 0)
385: break;
386: }
387: }
388: try {
389: Thread.sleep(diff);
390: } catch (InterruptedException e) {
391: // ignore
392: }
393: }
394: }
395:
396: public Metric[] getMetrics() {
397: lock();
398: try {
399: return all;
400: } finally {
401: unlock();
402: }
403: }
404:
405: }
|