001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2006 Continuent, Inc.
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: * Initial developer(s): Damian Arregui.
019: * Contributor(s): ______________________.
020: */package org.continuent.sequoia.controller.virtualdatabase;
021:
022: import java.io.Serializable;
023: import java.util.HashMap;
024: import java.util.Iterator;
025: import java.util.Map;
026: import java.util.Map.Entry;
027:
028: import org.continuent.sequoia.common.log.Trace;
029: import org.continuent.sequoia.controller.requests.AbstractRequest;
030:
031: /**
032: * This class defines a RequestResultFailoverCache.<br>
033: * <br>
034: * It is used to implement the transparent failover feature. It temporarily
035: * stores requests results so that they can be retrieved by the driver. Results
036: * are stored/retrieved with a request ID.<br>
037: * <br>
038: * An associated clean-up thread is started at instantiation time. It takes care
039: * of removing cache entries which are too old.
040: *
041: * @author <a href="mailto:damian.arregui@continuent.com">Damian Arregui</a>
042: * @version 1.0
043: */
044: public class RequestResultFailoverCache implements Runnable {
045:
046: /**
047: * Time elapsed between two clean-up runs (in ms).
048: */
049: private static final long CACHE_CLEANUP_TIMEOUT = 10000;
050:
051: /**
052: * Period of time after which a cache entry is considered to be too old (in
053: * ms).
054: */
055: private long entryTimeout;
056:
057: /** Distribued virtual database logger */
058: private Trace logger;
059:
060: // Request result cache
061: // request ID -> result
062: private Map requestIdResult = new HashMap();
063:
064: // Transaction ID to request ID mapping
065: // transaction ID -> request ID
066: private Map transactionIdRequestId = new HashMap();
067:
068: // Connection ID to request ID mapping
069: // connection ID -> request ID
070: private Map connectionIdRequestId = new HashMap();
071:
072: private boolean isKilled = false;
073:
074: // This class is used to store in cache a expirationDate together with each
075: // result
076: private class CachedResult {
077: private Serializable result;
078: private long expirationDate;
079:
080: /**
081: * Creates a new <code>CachedResult</code> object containing a result and
082: * automatically associating a expirationDate to it
083: *
084: * @param result result to store in this entry
085: */
086: public CachedResult(Serializable result, long entryTimeout) {
087: this .result = result;
088: expirationDate = System.currentTimeMillis() + entryTimeout;
089: }
090:
091: /**
092: * Return the result
093: *
094: * @return result stored in the entry
095: */
096: public Serializable getResult() {
097: return result;
098: }
099:
100: /**
101: * Timestamp (in ms) when the result expires
102: *
103: * @return creation expirationDate
104: */
105: public long getExpirationDate() {
106: return expirationDate;
107: }
108: }
109:
110: /**
111: * Creates a new <code>RequestResultFailoverCache</code> object and starts
112: * its associated clean-up thread.
113: *
114: * @param logger logger to use to display messages
115: * @param entryTimeout time in ms after which an entry is removed from the
116: * cache
117: */
118: public RequestResultFailoverCache(Trace logger, long entryTimeout) {
119: this .logger = logger;
120: this .entryTimeout = entryTimeout;
121: (new Thread(this , "RequestResultFailoverCacheCleanupThread"))
122: .start();
123: }
124:
125: /**
126: * Stores in cache a result associated with a given request.
127: *
128: * @param request request executed
129: * @param result result of the execution of the request
130: */
131: public synchronized void store(AbstractRequest request,
132: Serializable result) {
133: Long requestId = new Long(request.getId());
134:
135: // Add to the cache first (else other lists would temporarily point to a
136: // non-existing entry).
137: synchronized (requestIdResult) {
138: if (requestIdResult.isEmpty()) {
139: // Wake up thread to purge results as needed
140: requestIdResult.notify();
141: }
142: requestIdResult.put(requestId, new CachedResult(result,
143: entryTimeout));
144: }
145:
146: if (!request.isAutoCommit()) { // Replace last result for the transaction
147: Long transactionId = new Long(request.getTransactionId());
148: if (transactionIdRequestId.containsKey(transactionId)) {
149: requestIdResult.remove(transactionIdRequestId
150: .get(transactionId));
151: }
152: transactionIdRequestId.put(transactionId, requestId);
153: }
154:
155: if (request.isPersistentConnection()) { // Replace the last result for the persistent connection
156: Long connectionId = new Long(request
157: .getPersistentConnectionId());
158: if (connectionIdRequestId.containsKey(connectionId)) {
159: requestIdResult.remove(connectionIdRequestId
160: .get(connectionId));
161: }
162: connectionIdRequestId.put(connectionId, requestId);
163: }
164:
165: if (logger.isDebugEnabled())
166: logger.debug("Stored result for request ID: "
167: + request.getId() + " -> " + result);
168: }
169:
170: /**
171: * Retrieves from cache the result associated with a request ID.
172: *
173: * @param requestId id of the request to retrieve
174: * @return result or null if result not found
175: */
176: public synchronized Serializable retrieve(long requestId) {
177: Serializable res = null;
178: Long requestIdLong = new Long(requestId);
179: if (requestIdResult.containsKey(requestIdLong)) {
180: res = ((CachedResult) requestIdResult.get(requestIdLong))
181: .getResult();
182: if (logger.isDebugEnabled())
183: logger.debug("Retrieved result for request ID: "
184: + requestId + " -> " + res);
185: } else { // Not found
186: if (logger.isDebugEnabled())
187: logger
188: .debug("No result found in failover cache for request "
189: + requestId);
190: }
191: return res;
192: }
193:
194: /**
195: * Takes care of removing cache entries which are too old.
196: *
197: * @see java.lang.Runnable#run()
198: */
199: public void run() {
200: // Thread runs forever
201: while (!isKilled) {
202: try {
203: synchronized (requestIdResult) {
204: // Wait if there is no result else just sleep for the configured time
205: // interval
206: if (requestIdResult.isEmpty())
207: requestIdResult.wait();
208: else
209: requestIdResult.wait(CACHE_CLEANUP_TIMEOUT);
210: }
211: } catch (InterruptedException e) {
212: // Ignore
213: }
214: removeOldEntries();
215: }
216: }
217:
218: /**
219: * Shutdown this thread so that it terminates asap. Note that if the thread
220: * was waiting it will still proceed to the cleanup operations before
221: * terminating.
222: */
223: public void shutdown() {
224: isKilled = true;
225: synchronized (requestIdResult) {
226: requestIdResult.notifyAll();
227: }
228: }
229:
230: private void removeOldEntries() {
231: if (logger.isDebugEnabled())
232: logger
233: .debug("Cleaning-up request result failover cache...");
234: synchronized (this ) {
235: long currentTimeMillis = System.currentTimeMillis();
236:
237: // Remove expired entries from the cache (iterate over request IDs)
238: for (Iterator iter = requestIdResult.entrySet().iterator(); iter
239: .hasNext();) {
240: CachedResult cachedResult = (CachedResult) ((Entry) iter
241: .next()).getValue();
242: if ((currentTimeMillis > cachedResult
243: .getExpirationDate())) {
244: iter.remove();
245: if (logger.isDebugEnabled())
246: logger
247: .debug("Removed result from failover cache: "
248: + cachedResult.getResult());
249: }
250: }
251:
252: // Iterate over transaction IDs
253: for (Iterator iter = transactionIdRequestId.entrySet()
254: .iterator(); iter.hasNext();) {
255: Entry entry = (Entry) iter.next();
256: if (requestIdResult.get(entry.getValue()) == null) { // No more in the cache, the transaction has completed
257: iter.remove();
258: if (logger.isDebugEnabled())
259: logger.debug("Removed transaction "
260: + entry.getKey()
261: + " from failover cache");
262: }
263: }
264:
265: // Iterate over connection IDs
266: for (Iterator iter = connectionIdRequestId.entrySet()
267: .iterator(); iter.hasNext();) {
268: Entry entry = (Entry) iter.next();
269: if (requestIdResult.get(entry.getValue()) == null) { // No more in the cache, the connection is closed
270: iter.remove();
271: if (logger.isDebugEnabled())
272: logger.debug("Removed persistent connection "
273: + entry.getKey()
274: + " from failover cache");
275: }
276: }
277:
278: }
279: }
280: }
|