001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.oil;
023:
024: import java.io.BufferedInputStream;
025: import java.io.BufferedOutputStream;
026:
027: import java.io.ObjectInputStream;
028: import java.io.ObjectOutputStream;
029: import java.net.InetAddress;
030:
031: import java.net.Socket;
032: import javax.jms.Destination;
033:
034: import javax.jms.JMSException;
035: import javax.jms.Queue;
036: import javax.jms.TemporaryQueue;
037: import javax.jms.TemporaryTopic;
038: import javax.jms.Topic;
039: import javax.net.SocketFactory;
040:
041: import org.jboss.logging.Logger;
042: import org.jboss.mq.AcknowledgementRequest;
043: import org.jboss.mq.ConnectionToken;
044: import org.jboss.mq.DurableSubscriptionID;
045: import org.jboss.mq.SpyDestination;
046: import org.jboss.mq.SpyMessage;
047: import org.jboss.mq.TransactionRequest;
048: import org.jboss.mq.il.ServerIL;
049:
050: /**
051: * The JVM implementation of the ServerIL object
052: *
053: * @author Hiram Chirino (Cojonudo14@hotmail.com)
054: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
055: * @version $Revision: 57198 $
056: * @created August 16, 2001
057: */
058: public final class OILServerIL implements java.io.Serializable,
059: java.lang.Cloneable, org.jboss.mq.il.ServerIL {
060: static final long serialVersionUID = 5576846920031604128L;
061: private static Logger log = Logger.getLogger(OILServerIL.class);
062:
063: /** The org.jboss.mq.il.oil2.localAddr system property allows a client to
064: *define the local interface to which its sockets should be bound
065: */
066: private final static String LOCAL_ADDR = "org.jboss.mq.il.oil.localAddr";
067: /** The org.jboss.mq.il.oil2.localPort system property allows a client to
068: *define the local port to which its sockets should be bound
069: */
070: private final static String LOCAL_PORT = "org.jboss.mq.il.oil.localPort";
071:
072: /** The server host name/IP to connect to
073: */
074: private InetAddress addr;
075: /** The server port to connect to.
076: */
077: private int port;
078: /** The name of the class implementing the javax.net.SocketFactory to
079: * use for creating the client socket.
080: */
081: private String socketFactoryName;
082:
083: /**
084: * If the TcpNoDelay option should be used on the socket.
085: */
086: private boolean enableTcpNoDelay = false;
087: /** The local interface name/IP to use for the client
088: */
089: private transient InetAddress localAddr;
090: /** The local port to use for the client
091: */
092: private transient int localPort;
093:
094: /**
095: * Description of the Field
096: */
097: private transient ObjectInputStream in;
098:
099: /**
100: * Description of the Field
101: */
102: private transient ObjectOutputStream out;
103:
104: /**
105: * Description of the Field
106: */
107: private transient Socket socket;
108:
109: /**
110: * Constructor for the OILServerIL object
111: *
112: * @param a Description of Parameter
113: * @param port Description of Parameter
114: */
115: public OILServerIL(InetAddress addr, int port,
116: String socketFactoryName, boolean enableTcpNoDelay) {
117: this .addr = addr;
118: this .port = port;
119: this .socketFactoryName = socketFactoryName;
120: this .enableTcpNoDelay = enableTcpNoDelay;
121: }
122:
123: /**
124: * Sets the ConnectionToken attribute of the OILServerIL object
125: *
126: * @param dest The new ConnectionToken value
127: * @exception Exception Description of Exception
128: */
129: public synchronized void setConnectionToken(ConnectionToken dest)
130: throws Exception {
131: checkConnection();
132: out.writeByte(OILConstants.SET_SPY_DISTRIBUTED_CONNECTION);
133: out.writeObject(dest);
134: waitAnswer();
135: }
136:
137: /**
138: * Sets the Enabled attribute of the OILServerIL object
139: *
140: * @param dc The new Enabled value
141: * @param enabled The new Enabled value
142: * @exception JMSException Description of Exception
143: * @exception Exception Description of Exception
144: */
145: public synchronized void setEnabled(ConnectionToken dc,
146: boolean enabled) throws JMSException, Exception {
147: checkConnection();
148: out.writeByte(OILConstants.SET_ENABLED);
149: out.writeBoolean(enabled);
150: waitAnswer();
151: }
152:
153: /**
154: * Gets the ID attribute of the OILServerIL object
155: *
156: * @return The ID value
157: * @exception Exception Description of Exception
158: */
159: public synchronized String getID() throws Exception {
160: checkConnection();
161: out.writeByte(OILConstants.GET_ID);
162: return (String) waitAnswer();
163: }
164:
165: /**
166: * Gets the TemporaryQueue attribute of the OILServerIL object
167: *
168: * @param dc Description of Parameter
169: * @return The TemporaryQueue value
170: * @exception JMSException Description of Exception
171: * @exception Exception Description of Exception
172: */
173: public synchronized TemporaryQueue getTemporaryQueue(
174: ConnectionToken dc) throws JMSException, Exception {
175: checkConnection();
176: out.writeByte(OILConstants.GET_TEMPORARY_QUEUE);
177: return (TemporaryQueue) waitAnswer();
178: }
179:
180: /**
181: * Gets the TemporaryTopic attribute of the OILServerIL object
182: *
183: * @param dc Description of Parameter
184: * @return The TemporaryTopic value
185: * @exception JMSException Description of Exception
186: * @exception Exception Description of Exception
187: */
188: public synchronized TemporaryTopic getTemporaryTopic(
189: ConnectionToken dc) throws JMSException, Exception {
190: checkConnection();
191: out.writeByte(OILConstants.GET_TEMPORARY_TOPIC);
192: return (TemporaryTopic) waitAnswer();
193: }
194:
195: /**
196: * #Description of the Method
197: *
198: * @param dc Description of Parameter
199: * @param item Description of Parameter
200: * @exception JMSException Description of Exception
201: * @exception Exception Description of Exception
202: */
203: public synchronized void acknowledge(ConnectionToken dc,
204: AcknowledgementRequest item) throws JMSException, Exception {
205: checkConnection();
206: out.writeByte(OILConstants.ACKNOWLEDGE);
207: item.writeExternal(out);
208: waitAnswer();
209: }
210:
211: /**
212: * Adds a feature to the Message attribute of the OILServerIL object
213: *
214: * @param dc The feature to be added to the Message attribute
215: * @param val The feature to be added to the Message attribute
216: * @exception Exception Description of Exception
217: */
218: public synchronized void addMessage(ConnectionToken dc,
219: SpyMessage val) throws Exception {
220: checkConnection();
221: out.writeByte(OILConstants.ADD_MESSAGE);
222: SpyMessage.writeMessage(val, out);
223: waitAnswer();
224: }
225:
226: /**
227: * #Description of the Method
228: *
229: * @param dc Description of Parameter
230: * @param dest Description of Parameter
231: * @param selector Description of Parameter
232: * @return Description of the Returned Value
233: * @exception JMSException Description of Exception
234: * @exception Exception Description of Exception
235: */
236: public synchronized SpyMessage[] browse(ConnectionToken dc,
237: Destination dest, String selector) throws JMSException,
238: Exception {
239: checkConnection();
240: out.writeByte(OILConstants.BROWSE);
241: out.writeObject(dest);
242: out.writeObject(selector);
243: return (SpyMessage[]) waitAnswer();
244: }
245:
246: /**
247: * #Description of the Method
248: *
249: * @param ID Description of Parameter
250: * @exception JMSException Description of Exception
251: * @exception Exception Description of Exception
252: */
253: public synchronized void checkID(String ID) throws JMSException,
254: Exception {
255: checkConnection();
256: out.writeByte(OILConstants.CHECK_ID);
257: out.writeObject(ID);
258: waitAnswer();
259: }
260:
261: /**
262: * #Description of the Method
263: *
264: * @param userName Description of Parameter
265: * @param password Description of Parameter
266: * @return Description of the Returned Value
267: * @exception JMSException Description of Exception
268: * @exception Exception Description of Exception
269: */
270: public synchronized String checkUser(String userName,
271: String password) throws JMSException, Exception {
272: checkConnection();
273: out.writeByte(OILConstants.CHECK_USER);
274: out.writeObject(userName);
275: out.writeObject(password);
276: return (String) waitAnswer();
277: }
278:
279: /**
280: * #Description of the Method
281: *
282: * @param userName Description of Parameter
283: * @param password Description of Parameter
284: * @return Description of the Returned Value
285: * @exception JMSException Description of Exception
286: * @exception Exception Description of Exception
287: */
288: public synchronized String authenticate(String userName,
289: String password) throws JMSException, Exception {
290: checkConnection();
291: out.writeByte(OILConstants.AUTHENTICATE);
292: out.writeObject(userName);
293: out.writeObject(password);
294: return (String) waitAnswer();
295: }
296:
297: /**
298: * #Description of the Method
299: *
300: * @return Description of the Returned Value
301: * @exception CloneNotSupportedException Description of Exception
302: */
303: public Object clone() throws CloneNotSupportedException {
304: return super .clone();
305: }
306:
307: /**
308: * Need to clone because there are instance variables tha can get clobbered.
309: * All Multiple connections can NOT share the same JVMServerIL object
310: *
311: * @return Description of the Returned Value
312: * @exception Exception Description of Exception
313: */
314: public ServerIL cloneServerIL() throws Exception {
315: return (ServerIL) clone();
316: }
317:
318: /**
319: * #Description of the Method
320: *
321: * @param dc Description of Parameter
322: * @exception JMSException Description of Exception
323: * @exception Exception Description of Exception
324: */
325: public synchronized void connectionClosing(ConnectionToken dc)
326: throws JMSException, Exception {
327: try {
328: checkConnection();
329: out.writeByte(OILConstants.CONNECTION_CLOSING);
330: waitAnswer();
331: } finally {
332: destroyConnection();
333: }
334: }
335:
336: /**
337: * #Description of the Method
338: *
339: * @param dc Description of Parameter
340: * @param dest Description of Parameter
341: * @return Description of the Returned Value
342: * @exception JMSException Description of Exception
343: * @exception Exception Description of Exception
344: */
345: public synchronized Queue createQueue(ConnectionToken dc,
346: String dest) throws JMSException, Exception {
347: checkConnection();
348: out.writeByte(OILConstants.CREATE_QUEUE);
349: out.writeObject(dest);
350: return (Queue) waitAnswer();
351: }
352:
353: /**
354: * #Description of the Method
355: *
356: * @param dc Description of Parameter
357: * @param dest Description of Parameter
358: * @return Description of the Returned Value
359: * @exception JMSException Description of Exception
360: * @exception Exception Description of Exception
361: */
362: public synchronized Topic createTopic(ConnectionToken dc,
363: String dest) throws JMSException, Exception {
364: checkConnection();
365: out.writeByte(OILConstants.CREATE_TOPIC);
366: out.writeObject(dest);
367: return (Topic) waitAnswer();
368: }
369:
370: /**
371: * #Description of the Method
372: *
373: * @param dc Description of Parameter
374: * @param dest Description of Parameter
375: * @exception JMSException Description of Exception
376: * @exception Exception Description of Exception
377: */
378: public synchronized void deleteTemporaryDestination(
379: ConnectionToken dc, SpyDestination dest)
380: throws JMSException, Exception {
381: checkConnection();
382: out.writeByte(OILConstants.DELETE_TEMPORARY_DESTINATION);
383: out.writeObject(dest);
384: waitAnswer();
385: }
386:
387: /**
388: * #Description of the Method
389: *
390: * @param id Description of Parameter
391: * @exception JMSException Description of Exception
392: * @exception Exception Description of Exception
393: */
394: public synchronized void destroySubscription(ConnectionToken dc,
395: DurableSubscriptionID id) throws JMSException, Exception {
396: checkConnection();
397: out.writeByte(OILConstants.DESTROY_SUBSCRIPTION);
398: out.writeObject(id);
399: waitAnswer();
400: }
401:
402: /**
403: * #Description of the Method
404: *
405: * @param dc Description of Parameter
406: * @param clientTime Description of Parameter
407: * @exception Exception Description of Exception
408: */
409: public synchronized void ping(ConnectionToken dc, long clientTime)
410: throws Exception {
411: checkConnection();
412: out.writeByte(OILConstants.PING);
413: out.writeLong(clientTime);
414: waitAnswer();
415: }
416:
417: /**
418: * #Description of the Method
419: *
420: * @param dc Description of Parameter
421: * @param subscriberId Description of Parameter
422: * @param wait Description of Parameter
423: * @return Description of the Returned Value
424: * @exception Exception Description of Exception
425: */
426: public synchronized SpyMessage receive(ConnectionToken dc,
427: int subscriberId, long wait) throws Exception, Exception {
428: checkConnection();
429: out.writeByte(OILConstants.RECEIVE);
430: out.writeInt(subscriberId);
431: out.writeLong(wait);
432: return (SpyMessage) waitAnswer();
433: }
434:
435: /**
436: * #Description of the Method
437: *
438: * @param dc Description of Parameter
439: * @param s Description of Parameter
440: * @exception JMSException Description of Exception
441: * @exception Exception Description of Exception
442: */
443: public synchronized void subscribe(ConnectionToken dc,
444: org.jboss.mq.Subscription s) throws JMSException, Exception {
445: checkConnection();
446: out.writeByte(OILConstants.SUBSCRIBE);
447: out.writeObject(s);
448: waitAnswer();
449: }
450:
451: /**
452: * #Description of the Method
453: *
454: * @param dc Description of Parameter
455: * @param t Description of Parameter
456: * @exception JMSException Description of Exception
457: * @exception Exception Description of Exception
458: */
459: public synchronized void transact(org.jboss.mq.ConnectionToken dc,
460: TransactionRequest t) throws JMSException, Exception {
461: checkConnection();
462: out.writeByte(OILConstants.TRANSACT);
463: t.writeExternal(out);
464: waitAnswer();
465: }
466:
467: /**
468: * #Description of the Method
469: *
470: * @param dc Description of Parameter
471: * @param subscriptionId Description of Parameter
472: * @exception JMSException Description of Exception
473: * @exception Exception Description of Exception
474: */
475: public synchronized void unsubscribe(ConnectionToken dc,
476: int subscriptionId) throws JMSException, Exception {
477: checkConnection();
478: out.writeByte(OILConstants.UNSUBSCRIBE);
479: out.writeInt(subscriptionId);
480: waitAnswer();
481: }
482:
483: /**
484: * #Description of the Method
485: *
486: * @exception Exception Description of Exception
487: */
488: private void checkConnection() throws Exception {
489: if (socket == null) {
490: createConnection();
491: }
492: }
493:
494: /**
495: * Used to establish a new connection to the server
496: *
497: * @exception Exception Description of Exception
498: */
499: private void createConnection() throws Exception {
500: boolean tracing = log.isTraceEnabled();
501: if (tracing)
502: log.trace("Connecting to : " + addr + ":" + port);
503:
504: /** Attempt to load the socket factory and if this fails, use the
505: * default socket factory impl.
506: */
507: SocketFactory socketFactory = null;
508: if (socketFactoryName != null) {
509: try {
510: ClassLoader loader = Thread.currentThread()
511: .getContextClassLoader();
512: Class factoryClass = loader
513: .loadClass(socketFactoryName);
514: socketFactory = (SocketFactory) factoryClass
515: .newInstance();
516: } catch (Exception e) {
517: log.debug("Failed to load socket factory: "
518: + socketFactoryName, e);
519: }
520: }
521: // Use the default socket factory
522: if (socketFactory == null) {
523: socketFactory = SocketFactory.getDefault();
524: }
525:
526: // Look for a local address and port as properties
527: String tmp = System.getProperty(LOCAL_ADDR);
528: if (tmp != null)
529: this .localAddr = InetAddress.getByName(tmp);
530: tmp = System.getProperty(LOCAL_PORT);
531: if (tmp != null)
532: this .localPort = Integer.parseInt(tmp);
533: if (tracing) {
534: log.trace("Connecting with addr=" + addr + ", port=" + port
535: + ", localAddr=" + localAddr + ", localPort="
536: + localPort + ", socketFactory=" + socketFactory);
537: }
538:
539: if (localAddr != null)
540: socket = socketFactory.createSocket(addr, port, localAddr,
541: localPort);
542: else
543: socket = socketFactory.createSocket(addr, port);
544:
545: socket.setTcpNoDelay(enableTcpNoDelay);
546: in = new ObjectInputStream(new BufferedInputStream(socket
547: .getInputStream()));
548: out = new ObjectOutputStream(new BufferedOutputStream(socket
549: .getOutputStream()));
550: out.flush();
551: }
552:
553: /**
554: * Used to close the current connection with the server
555: *
556: * @exception Exception Description of Exception
557: */
558: private void destroyConnection() throws Exception {
559: try {
560: out.close();
561: in.close();
562: } finally {
563: socket.close();
564: }
565: }
566:
567: /**
568: * #Description of the Method
569: *
570: * @return Description of the Returned Value
571: * @exception Exception Description of Exception
572: */
573: private Object waitAnswer() throws Exception {
574: out.reset();
575: out.flush();
576: int val = in.readByte();
577: if (val == OILConstants.SUCCESS) {
578: return null;
579: }
580: if (val == OILConstants.SUCCESS_OBJECT) {
581: return in.readObject();
582: } else {
583: Exception e = (Exception) in.readObject();
584: throw e;
585: }
586: }
587: }
|