001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.server;
023:
024: import java.util.Iterator;
025:
026: import javax.jms.Destination;
027: import javax.jms.JMSException;
028: import javax.jms.Queue;
029: import javax.jms.TemporaryQueue;
030: import javax.jms.TemporaryTopic;
031: import javax.jms.Topic;
032:
033: import org.jboss.mq.AcknowledgementRequest;
034: import org.jboss.mq.ConnectionToken;
035: import org.jboss.mq.DurableSubscriptionID;
036: import org.jboss.mq.SpyDestination;
037: import org.jboss.mq.SpyMessage;
038: import org.jboss.mq.Subscription;
039: import org.jboss.mq.TransactionRequest;
040: import org.jboss.mq.il.jvm.JVMClientIL;
041:
042: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
043:
044: /**
045: * A pass through Interceptor, which keeps track of when a
046: * client was last active. If a client is inactive for too long,
047: * then it is disconnected from the server.<p>
048: *
049: * This is only necessary for stateless transports like HTTP
050: *
051: * @author <a href="mailto:hchirino@jboss.org">Hiram Chirino</a>
052: * @author adrian@jboss.org
053: */
054: public class ClientMonitorInterceptor extends
055: JMSServerInterceptorSupport {
056: //The list of Clients by ConnectionTokens
057: ConcurrentReaderHashMap clients = new ConcurrentReaderHashMap();
058:
059: private static class ClientStats {
060: private long lastUsed = System.currentTimeMillis();
061:
062: boolean disconnectIfInactive = true;
063: }
064:
065: public void disconnectInactiveClients(long disconnectTime) {
066: log.debug("Checking for timedout clients.");
067: Iterator i = clients.keySet().iterator();
068: while (i.hasNext()) {
069: ConnectionToken dc = (ConnectionToken) i.next();
070: ClientStats cs = (ClientStats) clients.get(dc);
071: if (cs.disconnectIfInactive && cs.lastUsed < disconnectTime) {
072: try {
073: log
074: .debug("Disconnecting client due to inactivity timeout: "
075: + dc);
076: connectionClosing(dc);
077: } catch (Throwable e) {
078: }
079: }
080: }
081: }
082:
083: /**
084: * Peek the stats. For testing.
085: *
086: * @param dc the connection token
087: * @return the stats
088: */
089: public ClientStats peekClientStats(ConnectionToken dc) {
090: return (ClientStats) clients.get(dc);
091: }
092:
093: public ClientStats getClientStats(ConnectionToken dc)
094: throws JMSException {
095: ClientStats cq = (ClientStats) clients.get(dc);
096: if (cq != null)
097: return cq;
098:
099: // Remove any previous token with a null client id and remove it
100: if (dc.getClientID() != null) {
101: ConnectionToken withoutID = new ConnectionToken(null,
102: dc.clientIL, dc.getSessionId());
103: clients.remove(withoutID);
104: }
105:
106: cq = new ClientStats();
107:
108: // The JVM clientil does not ping.
109: if (dc.clientIL instanceof JVMClientIL)
110: cq.disconnectIfInactive = false;
111:
112: clients.put(dc, cq);
113: return cq;
114: }
115:
116: public TemporaryTopic getTemporaryTopic(ConnectionToken dc)
117: throws JMSException {
118: getClientStats(dc).lastUsed = System.currentTimeMillis();
119: return getNext().getTemporaryTopic(dc);
120: }
121:
122: public TemporaryQueue getTemporaryQueue(ConnectionToken dc)
123: throws JMSException {
124: getClientStats(dc).lastUsed = System.currentTimeMillis();
125: return getNext().getTemporaryQueue(dc);
126: }
127:
128: public void connectionClosing(ConnectionToken dc)
129: throws JMSException {
130: clients.remove(dc);
131: getNext().connectionClosing(dc);
132: }
133:
134: public void addMessage(ConnectionToken dc, SpyMessage message)
135: throws JMSException {
136: getClientStats(dc).lastUsed = System.currentTimeMillis();
137: getNext().addMessage(dc, message);
138: }
139:
140: public Queue createQueue(ConnectionToken dc, String dest)
141: throws JMSException {
142: getClientStats(dc).lastUsed = System.currentTimeMillis();
143: return getNext().createQueue(dc, dest);
144:
145: }
146:
147: public Topic createTopic(ConnectionToken dc, String dest)
148: throws JMSException {
149: getClientStats(dc).lastUsed = System.currentTimeMillis();
150: return getNext().createTopic(dc, dest);
151: }
152:
153: public void deleteTemporaryDestination(ConnectionToken dc,
154: SpyDestination dest) throws JMSException {
155: getClientStats(dc).lastUsed = System.currentTimeMillis();
156: getNext().deleteTemporaryDestination(dc, dest);
157: }
158:
159: public void transact(ConnectionToken dc, TransactionRequest t)
160: throws JMSException {
161: getClientStats(dc).lastUsed = System.currentTimeMillis();
162: getNext().transact(dc, t);
163:
164: }
165:
166: public void acknowledge(ConnectionToken dc,
167: AcknowledgementRequest item) throws JMSException {
168: getClientStats(dc).lastUsed = System.currentTimeMillis();
169: getNext().acknowledge(dc, item);
170: }
171:
172: public SpyMessage[] browse(ConnectionToken dc, Destination dest,
173: String selector) throws JMSException {
174: getClientStats(dc).lastUsed = System.currentTimeMillis();
175: return getNext().browse(dc, dest, selector);
176: }
177:
178: public SpyMessage receive(ConnectionToken dc, int subscriberId,
179: long wait) throws JMSException {
180: getClientStats(dc).lastUsed = System.currentTimeMillis();
181: return getNext().receive(dc, subscriberId, wait);
182: }
183:
184: public void setEnabled(ConnectionToken dc, boolean enabled)
185: throws JMSException {
186: getClientStats(dc).lastUsed = System.currentTimeMillis();
187: getNext().setEnabled(dc, enabled);
188: }
189:
190: public void unsubscribe(ConnectionToken dc, int subscriptionId)
191: throws JMSException {
192: getClientStats(dc).lastUsed = System.currentTimeMillis();
193: getNext().unsubscribe(dc, subscriptionId);
194: }
195:
196: public void destroySubscription(ConnectionToken dc,
197: DurableSubscriptionID id) throws JMSException {
198: getClientStats(dc).lastUsed = System.currentTimeMillis();
199: getNext().destroySubscription(dc, id);
200: }
201:
202: public void subscribe(org.jboss.mq.ConnectionToken dc,
203: org.jboss.mq.Subscription s) throws JMSException {
204:
205: getClientStats(dc).lastUsed = System.currentTimeMillis();
206: getNext().subscribe(dc, s);
207:
208: }
209:
210: public void ping(ConnectionToken dc, long clientTime)
211: throws JMSException {
212: getClientStats(dc).lastUsed = System.currentTimeMillis();
213: getNext().ping(dc, clientTime);
214: }
215:
216: public Subscription getSubscription(ConnectionToken dc,
217: int subscriberId) throws JMSException {
218: getClientStats(dc).lastUsed = System.currentTimeMillis();
219: return getNext().getSubscription(dc, subscriberId);
220:
221: }
222: }
|