0001: package org.jgroups.mux;
0002:
0003: import org.apache.commons.logging.Log;
0004: import org.apache.commons.logging.LogFactory;
0005: import org.jgroups.*;
0006: import org.jgroups.protocols.pbcast.FLUSH;
0007: import org.jgroups.stack.StateTransferInfo;
0008: import org.jgroups.util.Promise;
0009: import org.jgroups.util.Util;
0010:
0011: import java.util.*;
0012:
0013: /**
0014: * Used for dispatching incoming messages. The Multiplexer implements UpHandler and registers with the associated
0015: * JChannel (there can only be 1 Multiplexer per JChannel). When up() is called with a message, the header of the
0016: * message is removed and the MuxChannel corresponding to the header's service ID is retrieved from the map,
0017: * and MuxChannel.up() is called with the message.
0018: * @author Bela Ban
0019: * @version $Id: Multiplexer.java,v 1.35.2.4 2007/02/12 18:42:34 vlada Exp $
0020: */
0021: public class Multiplexer implements UpHandler {
0022: /** Map<String,MuxChannel>. Maintains the mapping between service IDs and their associated MuxChannels */
0023: private final Map services = new HashMap();
0024: private final JChannel channel;
0025: static final Log log = LogFactory.getLog(Multiplexer.class);
0026: static final String SEPARATOR = "::";
0027: static final short SEPARATOR_LEN = (short) SEPARATOR.length();
0028: static final String NAME = "MUX";
0029: private final BlockOkCollector block_ok_collector = new BlockOkCollector();
0030:
0031: private MergeView temp_merge_view = null;
0032:
0033: private boolean flush_present = true;
0034: private boolean blocked = false;
0035:
0036: /** Cluster view */
0037: View view = null;
0038:
0039: Address local_addr = null;
0040:
0041: /** Map<String,Boolean>. Map of service IDs and booleans that determine whether getState() has already been called */
0042: private final Map state_transfer_listeners = new HashMap();
0043:
0044: /** Map<String,List<Address>>. A map of services as keys and lists of hosts as values */
0045: private final Map service_state = new HashMap();
0046:
0047: /** Used to wait on service state information */
0048: private final Promise service_state_promise = new Promise();
0049:
0050: /** Map<Address, Set<String>>. Keys are senders, values are a set of services hosted by that sender.
0051: * Used to collect responses to LIST_SERVICES_REQ */
0052: private final Map service_responses = new HashMap();
0053:
0054: private long SERVICES_RSP_TIMEOUT = 10000;
0055:
0056: public Multiplexer() {
0057: this .channel = null;
0058: flush_present = isFlushPresent();
0059: }
0060:
0061: public Multiplexer(JChannel channel) {
0062: this .channel = channel;
0063: this .channel.setUpHandler(this );
0064: this .channel.setOpt(Channel.BLOCK, Boolean.TRUE); // we want to handle BLOCK events ourselves
0065: flush_present = isFlushPresent();
0066: }
0067:
0068: /**
0069: * @deprecated Use ${link #getServiceIds()} instead
0070: * @return The set of service IDs
0071: */
0072: public Set getApplicationIds() {
0073: return services != null ? services.keySet() : null;
0074: }
0075:
0076: public Set getServiceIds() {
0077: return services != null ? services.keySet() : null;
0078: }
0079:
0080: public long getServicesResponseTimeout() {
0081: return SERVICES_RSP_TIMEOUT;
0082: }
0083:
0084: public void setServicesResponseTimeout(long services_rsp_timeout) {
0085: this .SERVICES_RSP_TIMEOUT = services_rsp_timeout;
0086: }
0087:
0088: /** Returns a copy of the current view <em>minus</em> the nodes on which service service_id is <em>not</em> running
0089: *
0090: * @param service_id
0091: * @return The service view
0092: */
0093: public View getServiceView(String service_id) {
0094: List hosts = (List) service_state.get(service_id);
0095: if (hosts == null)
0096: return null;
0097: return generateServiceView(hosts);
0098: }
0099:
0100: public boolean stateTransferListenersPresent() {
0101: return state_transfer_listeners != null
0102: && state_transfer_listeners.size() > 0;
0103: }
0104:
0105: /**
0106: * Called by a MuxChannel when BLOCK_OK is sent down
0107: */
0108: public void blockOk() {
0109: block_ok_collector.increment();
0110: }
0111:
0112: public synchronized void registerForStateTransfer(String appl_id,
0113: String substate_id) {
0114: String key = appl_id;
0115: if (substate_id != null && substate_id.length() > 0)
0116: key += SEPARATOR + substate_id;
0117: state_transfer_listeners.put(key, Boolean.FALSE);
0118: }
0119:
0120: public synchronized boolean getState(Address target, String id,
0121: long timeout) throws ChannelNotConnectedException,
0122: ChannelClosedException {
0123: if (state_transfer_listeners == null)
0124: return false;
0125: Map.Entry entry;
0126: String key;
0127: for (Iterator it = state_transfer_listeners.entrySet()
0128: .iterator(); it.hasNext();) {
0129: entry = (Map.Entry) it.next();
0130: key = (String) entry.getKey();
0131: int index = key.indexOf(SEPARATOR);
0132: boolean match;
0133: if (index > -1) {
0134: String tmp = key.substring(0, index);
0135: match = id.equals(tmp);
0136: } else {
0137: match = id.equals(key);
0138: }
0139: if (match) {
0140: entry.setValue(Boolean.TRUE);
0141: break;
0142: }
0143: }
0144:
0145: Collection values = state_transfer_listeners.values();
0146: boolean all_true = Util.all(values, Boolean.TRUE);
0147: if (!all_true)
0148: return true; // pseudo
0149:
0150: boolean rc = false;
0151: Set keys = new HashSet(state_transfer_listeners.keySet());
0152: rc = fetchServiceStates(target, keys, timeout);
0153: state_transfer_listeners.clear();
0154: return rc;
0155: }
0156:
0157: /** Fetches the app states for all service IDs in keys.
0158: * The keys are a duplicate list, so it cannot be modified by the caller of this method
0159: * @param keys
0160: */
0161: private boolean fetchServiceStates(Address target, Set keys,
0162: long timeout) throws ChannelClosedException,
0163: ChannelNotConnectedException {
0164: boolean rc, all_rcs = true;
0165: String appl_id;
0166: for (Iterator it = keys.iterator(); it.hasNext();) {
0167: appl_id = (String) it.next();
0168: rc = channel.getState(target, appl_id, timeout);
0169: if (rc == false)
0170: all_rcs = false;
0171: }
0172: return all_rcs;
0173: }
0174:
0175: /**
0176: * Fetches the map of services and hosts from the coordinator (Multiplexer). No-op if we are the coordinator
0177: */
0178: public void fetchServiceInformation() throws Exception {
0179: while (true) {
0180: Address coord = getCoordinator(), local_address = channel != null ? channel
0181: .getLocalAddress()
0182: : null;
0183: boolean is_coord = coord != null && local_address != null
0184: && local_address.equals(coord);
0185: if (is_coord) {
0186: if (log.isTraceEnabled())
0187: log
0188: .trace("I'm coordinator, will not fetch service state information");
0189: break;
0190: }
0191:
0192: ServiceInfo si = new ServiceInfo(ServiceInfo.STATE_REQ,
0193: null, null, null);
0194: MuxHeader hdr = new MuxHeader(si);
0195: Message state_req = new Message(coord, null, null);
0196: state_req.putHeader(NAME, hdr);
0197: service_state_promise.reset();
0198: channel.send(state_req);
0199:
0200: try {
0201: byte[] state = (byte[]) service_state_promise
0202: .getResultWithTimeout(2000);
0203: if (state != null) {
0204: Map new_state = (Map) Util
0205: .objectFromByteBuffer(state);
0206: synchronized (service_state) {
0207: service_state.clear();
0208: service_state.putAll(new_state);
0209: }
0210: if (log.isTraceEnabled())
0211: log
0212: .trace("service state was set successfully ("
0213: + service_state.size()
0214: + " entries)");
0215: } else {
0216: if (log.isWarnEnabled())
0217: log.warn("received service state was null");
0218: }
0219: break;
0220: } catch (TimeoutException e) {
0221: if (log.isTraceEnabled())
0222: log
0223: .trace("timed out waiting for service state from "
0224: + coord + ", retrying");
0225: }
0226: }
0227: }
0228:
0229: public void sendServiceUpMessage(String service, Address host,
0230: boolean bypassFlush) throws Exception {
0231: sendServiceMessage(ServiceInfo.SERVICE_UP, service, host,
0232: bypassFlush);
0233: if (local_addr != null && host != null
0234: && local_addr.equals(host))
0235: handleServiceUp(service, host, false);
0236: }
0237:
0238: public void sendServiceDownMessage(String service, Address host,
0239: boolean bypassFlush) throws Exception {
0240: sendServiceMessage(ServiceInfo.SERVICE_DOWN, service, host,
0241: bypassFlush);
0242: if (local_addr != null && host != null
0243: && local_addr.equals(host))
0244: handleServiceDown(service, host, false);
0245: }
0246:
0247: public void up(Event evt) {
0248: // remove header and dispatch to correct MuxChannel
0249: MuxHeader hdr;
0250:
0251: switch (evt.getType()) {
0252: case Event.MSG:
0253: Message msg = (Message) evt.getArg();
0254: hdr = (MuxHeader) msg.getHeader(NAME);
0255: if (hdr == null) {
0256: log.error("MuxHeader not present - discarding message "
0257: + msg);
0258: return;
0259: }
0260:
0261: if (hdr.info != null) { // it is a service state request - not a default multiplex request
0262: try {
0263: handleServiceStateRequest(hdr.info, msg.getSrc());
0264: } catch (Exception e) {
0265: if (log.isErrorEnabled())
0266: log
0267: .error(
0268: "failure in handling service state request",
0269: e);
0270: }
0271: break;
0272: }
0273:
0274: MuxChannel mux_ch = (MuxChannel) services.get(hdr.id);
0275: if (mux_ch == null) {
0276: log
0277: .warn("service "
0278: + hdr.id
0279: + " not currently running, discarding messgage "
0280: + msg);
0281: return;
0282: }
0283: mux_ch.up(evt);
0284: break;
0285:
0286: case Event.VIEW_CHANGE:
0287: Vector old_members = view != null ? view.getMembers()
0288: : null;
0289: view = (View) evt.getArg();
0290: Vector new_members = view != null ? view.getMembers()
0291: : null;
0292: Vector left_members = Util.determineLeftMembers(
0293: old_members, new_members);
0294:
0295: if (view instanceof MergeView) {
0296: temp_merge_view = (MergeView) view.clone();
0297: if (log.isTraceEnabled())
0298: log.trace("received a MergeView: "
0299: + temp_merge_view
0300: + ", adjusting the service view");
0301: if (!flush_present && temp_merge_view != null) {
0302: try {
0303: if (log.isTraceEnabled())
0304: log
0305: .trace("calling handleMergeView() from VIEW_CHANGE (flush_present="
0306: + flush_present + ")");
0307: Thread merge_handler = new Thread() {
0308: public void run() {
0309: try {
0310: handleMergeView(temp_merge_view);
0311: } catch (Exception e) {
0312: if (log.isErrorEnabled())
0313: log
0314: .error(
0315: "problems handling merge view",
0316: e);
0317: }
0318: }
0319: };
0320: merge_handler
0321: .setName("merge handler view_change");
0322: merge_handler.setDaemon(false);
0323: merge_handler.start();
0324: } catch (Exception e) {
0325: if (log.isErrorEnabled())
0326: log.error("failed handling merge view", e);
0327: }
0328: } else {
0329: ; // don't do anything because we are blocked sending messages anyway
0330: }
0331: } else { // regular view
0332: synchronized (service_responses) {
0333: service_responses.clear();
0334: }
0335: }
0336: if (left_members.size() > 0)
0337: adjustServiceViews(left_members);
0338: break;
0339:
0340: case Event.SUSPECT:
0341: Address suspected_mbr = (Address) evt.getArg();
0342:
0343: synchronized (service_responses) {
0344: service_responses.put(suspected_mbr, null);
0345: service_responses.notifyAll();
0346: }
0347: passToAllMuxChannels(evt);
0348: break;
0349:
0350: case Event.GET_APPLSTATE:
0351: case Event.STATE_TRANSFER_OUTPUTSTREAM:
0352: handleStateRequest(evt);
0353: break;
0354:
0355: case Event.GET_STATE_OK:
0356: case Event.STATE_TRANSFER_INPUTSTREAM:
0357: handleStateResponse(evt);
0358: break;
0359:
0360: case Event.SET_LOCAL_ADDRESS:
0361: local_addr = (Address) evt.getArg();
0362: passToAllMuxChannels(evt);
0363: break;
0364:
0365: case Event.BLOCK:
0366: temp_merge_view = null;
0367: blocked = true;
0368: int num_services = services.size();
0369: if (num_services == 0) {
0370: channel.blockOk();
0371: return;
0372: }
0373: block_ok_collector.reset();
0374: passToAllMuxChannels(evt);
0375: block_ok_collector.waitUntil(num_services);
0376: channel.blockOk();
0377: return;
0378:
0379: case Event.UNBLOCK: // process queued-up MergeViews
0380: if (!blocked) {
0381: passToAllMuxChannels(evt);
0382: return;
0383: } else
0384: blocked = false;
0385: if (temp_merge_view != null) {
0386: if (log.isTraceEnabled())
0387: log
0388: .trace("calling handleMergeView() from UNBLOCK (flush_present="
0389: + flush_present + ")");
0390: try {
0391: Thread merge_handler = new Thread() {
0392: public void run() {
0393: try {
0394: handleMergeView(temp_merge_view);
0395: } catch (Exception e) {
0396: if (log.isErrorEnabled())
0397: log
0398: .error(
0399: "problems handling merge view",
0400: e);
0401: }
0402: }
0403: };
0404: merge_handler.setName("merge handler (unblock)");
0405: merge_handler.setDaemon(false);
0406: merge_handler.start();
0407: } catch (Exception e) {
0408: if (log.isErrorEnabled())
0409: log.error("failed handling merge view", e);
0410: }
0411: }
0412: passToAllMuxChannels(evt);
0413: break;
0414:
0415: default:
0416: passToAllMuxChannels(evt);
0417: break;
0418: }
0419: }
0420:
0421: public Channel createMuxChannel(JChannelFactory f, String id,
0422: String stack_name) throws Exception {
0423: MuxChannel ch;
0424: synchronized (services) {
0425: if (services.containsKey(id))
0426: throw new Exception(
0427: "service ID \""
0428: + id
0429: + "\" is already registered, cannot register duplicate ID");
0430: ch = new MuxChannel(f, channel, id, stack_name, this );
0431: services.put(id, ch);
0432: }
0433: return ch;
0434: }
0435:
0436: private void passToAllMuxChannels(Event evt) {
0437: for (Iterator it = services.values().iterator(); it.hasNext();) {
0438: MuxChannel ch = (MuxChannel) it.next();
0439: ch.up(evt);
0440: }
0441: }
0442:
0443: public MuxChannel remove(String id) {
0444: synchronized (services) {
0445: return (MuxChannel) services.remove(id);
0446: }
0447: }
0448:
0449: /** Closes the underlying JChannel if all MuxChannels have been disconnected */
0450: public void disconnect() {
0451: MuxChannel mux_ch;
0452: boolean all_disconnected = true;
0453: synchronized (services) {
0454: for (Iterator it = services.values().iterator(); it
0455: .hasNext();) {
0456: mux_ch = (MuxChannel) it.next();
0457: if (mux_ch.isConnected()) {
0458: all_disconnected = false;
0459: break;
0460: }
0461: }
0462: if (all_disconnected) {
0463: if (log.isTraceEnabled()) {
0464: log
0465: .trace("disconnecting underlying JChannel as all MuxChannels are disconnected");
0466: }
0467: channel.disconnect();
0468: }
0469: }
0470: }
0471:
0472: public void unregister(String appl_id) {
0473: synchronized (services) {
0474: services.remove(appl_id);
0475: }
0476: }
0477:
0478: public boolean close() {
0479: MuxChannel mux_ch;
0480: boolean all_closed = true;
0481: synchronized (services) {
0482: for (Iterator it = services.values().iterator(); it
0483: .hasNext();) {
0484: mux_ch = (MuxChannel) it.next();
0485: if (mux_ch.isOpen()) {
0486: all_closed = false;
0487: break;
0488: }
0489: }
0490: if (all_closed) {
0491: if (log.isTraceEnabled()) {
0492: log
0493: .trace("closing underlying JChannel as all MuxChannels are closed");
0494: }
0495: channel.close();
0496: services.clear();
0497: }
0498: return all_closed;
0499: }
0500: }
0501:
0502: public void closeAll() {
0503: synchronized (services) {
0504: MuxChannel mux_ch;
0505: for (Iterator it = services.values().iterator(); it
0506: .hasNext();) {
0507: mux_ch = (MuxChannel) it.next();
0508: mux_ch.setConnected(false);
0509: mux_ch.setClosed(true);
0510: mux_ch.closeMessageQueue(true);
0511: }
0512: }
0513: }
0514:
0515: public boolean shutdown() {
0516: MuxChannel mux_ch;
0517: boolean all_closed = true;
0518: synchronized (services) {
0519: for (Iterator it = services.values().iterator(); it
0520: .hasNext();) {
0521: mux_ch = (MuxChannel) it.next();
0522: if (mux_ch.isOpen()) {
0523: all_closed = false;
0524: break;
0525: }
0526: }
0527: if (all_closed) {
0528: if (log.isTraceEnabled()) {
0529: log
0530: .trace("shutting down underlying JChannel as all MuxChannels are closed");
0531: }
0532: channel.shutdown();
0533: services.clear();
0534: }
0535: return all_closed;
0536: }
0537: }
0538:
0539: private boolean isFlushPresent() {
0540: return channel.getProtocolStack().findProtocol("FLUSH") != null;
0541: }
0542:
0543: private void sendServiceState() throws Exception {
0544: Object[] my_services = services.keySet().toArray();
0545: byte[] data = Util.objectToByteBuffer(my_services);
0546: ServiceInfo sinfo = new ServiceInfo(
0547: ServiceInfo.LIST_SERVICES_RSP, null, channel
0548: .getLocalAddress(), data);
0549: Message rsp = new Message(); // send to everyone
0550: MuxHeader hdr = new MuxHeader(sinfo);
0551: rsp.putHeader(NAME, hdr);
0552: channel.send(rsp);
0553: }
0554:
0555: private Address getLocalAddress() {
0556: if (local_addr != null)
0557: return local_addr;
0558: if (channel != null)
0559: local_addr = channel.getLocalAddress();
0560: return local_addr;
0561: }
0562:
0563: private Address getCoordinator() {
0564: if (channel != null) {
0565: View v = channel.getView();
0566: if (v != null) {
0567: Vector members = v.getMembers();
0568: if (members != null && members.size() > 0) {
0569: return (Address) members.firstElement();
0570: }
0571: }
0572: }
0573: return null;
0574: }
0575:
0576: /**
0577: *
0578: * Returns an Address of a state provider for a given service_id.
0579: *
0580: * If preferredTarget is a member of a service view for a given service_id
0581: * then preferredTarget is returned. Otherwise, service view coordinator is
0582: * returned if such node exists. If service view is empty for a given service_id
0583: * null is returned.
0584: *
0585: * @param preferredTarget
0586: * @param service_id
0587: * @return
0588: */
0589: public Address getStateProvider(Address preferredTarget,
0590: String service_id) {
0591: Address result = null;
0592: List hosts = (List) service_state.get(service_id);
0593: if (hosts != null && !hosts.isEmpty()) {
0594: if (hosts.contains(preferredTarget)) {
0595: result = preferredTarget;
0596: } else {
0597: result = (Address) hosts.get(0);
0598: }
0599: }
0600: return result;
0601: }
0602:
0603: private void sendServiceMessage(byte type, String service,
0604: Address host, boolean bypassFlush) throws Exception {
0605: if (host == null)
0606: host = getLocalAddress();
0607: if (host == null) {
0608: if (log.isWarnEnabled()) {
0609: log.warn("local_addr is null, cannot send ServiceInfo."
0610: + ServiceInfo.typeToString(type) + " message");
0611: }
0612: return;
0613: }
0614:
0615: ServiceInfo si = new ServiceInfo(type, service, host, null);
0616: MuxHeader hdr = new MuxHeader(si);
0617: Message service_msg = new Message();
0618: service_msg.putHeader(NAME, hdr);
0619: if (bypassFlush)
0620: service_msg.putHeader(FLUSH.NAME, new FLUSH.FlushHeader(
0621: FLUSH.FlushHeader.FLUSH_BYPASS));
0622:
0623: channel.send(service_msg);
0624: }
0625:
0626: private void handleStateRequest(Event evt) {
0627: StateTransferInfo info = (StateTransferInfo) evt.getArg();
0628: String id = info.state_id;
0629: String original_id = id;
0630: MuxChannel mux_ch = null;
0631:
0632: try {
0633: int index = id.indexOf(SEPARATOR);
0634: if (index > -1) {
0635: info.state_id = id.substring(index + SEPARATOR_LEN);
0636: id = id.substring(0, index); // similar reuse as above...
0637: } else {
0638: info.state_id = null;
0639: }
0640:
0641: mux_ch = (MuxChannel) services.get(id);
0642: if (mux_ch == null)
0643: throw new IllegalArgumentException(
0644: "didn't find service with ID=" + id
0645: + " to fetch state from");
0646:
0647: // evt.setArg(info);
0648: mux_ch.up(evt); // state_id will be null, get regular state from the service named state_id
0649: } catch (Throwable ex) {
0650: if (log.isErrorEnabled())
0651: log
0652: .error(
0653: "failed returning the application state, will return null",
0654: ex);
0655: channel.returnState(null, original_id); // we cannot use mux_ch because it might be null due to the lookup above
0656: }
0657: }
0658:
0659: private void handleStateResponse(Event evt) {
0660: StateTransferInfo info = (StateTransferInfo) evt.getArg();
0661: MuxChannel mux_ch;
0662:
0663: String appl_id, substate_id, tmp;
0664: tmp = info.state_id;
0665:
0666: if (tmp == null) {
0667: if (log.isTraceEnabled())
0668: log.trace("state is null, not passing up: " + info);
0669: return;
0670: }
0671:
0672: int index = tmp.indexOf(SEPARATOR);
0673: if (index > -1) {
0674: appl_id = tmp.substring(0, index);
0675: substate_id = tmp.substring(index + SEPARATOR_LEN);
0676: } else {
0677: appl_id = tmp;
0678: substate_id = null;
0679: }
0680:
0681: mux_ch = (MuxChannel) services.get(appl_id);
0682: if (mux_ch == null) {
0683: log.error("didn't find service with ID=" + appl_id
0684: + " to fetch state from");
0685: } else {
0686: StateTransferInfo tmp_info = info.copy();
0687: tmp_info.state_id = substate_id;
0688: evt.setArg(tmp_info);
0689: mux_ch.up(evt); // state_id will be null, get regular state from the service named state_id
0690: }
0691: }
0692:
0693: private void handleServiceStateRequest(ServiceInfo info,
0694: Address sender) throws Exception {
0695: switch (info.type) {
0696: case ServiceInfo.STATE_REQ:
0697: byte[] state;
0698: synchronized (service_state) {
0699: state = Util.objectToByteBuffer(service_state);
0700: }
0701: ServiceInfo si = new ServiceInfo(ServiceInfo.STATE_RSP,
0702: null, null, state);
0703: MuxHeader hdr = new MuxHeader(si);
0704: Message state_rsp = new Message(sender);
0705: state_rsp.putHeader(NAME, hdr);
0706: channel.send(state_rsp);
0707: break;
0708: case ServiceInfo.STATE_RSP:
0709: service_state_promise.setResult(info.state);
0710: break;
0711: case ServiceInfo.SERVICE_UP:
0712: handleServiceUp(info.service, info.host, true);
0713: break;
0714: case ServiceInfo.SERVICE_DOWN:
0715: handleServiceDown(info.service, info.host, true);
0716: break;
0717: case ServiceInfo.LIST_SERVICES_RSP:
0718: handleServicesRsp(sender, info.state);
0719: break;
0720: default:
0721: if (log.isErrorEnabled())
0722: log.error("service request type " + info.type
0723: + " not known");
0724: break;
0725: }
0726: }
0727:
0728: private void handleServicesRsp(Address sender, byte[] state)
0729: throws Exception {
0730: Object[] keys = (Object[]) Util.objectFromByteBuffer(state);
0731: Set s = new HashSet();
0732: for (int i = 0; i < keys.length; i++)
0733: s.add(keys[i]);
0734:
0735: synchronized (service_responses) {
0736: Set tmp = (Set) service_responses.get(sender);
0737: if (tmp == null)
0738: tmp = new HashSet();
0739: tmp.addAll(s);
0740:
0741: service_responses.put(sender, tmp);
0742: if (log.isTraceEnabled())
0743: log.trace("received service response: " + sender + "("
0744: + s.toString() + ")");
0745: service_responses.notifyAll();
0746: }
0747: }
0748:
0749: private void handleServiceDown(String service, Address host,
0750: boolean received) {
0751: List hosts, hosts_copy;
0752: boolean removed = false;
0753:
0754: // discard if we sent this message
0755: if (received && host != null && local_addr != null
0756: && local_addr.equals(host)) {
0757: return;
0758: }
0759:
0760: synchronized (service_state) {
0761: hosts = (List) service_state.get(service);
0762: if (hosts == null)
0763: return;
0764: removed = hosts.remove(host);
0765: hosts_copy = new ArrayList(hosts); // make a copy so we don't modify hosts in generateServiceView()
0766: }
0767:
0768: if (removed) {
0769: View service_view = generateServiceView(hosts_copy);
0770: if (service_view != null) {
0771: MuxChannel ch = (MuxChannel) services.get(service);
0772: if (ch != null) {
0773: Event view_evt = new Event(Event.VIEW_CHANGE,
0774: service_view);
0775: ch.up(view_evt);
0776: } else {
0777: if (log.isTraceEnabled())
0778: log
0779: .trace("service "
0780: + service
0781: + " not found, cannot dispatch service view "
0782: + service_view);
0783: }
0784: }
0785: }
0786:
0787: Address local_address = getLocalAddress();
0788: if (local_address != null && host != null
0789: && host.equals(local_address))
0790: unregister(service);
0791: }
0792:
0793: private void handleServiceUp(String service, Address host,
0794: boolean received) {
0795: List hosts, hosts_copy;
0796: boolean added = false;
0797:
0798: // discard if we sent this message
0799: if (received && host != null && local_addr != null
0800: && local_addr.equals(host)) {
0801: return;
0802: }
0803:
0804: synchronized (service_state) {
0805: hosts = (List) service_state.get(service);
0806: if (hosts == null) {
0807: hosts = new ArrayList();
0808: service_state.put(service, hosts);
0809: }
0810: if (!hosts.contains(host)) {
0811: hosts.add(host);
0812: added = true;
0813: }
0814: hosts_copy = new ArrayList(hosts); // make a copy so we don't modify hosts in generateServiceView()
0815: }
0816:
0817: if (added) {
0818: View service_view = generateServiceView(hosts_copy);
0819: if (service_view != null) {
0820: MuxChannel ch = (MuxChannel) services.get(service);
0821: if (ch != null) {
0822: Event view_evt = new Event(Event.VIEW_CHANGE,
0823: service_view);
0824: ch.up(view_evt);
0825: } else {
0826: if (log.isTraceEnabled())
0827: log
0828: .trace("service "
0829: + service
0830: + " not found, cannot dispatch service view "
0831: + service_view);
0832: }
0833: }
0834: }
0835: }
0836:
0837: /**
0838: * Fetches the service states from everyone else in the cluster. Once all states have been received and inserted into
0839: * service_state, compute a service view (a copy of MergeView) for each service and pass it up
0840: * @param view
0841: */
0842: private void handleMergeView(MergeView view) throws Exception {
0843: long time_to_wait = SERVICES_RSP_TIMEOUT, start;
0844: int num_members = view.size(); // include myself
0845: Map copy = null;
0846:
0847: sendServiceState();
0848:
0849: synchronized (service_responses) {
0850: start = System.currentTimeMillis();
0851: try {
0852: while (time_to_wait > 0
0853: && numResponses(service_responses) < num_members) {
0854: // System.out.println("time_to_wait=" + time_to_wait + ", numResponses(service_responses)=" + numResponses(service_responses) +
0855: // ", num_members=" + num_members + ", service_state=" + service_state);
0856: service_responses.wait(time_to_wait);
0857: time_to_wait -= System.currentTimeMillis() - start;
0858: }
0859:
0860: // System.out.println("wait terminated: time_to_wait=" + time_to_wait + ", numResponses(service_responses)=" + numResponses(service_responses) +
0861: // ", num_members=" + num_members + ", service_state=" + service_state);
0862: copy = new HashMap(service_responses);
0863: } catch (Exception ex) {
0864: if (log.isErrorEnabled())
0865: log
0866: .error(
0867: "failed fetching a list of services from other members in the cluster, cannot handle merge view "
0868: + view, ex);
0869: }
0870: }
0871:
0872: if (log.isTraceEnabled())
0873: log.trace("merging service state, my service_state: "
0874: + service_state + ", received responses: " + copy);
0875:
0876: // merges service_responses with service_state and emits MergeViews for the services affected (MuxChannel)
0877: mergeServiceState(view, copy);
0878: service_responses.clear();
0879: }
0880:
0881: private int numResponses(Map m) {
0882: int num = 0;
0883: Collection values = m.values();
0884: for (Iterator it = values.iterator(); it.hasNext();) {
0885: if (it.next() != null)
0886: num++;
0887: }
0888:
0889: return num;
0890: }
0891:
0892: private void mergeServiceState(MergeView view, Map copy) {
0893: Set modified_services = new HashSet();
0894: Map.Entry entry;
0895: Address host; // address of the sender
0896: Set service_list; // Set<String> of services
0897: List my_services;
0898: String service;
0899:
0900: synchronized (service_state) {
0901: for (Iterator it = copy.entrySet().iterator(); it.hasNext();) {
0902: entry = (Map.Entry) it.next();
0903: host = (Address) entry.getKey();
0904: service_list = (Set) entry.getValue();
0905: if (service_list == null)
0906: continue;
0907:
0908: for (Iterator it2 = service_list.iterator(); it2
0909: .hasNext();) {
0910: service = (String) it2.next();
0911: my_services = (List) service_state.get(service);
0912: if (my_services == null) {
0913: my_services = new ArrayList();
0914: service_state.put(service, my_services);
0915: }
0916:
0917: boolean was_modified = my_services.add(host);
0918: if (was_modified) {
0919: modified_services.add(service);
0920: }
0921: }
0922: }
0923: }
0924:
0925: // now emit MergeViews for all services which were modified
0926: for (Iterator it = modified_services.iterator(); it.hasNext();) {
0927: service = (String) it.next();
0928: MuxChannel ch = (MuxChannel) services.get(service);
0929: my_services = (List) service_state.get(service);
0930: Vector membersCopy = new Vector(view.getMembers());
0931: membersCopy.retainAll(my_services);
0932: MergeView v = new MergeView(view.getVid(), membersCopy,
0933: view.getSubgroups());
0934: Event evt = new Event(Event.VIEW_CHANGE, v);
0935: ch.up(evt);
0936: }
0937: }
0938:
0939: private void adjustServiceViews(Vector left_members) {
0940: if (left_members != null)
0941: for (int i = 0; i < left_members.size(); i++) {
0942: try {
0943: adjustServiceView((Address) left_members
0944: .elementAt(i));
0945: } catch (Throwable t) {
0946: if (log.isErrorEnabled())
0947: log.error("failed adjusting service views", t);
0948: }
0949: }
0950: }
0951:
0952: private void adjustServiceView(Address host) {
0953: Map.Entry entry;
0954: List hosts, hosts_copy;
0955: String service;
0956: boolean removed = false;
0957:
0958: synchronized (service_state) {
0959: for (Iterator it = service_state.entrySet().iterator(); it
0960: .hasNext();) {
0961: entry = (Map.Entry) it.next();
0962: service = (String) entry.getKey();
0963: hosts = (List) entry.getValue();
0964: if (hosts == null)
0965: continue;
0966:
0967: removed = hosts.remove(host);
0968: hosts_copy = new ArrayList(hosts); // make a copy so we don't modify hosts in generateServiceView()
0969:
0970: if (removed) {
0971: View service_view = generateServiceView(hosts_copy);
0972: if (service_view != null) {
0973: MuxChannel ch = (MuxChannel) services
0974: .get(service);
0975: if (ch != null) {
0976: Event view_evt = new Event(
0977: Event.VIEW_CHANGE, service_view);
0978: ch.up(view_evt);
0979: } else {
0980: if (log.isTraceEnabled())
0981: log
0982: .trace("service "
0983: + service
0984: + " not found, cannot dispatch service view "
0985: + service_view);
0986: }
0987: }
0988: }
0989: Address local_address = getLocalAddress();
0990: if (local_address != null && host != null
0991: && host.equals(local_address))
0992: unregister(service);
0993: }
0994: }
0995: }
0996:
0997: /**
0998: * Create a copy of view which contains only members which are present in hosts. Call viewAccepted() on the MuxChannel
0999: * which corresponds with service. If no members are removed or added from/to view, this is a no-op.
1000: * @param hosts List<Address>
1001: * @return the servicd view (a modified copy of the real view), or null if the view was not modified
1002: */
1003: private View generateServiceView(List hosts) {
1004: Vector members = new Vector(view.getMembers());
1005: members.retainAll(hosts);
1006: return new View(view.getVid(), members);
1007: }
1008:
1009: /** Tell the underlying channel to start the flush protocol, this will be handled by FLUSH */
1010: private void startFlush() {
1011: channel.down(new Event(Event.SUSPEND));
1012: }
1013:
1014: /** Tell the underlying channel to stop the flush, and resume message sending. This will be handled by FLUSH */
1015: private void stopFlush() {
1016: channel.down(new Event(Event.RESUME));
1017: }
1018:
1019: public void addServiceIfNotPresent(String id, MuxChannel ch) {
1020: MuxChannel tmp;
1021: synchronized (services) {
1022: tmp = (MuxChannel) services.get(id);
1023: if (tmp == null) {
1024: services.put(id, ch);
1025: }
1026: }
1027: }
1028:
1029: private static class BlockOkCollector {
1030: int num_block_oks = 0;
1031:
1032: synchronized void reset() {
1033: num_block_oks = 0;
1034: }
1035:
1036: synchronized void increment() {
1037: num_block_oks++;
1038: }
1039:
1040: synchronized void waitUntil(int num) {
1041: while (num_block_oks < num) {
1042: try {
1043: this .wait();
1044: } catch (InterruptedException e) {
1045: }
1046: }
1047: }
1048:
1049: public String toString() {
1050: return String.valueOf(num_block_oks);
1051: }
1052: }
1053: }
|