Source Code Cross Referenced for AbstractReplicatedMap.java in » Servlet-Container » apache-tomcat-6.0.14 » org » apache » catalina » tribes » tipis » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Servlet Container » apache tomcat 6.0.14 » org.apache.catalina.tribes.tipis
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * Licensed to the Apache Software Foundation (ASF) under one or more
0003:         * contributor license agreements.  See the NOTICE file distributed with
0004:         * this work for additional information regarding copyright ownership.
0005:         * The ASF licenses this file to You under the Apache License, Version 2.0
0006:         * (the "License"); you may not use this file except in compliance with
0007:         * the License.  You may obtain a copy of the License at
0008:         *
0009:         *      http://www.apache.org/licenses/LICENSE-2.0
0010:         *
0011:         * Unless required by applicable law or agreed to in writing, software
0012:         * distributed under the License is distributed on an "AS IS" BASIS,
0013:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014:         * See the License for the specific language governing permissions and
0015:         * limitations under the License.
0016:         */
0017:
0018:        package org.apache.catalina.tribes.tipis;
0019:
0020:        import java.io.IOException;
0021:        import java.io.ObjectInput;
0022:        import java.io.ObjectOutput;
0023:        import java.io.Serializable;
0024:        import java.io.UnsupportedEncodingException;
0025:        import java.util.ArrayList;
0026:        import java.util.Collection;
0027:        import java.util.Collections;
0028:        import java.util.HashMap;
0029:        import java.util.Iterator;
0030:        import java.util.LinkedHashSet;
0031:        import java.util.Map;
0032:        import java.util.Set;
0033:
0034:        import org.apache.catalina.tribes.Channel;
0035:        import org.apache.catalina.tribes.ChannelException;
0036:        import org.apache.catalina.tribes.ChannelListener;
0037:        import org.apache.catalina.tribes.Heartbeat;
0038:        import org.apache.catalina.tribes.Member;
0039:        import org.apache.catalina.tribes.MembershipListener;
0040:        import org.apache.catalina.tribes.group.Response;
0041:        import org.apache.catalina.tribes.group.RpcCallback;
0042:        import org.apache.catalina.tribes.group.RpcChannel;
0043:        import org.apache.catalina.tribes.io.XByteBuffer;
0044:        import org.apache.catalina.tribes.membership.MemberImpl;
0045:        import org.apache.catalina.tribes.util.Arrays;
0046:        import org.apache.juli.logging.Log;
0047:        import org.apache.juli.logging.LogFactory;
0048:        import java.util.concurrent.ConcurrentHashMap;
0049:
0050:        /**
0051:         *
0052:         * @author Filip Hanik
0053:         * @version 1.0
0054:         */
0055:        public abstract class AbstractReplicatedMap extends ConcurrentHashMap
0056:                implements  RpcCallback, ChannelListener, MembershipListener,
0057:                Heartbeat {
0058:            protected static Log log = LogFactory
0059:                    .getLog(AbstractReplicatedMap.class);
0060:
0061:            /**
0062:             * The default initial capacity - MUST be a power of two.
0063:             */
0064:            public static final int DEFAULT_INITIAL_CAPACITY = 16;
0065:
0066:            /**
0067:             * The load factor used when none specified in constructor.
0068:             **/
0069:            public static final float DEFAULT_LOAD_FACTOR = 0.75f;
0070:
0071:            /**
0072:             * Character set used to encode the map context name into bytes
0073:             */
0074:            final String chset = "ISO-8859-1";
0075:
0076:            //------------------------------------------------------------------------------
0077:            //              INSTANCE VARIABLES
0078:            //------------------------------------------------------------------------------
0079:            protected abstract int getStateMessageType();
0080:
0081:            /**
0082:             * Timeout for RPC messages, how long we will wait for a reply
0083:             */
0084:            protected transient long rpcTimeout = 5000;
0085:            /**
0086:             * Reference to the channel for sending messages
0087:             */
0088:            protected transient Channel channel;
0089:            /**
0090:             * The RpcChannel to send RPC messages through
0091:             */
0092:            protected transient RpcChannel rpcChannel;
0093:            /**
0094:             * The Map context name makes this map unique, this
0095:             * allows us to have more than one map shared
0096:             * through one channel
0097:             */
0098:            protected transient byte[] mapContextName;
0099:            /**
0100:             * Has the state been transferred
0101:             */
0102:            protected transient boolean stateTransferred = false;
0103:            /**
0104:             * Simple lock object for transfers
0105:             */
0106:            protected transient Object stateMutex = new Object();
0107:            /**
0108:             * A list of members in our map
0109:             */
0110:            protected transient HashMap mapMembers = new HashMap();
0111:            /**
0112:             * Our default send options
0113:             */
0114:            protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
0115:            /**
0116:             * The owner of this map, ala a SessionManager for example
0117:             */
0118:            protected transient Object mapOwner;
0119:            /**
0120:             * External class loaders if serialization and deserialization is to be performed successfully.
0121:             */
0122:            protected transient ClassLoader[] externalLoaders;
0123:
0124:            /**
0125:             * The node we are currently backing up data to, this index will rotate
0126:             * on a round robin basis
0127:             */
0128:            protected transient int currentNode = 0;
0129:
0130:            /**
0131:             * Since the map keeps internal membership
0132:             * this is the timeout for a ping message to be responded to
0133:             * If a remote map doesn't respond within this timeframe,
0134:             * it is considered dead.
0135:             */
0136:            protected transient long accessTimeout = 5000;
0137:
0138:            /**
0139:             * Readable string of the mapContextName value
0140:             */
0141:            protected transient String mapname = "";
0142:
0143:            //------------------------------------------------------------------------------
0144:            //              CONSTRUCTORS
0145:            //------------------------------------------------------------------------------
0146:
0147:            /**
0148:             * Creates a new map
0149:             * @param channel The channel to use for communication
0150:             * @param timeout long - timeout for RPC messages
0151:             * @param mapContextName String - unique name for this map, to allow multiple maps per channel
0152:             * @param initialCapacity int - the initial capacity of this map, see ConcurrentHashMap
0153:             * @param loadFactor float - load factor, see ConcurrentHashMap
0154:             * @param cls - a list of classloaders to be used for deserialization of objects.
0155:             */
0156:            public AbstractReplicatedMap(Object owner, Channel channel,
0157:                    long timeout, String mapContextName, int initialCapacity,
0158:                    float loadFactor, int channelSendOptions, ClassLoader[] cls) {
0159:                super (initialCapacity, loadFactor, 15); // third argument is ConcurrentHashMap's concurrencyLevel hint
0160:                init(owner, channel, mapContextName, timeout, // note: init takes the context name before the timeout
0161:                        channelSendOptions, cls);
0162:
0163:            }
0164:
0165:            /**
0166:             * Helper methods, wraps a single member in an array
0167:             * @param m Member
0168:             * @return Member[]
0169:             */
0170:            protected Member[] wrap(Member m) {
0171:                if (m == null)
0172:                    return new Member[0];
0173:                else
0174:                    return new Member[] { m };
0175:            }
0176:
0177:            /**
0178:             * Initializes the map by creating the RPC channel, registering itself as a channel listener
0179:             * This method is also responsible for initiating the state transfer
0180:             * @param owner Object
0181:             * @param channel Channel
0182:             * @param mapContextName String
0183:             * @param timeout long
0184:             * @param channelSendOptions int
0185:             * @param cls ClassLoader[]
0186:             */
0187:            protected void init(Object owner, Channel channel,
0188:                    String mapContextName, long timeout,
0189:                    int channelSendOptions, ClassLoader[] cls) {
0190:                log
0191:                        .info("Initializing AbstractReplicatedMap with context name:"
0192:                                + mapContextName);
0193:                this .mapOwner = owner;
0194:                this .externalLoaders = cls;
0195:                this .channelSendOptions = channelSendOptions;
0196:                this .channel = channel;
0197:                this .rpcTimeout = timeout;
0198:
0199:                try {
0200:                    this .mapname = mapContextName;
0201:                    //unique context is more efficient if it is stored as bytes
0202:                    this .mapContextName = mapContextName.getBytes(chset);
0203:                } catch (UnsupportedEncodingException x) {
0204:                    log.warn("Unable to encode mapContextName["
0205:                            + mapContextName + "] using getBytes(" + chset
0206:                            + ") using default getBytes()", x);
0207:                    this .mapContextName = mapContextName.getBytes();
0208:                }
0209:                if (log.isTraceEnabled())
0210:                    log
0211:                            .trace("Created Lazy Map with name:"
0212:                                    + mapContextName + ", bytes:"
0213:                                    + Arrays.toString(this .mapContextName));
0214:
0215:                //create an rpc channel and add the map as a listener
0216:                this .rpcChannel = new RpcChannel(this .mapContextName, channel,
0217:                        this );
0218:                //add this map as a message listener
0219:                this .channel.addChannelListener(this );
0220:                //listen for membership notifications
0221:                this .channel.addMembershipListener(this );
0222:
0223:                try {
0224:                    //broadcast our map, this just notifies other members of our existence
0225:                    broadcast(MapMessage.MSG_INIT, true);
0226:                    //transfer state from another map
0227:                    transferState();
0228:                    //state is transferred, we are ready for messaging
0229:                    broadcast(MapMessage.MSG_START, true);
0230:                } catch (ChannelException x) {
0231:                    log.warn("Unable to send map start message.");
0232:                    throw new RuntimeException(
0233:                            "Unable to start replicated map.", x);
0234:                }
0235:            }
0236:
0237:            /**
0238:             * Sends a ping out to all the members in the cluster, not just map members
0239:             * that this map is alive.
0240:             * @param timeout long
0241:             * @throws ChannelException
0242:             */
0243:            protected void ping(long timeout) throws ChannelException {
0244:                //send out a map membership message, only wait for the first reply
0245:                MapMessage msg = new MapMessage(this .mapContextName,
0246:                        MapMessage.MSG_INIT, false, null, null, null,
0247:                        wrap(channel.getLocalMember(false)));
0248:                if (channel.getMembers().length > 0) {
0249:                    //send a ping, wait for all nodes to reply
0250:                    Response[] resp = rpcChannel.send(channel.getMembers(),
0251:                            msg, rpcChannel.ALL_REPLY, (channelSendOptions),
0252:                            (int) accessTimeout);
0253:                    for (int i = 0; i < resp.length; i++) {
0254:                        memberAlive(resp[i].getSource());
0255:                    } //for
0256:                }
0257:                //update our map of members, expire some if we didn't receive a ping back
0258:                synchronized (mapMembers) {
0259:                    Iterator it = mapMembers.entrySet().iterator();
0260:                    long now = System.currentTimeMillis();
0261:                    while (it.hasNext()) {
0262:                        Map.Entry entry = (Map.Entry) it.next();
0263:                        long access = ((Long) entry.getValue()).longValue();
0264:                        if ((now - access) > timeout) {
0265:                            it.remove();
0266:                            memberDisappeared((Member) entry.getKey());
0267:                        }
0268:                    }
0269:                }//synch
0270:            }
0271:
0272:            /**
0273:             * We have received a member alive notification
0274:             * @param member Member
0275:             */
0276:            protected void memberAlive(Member member) {
0277:                synchronized (mapMembers) {
0278:                    if (!mapMembers.containsKey(member)) {
0279:                        mapMemberAdded(member);
0280:                    } //end if
0281:                    mapMembers
0282:                            .put(member, new Long(System.currentTimeMillis()));
0283:                }
0284:            }
0285:
0286:            /**
0287:             * Helper method to broadcast a message to all members in a channel
0288:             * @param msgtype int
0289:             * @param rpc boolean
0290:             * @throws ChannelException
0291:             */
0292:            protected void broadcast(int msgtype, boolean rpc)
0293:                    throws ChannelException {
0294:                //send out a map membership message, only wait for the first reply
0295:                MapMessage msg = new MapMessage(this .mapContextName, msgtype,
0296:                        false, null, null, null, wrap(channel
0297:                                .getLocalMember(false)));
0298:                if (rpc) {
0299:                    Response[] resp = rpcChannel.send(channel.getMembers(),
0300:                            msg, rpcChannel.FIRST_REPLY, (channelSendOptions),
0301:                            rpcTimeout);
0302:                    for (int i = 0; i < resp.length; i++) {
0303:                        mapMemberAdded(resp[i].getSource());
0304:                        messageReceived(resp[i].getMessage(), resp[i]
0305:                                .getSource());
0306:                    }
0307:                } else {
0308:                    channel.send(channel.getMembers(), msg, channelSendOptions);
0309:                }
0310:            }
0311:
0312:            public void breakdown() {
0313:                finalize();
0314:            }
0315:
0316:            public void finalize() {
0317:                try {
0318:                    broadcast(MapMessage.MSG_STOP, false);
0319:                } catch (Exception ignore) {
0320:                }
0321:                //cleanup
0322:                if (this .rpcChannel != null) {
0323:                    this .rpcChannel.breakdown();
0324:                }
0325:                if (this .channel != null) {
0326:                    this .channel.removeChannelListener(this );
0327:                    this .channel.removeMembershipListener(this );
0328:                }
0329:                this .rpcChannel = null;
0330:                this .channel = null;
0331:                this .mapMembers.clear();
0332:                super .clear();
0333:                this .stateTransferred = false;
0334:                this .externalLoaders = null;
0335:            }
0336:
0337:            public int hashCode() {
0338:                return Arrays.hashCode(this .mapContextName);
0339:            }
0340:
0341:            public boolean equals(Object o) {
0342:                if (o == null)
0343:                    return false;
0344:                if (!(o instanceof  AbstractReplicatedMap))
0345:                    return false;
0346:                if (!(o.getClass().equals(this .getClass())))
0347:                    return false;
0348:                AbstractReplicatedMap other = (AbstractReplicatedMap) o;
0349:                return Arrays.equals(mapContextName, other.mapContextName);
0350:            }
0351:
0352:            //------------------------------------------------------------------------------
0353:            //              GROUP COM INTERFACES
0354:            //------------------------------------------------------------------------------
0355:            public Member[] getMapMembers(HashMap members) {
0356:                synchronized (members) {
0357:                    Member[] result = new Member[members.size()];
0358:                    members.keySet().toArray(result);
0359:                    return result;
0360:                }
0361:            }
0362:
0363:            public Member[] getMapMembers() {
0364:                return getMapMembers(this .mapMembers);
0365:            }
0366:
0367:            public Member[] getMapMembersExcl(Member[] exclude) {
0368:                synchronized (mapMembers) {
0369:                    HashMap list = (HashMap) mapMembers.clone();
0370:                    for (int i = 0; i < exclude.length; i++)
0371:                        list.remove(exclude[i]);
0372:                    return getMapMembers(list);
0373:                }
0374:            }
0375:
0376:            /**
0377:             * Replicates any changes to the object since the last time
0378:             * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
0379:             * @param complete - if set to true, the object is replicated to its backup
0380:             * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
0381:             * be replicated
0382:             */
0383:            public void replicate(Object key, boolean complete) {
0384:                if (log.isTraceEnabled())
0385:                    log.trace("Replicate invoked on key:" + key);
0386:                MapEntry entry = (MapEntry) super .get(key);
0387:                if (entry == null)
0388:                    return;
0389:                if (!entry.isSerializable())
0390:                    return;
0391:                if (entry != null && entry.isPrimary()
0392:                        && entry.getBackupNodes() != null
0393:                        && entry.getBackupNodes().length > 0) {
0394:                    Object value = entry.getValue();
0395:                    //check to see if we need to replicate this object isDirty()||complete
0396:                    boolean repl = complete
0397:                            || ((value instanceof  ReplicatedMapEntry) && ((ReplicatedMapEntry) value)
0398:                                    .isDirty());
0399:
0400:                    if (!repl) {
0401:                        if (log.isTraceEnabled())
0402:                            log.trace("Not replicating:" + key
0403:                                    + ", no change made");
0404:
0405:                        return;
0406:                    }
0407:                    //check to see if the message is diffable
0408:                    boolean diff = ((value instanceof  ReplicatedMapEntry) && ((ReplicatedMapEntry) value)
0409:                            .isDiffable());
0410:                    MapMessage msg = null;
0411:                    if (diff) {
0412:                        ReplicatedMapEntry rentry = (ReplicatedMapEntry) entry
0413:                                .getValue();
0414:                        try {
0415:                            rentry.lock();
0416:                            //construct a diff message
0417:                            msg = new MapMessage(mapContextName,
0418:                                    MapMessage.MSG_BACKUP, true,
0419:                                    (Serializable) entry.getKey(), null, rentry
0420:                                            .getDiff(), entry.getBackupNodes());
0421:                        } catch (IOException x) {
0422:                            log
0423:                                    .error(
0424:                                            "Unable to diff object. Will replicate the entire object instead.",
0425:                                            x);
0426:                        } finally {
0427:                            rentry.unlock();
0428:                        }
0429:
0430:                    }
0431:                    if (msg == null) {
0432:                        //construct a complete
0433:                        msg = new MapMessage(mapContextName,
0434:                                MapMessage.MSG_BACKUP, false,
0435:                                (Serializable) entry.getKey(),
0436:                                (Serializable) entry.getValue(), null, entry
0437:                                        .getBackupNodes());
0438:
0439:                    }
0440:                    try {
0441:                        if (channel != null && entry.getBackupNodes() != null
0442:                                && entry.getBackupNodes().length > 0) {
0443:                            channel.send(entry.getBackupNodes(), msg,
0444:                                    channelSendOptions);
0445:                        }
0446:                    } catch (ChannelException x) {
0447:                        log.error("Unable to replicate data.", x);
0448:                    }
0449:                } //end if
0450:
0451:            }
0452:
0453:            /**
0454:             * This can be invoked by a periodic thread to replicate out any changes.
0455:             * For maps that don't store objects that implement ReplicatedMapEntry, this
0456:             * method should be used infrequently to avoid large amounts of data transfer
0457:             * @param complete boolean
0458:             */
0459:            public void replicate(boolean complete) {
0460:                Iterator i = super .entrySet().iterator();
0461:                while (i.hasNext()) {
0462:                    Map.Entry e = (Map.Entry) i.next();
0463:                    replicate(e.getKey(), complete);
0464:                } //while
0465:
0466:            }
0467:
0468:            public void transferState() {
0469:                try {
0470:                    Member[] members = getMapMembers();
0471:                    Member backup = members.length > 0 ? (Member) members[0]
0472:                            : null;
0473:                    if (backup != null) {
0474:                        MapMessage msg = new MapMessage(mapContextName,
0475:                                getStateMessageType(), false, null, null, null,
0476:                                null);
0477:                        Response[] resp = rpcChannel.send(
0478:                                new Member[] { backup }, msg,
0479:                                rpcChannel.FIRST_REPLY, channelSendOptions,
0480:                                rpcTimeout);
0481:                        if (resp.length > 0) {
0482:                            synchronized (stateMutex) {
0483:                                msg = (MapMessage) resp[0].getMessage();
0484:                                msg.deserialize(getExternalLoaders());
0485:                                ArrayList list = (ArrayList) msg.getValue();
0486:                                for (int i = 0; i < list.size(); i++) {
0487:                                    messageReceived((Serializable) list.get(i),
0488:                                            resp[0].getSource());
0489:                                } //for
0490:                            }
0491:                        } else {
0492:                            log
0493:                                    .warn("Transfer state, 0 replies, probably a timeout.");
0494:                        }
0495:                    }
0496:                } catch (ChannelException x) {
0497:                    log.error("Unable to transfer LazyReplicatedMap state.", x);
0498:                } catch (IOException x) {
0499:                    log.error("Unable to transfer LazyReplicatedMap state.", x);
0500:                } catch (ClassNotFoundException x) {
0501:                    log.error("Unable to transfer LazyReplicatedMap state.", x);
0502:                }
0503:                stateTransferred = true;
0504:            }
0505:
0506:            /**
0507:             * Answers an RPC request (init, start, backup retrieval or state transfer) sent by another member
0508:             * @param msg Serializable
0509:             * @return Serializable - null if no reply should be sent
0510:             */
0511:            public Serializable replyRequest(Serializable msg,
0512:                    final Member sender) {
0513:                if (!(msg instanceof  MapMessage))
0514:                    return null;
0515:                MapMessage mapmsg = (MapMessage) msg;
0516:
0517:                //map init request
0518:                if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
0519:                    mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
0520:                    return mapmsg;
0521:                }
0522:
0523:                //map start request
0524:                if (mapmsg.getMsgType() == mapmsg.MSG_START) {
0525:                    mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
0526:                    mapMemberAdded(sender);
0527:                    return mapmsg;
0528:                }
0529:
0530:                //backup request
0531:                if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
0532:                    MapEntry entry = (MapEntry) super .get(mapmsg.getKey());
0533:                    if (entry == null || (!entry.isSerializable()))
0534:                        return null;
0535:                    mapmsg.setValue((Serializable) entry.getValue());
0536:                    return mapmsg;
0537:                }
0538:
0539:                //state transfer request
0540:                if (mapmsg.getMsgType() == mapmsg.MSG_STATE
0541:                        || mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY) {
0542:                    synchronized (stateMutex) { //make sure we dont do two things at the same time
0543:                        ArrayList list = new ArrayList();
0544:                        Iterator i = super .entrySet().iterator();
0545:                        while (i.hasNext()) {
0546:                            Map.Entry e = (Map.Entry) i.next();
0547:                            MapEntry entry = (MapEntry) super .get(e.getKey());
0548:                            if (entry.isSerializable()) {
0549:                                boolean copy = (mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY);
0550:                                MapMessage me = new MapMessage(mapContextName,
0551:                                        copy ? MapMessage.MSG_COPY
0552:                                                : MapMessage.MSG_PROXY, false,
0553:                                        (Serializable) entry.getKey(),
0554:                                        copy ? (Serializable) entry.getValue()
0555:                                                : null, null, entry
0556:                                                .getBackupNodes());
0557:                                list.add(me);
0558:                            }
0559:                        }
0560:                        mapmsg.setValue(list);
0561:                        return mapmsg;
0562:
0563:                    } //synchronized
0564:                }
0565:
0566:                return null;
0567:
0568:            }
0569:
0570:            /**
0571:             * If the reply has already been sent to the requesting thread,
0572:             * the rpc callback can handle any data that comes in after the fact.
0573:             * @param msg Serializable
0574:             * @param sender Member
0575:             */
0576:            public void leftOver(Serializable msg, Member sender) {
0577:                //left over membership messages
0578:                if (!(msg instanceof  MapMessage))
0579:                    return;
0580:
0581:                MapMessage mapmsg = (MapMessage) msg;
0582:                try {
0583:                    mapmsg.deserialize(getExternalLoaders());
0584:                    if (mapmsg.getMsgType() == MapMessage.MSG_START) {
0585:                        mapMemberAdded(mapmsg.getBackupNodes()[0]);
0586:                    } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
0587:                        memberAlive(mapmsg.getBackupNodes()[0]);
0588:                    }
0589:                } catch (IOException x) {
0590:                    log.error("Unable to deserialize MapMessage.", x);
0591:                } catch (ClassNotFoundException x) {
0592:                    log.error("Unable to deserialize MapMessage.", x);
0593:                }
0594:            }
0595:
0596:            public void messageReceived(Serializable msg, Member sender) {
0597:                if (!(msg instanceof  MapMessage))
0598:                    return;
0599:
0600:                MapMessage mapmsg = (MapMessage) msg;
0601:                if (log.isTraceEnabled()) {
0602:                    log
0603:                            .trace("Map[" + mapname + "] received message:"
0604:                                    + mapmsg);
0605:                }
0606:
0607:                try {
0608:                    mapmsg.deserialize(getExternalLoaders());
0609:                } catch (IOException x) {
0610:                    log.error("Unable to deserialize MapMessage.", x);
0611:                    return;
0612:                } catch (ClassNotFoundException x) {
0613:                    log.error("Unable to deserialize MapMessage.", x);
0614:                    return;
0615:                }
0616:                if (log.isTraceEnabled())
0617:                    log.trace("Map message received from:" + sender.getName()
0618:                            + " msg:" + mapmsg);
0619:                if (mapmsg.getMsgType() == MapMessage.MSG_START) {
0620:                    mapMemberAdded(mapmsg.getBackupNodes()[0]);
0621:                }
0622:
0623:                if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
0624:                    memberDisappeared(mapmsg.getBackupNodes()[0]);
0625:                }
0626:
0627:                if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
0628:                    MapEntry entry = (MapEntry) super .get(mapmsg.getKey());
0629:                    if (entry == null) {
0630:                        entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
0631:                        entry.setBackup(false);
0632:                        entry.setProxy(true);
0633:                        entry.setBackupNodes(mapmsg.getBackupNodes());
0634:                        super .put(entry.getKey(), entry);
0635:                    } else {
0636:                        entry.setProxy(true);
0637:                        entry.setBackup(false);
0638:                        entry.setBackupNodes(mapmsg.getBackupNodes());
0639:                    }
0640:                }
0641:
0642:                if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
0643:                    super .remove(mapmsg.getKey());
0644:                }
0645:
0646:                if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP
0647:                        || mapmsg.getMsgType() == MapMessage.MSG_COPY) {
0648:                    MapEntry entry = (MapEntry) super .get(mapmsg.getKey());
0649:                    if (entry == null) {
0650:                        entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
0651:                        entry
0652:                                .setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
0653:                        entry.setProxy(false);
0654:                        entry.setBackupNodes(mapmsg.getBackupNodes());
0655:                        if (mapmsg.getValue() != null
0656:                                && mapmsg.getValue() instanceof  ReplicatedMapEntry) {
0657:                            ((ReplicatedMapEntry) mapmsg.getValue())
0658:                                    .setOwner(getMapOwner());
0659:                        }
0660:                    } else {
0661:                        entry
0662:                                .setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
0663:                        entry.setProxy(false);
0664:                        entry.setBackupNodes(mapmsg.getBackupNodes());
0665:                        if (entry.getValue() instanceof  ReplicatedMapEntry) {
0666:                            ReplicatedMapEntry diff = (ReplicatedMapEntry) entry
0667:                                    .getValue();
0668:                            if (mapmsg.isDiff()) {
0669:                                try {
0670:                                    diff.lock();
0671:                                    diff.applyDiff(mapmsg.getDiffValue(), 0,
0672:                                            mapmsg.getDiffValue().length);
0673:                                } catch (Exception x) {
0674:                                    log.error("Unable to apply diff to key:"
0675:                                            + entry.getKey(), x);
0676:                                } finally {
0677:                                    diff.unlock();
0678:                                }
0679:                            } else {
0680:                                if (mapmsg.getValue() != null)
0681:                                    entry.setValue(mapmsg.getValue());
0682:                                ((ReplicatedMapEntry) entry.getValue())
0683:                                        .setOwner(getMapOwner());
0684:                            } //end if
0685:                        } else if (mapmsg.getValue() instanceof  ReplicatedMapEntry) {
0686:                            ReplicatedMapEntry re = (ReplicatedMapEntry) mapmsg
0687:                                    .getValue();
0688:                            re.setOwner(getMapOwner());
0689:                            entry.setValue(re);
0690:                        } else {
0691:                            if (mapmsg.getValue() != null)
0692:                                entry.setValue(mapmsg.getValue());
0693:                        } //end if
0694:                    } //end if
0695:                    super .put(entry.getKey(), entry);
0696:                } //end if
0697:            }
0698:
0699:            public boolean accept(Serializable msg, Member sender) {
0700:                boolean result = false;
0701:                if (msg instanceof  MapMessage) {
0702:                    if (log.isTraceEnabled())
0703:                        log.trace("Map[" + mapname + "] accepting...." + msg);
0704:                    result = Arrays.equals(mapContextName, ((MapMessage) msg)
0705:                            .getMapId());
0706:                    if (log.isTraceEnabled())
0707:                        log.trace("Msg[" + mapname + "] accepted[" + result
0708:                                + "]...." + msg);
0709:                }
0710:                return result;
0711:            }
0712:
0713:            public void mapMemberAdded(Member member) {
0714:                if (member.equals(getChannel().getLocalMember(false)))
0715:                    return;
0716:                boolean memberAdded = false;
0717:                //select a backup node if we don't have one
0718:                synchronized (mapMembers) {
0719:                    if (!mapMembers.containsKey(member)) {
0720:                        mapMembers.put(member, new Long(System
0721:                                .currentTimeMillis()));
0722:                        memberAdded = true;
0723:                    }
0724:                }
0725:                if (memberAdded) {
0726:                    synchronized (stateMutex) {
0727:                        Iterator i = super .entrySet().iterator();
0728:                        while (i.hasNext()) {
0729:                            Map.Entry e = (Map.Entry) i.next();
0730:                            MapEntry entry = (MapEntry) super .get(e.getKey());
0731:                            if (entry == null)
0732:                                continue;
0733:                            if (entry.isPrimary()
0734:                                    && (entry.getBackupNodes() == null || entry
0735:                                            .getBackupNodes().length == 0)) {
0736:                                try {
0737:                                    Member[] backup = publishEntryInfo(entry
0738:                                            .getKey(), entry.getValue());
0739:                                    entry.setBackupNodes(backup);
0740:                                } catch (ChannelException x) {
0741:                                    log.error("Unable to select backup node.",
0742:                                            x);
0743:                                } //catch
0744:                            } //end if
0745:                        } //while
0746:                    } //synchronized
0747:                }//end if
0748:            }
0749:
0750:            public boolean inSet(Member m, Member[] set) {
0751:                if (set == null)
0752:                    return false;
0753:                boolean result = false;
0754:                for (int i = 0; i < set.length && (!result); i++)
0755:                    if (m.equals(set[i]))
0756:                        result = true;
0757:                return result;
0758:            }
0759:
0760:            public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
0761:                ArrayList result = new ArrayList();
0762:                for (int i = 0; i < set.length; i++) {
0763:                    boolean include = true;
0764:                    for (int j = 0; j < mbrs.length; j++)
0765:                        if (mbrs[j].equals(set[i]))
0766:                            include = false;
0767:                    if (include)
0768:                        result.add(set[i]);
0769:                }
0770:                return (Member[]) result.toArray(new Member[result.size()]);
0771:            }
0772:
            /**
             * Intentionally empty.
             * NOTE(review): presumably the channel-level membership callback; map
             * membership appears to be tracked through <code>mapMemberAdded</code>
             * instead - confirm against the listener interface.
             *
             * @param member Member
             */
            public void memberAdded(Member member) {
                //do nothing
            }
0776:
0777:            public void memberDisappeared(Member member) {
0778:                boolean removed = false;
0779:                synchronized (mapMembers) {
0780:                    removed = (mapMembers.remove(member) != null);
0781:                }
0782:                Iterator i = super .entrySet().iterator();
0783:                while (i.hasNext()) {
0784:                    Map.Entry e = (Map.Entry) i.next();
0785:                    MapEntry entry = (MapEntry) super .get(e.getKey());
0786:                    if (entry.isPrimary()
0787:                            && inSet(member, entry.getBackupNodes())) {
0788:                        try {
0789:                            Member[] backup = publishEntryInfo(entry.getKey(),
0790:                                    entry.getValue());
0791:                            entry.setBackupNodes(backup);
0792:                        } catch (ChannelException x) {
0793:                            log.error("Unable to relocate[" + entry.getKey()
0794:                                    + "] to a new backup node", x);
0795:                        }
0796:                    } //end if
0797:                } //while
0798:            }
0799:
0800:            public int getNextBackupIndex() {
0801:                int size = mapMembers.size();
0802:                if (mapMembers.size() == 0)
0803:                    return -1;
0804:                int node = currentNode++;
0805:                if (node >= size) {
0806:                    node = 0;
0807:                    currentNode = 0;
0808:                }
0809:                return node;
0810:            }
0811:
0812:            public Member getNextBackupNode() {
0813:                Member[] members = getMapMembers();
0814:                int node = getNextBackupIndex();
0815:                if (members.length == 0 || node == -1)
0816:                    return null;
0817:                if (node >= members.length)
0818:                    node = 0;
0819:                return members[node];
0820:            }
0821:
            /**
             * Publishes a key/value pair to other members; the strategy is left to
             * the subclass.
             * <p>
             * Every caller in this class stores the returned array as the entry's
             * backup nodes.
             *
             * @param key Object
             * @param value Object
             * @return Member[] the nodes used as backup nodes by the callers
             * @throws ChannelException callers in this class log it and continue
             */
            protected abstract Member[] publishEntryInfo(Object key,
                    Object value) throws ChannelException;
0824:
            /**
             * Calls <code>ping(accessTimeout)</code>; any exception is logged and
             * not propagated.
             */
            public void heartbeat() {
                try {
                    ping(accessTimeout);
                } catch (Exception x) {
                    // best effort: log the failure and carry on
                    log
                            .error(
                                    "Unable to send AbstractReplicatedMap.ping message",
                                    x);
                }
            }
0835:
0836:            //------------------------------------------------------------------------------    
0837:            //              METHODS TO OVERRIDE    
0838:            //------------------------------------------------------------------------------
0839:
            /**
             * Removes an object from this map and notifies the other map members,
             * i.e. the same as <code>remove(key, true)</code>.
             * 
             * @param key Object
             * @return Object the value of the removed entry, or null if there was none
             */
            public Object remove(Object key) {
                return remove(key, true);
            }
0849:
0850:            public Object remove(Object key, boolean notify) {
0851:                MapEntry entry = (MapEntry) super .remove(key);
0852:
0853:                try {
0854:                    if (getMapMembers().length > 0 && notify) {
0855:                        MapMessage msg = new MapMessage(getMapContextName(),
0856:                                MapMessage.MSG_REMOVE, false,
0857:                                (Serializable) key, null, null, null);
0858:                        getChannel().send(getMapMembers(), msg,
0859:                                getChannelSendOptions());
0860:                    }
0861:                } catch (ChannelException x) {
0862:                    log
0863:                            .error(
0864:                                    "Unable to replicate out data for a LazyReplicatedMap.remove operation",
0865:                                    x);
0866:                }
0867:                return entry != null ? entry.getValue() : null;
0868:            }
0869:
            /**
             * Returns the raw entry stored for a key without triggering any of the
             * retrieval or messaging that <code>get(key)</code> performs.
             *
             * @param key Object
             * @return MapEntry the stored entry, or null if the key is unknown
             */
            public MapEntry getInternal(Object key) {
                return (MapEntry) super .get(key);
            }
0873:
0874:            public Object get(Object key) {
0875:                MapEntry entry = (MapEntry) super .get(key);
0876:                if (log.isTraceEnabled())
0877:                    log.trace("Requesting id:" + key + " entry:" + entry);
0878:                if (entry == null)
0879:                    return null;
0880:                if (!entry.isPrimary()) {
0881:                    //if the message is not primary, we need to retrieve the latest value
0882:                    try {
0883:                        Member[] backup = null;
0884:                        MapMessage msg = null;
0885:                        if (!entry.isBackup()) {
0886:                            //make sure we don't retrieve from ourselves
0887:                            msg = new MapMessage(getMapContextName(),
0888:                                    MapMessage.MSG_RETRIEVE_BACKUP, false,
0889:                                    (Serializable) key, null, null, null);
0890:                            Response[] resp = getRpcChannel().send(
0891:                                    entry.getBackupNodes(), msg,
0892:                                    this .getRpcChannel().FIRST_REPLY,
0893:                                    Channel.SEND_OPTIONS_DEFAULT,
0894:                                    getRpcTimeout());
0895:                            if (resp == null || resp.length == 0) {
0896:                                //no responses
0897:                                log
0898:                                        .warn("Unable to retrieve remote object for key:"
0899:                                                + key);
0900:                                return null;
0901:                            }
0902:                            msg = (MapMessage) resp[0].getMessage();
0903:                            msg.deserialize(getExternalLoaders());
0904:                            backup = entry.getBackupNodes();
0905:                            if (entry.getValue() instanceof  ReplicatedMapEntry) {
0906:                                ReplicatedMapEntry val = (ReplicatedMapEntry) entry
0907:                                        .getValue();
0908:                                val.setOwner(getMapOwner());
0909:                            }
0910:                            if (msg.getValue() != null)
0911:                                entry.setValue(msg.getValue());
0912:                        }
0913:                        if (entry.isBackup()) {
0914:                            //select a new backup node
0915:                            backup = publishEntryInfo(key, entry.getValue());
0916:                        } else if (entry.isProxy()) {
0917:                            //invalidate the previous primary
0918:                            msg = new MapMessage(getMapContextName(),
0919:                                    MapMessage.MSG_PROXY, false,
0920:                                    (Serializable) key, null, null, backup);
0921:                            Member[] dest = getMapMembersExcl(backup);
0922:                            if (dest != null && dest.length > 0) {
0923:                                getChannel().send(dest, msg,
0924:                                        getChannelSendOptions());
0925:                            }
0926:                        }
0927:
0928:                        entry.setBackupNodes(backup);
0929:                        entry.setBackup(false);
0930:                        entry.setProxy(false);
0931:
0932:                    } catch (Exception x) {
0933:                        log
0934:                                .error(
0935:                                        "Unable to replicate out data for a LazyReplicatedMap.get operation",
0936:                                        x);
0937:                        return null;
0938:                    }
0939:                }
0940:                if (log.isTraceEnabled())
0941:                    log.trace("Requesting id:" + key + " result:"
0942:                            + entry.getValue());
0943:                if (entry.getValue() != null
0944:                        && entry.getValue() instanceof  ReplicatedMapEntry) {
0945:                    ReplicatedMapEntry val = (ReplicatedMapEntry) entry
0946:                            .getValue();
0947:                    //hack, somehow this is not being set above
0948:                    val.setOwner(getMapOwner());
0949:
0950:                }
0951:                return entry.getValue();
0952:            }
0953:
0954:            protected void printMap(String header) {
0955:                try {
0956:                    System.out.println("\nDEBUG MAP:" + header);
0957:                    System.out.println("Map["
0958:                            + new String(mapContextName, chset) + ", Map Size:"
0959:                            + super .size());
0960:                    Member[] mbrs = getMapMembers();
0961:                    for (int i = 0; i < mbrs.length; i++) {
0962:                        System.out.println("Mbr[" + (i + 1) + "="
0963:                                + mbrs[i].getName());
0964:                    }
0965:                    Iterator i = super .entrySet().iterator();
0966:                    int cnt = 0;
0967:
0968:                    while (i.hasNext()) {
0969:                        Map.Entry e = (Map.Entry) i.next();
0970:                        System.out.println((++cnt) + ". "
0971:                                + super .get(e.getKey()));
0972:                    }
0973:                    System.out.println("EndMap]\n\n");
0974:                } catch (Exception ignore) {
0975:                    ignore.printStackTrace();
0976:                }
0977:            }
0978:
            /**
             * Returns true if the key has an entry in the map, whatever its role.
             * The entry can be a primary, a proxy or a backup entry; invoking
             * <code>get(key)</code> on a proxy or backup entry will make it the
             * primary entry on this node.
             * @param key Object
             * @return boolean
             */
            public boolean containsKey(Object key) {
                return super .containsKey(key);
            }
0989:
            /**
             * Stores a value as a primary entry and publishes it, i.e. the same as
             * <code>put(key, value, true)</code>.
             *
             * @param key Object
             * @param value Object
             * @return Object the value previously stored for the key, or null
             */
            public Object put(Object key, Object value) {
                return put(key, value, true);
            }
0993:
0994:            public Object put(Object key, Object value, boolean notify) {
0995:                MapEntry entry = new MapEntry(key, value);
0996:                entry.setBackup(false);
0997:                entry.setProxy(false);
0998:
0999:                Object old = null;
1000:
1001:                //make sure that any old values get removed
1002:                if (containsKey(key))
1003:                    old = remove(key);
1004:                try {
1005:                    if (notify) {
1006:                        Member[] backup = publishEntryInfo(key, value);
1007:                        entry.setBackupNodes(backup);
1008:                    }
1009:                } catch (ChannelException x) {
1010:                    log
1011:                            .error(
1012:                                    "Unable to replicate out data for a LazyReplicatedMap.put operation",
1013:                                    x);
1014:                }
1015:                super .put(key, entry);
1016:                return old;
1017:            }
1018:
1019:            /**
1020:             * Copies all values from one map to this instance
1021:             * @param m Map
1022:             */
1023:            public void putAll(Map m) {
1024:                Iterator i = m.entrySet().iterator();
1025:                while (i.hasNext()) {
1026:                    Map.Entry entry = (Map.Entry) i.next();
1027:                    put(entry.getKey(), entry.getValue());
1028:                }
1029:            }
1030:
            /**
             * Clears the map with notification, i.e. the same as
             * <code>clear(true)</code>.
             */
            public void clear() {
                clear(true);
            }
1034:
1035:            public void clear(boolean notify) {
1036:                if (notify) {
1037:                    //only delete active keys
1038:                    Iterator keys = keySet().iterator();
1039:                    while (keys.hasNext())
1040:                        remove(keys.next());
1041:                } else {
1042:                    super .clear();
1043:                }
1044:            }
1045:
1046:            public boolean containsValue(Object value) {
1047:                if (value == null) {
1048:                    return super .containsValue(value);
1049:                } else {
1050:                    Iterator i = super .entrySet().iterator();
1051:                    while (i.hasNext()) {
1052:                        Map.Entry e = (Map.Entry) i.next();
1053:                        MapEntry entry = (MapEntry) super .get(e.getKey());
1054:                        if (entry.isPrimary() && value.equals(entry.getValue()))
1055:                            return true;
1056:                    }//while
1057:                    return false;
1058:                }//end if
1059:            }
1060:
            /**
             * Not supported.
             *
             * @return Object never returns normally
             * @throws UnsupportedOperationException always
             */
            public Object clone() {
                throw new UnsupportedOperationException(
                        "This operation is not valid on a replicated map");
            }
1065:
            /**
             * Returns the entire contents of the map, including proxy and backup
             * entries. This is the superclass entry set, not a copy.
             * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information 
             * about the object.
             * @return Set
             */
            public Set entrySetFull() {
                return super .entrySet();
            }
1075:
            /**
             * Returns the superclass key set, i.e. every key stored on this node
             * regardless of whether its entry is primary, proxy or backup.
             * @return Set
             */
            public Set keySetFull() {
                return super .keySet();
            }
1079:
            /**
             * Returns the superclass size, i.e. the number of entries stored on
             * this node regardless of whether they are primary, proxy or backup.
             * @return int
             */
            public int sizeFull() {
                return super .size();
            }
1083:
1084:            public Set entrySet() {
1085:                LinkedHashSet set = new LinkedHashSet(super .size());
1086:                Iterator i = super .entrySet().iterator();
1087:                while (i.hasNext()) {
1088:                    Map.Entry e = (Map.Entry) i.next();
1089:                    Object key = e.getKey();
1090:                    MapEntry entry = (MapEntry) super .get(key);
1091:                    if (entry.isPrimary())
1092:                        set.add(entry.getValue());
1093:                }
1094:                return Collections.unmodifiableSet(set);
1095:            }
1096:
1097:            public Set keySet() {
1098:                //todo implement
1099:                //should only return keys where this is active.
1100:                LinkedHashSet set = new LinkedHashSet(super .size());
1101:                Iterator i = super .entrySet().iterator();
1102:                while (i.hasNext()) {
1103:                    Map.Entry e = (Map.Entry) i.next();
1104:                    Object key = e.getKey();
1105:                    MapEntry entry = (MapEntry) super .get(key);
1106:                    if (entry.isPrimary())
1107:                        set.add(key);
1108:                }
1109:                return Collections.unmodifiableSet(set);
1110:
1111:            }
1112:
1113:            public int size() {
1114:                //todo, implement a counter variable instead
1115:                //only count active members in this node
1116:                int counter = 0;
1117:                Iterator it = super .entrySet().iterator();
1118:                while (it != null && it.hasNext()) {
1119:                    Map.Entry e = (Map.Entry) it.next();
1120:                    if (e != null) {
1121:                        MapEntry entry = (MapEntry) super .get(e.getKey());
1122:                        if (entry != null && entry.isPrimary()
1123:                                && entry.getValue() != null)
1124:                            counter++;
1125:                    }
1126:                }
1127:                return counter;
1128:            }
1129:
            /**
             * Always returns false.
             * NOTE(review): presumably overrides the LinkedHashMap eviction hook so
             * that no entry is ever evicted on insert - confirm against the class
             * declaration, which is outside this section.
             *
             * @param eldest Map.Entry (not read)
             * @return boolean false
             */
            protected boolean removeEldestEntry(Map.Entry eldest) {
                return false;
            }
1133:
            /**
             * Returns true if <code>size()</code> is 0, i.e. if this node holds no
             * primary entry with a non-null value. Proxy and backup entries may
             * still be present.
             * @return boolean
             */
            public boolean isEmpty() {
                return size() == 0;
            }
1137:
1138:            public Collection values() {
1139:                ArrayList values = new ArrayList();
1140:                Iterator i = super .entrySet().iterator();
1141:                while (i.hasNext()) {
1142:                    Map.Entry e = (Map.Entry) i.next();
1143:                    MapEntry entry = (MapEntry) super .get(e.getKey());
1144:                    if (entry.isPrimary() && entry.getValue() != null)
1145:                        values.add(entry.getValue());
1146:                }
1147:                return Collections.unmodifiableCollection(values);
1148:            }
1149:
1150:            //------------------------------------------------------------------------------
1151:            //                Map Entry class
1152:            //------------------------------------------------------------------------------
1153:            public static class MapEntry implements  Map.Entry {
1154:                private boolean backup;
1155:                private boolean proxy;
1156:                private Member[] backupNodes;
1157:
1158:                private Object key;
1159:                private Object value;
1160:
1161:                public MapEntry(Object key, Object value) {
1162:                    setKey(key);
1163:                    setValue(value);
1164:
1165:                }
1166:
1167:                public boolean isKeySerializable() {
1168:                    return (key == null) || (key instanceof  Serializable);
1169:                }
1170:
1171:                public boolean isValueSerializable() {
1172:                    return (value == null) || (value instanceof  Serializable);
1173:                }
1174:
1175:                public boolean isSerializable() {
1176:                    return isKeySerializable() && isValueSerializable();
1177:                }
1178:
1179:                public boolean isBackup() {
1180:                    return backup;
1181:                }
1182:
1183:                public void setBackup(boolean backup) {
1184:                    this .backup = backup;
1185:                }
1186:
1187:                public boolean isProxy() {
1188:                    return proxy;
1189:                }
1190:
1191:                public boolean isPrimary() {
1192:                    return ((!proxy) && (!backup));
1193:                }
1194:
1195:                public void setProxy(boolean proxy) {
1196:                    this .proxy = proxy;
1197:                }
1198:
1199:                public boolean isDiffable() {
1200:                    return (value instanceof  ReplicatedMapEntry)
1201:                            && ((ReplicatedMapEntry) value).isDiffable();
1202:                }
1203:
1204:                public void setBackupNodes(Member[] nodes) {
1205:                    this .backupNodes = nodes;
1206:                }
1207:
1208:                public Member[] getBackupNodes() {
1209:                    return backupNodes;
1210:                }
1211:
1212:                public Object getValue() {
1213:                    return value;
1214:                }
1215:
1216:                public Object setValue(Object value) {
1217:                    Object old = this .value;
1218:                    this .value = value;
1219:                    return old;
1220:                }
1221:
1222:                public Object getKey() {
1223:                    return key;
1224:                }
1225:
1226:                public Object setKey(Object key) {
1227:                    Object old = this .key;
1228:                    this .key = key;
1229:                    return old;
1230:                }
1231:
1232:                public int hashCode() {
1233:                    return key.hashCode();
1234:                }
1235:
1236:                public boolean equals(Object o) {
1237:                    return key.equals(o);
1238:                }
1239:
1240:                /**
1241:                 * apply a diff, or an entire object
1242:                 * @param data byte[]
1243:                 * @param offset int
1244:                 * @param length int
1245:                 * @param diff boolean
1246:                 * @throws IOException
1247:                 * @throws ClassNotFoundException
1248:                 */
1249:                public void apply(byte[] data, int offset, int length,
1250:                        boolean diff) throws IOException,
1251:                        ClassNotFoundException {
1252:                    if (isDiffable() && diff) {
1253:                        ReplicatedMapEntry rentry = (ReplicatedMapEntry) value;
1254:                        try {
1255:                            rentry.lock();
1256:                            rentry.applyDiff(data, offset, length);
1257:                        } finally {
1258:                            rentry.unlock();
1259:                        }
1260:                    } else if (length == 0) {
1261:                        value = null;
1262:                        proxy = true;
1263:                    } else {
1264:                        value = XByteBuffer.deserialize(data, offset, length);
1265:                    }
1266:                }
1267:
1268:                public String toString() {
1269:                    StringBuffer buf = new StringBuffer("MapEntry[key:");
1270:                    buf.append(getKey()).append("; ");
1271:                    buf.append("value:").append(getValue()).append("; ");
1272:                    buf.append("primary:").append(isPrimary()).append("; ");
1273:                    buf.append("backup:").append(isBackup()).append("; ");
1274:                    buf.append("proxy:").append(isProxy()).append(";]");
1275:                    return buf.toString();
1276:                }
1277:
1278:            }
1279:
1280:            //------------------------------------------------------------------------------
1281:            //                map message to send to and from other maps
1282:            //------------------------------------------------------------------------------
1283:
1284:            public static class MapMessage implements  Serializable {
1285:                public static final int MSG_BACKUP = 1;
1286:                public static final int MSG_RETRIEVE_BACKUP = 2;
1287:                public static final int MSG_PROXY = 3;
1288:                public static final int MSG_REMOVE = 4;
1289:                public static final int MSG_STATE = 5;
1290:                public static final int MSG_START = 6;
1291:                public static final int MSG_STOP = 7;
1292:                public static final int MSG_INIT = 8;
1293:                public static final int MSG_COPY = 9;
1294:                public static final int MSG_STATE_COPY = 10;
1295:
1296:                private byte[] mapId;
1297:                private int msgtype;
1298:                private boolean diff;
1299:                private transient Serializable key;
1300:                private transient Serializable value;
1301:                private byte[] valuedata;
1302:                private byte[] keydata;
1303:                private byte[] diffvalue;
1304:                private Member[] nodes;
1305:
1306:                public String toString() {
1307:                    StringBuffer buf = new StringBuffer("MapMessage[context=");
1308:                    buf.append(new String(mapId));
1309:                    buf.append("; type=");
1310:                    buf.append(getTypeDesc());
1311:                    buf.append("; key=");
1312:                    buf.append(key);
1313:                    buf.append("; value=");
1314:                    buf.append(value);
1315:                    return buf.toString();
1316:                }
1317:
1318:                public String getTypeDesc() {
1319:                    switch (msgtype) {
1320:                    case MSG_BACKUP:
1321:                        return "MSG_BACKUP";
1322:                    case MSG_RETRIEVE_BACKUP:
1323:                        return "MSG_RETRIEVE_BACKUP";
1324:                    case MSG_PROXY:
1325:                        return "MSG_PROXY";
1326:                    case MSG_REMOVE:
1327:                        return "MSG_REMOVE";
1328:                    case MSG_STATE:
1329:                        return "MSG_STATE";
1330:                    case MSG_START:
1331:                        return "MSG_START";
1332:                    case MSG_STOP:
1333:                        return "MSG_STOP";
1334:                    case MSG_INIT:
1335:                        return "MSG_INIT";
1336:                    case MSG_STATE_COPY:
1337:                        return "MSG_STATE_COPY";
1338:                    case MSG_COPY:
1339:                        return "MSG_COPY";
1340:                    default:
1341:                        return "UNKNOWN";
1342:                    }
1343:                }
1344:
1345:                public MapMessage() {
1346:                }
1347:
1348:                public MapMessage(byte[] mapId, int msgtype, boolean diff,
1349:                        Serializable key, Serializable value, byte[] diffvalue,
1350:                        Member[] nodes) {
1351:                    this .mapId = mapId;
1352:                    this .msgtype = msgtype;
1353:                    this .diff = diff;
1354:                    this .key = key;
1355:                    this .value = value;
1356:                    this .diffvalue = diffvalue;
1357:                    this .nodes = nodes;
1358:                    setValue(value);
1359:                    setKey(key);
1360:                }
1361:
1362:                public void deserialize(ClassLoader[] cls) throws IOException,
1363:                        ClassNotFoundException {
1364:                    key(cls);
1365:                    value(cls);
1366:                }
1367:
1368:                public int getMsgType() {
1369:                    return msgtype;
1370:                }
1371:
1372:                public boolean isDiff() {
1373:                    return diff;
1374:                }
1375:
1376:                public Serializable getKey() {
1377:                    try {
1378:                        return key(null);
1379:                    } catch (Exception x) {
1380:                        log.error(
1381:                                "Deserialization error of the MapMessage.key",
1382:                                x);
1383:                        return null;
1384:                    }
1385:                }
1386:
1387:                public Serializable key(ClassLoader[] cls) throws IOException,
1388:                        ClassNotFoundException {
1389:                    if (key != null)
1390:                        return key;
1391:                    if (keydata == null || keydata.length == 0)
1392:                        return null;
1393:                    key = XByteBuffer.deserialize(keydata, 0, keydata.length,
1394:                            cls);
1395:                    keydata = null;
1396:                    return key;
1397:                }
1398:
1399:                public byte[] getKeyData() {
1400:                    return keydata;
1401:                }
1402:
1403:                public Serializable getValue() {
1404:                    try {
1405:                        return value(null);
1406:                    } catch (Exception x) {
1407:                        log
1408:                                .error(
1409:                                        "Deserialization error of the MapMessage.value",
1410:                                        x);
1411:                        return null;
1412:                    }
1413:                }
1414:
1415:                public Serializable value(ClassLoader[] cls)
1416:                        throws IOException, ClassNotFoundException {
1417:                    if (value != null)
1418:                        return value;
1419:                    if (valuedata == null || valuedata.length == 0)
1420:                        return null;
1421:                    value = XByteBuffer.deserialize(valuedata, 0,
1422:                            valuedata.length, cls);
1423:                    valuedata = null;
1424:                    ;
1425:                    return value;
1426:                }
1427:
1428:                public byte[] getValueData() {
1429:                    return valuedata;
1430:                }
1431:
1432:                public byte[] getDiffValue() {
1433:                    return diffvalue;
1434:                }
1435:
1436:                public Member[] getBackupNodes() {
1437:                    return nodes;
1438:                }
1439:
1440:                private void setBackUpNodes(Member[] nodes) {
1441:                    this .nodes = nodes;
1442:                }
1443:
1444:                public byte[] getMapId() {
1445:                    return mapId;
1446:                }
1447:
1448:                public void setValue(Serializable value) {
1449:                    try {
1450:                        if (value != null)
1451:                            valuedata = XByteBuffer.serialize(value);
1452:                        this .value = value;
1453:                    } catch (IOException x) {
1454:                        throw new RuntimeException(x);
1455:                    }
1456:                }
1457:
1458:                public void setKey(Serializable key) {
1459:                    try {
1460:                        if (key != null)
1461:                            keydata = XByteBuffer.serialize(key);
1462:                        this .key = key;
1463:                    } catch (IOException x) {
1464:                        throw new RuntimeException(x);
1465:                    }
1466:                }
1467:
1468:                protected Member[] readMembers(ObjectInput in)
1469:                        throws IOException, ClassNotFoundException {
1470:                    int nodecount = in.readInt();
1471:                    Member[] members = new Member[nodecount];
1472:                    for (int i = 0; i < members.length; i++) {
1473:                        byte[] d = new byte[in.readInt()];
1474:                        in.read(d);
1475:                        if (d.length > 0)
1476:                            members[i] = MemberImpl.getMember(d);
1477:                    }
1478:                    return members;
1479:                }
1480:
1481:                protected void writeMembers(ObjectOutput out, Member[] members)
1482:                        throws IOException {
1483:                    if (members == null)
1484:                        members = new Member[0];
1485:                    out.writeInt(members.length);
1486:                    for (int i = 0; i < members.length; i++) {
1487:                        if (members[i] != null) {
1488:                            byte[] d = members[i] != null ? ((MemberImpl) members[i])
1489:                                    .getData(false)
1490:                                    : new byte[0];
1491:                            out.writeInt(d.length);
1492:                            out.write(d);
1493:                        }
1494:                    }
1495:                }
1496:
1497:                /**
1498:                 * shallow clone
1499:                 * @return Object
1500:                 */
1501:                public Object clone() {
1502:                    MapMessage msg = new MapMessage(this .mapId, this .msgtype,
1503:                            this .diff, this .key, this .value, this .diffvalue,
1504:                            this .nodes);
1505:                    msg.keydata = this .keydata;
1506:                    msg.valuedata = this .valuedata;
1507:                    return msg;
1508:                }
1509:            } //MapMessage
1510:
1511:            public Channel getChannel() {
1512:                return channel;
1513:            }
1514:
1515:            public byte[] getMapContextName() {
1516:                return mapContextName;
1517:            }
1518:
1519:            public RpcChannel getRpcChannel() {
1520:                return rpcChannel;
1521:            }
1522:
1523:            public long getRpcTimeout() {
1524:                return rpcTimeout;
1525:            }
1526:
1527:            public Object getStateMutex() {
1528:                return stateMutex;
1529:            }
1530:
1531:            public boolean isStateTransferred() {
1532:                return stateTransferred;
1533:            }
1534:
1535:            public Object getMapOwner() {
1536:                return mapOwner;
1537:            }
1538:
1539:            public ClassLoader[] getExternalLoaders() {
1540:                return externalLoaders;
1541:            }
1542:
1543:            public int getChannelSendOptions() {
1544:                return channelSendOptions;
1545:            }
1546:
1547:            public long getAccessTimeout() {
1548:                return accessTimeout;
1549:            }
1550:
1551:            public void setMapOwner(Object mapOwner) {
1552:                this .mapOwner = mapOwner;
1553:            }
1554:
1555:            public void setExternalLoaders(ClassLoader[] externalLoaders) {
1556:                this .externalLoaders = externalLoaders;
1557:            }
1558:
1559:            public void setChannelSendOptions(int channelSendOptions) {
1560:                this .channelSendOptions = channelSendOptions;
1561:            }
1562:
1563:            public void setAccessTimeout(long accessTimeout) {
1564:                this.accessTimeout = accessTimeout;
1565:            }
1566:
1567:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.