001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.tipis;
018:
019: import java.io.Serializable;
020:
021: import org.apache.catalina.tribes.Channel;
022: import org.apache.catalina.tribes.ChannelException;
023: import org.apache.catalina.tribes.ChannelListener;
024: import org.apache.catalina.tribes.Member;
025: import org.apache.catalina.tribes.MembershipListener;
026: import org.apache.catalina.tribes.group.RpcCallback;
027: import org.apache.catalina.tribes.util.Arrays;
028: import org.apache.catalina.tribes.UniqueId;
029:
030: /**
031: * A smart implementation of a stateful replicated map. Uses a primary/secondary backup strategy.
032: * One node is always the primary and one node is always the backup.
033: * This map is synchronized across a cluster, and only has one backup member.<br/>
034: * A perfect usage for this map would be a session map for a session manager in a clustered environment.<br/>
035: * The only way to modify this list is to use the <code>put, putAll, remove</code> methods.
036: * entrySet, entrySetFull, keySet, keySetFull, returns all non modifiable sets.<br><br>
037: * If objects (values) in the map change without invoking <code>put()</code> or <code>remove()</code>
038: * the data can be distributed using two different methods:<br>
039: * <code>replicate(boolean)</code> and <code>replicate(Object, boolean)</code><br>
040: * These two methods are very important to understand. The map can work with two sets of value objects:<br>
041: * 1. Serializable - the entire object gets serialized each time it is replicated<br>
042: * 2. ReplicatedMapEntry - this interface allows for a isDirty() flag and to replicate diffs if desired.<br>
043: * Implementing the <code>ReplicatedMapEntry</code> interface allows you to decide what objects
044: * get replicated and how much data gets replicated each time.<br>
045: * If you implement a smart AOP mechanism to detect changes in underlying objects, you can replicate
046: * only those changes by implementing the ReplicatedMapEntry interface, and return true when isDiffable()
047: * is invoked.<br><br>
048: *
049: * This map implementation doesn't have a background thread running to replicate changes.
050: * If you do have changes without invoking put/remove then you need to invoke one of the following methods:
051: * <ul>
052: * <li><code>replicate(Object,boolean)</code> - replicates only the object that belongs to the key</li>
053: * <li><code>replicate(boolean)</code> - Scans the entire map for changes and replicates data</li>
054: * </ul>
055: * The <code>boolean</code> value in the <code>replicate</code> method is used to decide
056: * whether to only replicate objects that implement the <code>ReplicatedMapEntry</code> interface
057: * or to replicate all objects. If an object doesn't implement the <code>ReplicatedMapEntry</code> interface
058: * each time the object gets replicated the entire object gets serialized, hence a call to <code>replicate(true)</code>
059: * will replicate all objects in this map that are using this node as primary.
060: *
061: * <br><br><b>REMEMBER TO CALL <code>breakdown()</code> or <code>finalize()</code> when you are done with the map to
062: * avoid memory leaks.</b><br><br>
063: * @todo implement periodic sync/transfer thread
064: * @author Filip Hanik
065: * @version 1.0
066: */
067: public class LazyReplicatedMap extends AbstractReplicatedMap implements
068: RpcCallback, ChannelListener, MembershipListener {
069: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
070: .getLog(LazyReplicatedMap.class);
071:
072: //------------------------------------------------------------------------------
073: // CONSTRUCTORS / DESTRUCTORS
074: //------------------------------------------------------------------------------
075: /**
076: * Creates a new map
077: * @param channel The channel to use for communication
078: * @param timeout long - timeout for RPC messags
079: * @param mapContextName String - unique name for this map, to allow multiple maps per channel
080: * @param initialCapacity int - the size of this map, see HashMap
081: * @param loadFactor float - load factor, see HashMap
082: */
083: public LazyReplicatedMap(Object owner, Channel channel,
084: long timeout, String mapContextName, int initialCapacity,
085: float loadFactor, ClassLoader[] cls) {
086: super (owner, channel, timeout, mapContextName, initialCapacity,
087: loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls);
088: }
089:
090: /**
091: * Creates a new map
092: * @param channel The channel to use for communication
093: * @param timeout long - timeout for RPC messags
094: * @param mapContextName String - unique name for this map, to allow multiple maps per channel
095: * @param initialCapacity int - the size of this map, see HashMap
096: */
097: public LazyReplicatedMap(Object owner, Channel channel,
098: long timeout, String mapContextName, int initialCapacity,
099: ClassLoader[] cls) {
100: super (owner, channel, timeout, mapContextName, initialCapacity,
101: LazyReplicatedMap.DEFAULT_LOAD_FACTOR,
102: Channel.SEND_OPTIONS_DEFAULT, cls);
103: }
104:
105: /**
106: * Creates a new map
107: * @param channel The channel to use for communication
108: * @param timeout long - timeout for RPC messags
109: * @param mapContextName String - unique name for this map, to allow multiple maps per channel
110: */
111: public LazyReplicatedMap(Object owner, Channel channel,
112: long timeout, String mapContextName, ClassLoader[] cls) {
113: super (owner, channel, timeout, mapContextName,
114: LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,
115: LazyReplicatedMap.DEFAULT_LOAD_FACTOR,
116: Channel.SEND_OPTIONS_DEFAULT, cls);
117: }
118:
119: //------------------------------------------------------------------------------
120: // METHODS TO OVERRIDE
121: //------------------------------------------------------------------------------
122: protected int getStateMessageType() {
123: return AbstractReplicatedMap.MapMessage.MSG_STATE;
124: }
125:
126: /**
127: * publish info about a map pair (key/value) to other nodes in the cluster
128: * @param key Object
129: * @param value Object
130: * @return Member - the backup node
131: * @throws ChannelException
132: */
133: protected Member[] publishEntryInfo(Object key, Object value)
134: throws ChannelException {
135: if (!(key instanceof Serializable && value instanceof Serializable))
136: return new Member[0];
137: Member[] members = getMapMembers();
138: int firstIdx = getNextBackupIndex();
139: int nextIdx = firstIdx;
140: Member[] backup = new Member[0];
141:
142: //there are no backups
143: if (members.length == 0 || firstIdx == -1)
144: return backup;
145:
146: boolean success = false;
147: do {
148: //select a backup node
149: Member next = members[nextIdx];
150:
151: //increment for the next round of back up selection
152: nextIdx = nextIdx + 1;
153: if (nextIdx >= members.length)
154: nextIdx = 0;
155:
156: if (next == null) {
157: continue;
158: }
159: MapMessage msg = null;
160: try {
161: backup = wrap(next);
162: //publish the backup data to one node
163: msg = new MapMessage(getMapContextName(),
164: MapMessage.MSG_BACKUP, false,
165: (Serializable) key, (Serializable) value, null,
166: backup);
167: if (log.isTraceEnabled())
168: log.trace("Publishing backup data:" + msg + " to: "
169: + next.getName());
170: UniqueId id = getChannel().send(backup, msg,
171: getChannelSendOptions());
172: if (log.isTraceEnabled())
173: log
174: .trace("Data published:" + msg + " msg Id:"
175: + id);
176: //we published out to a backup, mark the test success
177: success = true;
178: } catch (ChannelException x) {
179: log.error("Unable to replicate backup key:" + key
180: + " to backup:" + next + ". Reason:"
181: + x.getMessage(), x);
182: }
183: try {
184: //publish the data out to all nodes
185: Member[] proxies = excludeFromSet(backup,
186: getMapMembers());
187: if (success && proxies.length > 0) {
188: msg = new MapMessage(getMapContextName(),
189: MapMessage.MSG_PROXY, false,
190: (Serializable) key, null, null, backup);
191: if (log.isTraceEnabled())
192: log.trace("Publishing proxy data:" + msg
193: + " to: "
194: + Arrays.toNameString(proxies));
195: getChannel().send(proxies, msg,
196: getChannelSendOptions());
197: }
198: } catch (ChannelException x) {
199: //log the error, but proceed, this should only happen if a node went down,
200: //and if the node went down, then it can't receive the message, the others
201: //should still get it.
202: log.error("Unable to replicate proxy key:" + key
203: + " to backup:" + next + ". Reason:"
204: + x.getMessage(), x);
205: }
206: } while (!success && (firstIdx != nextIdx));
207: return backup;
208: }
209:
210: }
|