/*
 * MessageQueueClient: The message queue client library
 * Copyright (C) 2007 Rift IT Contracting
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * RPCMessageHandler.java
 */
021:
022: // the package path
023: package com.rift.coad.daemon.messageservice.rpc;
024:
025: // java imports
026: import java.util.List;
027: import java.lang.reflect.InvocationHandler;
028: import java.lang.reflect.Method;
029: import javax.naming.Context;
030: import javax.naming.InitialContext;
031:
032: // logging import
033: import org.apache.log4j.Logger;
034:
035: // coadunation imports
036: import com.rift.coad.daemon.messageservice.Producer;
037: import com.rift.coad.daemon.messageservice.MessageProducer;
038: import com.rift.coad.daemon.messageservice.RPCMessage;
039: import com.rift.coad.daemon.messageservice.Message;
040: import com.rift.coad.util.connection.ConnectionManager;
041:
042: /**
043: * This object handles the creation of a RPC message.
044: *
045: * @author Brett Chaldecott
046: */
047: public class RPCMessageHandler implements InvocationHandler {
048:
049: // the logger reference
050: protected Logger log = Logger.getLogger(RPCMessageHandler.class
051: .getName());
052:
053: // private member variables
054: private Class targetInterface = null;
055: private Producer producer = null;
056: private String targetURL = null;
057: private String[] services = null;
058: private boolean reply = true;
059: private boolean broadcast = false;
060: private String correlationId = null;
061: private boolean usedCorrelationId = false;
062:
063: /**
064: * Creates a new instance of RPCMessageHandler
065: *
066: * @param from The address the message will come from.
067: * @param targetInterface The interface the message will wrap to.
068: * @param targetURL The target url for the request.
069: * @param reply TRUE if the message should reply, FALSE if it a oneway call.
070: * @exception RPCMessageClientException
071: */
072: public RPCMessageHandler(String from, Class targetInterface,
073: String targetURL, boolean reply)
074: throws RPCMessageClientException {
075: try {
076: this .targetInterface = targetInterface;
077: MessageProducer messageProducer = (MessageProducer) ConnectionManager
078: .getInstance().getConnection(MessageProducer.class,
079: MessageProducer.JNDI_URL);
080: producer = messageProducer.createProducer(from);
081: this .targetURL = targetURL;
082: this .reply = reply;
083: } catch (Exception ex) {
084: throw new RPCMessageClientException(
085: "Failed to init the message handler : "
086: + ex.getMessage(), ex);
087: }
088: }
089:
090: /**
091: * Creates a new instance of RPCMessageHandler
092: *
093: * @param from The address the message will come from.
094: * @param targetInterface The interface the message will wrap to.
095: * @param services The list of services.
096: * @param broadcast True if the messages must be sent all the systems that
097: * supply the listed services.
098: * @param reply TRUE if the message should reply, FALSE if it a oneway call.
099: * @exception RPCMessageClientException
100: */
101: public RPCMessageHandler(String from, Class targetInterface,
102: List services, boolean broadcast, boolean reply)
103: throws RPCMessageClientException {
104: try {
105: this .targetInterface = targetInterface;
106: MessageProducer messageProducer = (MessageProducer) ConnectionManager
107: .getInstance().getConnection(MessageProducer.class,
108: MessageProducer.JNDI_URL);
109: producer = messageProducer.createProducer(from);
110: copyServices(services);
111: this .broadcast = broadcast;
112: this .reply = reply;
113: } catch (Exception ex) {
114: throw new RPCMessageClientException(
115: "Failed to init the message handler : "
116: + ex.getMessage(), ex);
117: }
118: }
119:
120: /**
121: * Creates a new instance of RPCMessageHandler
122: *
123: * @param from The address the message will come from.
124: * @param targetInterface The interface the message will wrap to.
125: * @param targetURL The target url for the request.
126: * @param correlationId The correlation id for the message.
127: * @exception RPCMessageClientException
128: */
129: public RPCMessageHandler(String from, Class targetInterface,
130: String targetURL, String correlationId)
131: throws RPCMessageClientException {
132: try {
133: this .targetInterface = targetInterface;
134: MessageProducer messageProducer = (MessageProducer) ConnectionManager
135: .getInstance().getConnection(MessageProducer.class,
136: MessageProducer.JNDI_URL);
137: producer = messageProducer.createProducer(from);
138: this .targetURL = targetURL;
139: this .correlationId = correlationId;
140: } catch (Exception ex) {
141: throw new RPCMessageClientException(
142: "Failed to init the message handler : "
143: + ex.getMessage(), ex);
144: }
145: }
146:
147: /**
148: * Creates a new instance of RPCMessageHandler
149: *
150: * @param from The address the message will come from.
151: * @param targetInterface The interface the message will wrap to.
152: * @param services The list of services.
153: * @param broadcast True if the messages must be sent all the systems that
154: * supply the listed services.
155: * @param correlationId The correlation id for the message.
156: * @exception RPCMessageClientException
157: */
158: public RPCMessageHandler(String from, Class targetInterface,
159: List services, boolean broadcast, String correlationId)
160: throws RPCMessageClientException {
161: try {
162: this .targetInterface = targetInterface;
163: MessageProducer messageProducer = (MessageProducer) ConnectionManager
164: .getInstance().getConnection(MessageProducer.class,
165: MessageProducer.JNDI_URL);
166: producer = messageProducer.createProducer(from);
167: copyServices(services);
168: this .broadcast = broadcast;
169: this .correlationId = correlationId;
170: } catch (Exception ex) {
171: throw new RPCMessageClientException(
172: "Failed to init the message handler : "
173: + ex.getMessage(), ex);
174: }
175: }
176:
177: /**
178: * Creates a new instance of RPCMessageHandler
179: *
180: * @param context The context that will be used to retrieve the service.
181: * @param jndiURL The url of the MessageProducer.
182: * @param from The address the message will come from.
183: * @param targetInterface The interface the message will wrap to.
184: * @param targetURL The target url for the request.
185: * @exception RPCMessageClientException
186: */
187: public RPCMessageHandler(InitialContext context, String jndiURL,
188: String from, Class targetInterface, String targetURL)
189: throws RPCMessageClientException {
190: try {
191: this .targetInterface = targetInterface;
192: MessageProducer messageProducer = (MessageProducer) ConnectionManager
193: .getInstance(context).getConnection(
194: MessageProducer.class, jndiURL);
195: producer = messageProducer.createProducer(from);
196: this .targetURL = targetURL;
197: this .reply = false;
198: } catch (Exception ex) {
199: throw new RPCMessageClientException(
200: "Failed to init the message handler : "
201: + ex.getMessage(), ex);
202: }
203: }
204:
205: /**
206: * Creates a new instance of RPCMessageHandler
207: *
208: * @param context The context that will be used to retrieve the service.
209: * @param jndiURL The url of the MessageProducer.
210: * @param from The address the message will come from.
211: * @param targetInterface The interface the message will wrap to.
212: * @param services The list of services the message will be sent to.
213: * @param broadcast True if the messages must be sent all the systems that
214: * supply the listed services.
215: * @exception RPCMessageClientException
216: */
217: public RPCMessageHandler(InitialContext context, String jndiURL,
218: String from, Class targetInterface, List services,
219: boolean broadcast) throws RPCMessageClientException {
220: try {
221: this .targetInterface = targetInterface;
222: MessageProducer messageProducer = (MessageProducer) ConnectionManager
223: .getInstance(context).getConnection(
224: MessageProducer.class, jndiURL);
225: producer = messageProducer.createProducer(from);
226: copyServices(services);
227: this .reply = false;
228: this .broadcast = broadcast;
229: } catch (Exception ex) {
230: throw new RPCMessageClientException(
231: "Failed to init the message handler : "
232: + ex.getMessage(), ex);
233: }
234: }
235:
236: /**
237: * This method is responsible for handling the invocation request.
238: *
239: * @return The results.
240: * @param proxy The proxy that the call is being made on.
241: * @param method The method that the call has been made on.
242: * @param args The arguments passed to that method.
243: * @exception Throwable
244: */
245: public Object invoke(Object proxy, Method method, Object[] args)
246: throws Throwable {
247: if (correlationId != null && usedCorrelationId) {
248: throw new RPCMessageClientException(
249: "The correlation Id has been used cannot re-use this "
250: + "object.");
251: }
252: usedCorrelationId = true;
253: try {
254: RPCMessage message = null;
255: if (targetURL != null) {
256: message = producer
257: .createRPCMessage(Message.POINT_TO_POINT);
258: message.setTarget(targetURL);
259: } else if (broadcast == false) {
260: message = producer
261: .createRPCMessage(Message.POINT_TO_SERVICE);
262: message.setServices(this .services);
263: } else {
264: message = producer
265: .createRPCMessage(Message.POINT_TO_MULTI_SERVICE);
266: message.setServices(this .services);
267: }
268: message.setReply(reply);
269: message.setCorrelationId(correlationId);
270:
271: Method targetMethod = this .targetInterface.getMethod(method
272: .getName(), method.getParameterTypes());
273: message.defineMethod(targetMethod.getReturnType(), method
274: .getName(), method.getParameterTypes());
275: message.setArguments(args);
276:
277: producer.submit(message);
278: return message.getMessageId();
279: } catch (Exception ex) {
280: log.error("Failed to setup the RPC message because : "
281: + ex.getMessage(), ex);
282: throw new RPCMessageClientException(
283: "Failed to setup the RPC message because : "
284: + ex.getMessage(), ex);
285: }
286: }
287:
288: /**
289: * This method copyies the services to a string array.
290: */
291: private void copyServices(List serviceList) {
292: services = new String[serviceList.size()];
293: for (int index = 0; index < serviceList.size(); index++) {
294: services[index] = (String) serviceList.get(index);
295: }
296: }
297:
298: }
|