001: package com.jofti.cache.adapter.listener;
002:
003: import java.util.ArrayList;
004: import java.util.Iterator;
005: import java.util.List;
006: import java.util.Map;
007: import java.util.Properties;
008:
009: import org.apache.commons.logging.Log;
010: import org.apache.commons.logging.LogFactory;
011:
012: import com.ibm.websphere.objectgrid.ObjectGridException;
013: import com.ibm.websphere.objectgrid.ObjectGridRuntimeException;
014: import com.ibm.websphere.objectgrid.ObjectMap;
015: import com.ibm.websphere.objectgrid.TxID;
016: import com.ibm.websphere.objectgrid.plugins.LogElement;
017: import com.ibm.websphere.objectgrid.plugins.LogSequence;
018: import com.ibm.websphere.objectgrid.plugins.MapEventListener;
019: import com.ibm.websphere.objectgrid.plugins.index.MapIndexInfo;
020: import com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin;
021: import com.jofti.api.Index;
022: import com.jofti.api.IndexQuery;
023: import com.jofti.cache.IBaseAdaptor;
024: import com.jofti.core.GenericIndexFactory;
025: import com.jofti.core.IParsedQuery;
026: import com.jofti.core.InternalIndex;
027: import com.jofti.exception.JoftiException;
028: import com.jofti.util.ObjectProcedureAdapter;
029: import com.jofti.util.OpenHashMap;
030:
031: /**
032: * The Class used to provide the connection between the adapter and the Cache for Listener adapters. </p>
033: * The Listener is for ObjectGrid and above. </p>
034: * @author xenephon
035: * @version 1.2
036: * @since 1.2
037: *
038: */
039: public class ObjectGridEventListener implements MapIndexPlugin,
040: MapEventListener {
041:
042: IBaseAdaptor base = null;
043: String name;
044:
045: private static Log log = LogFactory
046: .getLog(CoherenceEventListener.class);
047:
048: public ObjectGridEventListener(IBaseAdaptor base, String name) {
049: this .base = base;
050: this .name = name;
051:
052: }
053:
054: /* (non-Javadoc)
055: * @see com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin#doBatchUpdate(com.ibm.websphere.objectgrid.TxID, com.ibm.websphere.objectgrid.plugins.LogSequence)
056: */
057: public void doBatchUpdate(TxID arg0, LogSequence arg1)
058: throws ObjectGridRuntimeException {
059:
060: if (log.isDebugEnabled()) {
061: log.debug("transaction id:" + arg0 + " number of entries "
062: + arg1.size());
063: }
064: if (arg1.isDirty() && arg1.size() > 0) {
065: try {
066: base.acquireUpdateLock();
067:
068: // do deletes first
069: Iterator it = arg1.getAllChanges();
070:
071: while (it.hasNext()) {
072: LogElement element = (LogElement) it.next();
073:
074: switch (element.getType().getCode()) {
075: case LogElement.CODE_DELETE:
076: case LogElement.CODE_EVICT:
077: if (log.isDebugEnabled()) {
078: log.debug("delete or eviction for "
079: + element.getCacheEntry().getKey());
080: }
081: elementRemoved(
082: element.getCacheEntry().getKey(), null);
083: break;
084: case LogElement.CODE_INSERT:
085: if (log.isDebugEnabled()) {
086: log.debug("insert for "
087: + element.getCacheEntry().getKey());
088: }
089: elementAdded(element.getCacheEntry().getKey(),
090: element.getCurrentValue());
091: break;
092: case LogElement.CODE_UPDATE:
093: if (log.isDebugEnabled()) {
094: log.debug("update for "
095: + element.getCacheEntry().getKey());
096: }
097: elementUpdated(
098: element.getCacheEntry().getKey(),
099: element.getCurrentValue());
100: break;
101: default:
102: if (log.isDebugEnabled()) {
103: log.debug("event ignored for "
104: + element.getCacheEntry().getKey());
105: }
106: break;
107: }
108: }
109:
110: } catch (JoftiException e) {
111: log.error(
112: "bulk update event: Unable to complete update for tx:"
113: + arg0, e);
114: } finally {
115: base.releaseUpdateLock();
116: }
117: }
118:
119: }
120:
121: /* (non-Javadoc)
122: * @see com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin#getIndexProxy(com.ibm.websphere.objectgrid.plugins.index.MapIndexInfo)
123: */
124: public Object getIndexProxy(MapIndexInfo arg0) {
125:
126: return new BaseIndexProxy(arg0);
127: }
128:
129: /* (non-Javadoc)
130: * @see com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin#getName()
131: */
132: public String getName() {
133:
134: return name;
135: }
136:
137: /* (non-Javadoc)
138: * @see com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin#setAttributeName(java.lang.String)
139: */
140: public void setAttributeName(String arg0) {
141: // NO-OP
142:
143: }
144:
145: /* (non-Javadoc)
146: * @see com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin#undoBatchUpdate(com.ibm.websphere.objectgrid.TxID, com.ibm.websphere.objectgrid.plugins.LogSequence)
147: */
148: public void undoBatchUpdate(TxID arg0, LogSequence arg1)
149: throws ObjectGridException {
150: if (log.isDebugEnabled()) {
151: log.debug("transaction id:" + arg0 + " number of entries "
152: + arg1.size());
153: }
154: List errors = new ArrayList();
155: if (arg1.isDirty() && arg1.size() > 0) {
156: try {
157: base.acquireUpdateLock();
158:
159: // do deletes first
160: Iterator it = arg1.getAllChanges();
161:
162: while (it.hasNext()) {
163: LogElement element = (LogElement) it.next();
164: try {
165: switch (element.getUndoType().getCode()) {
166: case LogElement.CODE_DELETE:
167: if (log.isDebugEnabled()) {
168: log.debug("delete undo for "
169: + element.getCacheEntry()
170: .getKey());
171: }
172: elementRemoved(element.getCacheEntry()
173: .getKey(), null);
174: break;
175: case LogElement.CODE_INSERT:
176: if (log.isDebugEnabled()) {
177: log.debug("insert undo for "
178: + element.getCacheEntry()
179: .getKey());
180: }
181: elementAdded(element.getCacheEntry()
182: .getKey(), element.getBeforeImage());
183: break;
184: case LogElement.CODE_UPDATE:
185: if (log.isDebugEnabled()) {
186: log.debug("update undo for "
187: + element.getCacheEntry()
188: .getKey());
189: }
190: elementUpdated(element.getCacheEntry()
191: .getKey(), element.getBeforeImage());
192: break;
193: default:
194: if (log.isDebugEnabled()) {
195: log.debug("event ignored for "
196: + element.getCacheEntry()
197: .getKey());
198: }
199: break;
200: }
201: } catch (Exception e) {
202: log.error("error undoing change " + element);
203: errors.add(element.getCacheEntry().getKey());
204: }
205: }
206:
207: } catch (JoftiException e) {
208: log.error(
209: "bulk update event: Unable to complete update for tx:"
210: + arg0, e);
211: } finally {
212: base.releaseUpdateLock();
213: }
214: if (errors.size() > 0) {
215: throw new ObjectGridException(
216: "error undoing transaction changes for:"
217: + errors);
218: }
219: }
220:
221: }
222:
223: /* (non-Javadoc)
224: * @see com.ibm.websphere.objectgrid.plugins.MapEventListener#entryEvicted(java.lang.Object, java.lang.Object)
225: */
226: public void entryEvicted(Object arg0, Object arg1) {
227:
228: try {
229: base.acquireUpdateLock();
230:
231: elementRemoved(arg0, arg1);
232:
233: } catch (JoftiException e) {
234: log.error(
235: "bulk update event: Unable to complete update for tx:"
236: + arg0, e);
237: } finally {
238: base.releaseUpdateLock();
239: }
240:
241: }
242:
243: /* (non-Javadoc)
244: * @see com.ibm.websphere.objectgrid.plugins.MapEventListener#preloadCompleted(java.lang.Throwable)
245: */
246: public void preloadCompleted(Throwable arg0) {
247: // ??
248:
249: }
250:
251: private void elementRemoved(Object key, Object value)
252: throws JoftiException {
253:
254: key = base.decorateKey(key);
255:
256: synchronized (base.getCacheLock(key)) {
257: base.getIndex().removeByKey((Comparable) key);
258: }
259:
260: if (log.isDebugEnabled()) {
261: log.debug("Remove event: removed from index " + key);
262: }
263:
264: }
265:
266: private void elementAdded(Object key, Object value)
267: throws JoftiException {
268:
269: key = base.decorateKey(key);
270:
271: InternalIndex index = base.getIndex();
272:
273: synchronized (base.getCacheLock(key)) {
274: // insert into the index
275: index.insert(key, value);
276: }
277:
278: if (log.isDebugEnabled()) {
279: log.debug("Add Event: entry added to index " + key
280: + " value: " + value);
281: }
282:
283: }
284:
285: private void elementUpdated(Object key, Object value)
286: throws JoftiException {
287:
288: key = base.decorateKey(key);
289:
290: InternalIndex index = base.getIndex();
291:
292: synchronized (base.getCacheLock(key)) {
293: // remove all entries first
294: index.removeByKey(key);
295: // insert into the index
296: index.insert(key, value);
297: }
298:
299: if (log.isDebugEnabled()) {
300: log.debug("Add Event: entry added to index " + key
301: + " value: " + value);
302: }
303:
304: }
305:
306: /**
307: * <p>
308: * The proxy provides the implementation for the object returned from the getIndexProxy method.
309: * The proxy is repsonsible for mediating the queries with the ObjectGridAdapter and dealing with
310: * modification of the committed result set with the current transaction scoped results.
311: * </p>
312: * <p>
313: * A new query run on the proxy results in a temporary index being built on those entries in the transaction
314: * that have been inserted or updated.
315: * </p>
316: * <p>
317: * The general sequence is to:
318: * <ul>
319: * <li>Run the query against the committed index
320: * <li>remove the entries that have been marked as remove in this transaction
321: * <li> build a temporary index on the inserted and updated values
322: * <li>query the temp index
323: * <li>merge the results
324: * <li>apply any of the paging/maxresults logic
325: * </ul>
326: *
327: * @author steve
328: *
329: */
330: class BaseIndexProxy implements Index {
331:
332: MapIndexInfo info = null;
333:
334: BaseIndexProxy(MapIndexInfo info) {
335: this .info = info;
336: }
337:
338: /* (non-Javadoc)
339: * @see com.jofti.api.Index#addQuery(java.lang.String, com.jofti.api.IndexQuery)
340: */
341: public IndexQuery addQuery(String name, IndexQuery query)
342: throws JoftiException {
343:
344: return ((Index) base).addQuery(name, query);
345: }
346:
347: /* (non-Javadoc)
348: * @see com.jofti.api.Index#getQuery(java.lang.String)
349: */
350: public IndexQuery getQuery(String name) {
351: return ((Index) base).getQuery(name);
352: }
353:
354: /* (non-Javadoc)
355: * @see com.jofti.api.Index#query(com.jofti.api.IndexQuery)
356: */
357: public Map query(IndexQuery query) throws JoftiException {
358:
359: if (info == null) {
360: return ((Index) base).query(query);
361: }
362: // decorate the results here
363: List txChanges = info.getTransactionChanges(true);
364:
365: // loop through and build temp index
366: InternalIndex localIndex = GenericIndexFactory
367: .getInstance().createIndex(
368: base.getIndex().getClass().getName(),
369: base.getIndex().getIntrospector(),
370: new Properties(), "local");
371:
372: final ObjectMap objectMap = info.getMap();
373: // query temp index
374: final Map res = ((Index) base).query(query);
375:
376: for (int i = 0; i < txChanges.size(); i++) {
377: LogElement element = (LogElement) txChanges.get(i);
378: if (element.getType().getCode() == LogElement.CODE_DELETE
379: || element.getType().getCode() == LogElement.CODE_UPDATE) {
380: res.remove(element.getCacheEntry().getKey());
381: }
382: if (element.getType().getCode() == LogElement.CODE_INSERT
383: || element.getType().getCode() == LogElement.CODE_UPDATE) {
384: localIndex.insert(element.getCacheEntry().getKey(),
385: element.getCurrentValue());
386: }
387: }
388: // merge results
389: IParsedQuery parsedQuery = (IParsedQuery) base
390: .processQuery(query, base.getIndex()
391: .getParserManager());
392:
393: OpenHashMap localRes = (OpenHashMap) localIndex
394: .query(parsedQuery);
395:
396: // merge results
397: localRes.forEachPair(new ObjectProcedureAdapter() {
398:
399: public boolean apply(Object key, Object value)
400: throws RuntimeException {
401: try {
402: res.put(key, objectMap.get(key));
403: } catch (ObjectGridException e) {
404: log.error(
405: "Failure retrieving value in transaction:"
406: + key, e);
407: }
408: return true;
409: }
410: });
411: if (parsedQuery.getMaxResults() > 0
412: || parsedQuery.getFirstResult() > 0) {
413: return base.limitResults(res, parsedQuery
414: .getFirstResult(), parsedQuery.getMaxResults());
415: } else {
416: return res;
417: }
418: }
419:
420: }
421:
422: }
|