001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.http;
023:
024: import java.lang.reflect.Method;
025: import java.net.URL;
026: import java.util.Properties;
027:
028: import org.jboss.logging.Logger;
029: import org.jboss.mq.Connection;
030: import org.jboss.mq.il.ClientIL;
031: import org.jboss.mq.il.ClientILService;
032:
033: /**
034: * The HTTP/S implementation of the ClientILService object. One of these
035: * exists for each Connection object. Normally, this would be where you
036: * would boot up a client-side listener for the server to invoke. However
037: * in this case, we poll the server instead. In short, the ClientIL that we
038: * provide, once serialized and shipped to the server, simply places requests
039: * the server makes to it in a server-side storage mechanism. Then, when we
040: * send an HTTP request to the server, the servlet looks for queued requests
041: * waiting for us, batches them up and returns them in the response. Since
042: * we place ALL requests delivered to ANY instance of the ClientIL in a
043: * central storage queue, we have to have a way to get only the requests placed
044: * in storage by OUR ClientIL. Originally, I attempted to use the ConnectionId
045: * for this purpose, but it proooved to be less than ideal due to the fact that
046: * it created many cases where requests were being fielded to an instance of a
047: * ClientIL which was sent over the wire prior to the server returning our
048: * ConnectionId. This resulted in lost requests. Furthermore, since this had
049: * no control over exactly when the ConnectionId was set, we were forced to
050: * loop until it was not null. The current implementation doesn�t' suffer from
051: * these issues, as we can take full control over the process of setting our
052: * identifier and therefore, set the identifier on our ClientIL at creation time.
053: *
054: * @author Nathan Phelps (nathan@jboss.org)
055: * @version $Revision: 57198 $
056: * @created January 15, 2003
057: */
058: public class HTTPClientILService implements Runnable, ClientILService {
059:
060: private static Logger log = Logger
061: .getLogger(HTTPClientILService.class);
062:
063: private HTTPClientIL clientIL;
064: private Connection connection;
065: private URL url = null;
066: private long timeout = 60000;
067: private long restInterval = 0;
068: private Thread worker;
069: private String clientILIdentifier;
070:
071: private static int threadNumber = 0;
072:
073: public HTTPClientILService() {
074: if (log.isTraceEnabled()) {
075: log.trace("created");
076: }
077: }
078:
079: public ClientIL getClientIL() throws Exception {
080: if (log.isTraceEnabled()) {
081: log.trace("getClientIL()");
082: }
083: return this .clientIL;
084: }
085:
086: public void init(Connection connection, Properties props)
087: throws Exception {
088: if (log.isTraceEnabled()) {
089: log.trace("init(Connection " + connection.toString()
090: + ", Properties " + props.toString() + ")");
091: }
092: this .connection = connection;
093: this .url = HTTPClient.resolveServerUrl(props
094: .getProperty(HTTPServerILFactory.SERVER_URL_KEY));
095: this .clientILIdentifier = this .getClientILIdentifier(this .url);
096: this .clientIL = new HTTPClientIL(this .clientILIdentifier);
097: try {
098: if (System.getProperties().containsKey(
099: HTTPServerILFactory.TIMEOUT_KEY)) {
100: this .timeout = Long
101: .valueOf(
102: System
103: .getProperty(HTTPServerILFactory.TIMEOUT_KEY))
104: .longValue();
105: } else {
106: this .timeout = Long
107: .valueOf(
108: props
109: .getProperty(HTTPServerILFactory.TIMEOUT_KEY))
110: .longValue();
111: }
112: if (System.getProperties().containsKey(
113: HTTPServerILFactory.REST_INTERVAL_KEY)) {
114: this .restInterval = Long
115: .valueOf(
116: System
117: .getProperty(HTTPServerILFactory.REST_INTERVAL_KEY))
118: .longValue();
119: } else {
120: this .restInterval = Long
121: .valueOf(
122: props
123: .getProperty(HTTPServerILFactory.REST_INTERVAL_KEY))
124: .longValue();
125: }
126: } catch (Exception exception) {
127: } // we'll just use the default value
128: }
129:
130: public void start() throws Exception {
131: if (log.isTraceEnabled()) {
132: log.trace("start()");
133: }
134: clientIL.stopped = false;
135: worker = new Thread(Connection.getThreadGroup(), this ,
136: "HTTPClientILService-" + threadNumber++);
137: worker.setDaemon(true);
138: worker.start();
139: }
140:
141: public void stop() throws Exception {
142: if (log.isTraceEnabled()) {
143: log.trace("stop()");
144: }
145: clientIL.stopped = true;
146: }
147:
148: public void run() {
149: if (log.isTraceEnabled()) {
150: log.trace("run()");
151: }
152: HTTPILRequest request = new HTTPILRequest();
153: request.setMethodName("clientListening");
154: while (clientIL.stopped == false) {
155: try {
156: if (this .clientILIdentifier != null
157: && clientIL.stopped == false) {
158: request.setArguments(new Object[] {
159: this .clientILIdentifier,
160: new Long(this .timeout) }, new Class[] {
161: String.class, Long.class });
162: if (log.isDebugEnabled()) {
163: log.debug("Sending a request to '"
164: + this .url.toString()
165: + "' for ClientIL #"
166: + this .clientILIdentifier + ".");
167: }
168: // The server responds with a HTTPILRequest object, not a HTTPILResponse object as you might expect.
169: // this is becuase the server is invoking the client.
170: HTTPILRequest[] response = (HTTPILRequest[]) HTTPClient
171: .post(this .url, request);
172: if (response != null) {
173: if (log.isDebugEnabled()) {
174: log
175: .debug("Logging each response received in this batch for ClientIL #"
176: + this .clientILIdentifier
177: + ".");
178: }
179: for (int i = 0; i < response.length; i++) {
180: if (log.isDebugEnabled()) {
181: log.debug(response.toString());
182: }
183: Method method = this .connection
184: .getClass()
185: .getMethod(
186: response[i].getMethodName(),
187: response[i]
188: .getArgumentTypes());
189: method.invoke(this .connection, response[i]
190: .getArguments());
191: if (log.isDebugEnabled()) {
192: log
193: .debug("Server invoked method '"
194: + method.getName()
195: + "' on ClientIL #"
196: + this .clientILIdentifier
197: + ".");
198: }
199: }
200: } else {
201: log.warn("The request posted to '"
202: + this .url.toString()
203: + "' on behalf of ClientIL #"
204: + this .clientILIdentifier
205: + " returned an unexpected response.");
206: }
207:
208: try {
209: if (log.isDebugEnabled()) {
210: log.debug("Resting "
211: + String.valueOf(this .restInterval)
212: + " milliseconds on ClientIL #"
213: + this .clientILIdentifier + ".");
214: }
215: Thread.sleep(this .restInterval);
216: } catch (InterruptedException exception) {
217: } // We'll just skip the rest, and go ahead and issue another request immediatly.
218:
219: } else {
220: log
221: .warn("ClientIL Id is null, waiting 50 milliseconds to get one.");
222: Thread.sleep(50);
223: }
224: } catch (Exception exception) {
225: if (log.isDebugEnabled()) {
226: log
227: .debug("Exception of type '"
228: + exception.getClass().getName()
229: + "' occured when trying to receive request from server URL '"
230: + this .url + ".'");
231: }
232: this .connection.asynchFailure(exception.getMessage(),
233: exception);
234: break;
235: }
236: }
237: if (this .clientIL.stopped) {
238: if (log.isDebugEnabled()) {
239: log.debug("Notifying the server that ClientIL #"
240: + this .clientILIdentifier + " has stopped.");
241: }
242: try {
243: request.setMethodName("stopClientListening");
244: request.setArguments(
245: new Object[] { this .clientILIdentifier },
246: new Class[] { String.class });
247: HTTPClient.post(this .url, request);
248: } catch (Exception exception) {
249: if (log.isDebugEnabled()) {
250: log
251: .debug("Attempt to notify the server that ClientIL #"
252: + this .clientILIdentifier
253: + " failed due to exception with description '"
254: + exception.getMessage()
255: + ".' This means that requests will stay in the storage queue even though the client has stopped.");
256: }
257: }
258: }
259: }
260:
261: private String getClientILIdentifier(URL url) throws Exception {
262: HTTPILRequest request = new HTTPILRequest();
263: request.setMethodName("getClientILIdentifer");
264: return (String) HTTPClient.post(url, request);
265: }
266: }
|