001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.oil2;
023:
024: import java.io.IOException;
025: import java.io.ObjectInputStream;
026: import java.io.ObjectOutputStream;
027: import java.util.Iterator;
028:
029: import org.jboss.logging.Logger;
030:
031: import EDU.oswego.cs.dl.util.concurrent.Channel;
032: import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
033: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
034: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
035: import EDU.oswego.cs.dl.util.concurrent.Slot;
036: import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
037:
038: /**
039: * The OIL2 implementation of the ServerIL object
040: *
041: * @author <a href="mailto:hiram.chirino@jboss.org">Hiram Chirino</a>
042: * @version $Revision: 1$
043: */
044: public final class OIL2SocketHandler implements java.lang.Cloneable,
045: Runnable {
046: final static private Logger log = Logger
047: .getLogger(OIL2SocketHandler.class);
048:
049: /**
050: * Messages will be read from the input stream
051: */
052: private ObjectInputStream in;
053:
054: /**
055: * Messages will be writen to the output stream
056: */
057: private ObjectOutputStream out;
058:
059: /**
060: * Should we be receiving messages??
061: */
062: private boolean running;
063:
064: /**
065: * The thread group that the reader thread should join.
066: */
067: private final ThreadGroup partentThreadGroup;
068:
069: /**
070: * Reader thread.
071: */
072: private Thread worker;
073:
074: /**
075: * Number of OIL2 Worker threads started.
076: */
077: private static int threadNumber = 0;
078:
079: /**
080: * Requst create slots to wait for responses,
081: * those slots are stored in this hashmap.
082: *
083: * This field uses copy on write semantics.
084: */
085: volatile ConcurrentHashMap responseSlots = new ConcurrentHashMap();
086:
087: /**
088: * The request listner is notified of new requests
089: * and of asyncronous IO errors.
090: */
091: OIL2RequestListner requestListner;
092:
093: /**
094: * If the socket handler is currently pumping messages.
095: */
096: private volatile boolean pumpingData = false;
097:
098: /**
099: * Pump mutex
100: */
101: private Object pumpMutex = new Object();
102:
103: /**
104: * The that new request get placed into when they arrived.
105: */
106: LinkedQueue requestQueue = new LinkedQueue();
107:
108: /**
109: * The thread pool used to service incoming requests..
110: */
111: PooledExecutor pool;
112:
113: /**
114: * Constructor for the OILServerIL object
115: *
116: * @param a Description of Parameter
117: * @param port Description of Parameter
118: */
119: public OIL2SocketHandler(ObjectInputStream in,
120: ObjectOutputStream out, ThreadGroup partentThreadGroup) {
121: this .in = in;
122: this .out = out;
123: this .partentThreadGroup = partentThreadGroup;
124:
125: synchronized (OIL2SocketHandler.class) {
126: if (pool == null) {
127: pool = new PooledExecutor(50);
128: // supply a ThreadFactory to the pool to create daemon threads
129: log
130: .debug("Setting the OIL2SocketHandler's thread factory");
131: pool.setThreadFactory(new ThreadFactory() {
132: private int threadNo = 0;
133:
134: public Thread newThread(Runnable r) {
135: Thread t = new Thread(
136: OIL2SocketHandler.this .partentThreadGroup,
137: r, "OIL2SocketHandler Thread-"
138: + threadNo++);
139: t.setDaemon(true);
140: return t;
141: }
142: });
143: pool.setMinimumPoolSize(1);
144: pool.setKeepAliveTime(1000 * 60);
145: pool.runWhenBlocked();
146: pool.createThreads(1);
147: }
148: }
149: }
150:
151: /**
152: * #Description of the Method
153: *
154: * @return Description of the Returned Value
155: * @exception Exception Description of Exception
156: */
157: public void sendRequest(OIL2Request request) throws IOException {
158: // if (log.isTraceEnabled())
159: // log.trace("Sending request: " + request);
160:
161: try {
162: synchronized (out) {
163: out.writeByte(1);
164: request.writeExternal(out);
165: out.reset();
166: out.flush();
167: }
168: } catch (IOException e) {
169: throw e;
170: }
171:
172: }
173:
174: /**
175: * #Description of the Method
176: */
177: private void registerResponseSlot(OIL2Request request,
178: Slot responseSlot) throws IOException {
179: responseSlots.put(request.requestId, responseSlot);
180: }
181:
182: /**
183: * #Description of the Method
184: */
185: public void setRequestListner(OIL2RequestListner requestListner) {
186: this .requestListner = requestListner;
187: }
188:
189: /**
190: * #Description of the Method
191: *
192: * @return Description of the Returned Value
193: * @exception Exception Description of Exception
194: */
195: public void sendResponse(OIL2Response response) throws IOException {
196: // if (log.isTraceEnabled())
197: // log.trace("Sending response: " + response);
198:
199: try {
200: synchronized (out) {
201: out.writeByte(2);
202: response.writeExternal(out);
203: out.reset();
204: out.flush();
205: }
206: } catch (IOException e) {
207: throw e;
208: }
209: }
210:
211: /**
212: * Pumps messages from the input stream.
213: *
214: * If the request object is not null, then the target message is
215: * the response object for the request argument. The target
216: * message is returned.
217: *
218: * If the request object is null, then the target message is
219: * the first new request that is encountered. The new request
220: * messag is returned.
221: *
222: * All message received before the target message are pumped.
223: * A pumped message is placed in either Response Slots or
224: * the Request Queue depending on if the message is a response
225: * or requests.
226: *
227: * @param request The request object that is waiting for a response.
228: * @return the request or reponse object that this method was looking for
229: * @exception IOException Description of Exception
230: */
231: private Object pumpMessages(OIL2Request request, Channel mySlot)
232: throws IOException, ClassNotFoundException,
233: InterruptedException {
234:
235: synchronized (pumpMutex) {
236: // Is somebody else pumping data??
237: if (pumpingData) {
238: return null;
239: } else
240: pumpingData = true;
241: }
242:
243: try {
244: while (true) {
245: if (mySlot != null) {
246: // Do we have our response sitting in our slot allready??
247: Object o;
248: while ((o = mySlot.peek()) != null) {
249: o = mySlot.take();
250: if (o != this ) {
251: return o;
252: }
253: }
254: }
255:
256: byte code = in.readByte();
257: switch (code) {
258: // Request received... pass it up
259: case 1:
260: OIL2Request newRequest = new OIL2Request();
261: newRequest.readExternal(in);
262:
263: // Are we looking for a request??
264: if (request == null) {
265: return newRequest;
266: } else {
267: requestQueue.put(newRequest);
268: }
269:
270: break;
271:
272: // Response received... find the response slot
273: case 2:
274:
275: OIL2Response response = new OIL2Response();
276: response.readExternal(in);
277:
278: // No reponse id to response to..
279: if (response.correlationRequestId == null)
280: continue;
281:
282: // Is this the response object we are looking for
283: if (request != null
284: && request.requestId
285: .equals(response.correlationRequestId)) {
286: return response;
287: } else {
288:
289: Slot slot = (Slot) responseSlots
290: .remove(response.correlationRequestId);
291:
292: if (slot != null) {
293: slot.put(response);
294: } else {
295: // This should not happen...
296: if (log.isTraceEnabled())
297: log.warn("No slot registered for: "
298: + response);
299: }
300: }
301: break;
302: } // switch
303: } // while
304: } finally {
305: synchronized (pumpMutex) {
306: pumpingData = false;
307: }
308:
309: Thread thread = Thread.currentThread();
310: boolean interrupted = thread.isInterrupted();
311:
312: // We are done, let somebody know that they can
313: // start pumping us again.
314: Iterator i = responseSlots.values().iterator();
315: while (i.hasNext()) {
316: Slot s = (Slot) i.next();
317: if (s != mySlot)
318: s.offer(this , 0);
319: }
320:
321: // Only notify the request waiter if we are not
322: // giving him a message on this method call.
323: if (request != null) {
324: requestQueue.put(this );
325: }
326:
327: if (interrupted)
328: thread.interrupt();
329: }
330: }
331:
332: public OIL2Response synchRequest(OIL2Request request)
333: throws IOException, InterruptedException,
334: ClassNotFoundException {
335:
336: // if (log.isTraceEnabled())
337: // log.trace("Sending request: "+request);
338:
339: Slot slot = new Slot();
340: registerResponseSlot(request, slot);
341: sendRequest(request);
342:
343: Object o = null;
344: while (true) {
345: // Do we have something in our queue??
346: if (o != null) {
347: // was is a request message??
348: if (o != this ) {
349: // if (log.isTraceEnabled())
350: // log.trace("Got response: "+o);
351: return (OIL2Response) o;
352: }
353: // See if we have another message in the queue.
354: o = slot.peek();
355: if (o != null)
356: o = slot.take();
357: } else {
358: // We did not have any messages in the slot,
359: // so we have to go pumping..
360: o = pumpMessages(request, slot);
361: if (o == null) {
362: // Somebody else is in the pump, wait till we
363: // are notified to get in.
364: o = slot.take();
365: }
366: }
367: } // end while
368: }
369:
370: public class RequestRunner implements Runnable {
371: OIL2Request request;
372:
373: RequestRunner(OIL2Request request) {
374: this .request = request;
375: }
376:
377: public void run() {
378: requestListner.handleRequest(request);
379: }
380: }
381:
382: /**
383: * Main processing method for the OILClientILService object
384: */
385: public void run() {
386: try {
387:
388: Object o = null;
389: while (running) {
390: // Do we have something in our queue??
391: if (o != null) {
392: // was is a request message??
393: if (o != this ) {
394: pool
395: .execute(new RequestRunner(
396: (OIL2Request) o));
397: }
398: // See if we have another message in the queue.
399: o = requestQueue.peek();
400: if (o != null)
401: o = requestQueue.take();
402: } else {
403: // We did not have any messages in the queue,
404: // so we have to go pumping..
405: o = pumpMessages(null, requestQueue);
406: if (o == null) {
407: // Somebody else is in the pump, wait till we
408: // are notified to get in.
409: o = requestQueue.take();
410: }
411: }
412: } // end while
413:
414: } catch (InterruptedException e) {
415: if (log.isTraceEnabled())
416: log.trace("Stopped due to interruption");
417: } catch (Exception e) {
418: if (log.isTraceEnabled())
419: log.trace("Stopping due to unexcpected exception: ", e);
420: requestListner.handleConnectionException(e);
421: }
422:
423: // ensure the flag is set correctly
424: running = false;
425: if (log.isTraceEnabled())
426: log.trace("Stopped");
427: }
428:
429: public void start() //throws java.lang.Exception
430: {
431: if (log.isTraceEnabled())
432: log.trace("Starting");
433:
434: running = true;
435: worker = new Thread(partentThreadGroup, this , "OIL2 Worker-"
436: + threadNumber++);
437: worker.setDaemon(true);
438: worker.start();
439:
440: }
441:
442: public void stop() {
443: if (log.isTraceEnabled())
444: log.trace("Stopping");
445: running = false;
446: worker.interrupt();
447: }
448:
449: }
|