001: /*
002: * The Apache Software License, Version 1.1
003: *
004: *
005: * Copyright (c) 2002 The Apache Software Foundation. All rights
006: * reserved.
007: *
008: * Redistribution and use in source and binary forms, with or without
009: * modification, are permitted provided that the following conditions
010: * are met:
011: *
012: * 1. Redistributions of source code must retain the above copyright
013: * notice, this list of conditions and the following disclaimer.
014: *
015: * 2. Redistributions in binary form must reproduce the above copyright
016: * notice, this list of conditions and the following disclaimer in
017: * the documentation and/or other materials provided with the
018: * distribution.
019: *
020: * 3. The end-user documentation included with the redistribution,
021: * if any, must include the following acknowledgment:
022: * "This product includes software developed by the
023: * Apache Software Foundation (http://www.apache.org/)."
024: * Alternately, this acknowledgment may appear in the software itself,
025: * if and wherever such third-party acknowledgments normally appear.
026: *
027: * 4. The names "WSIF" and "Apache Software Foundation" must
028: * not be used to endorse or promote products derived from this
029: * software without prior written permission. For written
030: * permission, please contact apache@apache.org.
031: *
032: * 5. Products derived from this software may not be called "Apache",
033: * nor may "Apache" appear in their name, without prior written
034: * permission of the Apache Software Foundation.
035: *
036: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
037: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
038: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
039: * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
040: * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
041: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
042: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
043: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
044: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
045: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
046: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
047: * SUCH DAMAGE.
048: * ====================================================================
049: *
050: * This software consists of voluntary contributions made by many
051: * individuals on behalf of the Apache Software Foundation and was
052: * originally based on software copyright (c) 2001, 2002, International
053: * Business Machines, Inc., http://www.apache.org. For more
054: * information on the Apache Software Foundation, please see
055: * <http://www.apache.org/>.
056: */
057:
058: package org.apache.wsif.base;
059:
060: import java.io.ByteArrayInputStream;
061: import java.io.ByteArrayOutputStream;
062: import java.io.IOException;
063: import java.io.InputStream;
064: import java.io.ObjectOutputStream;
065: import java.io.ObjectStreamClass;
066: import java.io.Serializable;
067: import java.io.StreamCorruptedException;
068: import java.util.ArrayList;
069: import java.util.ConcurrentModificationException;
070: import java.util.HashMap;
071: import java.util.Iterator;
072:
073: import org.apache.wsif.WSIFConstants;
074: import org.apache.wsif.WSIFCorrelationId;
075: import org.apache.wsif.WSIFCorrelationService;
076: import org.apache.wsif.WSIFException;
077: import org.apache.wsif.logging.MessageLogger;
078: import org.apache.wsif.logging.Trc;
079:
080: /**
081: * WSIFDefaultCorrelationService provides a default implementation of a
082: * WSIFCorrelationService using a Hashmap as the backing store.
083: * @author Ant Elder <antelder@apache.org>
084: */
085: public class WSIFDefaultCorrelationService implements
086: WSIFCorrelationService {
087:
088: private HashMap correlatorStore; // associates IDs with WSIFOperators
089: private HashMap timeouts; // associates IDs with a timeout time
090: private Thread timeoutWatcher; // watches for timeout times expiring
091: private boolean shutdown; // has the correlation service been shutdown
092:
093: /**
094: * WSIFCorrelationServiceLocator should be used to
095: * create a correlation service.
096: */
097: public WSIFDefaultCorrelationService() {
098: Trc.entry(this );
099: Trc.exit();
100: }
101:
102: /**
103: * Adds an entry to the correlation service.
104: * @param correlator the key to associate with the state. This will be
105: * a JMS message correlation ID.
106: * @param state the state to be stored. This will be a WSIFOperation.
107: * @param timeout a timeout period after which the key and associated
108: * state will be deleted from the correlation service. A
109: * value of zero indicates there should be no timeout.
110: */
111: public synchronized void put(WSIFCorrelationId correlator,
112: Serializable state, long timeout) throws WSIFException {
113: Trc.entry(this , correlator, state, new Long(timeout));
114: if (correlator != null && state != null) {
115: if (correlatorStore == null) {
116: initialise();
117: }
118: try {
119: correlatorStore.put(correlator, serialize(state));
120: if (timeout > 0) {
121: if (timeouts == null) {
122: initTimeouts();
123: }
124: timeouts.put(correlator, new Long(System
125: .currentTimeMillis()
126: + timeout));
127: }
128: } catch (IOException ex) {
129: Trc.exception(ex);
130: throw new WSIFException(ex.toString());
131: }
132: } else {
133: throw new IllegalArgumentException("cannot put null "
134: + ((correlator == null) ? "correlator" : "state"));
135: }
136: Trc.exit();
137: }
138:
139: /**
140: * Retrieves an entry (a WSIFOperation) from the correlation service.
141: * @param id the key of the state to retrieved
142: * @return the state associated with the id, or null if there is no
143: * match for the id.
144: */
145: public synchronized Serializable get(WSIFCorrelationId id)
146: throws WSIFException {
147: Trc.entry(this , id);
148: if (correlatorStore == null) {
149: throw new WSIFException(
150: "get called on correlation service but put never done");
151: } else if (id == null) {
152: throw new IllegalArgumentException("cannot get null");
153: } else {
154: try {
155: Serializable s = (Serializable) unserialize((byte[]) correlatorStore
156: .get(id));
157: Trc.exit(s);
158: return s;
159: } catch (Exception ex) {
160: Trc.exception(ex);
161: throw new WSIFException(ex.toString());
162: }
163: }
164: }
165:
166: /**
167: * Removes an entry form the correlation service.
168: * @param id the key of entry to be removed
169: */
170: public synchronized void remove(WSIFCorrelationId id)
171: throws WSIFException {
172: Trc.entry(this , id);
173: if (correlatorStore == null) {
174: throw new WSIFException(
175: "corelation service has been shutdown");
176: } else if (id == null) {
177: throw new IllegalArgumentException("cannot remove null");
178: } else {
179: correlatorStore.remove(id);
180: if (timeouts != null) {
181: timeouts.remove(id);
182: }
183: Trc.exit();
184: }
185: }
186:
187: private synchronized void remove(ArrayList expiredKeys) {
188: Trc.entry(this , expiredKeys);
189: if (expiredKeys != null && correlatorStore != null) {
190: Serializable id;
191: for (Iterator i = expiredKeys.iterator(); i.hasNext();) {
192: id = (Serializable) i.next();
193: correlatorStore.remove(id);
194: timeouts.remove(id);
195: MessageLogger.log("WSIF.0008W", id);
196: }
197: }
198: Trc.exit();
199: }
200:
201: /**
202: * Shutsdown the correlation service.
203: */
204: public void shutdown() {
205: Trc.entry(this );
206: shutdown = true;
207: Trc.exit();
208: }
209:
210: private void initialise() {
211: shutdown = false;
212: correlatorStore = new HashMap();
213: }
214:
215: private void initTimeouts() {
216: timeouts = new HashMap();
217: timeoutWatcher = new Thread() {
218: public void run() {
219: while (!shutdown) {
220: try {
221: sleep(WSIFConstants.CORRELATION_TIMEOUT_DELAY);
222: } catch (InterruptedException ex) {
223: Trc.ignoredException(ex);
224: }
225: checkForTimeouts();
226: }
227: if (correlatorStore != null)
228: correlatorStore = null;
229: if (timeouts != null)
230: timeouts = null;
231: }
232: };
233: timeoutWatcher
234: .setName("WSIFDefaultCorrelationService timeout watcher");
235: timeoutWatcher.start();
236: }
237:
238: private void checkForTimeouts() {
239: Long expireTime;
240: Serializable key;
241: ArrayList expiredKeys = new ArrayList();
242: Long now = new Long(System.currentTimeMillis());
243: // add to expiredKeys all the keys whose timouts have expired
244: try {
245: for (Iterator i = timeouts.keySet().iterator(); i.hasNext();) {
246: key = (Serializable) i.next();
247: expireTime = (Long) timeouts.get(key);
248: if (now.compareTo(expireTime) > 0) {
249: // now greater than expireTime
250: expiredKeys.add(key);
251: }
252: }
253: } catch (ConcurrentModificationException ex) {
254: Trc.ignoredException(ex);
255: } // ignore this, get the others next time
256:
257: if (expiredKeys.size() > 0) {
258: remove(expiredKeys);
259: }
260:
261: }
262:
263: private byte[] serialize(Object o) throws IOException {
264: if (o == null) {
265: return null;
266: } else {
267: ByteArrayOutputStream baos = new ByteArrayOutputStream();
268: ObjectOutputStream so = new ObjectOutputStream(baos);
269: so.writeObject(o);
270: so.flush();
271: return baos.toByteArray();
272: }
273: }
274:
275: private Object unserialize(byte[] bytes) throws IOException,
276: ClassNotFoundException {
277: if (bytes == null) {
278: return null;
279: } else {
280: ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
281: //ObjectInputStream si = new ObjectInputStream(bais);
282: WSIFObjectInputStream si = new WSIFObjectInputStream(bais);
283: return si.readObject();
284: }
285: }
286:
287: }
|