001: /*
002: * Copyright (C) The MX4J Contributors.
003: * All rights reserved.
004: *
005: * This software is distributed under the terms of the MX4J License version 1.0.
006: * See the terms of the MX4J License in the documentation provided with this software.
007: */
008:
009: package mx4j.remote;
010:
011: import java.io.IOException;
012: import java.util.Arrays;
013: import java.util.HashMap;
014: import java.util.LinkedList;
015: import java.util.List;
016: import java.util.Map;
017:
018: import javax.management.Notification;
019: import javax.management.NotificationFilter;
020: import javax.management.NotificationListener;
021: import javax.management.ObjectName;
022: import javax.management.remote.NotificationResult;
023: import javax.management.remote.TargetedNotification;
024:
025: import mx4j.log.Log;
026: import mx4j.log.Logger;
027:
028: /**
029: * Base implementation of the RemoteNotificationServerHandler interface.
030: *
031: * @version $Revision: 1.12 $
032: */
033: public class DefaultRemoteNotificationServerHandler implements
034: RemoteNotificationServerHandler {
035: private static int listenerID;
036:
037: private final NotificationListener listener;
038: private final Map tuples = new HashMap();
039: private final NotificationBuffer buffer;
040: private volatile boolean closed;
041:
042: /**
043: * Creates a new remote notification server handler.
044: *
045: * @param environment Contains environment variables used to configure this handler
046: * @see MX4JRemoteConstants#NOTIFICATION_BUFFER_CAPACITY
047: * @see MX4JRemoteConstants#NOTIFICATION_PURGE_DISTANCE
048: */
049: public DefaultRemoteNotificationServerHandler(Map environment) {
050: listener = new ServerListener();
051: buffer = new NotificationBuffer(environment);
052: }
053:
054: public Integer generateListenerID(ObjectName name,
055: NotificationFilter filter) {
056: synchronized (DefaultRemoteNotificationServerHandler.class) {
057: return new Integer(++listenerID);
058: }
059: }
060:
061: public NotificationListener getServerNotificationListener() {
062: return listener;
063: }
064:
065: public void addNotificationListener(Integer id,
066: NotificationTuple tuple) {
067: if (closed)
068: return;
069: synchronized (tuples) {
070: tuples.put(id, tuple);
071: }
072: }
073:
074: public NotificationTuple removeNotificationListener(Integer id) {
075: if (closed)
076: return null;
077: synchronized (tuples) {
078: return (NotificationTuple) tuples.remove(id);
079: }
080: }
081:
082: public NotificationResult fetchNotifications(long sequenceNumber,
083: int maxNotifications, long timeout) throws IOException {
084: if (closed)
085: throw new IOException(
086: "RemoteNotificationServerHandler is closed");
087: return buffer.getNotifications(sequenceNumber,
088: maxNotifications, timeout);
089: }
090:
091: public NotificationTuple[] close() {
092: Logger logger = getLogger();
093: closed = true;
094: stopWaitingForNotifications(buffer);
095: synchronized (tuples) {
096: NotificationTuple[] result = (NotificationTuple[]) tuples
097: .values().toArray(
098: new NotificationTuple[tuples.size()]);
099: tuples.clear();
100: if (logger.isEnabledFor(Logger.DEBUG))
101: logger
102: .debug("RemoteNotificationServerHandler closed, returning: "
103: + Arrays.asList(result));
104: return result;
105: }
106: }
107:
108: /**
109: * When a connection is closed, it may be possible that a client RMI call is waiting in
110: * {@link #waitForNotifications}, so here we wake it up, letting the thread return to the
111: * client and free resources on client's side.
112: *
113: * @param lock The object on which {@link #notifyAll} should be called
114: */
115: private void stopWaitingForNotifications(Object lock) {
116: synchronized (lock) {
117: lock.notifyAll();
118: }
119: }
120:
121: /**
122: * Called when there are no notifications to send to the client.
123: * It is guaranteed that no notification can be added before this method waits on the given lock.
124: * It should wait on the given lock for the specified timeout, and return true
125: * to send notifications (if no notifications arrived, an empty notification array
126: * will be returned to the client), or false if no notifications should be sent to
127: * the client.
128: *
129: * @param lock The object on which {@link #wait} should be called
130: * @param timeout The amount of time to wait (guaranteed to be strictly greater than 0)
131: */
132: protected boolean waitForNotifications(Object lock, long timeout) {
133: Logger logger = getLogger();
134: long start = 0;
135: if (logger.isEnabledFor(Logger.DEBUG)) {
136: logger
137: .debug("Waiting for notifications " + timeout
138: + " ms");
139: start = System.currentTimeMillis();
140: }
141:
142: synchronized (lock) {
143: try {
144: lock.wait(timeout);
145: } catch (InterruptedException x) {
146: Thread.currentThread().interrupt();
147: }
148: }
149:
150: if (logger.isEnabledFor(Logger.DEBUG)) {
151: long elapsed = System.currentTimeMillis() - start;
152: logger.debug("Waited for notifications " + elapsed + " ms");
153: }
154:
155: return true;
156: }
157:
158: /**
159: * This method filters the given notification array and returns a possibly smaller array containing
160: * only notifications that passed successfully the filtering.
161: * Default behavior is no filtering, but subclasses may choose to change this bahavior.
162: * For example, for RMI, one can assure that all notifications are truly serializable, and log those
163: * that are not.
164: */
165: protected TargetedNotification[] filterNotifications(
166: TargetedNotification[] notifications) {
167: return notifications;
168: }
169:
170: private void addNotification(Integer id, Notification notification) {
171: buffer.add(new TargetedNotification(notification, id));
172: }
173:
174: protected Logger getLogger() {
175: return Log.getLogger(getClass().getName());
176: }
177:
178: private class ServerListener implements NotificationListener {
179: public void handleNotification(Notification notification,
180: Object handback) {
181: Integer id = (Integer) handback;
182: addNotification(id, notification);
183: }
184: }
185:
186: private class NotificationBuffer {
187: private final List notifications = new LinkedList();
188: private int maxCapacity;
189: private int purgeDistance;
190: private long firstSequence;
191: private long lastSequence;
192: private long lowestExpectedSequence = -1;
193:
194: private NotificationBuffer(Map environment) {
195: if (environment != null) {
196: try {
197: Integer maxCapacityInteger = (Integer) environment
198: .get(MX4JRemoteConstants.NOTIFICATION_BUFFER_CAPACITY);
199: if (maxCapacityInteger != null)
200: maxCapacity = maxCapacityInteger.intValue();
201: } catch (Exception ignored) {
202: }
203:
204: try {
205: Integer purgeDistanceInteger = (Integer) environment
206: .get(MX4JRemoteConstants.NOTIFICATION_PURGE_DISTANCE);
207: if (purgeDistanceInteger != null)
208: purgeDistance = purgeDistanceInteger.intValue();
209: } catch (Exception ignored) {
210: }
211: }
212: if (maxCapacity <= 0)
213: maxCapacity = 1024;
214: if (purgeDistance <= 0)
215: purgeDistance = 128;
216: }
217:
218: private int getSize() {
219: synchronized (this ) {
220: return notifications.size();
221: }
222: }
223:
224: private void add(TargetedNotification notification) {
225: Logger logger = getLogger();
226: synchronized (this ) {
227: if (notifications.size() == maxCapacity) {
228: if (logger.isEnabledFor(Logger.DEBUG))
229: logger.debug("Notification buffer full: "
230: + this );
231: removeRange(0, 1);
232: }
233: notifications.add(notification);
234: ++lastSequence;
235: if (logger.isEnabledFor(Logger.DEBUG))
236: logger.debug("Notification added to buffer: "
237: + this );
238: notifyAll();
239: }
240: }
241:
242: private void removeRange(int start, int end) {
243: synchronized (this ) {
244: notifications.subList(start, end).clear();
245: firstSequence += end - start;
246: }
247: }
248:
249: private long getFirstSequenceNumber() {
250: synchronized (this ) {
251: return firstSequence;
252: }
253: }
254:
255: private long getLastSequenceNumber() {
256: synchronized (this ) {
257: return lastSequence;
258: }
259: }
260:
261: private NotificationResult getNotifications(
262: long sequenceNumber, int maxNotifications, long timeout) {
263: Logger logger = getLogger();
264: synchronized (this ) {
265: NotificationResult result = null;
266: int size = 0;
267: if (sequenceNumber < 0) {
268: // We loose the notifications between addNotificationListener() and fetchNotifications(), but c'est la vie.
269: long sequence = getLastSequenceNumber();
270: size = new Long(sequence + 1).intValue();
271: result = new NotificationResult(
272: getFirstSequenceNumber(), sequence,
273: new TargetedNotification[0]);
274: if (lowestExpectedSequence < 0)
275: lowestExpectedSequence = sequence;
276: if (logger.isEnabledFor(Logger.DEBUG))
277: logger.debug("First fetchNotification call: "
278: + this + ", returning " + result);
279: } else {
280: long firstSequence = getFirstSequenceNumber();
281:
282: int losts = 0;
283: int start = new Long(sequenceNumber - firstSequence)
284: .intValue();
285: // In the time between 2 fetches the buffer may have overflew, so that start < 0.
286: // It simply mean that we send the first notification we have (start = 0),
287: // and the client will emit a notification lost event.
288: if (start < 0) {
289: losts = -start;
290: start = 0;
291: }
292:
293: List sublist = null;
294: boolean send = false;
295: while (size == 0) {
296: int end = notifications.size();
297: if (end - start > maxNotifications)
298: end = start + maxNotifications;
299:
300: sublist = notifications.subList(start, end);
301: size = sublist.size();
302:
303: if (closed || send)
304: break;
305:
306: if (size == 0) {
307: if (timeout <= 0)
308: break;
309: if (logger.isEnabledFor(Logger.DEBUG))
310: logger
311: .debug("No notifications to send, waiting "
312: + timeout + " ms");
313:
314: // We wait for notifications to arrive. Since we release the lock on the buffer
315: // other threads can modify it. To avoid ConcurrentModificationException we compute
316: // again the sublist by coming up back to the while statement
317: send = waitForNotifications(this , timeout);
318: }
319: }
320:
321: TargetedNotification[] notifications = (TargetedNotification[]) sublist
322: .toArray(new TargetedNotification[size]);
323: notifications = filterNotifications(notifications);
324: result = new NotificationResult(firstSequence,
325: sequenceNumber + losts + size,
326: notifications);
327: if (logger.isEnabledFor(Logger.DEBUG))
328: logger
329: .debug("Non-first fetchNotification call: "
330: + this
331: + ", returning "
332: + result);
333:
334: int purged = purgeNotifications(sequenceNumber,
335: size);
336: if (logger.isEnabledFor(Logger.DEBUG))
337: logger.debug("Purged " + purged
338: + " notifications: " + this );
339: }
340: return result;
341: }
342: }
343:
344: private int purgeNotifications(long sequenceNumber, int size) {
345: // Record the lowest expected sequence number sent by the client.
346: // New clients will always have an initial big sequence number
347: // (they're initialized with getLastSequenceNumber()), while old
348: // clients can have lesser sequence numbers.
349: // Here we record the lesser of these sequence numbers, that is the
350: // sequence number of the oldest notification any client may ever ask.
351: // This way we can purge old notifications that have already been
352: // delivered to clients.
353:
354: // The worst case is when a client has a long interval between fetchNotifications()
355: // calls, and another client has a short interval. The lowestExpectedSequence will
356: // grow with the second client, until a purge happens, so the first client can
357: // loose notifications. By tuning appropriately the purgeDistance and the interval
358: // between fetchNotifications() calls, it should never happen.
359:
360: int result = 0;
361: synchronized (this ) {
362: if (sequenceNumber <= lowestExpectedSequence) {
363: long lowest = Math.min(lowestExpectedSequence,
364: sequenceNumber);
365:
366: long firstSequence = getFirstSequenceNumber();
367: if (lowest - firstSequence > purgeDistance) {
368: // Purge only half of the old notifications, for safety
369: int purgeSize = purgeDistance >> 1;
370: removeRange(0, purgeSize);
371: result = purgeSize;
372: }
373:
374: long expected = Math.max(sequenceNumber + size,
375: firstSequence);
376: lowestExpectedSequence = expected;
377: }
378: }
379: return result;
380: }
381:
382: public String toString() {
383: StringBuffer buffer = new StringBuffer(
384: "NotificationBuffer@");
385: buffer.append(Integer.toHexString(hashCode())).append("[");
386: buffer.append("first=").append(getFirstSequenceNumber())
387: .append(", ");
388: buffer.append("last=").append(getLastSequenceNumber())
389: .append(", ");
390: buffer.append("size=").append(getSize()).append(", ");
391: buffer.append("lowestExpected=").append(
392: lowestExpectedSequence).append(", ");
393: buffer.append("maxCapacity=").append(maxCapacity).append(
394: ", ");
395: buffer.append("purgeDistance=").append(purgeDistance)
396: .append("]");
397: return buffer.toString();
398: }
399: }
400: }
|