001: /**
002: * Copyright 2003-2007 Luck Consulting Pty Ltd
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */package net.sf.ehcache.distribution;
016:
017: import net.sf.ehcache.CacheException;
018: import net.sf.ehcache.Ehcache;
019: import net.sf.ehcache.Element;
020: import net.sf.ehcache.Status;
021: import org.apache.commons.logging.Log;
022: import org.apache.commons.logging.LogFactory;
023:
024: import java.io.Serializable;
025: import java.rmi.UnmarshalException;
026: import java.util.ArrayList;
027: import java.util.LinkedList;
028: import java.util.List;
029:
030: /**
031: * Listens to {@link net.sf.ehcache.CacheManager} and {@link net.sf.ehcache.Cache} events and propagates those to
032: * {@link CachePeer} peers of the Cache asynchronously.
033: * <p/>
034: * Updates are guaranteed to be replicated in the order in which they are received.
035: * <p/>
036: * While much faster in operation than {@link RMISynchronousCacheReplicator}, it does suffer from a number
037: * of problems. Elements, which may be being spooled to DiskStore may stay around in memory because references
038: * are being held to them from {@link EventMessage}s which are queued up. The replication thread runs once
039: * per second, limiting the build up. However a lot of elements can be put into a cache in that time. We do not want
040: * to get an {@link OutOfMemoryError} using distribution in circumstances when it would not happen if we were
041: * just using the DiskStore.
042: * <p/>
043: * Accordingly, the Element values in {@link EventMessage}s are held by {@link java.lang.ref.SoftReference} in the queue,
044: * so that they can be discarded if required by the GC to avoid an {@link OutOfMemoryError}. A log message
045: * will be issued on each flush of the queue if there were any forced discards. One problem with GC collection
046: * of SoftReferences is that the VM (JDK1.5 anyway) will do that rather than grow the heap size to the maximum.
047: * The workaround is to either set minimum heap size to the maximum heap size to force heap allocation at start
048: * up, or put up with a few lost messages while the heap grows.
049: *
050: * @author Greg Luck
051: * @version $Id: RMIAsynchronousCacheReplicator.java 556 2007-10-29 02:06:30Z gregluck $
052: */
053: public class RMIAsynchronousCacheReplicator extends
054: RMISynchronousCacheReplicator {
055:
056: private static final Log LOG = LogFactory
057: .getLog(RMIAsynchronousCacheReplicator.class.getName());
058:
059: /**
060: * A thread which handles replication, so that replication can take place asynchronously and not hold up the cache
061: */
062: protected Thread replicationThread = new ReplicationThread();
063:
064: /**
065: * The amount of time the replication thread sleeps after it detects the replicationQueue is empty
066: * before checking again.
067: */
068: protected int asynchronousReplicationInterval;
069:
070: /**
071: * A queue of updates.
072: */
073: protected final List replicationQueue = new LinkedList();
074:
075: /**
076: * Constructor for internal and subclass use
077: *
078: * @param replicatePuts
079: * @param replicateUpdates
080: * @param replicateUpdatesViaCopy
081: * @param replicateRemovals
082: * @param asynchronousReplicationInterval
083: *
084: */
085: public RMIAsynchronousCacheReplicator(boolean replicatePuts,
086: boolean replicateUpdates, boolean replicateUpdatesViaCopy,
087: boolean replicateRemovals,
088: int asynchronousReplicationInterval) {
089: super (replicatePuts, replicateUpdates, replicateUpdatesViaCopy,
090: replicateRemovals);
091: this .asynchronousReplicationInterval = asynchronousReplicationInterval;
092: status = Status.STATUS_ALIVE;
093: replicationThread.start();
094: }
095:
096: /**
097: * RemoteDebugger method for the replicationQueue thread.
098: * <p/>
099: * Note that the replicationQueue thread locks the cache for the entire time it is writing elements to the disk.
100: */
101: private void replicationThreadMain() {
102: while (true) {
103: // Wait for elements in the replicationQueue
104: while (alive() && replicationQueue != null
105: && replicationQueue.size() == 0) {
106: try {
107: Thread.sleep(asynchronousReplicationInterval);
108: } catch (InterruptedException e) {
109: LOG.debug("Spool Thread interrupted.");
110: return;
111: }
112: }
113: if (notAlive()) {
114: return;
115: }
116: try {
117: if (replicationQueue.size() != 0) {
118: flushReplicationQueue();
119: }
120: } catch (Throwable e) {
121: LOG.warn("Exception on flushing of replication queue: "
122: + e.getMessage() + ". Continuing...", e);
123: }
124: }
125: }
126:
127: /**
128: * {@inheritDoc}
129: * <p/>
130: * This implementation queues the put notification for in-order replication to peers.
131: *
132: * @param cache the cache emitting the notification
133: * @param element the element which was just put into the cache.
134: */
135: public final void notifyElementPut(final Ehcache cache,
136: final Element element) throws CacheException {
137: if (notAlive()) {
138: return;
139: }
140:
141: if (!replicatePuts) {
142: return;
143: }
144:
145: if (!element.isSerializable()) {
146: if (LOG.isWarnEnabled()) {
147: LOG
148: .warn("Object with key "
149: + element.getObjectKey()
150: + " is not Serializable and cannot be replicated");
151: }
152: return;
153: }
154: addToReplicationQueue(new CacheEventMessage(EventMessage.PUT,
155: cache, element, null));
156: }
157:
158: /**
159: * Called immediately after an element has been put into the cache and the element already
160: * existed in the cache. This is thus an update.
161: * <p/>
162: * The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
163: * will block until this method returns.
164: * <p/>
165: * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
166: * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
167: *
168: * @param cache the cache emitting the notification
169: * @param element the element which was just put into the cache.
170: */
171: public final void notifyElementUpdated(final Ehcache cache,
172: final Element element) throws CacheException {
173: if (notAlive()) {
174: return;
175: }
176: if (!replicateUpdates) {
177: return;
178: }
179:
180: if (replicateUpdatesViaCopy) {
181: if (!element.isSerializable()) {
182: if (LOG.isWarnEnabled()) {
183: LOG
184: .warn("Object with key "
185: + element.getObjectKey()
186: + " is not Serializable and cannot be updated via copy");
187: }
188: return;
189: }
190: addToReplicationQueue(new CacheEventMessage(
191: EventMessage.PUT, cache, element, null));
192: } else {
193: if (!element.isKeySerializable()) {
194: if (LOG.isWarnEnabled()) {
195: LOG
196: .warn("Key "
197: + element.getObjectKey()
198: + " is not Serializable and cannot be replicated.");
199: }
200: return;
201: }
202: addToReplicationQueue(new CacheEventMessage(
203: EventMessage.REMOVE, cache, null, element.getKey()));
204: }
205: }
206:
207: /**
208: * Called immediately after an attempt to remove an element. The remove method will block until
209: * this method returns.
210: * <p/>
211: * This notification is received regardless of whether the cache had an element matching
212: * the removal key or not. If an element was removed, the element is passed to this method,
213: * otherwise a synthetic element, with only the key set is passed in.
214: * <p/>
215: *
216: * @param cache the cache emitting the notification
217: * @param element the element just deleted, or a synthetic element with just the key set if
218: * no element was removed.
219: */
220: public final void notifyElementRemoved(final Ehcache cache,
221: final Element element) throws CacheException {
222: if (notAlive()) {
223: return;
224: }
225:
226: if (!replicateRemovals) {
227: return;
228: }
229:
230: if (!element.isKeySerializable()) {
231: if (LOG.isWarnEnabled()) {
232: LOG
233: .warn("Key "
234: + element.getObjectKey()
235: + " is not Serializable and cannot be replicated.");
236: }
237: return;
238: }
239: addToReplicationQueue(new CacheEventMessage(
240: EventMessage.REMOVE, cache, null, element.getKey()));
241: }
242:
243: /**
244: * Called during {@link net.sf.ehcache.Ehcache#removeAll()} to indicate that the all
245: * elements have been removed from the cache in a bulk operation. The usual
246: * {@link #notifyElementRemoved(net.sf.ehcache.Ehcache,net.sf.ehcache.Element)}
247: * is not called.
248: * <p/>
249: * This notification exists because clearing a cache is a special case. It is often
250: * not practical to serially process notifications where potentially millions of elements
251: * have been bulk deleted.
252: *
253: * @param cache the cache emitting the notification
254: */
255: public void notifyRemoveAll(final Ehcache cache) {
256: if (notAlive()) {
257: return;
258: }
259:
260: if (!replicateRemovals) {
261: return;
262: }
263:
264: addToReplicationQueue(new CacheEventMessage(
265: EventMessage.REMOVE_ALL, cache, null, null));
266: }
267:
268: /**
269: * Adds a message to the queue.
270: * <p/>
271: * This method checks the state of the replication thread and warns
272: * if it has stopped and then discards the message.
273: *
274: * @param cacheEventMessage
275: */
276: protected void addToReplicationQueue(
277: CacheEventMessage cacheEventMessage) {
278: if (!replicationThread.isAlive()) {
279: LOG
280: .error("CacheEventMessages cannot be added to the replication queue"
281: + " because the replication thread has died.");
282: } else {
283: synchronized (replicationQueue) {
284: replicationQueue.add(cacheEventMessage);
285: }
286: }
287: }
288:
289: /**
290: * Gets called once per {@link #asynchronousReplicationInterval}.
291: * <p/>
292: * Sends accumulated messages in bulk to each peer. i.e. if ther are 100 messages and 1 peer,
293: * 1 RMI invocation results, not 100. Also, if a peer is unavailable this is discovered in only 1 try.
294: * <p/>
295: * Makes a copy of the queue so as not to hold up the enqueue operations.
296: * <p/>
297: * Any exceptions are caught so that the replication thread does not die, and because errors are expected,
298: * due to peers becoming unavailable.
299: * <p/>
300: * This method issues warnings for problems that can be fixed with configuration changes.
301: */
302: private void flushReplicationQueue() {
303: List replicationQueueCopy;
304: synchronized (replicationQueue) {
305: if (replicationQueue.size() == 0) {
306: return;
307: }
308:
309: replicationQueueCopy = new ArrayList(replicationQueue);
310: replicationQueue.clear();
311: }
312:
313: Ehcache cache = ((CacheEventMessage) replicationQueueCopy
314: .get(0)).cache;
315: List cachePeers = listRemoteCachePeers(cache);
316:
317: List resolvedEventMessages = extractAndResolveEventMessages(replicationQueueCopy);
318:
319: for (int j = 0; j < cachePeers.size(); j++) {
320: CachePeer cachePeer = (CachePeer) cachePeers.get(j);
321: try {
322: cachePeer.send(resolvedEventMessages);
323: } catch (UnmarshalException e) {
324: String message = e.getMessage();
325: if (message.indexOf("Read time out") != 0) {
326: LOG
327: .warn("Unable to send message to remote peer due to socket read timeout. Consider increasing"
328: + " the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. "
329: + "Message was: " + e.getMessage());
330: } else {
331: LOG
332: .debug("Unable to send message to remote peer. Message was: "
333: + e.getMessage());
334: }
335: } catch (Throwable t) {
336: LOG.warn(
337: "Unable to send message to remote peer. Message was: "
338: + t.getMessage(), t);
339: }
340: }
341: if (LOG.isWarnEnabled()) {
342: int eventMessagesNotResolved = replicationQueueCopy.size()
343: - resolvedEventMessages.size();
344: if (eventMessagesNotResolved > 0) {
345: LOG
346: .warn(eventMessagesNotResolved
347: + " messages were discarded on replicate due to reclamation of "
348: + "SoftReferences by the VM. Consider increasing the maximum heap size and/or setting the "
349: + "starting heap size to a higher value.");
350: }
351:
352: }
353: }
354:
355: /**
356: * Extracts CacheEventMessages and attempts to get a hard reference to the underlying EventMessage
357: * <p/>
358: * If an EventMessage has been invalidated due to SoftReference collection of the Element, it is not
359: * propagated. This only affects puts and updates via copy.
360: *
361: * @param replicationQueueCopy
362: * @return a list of EventMessages which were able to be resolved
363: */
364: private static List extractAndResolveEventMessages(
365: List replicationQueueCopy) {
366: List list = new ArrayList();
367: for (int i = 0; i < replicationQueueCopy.size(); i++) {
368: EventMessage eventMessage = ((CacheEventMessage) replicationQueueCopy
369: .get(i)).getEventMessage();
370: if (eventMessage != null && eventMessage.isValid()) {
371: list.add(eventMessage);
372: }
373: }
374: return list;
375: }
376:
377: /**
378: * A background daemon thread that writes objects to the file.
379: */
380: private final class ReplicationThread extends Thread {
381: public ReplicationThread() {
382: super ("Replication Thread");
383: setDaemon(true);
384: setPriority(Thread.NORM_PRIORITY);
385: }
386:
387: /**
388: * RemoteDebugger thread method.
389: */
390: public final void run() {
391: replicationThreadMain();
392: }
393: }
394:
395: /**
396: * A wrapper around an EventMessage, which enables the element to be enqueued along with
397: * what is to be done with it.
398: * <p/>
399: * The wrapper holds a {@link java.lang.ref.SoftReference} to the {@link EventMessage}, so that the queue is never
400: * the cause of an {@link OutOfMemoryError}
401: */
402: private static class CacheEventMessage {
403:
404: private final Ehcache cache;
405: private final EventMessage eventMessage;
406:
407: public CacheEventMessage(int event, Ehcache cache,
408: Element element, Serializable key) {
409: eventMessage = new EventMessage(event, key, element);
410: this .cache = cache;
411: }
412:
413: /**
414: * Gets the component EventMessage
415: */
416: public final EventMessage getEventMessage() {
417: return eventMessage;
418: }
419:
420: }
421:
422: /**
423: * Give the replicator a chance to flush the replication queue, then cleanup and free resources when no longer needed
424: */
425: public final void dispose() {
426: status = Status.STATUS_SHUTDOWN;
427: flushReplicationQueue();
428: }
429:
430: /**
431: * Creates a clone of this listener. This method will only be called by ehcache before a cache is initialized.
432: * <p/>
433: * This may not be possible for listeners after they have been initialized. Implementations should throw
434: * CloneNotSupportedException if they do not support clone.
435: *
436: * @return a clone
437: * @throws CloneNotSupportedException if the listener could not be cloned.
438: */
439: public Object clone() throws CloneNotSupportedException {
440: //shutup checkstyle
441: super .clone();
442: return new RMIAsynchronousCacheReplicator(replicatePuts,
443: replicateUpdates, replicateUpdatesViaCopy,
444: replicateRemovals, asynchronousReplicationInterval);
445: }
446:
447: }
|