001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.oil;
023:
024: import java.io.BufferedInputStream;
025: import java.io.BufferedOutputStream;
026: import java.io.IOException;
027: import java.io.ObjectInputStream;
028: import java.io.ObjectOutputStream;
029: import java.net.ServerSocket;
030: import java.net.Socket;
031: import java.rmi.RemoteException;
032:
033: import org.jboss.mq.Connection;
034: import org.jboss.mq.ReceiveRequest;
035: import org.jboss.mq.SpyDestination;
036:
037: /**
038: * The RMI implementation of the ConnectionReceiver object
039: *
040: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
041: * @author Hiram Chirino (Cojonudo14@hotmail.com)
042: * @version $Revision: 57198 $
043: * @created August 16, 2001
044: */
045: public final class OILClientILService implements java.lang.Runnable,
046: org.jboss.mq.il.ClientILService {
047: private final static org.jboss.logging.Logger cat = org.jboss.logging.Logger
048: .getLogger(OILClientILService.class);
049:
050: //the client IL
051: private OILClientIL clientIL;
052:
053: //The thread that is doing the Socket reading work
054: private Thread worker;
055:
056: // the connected client
057: private Socket socket = null;
058:
059: //A link on my connection
060: private Connection connection;
061:
062: //Should this service be running ?
063: private boolean running;
064:
065: //The server socket we listen for a connection on
066: private ServerSocket serverSocket;
067:
068: /**
069: * Number of OIL Worker threads started.
070: */
071: private static int threadNumber = 0;
072:
073: /**
074: * If the TcpNoDelay option should be used on the socket.
075: */
076: private boolean enableTcpNoDelay = false;
077:
078: /**
079: * getClientIL method comment.
080: *
081: * @return The ClientIL value
082: * @exception java.lang.Exception Description of Exception
083: */
084: public org.jboss.mq.il.ClientIL getClientIL()
085: throws java.lang.Exception {
086: return clientIL;
087: }
088:
089: /**
090: * init method comment.
091: *
092: * @param connection Description of Parameter
093: * @param props Description of Parameter
094: * @exception java.lang.Exception Description of Exception
095: */
096: public void init(org.jboss.mq.Connection connection,
097: java.util.Properties props) throws java.lang.Exception {
098: this .connection = connection;
099: serverSocket = new ServerSocket(0);
100:
101: String t = props
102: .getProperty(OILServerILFactory.OIL_TCPNODELAY_KEY);
103: if (t != null)
104: enableTcpNoDelay = t.equals("yes");
105:
106: clientIL = new OILClientIL(java.net.InetAddress.getLocalHost(),
107: serverSocket.getLocalPort(), enableTcpNoDelay);
108:
109: }
110:
111: /**
112: * Main processing method for the OILClientILService object
113: */
114: public void run() {
115: int code = 0;
116: ObjectOutputStream out = null;
117: ObjectInputStream in = null;
118: socket = null;
119: int serverPort = serverSocket.getLocalPort();
120:
121: try {
122: if (cat.isDebugEnabled())
123: cat
124: .debug("Waiting for the server to connect to me on port "
125: + serverSocket.getLocalPort());
126:
127: // We may close() before we get a connection so we need to
128: // periodicaly check to see if we were !running.
129: //
130: serverSocket.setSoTimeout(1000);
131: while (running && socket == null) {
132: try {
133: socket = serverSocket.accept();
134: } catch (java.io.InterruptedIOException e) {
135: // do nothing, running flag will be checked
136: continue;
137: } catch (IOException e) {
138: if (running)
139: connection
140: .asynchFailure(
141: "Error accepting connection from server in OILClientILService.",
142: e);
143: return; // finally block will clean up!
144: }
145: }
146:
147: if (running) {
148: socket.setTcpNoDelay(enableTcpNoDelay);
149: socket.setSoTimeout(0);
150: out = new ObjectOutputStream(new BufferedOutputStream(
151: socket.getOutputStream()));
152: out.flush();
153: in = new ObjectInputStream(new BufferedInputStream(
154: socket.getInputStream()));
155: } else {
156: // not running so exit
157: // let the finally block do the clean up
158: //
159: return;
160: }
161: } catch (IOException e) {
162: connection.asynchFailure(
163: "Could not initialize the OILClientIL Service.", e);
164: return;
165: } finally {
166: try {
167: serverSocket.close();
168: serverSocket = null;
169: } catch (IOException e) {
170: if (cat.isDebugEnabled())
171: cat
172: .debug(
173: "run: an error occured closing the server socket",
174: e);
175: }
176: }
177:
178: // now process request from the client.
179: //
180: while (running) {
181: try {
182: code = in.readByte();
183: } catch (java.io.InterruptedIOException e) {
184: continue;
185: } catch (IOException e) {
186: // Server has gone, bye, bye
187: break;
188: }
189:
190: try {
191:
192: switch (code) {
193: case OILConstants.RECEIVE:
194: int numReceives = in.readInt();
195: org.jboss.mq.ReceiveRequest[] messages = new org.jboss.mq.ReceiveRequest[numReceives];
196: for (int i = 0; i < numReceives; ++i) {
197: messages[i] = new ReceiveRequest();
198: messages[i].readExternal(in);
199: }
200: connection.asynchDeliver(messages);
201: break;
202:
203: case OILConstants.DELETE_TEMPORARY_DESTINATION:
204: connection
205: .asynchDeleteTemporaryDestination((SpyDestination) in
206: .readObject());
207: break;
208:
209: case OILConstants.CLOSE:
210: connection.asynchClose();
211: break;
212:
213: case OILConstants.PONG:
214: connection.asynchPong(in.readLong());
215: break;
216:
217: default:
218: throw new RemoteException("Bad method code !");
219: }
220:
221: //Everthing was OK
222: //
223: try {
224: out.writeByte(OILConstants.SUCCESS);
225: out.flush();
226: } catch (IOException e) {
227: connection
228: .asynchFailure("Connection failure(1)", e);
229: break; // exit the loop
230: }
231: } catch (Exception e) {
232: if (!running) {
233: // if not running then don't bother to log an error
234: //
235: break;
236: }
237:
238: try {
239: cat.error("Exception handling server request", e);
240: out.writeByte(OILConstants.EXCEPTION);
241: out.writeObject(e);
242: out.reset();
243: out.flush();
244: } catch (IOException e2) {
245: connection.asynchFailure("Connection failure(2)",
246: e2);
247: break;
248: }
249: }
250: } // end while
251:
252: // exited loop, so clean up the conection
253: //
254: try {
255: cat.debug("Closing receiver connections on port: "
256: + serverPort);
257: out.close();
258: in.close();
259: socket.close();
260: socket = null;
261: } catch (IOException e) {
262: connection.asynchFailure("Connection failure", e);
263: }
264:
265: // ensure the flag is set correctly
266: //
267: running = false;
268: }
269:
270: /**
271: * start method comment.
272: *
273: * @exception java.lang.Exception Description of Exception
274: */
275: public void start() throws java.lang.Exception {
276:
277: running = true;
278: worker = new Thread(Connection.getThreadGroup(), this ,
279: "OILClientILService-" + threadNumber++);
280: worker.setDaemon(true);
281: worker.start();
282:
283: }
284:
285: /**
286: * @exception java.lang.Exception Description of Exception
287: */
288: public void stop() throws java.lang.Exception {
289: cat.trace("Stop called on OILClientService");
290: running = false;
291: worker.interrupt();
292: }
293: }
294: // vim:expandtab:tabstop=3:shiftwidth=3
|