001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import java.util.*;
030:
031: /**
032: * Used as a proxy to observe and measure communication behavior between
033: * stages. By handing out a SinkProxy instead of a FiniteQueue, it is
034: * possible to gather statistics on event communication between stages.
035: * This is used by StageGraph to construct a graph of the communication
036: * patterns between stages.
037: *
038: * @author Matt Welsh
039: */
040: public class SinkProxy implements SinkIF, ProfilableIF {
041:
042: private static final boolean DEBUG = false;
043:
044: private ManagerIF mgr;
045: private StageWrapperIF toStage;
046: private StageGraph stageGraph;
047: public SinkIF thesink;
048: private Thread client = null;
049: private Hashtable clientTbl = null;
050:
051: /**
052: * Maintains a running sum of the number of elements enqueued onto
053: * this sink.
054: */
055: public int enqueueCount;
056:
057: /**
058: * Maintains a running sum of the number of elements successfully
059: * enqueued onto this sink (that is, not rejected by the enqueue predicate).
060: */
061: public int enqueueSuccessCount;
062:
063: /**
064: * Used to maintain a timer for statistics gathering.
065: */
066: public long timer;
067:
068: /**
069: * Create a SinkProxy for the given sink.
070: *
071: * @param sink The sink to create a proxy for.
072: * @param mgr The associated manager.
073: * @param toStage The stage which this sink pushes events to.
074: */
075: public SinkProxy(SinkIF sink, ManagerIF mgr, StageWrapperIF toStage) {
076: this .thesink = sink;
077: this .mgr = mgr;
078: this .stageGraph = mgr.getProfiler().getGraphProfiler();
079: this .toStage = toStage;
080: this .enqueueCount = 0;
081: this .enqueueSuccessCount = 0;
082: this .timer = 0;
083: }
084:
085: /**
086: * Return the size of the queue.
087: */
088: public int size() {
089: if (thesink == null)
090: return 0;
091: return thesink.size();
092: }
093:
094: public void enqueue(QueueElementIF enqueueMe) throws SinkException {
095: recordUse();
096: enqueueCount++;
097: thesink.enqueue(enqueueMe);
098: enqueueSuccessCount++;
099: }
100:
101: public boolean enqueue_lossy(QueueElementIF enqueueMe) {
102: recordUse();
103: enqueueCount++;
104: boolean pass = thesink.enqueue_lossy(enqueueMe);
105: if (pass)
106: enqueueSuccessCount++;
107: return pass;
108: }
109:
110: public void enqueue_many(QueueElementIF[] enqueueMe)
111: throws SinkException {
112: recordUse();
113: if (enqueueMe != null) {
114: enqueueCount += enqueueMe.length;
115: }
116: thesink.enqueue_many(enqueueMe);
117: if (enqueueMe != null) {
118: enqueueSuccessCount += enqueueMe.length;
119: }
120: }
121:
122: /**
123: * Return the profile size of the queue.
124: */
125: public int profileSize() {
126: return size();
127: }
128:
129: public Object enqueue_prepare(QueueElementIF enqueueMe[])
130: throws SinkException {
131: recordUse();
132: if (enqueueMe != null) {
133: enqueueCount += enqueueMe.length;
134: }
135: Object key = thesink.enqueue_prepare(enqueueMe);
136: if (enqueueMe != null) {
137: enqueueSuccessCount += enqueueMe.length;
138: }
139: return key;
140: }
141:
142: public void enqueue_commit(Object key) {
143: thesink.enqueue_commit(key);
144: }
145:
146: public void enqueue_abort(Object key) {
147: thesink.enqueue_abort(key);
148: }
149:
150: public void setEnqueuePredicate(EnqueuePredicateIF pred) {
151: thesink.setEnqueuePredicate(pred);
152: }
153:
154: public EnqueuePredicateIF getEnqueuePredicate() {
155: return thesink.getEnqueuePredicate();
156: }
157:
158: public String toString() {
159: return "[SinkProxy for toStage=" + toStage + "]";
160: }
161:
162: private void recordUse() {
163: if (DEBUG)
164: System.err.println("SinkProxy: Recording use of " + this
165: + " by thread " + Thread.currentThread());
166:
167: if (client == null) {
168: client = Thread.currentThread();
169:
170: StageGraphEdge edge = new StageGraphEdge();
171: edge.fromStage = stageGraph.getStageFromThread(client);
172: edge.toStage = toStage;
173: edge.sink = this ;
174: stageGraph.addEdge(edge);
175:
176: } else {
177: Thread t = Thread.currentThread();
178: if (client != t) {
179: if (clientTbl == null)
180: clientTbl = new Hashtable();
181: if (clientTbl.get(t) == null) {
182: clientTbl.put(t, t);
183:
184: StageGraphEdge edge = new StageGraphEdge();
185: edge.fromStage = stageGraph.getStageFromThread(t);
186: edge.toStage = toStage;
187: edge.sink = this;
188: stageGraph.addEdge(edge);
189: }
190: }
191: }
192: }
193:
194: }
|