001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.tipis;
018:
019: import java.io.Serializable;
020:
021: import org.apache.catalina.tribes.Channel;
022: import org.apache.catalina.tribes.ChannelException;
023: import org.apache.catalina.tribes.ChannelListener;
024: import org.apache.catalina.tribes.Member;
025: import org.apache.catalina.tribes.MembershipListener;
026: import org.apache.catalina.tribes.group.RpcCallback;
027:
028: /**
029: * All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical
030: * copy of the map.<br><br>
031: * This map implementation doesn't have a background thread running to replicate changes.
032: * If you do have changes without invoking put/remove then you need to invoke one of the following methods:
033: * <ul>
034: * <li><code>replicate(Object,boolean)</code> - replicates only the object that belongs to the key</li>
035: * <li><code>replicate(boolean)</code> - Scans the entire map for changes and replicates data</li>
036: * </ul>
037: * the <code>boolean</code> value in the <code>replicate</code> method used to decide
038: * whether to only replicate objects that implement the <code>ReplicatedMapEntry</code> interface
039: * or to replicate all objects. If an object doesn't implement the <code>ReplicatedMapEntry</code> interface
040: * each time the object gets replicated the entire object gets serialized, hence a call to <code>replicate(true)</code>
041: * will replicate all objects in this map that are using this node as primary.
042: *
043: * <br><br><b>REMBER TO CALL <code>breakdown()</code> or <code>finalize()</code> when you are done with the map to
044: * avoid memory leaks.<br><br>
045: * @todo implement periodic sync/transfer thread
046: * @author Filip Hanik
047: * @version 1.0
048: *
049: * @todo memberDisappeared, should do nothing except change map membership
050: * by default it relocates the primary objects
051: */
052: public class ReplicatedMap extends AbstractReplicatedMap implements
053: RpcCallback, ChannelListener, MembershipListener {
054:
055: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
056: .getLog(ReplicatedMap.class);
057:
058: //------------------------------------------------------------------------------
059: // CONSTRUCTORS / DESTRUCTORS
060: //------------------------------------------------------------------------------
061: /**
062: * Creates a new map
063: * @param channel The channel to use for communication
064: * @param timeout long - timeout for RPC messags
065: * @param mapContextName String - unique name for this map, to allow multiple maps per channel
066: * @param initialCapacity int - the size of this map, see HashMap
067: * @param loadFactor float - load factor, see HashMap
068: */
069: public ReplicatedMap(Object owner, Channel channel, long timeout,
070: String mapContextName, int initialCapacity,
071: float loadFactor, ClassLoader[] cls) {
072: super (owner, channel, timeout, mapContextName, initialCapacity,
073: loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls);
074: }
075:
076: /**
077: * Creates a new map
078: * @param channel The channel to use for communication
079: * @param timeout long - timeout for RPC messags
080: * @param mapContextName String - unique name for this map, to allow multiple maps per channel
081: * @param initialCapacity int - the size of this map, see HashMap
082: */
083: public ReplicatedMap(Object owner, Channel channel, long timeout,
084: String mapContextName, int initialCapacity,
085: ClassLoader[] cls) {
086: super (owner, channel, timeout, mapContextName, initialCapacity,
087: AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,
088: Channel.SEND_OPTIONS_DEFAULT, cls);
089: }
090:
091: /**
092: * Creates a new map
093: * @param channel The channel to use for communication
094: * @param timeout long - timeout for RPC messags
095: * @param mapContextName String - unique name for this map, to allow multiple maps per channel
096: */
097: public ReplicatedMap(Object owner, Channel channel, long timeout,
098: String mapContextName, ClassLoader[] cls) {
099: super (owner, channel, timeout, mapContextName,
100: AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY,
101: AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,
102: Channel.SEND_OPTIONS_DEFAULT, cls);
103: }
104:
105: //------------------------------------------------------------------------------
106: // METHODS TO OVERRIDE
107: //------------------------------------------------------------------------------
108: protected int getStateMessageType() {
109: return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY;
110: }
111:
112: /**
113: * publish info about a map pair (key/value) to other nodes in the cluster
114: * @param key Object
115: * @param value Object
116: * @return Member - the backup node
117: * @throws ChannelException
118: */
119: protected Member[] publishEntryInfo(Object key, Object value)
120: throws ChannelException {
121: if (!(key instanceof Serializable && value instanceof Serializable))
122: return new Member[0];
123: //select a backup node
124: Member[] backup = getMapMembers();
125:
126: if (backup == null || backup.length == 0)
127: return null;
128:
129: //publish the data out to all nodes
130: MapMessage msg = new MapMessage(getMapContextName(),
131: MapMessage.MSG_COPY, false, (Serializable) key,
132: (Serializable) value, null, backup);
133:
134: getChannel()
135: .send(getMapMembers(), msg, getChannelSendOptions());
136:
137: return backup;
138: }
139:
140: }
|