0001: /*
0002: * Licensed to the Apache Software Foundation (ASF) under one or more
0003: * contributor license agreements. See the NOTICE file distributed with
0004: * this work for additional information regarding copyright ownership.
0005: * The ASF licenses this file to You under the Apache License, Version 2.0
0006: * (the "License"); you may not use this file except in compliance with
0007: * the License. You may obtain a copy of the License at
0008: *
0009: * http://www.apache.org/licenses/LICENSE-2.0
0010: *
0011: * Unless required by applicable law or agreed to in writing, software
0012: * distributed under the License is distributed on an "AS IS" BASIS,
0013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014: * See the License for the specific language governing permissions and
0015: * limitations under the License.
0016: */
0017:
0018: package org.apache.catalina.tribes.tipis;
0019:
0020: import java.io.IOException;
0021: import java.io.ObjectInput;
0022: import java.io.ObjectOutput;
0023: import java.io.Serializable;
0024: import java.io.UnsupportedEncodingException;
0025: import java.util.ArrayList;
0026: import java.util.Collection;
0027: import java.util.Collections;
0028: import java.util.HashMap;
0029: import java.util.Iterator;
0030: import java.util.LinkedHashSet;
0031: import java.util.Map;
0032: import java.util.Set;
0033:
0034: import org.apache.catalina.tribes.Channel;
0035: import org.apache.catalina.tribes.ChannelException;
0036: import org.apache.catalina.tribes.ChannelListener;
0037: import org.apache.catalina.tribes.Heartbeat;
0038: import org.apache.catalina.tribes.Member;
0039: import org.apache.catalina.tribes.MembershipListener;
0040: import org.apache.catalina.tribes.group.Response;
0041: import org.apache.catalina.tribes.group.RpcCallback;
0042: import org.apache.catalina.tribes.group.RpcChannel;
0043: import org.apache.catalina.tribes.io.XByteBuffer;
0044: import org.apache.catalina.tribes.membership.MemberImpl;
0045: import org.apache.catalina.tribes.util.Arrays;
0046: import org.apache.juli.logging.Log;
0047: import org.apache.juli.logging.LogFactory;
0048: import java.util.concurrent.ConcurrentHashMap;
0049:
0050: /**
0051: *
0052: * @author Filip Hanik
0053: * @version 1.0
0054: */
0055: public abstract class AbstractReplicatedMap extends ConcurrentHashMap
0056: implements RpcCallback, ChannelListener, MembershipListener,
0057: Heartbeat {
0058: protected static Log log = LogFactory
0059: .getLog(AbstractReplicatedMap.class);
0060:
0061: /**
0062: * The default initial capacity - MUST be a power of two.
0063: */
0064: public static final int DEFAULT_INITIAL_CAPACITY = 16;
0065:
0066: /**
0067: * The load factor used when none specified in constructor.
0068: **/
0069: public static final float DEFAULT_LOAD_FACTOR = 0.75f;
0070:
0071: /**
0072: * Used to identify the map
0073: */
0074: final String chset = "ISO-8859-1";
0075:
0076: //------------------------------------------------------------------------------
0077: // INSTANCE VARIABLES
0078: //------------------------------------------------------------------------------
0079: protected abstract int getStateMessageType();
0080:
0081: /**
0082: * Timeout for RPC messages, how long we will wait for a reply
0083: */
0084: protected transient long rpcTimeout = 5000;
0085: /**
0086: * Reference to the channel for sending messages
0087: */
0088: protected transient Channel channel;
0089: /**
0090: * The RpcChannel to send RPC messages through
0091: */
0092: protected transient RpcChannel rpcChannel;
0093: /**
0094: * The Map context name makes this map unique, this
0095: * allows us to have more than one map shared
0096: * through one channel
0097: */
0098: protected transient byte[] mapContextName;
0099: /**
0100: * Has the state been transferred
0101: */
0102: protected transient boolean stateTransferred = false;
0103: /**
0104: * Simple lock object for transfers
0105: */
0106: protected transient Object stateMutex = new Object();
0107: /**
0108: * A list of members in our map
0109: */
0110: protected transient HashMap mapMembers = new HashMap();
0111: /**
0112: * Our default send options
0113: */
0114: protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
0115: /**
0116: * The owner of this map, ala a SessionManager for example
0117: */
0118: protected transient Object mapOwner;
0119: /**
0120: * External class loaders if serialization and deserialization is to be performed successfully.
0121: */
0122: protected transient ClassLoader[] externalLoaders;
0123:
0124: /**
0125: * The node we are currently backing up data to, this index will rotate
0126: * on a round robin basis
0127: */
0128: protected transient int currentNode = 0;
0129:
0130: /**
0131: * Since the map keeps internal membership
0132: * this is the timeout for a ping message to be responded to
0133: * If a remote map doesn't respond within this timeframe,
0134: * its considered dead.
0135: */
0136: protected transient long accessTimeout = 5000;
0137:
0138: /**
0139: * Readable string of the mapContextName value
0140: */
0141: protected transient String mapname = "";
0142:
0143: //------------------------------------------------------------------------------
0144: // CONSTRUCTORS
0145: //------------------------------------------------------------------------------
0146:
/**
 * Creates a new map and initializes it right away by calling <code>init</code>.
 * @param owner Object - the owner of this map
 * @param channel Channel - the channel to use for communication
 * @param timeout long - timeout for RPC messages
 * @param mapContextName String - unique name for this map, to allow multiple maps per channel
 * @param initialCapacity int - initial capacity, handed to the ConcurrentHashMap constructor
 * @param loadFactor float - load factor, handed to the ConcurrentHashMap constructor
 * @param channelSendOptions int - the send options used when this map sends messages
 * @param cls ClassLoader[] - a list of classloaders to be used for deserialization of objects
 */
public AbstractReplicatedMap(Object owner, Channel channel, long timeout,
        String mapContextName, int initialCapacity, float loadFactor,
        int channelSendOptions, ClassLoader[] cls) {
    //the third argument is the concurrency level of the ConcurrentHashMap
    super(initialCapacity, loadFactor, 15);
    init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
}
0164:
0165: /**
0166: * Helper methods, wraps a single member in an array
0167: * @param m Member
0168: * @return Member[]
0169: */
0170: protected Member[] wrap(Member m) {
0171: if (m == null)
0172: return new Member[0];
0173: else
0174: return new Member[] { m };
0175: }
0176:
0177: /**
0178: * Initializes the map by creating the RPC channel, registering itself as a channel listener
0179: * This method is also responsible for initiating the state transfer
0180: * @param owner Object
0181: * @param channel Channel
0182: * @param mapContextName String
0183: * @param timeout long
0184: * @param channelSendOptions int
0185: * @param cls ClassLoader[]
0186: */
0187: protected void init(Object owner, Channel channel,
0188: String mapContextName, long timeout,
0189: int channelSendOptions, ClassLoader[] cls) {
0190: log
0191: .info("Initializing AbstractReplicatedMap with context name:"
0192: + mapContextName);
0193: this .mapOwner = owner;
0194: this .externalLoaders = cls;
0195: this .channelSendOptions = channelSendOptions;
0196: this .channel = channel;
0197: this .rpcTimeout = timeout;
0198:
0199: try {
0200: this .mapname = mapContextName;
0201: //unique context is more efficient if it is stored as bytes
0202: this .mapContextName = mapContextName.getBytes(chset);
0203: } catch (UnsupportedEncodingException x) {
0204: log.warn("Unable to encode mapContextName["
0205: + mapContextName + "] using getBytes(" + chset
0206: + ") using default getBytes()", x);
0207: this .mapContextName = mapContextName.getBytes();
0208: }
0209: if (log.isTraceEnabled())
0210: log
0211: .trace("Created Lazy Map with name:"
0212: + mapContextName + ", bytes:"
0213: + Arrays.toString(this .mapContextName));
0214:
0215: //create an rpc channel and add the map as a listener
0216: this .rpcChannel = new RpcChannel(this .mapContextName, channel,
0217: this );
0218: //add this map as a message listener
0219: this .channel.addChannelListener(this );
0220: //listen for membership notifications
0221: this .channel.addMembershipListener(this );
0222:
0223: try {
0224: //broadcast our map, this just notifies other members of our existence
0225: broadcast(MapMessage.MSG_INIT, true);
0226: //transfer state from another map
0227: transferState();
0228: //state is transferred, we are ready for messaging
0229: broadcast(MapMessage.MSG_START, true);
0230: } catch (ChannelException x) {
0231: log.warn("Unable to send map start message.");
0232: throw new RuntimeException(
0233: "Unable to start replicated map.", x);
0234: }
0235: }
0236:
0237: /**
0238: * Sends a ping out to all the members in the cluster, not just map members
0239: * that this map is alive.
0240: * @param timeout long
0241: * @throws ChannelException
0242: */
0243: protected void ping(long timeout) throws ChannelException {
0244: //send out a map membership message, only wait for the first reply
0245: MapMessage msg = new MapMessage(this .mapContextName,
0246: MapMessage.MSG_INIT, false, null, null, null,
0247: wrap(channel.getLocalMember(false)));
0248: if (channel.getMembers().length > 0) {
0249: //send a ping, wait for all nodes to reply
0250: Response[] resp = rpcChannel.send(channel.getMembers(),
0251: msg, rpcChannel.ALL_REPLY, (channelSendOptions),
0252: (int) accessTimeout);
0253: for (int i = 0; i < resp.length; i++) {
0254: memberAlive(resp[i].getSource());
0255: } //for
0256: }
0257: //update our map of members, expire some if we didn't receive a ping back
0258: synchronized (mapMembers) {
0259: Iterator it = mapMembers.entrySet().iterator();
0260: long now = System.currentTimeMillis();
0261: while (it.hasNext()) {
0262: Map.Entry entry = (Map.Entry) it.next();
0263: long access = ((Long) entry.getValue()).longValue();
0264: if ((now - access) > timeout) {
0265: it.remove();
0266: memberDisappeared((Member) entry.getKey());
0267: }
0268: }
0269: }//synch
0270: }
0271:
0272: /**
0273: * We have received a member alive notification
0274: * @param member Member
0275: */
0276: protected void memberAlive(Member member) {
0277: synchronized (mapMembers) {
0278: if (!mapMembers.containsKey(member)) {
0279: mapMemberAdded(member);
0280: } //end if
0281: mapMembers
0282: .put(member, new Long(System.currentTimeMillis()));
0283: }
0284: }
0285:
0286: /**
0287: * Helper method to broadcast a message to all members in a channel
0288: * @param msgtype int
0289: * @param rpc boolean
0290: * @throws ChannelException
0291: */
0292: protected void broadcast(int msgtype, boolean rpc)
0293: throws ChannelException {
0294: //send out a map membership message, only wait for the first reply
0295: MapMessage msg = new MapMessage(this .mapContextName, msgtype,
0296: false, null, null, null, wrap(channel
0297: .getLocalMember(false)));
0298: if (rpc) {
0299: Response[] resp = rpcChannel.send(channel.getMembers(),
0300: msg, rpcChannel.FIRST_REPLY, (channelSendOptions),
0301: rpcTimeout);
0302: for (int i = 0; i < resp.length; i++) {
0303: mapMemberAdded(resp[i].getSource());
0304: messageReceived(resp[i].getMessage(), resp[i]
0305: .getSource());
0306: }
0307: } else {
0308: channel.send(channel.getMembers(), msg, channelSendOptions);
0309: }
0310: }
0311:
0312: public void breakdown() {
0313: finalize();
0314: }
0315:
0316: public void finalize() {
0317: try {
0318: broadcast(MapMessage.MSG_STOP, false);
0319: } catch (Exception ignore) {
0320: }
0321: //cleanup
0322: if (this .rpcChannel != null) {
0323: this .rpcChannel.breakdown();
0324: }
0325: if (this .channel != null) {
0326: this .channel.removeChannelListener(this );
0327: this .channel.removeMembershipListener(this );
0328: }
0329: this .rpcChannel = null;
0330: this .channel = null;
0331: this .mapMembers.clear();
0332: super .clear();
0333: this .stateTransferred = false;
0334: this .externalLoaders = null;
0335: }
0336:
0337: public int hashCode() {
0338: return Arrays.hashCode(this .mapContextName);
0339: }
0340:
0341: public boolean equals(Object o) {
0342: if (o == null)
0343: return false;
0344: if (!(o instanceof AbstractReplicatedMap))
0345: return false;
0346: if (!(o.getClass().equals(this .getClass())))
0347: return false;
0348: AbstractReplicatedMap other = (AbstractReplicatedMap) o;
0349: return Arrays.equals(mapContextName, other.mapContextName);
0350: }
0351:
0352: //------------------------------------------------------------------------------
0353: // GROUP COM INTERFACES
0354: //------------------------------------------------------------------------------
0355: public Member[] getMapMembers(HashMap members) {
0356: synchronized (members) {
0357: Member[] result = new Member[members.size()];
0358: members.keySet().toArray(result);
0359: return result;
0360: }
0361: }
0362:
0363: public Member[] getMapMembers() {
0364: return getMapMembers(this .mapMembers);
0365: }
0366:
0367: public Member[] getMapMembersExcl(Member[] exclude) {
0368: synchronized (mapMembers) {
0369: HashMap list = (HashMap) mapMembers.clone();
0370: for (int i = 0; i < exclude.length; i++)
0371: list.remove(exclude[i]);
0372: return getMapMembers(list);
0373: }
0374: }
0375:
0376: /**
0377: * Replicates any changes to the object since the last time
0378: * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
0379: * @param complete - if set to true, the object is replicated to its backup
0380: * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
0381: * be replicated
0382: */
0383: public void replicate(Object key, boolean complete) {
0384: if (log.isTraceEnabled())
0385: log.trace("Replicate invoked on key:" + key);
0386: MapEntry entry = (MapEntry) super .get(key);
0387: if (entry == null)
0388: return;
0389: if (!entry.isSerializable())
0390: return;
0391: if (entry != null && entry.isPrimary()
0392: && entry.getBackupNodes() != null
0393: && entry.getBackupNodes().length > 0) {
0394: Object value = entry.getValue();
0395: //check to see if we need to replicate this object isDirty()||complete
0396: boolean repl = complete
0397: || ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry) value)
0398: .isDirty());
0399:
0400: if (!repl) {
0401: if (log.isTraceEnabled())
0402: log.trace("Not replicating:" + key
0403: + ", no change made");
0404:
0405: return;
0406: }
0407: //check to see if the message is diffable
0408: boolean diff = ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry) value)
0409: .isDiffable());
0410: MapMessage msg = null;
0411: if (diff) {
0412: ReplicatedMapEntry rentry = (ReplicatedMapEntry) entry
0413: .getValue();
0414: try {
0415: rentry.lock();
0416: //construct a diff message
0417: msg = new MapMessage(mapContextName,
0418: MapMessage.MSG_BACKUP, true,
0419: (Serializable) entry.getKey(), null, rentry
0420: .getDiff(), entry.getBackupNodes());
0421: } catch (IOException x) {
0422: log
0423: .error(
0424: "Unable to diff object. Will replicate the entire object instead.",
0425: x);
0426: } finally {
0427: rentry.unlock();
0428: }
0429:
0430: }
0431: if (msg == null) {
0432: //construct a complete
0433: msg = new MapMessage(mapContextName,
0434: MapMessage.MSG_BACKUP, false,
0435: (Serializable) entry.getKey(),
0436: (Serializable) entry.getValue(), null, entry
0437: .getBackupNodes());
0438:
0439: }
0440: try {
0441: if (channel != null && entry.getBackupNodes() != null
0442: && entry.getBackupNodes().length > 0) {
0443: channel.send(entry.getBackupNodes(), msg,
0444: channelSendOptions);
0445: }
0446: } catch (ChannelException x) {
0447: log.error("Unable to replicate data.", x);
0448: }
0449: } //end if
0450:
0451: }
0452:
0453: /**
0454: * This can be invoked by a periodic thread to replicate out any changes.
0455: * For maps that don't store objects that implement ReplicatedMapEntry, this
0456: * method should be used infrequently to avoid large amounts of data transfer
0457: * @param complete boolean
0458: */
0459: public void replicate(boolean complete) {
0460: Iterator i = super .entrySet().iterator();
0461: while (i.hasNext()) {
0462: Map.Entry e = (Map.Entry) i.next();
0463: replicate(e.getKey(), complete);
0464: } //while
0465:
0466: }
0467:
0468: public void transferState() {
0469: try {
0470: Member[] members = getMapMembers();
0471: Member backup = members.length > 0 ? (Member) members[0]
0472: : null;
0473: if (backup != null) {
0474: MapMessage msg = new MapMessage(mapContextName,
0475: getStateMessageType(), false, null, null, null,
0476: null);
0477: Response[] resp = rpcChannel.send(
0478: new Member[] { backup }, msg,
0479: rpcChannel.FIRST_REPLY, channelSendOptions,
0480: rpcTimeout);
0481: if (resp.length > 0) {
0482: synchronized (stateMutex) {
0483: msg = (MapMessage) resp[0].getMessage();
0484: msg.deserialize(getExternalLoaders());
0485: ArrayList list = (ArrayList) msg.getValue();
0486: for (int i = 0; i < list.size(); i++) {
0487: messageReceived((Serializable) list.get(i),
0488: resp[0].getSource());
0489: } //for
0490: }
0491: } else {
0492: log
0493: .warn("Transfer state, 0 replies, probably a timeout.");
0494: }
0495: }
0496: } catch (ChannelException x) {
0497: log.error("Unable to transfer LazyReplicatedMap state.", x);
0498: } catch (IOException x) {
0499: log.error("Unable to transfer LazyReplicatedMap state.", x);
0500: } catch (ClassNotFoundException x) {
0501: log.error("Unable to transfer LazyReplicatedMap state.", x);
0502: }
0503: stateTransferred = true;
0504: }
0505:
0506: /**
0507: * @todo implement state transfer
0508: * @param msg Serializable
0509: * @return Serializable - null if no reply should be sent
0510: */
0511: public Serializable replyRequest(Serializable msg,
0512: final Member sender) {
0513: if (!(msg instanceof MapMessage))
0514: return null;
0515: MapMessage mapmsg = (MapMessage) msg;
0516:
0517: //map init request
0518: if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
0519: mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
0520: return mapmsg;
0521: }
0522:
0523: //map start request
0524: if (mapmsg.getMsgType() == mapmsg.MSG_START) {
0525: mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
0526: mapMemberAdded(sender);
0527: return mapmsg;
0528: }
0529:
0530: //backup request
0531: if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
0532: MapEntry entry = (MapEntry) super .get(mapmsg.getKey());
0533: if (entry == null || (!entry.isSerializable()))
0534: return null;
0535: mapmsg.setValue((Serializable) entry.getValue());
0536: return mapmsg;
0537: }
0538:
0539: //state transfer request
0540: if (mapmsg.getMsgType() == mapmsg.MSG_STATE
0541: || mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY) {
0542: synchronized (stateMutex) { //make sure we dont do two things at the same time
0543: ArrayList list = new ArrayList();
0544: Iterator i = super .entrySet().iterator();
0545: while (i.hasNext()) {
0546: Map.Entry e = (Map.Entry) i.next();
0547: MapEntry entry = (MapEntry) super .get(e.getKey());
0548: if (entry.isSerializable()) {
0549: boolean copy = (mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY);
0550: MapMessage me = new MapMessage(mapContextName,
0551: copy ? MapMessage.MSG_COPY
0552: : MapMessage.MSG_PROXY, false,
0553: (Serializable) entry.getKey(),
0554: copy ? (Serializable) entry.getValue()
0555: : null, null, entry
0556: .getBackupNodes());
0557: list.add(me);
0558: }
0559: }
0560: mapmsg.setValue(list);
0561: return mapmsg;
0562:
0563: } //synchronized
0564: }
0565:
0566: return null;
0567:
0568: }
0569:
0570: /**
0571: * If the reply has already been sent to the requesting thread,
0572: * the rpc callback can handle any data that comes in after the fact.
0573: * @param msg Serializable
0574: * @param sender Member
0575: */
0576: public void leftOver(Serializable msg, Member sender) {
0577: //left over membership messages
0578: if (!(msg instanceof MapMessage))
0579: return;
0580:
0581: MapMessage mapmsg = (MapMessage) msg;
0582: try {
0583: mapmsg.deserialize(getExternalLoaders());
0584: if (mapmsg.getMsgType() == MapMessage.MSG_START) {
0585: mapMemberAdded(mapmsg.getBackupNodes()[0]);
0586: } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
0587: memberAlive(mapmsg.getBackupNodes()[0]);
0588: }
0589: } catch (IOException x) {
0590: log.error("Unable to deserialize MapMessage.", x);
0591: } catch (ClassNotFoundException x) {
0592: log.error("Unable to deserialize MapMessage.", x);
0593: }
0594: }
0595:
0596: public void messageReceived(Serializable msg, Member sender) {
0597: if (!(msg instanceof MapMessage))
0598: return;
0599:
0600: MapMessage mapmsg = (MapMessage) msg;
0601: if (log.isTraceEnabled()) {
0602: log
0603: .trace("Map[" + mapname + "] received message:"
0604: + mapmsg);
0605: }
0606:
0607: try {
0608: mapmsg.deserialize(getExternalLoaders());
0609: } catch (IOException x) {
0610: log.error("Unable to deserialize MapMessage.", x);
0611: return;
0612: } catch (ClassNotFoundException x) {
0613: log.error("Unable to deserialize MapMessage.", x);
0614: return;
0615: }
0616: if (log.isTraceEnabled())
0617: log.trace("Map message received from:" + sender.getName()
0618: + " msg:" + mapmsg);
0619: if (mapmsg.getMsgType() == MapMessage.MSG_START) {
0620: mapMemberAdded(mapmsg.getBackupNodes()[0]);
0621: }
0622:
0623: if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
0624: memberDisappeared(mapmsg.getBackupNodes()[0]);
0625: }
0626:
0627: if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
0628: MapEntry entry = (MapEntry) super .get(mapmsg.getKey());
0629: if (entry == null) {
0630: entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
0631: entry.setBackup(false);
0632: entry.setProxy(true);
0633: entry.setBackupNodes(mapmsg.getBackupNodes());
0634: super .put(entry.getKey(), entry);
0635: } else {
0636: entry.setProxy(true);
0637: entry.setBackup(false);
0638: entry.setBackupNodes(mapmsg.getBackupNodes());
0639: }
0640: }
0641:
0642: if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
0643: super .remove(mapmsg.getKey());
0644: }
0645:
0646: if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP
0647: || mapmsg.getMsgType() == MapMessage.MSG_COPY) {
0648: MapEntry entry = (MapEntry) super .get(mapmsg.getKey());
0649: if (entry == null) {
0650: entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
0651: entry
0652: .setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
0653: entry.setProxy(false);
0654: entry.setBackupNodes(mapmsg.getBackupNodes());
0655: if (mapmsg.getValue() != null
0656: && mapmsg.getValue() instanceof ReplicatedMapEntry) {
0657: ((ReplicatedMapEntry) mapmsg.getValue())
0658: .setOwner(getMapOwner());
0659: }
0660: } else {
0661: entry
0662: .setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
0663: entry.setProxy(false);
0664: entry.setBackupNodes(mapmsg.getBackupNodes());
0665: if (entry.getValue() instanceof ReplicatedMapEntry) {
0666: ReplicatedMapEntry diff = (ReplicatedMapEntry) entry
0667: .getValue();
0668: if (mapmsg.isDiff()) {
0669: try {
0670: diff.lock();
0671: diff.applyDiff(mapmsg.getDiffValue(), 0,
0672: mapmsg.getDiffValue().length);
0673: } catch (Exception x) {
0674: log.error("Unable to apply diff to key:"
0675: + entry.getKey(), x);
0676: } finally {
0677: diff.unlock();
0678: }
0679: } else {
0680: if (mapmsg.getValue() != null)
0681: entry.setValue(mapmsg.getValue());
0682: ((ReplicatedMapEntry) entry.getValue())
0683: .setOwner(getMapOwner());
0684: } //end if
0685: } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
0686: ReplicatedMapEntry re = (ReplicatedMapEntry) mapmsg
0687: .getValue();
0688: re.setOwner(getMapOwner());
0689: entry.setValue(re);
0690: } else {
0691: if (mapmsg.getValue() != null)
0692: entry.setValue(mapmsg.getValue());
0693: } //end if
0694: } //end if
0695: super .put(entry.getKey(), entry);
0696: } //end if
0697: }
0698:
0699: public boolean accept(Serializable msg, Member sender) {
0700: boolean result = false;
0701: if (msg instanceof MapMessage) {
0702: if (log.isTraceEnabled())
0703: log.trace("Map[" + mapname + "] accepting...." + msg);
0704: result = Arrays.equals(mapContextName, ((MapMessage) msg)
0705: .getMapId());
0706: if (log.isTraceEnabled())
0707: log.trace("Msg[" + mapname + "] accepted[" + result
0708: + "]...." + msg);
0709: }
0710: return result;
0711: }
0712:
0713: public void mapMemberAdded(Member member) {
0714: if (member.equals(getChannel().getLocalMember(false)))
0715: return;
0716: boolean memberAdded = false;
0717: //select a backup node if we don't have one
0718: synchronized (mapMembers) {
0719: if (!mapMembers.containsKey(member)) {
0720: mapMembers.put(member, new Long(System
0721: .currentTimeMillis()));
0722: memberAdded = true;
0723: }
0724: }
0725: if (memberAdded) {
0726: synchronized (stateMutex) {
0727: Iterator i = super .entrySet().iterator();
0728: while (i.hasNext()) {
0729: Map.Entry e = (Map.Entry) i.next();
0730: MapEntry entry = (MapEntry) super .get(e.getKey());
0731: if (entry == null)
0732: continue;
0733: if (entry.isPrimary()
0734: && (entry.getBackupNodes() == null || entry
0735: .getBackupNodes().length == 0)) {
0736: try {
0737: Member[] backup = publishEntryInfo(entry
0738: .getKey(), entry.getValue());
0739: entry.setBackupNodes(backup);
0740: } catch (ChannelException x) {
0741: log.error("Unable to select backup node.",
0742: x);
0743: } //catch
0744: } //end if
0745: } //while
0746: } //synchronized
0747: }//end if
0748: }
0749:
0750: public boolean inSet(Member m, Member[] set) {
0751: if (set == null)
0752: return false;
0753: boolean result = false;
0754: for (int i = 0; i < set.length && (!result); i++)
0755: if (m.equals(set[i]))
0756: result = true;
0757: return result;
0758: }
0759:
0760: public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
0761: ArrayList result = new ArrayList();
0762: for (int i = 0; i < set.length; i++) {
0763: boolean include = true;
0764: for (int j = 0; j < mbrs.length; j++)
0765: if (mbrs[j].equals(set[i]))
0766: include = false;
0767: if (include)
0768: result.add(set[i]);
0769: }
0770: return (Member[]) result.toArray(new Member[result.size()]);
0771: }
0772:
0773: public void memberAdded(Member member) {
0774: //do nothing
0775: }
0776:
0777: public void memberDisappeared(Member member) {
0778: boolean removed = false;
0779: synchronized (mapMembers) {
0780: removed = (mapMembers.remove(member) != null);
0781: }
0782: Iterator i = super .entrySet().iterator();
0783: while (i.hasNext()) {
0784: Map.Entry e = (Map.Entry) i.next();
0785: MapEntry entry = (MapEntry) super .get(e.getKey());
0786: if (entry.isPrimary()
0787: && inSet(member, entry.getBackupNodes())) {
0788: try {
0789: Member[] backup = publishEntryInfo(entry.getKey(),
0790: entry.getValue());
0791: entry.setBackupNodes(backup);
0792: } catch (ChannelException x) {
0793: log.error("Unable to relocate[" + entry.getKey()
0794: + "] to a new backup node", x);
0795: }
0796: } //end if
0797: } //while
0798: }
0799:
0800: public int getNextBackupIndex() {
0801: int size = mapMembers.size();
0802: if (mapMembers.size() == 0)
0803: return -1;
0804: int node = currentNode++;
0805: if (node >= size) {
0806: node = 0;
0807: currentNode = 0;
0808: }
0809: return node;
0810: }
0811:
0812: public Member getNextBackupNode() {
0813: Member[] members = getMapMembers();
0814: int node = getNextBackupIndex();
0815: if (members.length == 0 || node == -1)
0816: return null;
0817: if (node >= members.length)
0818: node = 0;
0819: return members[node];
0820: }
0821:
0822: protected abstract Member[] publishEntryInfo(Object key,
0823: Object value) throws ChannelException;
0824:
0825: public void heartbeat() {
0826: try {
0827: ping(accessTimeout);
0828: } catch (Exception x) {
0829: log
0830: .error(
0831: "Unable to send AbstractReplicatedMap.ping message",
0832: x);
0833: }
0834: }
0835:
0836: //------------------------------------------------------------------------------
0837: // METHODS TO OVERRIDE
0838: //------------------------------------------------------------------------------
0839:
0840: /**
0841: * Removes an object from this map, it will also remove it from
0842: *
0843: * @param key Object
0844: * @return Object
0845: */
0846: public Object remove(Object key) {
0847: return remove(key, true);
0848: }
0849:
0850: public Object remove(Object key, boolean notify) {
0851: MapEntry entry = (MapEntry) super .remove(key);
0852:
0853: try {
0854: if (getMapMembers().length > 0 && notify) {
0855: MapMessage msg = new MapMessage(getMapContextName(),
0856: MapMessage.MSG_REMOVE, false,
0857: (Serializable) key, null, null, null);
0858: getChannel().send(getMapMembers(), msg,
0859: getChannelSendOptions());
0860: }
0861: } catch (ChannelException x) {
0862: log
0863: .error(
0864: "Unable to replicate out data for a LazyReplicatedMap.remove operation",
0865: x);
0866: }
0867: return entry != null ? entry.getValue() : null;
0868: }
0869:
0870: public MapEntry getInternal(Object key) {
0871: return (MapEntry) super .get(key);
0872: }
0873:
0874: public Object get(Object key) {
0875: MapEntry entry = (MapEntry) super .get(key);
0876: if (log.isTraceEnabled())
0877: log.trace("Requesting id:" + key + " entry:" + entry);
0878: if (entry == null)
0879: return null;
0880: if (!entry.isPrimary()) {
0881: //if the message is not primary, we need to retrieve the latest value
0882: try {
0883: Member[] backup = null;
0884: MapMessage msg = null;
0885: if (!entry.isBackup()) {
0886: //make sure we don't retrieve from ourselves
0887: msg = new MapMessage(getMapContextName(),
0888: MapMessage.MSG_RETRIEVE_BACKUP, false,
0889: (Serializable) key, null, null, null);
0890: Response[] resp = getRpcChannel().send(
0891: entry.getBackupNodes(), msg,
0892: this .getRpcChannel().FIRST_REPLY,
0893: Channel.SEND_OPTIONS_DEFAULT,
0894: getRpcTimeout());
0895: if (resp == null || resp.length == 0) {
0896: //no responses
0897: log
0898: .warn("Unable to retrieve remote object for key:"
0899: + key);
0900: return null;
0901: }
0902: msg = (MapMessage) resp[0].getMessage();
0903: msg.deserialize(getExternalLoaders());
0904: backup = entry.getBackupNodes();
0905: if (entry.getValue() instanceof ReplicatedMapEntry) {
0906: ReplicatedMapEntry val = (ReplicatedMapEntry) entry
0907: .getValue();
0908: val.setOwner(getMapOwner());
0909: }
0910: if (msg.getValue() != null)
0911: entry.setValue(msg.getValue());
0912: }
0913: if (entry.isBackup()) {
0914: //select a new backup node
0915: backup = publishEntryInfo(key, entry.getValue());
0916: } else if (entry.isProxy()) {
0917: //invalidate the previous primary
0918: msg = new MapMessage(getMapContextName(),
0919: MapMessage.MSG_PROXY, false,
0920: (Serializable) key, null, null, backup);
0921: Member[] dest = getMapMembersExcl(backup);
0922: if (dest != null && dest.length > 0) {
0923: getChannel().send(dest, msg,
0924: getChannelSendOptions());
0925: }
0926: }
0927:
0928: entry.setBackupNodes(backup);
0929: entry.setBackup(false);
0930: entry.setProxy(false);
0931:
0932: } catch (Exception x) {
0933: log
0934: .error(
0935: "Unable to replicate out data for a LazyReplicatedMap.get operation",
0936: x);
0937: return null;
0938: }
0939: }
0940: if (log.isTraceEnabled())
0941: log.trace("Requesting id:" + key + " result:"
0942: + entry.getValue());
0943: if (entry.getValue() != null
0944: && entry.getValue() instanceof ReplicatedMapEntry) {
0945: ReplicatedMapEntry val = (ReplicatedMapEntry) entry
0946: .getValue();
0947: //hack, somehow this is not being set above
0948: val.setOwner(getMapOwner());
0949:
0950: }
0951: return entry.getValue();
0952: }
0953:
0954: protected void printMap(String header) {
0955: try {
0956: System.out.println("\nDEBUG MAP:" + header);
0957: System.out.println("Map["
0958: + new String(mapContextName, chset) + ", Map Size:"
0959: + super .size());
0960: Member[] mbrs = getMapMembers();
0961: for (int i = 0; i < mbrs.length; i++) {
0962: System.out.println("Mbr[" + (i + 1) + "="
0963: + mbrs[i].getName());
0964: }
0965: Iterator i = super .entrySet().iterator();
0966: int cnt = 0;
0967:
0968: while (i.hasNext()) {
0969: Map.Entry e = (Map.Entry) i.next();
0970: System.out.println((++cnt) + ". "
0971: + super .get(e.getKey()));
0972: }
0973: System.out.println("EndMap]\n\n");
0974: } catch (Exception ignore) {
0975: ignore.printStackTrace();
0976: }
0977: }
0978:
0979: /**
0980: * Returns true if the key has an entry in the map.
0981: * The entry can be a proxy or a backup entry, invoking <code>get(key)</code>
0982: * will make this entry primary for the group
0983: * @param key Object
0984: * @return boolean
0985: */
0986: public boolean containsKey(Object key) {
0987: return super .containsKey(key);
0988: }
0989:
0990: public Object put(Object key, Object value) {
0991: return put(key, value, true);
0992: }
0993:
0994: public Object put(Object key, Object value, boolean notify) {
0995: MapEntry entry = new MapEntry(key, value);
0996: entry.setBackup(false);
0997: entry.setProxy(false);
0998:
0999: Object old = null;
1000:
1001: //make sure that any old values get removed
1002: if (containsKey(key))
1003: old = remove(key);
1004: try {
1005: if (notify) {
1006: Member[] backup = publishEntryInfo(key, value);
1007: entry.setBackupNodes(backup);
1008: }
1009: } catch (ChannelException x) {
1010: log
1011: .error(
1012: "Unable to replicate out data for a LazyReplicatedMap.put operation",
1013: x);
1014: }
1015: super .put(key, entry);
1016: return old;
1017: }
1018:
1019: /**
1020: * Copies all values from one map to this instance
1021: * @param m Map
1022: */
1023: public void putAll(Map m) {
1024: Iterator i = m.entrySet().iterator();
1025: while (i.hasNext()) {
1026: Map.Entry entry = (Map.Entry) i.next();
1027: put(entry.getKey(), entry.getValue());
1028: }
1029: }
1030:
1031: public void clear() {
1032: clear(true);
1033: }
1034:
1035: public void clear(boolean notify) {
1036: if (notify) {
1037: //only delete active keys
1038: Iterator keys = keySet().iterator();
1039: while (keys.hasNext())
1040: remove(keys.next());
1041: } else {
1042: super .clear();
1043: }
1044: }
1045:
1046: public boolean containsValue(Object value) {
1047: if (value == null) {
1048: return super .containsValue(value);
1049: } else {
1050: Iterator i = super .entrySet().iterator();
1051: while (i.hasNext()) {
1052: Map.Entry e = (Map.Entry) i.next();
1053: MapEntry entry = (MapEntry) super .get(e.getKey());
1054: if (entry.isPrimary() && value.equals(entry.getValue()))
1055: return true;
1056: }//while
1057: return false;
1058: }//end if
1059: }
1060:
1061: public Object clone() {
1062: throw new UnsupportedOperationException(
1063: "This operation is not valid on a replicated map");
1064: }
1065:
1066: /**
1067: * Returns the entire contents of the map
1068: * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information
1069: * about the object.
1070: * @return Set
1071: */
1072: public Set entrySetFull() {
1073: return super .entrySet();
1074: }
1075:
1076: public Set keySetFull() {
1077: return super .keySet();
1078: }
1079:
1080: public int sizeFull() {
1081: return super .size();
1082: }
1083:
1084: public Set entrySet() {
1085: LinkedHashSet set = new LinkedHashSet(super .size());
1086: Iterator i = super .entrySet().iterator();
1087: while (i.hasNext()) {
1088: Map.Entry e = (Map.Entry) i.next();
1089: Object key = e.getKey();
1090: MapEntry entry = (MapEntry) super .get(key);
1091: if (entry.isPrimary())
1092: set.add(entry.getValue());
1093: }
1094: return Collections.unmodifiableSet(set);
1095: }
1096:
1097: public Set keySet() {
1098: //todo implement
1099: //should only return keys where this is active.
1100: LinkedHashSet set = new LinkedHashSet(super .size());
1101: Iterator i = super .entrySet().iterator();
1102: while (i.hasNext()) {
1103: Map.Entry e = (Map.Entry) i.next();
1104: Object key = e.getKey();
1105: MapEntry entry = (MapEntry) super .get(key);
1106: if (entry.isPrimary())
1107: set.add(key);
1108: }
1109: return Collections.unmodifiableSet(set);
1110:
1111: }
1112:
1113: public int size() {
1114: //todo, implement a counter variable instead
1115: //only count active members in this node
1116: int counter = 0;
1117: Iterator it = super .entrySet().iterator();
1118: while (it != null && it.hasNext()) {
1119: Map.Entry e = (Map.Entry) it.next();
1120: if (e != null) {
1121: MapEntry entry = (MapEntry) super .get(e.getKey());
1122: if (entry != null && entry.isPrimary()
1123: && entry.getValue() != null)
1124: counter++;
1125: }
1126: }
1127: return counter;
1128: }
1129:
1130: protected boolean removeEldestEntry(Map.Entry eldest) {
1131: return false;
1132: }
1133:
1134: public boolean isEmpty() {
1135: return size() == 0;
1136: }
1137:
1138: public Collection values() {
1139: ArrayList values = new ArrayList();
1140: Iterator i = super .entrySet().iterator();
1141: while (i.hasNext()) {
1142: Map.Entry e = (Map.Entry) i.next();
1143: MapEntry entry = (MapEntry) super .get(e.getKey());
1144: if (entry.isPrimary() && entry.getValue() != null)
1145: values.add(entry.getValue());
1146: }
1147: return Collections.unmodifiableCollection(values);
1148: }
1149:
1150: //------------------------------------------------------------------------------
1151: // Map Entry class
1152: //------------------------------------------------------------------------------
1153: public static class MapEntry implements Map.Entry {
1154: private boolean backup;
1155: private boolean proxy;
1156: private Member[] backupNodes;
1157:
1158: private Object key;
1159: private Object value;
1160:
1161: public MapEntry(Object key, Object value) {
1162: setKey(key);
1163: setValue(value);
1164:
1165: }
1166:
1167: public boolean isKeySerializable() {
1168: return (key == null) || (key instanceof Serializable);
1169: }
1170:
1171: public boolean isValueSerializable() {
1172: return (value == null) || (value instanceof Serializable);
1173: }
1174:
1175: public boolean isSerializable() {
1176: return isKeySerializable() && isValueSerializable();
1177: }
1178:
1179: public boolean isBackup() {
1180: return backup;
1181: }
1182:
1183: public void setBackup(boolean backup) {
1184: this .backup = backup;
1185: }
1186:
1187: public boolean isProxy() {
1188: return proxy;
1189: }
1190:
1191: public boolean isPrimary() {
1192: return ((!proxy) && (!backup));
1193: }
1194:
1195: public void setProxy(boolean proxy) {
1196: this .proxy = proxy;
1197: }
1198:
1199: public boolean isDiffable() {
1200: return (value instanceof ReplicatedMapEntry)
1201: && ((ReplicatedMapEntry) value).isDiffable();
1202: }
1203:
1204: public void setBackupNodes(Member[] nodes) {
1205: this .backupNodes = nodes;
1206: }
1207:
1208: public Member[] getBackupNodes() {
1209: return backupNodes;
1210: }
1211:
1212: public Object getValue() {
1213: return value;
1214: }
1215:
1216: public Object setValue(Object value) {
1217: Object old = this .value;
1218: this .value = value;
1219: return old;
1220: }
1221:
1222: public Object getKey() {
1223: return key;
1224: }
1225:
1226: public Object setKey(Object key) {
1227: Object old = this .key;
1228: this .key = key;
1229: return old;
1230: }
1231:
1232: public int hashCode() {
1233: return key.hashCode();
1234: }
1235:
1236: public boolean equals(Object o) {
1237: return key.equals(o);
1238: }
1239:
1240: /**
1241: * apply a diff, or an entire object
1242: * @param data byte[]
1243: * @param offset int
1244: * @param length int
1245: * @param diff boolean
1246: * @throws IOException
1247: * @throws ClassNotFoundException
1248: */
1249: public void apply(byte[] data, int offset, int length,
1250: boolean diff) throws IOException,
1251: ClassNotFoundException {
1252: if (isDiffable() && diff) {
1253: ReplicatedMapEntry rentry = (ReplicatedMapEntry) value;
1254: try {
1255: rentry.lock();
1256: rentry.applyDiff(data, offset, length);
1257: } finally {
1258: rentry.unlock();
1259: }
1260: } else if (length == 0) {
1261: value = null;
1262: proxy = true;
1263: } else {
1264: value = XByteBuffer.deserialize(data, offset, length);
1265: }
1266: }
1267:
1268: public String toString() {
1269: StringBuffer buf = new StringBuffer("MapEntry[key:");
1270: buf.append(getKey()).append("; ");
1271: buf.append("value:").append(getValue()).append("; ");
1272: buf.append("primary:").append(isPrimary()).append("; ");
1273: buf.append("backup:").append(isBackup()).append("; ");
1274: buf.append("proxy:").append(isProxy()).append(";]");
1275: return buf.toString();
1276: }
1277:
1278: }
1279:
1280: //------------------------------------------------------------------------------
1281: // map message to send to and from other maps
1282: //------------------------------------------------------------------------------
1283:
1284: public static class MapMessage implements Serializable {
1285: public static final int MSG_BACKUP = 1;
1286: public static final int MSG_RETRIEVE_BACKUP = 2;
1287: public static final int MSG_PROXY = 3;
1288: public static final int MSG_REMOVE = 4;
1289: public static final int MSG_STATE = 5;
1290: public static final int MSG_START = 6;
1291: public static final int MSG_STOP = 7;
1292: public static final int MSG_INIT = 8;
1293: public static final int MSG_COPY = 9;
1294: public static final int MSG_STATE_COPY = 10;
1295:
1296: private byte[] mapId;
1297: private int msgtype;
1298: private boolean diff;
1299: private transient Serializable key;
1300: private transient Serializable value;
1301: private byte[] valuedata;
1302: private byte[] keydata;
1303: private byte[] diffvalue;
1304: private Member[] nodes;
1305:
1306: public String toString() {
1307: StringBuffer buf = new StringBuffer("MapMessage[context=");
1308: buf.append(new String(mapId));
1309: buf.append("; type=");
1310: buf.append(getTypeDesc());
1311: buf.append("; key=");
1312: buf.append(key);
1313: buf.append("; value=");
1314: buf.append(value);
1315: return buf.toString();
1316: }
1317:
1318: public String getTypeDesc() {
1319: switch (msgtype) {
1320: case MSG_BACKUP:
1321: return "MSG_BACKUP";
1322: case MSG_RETRIEVE_BACKUP:
1323: return "MSG_RETRIEVE_BACKUP";
1324: case MSG_PROXY:
1325: return "MSG_PROXY";
1326: case MSG_REMOVE:
1327: return "MSG_REMOVE";
1328: case MSG_STATE:
1329: return "MSG_STATE";
1330: case MSG_START:
1331: return "MSG_START";
1332: case MSG_STOP:
1333: return "MSG_STOP";
1334: case MSG_INIT:
1335: return "MSG_INIT";
1336: case MSG_STATE_COPY:
1337: return "MSG_STATE_COPY";
1338: case MSG_COPY:
1339: return "MSG_COPY";
1340: default:
1341: return "UNKNOWN";
1342: }
1343: }
1344:
1345: public MapMessage() {
1346: }
1347:
1348: public MapMessage(byte[] mapId, int msgtype, boolean diff,
1349: Serializable key, Serializable value, byte[] diffvalue,
1350: Member[] nodes) {
1351: this .mapId = mapId;
1352: this .msgtype = msgtype;
1353: this .diff = diff;
1354: this .key = key;
1355: this .value = value;
1356: this .diffvalue = diffvalue;
1357: this .nodes = nodes;
1358: setValue(value);
1359: setKey(key);
1360: }
1361:
1362: public void deserialize(ClassLoader[] cls) throws IOException,
1363: ClassNotFoundException {
1364: key(cls);
1365: value(cls);
1366: }
1367:
1368: public int getMsgType() {
1369: return msgtype;
1370: }
1371:
1372: public boolean isDiff() {
1373: return diff;
1374: }
1375:
1376: public Serializable getKey() {
1377: try {
1378: return key(null);
1379: } catch (Exception x) {
1380: log.error(
1381: "Deserialization error of the MapMessage.key",
1382: x);
1383: return null;
1384: }
1385: }
1386:
1387: public Serializable key(ClassLoader[] cls) throws IOException,
1388: ClassNotFoundException {
1389: if (key != null)
1390: return key;
1391: if (keydata == null || keydata.length == 0)
1392: return null;
1393: key = XByteBuffer.deserialize(keydata, 0, keydata.length,
1394: cls);
1395: keydata = null;
1396: return key;
1397: }
1398:
1399: public byte[] getKeyData() {
1400: return keydata;
1401: }
1402:
1403: public Serializable getValue() {
1404: try {
1405: return value(null);
1406: } catch (Exception x) {
1407: log
1408: .error(
1409: "Deserialization error of the MapMessage.value",
1410: x);
1411: return null;
1412: }
1413: }
1414:
1415: public Serializable value(ClassLoader[] cls)
1416: throws IOException, ClassNotFoundException {
1417: if (value != null)
1418: return value;
1419: if (valuedata == null || valuedata.length == 0)
1420: return null;
1421: value = XByteBuffer.deserialize(valuedata, 0,
1422: valuedata.length, cls);
1423: valuedata = null;
1424: ;
1425: return value;
1426: }
1427:
1428: public byte[] getValueData() {
1429: return valuedata;
1430: }
1431:
1432: public byte[] getDiffValue() {
1433: return diffvalue;
1434: }
1435:
1436: public Member[] getBackupNodes() {
1437: return nodes;
1438: }
1439:
1440: private void setBackUpNodes(Member[] nodes) {
1441: this .nodes = nodes;
1442: }
1443:
1444: public byte[] getMapId() {
1445: return mapId;
1446: }
1447:
1448: public void setValue(Serializable value) {
1449: try {
1450: if (value != null)
1451: valuedata = XByteBuffer.serialize(value);
1452: this .value = value;
1453: } catch (IOException x) {
1454: throw new RuntimeException(x);
1455: }
1456: }
1457:
1458: public void setKey(Serializable key) {
1459: try {
1460: if (key != null)
1461: keydata = XByteBuffer.serialize(key);
1462: this .key = key;
1463: } catch (IOException x) {
1464: throw new RuntimeException(x);
1465: }
1466: }
1467:
1468: protected Member[] readMembers(ObjectInput in)
1469: throws IOException, ClassNotFoundException {
1470: int nodecount = in.readInt();
1471: Member[] members = new Member[nodecount];
1472: for (int i = 0; i < members.length; i++) {
1473: byte[] d = new byte[in.readInt()];
1474: in.read(d);
1475: if (d.length > 0)
1476: members[i] = MemberImpl.getMember(d);
1477: }
1478: return members;
1479: }
1480:
1481: protected void writeMembers(ObjectOutput out, Member[] members)
1482: throws IOException {
1483: if (members == null)
1484: members = new Member[0];
1485: out.writeInt(members.length);
1486: for (int i = 0; i < members.length; i++) {
1487: if (members[i] != null) {
1488: byte[] d = members[i] != null ? ((MemberImpl) members[i])
1489: .getData(false)
1490: : new byte[0];
1491: out.writeInt(d.length);
1492: out.write(d);
1493: }
1494: }
1495: }
1496:
1497: /**
1498: * shallow clone
1499: * @return Object
1500: */
1501: public Object clone() {
1502: MapMessage msg = new MapMessage(this .mapId, this .msgtype,
1503: this .diff, this .key, this .value, this .diffvalue,
1504: this .nodes);
1505: msg.keydata = this .keydata;
1506: msg.valuedata = this .valuedata;
1507: return msg;
1508: }
1509: } //MapMessage
1510:
1511: public Channel getChannel() {
1512: return channel;
1513: }
1514:
1515: public byte[] getMapContextName() {
1516: return mapContextName;
1517: }
1518:
1519: public RpcChannel getRpcChannel() {
1520: return rpcChannel;
1521: }
1522:
1523: public long getRpcTimeout() {
1524: return rpcTimeout;
1525: }
1526:
1527: public Object getStateMutex() {
1528: return stateMutex;
1529: }
1530:
1531: public boolean isStateTransferred() {
1532: return stateTransferred;
1533: }
1534:
1535: public Object getMapOwner() {
1536: return mapOwner;
1537: }
1538:
1539: public ClassLoader[] getExternalLoaders() {
1540: return externalLoaders;
1541: }
1542:
1543: public int getChannelSendOptions() {
1544: return channelSendOptions;
1545: }
1546:
1547: public long getAccessTimeout() {
1548: return accessTimeout;
1549: }
1550:
1551: public void setMapOwner(Object mapOwner) {
1552: this .mapOwner = mapOwner;
1553: }
1554:
1555: public void setExternalLoaders(ClassLoader[] externalLoaders) {
1556: this .externalLoaders = externalLoaders;
1557: }
1558:
1559: public void setChannelSendOptions(int channelSendOptions) {
1560: this .channelSendOptions = channelSendOptions;
1561: }
1562:
1563: public void setAccessTimeout(long accessTimeout) {
1564: this.accessTimeout = accessTimeout;
1565: }
1566:
1567: }
|