001: /**
002: * Copyright 2003-2007 Luck Consulting Pty Ltd
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */package net.sf.ehcache.distribution;
016:
017: import net.sf.ehcache.CacheException;
018: import net.sf.ehcache.Ehcache;
019: import net.sf.ehcache.Element;
020: import net.sf.ehcache.Status;
021: import org.apache.commons.logging.Log;
022: import org.apache.commons.logging.LogFactory;
023:
024: import java.io.Serializable;
025: import java.util.List;
026:
027: /**
028: * Listens to {@link net.sf.ehcache.CacheManager} and {@link net.sf.ehcache.Cache} events and propagates those to
029: * {@link CachePeer} peers of the Cache.
030: *
031: * @author Greg Luck
032: * @version $Id: RMISynchronousCacheReplicator.java 519 2007-07-27 07:11:45Z gregluck $
033: */
034: public class RMISynchronousCacheReplicator implements CacheReplicator {
035:
036: private static final Log LOG = LogFactory
037: .getLog(RMISynchronousCacheReplicator.class.getName());
038:
039: /**
040: * The status of the replicator. Only replicates when <code>STATUS_ALIVE</code>
041: */
042: protected Status status;
043:
044: /**
045: * Whether to replicate puts.
046: */
047: protected final boolean replicatePuts;
048:
049: /**
050: * Whether to replicate updates.
051: */
052: protected final boolean replicateUpdates;
053:
054: /**
055: * Whether an update (a put) should be by copy or by invalidation, (a remove).
056: * <p/>
057: * By copy is best when the entry is expensive to produce. By invalidation is best when
058: * we are really trying to force other caches to sync back to a canonical source like a database.
059: * An example of a latter usage would be a read/write cache being used in Hibernate.
060: * <p/>
061: * This setting only has effect if <code>#replicateUpdates</code> is true.
062: */
063: protected final boolean replicateUpdatesViaCopy;
064:
065: /**
066: * Whether to replicate removes
067: */
068: protected final boolean replicateRemovals;
069:
070: /**
071: * Constructor for internal and subclass use
072: *
073: * @param replicatePuts
074: * @param replicateUpdates
075: * @param replicateUpdatesViaCopy
076: * @param replicateRemovals
077: */
078: public RMISynchronousCacheReplicator(boolean replicatePuts,
079: boolean replicateUpdates, boolean replicateUpdatesViaCopy,
080: boolean replicateRemovals) {
081: this .replicatePuts = replicatePuts;
082: this .replicateUpdates = replicateUpdates;
083: this .replicateUpdatesViaCopy = replicateUpdatesViaCopy;
084: this .replicateRemovals = replicateRemovals;
085: status = Status.STATUS_ALIVE;
086: }
087:
088: /**
089: * Called immediately after an element has been put into the cache. The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
090: * will block until this method returns.
091: * <p/>
092: * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
093: * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
094: *
095: * @param cache the cache emitting the notification
096: * @param element the element which was just put into the cache.
097: */
098: public void notifyElementPut(final Ehcache cache,
099: final Element element) throws CacheException {
100: if (notAlive()) {
101: return;
102: }
103:
104: if (!replicatePuts) {
105: return;
106: }
107:
108: if (!element.isSerializable()) {
109: if (LOG.isWarnEnabled()) {
110: LOG
111: .warn("Object with key "
112: + element.getObjectKey()
113: + " is not Serializable and cannot be replicated");
114: }
115: return;
116: }
117:
118: replicatePutNotification(cache, element);
119: }
120:
121: /**
122: * Does the actual RMI remote call
123: *
124: * @param element
125: * @param cache
126: * @throws RemoteCacheException if anything goes wrong with the remote call
127: */
128: private static void replicatePutNotification(Ehcache cache,
129: Element element) throws RemoteCacheException {
130: List cachePeers = listRemoteCachePeers(cache);
131: for (int i = 0; i < cachePeers.size(); i++) {
132: CachePeer cachePeer = (CachePeer) cachePeers.get(i);
133: try {
134: cachePeer.put(element);
135: } catch (Throwable t) {
136: throw new RemoteCacheException(
137: "Error doing put to remote peer. Message was: "
138: + t.getMessage());
139: }
140: }
141: }
142:
143: /**
144: * Called immediately after an element has been put into the cache and the element already
145: * existed in the cache. This is thus an update.
146: * <p/>
147: * The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
148: * will block until this method returns.
149: * <p/>
150: * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
151: * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
152: *
153: * @param cache the cache emitting the notification
154: * @param element the element which was just put into the cache.
155: */
156: public void notifyElementUpdated(final Ehcache cache,
157: final Element element) throws CacheException {
158: if (notAlive()) {
159: return;
160: }
161: if (!replicateUpdates) {
162: return;
163: }
164:
165: if (replicateUpdatesViaCopy) {
166: if (!element.isSerializable()) {
167: if (LOG.isWarnEnabled()) {
168: LOG
169: .warn("Object with key "
170: + element.getObjectKey()
171: + " is not Serializable and cannot be updated via copy");
172: }
173: return;
174: }
175:
176: replicatePutNotification(cache, element);
177: } else {
178: if (!element.isKeySerializable()) {
179: if (LOG.isWarnEnabled()) {
180: LOG
181: .warn("Key "
182: + element.getObjectKey()
183: + " is not Serializable and cannot be replicated.");
184: }
185: return;
186: }
187:
188: replicateRemovalNotification(cache, (Serializable) element
189: .getObjectKey());
190: }
191: }
192:
193: /**
194: * Called immediately after an attempt to remove an element. The remove method will block until
195: * this method returns.
196: * <p/>
197: * This notification is received regardless of whether the cache had an element matching
198: * the removal key or not. If an element was removed, the element is passed to this method,
199: * otherwise a synthetic element, with only the key set is passed in.
200: * <p/>
201: *
202: * @param cache the cache emitting the notification
203: * @param element the element just deleted, or a synthetic element with just the key set if
204: * no element was removed.param element just deleted
205: */
206: public void notifyElementRemoved(final Ehcache cache,
207: final Element element) throws CacheException {
208: if (notAlive()) {
209: return;
210: }
211:
212: if (!replicateRemovals) {
213: return;
214: }
215:
216: if (!element.isKeySerializable()) {
217: if (LOG.isWarnEnabled()) {
218: LOG
219: .warn("Key "
220: + element.getObjectKey()
221: + " is not Serializable and cannot be replicated.");
222: }
223: return;
224: }
225:
226: replicateRemovalNotification(cache, (Serializable) element
227: .getObjectKey());
228: }
229:
230: /**
231: * Does the actual RMI remote call
232: *
233: * @param key
234: * @param cache
235: * @throws RemoteCacheException if anything goes wrong with the remote call
236: */
237: private static void replicateRemovalNotification(Ehcache cache,
238: Serializable key) throws RemoteCacheException {
239: List cachePeers = listRemoteCachePeers(cache);
240: for (int i = 0; i < cachePeers.size(); i++) {
241: CachePeer cachePeer = (CachePeer) cachePeers.get(i);
242: try {
243: cachePeer.remove(key);
244: } catch (Throwable e) {
245: throw new RemoteCacheException(
246: "Error doing remove to remote peer. Message was: "
247: + e.getMessage());
248: }
249: }
250: }
251:
252: /**
253: * {@inheritDoc}
254: * <p/>
255: * This implementation does not propagate expiries. It does not need to do anything because the element will
256: * expire in the remote cache at the same time. If the remote peer is not configured the same way they should
257: * not be in an cache cluster.
258: */
259: public final void notifyElementExpired(final Ehcache cache,
260: final Element element) {
261: /*do not propagate expiries. The element should expire in the remote cache at the same time, thus
262: preseerving coherency.
263: */
264: }
265:
266: /**
267: * Called immediately after an element is evicted from the cache. Evicted in this sense
268: * means evicted from one store and not moved to another, so that it exists nowhere in the
269: * local cache.
270: * <p/>
271: * In a sense the Element has been <i>removed</i> from the cache, but it is different,
272: * thus the separate notification.
273: * <p/>
274: * This replicator does not propagate these events
275: *
276: * @param cache the cache emitting the notification
277: * @param element the element that has just been evicted
278: */
279: public void notifyElementEvicted(final Ehcache cache,
280: final Element element) {
281: /**
282: * do not notify these
283: */
284: }
285:
286: /**
287: * Called during {@link net.sf.ehcache.Ehcache#removeAll()} to indicate that the all
288: * elements have been removed from the cache in a bulk operation. The usual
289: * {@link #notifyElementRemoved(net.sf.ehcache.Ehcache,net.sf.ehcache.Element)}
290: * is not called.
291: * <p/>
292: * This notification exists because clearing a cache is a special case. It is often
293: * not practical to serially process notifications where potentially millions of elements
294: * have been bulk deleted.
295: *
296: * @param cache the cache emitting the notification
297: */
298: public void notifyRemoveAll(final Ehcache cache) {
299: if (notAlive()) {
300: return;
301: }
302:
303: if (!replicateRemovals) {
304: return;
305: }
306:
307: replicateRemoveAllNotification(cache);
308: }
309:
310: private void replicateRemoveAllNotification(Ehcache cache) {
311: List cachePeers = listRemoteCachePeers(cache);
312: for (int i = 0; i < cachePeers.size(); i++) {
313: CachePeer cachePeer = (CachePeer) cachePeers.get(i);
314: try {
315: cachePeer.removeAll();
316: } catch (Throwable e) {
317: throw new RemoteCacheException(
318: "Error doing removeAll to remote peer. Message was: "
319: + e.getMessage());
320: }
321: }
322: }
323:
324: /**
325: * Package protected List of cache peers
326: *
327: * @param cache
328: * @return a list of {@link CachePeer} peers for the given cache, excluding the local peer.
329: */
330: static List listRemoteCachePeers(Ehcache cache) {
331: CacheManagerPeerProvider provider = cache.getCacheManager()
332: .getCachePeerProvider();
333: return provider.listRemoteCachePeers(cache);
334: }
335:
336: /**
337: * @return whether update is through copy or invalidate
338: */
339: public final boolean isReplicateUpdatesViaCopy() {
340: return replicateUpdatesViaCopy;
341: }
342:
343: /**
344: * Asserts that the replicator is active.
345: *
346: * @return true if the status is not STATUS_ALIVE
347: */
348: public final boolean notAlive() {
349: return !alive();
350: }
351:
352: /**
353: * Checks that the replicator is is <code>STATUS_ALIVE</code>.
354: */
355: public final boolean alive() {
356: if (status == null) {
357: return false;
358: } else {
359: return (status.equals(Status.STATUS_ALIVE));
360: }
361: }
362:
363: /**
364: * Give the replicator a chance to cleanup and free resources when no longer needed
365: */
366: public void dispose() {
367: status = Status.STATUS_SHUTDOWN;
368: }
369:
370: /**
371: * Creates a clone of this listener. This method will only be called by ehcache before a cache is initialized.
372: * <p/>
373: * This may not be possible for listeners after they have been initialized. Implementations should throw
374: * CloneNotSupportedException if they do not support clone.
375: *
376: * @return a clone
377: * @throws CloneNotSupportedException if the listener could not be cloned.
378: */
379: public Object clone() throws CloneNotSupportedException {
380: //shutup checkstyle
381: super .clone();
382: return new RMISynchronousCacheReplicator(replicatePuts,
383: replicateUpdates, replicateUpdatesViaCopy,
384: replicateRemovals);
385: }
386: }
|