001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.oil2;
023:
024: import java.io.BufferedInputStream;
025: import java.io.BufferedOutputStream;
026: import java.io.IOException;
027: import java.io.ObjectInputStream;
028: import java.io.ObjectOutputStream;
029: import java.net.InetAddress;
030: import java.net.Socket;
031:
032: import javax.jms.Destination;
033: import javax.jms.JMSException;
034: import javax.jms.Queue;
035: import javax.jms.TemporaryQueue;
036: import javax.jms.TemporaryTopic;
037: import javax.jms.Topic;
038: import javax.net.SocketFactory;
039:
040: import org.jboss.logging.Logger;
041: import org.jboss.mq.AcknowledgementRequest;
042: import org.jboss.mq.Connection;
043: import org.jboss.mq.ConnectionToken;
044: import org.jboss.mq.DurableSubscriptionID;
045: import org.jboss.mq.SpyDestination;
046: import org.jboss.mq.SpyMessage;
047: import org.jboss.mq.TransactionRequest;
048: import org.jboss.mq.il.ServerIL;
049:
050: /**
051: * The JVM implementation of the ServerIL object
052: *
053: * @author <a href="mailto:hiram.chirino@jboss.org">Hiram Chirino</a>
054: * @version $Revision: $
055: * @created August 16, 2001
056: */
057: public final class OIL2ServerIL implements java.io.Serializable,
058: java.lang.Cloneable, org.jboss.mq.il.ServerIL, OIL2Constants {
059: static final long serialVersionUID = 1841984837999477932L;
060:
061: private final static Logger log = Logger
062: .getLogger(OIL2ServerIL.class);
063: /** The org.jboss.mq.il.oil2.localAddr system property allows a client to
064: *define the local interface to which its sockets should be bound
065: */
066: private final static String LOCAL_ADDR = "org.jboss.mq.il.oil2.localAddr";
067: /** The org.jboss.mq.il.oil2.localPort system property allows a client to
068: *define the local port to which its sockets should be bound
069: */
070: private final static String LOCAL_PORT = "org.jboss.mq.il.oil2.localPort";
071:
072: /** The server host name/IP to connect to
073: */
074: private String addr;
075: /** The server port to connect to.
076: */
077: private int port;
078: /** The name of the class implementing the javax.net.SocketFactory to
079: * use for creating the client socket.
080: */
081: private String socketFactoryName;
082:
083: /**
084: * If the TcpNoDelay option should be used on the socket.
085: */
086: private boolean enableTcpNoDelay = false;
087: /** The local interface name/IP to use for the client
088: */
089: private transient InetAddress localAddr;
090: /** The local port to use for the client
091: */
092: private transient int localPort;
093: /**
094: * Description of the Field
095: */
096: private transient ObjectInputStream in;
097:
098: /**
099: * Description of the Field
100: */
101: private transient ObjectOutputStream out;
102:
103: /**
104: * Description of the Field
105: */
106: private transient Socket socket;
107:
108: OIL2SocketHandler socketHandler;
109:
110: class RequestListner implements OIL2RequestListner {
111: public void handleConnectionException(Exception e) {
112: }
113:
114: public void handleRequest(OIL2Request request) {
115: }
116:
117: }
118:
119: /**
120: * Constructor for the OILServerIL object
121: *
122: * @param addr, the server host or ip
123: * @param port, the server port
124: * @param socketFactoryName, the name of the javax.net.SocketFactory to use
125: * @param enableTcpNoDelay,
126: */
127: public OIL2ServerIL(String addr, int port,
128: String socketFactoryName, boolean enableTcpNoDelay) {
129: this .addr = addr;
130: this .port = port;
131: this .socketFactoryName = socketFactoryName;
132: this .enableTcpNoDelay = enableTcpNoDelay;
133: }
134:
135: synchronized public void connect() throws IOException {
136: if (socket == null) {
137: boolean tracing = log.isTraceEnabled();
138: if (tracing)
139: log.trace("Connecting to : " + addr + ":" + port);
140:
141: /** Attempt to load the socket factory and if this fails, use the
142: * default socket factory impl.
143: */
144: SocketFactory socketFactory = null;
145: if (socketFactoryName != null) {
146: try {
147: ClassLoader loader = Thread.currentThread()
148: .getContextClassLoader();
149: Class factoryClass = loader
150: .loadClass(socketFactoryName);
151: socketFactory = (SocketFactory) factoryClass
152: .newInstance();
153: } catch (Exception e) {
154: log.debug("Failed to load socket factory: "
155: + socketFactoryName, e);
156: }
157: }
158: // Use the default socket factory
159: if (socketFactory == null) {
160: socketFactory = SocketFactory.getDefault();
161: }
162:
163: // Look for a local address and port as properties
164: String tmp = System.getProperty(LOCAL_ADDR);
165: if (tmp != null)
166: this .localAddr = InetAddress.getByName(tmp);
167: tmp = System.getProperty(LOCAL_PORT);
168: if (tmp != null)
169: this .localPort = Integer.parseInt(tmp);
170: if (tracing) {
171: log.trace("Connecting with addr=" + addr + ", port="
172: + port + ", localAddr=" + localAddr
173: + ", localPort=" + localPort
174: + ", socketFactory=" + socketFactory);
175: }
176:
177: if (localAddr != null)
178: socket = socketFactory.createSocket(addr, port,
179: localAddr, localPort);
180: else
181: socket = socketFactory.createSocket(addr, port);
182:
183: if (tracing)
184: log.trace("Connection established.");
185:
186: socket.setTcpNoDelay(enableTcpNoDelay);
187: out = new ObjectOutputStream(new BufferedOutputStream(
188: socket.getOutputStream()));
189: out.flush();
190: in = new ObjectInputStream(new BufferedInputStream(socket
191: .getInputStream()));
192:
193: if (tracing)
194: log.trace("Streams initialized.");
195:
196: socketHandler = new OIL2SocketHandler(in, out, Connection
197: .getThreadGroup());
198: socketHandler.setRequestListner(new RequestListner());
199: socketHandler.start();
200: }
201: }
202:
203: /**
204: * Sets the ConnectionToken attribute of the OILServerIL object
205: *
206: * @param dest The new ConnectionToken value
207: * @exception Exception Description of Exception
208: */
209: public void setConnectionToken(ConnectionToken dest)
210: throws Exception {
211: connect();
212: OIL2Request request = new OIL2Request(
213: OIL2Constants.SERVER_SET_SPY_DISTRIBUTED_CONNECTION,
214: new Object[] { dest });
215: OIL2Response response = socketHandler.synchRequest(request);
216: response.evalThrowsJMSException();
217: }
218:
219: /**
220: * Sets the Enabled attribute of the OILServerIL object
221: *
222: * @param dc The new Enabled value
223: * @param enabled The new Enabled value
224: * @exception JMSException Description of Exception
225: * @exception Exception Description of Exception
226: */
227: public void setEnabled(ConnectionToken dc, boolean enabled)
228: throws JMSException, Exception {
229: connect();
230: OIL2Request request = new OIL2Request(
231: OIL2Constants.SERVER_SET_ENABLED,
232: new Object[] { new Boolean(enabled) });
233: OIL2Response response = socketHandler.synchRequest(request);
234: response.evalThrowsJMSException();
235: }
236:
237: /**
238: * Gets the ID attribute of the OILServerIL object
239: *
240: * @return The ID value
241: * @exception Exception Description of Exception
242: */
243: public String getID() throws Exception {
244: connect();
245: OIL2Request request = new OIL2Request(
246: OIL2Constants.SERVER_GET_ID, null);
247: OIL2Response response = socketHandler.synchRequest(request);
248: return (String) response.evalThrowsJMSException();
249: }
250:
251: /**
252: * Gets the TemporaryQueue attribute of the OILServerIL object
253: *
254: * @param dc Description of Parameter
255: * @return The TemporaryQueue value
256: * @exception JMSException Description of Exception
257: * @exception Exception Description of Exception
258: */
259: public TemporaryQueue getTemporaryQueue(ConnectionToken dc)
260: throws JMSException, Exception {
261: connect();
262: OIL2Request request = new OIL2Request(
263: OIL2Constants.SERVER_GET_TEMPORARY_QUEUE, null);
264: OIL2Response response = socketHandler.synchRequest(request);
265: return (TemporaryQueue) response.evalThrowsJMSException();
266: }
267:
268: /**
269: * Gets the TemporaryTopic attribute of the OILServerIL object
270: *
271: * @param dc Description of Parameter
272: * @return The TemporaryTopic value
273: * @exception JMSException Description of Exception
274: * @exception Exception Description of Exception
275: */
276: public TemporaryTopic getTemporaryTopic(ConnectionToken dc)
277: throws JMSException, Exception {
278: connect();
279: OIL2Request request = new OIL2Request(
280: OIL2Constants.SERVER_GET_TEMPORARY_TOPIC, null);
281: OIL2Response response = socketHandler.synchRequest(request);
282: return (TemporaryTopic) response.evalThrowsJMSException();
283: }
284:
285: /**
286: * #Description of the Method
287: *
288: * @param dc Description of Parameter
289: * @param item Description of Parameter
290: * @exception JMSException Description of Exception
291: * @exception Exception Description of Exception
292: */
293: public void acknowledge(ConnectionToken dc,
294: AcknowledgementRequest item) throws JMSException, Exception {
295: connect();
296: OIL2Request request = new OIL2Request(
297: OIL2Constants.SERVER_ACKNOWLEDGE, new Object[] { item });
298: OIL2Response response = socketHandler.synchRequest(request);
299: response.evalThrowsJMSException();
300:
301: }
302:
303: /**
304: * Adds a feature to the Message attribute of the OILServerIL object
305: *
306: * @param dc The feature to be added to the Message attribute
307: * @param val The feature to be added to the Message attribute
308: * @exception Exception Description of Exception
309: */
310: public void addMessage(ConnectionToken dc, SpyMessage val)
311: throws Exception {
312:
313: connect();
314: OIL2Request request = new OIL2Request(
315: OIL2Constants.SERVER_ADD_MESSAGE, new Object[] { val });
316: OIL2Response response = socketHandler.synchRequest(request);
317: response.evalThrowsJMSException();
318: }
319:
320: /**
321: * #Description of the Method
322: *
323: * @param dc Description of Parameter
324: * @param dest Description of Parameter
325: * @param selector Description of Parameter
326: * @return Description of the Returned Value
327: * @exception JMSException Description of Exception
328: * @exception Exception Description of Exception
329: */
330: public SpyMessage[] browse(ConnectionToken dc, Destination dest,
331: String selector) throws JMSException, Exception {
332: connect();
333: OIL2Request request = new OIL2Request(
334: OIL2Constants.SERVER_BROWSE, new Object[] { dest,
335: selector });
336: OIL2Response response = socketHandler.synchRequest(request);
337: return (SpyMessage[]) response.evalThrowsJMSException();
338: }
339:
340: /**
341: * #Description of the Method
342: *
343: * @param ID Description of Parameter
344: * @exception JMSException Description of Exception
345: * @exception Exception Description of Exception
346: */
347: public void checkID(String ID) throws JMSException, Exception {
348: connect();
349: OIL2Request request = new OIL2Request(
350: OIL2Constants.SERVER_CHECK_ID, new Object[] { ID });
351: OIL2Response response = socketHandler.synchRequest(request);
352: response.evalThrowsJMSException();
353: }
354:
355: /**
356: * #Description of the Method
357: *
358: * @param userName Description of Parameter
359: * @param password Description of Parameter
360: * @return Description of the Returned Value
361: * @exception JMSException Description of Exception
362: * @exception Exception Description of Exception
363: */
364: public String checkUser(String userName, String password)
365: throws JMSException, Exception {
366: connect();
367: OIL2Request request = new OIL2Request(
368: OIL2Constants.SERVER_CHECK_USER, new Object[] {
369: userName, password });
370: OIL2Response response = socketHandler.synchRequest(request);
371: return (String) response.evalThrowsJMSException();
372: }
373:
374: /**
375: * #Description of the Method
376: *
377: * @param userName Description of Parameter
378: * @param password Description of Parameter
379: * @return Description of the Returned Value
380: * @exception JMSException Description of Exception
381: * @exception Exception Description of Exception
382: */
383: public String authenticate(String userName, String password)
384: throws JMSException, Exception {
385: connect();
386: OIL2Request request = new OIL2Request(
387: OIL2Constants.SERVER_AUTHENTICATE, new Object[] {
388: userName, password });
389: OIL2Response response = socketHandler.synchRequest(request);
390: return (String) response.evalThrowsJMSException();
391:
392: }
393:
394: /**
395: * #Description of the Method
396: *
397: * @return Description of the Returned Value
398: * @exception CloneNotSupportedException Description of Exception
399: */
400: public Object clone() throws CloneNotSupportedException {
401: return super .clone();
402: }
403:
404: /**
405: * Need to clone because there are instance variables tha can get clobbered.
406: * All Multiple connections can NOT share the same JVMServerIL object
407: *
408: * @return Description of the Returned Value
409: * @exception Exception Description of Exception
410: */
411: public ServerIL cloneServerIL() throws Exception {
412: return (ServerIL) clone();
413: }
414:
415: /**
416: * #Description of the Method
417: *
418: * @param dc Description of Parameter
419: * @exception JMSException Description of Exception
420: * @exception Exception Description of Exception
421: */
422: public void connectionClosing(ConnectionToken dc)
423: throws JMSException, Exception {
424: try {
425: connect();
426: OIL2Request request = new OIL2Request(
427: OIL2Constants.SERVER_CONNECTION_CLOSING, null);
428: OIL2Response response = socketHandler.synchRequest(request);
429: response.evalThrowsJMSException();
430: } finally {
431: close();
432: }
433: }
434:
435: /**
436: * #Description of the Method
437: *
438: * @param dc Description of Parameter
439: * @param dest Description of Parameter
440: * @return Description of the Returned Value
441: * @exception JMSException Description of Exception
442: * @exception Exception Description of Exception
443: */
444: public Queue createQueue(ConnectionToken dc, String dest)
445: throws JMSException, Exception {
446: connect();
447: OIL2Request request = new OIL2Request(
448: OIL2Constants.SERVER_CREATE_QUEUE,
449: new Object[] { dest });
450: OIL2Response response = socketHandler.synchRequest(request);
451: return (Queue) response.evalThrowsJMSException();
452:
453: }
454:
455: /**
456: * #Description of the Method
457: *
458: * @param dc Description of Parameter
459: * @param dest Description of Parameter
460: * @return Description of the Returned Value
461: * @exception JMSException Description of Exception
462: * @exception Exception Description of Exception
463: */
464: public Topic createTopic(ConnectionToken dc, String dest)
465: throws JMSException, Exception {
466: connect();
467: OIL2Request request = new OIL2Request(
468: OIL2Constants.SERVER_CREATE_TOPIC,
469: new Object[] { dest });
470: OIL2Response response = socketHandler.synchRequest(request);
471: return (Topic) response.evalThrowsJMSException();
472:
473: }
474:
475: /**
476: * #Description of the Method
477: *
478: * @param dc Description of Parameter
479: * @param dest Description of Parameter
480: * @exception JMSException Description of Exception
481: * @exception Exception Description of Exception
482: */
483: public void deleteTemporaryDestination(ConnectionToken dc,
484: SpyDestination dest) throws JMSException, Exception {
485: connect();
486: OIL2Request request = new OIL2Request(
487: OIL2Constants.SERVER_DELETE_TEMPORARY_DESTINATION,
488: new Object[] { dest });
489: OIL2Response response = socketHandler.synchRequest(request);
490: response.evalThrowsJMSException();
491:
492: }
493:
494: /**
495: * #Description of the Method
496: *
497: * @param id Description of Parameter
498: * @exception JMSException Description of Exception
499: * @exception Exception Description of Exception
500: */
501: public void destroySubscription(ConnectionToken dc,
502: DurableSubscriptionID id) throws JMSException, Exception {
503: connect();
504: OIL2Request request = new OIL2Request(
505: OIL2Constants.SERVER_DESTROY_SUBSCRIPTION,
506: new Object[] { id });
507: OIL2Response response = socketHandler.synchRequest(request);
508: response.evalThrowsJMSException();
509:
510: }
511:
512: /**
513: * #Description of the Method
514: *
515: * @param dc Description of Parameter
516: * @param clientTime Description of Parameter
517: * @exception Exception Description of Exception
518: */
519: public void ping(ConnectionToken dc, long clientTime)
520: throws Exception {
521: connect();
522: OIL2Request request = new OIL2Request(
523: OIL2Constants.SERVER_PING, new Object[] { new Long(
524: clientTime) });
525: OIL2Response response = socketHandler.synchRequest(request);
526: response.evalThrowsJMSException();
527: }
528:
529: /**
530: * #Description of the Method
531: *
532: * @param dc Description of Parameter
533: * @param subscriberId Description of Parameter
534: * @param wait Description of Parameter
535: * @return Description of the Returned Value
536: * @exception Exception Description of Exception
537: */
538: public SpyMessage receive(ConnectionToken dc, int subscriberId,
539: long wait) throws Exception, Exception {
540: connect();
541: OIL2Request request = new OIL2Request(
542: OIL2Constants.SERVER_RECEIVE, new Object[] {
543: new Integer(subscriberId), new Long(wait) });
544: OIL2Response response = socketHandler.synchRequest(request);
545: return (SpyMessage) response.evalThrowsJMSException();
546:
547: }
548:
549: /**
550: * #Description of the Method
551: *
552: * @param dc Description of Parameter
553: * @param s Description of Parameter
554: * @exception JMSException Description of Exception
555: * @exception Exception Description of Exception
556: */
557: public void subscribe(ConnectionToken dc,
558: org.jboss.mq.Subscription s) throws JMSException, Exception {
559: connect();
560: OIL2Request request = new OIL2Request(
561: OIL2Constants.SERVER_SUBSCRIBE, new Object[] { s });
562: OIL2Response response = socketHandler.synchRequest(request);
563: response.evalThrowsJMSException();
564:
565: }
566:
567: /**
568: * #Description of the Method
569: *
570: * @param dc Description of Parameter
571: * @param t Description of Parameter
572: * @exception JMSException Description of Exception
573: * @exception Exception Description of Exception
574: */
575: public void transact(org.jboss.mq.ConnectionToken dc,
576: TransactionRequest t) throws JMSException, Exception {
577: connect();
578: OIL2Request request = new OIL2Request(
579: OIL2Constants.SERVER_TRANSACT, new Object[] { t });
580: OIL2Response response = socketHandler.synchRequest(request);
581: response.evalThrowsJMSException();
582: }
583:
584: /**
585: * #Description of the Method
586: *
587: * @param dc Description of Parameter
588: * @param subscriptionId Description of Parameter
589: * @exception JMSException Description of Exception
590: * @exception Exception Description of Exception
591: */
592: public void unsubscribe(ConnectionToken dc, int subscriptionId)
593: throws JMSException, Exception {
594: connect();
595: OIL2Request request = new OIL2Request(
596: OIL2Constants.SERVER_UNSUBSCRIBE,
597: new Object[] { new Integer(subscriptionId) });
598: OIL2Response response = socketHandler.synchRequest(request);
599: response.evalThrowsJMSException();
600:
601: }
602:
603: /**
604: * Used to close the current connection with the server
605: *
606: * @exception Exception Description of Exception
607: */
608: synchronized public void close() {
609: try {
610: if (socket != null) {
611: socketHandler.stop();
612: in.close();
613: out.close();
614: socket.close();
615: socket = null;
616: }
617: } catch (IOException e) {
618: log
619: .debug(
620: "Exception occured while closing opened resources: ",
621: e);
622: }
623: }
624: }
|