001: package org.objectweb.celtix.bus.transports.jms;
002:
003: import java.io.ByteArrayInputStream;
004: import java.io.ByteArrayOutputStream;
005: import java.io.IOException;
006:
007: import java.util.Calendar;
008: import java.util.GregorianCalendar;
009: import java.util.SimpleTimeZone;
010: import java.util.TimeZone;
011: import java.util.concurrent.Executor;
012: import java.util.concurrent.RejectedExecutionException;
013: import java.util.logging.Level;
014: import java.util.logging.Logger;
015:
016: import javax.jms.JMSException;
017: import javax.jms.Message;
018: import javax.jms.Queue;
019: import javax.jms.QueueSender;
020: import javax.jms.TextMessage;
021: import javax.naming.NamingException;
022: import javax.wsdl.WSDLException;
023: import javax.xml.ws.handler.MessageContext;
024:
025: import org.objectweb.celtix.Bus;
026: import org.objectweb.celtix.BusEvent;
027: import org.objectweb.celtix.BusEventListener;
028: import org.objectweb.celtix.BusException;
029: import org.objectweb.celtix.bus.busimpl.ComponentCreatedEvent;
030: import org.objectweb.celtix.bus.busimpl.ComponentRemovedEvent;
031: import org.objectweb.celtix.bus.configuration.ConfigurationEvent;
032: import org.objectweb.celtix.bus.management.counters.TransportServerCounters;
033: import org.objectweb.celtix.common.logging.LogUtils;
034: import org.objectweb.celtix.configuration.Configuration;
035: import org.objectweb.celtix.context.OutputStreamMessageContext;
036: import org.objectweb.celtix.transports.ServerTransport;
037: import org.objectweb.celtix.transports.ServerTransportCallback;
038: import org.objectweb.celtix.transports.jms.JMSServerBehaviorPolicyType;
039: import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType;
040: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
041:
042: public class JMSServerTransport extends JMSTransportBase implements
043: ServerTransport, BusEventListener {
044: static final Logger LOG = LogUtils
045: .getL7dLogger(JMSServerTransport.class);
046: private static final String JMS_SERVER_TRANSPORT_MESSAGE = JMSServerTransport.class
047: .getName()
048: + ".IncomingMessage";
049:
050: ServerTransportCallback callback;
051: TransportServerCounters counters;
052: private PooledSession listenerSession;
053: private Thread listenerThread;
054: private JMSServerBehaviorPolicyType serverBehaviourPolicy;
055:
056: public JMSServerTransport(Bus b, EndpointReferenceType address)
057: throws WSDLException {
058: super (b, address, true);
059: serverBehaviourPolicy = getServerPolicy(configuration);
060: counters = new TransportServerCounters("JMSServerTranpsort");
061: entry("JMSServerTransport Constructor");
062: bus.sendEvent(new ComponentCreatedEvent(this ));
063: }
064:
065: private JMSServerBehaviorPolicyType getServerPolicy(
066: Configuration conf) {
067: JMSServerBehaviorPolicyType pol = conf.getObject(
068: JMSServerBehaviorPolicyType.class, "jmsServer");
069: if (pol == null) {
070: pol = new JMSServerBehaviorPolicyType();
071: }
072: return pol;
073: }
074:
075: public JMSServerBehaviorPolicyType getJMSServerBehaviourPolicy() {
076: return serverBehaviourPolicy;
077: }
078:
079: public void activate(ServerTransportCallback transportCB)
080: throws IOException {
081: entry("JMSServerTransport activate().... ");
082: callback = transportCB;
083:
084: try {
085: LOG.log(Level.FINE, "establishing JMS connection");
086: JMSProviderHub.connect(this );
087:
088: //Get a non-pooled session.
089: listenerSession = sessionFactory.get(targetDestination);
090: listenerThread = new JMSListenerThread(listenerSession,
091: this );
092: listenerThread.start();
093: } catch (JMSException ex) {
094: LOG.log(Level.FINE,
095: "JMS connect failed with JMSException : ", ex);
096: throw new IOException(ex.getMessage());
097: } catch (NamingException nex) {
098: LOG.log(Level.FINE,
099: "JMS connect failed with NamingException : ", nex);
100: throw new IOException(nex.getMessage());
101: }
102: }
103:
104: public OutputStreamMessageContext rebase(MessageContext context,
105: EndpointReferenceType decoupledResponseEndpoint)
106: throws IOException {
107: OutputStreamMessageContext octx = new JMSOutputStreamContext(
108: context);
109:
110: String replyTo = decoupledResponseEndpoint.getAddress()
111: .getValue();
112: replyTo = replyTo.substring(replyTo.indexOf('#') + 1);
113: octx.put(JMSConstants.JMS_REBASED_REPLY_TO, replyTo);
114: return octx;
115: }
116:
117: public OutputStreamMessageContext createOutputStreamContext(
118: MessageContext context) throws IOException {
119: return new JMSOutputStreamContext(context);
120: }
121:
122: public void finalPrepareOutputStreamContext(
123: OutputStreamMessageContext context) throws IOException {
124: }
125:
126: public void deactivate() throws IOException {
127: try {
128: listenerSession.consumer().close();
129: if (listenerThread != null) {
130: listenerThread.join();
131: }
132: sessionFactory.shutdown();
133: } catch (InterruptedException e) {
134: //Don't do anything...
135: } catch (JMSException ex) {
136: //
137: }
138: }
139:
140: public void shutdown() {
141: entry("JMSServerTransport shutdown()");
142: try {
143: this .deactivate();
144: } catch (IOException ex) {
145: // Ignore for now.
146: }
147: bus.sendEvent(new ComponentRemovedEvent(this ));
148: }
149:
150: public void postDispatch(MessageContext bindingContext,
151: OutputStreamMessageContext context) throws IOException {
152:
153: Message message = (Message) bindingContext
154: .get(JMS_SERVER_TRANSPORT_MESSAGE);
155: PooledSession replySession = null;
156: // ensure non-oneways in point-to-point domain
157: counters.getRequestTotal().increase();
158:
159: if (!context.isOneWay()) {
160: if (queueDestinationStyle) {
161: try {
162: // send reply
163: Queue replyTo = getReplyToDestination(context,
164: message);
165: replySession = sessionFactory.get(false);
166:
167: Message reply = marshalResponse(message, context,
168: replySession);
169: setReplyCorrelationID(message, reply);
170:
171: QueueSender sender = (QueueSender) replySession
172: .producer();
173:
174: sendResponse(context, message, reply, sender,
175: replyTo);
176:
177: } catch (JMSException ex) {
178: LOG.log(Level.WARNING,
179: "Failed in post dispatch ...", ex);
180: counters.getTotalError().increase();
181: throw new IOException(ex.getMessage());
182: } catch (NamingException nex) {
183: LOG.log(Level.WARNING,
184: "Failed in post dispatch ...", nex);
185: counters.getTotalError().increase();
186: throw new IOException(nex.getMessage());
187: } finally {
188: // house-keeping
189: if (replySession != null) {
190: sessionFactory.recycle(replySession);
191: }
192: }
193: } else {
194: // we will never receive a non-oneway invocation in pub-sub
195: // domain from Celtix client - however a mis-behaving pure JMS
196: // client could conceivably make suce an invocation, in which
197: // case we silently discard the reply
198: LOG.log(Level.WARNING,
199: "discarding reply for non-oneway invocation ",
200: "with 'topic' destinationStyle");
201: counters.getTotalError().increase();
202: }
203: } else {
204: // counter for oneway request
205: counters.getRequestOneWay().increase();
206: }
207: }
208:
209: public Queue getReplyToDestination(
210: OutputStreamMessageContext context, Message message)
211: throws JMSException, NamingException {
212: Queue replyTo;
213: // If WS-Addressing had set the replyTo header.
214: if (context.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) {
215: replyTo = sessionFactory
216: .getQueueFromInitialContext((String) context
217: .get(JMSConstants.JMS_REBASED_REPLY_TO));
218: } else {
219: replyTo = (null != message.getJMSReplyTo()) ? (Queue) message
220: .getJMSReplyTo()
221: : (Queue) replyDestination;
222: }
223:
224: return replyTo;
225: }
226:
227: public Message marshalResponse(Message message,
228: OutputStreamMessageContext context,
229: PooledSession replySession) throws JMSException {
230:
231: Message reply;
232: boolean textPayload = message instanceof TextMessage ? true
233: : false;
234: if (textPayload) {
235: reply = marshal(context.getOutputStream().toString(),
236: replySession.session(), null,
237: JMSConstants.TEXT_MESSAGE_TYPE);
238: } else {
239: reply = marshal(((ByteArrayOutputStream) context
240: .getOutputStream()).toByteArray(), replySession
241: .session(), null, JMSConstants.BINARY_MESSAGE_TYPE);
242: }
243:
244: return reply;
245: }
246:
247: public void setReplyCorrelationID(Message message, Message reply)
248: throws JMSException {
249: String correlationID = message.getJMSCorrelationID();
250:
251: if (correlationID == null
252: || "".equals(correlationID)
253: && serverBehaviourPolicy
254: .isUseMessageIDAsCorrelationID()) {
255: correlationID = message.getJMSMessageID();
256: }
257:
258: if (correlationID != null && !"".equals(correlationID)) {
259: reply.setJMSCorrelationID(correlationID);
260: }
261: }
262:
263: public void sendResponse(OutputStreamMessageContext context,
264: Message request, Message reply, QueueSender sender,
265: Queue replyTo) throws JMSException {
266: JMSMessageHeadersType headers = (JMSMessageHeadersType) context
267: .get(JMSConstants.JMS_SERVER_HEADERS);
268:
269: int deliveryMode = getJMSDeliveryMode(headers);
270: int priority = getJMSPriority(headers);
271: long ttl = getTimeToLive(headers);
272:
273: setMessageProperties(headers, reply);
274:
275: LOG.log(Level.FINE, "server sending reply: ", reply);
276:
277: long timeToLive = 0;
278: if (request.getJMSExpiration() > 0) {
279: TimeZone tz = new SimpleTimeZone(0, "GMT");
280: Calendar cal = new GregorianCalendar(tz);
281: timeToLive = request.getJMSExpiration()
282: - cal.getTimeInMillis();
283: }
284:
285: if (timeToLive >= 0) {
286: ttl = ttl > 0 ? ttl : timeToLive;
287: sender.send(replyTo, reply, deliveryMode, priority, ttl);
288: } else {
289: LOG
290: .log(Level.INFO,
291: "Message time to live is already expired skipping response.");
292: }
293: }
294:
295: /**
296: * Helper method to process incoming message.
297: *
298: * @param message the incoming message
299: */
300: protected void incoming(Message message) throws IOException {
301: try {
302: LOG.log(Level.FINE, "server received request: ", message);
303:
304: String msgType = message instanceof TextMessage ? JMSConstants.TEXT_MESSAGE_TYPE
305: : JMSConstants.BINARY_MESSAGE_TYPE;
306: Object request = unmarshal(message, msgType);
307:
308: byte[] bytes = null;
309:
310: if (JMSConstants.TEXT_MESSAGE_TYPE.equals(msgType)) {
311: String requestString = (String) request;
312: LOG.log(Level.FINE, "server received request: ",
313: requestString);
314: bytes = requestString.getBytes();
315: } else {
316: bytes = (byte[]) request;
317: }
318:
319: JMSInputStreamContext context = new JMSInputStreamContext(
320: new ByteArrayInputStream(bytes));
321: populateIncomingContext(message, context,
322: JMSConstants.JMS_SERVER_HEADERS);
323:
324: context.put(JMS_SERVER_TRANSPORT_MESSAGE, message);
325: callback.dispatch(context, this );
326:
327: } catch (JMSException jmsex) {
328: //TODO: need to revisit for which exception should we throw.
329: throw new IOException(jmsex.getMessage());
330: }
331: }
332:
333: class JMSListenerThread extends Thread {
334: final JMSServerTransport theTransport;
335: private final PooledSession listenSession;
336:
337: public JMSListenerThread(PooledSession session,
338: JMSServerTransport transport) {
339: listenSession = session;
340: theTransport = transport;
341: }
342:
343: public void run() {
344: try {
345: while (true) {
346: Message message = listenSession.consumer()
347: .receive();
348: if (message == null) {
349: LOG
350: .log(
351: Level.WARNING,
352: "Null message received from message consumer.",
353: " Exiting ListenerThread::run().");
354: return;
355: }
356: while (message != null) {
357: Executor executor = theTransport.callback
358: .getExecutor();
359: if (executor == null) {
360: executor = theTransport.bus
361: .getWorkQueueManager()
362: .getAutomaticWorkQueue();
363: }
364: if (executor != null) {
365: try {
366: executor.execute(new JMSExecutor(
367: theTransport, message));
368: message = null;
369: } catch (RejectedExecutionException ree) {
370: //FIXME - no room left on workqueue, what to do
371: //for now, loop until it WILL fit on the queue,
372: //although we could just dispatch on this thread.
373: }
374: } else {
375: //shouldn't ever get here....
376: try {
377: theTransport.incoming(message);
378: } catch (IOException ex) {
379: LOG
380: .log(
381: Level.WARNING,
382: "Failed to process incoming message : ",
383: ex);
384: }
385: message = null;
386: }
387: }
388: }
389: } catch (JMSException jmsex) {
390: jmsex.printStackTrace();
391: LOG.log(Level.SEVERE,
392: "Exiting ListenerThread::run(): ", jmsex
393: .getMessage());
394: } catch (Throwable jmsex) {
395: jmsex.printStackTrace();
396: LOG.log(Level.SEVERE,
397: "Exiting ListenerThread::run(): ", jmsex
398: .getMessage());
399: }
400: }
401: }
402:
403: static class JMSExecutor implements Runnable {
404: Message message;
405: JMSServerTransport transport;
406:
407: JMSExecutor(JMSServerTransport t, Message m) {
408: message = m;
409: transport = t;
410: }
411:
412: public void run() {
413: try {
414: transport.incoming(message);
415: } catch (IOException ex) {
416: //TODO: Decide what to do if we receive the exception.
417: LOG.log(Level.WARNING,
418: "Failed to process incoming message : ", ex);
419: }
420: }
421:
422: }
423:
424: public void processEvent(BusEvent e) throws BusException {
425: if (e.getID().equals(ConfigurationEvent.RECONFIGURED)) {
426: String configName = (String) e.getSource();
427: reConfigure(configName);
428: }
429: }
430:
431: private void reConfigure(String configName) {
432: if ("servicesMonitoring".equals(configName)) {
433: if (bus.getConfiguration().getBoolean("servicesMonitoring")) {
434: counters.resetCounters();
435: } else {
436: counters.stopCounters();
437: }
438: }
439: }
440: }
|