Source Code Cross Referenced for Multiplexer.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » mux » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.mux 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        package org.jgroups.mux;
0002:
0003:        import org.apache.commons.logging.Log;
0004:        import org.apache.commons.logging.LogFactory;
0005:        import org.jgroups.*;
0006:        import org.jgroups.protocols.pbcast.FLUSH;
0007:        import org.jgroups.stack.StateTransferInfo;
0008:        import org.jgroups.util.Promise;
0009:        import org.jgroups.util.Util;
0010:
0011:        import java.util.*;
0012:
0013:        /**
0014:         * Used for dispatching incoming messages. The Multiplexer implements UpHandler and registers with the associated
0015:         * JChannel (there can only be 1 Multiplexer per JChannel). When up() is called with a message, the header of the
0016:         * message is removed and the MuxChannel corresponding to the header's service ID is retrieved from the map,
0017:         * and MuxChannel.up() is called with the message.
0018:         * @author Bela Ban
0019:         * @version $Id: Multiplexer.java,v 1.35.2.4 2007/02/12 18:42:34 vlada Exp $
0020:         */
0021:        public class Multiplexer implements  UpHandler {
0022:            /** Map<String,MuxChannel>. Maintains the mapping between service IDs and their associated MuxChannels */
0023:            private final Map services = new HashMap();
0024:            private final JChannel channel;
0025:            static final Log log = LogFactory.getLog(Multiplexer.class);
0026:            static final String SEPARATOR = "::";
0027:            static final short SEPARATOR_LEN = (short) SEPARATOR.length();
0028:            static final String NAME = "MUX";
0029:            private final BlockOkCollector block_ok_collector = new BlockOkCollector();
0030:
0031:            private MergeView temp_merge_view = null;
0032:
0033:            private boolean flush_present = true;
0034:            private boolean blocked = false;
0035:
0036:            /** Cluster view */
0037:            View view = null;
0038:
0039:            Address local_addr = null;
0040:
0041:            /** Map<String,Boolean>. Map of service IDs and booleans that determine whether getState() has already been called */
0042:            private final Map state_transfer_listeners = new HashMap();
0043:
0044:            /** Map<String,List<Address>>. A map of services as keys and lists of hosts as values */
0045:            private final Map service_state = new HashMap();
0046:
0047:            /** Used to wait on service state information */
0048:            private final Promise service_state_promise = new Promise();
0049:
0050:            /** Map<Address, Set<String>>. Keys are senders, values are a set of services hosted by that sender.
0051:             * Used to collect responses to LIST_SERVICES_REQ */
0052:            private final Map service_responses = new HashMap();
0053:
0054:            private long SERVICES_RSP_TIMEOUT = 10000;
0055:
0056:            public Multiplexer() {
0057:                this .channel = null;
0058:                flush_present = isFlushPresent();
0059:            }
0060:
0061:            public Multiplexer(JChannel channel) {
0062:                this .channel = channel;
0063:                this .channel.setUpHandler(this );
0064:                this .channel.setOpt(Channel.BLOCK, Boolean.TRUE); // we want to handle BLOCK events ourselves
0065:                flush_present = isFlushPresent();
0066:            }
0067:
0068:            /**
0069:             * @deprecated Use ${link #getServiceIds()} instead
0070:             * @return The set of service IDs
0071:             */
0072:            public Set getApplicationIds() {
0073:                return services != null ? services.keySet() : null;
0074:            }
0075:
0076:            public Set getServiceIds() {
0077:                return services != null ? services.keySet() : null;
0078:            }
0079:
0080:            public long getServicesResponseTimeout() {
0081:                return SERVICES_RSP_TIMEOUT;
0082:            }
0083:
0084:            public void setServicesResponseTimeout(long services_rsp_timeout) {
0085:                this .SERVICES_RSP_TIMEOUT = services_rsp_timeout;
0086:            }
0087:
0088:            /** Returns a copy of the current view <em>minus</em> the nodes on which service service_id is <em>not</em> running
0089:             *
0090:             * @param service_id
0091:             * @return The service view
0092:             */
0093:            public View getServiceView(String service_id) {
0094:                List hosts = (List) service_state.get(service_id);
0095:                if (hosts == null)
0096:                    return null;
0097:                return generateServiceView(hosts);
0098:            }
0099:
0100:            public boolean stateTransferListenersPresent() {
0101:                return state_transfer_listeners != null
0102:                        && state_transfer_listeners.size() > 0;
0103:            }
0104:
0105:            /**
0106:             * Called by a MuxChannel when BLOCK_OK is sent down
0107:             */
0108:            public void blockOk() {
0109:                block_ok_collector.increment();
0110:            }
0111:
0112:            public synchronized void registerForStateTransfer(String appl_id,
0113:                    String substate_id) {
0114:                String key = appl_id;
0115:                if (substate_id != null && substate_id.length() > 0)
0116:                    key += SEPARATOR + substate_id;
0117:                state_transfer_listeners.put(key, Boolean.FALSE);
0118:            }
0119:
0120:            public synchronized boolean getState(Address target, String id,
0121:                    long timeout) throws ChannelNotConnectedException,
0122:                    ChannelClosedException {
0123:                if (state_transfer_listeners == null)
0124:                    return false;
0125:                Map.Entry entry;
0126:                String key;
0127:                for (Iterator it = state_transfer_listeners.entrySet()
0128:                        .iterator(); it.hasNext();) {
0129:                    entry = (Map.Entry) it.next();
0130:                    key = (String) entry.getKey();
0131:                    int index = key.indexOf(SEPARATOR);
0132:                    boolean match;
0133:                    if (index > -1) {
0134:                        String tmp = key.substring(0, index);
0135:                        match = id.equals(tmp);
0136:                    } else {
0137:                        match = id.equals(key);
0138:                    }
0139:                    if (match) {
0140:                        entry.setValue(Boolean.TRUE);
0141:                        break;
0142:                    }
0143:                }
0144:
0145:                Collection values = state_transfer_listeners.values();
0146:                boolean all_true = Util.all(values, Boolean.TRUE);
0147:                if (!all_true)
0148:                    return true; // pseudo
0149:
0150:                boolean rc = false;
0151:                Set keys = new HashSet(state_transfer_listeners.keySet());
0152:                rc = fetchServiceStates(target, keys, timeout);
0153:                state_transfer_listeners.clear();
0154:                return rc;
0155:            }
0156:
0157:            /** Fetches the app states for all service IDs in keys.
0158:             * The keys are a duplicate list, so it cannot be modified by the caller of this method
0159:             * @param keys
0160:             */
0161:            private boolean fetchServiceStates(Address target, Set keys,
0162:                    long timeout) throws ChannelClosedException,
0163:                    ChannelNotConnectedException {
0164:                boolean rc, all_rcs = true;
0165:                String appl_id;
0166:                for (Iterator it = keys.iterator(); it.hasNext();) {
0167:                    appl_id = (String) it.next();
0168:                    rc = channel.getState(target, appl_id, timeout);
0169:                    if (rc == false)
0170:                        all_rcs = false;
0171:                }
0172:                return all_rcs;
0173:            }
0174:
0175:            /**
0176:             * Fetches the map of services and hosts from the coordinator (Multiplexer). No-op if we are the coordinator
0177:             */
0178:            public void fetchServiceInformation() throws Exception {
0179:                while (true) {
0180:                    Address coord = getCoordinator(), local_address = channel != null ? channel
0181:                            .getLocalAddress()
0182:                            : null;
0183:                    boolean is_coord = coord != null && local_address != null
0184:                            && local_address.equals(coord);
0185:                    if (is_coord) {
0186:                        if (log.isTraceEnabled())
0187:                            log
0188:                                    .trace("I'm coordinator, will not fetch service state information");
0189:                        break;
0190:                    }
0191:
0192:                    ServiceInfo si = new ServiceInfo(ServiceInfo.STATE_REQ,
0193:                            null, null, null);
0194:                    MuxHeader hdr = new MuxHeader(si);
0195:                    Message state_req = new Message(coord, null, null);
0196:                    state_req.putHeader(NAME, hdr);
0197:                    service_state_promise.reset();
0198:                    channel.send(state_req);
0199:
0200:                    try {
0201:                        byte[] state = (byte[]) service_state_promise
0202:                                .getResultWithTimeout(2000);
0203:                        if (state != null) {
0204:                            Map new_state = (Map) Util
0205:                                    .objectFromByteBuffer(state);
0206:                            synchronized (service_state) {
0207:                                service_state.clear();
0208:                                service_state.putAll(new_state);
0209:                            }
0210:                            if (log.isTraceEnabled())
0211:                                log
0212:                                        .trace("service state was set successfully ("
0213:                                                + service_state.size()
0214:                                                + " entries)");
0215:                        } else {
0216:                            if (log.isWarnEnabled())
0217:                                log.warn("received service state was null");
0218:                        }
0219:                        break;
0220:                    } catch (TimeoutException e) {
0221:                        if (log.isTraceEnabled())
0222:                            log
0223:                                    .trace("timed out waiting for service state from "
0224:                                            + coord + ", retrying");
0225:                    }
0226:                }
0227:            }
0228:
0229:            public void sendServiceUpMessage(String service, Address host,
0230:                    boolean bypassFlush) throws Exception {
0231:                sendServiceMessage(ServiceInfo.SERVICE_UP, service, host,
0232:                        bypassFlush);
0233:                if (local_addr != null && host != null
0234:                        && local_addr.equals(host))
0235:                    handleServiceUp(service, host, false);
0236:            }
0237:
0238:            public void sendServiceDownMessage(String service, Address host,
0239:                    boolean bypassFlush) throws Exception {
0240:                sendServiceMessage(ServiceInfo.SERVICE_DOWN, service, host,
0241:                        bypassFlush);
0242:                if (local_addr != null && host != null
0243:                        && local_addr.equals(host))
0244:                    handleServiceDown(service, host, false);
0245:            }
0246:
0247:            public void up(Event evt) {
0248:                // remove header and dispatch to correct MuxChannel
0249:                MuxHeader hdr;
0250:
0251:                switch (evt.getType()) {
0252:                case Event.MSG:
0253:                    Message msg = (Message) evt.getArg();
0254:                    hdr = (MuxHeader) msg.getHeader(NAME);
0255:                    if (hdr == null) {
0256:                        log.error("MuxHeader not present - discarding message "
0257:                                + msg);
0258:                        return;
0259:                    }
0260:
0261:                    if (hdr.info != null) { // it is a service state request - not a default multiplex request
0262:                        try {
0263:                            handleServiceStateRequest(hdr.info, msg.getSrc());
0264:                        } catch (Exception e) {
0265:                            if (log.isErrorEnabled())
0266:                                log
0267:                                        .error(
0268:                                                "failure in handling service state request",
0269:                                                e);
0270:                        }
0271:                        break;
0272:                    }
0273:
0274:                    MuxChannel mux_ch = (MuxChannel) services.get(hdr.id);
0275:                    if (mux_ch == null) {
0276:                        log
0277:                                .warn("service "
0278:                                        + hdr.id
0279:                                        + " not currently running, discarding messgage "
0280:                                        + msg);
0281:                        return;
0282:                    }
0283:                    mux_ch.up(evt);
0284:                    break;
0285:
0286:                case Event.VIEW_CHANGE:
0287:                    Vector old_members = view != null ? view.getMembers()
0288:                            : null;
0289:                    view = (View) evt.getArg();
0290:                    Vector new_members = view != null ? view.getMembers()
0291:                            : null;
0292:                    Vector left_members = Util.determineLeftMembers(
0293:                            old_members, new_members);
0294:
0295:                    if (view instanceof  MergeView) {
0296:                        temp_merge_view = (MergeView) view.clone();
0297:                        if (log.isTraceEnabled())
0298:                            log.trace("received a MergeView: "
0299:                                    + temp_merge_view
0300:                                    + ", adjusting the service view");
0301:                        if (!flush_present && temp_merge_view != null) {
0302:                            try {
0303:                                if (log.isTraceEnabled())
0304:                                    log
0305:                                            .trace("calling handleMergeView() from VIEW_CHANGE (flush_present="
0306:                                                    + flush_present + ")");
0307:                                Thread merge_handler = new Thread() {
0308:                                    public void run() {
0309:                                        try {
0310:                                            handleMergeView(temp_merge_view);
0311:                                        } catch (Exception e) {
0312:                                            if (log.isErrorEnabled())
0313:                                                log
0314:                                                        .error(
0315:                                                                "problems handling merge view",
0316:                                                                e);
0317:                                        }
0318:                                    }
0319:                                };
0320:                                merge_handler
0321:                                        .setName("merge handler view_change");
0322:                                merge_handler.setDaemon(false);
0323:                                merge_handler.start();
0324:                            } catch (Exception e) {
0325:                                if (log.isErrorEnabled())
0326:                                    log.error("failed handling merge view", e);
0327:                            }
0328:                        } else {
0329:                            ; // don't do anything because we are blocked sending messages anyway
0330:                        }
0331:                    } else { // regular view
0332:                        synchronized (service_responses) {
0333:                            service_responses.clear();
0334:                        }
0335:                    }
0336:                    if (left_members.size() > 0)
0337:                        adjustServiceViews(left_members);
0338:                    break;
0339:
0340:                case Event.SUSPECT:
0341:                    Address suspected_mbr = (Address) evt.getArg();
0342:
0343:                    synchronized (service_responses) {
0344:                        service_responses.put(suspected_mbr, null);
0345:                        service_responses.notifyAll();
0346:                    }
0347:                    passToAllMuxChannels(evt);
0348:                    break;
0349:
0350:                case Event.GET_APPLSTATE:
0351:                case Event.STATE_TRANSFER_OUTPUTSTREAM:
0352:                    handleStateRequest(evt);
0353:                    break;
0354:
0355:                case Event.GET_STATE_OK:
0356:                case Event.STATE_TRANSFER_INPUTSTREAM:
0357:                    handleStateResponse(evt);
0358:                    break;
0359:
0360:                case Event.SET_LOCAL_ADDRESS:
0361:                    local_addr = (Address) evt.getArg();
0362:                    passToAllMuxChannels(evt);
0363:                    break;
0364:
0365:                case Event.BLOCK:
0366:                    temp_merge_view = null;
0367:                    blocked = true;
0368:                    int num_services = services.size();
0369:                    if (num_services == 0) {
0370:                        channel.blockOk();
0371:                        return;
0372:                    }
0373:                    block_ok_collector.reset();
0374:                    passToAllMuxChannels(evt);
0375:                    block_ok_collector.waitUntil(num_services);
0376:                    channel.blockOk();
0377:                    return;
0378:
0379:                case Event.UNBLOCK: // process queued-up MergeViews
0380:                    if (!blocked) {
0381:                        passToAllMuxChannels(evt);
0382:                        return;
0383:                    } else
0384:                        blocked = false;
0385:                    if (temp_merge_view != null) {
0386:                        if (log.isTraceEnabled())
0387:                            log
0388:                                    .trace("calling handleMergeView() from UNBLOCK (flush_present="
0389:                                            + flush_present + ")");
0390:                        try {
0391:                            Thread merge_handler = new Thread() {
0392:                                public void run() {
0393:                                    try {
0394:                                        handleMergeView(temp_merge_view);
0395:                                    } catch (Exception e) {
0396:                                        if (log.isErrorEnabled())
0397:                                            log
0398:                                                    .error(
0399:                                                            "problems handling merge view",
0400:                                                            e);
0401:                                    }
0402:                                }
0403:                            };
0404:                            merge_handler.setName("merge handler (unblock)");
0405:                            merge_handler.setDaemon(false);
0406:                            merge_handler.start();
0407:                        } catch (Exception e) {
0408:                            if (log.isErrorEnabled())
0409:                                log.error("failed handling merge view", e);
0410:                        }
0411:                    }
0412:                    passToAllMuxChannels(evt);
0413:                    break;
0414:
0415:                default:
0416:                    passToAllMuxChannels(evt);
0417:                    break;
0418:                }
0419:            }
0420:
0421:            public Channel createMuxChannel(JChannelFactory f, String id,
0422:                    String stack_name) throws Exception {
0423:                MuxChannel ch;
0424:                synchronized (services) {
0425:                    if (services.containsKey(id))
0426:                        throw new Exception(
0427:                                "service ID \""
0428:                                        + id
0429:                                        + "\" is already registered, cannot register duplicate ID");
0430:                    ch = new MuxChannel(f, channel, id, stack_name, this );
0431:                    services.put(id, ch);
0432:                }
0433:                return ch;
0434:            }
0435:
0436:            private void passToAllMuxChannels(Event evt) {
0437:                for (Iterator it = services.values().iterator(); it.hasNext();) {
0438:                    MuxChannel ch = (MuxChannel) it.next();
0439:                    ch.up(evt);
0440:                }
0441:            }
0442:
0443:            public MuxChannel remove(String id) {
0444:                synchronized (services) {
0445:                    return (MuxChannel) services.remove(id);
0446:                }
0447:            }
0448:
0449:            /** Closes the underlying JChannel if all MuxChannels have been disconnected */
0450:            public void disconnect() {
0451:                MuxChannel mux_ch;
0452:                boolean all_disconnected = true;
0453:                synchronized (services) {
0454:                    for (Iterator it = services.values().iterator(); it
0455:                            .hasNext();) {
0456:                        mux_ch = (MuxChannel) it.next();
0457:                        if (mux_ch.isConnected()) {
0458:                            all_disconnected = false;
0459:                            break;
0460:                        }
0461:                    }
0462:                    if (all_disconnected) {
0463:                        if (log.isTraceEnabled()) {
0464:                            log
0465:                                    .trace("disconnecting underlying JChannel as all MuxChannels are disconnected");
0466:                        }
0467:                        channel.disconnect();
0468:                    }
0469:                }
0470:            }
0471:
0472:            public void unregister(String appl_id) {
0473:                synchronized (services) {
0474:                    services.remove(appl_id);
0475:                }
0476:            }
0477:
0478:            public boolean close() {
0479:                MuxChannel mux_ch;
0480:                boolean all_closed = true;
0481:                synchronized (services) {
0482:                    for (Iterator it = services.values().iterator(); it
0483:                            .hasNext();) {
0484:                        mux_ch = (MuxChannel) it.next();
0485:                        if (mux_ch.isOpen()) {
0486:                            all_closed = false;
0487:                            break;
0488:                        }
0489:                    }
0490:                    if (all_closed) {
0491:                        if (log.isTraceEnabled()) {
0492:                            log
0493:                                    .trace("closing underlying JChannel as all MuxChannels are closed");
0494:                        }
0495:                        channel.close();
0496:                        services.clear();
0497:                    }
0498:                    return all_closed;
0499:                }
0500:            }
0501:
0502:            public void closeAll() {
0503:                synchronized (services) {
0504:                    MuxChannel mux_ch;
0505:                    for (Iterator it = services.values().iterator(); it
0506:                            .hasNext();) {
0507:                        mux_ch = (MuxChannel) it.next();
0508:                        mux_ch.setConnected(false);
0509:                        mux_ch.setClosed(true);
0510:                        mux_ch.closeMessageQueue(true);
0511:                    }
0512:                }
0513:            }
0514:
0515:            public boolean shutdown() {
0516:                MuxChannel mux_ch;
0517:                boolean all_closed = true;
0518:                synchronized (services) {
0519:                    for (Iterator it = services.values().iterator(); it
0520:                            .hasNext();) {
0521:                        mux_ch = (MuxChannel) it.next();
0522:                        if (mux_ch.isOpen()) {
0523:                            all_closed = false;
0524:                            break;
0525:                        }
0526:                    }
0527:                    if (all_closed) {
0528:                        if (log.isTraceEnabled()) {
0529:                            log
0530:                                    .trace("shutting down underlying JChannel as all MuxChannels are closed");
0531:                        }
0532:                        channel.shutdown();
0533:                        services.clear();
0534:                    }
0535:                    return all_closed;
0536:                }
0537:            }
0538:
0539:            private boolean isFlushPresent() {
0540:                return channel.getProtocolStack().findProtocol("FLUSH") != null;
0541:            }
0542:
0543:            private void sendServiceState() throws Exception {
0544:                Object[] my_services = services.keySet().toArray();
0545:                byte[] data = Util.objectToByteBuffer(my_services);
0546:                ServiceInfo sinfo = new ServiceInfo(
0547:                        ServiceInfo.LIST_SERVICES_RSP, null, channel
0548:                                .getLocalAddress(), data);
0549:                Message rsp = new Message(); // send to everyone
0550:                MuxHeader hdr = new MuxHeader(sinfo);
0551:                rsp.putHeader(NAME, hdr);
0552:                channel.send(rsp);
0553:            }
0554:
0555:            private Address getLocalAddress() {
0556:                if (local_addr != null)
0557:                    return local_addr;
0558:                if (channel != null)
0559:                    local_addr = channel.getLocalAddress();
0560:                return local_addr;
0561:            }
0562:
0563:            private Address getCoordinator() {
0564:                if (channel != null) {
0565:                    View v = channel.getView();
0566:                    if (v != null) {
0567:                        Vector members = v.getMembers();
0568:                        if (members != null && members.size() > 0) {
0569:                            return (Address) members.firstElement();
0570:                        }
0571:                    }
0572:                }
0573:                return null;
0574:            }
0575:
0576:            /**
0577:             *
0578:             * Returns an Address of a state provider for a given service_id.
0579:             *  
0580:             * If preferredTarget is a member of a service view for a given service_id 
0581:             * then preferredTarget is returned. Otherwise, service view coordinator is 
0582:             * returned if such node exists. If service view is empty for a given service_id 
0583:             * null is returned.  
0584:             *
0585:             * @param preferredTarget
0586:             * @param service_id
0587:             * @return
0588:             */
0589:            public Address getStateProvider(Address preferredTarget,
0590:                    String service_id) {
0591:                Address result = null;
0592:                List hosts = (List) service_state.get(service_id);
0593:                if (hosts != null && !hosts.isEmpty()) {
0594:                    if (hosts.contains(preferredTarget)) {
0595:                        result = preferredTarget;
0596:                    } else {
0597:                        result = (Address) hosts.get(0);
0598:                    }
0599:                }
0600:                return result;
0601:            }
0602:
0603:            private void sendServiceMessage(byte type, String service,
0604:                    Address host, boolean bypassFlush) throws Exception {
0605:                if (host == null)
0606:                    host = getLocalAddress();
0607:                if (host == null) {
0608:                    if (log.isWarnEnabled()) {
0609:                        log.warn("local_addr is null, cannot send ServiceInfo."
0610:                                + ServiceInfo.typeToString(type) + " message");
0611:                    }
0612:                    return;
0613:                }
0614:
0615:                ServiceInfo si = new ServiceInfo(type, service, host, null);
0616:                MuxHeader hdr = new MuxHeader(si);
0617:                Message service_msg = new Message();
0618:                service_msg.putHeader(NAME, hdr);
0619:                if (bypassFlush)
0620:                    service_msg.putHeader(FLUSH.NAME, new FLUSH.FlushHeader(
0621:                            FLUSH.FlushHeader.FLUSH_BYPASS));
0622:
0623:                channel.send(service_msg);
0624:            }
0625:
0626:            private void handleStateRequest(Event evt) {
0627:                StateTransferInfo info = (StateTransferInfo) evt.getArg();
0628:                String id = info.state_id;
0629:                String original_id = id;
0630:                MuxChannel mux_ch = null;
0631:
0632:                try {
0633:                    int index = id.indexOf(SEPARATOR);
0634:                    if (index > -1) {
0635:                        info.state_id = id.substring(index + SEPARATOR_LEN);
0636:                        id = id.substring(0, index); // similar reuse as above...
0637:                    } else {
0638:                        info.state_id = null;
0639:                    }
0640:
0641:                    mux_ch = (MuxChannel) services.get(id);
0642:                    if (mux_ch == null)
0643:                        throw new IllegalArgumentException(
0644:                                "didn't find service with ID=" + id
0645:                                        + " to fetch state from");
0646:
0647:                    // evt.setArg(info);
0648:                    mux_ch.up(evt); // state_id will be null, get regular state from the service named state_id
0649:                } catch (Throwable ex) {
0650:                    if (log.isErrorEnabled())
0651:                        log
0652:                                .error(
0653:                                        "failed returning the application state, will return null",
0654:                                        ex);
0655:                    channel.returnState(null, original_id); // we cannot use mux_ch because it might be null due to the lookup above
0656:                }
0657:            }
0658:
0659:            private void handleStateResponse(Event evt) {
0660:                StateTransferInfo info = (StateTransferInfo) evt.getArg();
0661:                MuxChannel mux_ch;
0662:
0663:                String appl_id, substate_id, tmp;
0664:                tmp = info.state_id;
0665:
0666:                if (tmp == null) {
0667:                    if (log.isTraceEnabled())
0668:                        log.trace("state is null, not passing up: " + info);
0669:                    return;
0670:                }
0671:
0672:                int index = tmp.indexOf(SEPARATOR);
0673:                if (index > -1) {
0674:                    appl_id = tmp.substring(0, index);
0675:                    substate_id = tmp.substring(index + SEPARATOR_LEN);
0676:                } else {
0677:                    appl_id = tmp;
0678:                    substate_id = null;
0679:                }
0680:
0681:                mux_ch = (MuxChannel) services.get(appl_id);
0682:                if (mux_ch == null) {
0683:                    log.error("didn't find service with ID=" + appl_id
0684:                            + " to fetch state from");
0685:                } else {
0686:                    StateTransferInfo tmp_info = info.copy();
0687:                    tmp_info.state_id = substate_id;
0688:                    evt.setArg(tmp_info);
0689:                    mux_ch.up(evt); // state_id will be null, get regular state from the service named state_id
0690:                }
0691:            }
0692:
0693:            private void handleServiceStateRequest(ServiceInfo info,
0694:                    Address sender) throws Exception {
0695:                switch (info.type) {
0696:                case ServiceInfo.STATE_REQ:
0697:                    byte[] state;
0698:                    synchronized (service_state) {
0699:                        state = Util.objectToByteBuffer(service_state);
0700:                    }
0701:                    ServiceInfo si = new ServiceInfo(ServiceInfo.STATE_RSP,
0702:                            null, null, state);
0703:                    MuxHeader hdr = new MuxHeader(si);
0704:                    Message state_rsp = new Message(sender);
0705:                    state_rsp.putHeader(NAME, hdr);
0706:                    channel.send(state_rsp);
0707:                    break;
0708:                case ServiceInfo.STATE_RSP:
0709:                    service_state_promise.setResult(info.state);
0710:                    break;
0711:                case ServiceInfo.SERVICE_UP:
0712:                    handleServiceUp(info.service, info.host, true);
0713:                    break;
0714:                case ServiceInfo.SERVICE_DOWN:
0715:                    handleServiceDown(info.service, info.host, true);
0716:                    break;
0717:                case ServiceInfo.LIST_SERVICES_RSP:
0718:                    handleServicesRsp(sender, info.state);
0719:                    break;
0720:                default:
0721:                    if (log.isErrorEnabled())
0722:                        log.error("service request type " + info.type
0723:                                + " not known");
0724:                    break;
0725:                }
0726:            }
0727:
0728:            private void handleServicesRsp(Address sender, byte[] state)
0729:                    throws Exception {
0730:                Object[] keys = (Object[]) Util.objectFromByteBuffer(state);
0731:                Set s = new HashSet();
0732:                for (int i = 0; i < keys.length; i++)
0733:                    s.add(keys[i]);
0734:
0735:                synchronized (service_responses) {
0736:                    Set tmp = (Set) service_responses.get(sender);
0737:                    if (tmp == null)
0738:                        tmp = new HashSet();
0739:                    tmp.addAll(s);
0740:
0741:                    service_responses.put(sender, tmp);
0742:                    if (log.isTraceEnabled())
0743:                        log.trace("received service response: " + sender + "("
0744:                                + s.toString() + ")");
0745:                    service_responses.notifyAll();
0746:                }
0747:            }
0748:
0749:            private void handleServiceDown(String service, Address host,
0750:                    boolean received) {
0751:                List hosts, hosts_copy;
0752:                boolean removed = false;
0753:
0754:                // discard if we sent this message
0755:                if (received && host != null && local_addr != null
0756:                        && local_addr.equals(host)) {
0757:                    return;
0758:                }
0759:
0760:                synchronized (service_state) {
0761:                    hosts = (List) service_state.get(service);
0762:                    if (hosts == null)
0763:                        return;
0764:                    removed = hosts.remove(host);
0765:                    hosts_copy = new ArrayList(hosts); // make a copy so we don't modify hosts in generateServiceView()
0766:                }
0767:
0768:                if (removed) {
0769:                    View service_view = generateServiceView(hosts_copy);
0770:                    if (service_view != null) {
0771:                        MuxChannel ch = (MuxChannel) services.get(service);
0772:                        if (ch != null) {
0773:                            Event view_evt = new Event(Event.VIEW_CHANGE,
0774:                                    service_view);
0775:                            ch.up(view_evt);
0776:                        } else {
0777:                            if (log.isTraceEnabled())
0778:                                log
0779:                                        .trace("service "
0780:                                                + service
0781:                                                + " not found, cannot dispatch service view "
0782:                                                + service_view);
0783:                        }
0784:                    }
0785:                }
0786:
0787:                Address local_address = getLocalAddress();
0788:                if (local_address != null && host != null
0789:                        && host.equals(local_address))
0790:                    unregister(service);
0791:            }
0792:
0793:            private void handleServiceUp(String service, Address host,
0794:                    boolean received) {
0795:                List hosts, hosts_copy;
0796:                boolean added = false;
0797:
0798:                // discard if we sent this message
0799:                if (received && host != null && local_addr != null
0800:                        && local_addr.equals(host)) {
0801:                    return;
0802:                }
0803:
0804:                synchronized (service_state) {
0805:                    hosts = (List) service_state.get(service);
0806:                    if (hosts == null) {
0807:                        hosts = new ArrayList();
0808:                        service_state.put(service, hosts);
0809:                    }
0810:                    if (!hosts.contains(host)) {
0811:                        hosts.add(host);
0812:                        added = true;
0813:                    }
0814:                    hosts_copy = new ArrayList(hosts); // make a copy so we don't modify hosts in generateServiceView()
0815:                }
0816:
0817:                if (added) {
0818:                    View service_view = generateServiceView(hosts_copy);
0819:                    if (service_view != null) {
0820:                        MuxChannel ch = (MuxChannel) services.get(service);
0821:                        if (ch != null) {
0822:                            Event view_evt = new Event(Event.VIEW_CHANGE,
0823:                                    service_view);
0824:                            ch.up(view_evt);
0825:                        } else {
0826:                            if (log.isTraceEnabled())
0827:                                log
0828:                                        .trace("service "
0829:                                                + service
0830:                                                + " not found, cannot dispatch service view "
0831:                                                + service_view);
0832:                        }
0833:                    }
0834:                }
0835:            }
0836:
0837:            /**
0838:             * Fetches the service states from everyone else in the cluster. Once all states have been received and inserted into
0839:             * service_state, compute a service view (a copy of MergeView) for each service and pass it up
0840:             * @param view
0841:             */
0842:            private void handleMergeView(MergeView view) throws Exception {
0843:                long time_to_wait = SERVICES_RSP_TIMEOUT, start;
0844:                int num_members = view.size(); // include myself
0845:                Map copy = null;
0846:
0847:                sendServiceState();
0848:
0849:                synchronized (service_responses) {
0850:                    start = System.currentTimeMillis();
0851:                    try {
0852:                        while (time_to_wait > 0
0853:                                && numResponses(service_responses) < num_members) {
0854:                            // System.out.println("time_to_wait=" + time_to_wait + ", numResponses(service_responses)=" + numResponses(service_responses) +
0855:                            //     ", num_members=" + num_members + ", service_state=" + service_state);
0856:                            service_responses.wait(time_to_wait);
0857:                            time_to_wait -= System.currentTimeMillis() - start;
0858:                        }
0859:
0860:                        // System.out.println("wait terminated: time_to_wait=" + time_to_wait + ", numResponses(service_responses)=" + numResponses(service_responses) +
0861:                        //     ", num_members=" + num_members + ", service_state=" + service_state);
0862:                        copy = new HashMap(service_responses);
0863:                    } catch (Exception ex) {
0864:                        if (log.isErrorEnabled())
0865:                            log
0866:                                    .error(
0867:                                            "failed fetching a list of services from other members in the cluster, cannot handle merge view "
0868:                                                    + view, ex);
0869:                    }
0870:                }
0871:
0872:                if (log.isTraceEnabled())
0873:                    log.trace("merging service state, my service_state: "
0874:                            + service_state + ", received responses: " + copy);
0875:
0876:                // merges service_responses with service_state and emits MergeViews for the services affected (MuxChannel)
0877:                mergeServiceState(view, copy);
0878:                service_responses.clear();
0879:            }
0880:
0881:            private int numResponses(Map m) {
0882:                int num = 0;
0883:                Collection values = m.values();
0884:                for (Iterator it = values.iterator(); it.hasNext();) {
0885:                    if (it.next() != null)
0886:                        num++;
0887:                }
0888:
0889:                return num;
0890:            }
0891:
0892:            private void mergeServiceState(MergeView view, Map copy) {
0893:                Set modified_services = new HashSet();
0894:                Map.Entry entry;
0895:                Address host; // address of the sender
0896:                Set service_list; // Set<String> of services
0897:                List my_services;
0898:                String service;
0899:
0900:                synchronized (service_state) {
0901:                    for (Iterator it = copy.entrySet().iterator(); it.hasNext();) {
0902:                        entry = (Map.Entry) it.next();
0903:                        host = (Address) entry.getKey();
0904:                        service_list = (Set) entry.getValue();
0905:                        if (service_list == null)
0906:                            continue;
0907:
0908:                        for (Iterator it2 = service_list.iterator(); it2
0909:                                .hasNext();) {
0910:                            service = (String) it2.next();
0911:                            my_services = (List) service_state.get(service);
0912:                            if (my_services == null) {
0913:                                my_services = new ArrayList();
0914:                                service_state.put(service, my_services);
0915:                            }
0916:
0917:                            boolean was_modified = my_services.add(host);
0918:                            if (was_modified) {
0919:                                modified_services.add(service);
0920:                            }
0921:                        }
0922:                    }
0923:                }
0924:
0925:                // now emit MergeViews for all services which were modified
0926:                for (Iterator it = modified_services.iterator(); it.hasNext();) {
0927:                    service = (String) it.next();
0928:                    MuxChannel ch = (MuxChannel) services.get(service);
0929:                    my_services = (List) service_state.get(service);
0930:                    Vector membersCopy = new Vector(view.getMembers());
0931:                    membersCopy.retainAll(my_services);
0932:                    MergeView v = new MergeView(view.getVid(), membersCopy,
0933:                            view.getSubgroups());
0934:                    Event evt = new Event(Event.VIEW_CHANGE, v);
0935:                    ch.up(evt);
0936:                }
0937:            }
0938:
0939:            private void adjustServiceViews(Vector left_members) {
0940:                if (left_members != null)
0941:                    for (int i = 0; i < left_members.size(); i++) {
0942:                        try {
0943:                            adjustServiceView((Address) left_members
0944:                                    .elementAt(i));
0945:                        } catch (Throwable t) {
0946:                            if (log.isErrorEnabled())
0947:                                log.error("failed adjusting service views", t);
0948:                        }
0949:                    }
0950:            }
0951:
0952:            private void adjustServiceView(Address host) {
0953:                Map.Entry entry;
0954:                List hosts, hosts_copy;
0955:                String service;
0956:                boolean removed = false;
0957:
0958:                synchronized (service_state) {
0959:                    for (Iterator it = service_state.entrySet().iterator(); it
0960:                            .hasNext();) {
0961:                        entry = (Map.Entry) it.next();
0962:                        service = (String) entry.getKey();
0963:                        hosts = (List) entry.getValue();
0964:                        if (hosts == null)
0965:                            continue;
0966:
0967:                        removed = hosts.remove(host);
0968:                        hosts_copy = new ArrayList(hosts); // make a copy so we don't modify hosts in generateServiceView()
0969:
0970:                        if (removed) {
0971:                            View service_view = generateServiceView(hosts_copy);
0972:                            if (service_view != null) {
0973:                                MuxChannel ch = (MuxChannel) services
0974:                                        .get(service);
0975:                                if (ch != null) {
0976:                                    Event view_evt = new Event(
0977:                                            Event.VIEW_CHANGE, service_view);
0978:                                    ch.up(view_evt);
0979:                                } else {
0980:                                    if (log.isTraceEnabled())
0981:                                        log
0982:                                                .trace("service "
0983:                                                        + service
0984:                                                        + " not found, cannot dispatch service view "
0985:                                                        + service_view);
0986:                                }
0987:                            }
0988:                        }
0989:                        Address local_address = getLocalAddress();
0990:                        if (local_address != null && host != null
0991:                                && host.equals(local_address))
0992:                            unregister(service);
0993:                    }
0994:                }
0995:            }
0996:
0997:            /**
0998:             * Create a copy of view which contains only members which are present in hosts. Call viewAccepted() on the MuxChannel
0999:             * which corresponds with service. If no members are removed or added from/to view, this is a no-op.
1000:             * @param hosts List<Address>
1001:             * @return the servicd view (a modified copy of the real view), or null if the view was not modified
1002:             */
1003:            private View generateServiceView(List hosts) {
1004:                Vector members = new Vector(view.getMembers());
1005:                members.retainAll(hosts);
1006:                return new View(view.getVid(), members);
1007:            }
1008:
1009:            /** Tell the underlying channel to start the flush protocol, this will be handled by FLUSH */
1010:            private void startFlush() {
1011:                channel.down(new Event(Event.SUSPEND));
1012:            }
1013:
1014:            /** Tell the underlying channel to stop the flush, and resume message sending. This will be handled by FLUSH */
1015:            private void stopFlush() {
1016:                channel.down(new Event(Event.RESUME));
1017:            }
1018:
1019:            public void addServiceIfNotPresent(String id, MuxChannel ch) {
1020:                MuxChannel tmp;
1021:                synchronized (services) {
1022:                    tmp = (MuxChannel) services.get(id);
1023:                    if (tmp == null) {
1024:                        services.put(id, ch);
1025:                    }
1026:                }
1027:            }
1028:
1029:            private static class BlockOkCollector {
1030:                int num_block_oks = 0;
1031:
1032:                synchronized void reset() {
1033:                    num_block_oks = 0;
1034:                }
1035:
1036:                synchronized void increment() {
1037:                    num_block_oks++;
1038:                }
1039:
1040:                synchronized void waitUntil(int num) {
1041:                    while (num_block_oks < num) {
1042:                        try {
1043:                            this .wait();
1044:                        } catch (InterruptedException e) {
1045:                        }
1046:                    }
1047:                }
1048:
1049:                public String toString() {
1050:                    return String.valueOf(num_block_oks);
1051:                }
1052:            }
1053:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.