001: package chat.business;
002:
003: import java.util.Vector;
004: import java.util.Enumeration;
005:
006: /**
007: * This class manages the contents of a discussion. Messages may be added
008: * at any time. There is an optional limit on the size of the message queue;
009: * if one is set, the oldest messages are deleted as new ones arrive. <P>
010: *
011: * Optionally the method startHarvester() may be called, once, to start
012: * a background thread that enforces an age limit on messages. If this
013: * method is not called, then there will be no age limit. <P>
014: *
015: * This class implements what I call the "on hold push" algorithm.
016: * The server (this class) maintains some state (a list of messages).
017: * Clients (ContentsPresentation.po and the web browser) request snapshots of this state.
018: * If the state has changed
019: * since the last time the client asked for a snapshot, then a new
020: * snapshot is sent to the client right away. If the state has not changed
021: * since the last time the client asked for a snapshot, then the request
022: * for a snapshot blocks until the state changes (either by a new message
023: * being added, or messages being deleted). As soon as the client has
024: * processed the snapshot it gets, it immediately requests a new snapshot. <P>
025: *
026: * This has the following advantages:
027: *
028: * As soon as new information is available, it is sent immediately to
029: * all interested parties.
030: *
031: * Since a whole snapshot is sent every time, the problem of having
032: * to do "diffs" between the old and new states is avoided. As is the
033: * problem of how to recover when this mechanism breaks down.
034: *
035: * Only standard HTTP is used, so the method is compatible with firewalls,
036: * proxies, all browsers, and even, in the case of an applet,
037: * paranoid security policies. (HTTP requests are commonly
038: * blocked on a read in real life. But usually it is a filesystem read,
039: * and the delay is much shorter.) For an example of this, try running
040: * this application in the Enhydra Multiserver, and then add a WAI
041: * connection. Your requests are being tunneled via CORBA from the
042: * Netscape server to the Multiserver, yet the application runs fine.
043: *
044: * Because the interested parties all have already established a socket
045: * connection, and are poised and ready to read data, they receive the
046: * new state the instant it changes, pretty much as quickly as is possible.
047: *
048: * It is simple, robust and effective.
049: *
050: * This has the following disadvantages:
051: *
052: * If you have N users, you will have N open socket connections. This
053: * method does not scale well to very large values of N. If N is too large,
054: * some users will experience network errors, and their automatic
055: * reloading will stop. See your operating system for details on the
056: * maximum number of open sockets, and the optional NumThreads setting
057: * for connections in the Enhydra Multiserver config file.
058: *
059: * The whole state is sent every time. This uses more bandwidth than
060: * methods that only send deltas to the state.
061: *
062: * If the clients stop asking for snapshots, there is no way (from the
063: * server) to do anything about it.
064: *
065: * In order to tell if a client has a current copy of the state of things,
066: * timestamps are used. Every time the state changes, a unique identifier
067: * is assigned to the new current state (using time). The important thing
068: * is that the identifiers uniquely identify their state. Time is just
069: * one way of generating unique identifiers. Every time the client asks
070: * for a snapshot, it also sends the identifier for the state it currently
071: * has (there must be a special identifier used for the first request, when
072: * the client has no data. In this program it sends 0). If the id sent with
073: * the request matches, then the request blocks until the state changes.
074: *
075: * In real life most connections to the server use TCP/IP, which imposes
076: * a timeout limit on how long requests can block on a read.
077: * More importantly, web browsers will give up after a short period of time.
078: * Therefore, when a request is sent in, in addition to the current id a
079: * time limit is sent. This tells the server the maximum number of seconds
080: * the client is willing to wait (in this program 60 seconds is used).
081: * If this time limit expires, and the state has not yet changed, the
082: * server must return an answer to the client. In this program a snapshot
083: * is returned, even though the browser already has the same data
084: * (if the client were an applet, a special "no change" response could be
085: * sent). If a client wants to force an update, or wants to do a
086: * non-blocking read for some other reason, then it simply sets the timeout
087: * to zero.
088: *
089: *
090: * Static methods are used because there is only one chat room. This
091: * program's main goal is to show off a simple yet "real" Enhydra application.
092: */
093: public class Discussion implements Runnable {
094:
095: // The current list of Message objects, in the order they were recieved.
096: static private Vector contents = new Vector();
097:
098: // The total number of messages added to this discussion.
099: static private long totalReceived = 0;
100:
101: // The unique identifier for the current state.
102: static private long lastStateChangeTime = System
103: .currentTimeMillis();
104:
105: // The current number of clients blocked on a read.
106: static private int numWaiting = 0;
107:
108: // Only create at most one instance. This is only used by the thread.
109: static private Discussion singleton;
110:
111: // The thread that deletes old messages. This will only be started if
112: // startHarvester() is called (which is optional).
113: static private Thread harvester;
114:
115: // Default settings for the thread. Will be overwritten by startHarvester().
116: static private int lifetimeSec = 300;
117: static private int intervalSec = 10;
118:
119: // Maximum allowed number of messages. 0 means no limit.
120: public static int maxQueueSize = 200;
121:
122: /**
123: * Do not allow instances. Use the static methods.
124: * Only one instance is ever created, and that is just for the thread.
125: */
126: private Discussion() {
127: }
128:
129: /**
130: * Add a message to the discussion. Any waiting clients will be
131: * instantly notified.
132: */
133: public static void addMessage(String name, String text) {
134: MessageImpl msg = new MessageImpl(name, text);
135: synchronized (contents) {
136: // Add it to the list.
137: totalReceived++;
138: contents.addElement(msg);
139: // If there are too many, delete the oldest ones.
140: while ((maxQueueSize > 0)
141: && (contents.size() > maxQueueSize))
142: contents.removeElementAt(0);
143: // Pick a new unique state identifier.
144: // Wake up all the clients who are blocked on read.
145: updateCurrentState();
146: }
147: }
148:
149: /**
150: * Create a new unique identifier for the current state.
151: * Then wake up all the clients who are blocked on read.
152: * This uses the current time, but it could be based on the content, or
153: * a counter. The important thing is that the new identifier is one
154: * that has never been used before.
155: */
156: static private void updateCurrentState() {
157: long now = System.currentTimeMillis();
158: synchronized (contents) {
159: // Be sure that the new time will be at least 1 msec greater
160: // than the old time.
161: if (now <= lastStateChangeTime)
162: // Extremely rare.
163: lastStateChangeTime++;
164: else
165: // Most of the time.
166: lastStateChangeTime = now;
167: // Wake up any pending readers.
168: contents.notifyAll();
169: }
170: }
171:
172: /**
173: * Throw out all the current messages.
174: * All the waiting clients will be immediatly notified.
175: */
176: static public void clear() {
177: synchronized (contents) {
178: contents.removeAllElements();
179: updateCurrentState();
180: }
181: }
182:
183: /**
184: * How many messages have been recieved in total?
185: */
186: static public long getTotalReceived() {
187: return totalReceived;
188: }
189:
190: /**
191: * How many messages are there right now in the list?
192: */
193: static public long getCurrentSize() {
194: return contents.size();
195: }
196:
197: /**
198: * Get a snapshot of the current state. Might block.
199: *
200: * @param currentState
201: * The state identifier that was returned last time this was called.
202: * This is the id of the state that the client currently has. It is
203: * asking for a snapshot of the state after things change and
204: * this id is not longer current. If this has already happened, the
205: * call will immediatly return. If it has not happned, the call will
206: * block (not return) until things change.
207: * @param wait
208: * The maxmimum number of seconds this call is allowed to block for.
209: * Send 0 for an instant response. If the call blocks and then
210: * runs out of time, a snapshot is returned.
211: * @return
212: * A Snapshot object. This is just a way to return two things at
213: * once: a Vector of Message objects and a state identifier.
214: * The client should use the state identifier for the next call to
215: * this method. The client should call this method again as soon as
216: * it is done displaying the results.
217: */
218: public static SnapshotImpl getContents(long currentState, long wait) {
219: waitForNewMessage(currentState, wait);
220: return new SnapshotImpl((Vector) contents.clone(),
221: lastStateChangeTime);
222: }
223:
224: /**
225: * Helper function. Only does the waiting. Waits till browserState
226: * is not the current state, or for wait seconds, whichever comes
227: * first. This function will return immediatly if no waiting is
228: * needed. While waiting the thread is sleeping, not kept running
229: * in a loop (which would waste processor time).
230: */
231: private static void waitForNewMessage(long browserState, long wait) {
232: if ((browserState != lastStateChangeTime) || (wait <= 0))
233: // Already new data to report.
234: return;
235: long now = System.currentTimeMillis();
236: long giveUpTime = now + (wait * 1000);
237: // We loop here because the wait might return prematurely.
238: while (browserState == lastStateChangeTime) {
239: long leftToGo = giveUpTime - now;
240: if (leftToGo <= 0)
241: // Either the wait time was 0 or we ran out of time.
242: break;
243: synchronized (contents) {
244: // Keep track of how many threads are waiting. This is done
245: // by dead reckoning, so care must be taken to make sure the
246: // counter doesn't get off.
247: numWaiting++;
248: try {
249: contents.wait(leftToGo);
250: } catch (InterruptedException e) {
251: } finally {
252: numWaiting--;
253: }
254: }
255: now = System.currentTimeMillis();
256: }
257: }
258:
259: /**
260: * How may clients are blocked on a read? This is, effectivly, the
261: * number of people participating in the discussion.
262: */
263: public static int getNumWaiting() {
264: return numWaiting;
265: }
266:
267: /**
268: * Internal use only. This is provided only for the thread that deletes
269: * messages that are too old.
270: */
271: public void run() {
272: while (true) {
273: /*
274: * Sleep for a while.
275: */
276: long now = System.currentTimeMillis();
277: long wakeUp = now + (intervalSec * 1000);
278: while (now < wakeUp) {
279: long leftToGo = wakeUp - now;
280: if (leftToGo <= 0)
281: break;
282: try {
283: Thread.sleep(leftToGo);
284: } catch (InterruptedException e) {
285: }
286: now = System.currentTimeMillis();
287: }
288: /*
289: * Now make a pass through the message list and delete any
290: * that are too old. Keep track of whether or not we actually
291: * delete any.
292: */
293: boolean dirty = false;
294: synchronized (contents) {
295: long expires = System.currentTimeMillis()
296: - (1000 * lifetimeSec);
297: Enumeration e = contents.elements();
298: while (e.hasMoreElements()) {
299: MessageImpl msg = (MessageImpl) e.nextElement();
300: if (msg == null)
301: continue;
302: if (msg.getWhen() < expires) {
303: // We found an old one! Remove it.
304: contents.removeElement(msg);
305: dirty = true;
306: }
307: }
308: }
309: /*
310: * If we just changed the contents of the list, then we
311: * need to get a new state identifier and wake up all the
312: * clients.
313: */
314: if (dirty) {
315: synchronized (contents) {
316: updateCurrentState();
317: }
318: }
319: // Go back to sleep...
320: }
321: }
322:
323: /**
324: * Call this function (only once) if you want to start the
325: * harverter thread. It runs in the background and periodically
326: * deletes messages that are too old. Most of the time the thread
327: * is sleeping. If this method is not called, then no age limit on
328: * messages will be enforced. <P>
329: *
330: * The longest a message can live is lifetimeSec + intervalSec seconds
331: * (in the extreme case). <P>
332: *
333: * The default value for the interval is a little shorter than the
334: * default value for the browser's timeout. This is so that if the
335: * browser is left idle (and messages start being deleted), the
336: * refresh cycle will sync up with this threads interval, and
337: * fewer updates will happen (delete, delete, delete... instead of
338: * timout, delete, timeout, delete, timeout, delete...). It's not
339: * a big deal, but hey...
340: *
341: * @param lifetimeSec
342: * How long should messages be kept for (seconds).
343: * @param
344: */
345: static public void startHarvester(int lifetimeSec, int intervalSec) {
346: // Be sure we don't start two threads.
347: if ((harvester == null) || !harvester.isAlive()) {
348: Discussion.lifetimeSec = lifetimeSec;
349: Discussion.intervalSec = intervalSec;
350: if (singleton == null)
351: singleton = new Discussion();
352: harvester = new Thread(singleton);
353: harvester.start();
354: }
355: }
356:
357: static public void stopHarvester() {
358: if ((harvester != null) || harvester.isAlive()) {
359: harvester.stop();
360: }
361:
362: }
363:
364: }
|