001: /*
002: * %W% %E%
003: *
004: * Copyright 1990-2006 Sun Microsystems, Inc. All Rights Reserved.
005: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER
006: *
007: * This program is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU General Public License version
009: * 2 only, as published by the Free Software Foundation.
010: *
011: * This program is distributed in the hope that it will be useful, but
012: * WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * General Public License version 2 for more details (a copy is
015: * included at /legal/license.txt).
016: *
017: * You should have received a copy of the GNU General Public License
018: * version 2 along with this work; if not, write to the Free Software
019: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA
021: *
022: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa
023: * Clara, CA 95054 or visit www.sun.com if you need additional
024: * information or have any questions.
025: */
026:
027: package com.sun.jumpimpl.process;
028:
029: import com.sun.jump.message.JUMPMessage;
030: import com.sun.jump.message.JUMPMessageHandler;
031: import com.sun.jump.message.JUMPMessageDispatcher;
032: import com.sun.jump.message.JUMPMessageDispatcherTypeException;
033: import com.sun.jump.message.JUMPTimedOutException;
034: import com.sun.jump.message.JUMPUnblockedException;
035:
036: import com.sun.jump.os.JUMPOSInterface;
037: import com.sun.jumpimpl.os.JUMPMessageQueueInterfaceImpl;
038:
039: import java.io.IOException;
040: import java.util.List;
041: import java.util.ArrayList;
042: import java.util.Map;
043: import java.util.HashMap;
044:
045: /**
046: * A generic JUMPMessageDispatcher implementation.
047: */
048: public class JUMPMessageDispatcherImpl implements JUMPMessageDispatcher {
049: // A JUMPMessageDispatcherImpl has one Listener for each
050: // messageType with a registered handler.
051: // JUMPMessageDispatcherImpl and Listener are by necessity
052: // somewhat intertwined, as explained here.
053: // JUMPMessageDispatcherImpl.register() creates Listeners on
054: // demand and adds them to listeners. Listeners are not removed
055: // by cancelRegistration(); instead, Listeners remove themselves
056: // and exit some time after all their handlers have been canceled
057: // and no other handlers have been registered. This ensures that
058: // at most one thread is ever listening for any messageType, and
059: // that no message that has a registered handler will be dropped
060: // since there will always be a Listener running for that
061: // messageType. Both JUMPMessageDispatcherImpl and Listener
062: // synchronize on lock while accessing listeners. Additionally,
063: // Listener synchronizes on lock when accessing Listener.handlers.
064: // It could synchronize on itself, but in most cases we already
065: // need to synchronize on lock, so using lock for everything is
066: // simpler. We never block while holding lock and there shouldn't
067: // be much if any contention for it.
068:
069: // A JUMPMessageDispatcherImpl has one DirectRegistration for each
070: // messageType with at least one outstanding registration. We
071: // must be careful to unreserve a messageType only when it is no
072: // longer in use and can no longer be used, otherwise the
073: // low-level code could use memory that it has freed, which would
074: // be very un-Java. With the messageType -> DirectRegistration
075: // required by the API, we need to keep a use count and do other
076: // gymnastics to ensure this.
077:
078: private static final JUMPMessageQueueInterfaceImpl jumpMessageQueueInterfaceImpl = (JUMPMessageQueueInterfaceImpl) JUMPOSInterface
079: .getInstance().getQueueInterface();
080:
081: private static JUMPMessageDispatcherImpl INSTANCE = null;
082:
083: // directRegistrations maps String messageType to DirectRegistration.
084: // Guarded by lock.
085: // Invariant: If there is a mapping from messageType to a
086: // DirectRegistration, then at least one registration is still
087: // outstanding for the messageType, otherwise no registrations
088: // are outstanding.
089:
090: private final Map directRegistrations = new HashMap();
091:
092: // listeners maps String messageType to Listener.
093: // Guarded by lock.
094: // Invariant: If there is a mapping from messageType to a Listener,
095: // then the Listener is active, otherwise there is no Listener.
096:
097: private final Map listeners = new HashMap();
098:
099: // lock guards both directRegistrations and listeners. We need
100: // one lock so we can tell whether a messageType is registered one
101: // way or the other without races.
102:
103: private final Object lock = new Object();
104:
105: public static synchronized JUMPMessageDispatcherImpl getInstance() {
106: if (INSTANCE == null) {
107: INSTANCE = new JUMPMessageDispatcherImpl();
108: }
109: return INSTANCE;
110: }
111:
112: /**
113: * Construction allowed only by getInstance().
114: */
115: private JUMPMessageDispatcherImpl() {
116: }
117:
118: public Object registerDirect(String messageType)
119: throws JUMPMessageDispatcherTypeException, IOException {
120: DirectRegistration directRegistration;
121: synchronized (lock) {
122: if (listeners.containsKey(messageType)) {
123: throw new JUMPMessageDispatcherTypeException("Type "
124: + messageType
125: + " already registered with handlers");
126: }
127:
128: directRegistration = getDirectRegistration(messageType);
129: directRegistration.incrementUseCount();
130: }
131:
132: return new DirectRegistrationToken(directRegistration);
133: }
134:
135: // Externally synchronized on lock.
136: private DirectRegistration getDirectRegistration(String messageType)
137: throws IOException {
138: DirectRegistration directRegistration = (DirectRegistration) directRegistrations
139: .get(messageType);
140:
141: if (directRegistration == null) {
142: directRegistration = new DirectRegistration(messageType);
143:
144: // Be careful to maintain our invariant (and free
145: // resources) even on OutOfMemoryError, etc.
146:
147: boolean success = false;
148: try {
149: directRegistrations
150: .put(messageType, directRegistration);
151: success = true;
152: } finally {
153: if (!success) {
154: // Free OS resources.
155: directRegistration.close();
156: }
157: }
158: }
159:
160: return directRegistration;
161: }
162:
163: public JUMPMessage waitForMessage(String messageType, long timeout)
164: throws JUMPMessageDispatcherTypeException,
165: JUMPTimedOutException, IOException {
166: DirectRegistration directRegistration;
167: synchronized (lock) {
168: directRegistration = (DirectRegistration) directRegistrations
169: .get(messageType);
170: if (directRegistration == null) {
171: throw new JUMPMessageDispatcherTypeException("Type "
172: + messageType
173: + " not registered for direct listening");
174: }
175: directRegistration.incrementUseCount();
176: }
177:
178: try {
179: return doWaitForMessage(messageType, timeout);
180: } finally {
181: directRegistration.decrementUseCountMaybeClose();
182: }
183: }
184:
185: /**
186: * @throws JUMPTimedOutException
187: * @throws JUMPUnblockedException
188: * @throws IOException
189: */
190: private JUMPMessage doWaitForMessage(String messageType,
191: long timeout) throws JUMPTimedOutException, IOException {
192: byte[] raw = jumpMessageQueueInterfaceImpl.receiveMessage(
193: messageType, timeout);
194: return new MessageImpl.Message(raw);
195: }
196:
197: /**
198: * NOTE: the handler will be called in an arbitrary thread. Use
199: * appropriate synchronization. Handlers may be called in an
200: * arbitrary order. If a handler is registered multiple times, it
201: * will be called a corresponding number of times for each
202: * message, and must be canceled a corresponding number of times.
203: */
204: public Object registerHandler(String messageType,
205: JUMPMessageHandler handler)
206: throws JUMPMessageDispatcherTypeException, IOException {
207: if (messageType == null) {
208: throw new NullPointerException("messageType can't be null");
209: }
210: if (handler == null) {
211: throw new NullPointerException("handler can't be null");
212: }
213:
214: Listener listener;
215: synchronized (lock) {
216: if (directRegistrations.containsKey(messageType)) {
217: throw new JUMPMessageDispatcherTypeException("Type "
218: + messageType
219: + " already registered for direct listening");
220: }
221:
222: listener = getListener(messageType);
223:
224: // Add the handler while synchronized on lock so that a
225: // new Listener won't exit before the handler is added.
226: // If this fails its ok, the Listener will exit soon if no
227: // other handlers are registered for it.
228:
229: listener.addHandler(handler);
230: }
231:
232: return new HandlerRegistrationToken(listener, handler);
233: }
234:
235: // Externally synchronized on lock.
236: private Listener getListener(String messageType) throws IOException {
237: Listener listener = (Listener) listeners.get(messageType);
238: if (listener == null) {
239: listener = new Listener(messageType);
240:
241: // Be careful to maintain our invariant (and free
242: // resources) even on OutOfMemoryError, etc.
243:
244: boolean success = false;
245: try {
246: listeners.put(messageType, listener);
247: listener.start();
248: success = true;
249: } finally {
250: if (!success) {
251: // Free OS resources.
252: listener.close();
253: // Remove listener from the Map. This is ok even
254: // if it was never added.
255: listeners.remove(messageType);
256: }
257: }
258: }
259: return listener;
260: }
261:
262: public void cancelRegistration(Object registrationToken)
263: throws IOException {
264: ((RegistrationToken) registrationToken).cancelRegistration();
265: }
266:
267: private interface RegistrationToken {
268: void cancelRegistration() throws IOException;
269: }
270:
271: private static class HandlerRegistrationToken implements
272: RegistrationToken {
273: private final Listener listener;
274: private final JUMPMessageHandler handler;
275:
276: // Don't allow cancelRegistration() to be called twice.
277: private boolean canceled = false;
278:
279: public HandlerRegistrationToken(Listener listener,
280: JUMPMessageHandler handler) {
281: this .listener = listener;
282: this .handler = handler;
283: }
284:
285: public void cancelRegistration() throws IOException {
286: synchronized (this ) {
287: if (canceled) {
288: throw new IllegalStateException(
289: "Registration has already been canceled.");
290: }
291: canceled = true;
292: }
293:
294: listener.removeHandler(handler);
295: }
296: }
297:
298: private static class DirectRegistrationToken implements
299: RegistrationToken {
300: private final DirectRegistration directRegistration;
301:
302: // Don't allow cancelRegistration() to be called twice.
303: private boolean canceled = false;
304:
305: public DirectRegistrationToken(
306: DirectRegistration directRegistration) {
307: this .directRegistration = directRegistration;
308: }
309:
310: public void cancelRegistration() {
311: synchronized (this ) {
312: if (canceled) {
313: throw new IllegalStateException(
314: "Registration has already been canceled.");
315: }
316: canceled = true;
317: }
318: directRegistration.decrementUseCountMaybeClose();
319: }
320: }
321:
322: private class DirectRegistration {
323: private final String messageType;
324:
325: // useCount is incremented for every direct registration of
326: // messageType and when a message receive begins, and
327: // decremented when the registration is canceled or a message
328: // read is finished. When the count falls to zero, the
329: // low-level resources are freed, and the directRegistrations
330: // mapping is removed, therefore this DirectRegistration can
331: // never be used again to access the (freed) low-level
332: // resources.
333:
334: private int useCount = 0;
335:
336: public DirectRegistration(String messageType)
337: throws IOException {
338: this .messageType = messageType;
339: // Make sure we've got a receive queue for the messageType.
340: jumpMessageQueueInterfaceImpl.reserve(messageType);
341: }
342:
343: // Externally synchronized on lock.
344: public void incrementUseCount() {
345: useCount++;
346: }
347:
348: public void decrementUseCountMaybeClose() {
349: synchronized (lock) {
350: useCount--;
351: if (useCount == 0) {
352: close();
353: directRegistrations.remove(messageType);
354: }
355: }
356: }
357:
358: public void close() {
359: // Tell the low-level code we're done with the message queue.
360: jumpMessageQueueInterfaceImpl.unreserve(messageType);
361: }
362: }
363:
364: /*
365: * Frequently asked questions:
366: * 1. Why doesn't Listener extend Thread? Extending Thread would
367: * put lots of unnecessary and inappropriate methods into its
368: * API. It should keep control over those things to itself.
369: * 2. How about implementing Runnable then? The fact that Listener
370: * uses a Thread and/or Runnable is an implementation detail
371: * and shouldn't be exposed in its API. The inner class
372: * implementing Runnable keeps the implementation private.
373: * Only those methods intended to be called from outside the
374: * class itself are public.
375: * 3. How can we get the thread to exit when it's blocking in
376: * JUMPMessageReceiveQueue.receiveMessage()?
377: * There are three choices:
378: * 1. Make JUMPMessageReceiveQueue.receiveMessage() interruptible,
379: * and interrupt the thread. We probably don't want to go there.
380: * 2. Send a message that the thread will see and exit on.
381: * This isn't as easy as it sounds since sending messages
382: * may fail, e.g., if the Listener is processing messages
383: * slowly and its queue has filled up. But in that case
384: * it should exit after reading one of the "real" messages
385: * whether our sentinel is sent/received or not.
386: * 3. Periodically time out and check for exit. We do this,
387: * it's simple and effective and doesn't need any extra low-level
388: * support such as interrupt handling, although it doesn't stop
389: * the thread immediately, and requires the thread to wake up
390: * periodically.
391: */
392: private class Listener {
393: // Guarded by this.
394: private final List handlers = new ArrayList();
395:
396: private final String messageType;
397:
398: public Listener(String messageType) throws IOException {
399: this .messageType = messageType;
400: // Make sure we've got a receive queue for the messageType.
401: jumpMessageQueueInterfaceImpl.reserve(messageType);
402: }
403:
404: // Externally synchronized on lock.
405: public void addHandler(JUMPMessageHandler handler) {
406: handlers.add(handler);
407: }
408:
409: public void removeHandler(JUMPMessageHandler handler)
410: throws IOException {
411: synchronized (lock) {
412: handlers.remove(handler);
413: if (handlers.isEmpty()) {
414: // Wake up the listening thread so it can exit if
415: // it finds handlers is still empty.
416: jumpMessageQueueInterfaceImpl.unblock(messageType);
417: }
418: }
419: }
420:
421: public void start() {
422: Thread thread = new Thread(new Runnable() {
423: public void run() {
424: try {
425: listen();
426: } finally {
427: close();
428: }
429: }
430: });
431: thread.setName(this .getClass().getName() + ": "
432: + messageType);
433: thread.setDaemon(true);
434: thread.start();
435: }
436:
437: public void close() {
438: // Tell the low-level code we're done with the message queue.
439: jumpMessageQueueInterfaceImpl.unreserve(messageType);
440: }
441:
442: private void listen() {
443: // FIXME We should either log Errors and RuntimeExceptions
444: // and continue, or cleanup and make sure they're thrown.
445: while (true) {
446: try {
447: JUMPMessage msg = doWaitForMessage(messageType, 0L);
448: dispatchMessage(msg);
449: } catch (JUMPUnblockedException e) {
450: // This is normal. It's time to check for exit.
451: } catch (JUMPTimedOutException e) {
452: // This shouldn't happen. Handle like IOException.
453: } catch (IOException e) {
454: // Unexpected exception.
455: e.printStackTrace();
456: }
457: synchronized (lock) {
458: if (handlers.isEmpty()) {
459: // Remove ourselves from the map and exit.
460: listeners.remove(messageType);
461: break;
462: }
463: }
464: }
465: }
466:
467: // NOTE: Handlers should not be called while holding our
468: // monitor since it can lead to inadvertent deadlocks.
469: // However, not synchronizing on "this" here can result in
470: // handlers being called even after they've been removed.
471: // This is a generally accepted hazard of patterns like this.
472:
473: private void dispatchMessage(JUMPMessage msg) {
474: JUMPMessageHandler[] handlersSnapshot;
475:
476: // Get a snapsot with the lock held.
477:
478: synchronized (lock) {
479: handlersSnapshot = (JUMPMessageHandler[]) handlers
480: .toArray(new JUMPMessageHandler[handlers.size()]);
481: }
482:
483: // Call handlers with the lock released.
484:
485: for (int i = 0; i < handlersSnapshot.length; i++) {
486: JUMPMessageHandler handler = handlersSnapshot[i];
487: try {
488: handler.handleMessage(msg);
489: } catch (RuntimeException e) {
490: e.printStackTrace();
491: }
492: }
493: }
494: }
495: }
|