0001: package org.jgroups.protocols.pbcast;
0002:
0003: import java.io.BufferedInputStream;
0004: import java.io.BufferedOutputStream;
0005: import java.io.DataInputStream;
0006: import java.io.DataOutputStream;
0007: import java.io.IOException;
0008: import java.io.InputStream;
0009: import java.io.ObjectInput;
0010: import java.io.ObjectInputStream;
0011: import java.io.ObjectOutput;
0012: import java.io.ObjectOutputStream;
0013: import java.io.OutputStream;
0014: import java.net.InetAddress;
0015: import java.net.InetSocketAddress;
0016: import java.net.ServerSocket;
0017: import java.net.Socket;
0018: import java.net.UnknownHostException;
0019: import java.util.HashMap;
0020: import java.util.HashSet;
0021: import java.util.Iterator;
0022: import java.util.Map;
0023: import java.util.Properties;
0024: import java.util.Set;
0025: import java.util.Vector;
0026:
0027: import org.jgroups.Address;
0028: import org.jgroups.Channel;
0029: import org.jgroups.Event;
0030: import org.jgroups.Global;
0031: import org.jgroups.Header;
0032: import org.jgroups.Message;
0033: import org.jgroups.TimeoutException;
0034: import org.jgroups.View;
0035: import org.jgroups.stack.IpAddress;
0036: import org.jgroups.stack.Protocol;
0037: import org.jgroups.stack.StateTransferInfo;
0038: import org.jgroups.util.Promise;
0039: import org.jgroups.util.Streamable;
0040: import org.jgroups.util.Util;
0041:
0042: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
0043: import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
0044:
0045: /**
0046: * <code>STREAMING_STATE_TRANSFER</code>, as its name implies, allows a streaming
0047: * state transfer between two channel instances.
0048: *
0049: * <p>
0050: *
0051: * Major advantage of this approach is that transfering application state to a
0052: * joining member of a group does not entail loading of the complete application
0053: * state into memory. Application state, for example, might be located entirely
0054: * on some form of disk based storage. The default <code>STATE_TRANSFER</code>
0055: * requires this state to be loaded entirely into memory before being transferred
0056: * to a group member while <code>STREAMING_STATE_TRANSFER</code> does not.
0057: * Thus <code>STREAMING_STATE_TRANSFER</code> protocol is able to transfer
0058: * application state that is very large (>1Gb) without a likelihood of such transfer
0059: * resulting in OutOfMemoryException.
0060: *
0061: * <p>
0062: *
0063: * Channel instance can be configured with either <code>STREAMING_STATE_TRANSFER</code>
0064: * or <code>STATE_TRANSFER</code> but not both protocols at the same time.
0065: *
0066: * <p>
0067: *
0068: * In order to process streaming state transfer an application has to implement
0069: * <code>ExtendedMessageListener</code> if it is using channel in a push style
0070: * mode or it has to process <code>StreamingSetStateEvent</code> and
0071: * <code>StreamingGetStateEvent</code> if it is using channel in a pull style mode.
0072: *
0073: *
0074: * @author Vladimir Blagojevic
0075: * @see org.jgroups.ExtendedMessageListener
0076: * @see org.jgroups.StreamingGetStateEvent
0077: * @see org.jgroups.StreamingSetStateEvent
0078: * @see org.jgroups.protocols.pbcast.STATE_TRANSFER
0079: * @since 2.4
0080: *
0081: * @version $Id$
0082: *
0083: */
0084: public class STREAMING_STATE_TRANSFER extends Protocol {
0085: private Address local_addr = null;
0086:
0087: private final Vector members = new Vector();
0088:
0089: private final Map state_requesters = new HashMap();
0090:
0091: private Digest digest = null;
0092:
0093: private final HashMap map = new HashMap(); // to store configuration information
0094:
0095: private int num_state_reqs = 0;
0096:
0097: private long num_bytes_sent = 0;
0098:
0099: private double avg_state_size = 0;
0100:
0101: private final static String NAME = "STREAMING_STATE_TRANSFER";
0102:
0103: private InetAddress bind_addr;
0104:
0105: private int bind_port = 0;
0106:
0107: private StateProviderThreadSpawner spawner;
0108:
0109: private int max_pool = 5;
0110:
0111: private long pool_thread_keep_alive;
0112:
0113: private int socket_buffer_size = 8 * 1024;
0114:
0115: private boolean use_reading_thread;
0116:
0117: private final Promise flush_promise = new Promise();;
0118:
0119: private volatile boolean use_flush;
0120:
0121: private long flush_timeout = 4000;
0122:
0123: private final Object poolLock = new Object();
0124:
0125: private int threadCounter;
0126:
0127: private volatile boolean flushProtocolInStack = false;
0128:
0129: public final String getName() {
0130: return NAME;
0131: }
0132:
0133: public int getNumberOfStateRequests() {
0134: return num_state_reqs;
0135: }
0136:
0137: public long getNumberOfStateBytesSent() {
0138: return num_bytes_sent;
0139: }
0140:
0141: public double getAverageStateSize() {
0142: return avg_state_size;
0143: }
0144:
0145: public Vector requiredDownServices() {
0146: Vector retval = new Vector();
0147: retval.addElement(new Integer(Event.GET_DIGEST_STATE));
0148: retval.addElement(new Integer(Event.SET_DIGEST));
0149: return retval;
0150: }
0151:
0152: public void resetStats() {
0153: super .resetStats();
0154: num_state_reqs = 0;
0155: num_bytes_sent = 0;
0156: avg_state_size = 0;
0157: }
0158:
0159: public boolean setProperties(Properties props) {
0160: super .setProperties(props);
0161: use_flush = Util.parseBoolean(props, "use_flush", false);
0162: flush_timeout = Util.parseLong(props, "flush_timeout",
0163: flush_timeout);
0164:
0165: try {
0166: bind_addr = Util.parseBindAddress(props, "bind_addr");
0167: } catch (UnknownHostException e) {
0168: log.error("(bind_addr): host " + e.getLocalizedMessage()
0169: + " not known");
0170: return false;
0171: }
0172: bind_port = Util.parseInt(props, "start_port", 0);
0173: socket_buffer_size = Util.parseInt(props, "socket_buffer_size",
0174: 8 * 1024); //8K
0175: max_pool = Util.parseInt(props, "max_pool", 5);
0176: pool_thread_keep_alive = Util.parseLong(props,
0177: "pool_thread_keep_alive", 1000 * 30); //30 sec
0178: use_reading_thread = Util.parseBoolean(props,
0179: "use_reading_thread", false);
0180: if (props.size() > 0) {
0181: log.error("the following properties are not recognized: "
0182: + props);
0183:
0184: return false;
0185: }
0186: return true;
0187: }
0188:
0189: public void init() throws Exception {
0190: map.put("state_transfer", Boolean.TRUE);
0191: map.put("protocol_class", getClass().getName());
0192: }
0193:
0194: public void start() throws Exception {
0195: passUp(new Event(Event.CONFIG, map));
0196: if (!flushProtocolInStack && use_flush) {
0197: log
0198: .warn("use_flush is true, however, FLUSH protocol not found in stack.");
0199: use_flush = false;
0200: }
0201: }
0202:
0203: public void stop() {
0204: super .stop();
0205: if (spawner != null) {
0206: spawner.stop();
0207: }
0208: }
0209:
0210: public void up(Event evt) {
0211: switch (evt.getType()) {
0212: case Event.BECOME_SERVER:
0213: break;
0214:
0215: case Event.SET_LOCAL_ADDRESS:
0216: local_addr = (Address) evt.getArg();
0217: break;
0218:
0219: case Event.TMP_VIEW:
0220: case Event.VIEW_CHANGE:
0221: handleViewChange((View) evt.getArg());
0222: break;
0223:
0224: case Event.GET_DIGEST_STATE_OK:
0225: synchronized (state_requesters) {
0226: digest = (Digest) evt.getArg();
0227: if (log.isDebugEnabled())
0228: log.debug("GET_DIGEST_STATE_OK: digest is "
0229: + digest);
0230: }
0231: respondToStateRequester();
0232: return;
0233:
0234: case Event.MSG:
0235: Message msg = (Message) evt.getArg();
0236: StateHeader hdr = (StateHeader) msg.removeHeader(getName());
0237: if (hdr != null) {
0238: switch (hdr.type) {
0239: case StateHeader.STATE_REQ:
0240: handleStateReq(hdr);
0241: break;
0242: case StateHeader.STATE_RSP:
0243: handleStateRsp(hdr);
0244: break;
0245: case StateHeader.STATE_REMOVE_REQUESTER:
0246: removeFromStateRequesters(hdr.sender, hdr.state_id);
0247: break;
0248: default:
0249: if (log.isErrorEnabled())
0250: log.error("type " + hdr.type
0251: + " not known in StateHeader");
0252: break;
0253: }
0254: return;
0255: }
0256: break;
0257: case Event.CONFIG:
0258: Map config = (Map) evt.getArg();
0259: if (bind_addr == null
0260: && (config != null && config
0261: .containsKey("bind_addr"))) {
0262: bind_addr = (InetAddress) config.get("bind_addr");
0263: if (log.isDebugEnabled())
0264: log.debug("using bind_addr from CONFIG event "
0265: + bind_addr);
0266: }
0267: break;
0268: }
0269: passUp(evt);
0270: }
0271:
0272: public void down(Event evt) {
0273: Address target;
0274: StateTransferInfo info;
0275:
0276: switch (evt.getType()) {
0277:
0278: case Event.TMP_VIEW:
0279: case Event.VIEW_CHANGE:
0280: handleViewChange((View) evt.getArg());
0281: break;
0282:
0283: case Event.GET_STATE:
0284: info = (StateTransferInfo) evt.getArg();
0285: if (info.target == null) {
0286: target = determineCoordinator();
0287: } else {
0288: target = info.target;
0289: if (target.equals(local_addr)) {
0290: if (log.isErrorEnabled())
0291: log
0292: .error("GET_STATE: cannot fetch state from myself !");
0293: target = null;
0294: }
0295: }
0296: if (target == null) {
0297: if (log.isDebugEnabled())
0298: log.debug("GET_STATE: first member (no state)");
0299: passUp(new Event(Event.GET_STATE_OK,
0300: new StateTransferInfo()));
0301: } else {
0302: boolean successfulFlush = false;
0303: if (use_flush) {
0304: successfulFlush = startFlush(flush_timeout, 5);
0305: }
0306: if (successfulFlush) {
0307: log.debug("Successful flush at " + local_addr);
0308: }
0309: Message state_req = new Message(target, null, null);
0310: state_req.putHeader(NAME, new StateHeader(
0311: StateHeader.STATE_REQ, local_addr,
0312: info.state_id));
0313: String stateRequested = info.state_id == null ? "full"
0314: : info.state_id;
0315: if (log.isDebugEnabled())
0316: log.debug("Member " + local_addr + " asking "
0317: + target + " for " + stateRequested
0318: + " state");
0319:
0320: // suspend sending and handling of mesage garbage collection gossip messages,
0321: // fixes bugs #943480 and #938584). Wake up when state has been received
0322: if (log.isTraceEnabled())
0323: log.trace("passing down a SUSPEND_STABLE event");
0324: passDown(new Event(Event.SUSPEND_STABLE, new Long(
0325: info.timeout)));
0326: passDown(new Event(Event.MSG, state_req));
0327: }
0328: return; // don't pass down any further !
0329:
0330: case Event.STATE_TRANSFER_INPUTSTREAM_CLOSED:
0331: if (use_flush) {
0332: stopFlush();
0333: }
0334:
0335: if (log.isTraceEnabled())
0336: log.trace("STATE_TRANSFER_INPUTSTREAM_CLOSED received");
0337: //resume sending and handling of message garbage collection gossip messages,
0338: // fixes bugs #943480 and #938584). Wakes up a previously suspended message garbage
0339: // collection protocol (e.g. STABLE)
0340: if (log.isTraceEnabled())
0341: log.trace("passing down a RESUME_STABLE event");
0342: passDown(new Event(Event.RESUME_STABLE));
0343: return;
0344: case Event.SUSPEND_OK:
0345: if (use_flush) {
0346: flush_promise.setResult(Boolean.TRUE);
0347: }
0348: break;
0349: case Event.SUSPEND_FAILED:
0350: if (use_flush) {
0351: flush_promise.setResult(Boolean.FALSE);
0352: }
0353: break;
0354: case Event.CONFIG:
0355: Map config = (Map) evt.getArg();
0356: if (config != null && config.containsKey("flush_timeout")) {
0357: Long ftimeout = (Long) config.get("flush_timeout");
0358: use_flush = true;
0359: flush_timeout = ftimeout.longValue();
0360: }
0361: if ((config != null && !config
0362: .containsKey("flush_suported"))) {
0363: flushProtocolInStack = true;
0364: }
0365: break;
0366:
0367: }
0368:
0369: passDown(evt); // pass on to the layer below us
0370: }
0371:
0372: /* --------------------------- Private Methods -------------------------------- */
0373:
0374: /**
0375: * When FLUSH is used we do not need to pass digests between members
0376: *
0377: * see JGroups/doc/design/PArtialStateTransfer.txt
0378: * see JGroups/doc/design/FLUSH.txt
0379: *
0380: * @return true if use of digests is required, false otherwise
0381: */
0382: private boolean isDigestNeeded() {
0383: return !use_flush;
0384: }
0385:
0386: private void respondToStateRequester() {
0387:
0388: // setup the plumbing if needed
0389: if (spawner == null) {
0390: ServerSocket serverSocket = Util.createServerSocket(
0391: bind_addr, bind_port);
0392: spawner = new StateProviderThreadSpawner(setupThreadPool(),
0393: serverSocket);
0394: new Thread(Util.getGlobalThreadGroup(), spawner,
0395: "StateProviderThreadSpawner").start();
0396: }
0397:
0398: synchronized (state_requesters) {
0399: if (state_requesters.isEmpty()) {
0400: if (log.isWarnEnabled())
0401: log
0402: .warn("Should be responding to state requester, but there are no requesters !");
0403: return;
0404: }
0405:
0406: if (digest == null && isDigestNeeded()) {
0407: if (log.isWarnEnabled()) {
0408: log
0409: .warn("Should be responding to state requester, but there is no digest !");
0410: }
0411: } else if (digest != null && isDigestNeeded()) {
0412: digest = digest.copy();
0413: }
0414:
0415: if (log.isTraceEnabled())
0416: log.trace("Iterating state requesters "
0417: + state_requesters);
0418:
0419: for (Iterator it = state_requesters.keySet().iterator(); it
0420: .hasNext();) {
0421: String tmp_state_id = (String) it.next();
0422: Set requesters = (Set) state_requesters
0423: .get(tmp_state_id);
0424: for (Iterator iter = requesters.iterator(); iter
0425: .hasNext();) {
0426: Address requester = (Address) iter.next();
0427: Message state_rsp = new Message(requester);
0428: StateHeader hdr = new StateHeader(
0429: StateHeader.STATE_RSP, local_addr, spawner
0430: .getServerSocketAddress(),
0431: isDigestNeeded() ? digest : null,
0432: tmp_state_id);
0433: state_rsp.putHeader(NAME, hdr);
0434:
0435: if (log.isTraceEnabled())
0436: log.trace("Responding to state requester "
0437: + requester + " with address "
0438: + spawner.getServerSocketAddress()
0439: + " and digest " + digest);
0440: passDown(new Event(Event.MSG, state_rsp));
0441: if (stats) {
0442: num_state_reqs++;
0443: }
0444: }
0445: }
0446: }
0447: }
0448:
0449: private boolean startFlush(long timeout, int numberOfAttempts) {
0450: boolean successfulFlush = false;
0451: flush_promise.reset();
0452: passUp(new Event(Event.SUSPEND));
0453: try {
0454: Boolean r = (Boolean) flush_promise
0455: .getResultWithTimeout(timeout);
0456: successfulFlush = r.booleanValue();
0457: } catch (TimeoutException e) {
0458: log.warn("Initiator of flush and state requesting member "
0459: + local_addr
0460: + " timed out waiting for flush responses after "
0461: + flush_timeout + " msec");
0462: }
0463:
0464: if (!successfulFlush && numberOfAttempts > 0) {
0465: long backOffSleepTime = Util.random(5000);
0466: if (log.isInfoEnabled())
0467: log.info("Flush in progress detected at " + local_addr
0468: + ". Backing off for " + backOffSleepTime
0469: + " ms. Attempts left " + numberOfAttempts);
0470:
0471: Util.sleepRandom(backOffSleepTime);
0472: successfulFlush = startFlush(flush_timeout,
0473: --numberOfAttempts);
0474: }
0475: return successfulFlush;
0476: }
0477:
0478: private void stopFlush() {
0479: passUp(new Event(Event.RESUME));
0480: }
0481:
0482: private PooledExecutor setupThreadPool() {
0483: PooledExecutor threadPool = new PooledExecutor(max_pool);
0484: threadPool.waitWhenBlocked();
0485: threadPool.setMinimumPoolSize(1);
0486: threadPool.setKeepAliveTime(pool_thread_keep_alive);
0487: threadPool.setThreadFactory(new ThreadFactory() {
0488:
0489: public Thread newThread(final Runnable command) {
0490: synchronized (poolLock) {
0491: threadCounter++;
0492: }
0493: return new Thread(Util.getGlobalThreadGroup(),
0494: "STREAMING_STATE_TRANSFER.poolid="
0495: + threadCounter) {
0496: public void run() {
0497: if (log.isTraceEnabled()) {
0498: log.trace(Thread.currentThread()
0499: + " started.");
0500: }
0501: command.run();
0502: if (log.isTraceEnabled()) {
0503: log.trace(Thread.currentThread()
0504: + " stopped.");
0505: }
0506: }
0507: };
0508: }
0509:
0510: });
0511: return threadPool;
0512: }
0513:
0514: private Address determineCoordinator() {
0515: Address ret = null;
0516: synchronized (members) {
0517: if (!members.isEmpty()) {
0518: for (int i = 0; i < members.size(); i++)
0519: if (!local_addr.equals(members.elementAt(i)))
0520: return (Address) members.elementAt(i);
0521: }
0522: }
0523: return ret;
0524: }
0525:
0526: private void handleViewChange(View v) {
0527: synchronized (members) {
0528: members.clear();
0529: members.addAll(v.getMembers());
0530: }
0531: }
0532:
0533: private void handleStateReq(StateHeader hdr) {
0534: Object sender = hdr.sender;
0535: if (sender == null) {
0536: if (log.isErrorEnabled())
0537: log.error("sender is null !");
0538: return;
0539: }
0540: String id = hdr.state_id;
0541: synchronized (state_requesters) {
0542: boolean empty = state_requesters.isEmpty();
0543: Set requesters = (Set) state_requesters.get(id);
0544: if (requesters == null) {
0545: requesters = new HashSet();
0546: }
0547: requesters.add(sender);
0548: state_requesters.put(id, requesters);
0549:
0550: if (!isDigestNeeded()) {
0551: respondToStateRequester();
0552: } else if (empty) {
0553: digest = null;
0554: if (log.isTraceEnabled())
0555: log.trace("passing down GET_DIGEST_STATE");
0556: passDown(new Event(Event.GET_DIGEST_STATE));
0557: }
0558: }
0559: }
0560:
0561: void handleStateRsp(StateHeader hdr) {
0562: Digest tmp_digest = hdr.my_digest;
0563: if (isDigestNeeded()) {
0564: if (tmp_digest == null) {
0565: if (log.isWarnEnabled())
0566: log.warn("digest received from " + hdr.sender
0567: + " is null, skipping setting digest !");
0568: } else {
0569: // set the digest (e.g.in NAKACK)
0570: passDown(new Event(Event.SET_DIGEST, tmp_digest));
0571: }
0572: }
0573: connectToStateProvider(hdr);
0574: }
0575:
0576: void removeFromStateRequesters(Address address, String state_id) {
0577: synchronized (state_requesters) {
0578: Set requesters = (Set) state_requesters.get(state_id);
0579: if (requesters != null && !requesters.isEmpty()) {
0580: boolean removed = requesters.remove(address);
0581: if (log.isTraceEnabled()) {
0582: log
0583: .trace("Attempted to clear " + address
0584: + " from requesters, successful="
0585: + removed);
0586: }
0587: if (requesters.isEmpty()) {
0588: state_requesters.remove(state_id);
0589: if (log.isTraceEnabled()) {
0590: log.trace("Cleared all requesters for state "
0591: + state_id + ",state_requesters="
0592: + state_requesters);
0593: }
0594: }
0595: }
0596: }
0597: }
0598:
0599: private void connectToStateProvider(StateHeader hdr) {
0600: IpAddress address = hdr.bind_addr;
0601: String tmp_state_id = hdr.getStateId();
0602: StreamingInputStreamWrapper wrapper = null;
0603: StateTransferInfo sti = null;
0604: final Socket socket = new Socket();
0605: try {
0606: socket.bind(new InetSocketAddress(bind_addr, 0));
0607: int bufferSize = socket.getReceiveBufferSize();
0608: socket.setReceiveBufferSize(socket_buffer_size);
0609: if (log.isDebugEnabled())
0610: log.debug("Connecting to state provider "
0611: + address.getIpAddress() + ":"
0612: + address.getPort()
0613: + ", original buffer size was " + bufferSize
0614: + " and was reset to "
0615: + socket.getReceiveBufferSize());
0616: socket.connect(new InetSocketAddress(
0617: address.getIpAddress(), address.getPort()));
0618: if (log.isDebugEnabled())
0619: log
0620: .debug("Connected to state provider, my end of the socket is "
0621: + socket.getLocalAddress()
0622: + ":"
0623: + socket.getLocalPort()
0624: + " passing inputstream up...");
0625:
0626: //write out our state_id and address so state provider can clear this request
0627: ObjectOutputStream out = new ObjectOutputStream(socket
0628: .getOutputStream());
0629: out.writeObject(tmp_state_id);
0630: out.writeObject(local_addr);
0631:
0632: wrapper = new StreamingInputStreamWrapper(socket);
0633: sti = new StateTransferInfo(hdr.sender, wrapper,
0634: tmp_state_id);
0635: } catch (IOException e) {
0636: if (log.isWarnEnabled()) {
0637: log.warn(
0638: "State reader socket thread spawned abnormaly",
0639: e);
0640: }
0641:
0642: //pass null stream up so that JChannel.getState() returns false
0643: InputStream is = null;
0644: sti = new StateTransferInfo(hdr.sender, is, tmp_state_id);
0645: } finally {
0646: if (!socket.isConnected()) {
0647: if (log.isWarnEnabled())
0648: log
0649: .warn("Could not connect to state provider. Closing socket...");
0650: try {
0651: if (wrapper != null) {
0652: wrapper.close();
0653: } else {
0654: socket.close();
0655: }
0656:
0657: } catch (IOException e) {
0658: }
0659: //since socket did not connect properly we have to
0660: //clear our entry in state providers hashmap "manually"
0661: Message m = new Message(hdr.sender);
0662: StateHeader mhdr = new StateHeader(
0663: StateHeader.STATE_REMOVE_REQUESTER, local_addr,
0664: tmp_state_id);
0665: m.putHeader(NAME, mhdr);
0666: passDown(new Event(Event.MSG, m));
0667: }
0668: passStreamUp(sti);
0669: }
0670: }
0671:
0672: private void passStreamUp(final StateTransferInfo sti) {
0673: Runnable readingThread = new Runnable() {
0674: public void run() {
0675: passUp(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti));
0676: }
0677: };
0678: if (use_reading_thread) {
0679: new Thread(Util.getGlobalThreadGroup(), readingThread,
0680: "STREAMING_STATE_TRANSFER.reader").start();
0681:
0682: } else {
0683: readingThread.run();
0684: }
0685: }
0686:
0687: /*
0688: * ------------------------ End of Private Methods
0689: * ------------------------------
0690: */
0691:
0692: private class StateProviderThreadSpawner implements Runnable {
0693: PooledExecutor pool;
0694:
0695: ServerSocket serverSocket;
0696:
0697: IpAddress address;
0698:
0699: Thread runner;
0700:
0701: volatile boolean running = true;
0702:
0703: public StateProviderThreadSpawner(PooledExecutor pool,
0704: ServerSocket stateServingSocket) {
0705: if (pool == null || stateServingSocket == null) {
0706: throw new IllegalArgumentException(
0707: "Cannot create thread pool");
0708: }
0709: this .pool = pool;
0710: this .serverSocket = stateServingSocket;
0711: this .address = new IpAddress(
0712: STREAMING_STATE_TRANSFER.this .bind_addr,
0713: serverSocket.getLocalPort());
0714: }
0715:
0716: public void run() {
0717: runner = Thread.currentThread();
0718: for (; running;) {
0719: try {
0720: if (log.isDebugEnabled())
0721: log
0722: .debug("StateProviderThreadSpawner listening at "
0723: + getServerSocketAddress()
0724: + "...");
0725: if (log.isTraceEnabled())
0726: log.trace("Pool has " + pool.getPoolSize()
0727: + " active threads");
0728: final Socket socket = serverSocket.accept();
0729: pool.execute(new Runnable() {
0730: public void run() {
0731: if (log.isDebugEnabled())
0732: log
0733: .debug("Accepted request for state transfer from "
0734: + socket
0735: .getInetAddress()
0736: + ":"
0737: + socket.getPort()
0738: + " handing of to PooledExecutor thread");
0739: new StateProviderHandler().process(socket);
0740: }
0741: });
0742:
0743: } catch (IOException e) {
0744: if (log.isWarnEnabled()) {
0745: //we get this exception when we close server socket
0746: //exclude that case
0747: if (!serverSocket.isClosed()) {
0748: log
0749: .warn(
0750: "Spawning socket from server socket finished abnormaly",
0751: e);
0752: }
0753: }
0754: } catch (InterruptedException e) {
0755: // should not happen
0756: }
0757: }
0758: }
0759:
0760: public IpAddress getServerSocketAddress() {
0761: return address;
0762: }
0763:
0764: public void stop() {
0765: running = false;
0766: try {
0767: if (!serverSocket.isClosed()) {
0768: serverSocket.close();
0769: }
0770: } catch (IOException e) {
0771: } finally {
0772: if (log.isDebugEnabled())
0773: log
0774: .debug("Waiting for StateProviderThreadSpawner to die ... ");
0775:
0776: try {
0777: runner.join(3000);
0778: } catch (InterruptedException ignored) {
0779: }
0780:
0781: if (log.isDebugEnabled())
0782: log.debug("Shutting the thread pool down... ");
0783:
0784: pool.shutdownNow();
0785: try {
0786: //TODO use some system wide timeout eventually
0787: pool.awaitTerminationAfterShutdown(5000);
0788: } catch (InterruptedException ignored) {
0789: }
0790: }
0791: if (log.isDebugEnabled())
0792: log
0793: .debug("Thread pool is shutdown. All pool threads are cleaned up.");
0794: }
0795: }
0796:
0797: private class StateProviderHandler {
0798: public void process(Socket socket) {
0799: StreamingOutputStreamWrapper wrapper = null;
0800: ObjectInputStream ois = null;
0801: try {
0802: int bufferSize = socket.getSendBufferSize();
0803: socket.setSendBufferSize(socket_buffer_size);
0804: if (log.isDebugEnabled())
0805: log
0806: .debug("Running on "
0807: + Thread.currentThread()
0808: + ". Accepted request for state transfer from "
0809: + socket.getInetAddress() + ":"
0810: + socket.getPort()
0811: + ", original buffer size was "
0812: + bufferSize + " and was reset to "
0813: + socket.getSendBufferSize()
0814: + ", passing outputstream up... ");
0815:
0816: //read out state requesters state_id and address and clear this request
0817: ois = new ObjectInputStream(socket.getInputStream());
0818: String state_id = (String) ois.readObject();
0819: Address stateRequester = (Address) ois.readObject();
0820: removeFromStateRequesters(stateRequester, state_id);
0821:
0822: wrapper = new StreamingOutputStreamWrapper(socket);
0823: StateTransferInfo sti = new StateTransferInfo(
0824: stateRequester, wrapper, state_id);
0825: passUp(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM, sti));
0826: } catch (IOException e) {
0827: if (log.isWarnEnabled()) {
0828: log
0829: .warn(
0830: "State writer socket thread spawned abnormaly",
0831: e);
0832: }
0833: } catch (ClassNotFoundException e) {
0834: //thrown by ois.readObject()
0835: //should never happen since String/Address are core classes
0836: } finally {
0837: if (socket != null && !socket.isConnected()) {
0838: if (log.isWarnEnabled())
0839: log
0840: .warn("Accepted request for state transfer but socket "
0841: + socket
0842: + " not connected properly. Closing it...");
0843: try {
0844: if (wrapper != null) {
0845: wrapper.close();
0846: } else {
0847: socket.close();
0848: }
0849: } catch (IOException e) {
0850: }
0851: }
0852: }
0853: }
0854: }
0855:
0856: private class StreamingInputStreamWrapper extends InputStream {
0857:
0858: private Socket inputStreamOwner;
0859:
0860: private InputStream delegate;
0861:
0862: private Channel channelOwner;
0863:
0864: public StreamingInputStreamWrapper(Socket inputStreamOwner)
0865: throws IOException {
0866: super ();
0867: this .inputStreamOwner = inputStreamOwner;
0868: this .delegate = new BufferedInputStream(inputStreamOwner
0869: .getInputStream());
0870: this .channelOwner = stack.getChannel();
0871: }
0872:
0873: public int available() throws IOException {
0874: return delegate.available();
0875: }
0876:
0877: public void close() throws IOException {
0878: if (log.isDebugEnabled()) {
0879: log.debug("State reader " + inputStreamOwner
0880: + " is closing the socket ");
0881: }
0882: if (channelOwner != null && channelOwner.isConnected()) {
0883: channelOwner.down(new Event(
0884: Event.STATE_TRANSFER_INPUTSTREAM_CLOSED));
0885: }
0886: inputStreamOwner.close();
0887: }
0888:
0889: public synchronized void mark(int readlimit) {
0890: delegate.mark(readlimit);
0891: }
0892:
0893: public boolean markSupported() {
0894: return delegate.markSupported();
0895: }
0896:
0897: public int read() throws IOException {
0898: return delegate.read();
0899: }
0900:
0901: public int read(byte[] b, int off, int len) throws IOException {
0902: return delegate.read(b, off, len);
0903: }
0904:
0905: public int read(byte[] b) throws IOException {
0906: return delegate.read(b);
0907: }
0908:
0909: public synchronized void reset() throws IOException {
0910: delegate.reset();
0911: }
0912:
0913: public long skip(long n) throws IOException {
0914: return delegate.skip(n);
0915: }
0916: }
0917:
0918: private class StreamingOutputStreamWrapper extends OutputStream {
0919: private Socket outputStreamOwner;
0920:
0921: private OutputStream delegate;
0922:
0923: private long bytesWrittenCounter = 0;
0924:
0925: private Channel channelOwner;
0926:
0927: public StreamingOutputStreamWrapper(Socket outputStreamOwner)
0928: throws IOException {
0929: super ();
0930: this .outputStreamOwner = outputStreamOwner;
0931: this .delegate = new BufferedOutputStream(outputStreamOwner
0932: .getOutputStream());
0933: this .channelOwner = stack.getChannel();
0934: }
0935:
0936: public void close() throws IOException {
0937: if (log.isDebugEnabled()) {
0938: log.debug("State writer " + outputStreamOwner
0939: + " is closing the socket ");
0940: }
0941: try {
0942: if (channelOwner != null && channelOwner.isConnected()) {
0943: channelOwner.down(new Event(
0944: Event.STATE_TRANSFER_OUTPUTSTREAM_CLOSED));
0945: }
0946: outputStreamOwner.close();
0947: } catch (IOException e) {
0948: throw e;
0949: } finally {
0950: if (stats) {
0951: synchronized (state_requesters) {
0952: num_bytes_sent += bytesWrittenCounter;
0953: avg_state_size = num_bytes_sent
0954: / (double) num_state_reqs;
0955: }
0956: }
0957: }
0958: }
0959:
0960: public void flush() throws IOException {
0961: delegate.flush();
0962: }
0963:
0964: public void write(byte[] b, int off, int len)
0965: throws IOException {
0966: delegate.write(b, off, len);
0967: bytesWrittenCounter += len;
0968: }
0969:
0970: public void write(byte[] b) throws IOException {
0971: delegate.write(b);
0972: if (b != null) {
0973: bytesWrittenCounter += b.length;
0974: }
0975: }
0976:
0977: public void write(int b) throws IOException {
0978: delegate.write(b);
0979: bytesWrittenCounter += 1;
0980: }
0981: }
0982:
0983: public static class StateHeader extends Header implements
0984: Streamable {
0985: public static final byte STATE_REQ = 1;
0986:
0987: public static final byte STATE_RSP = 2;
0988:
0989: public static final byte STATE_REMOVE_REQUESTER = 3;
0990:
0991: long id = 0; // state transfer ID (to separate multiple state transfers at the same time)
0992:
0993: byte type = 0;
0994:
0995: Address sender; // sender of state STATE_REQ or STATE_RSP
0996:
0997: Digest my_digest = null; // digest of sender (if type is STATE_RSP)
0998:
0999: IpAddress bind_addr = null;
1000:
1001: String state_id = null; // for partial state transfer
1002:
1003: public StateHeader() { // for externalization
1004: }
1005:
1006: public StateHeader(byte type, Address sender, String state_id) {
1007: this .type = type;
1008: this .sender = sender;
1009: this .state_id = state_id;
1010: }
1011:
1012: public StateHeader(byte type, Address sender, long id,
1013: Digest digest) {
1014: this .type = type;
1015: this .sender = sender;
1016: this .id = id;
1017: this .my_digest = digest;
1018: }
1019:
1020: public StateHeader(byte type, Address sender,
1021: IpAddress bind_addr, Digest digest, String state_id) {
1022: this .type = type;
1023: this .sender = sender;
1024: this .my_digest = digest;
1025: this .bind_addr = bind_addr;
1026: this .state_id = state_id;
1027: }
1028:
1029: public int getType() {
1030: return type;
1031: }
1032:
1033: public Digest getDigest() {
1034: return my_digest;
1035: }
1036:
1037: public String getStateId() {
1038: return state_id;
1039: }
1040:
1041: public boolean equals(Object o) {
1042: StateHeader other;
1043:
1044: if (sender != null && o != null) {
1045: if (!(o instanceof StateHeader))
1046: return false;
1047: other = (StateHeader) o;
1048: return sender.equals(other.sender) && id == other.id;
1049: }
1050: return false;
1051: }
1052:
1053: public int hashCode() {
1054: if (sender != null)
1055: return sender.hashCode() + (int) id;
1056: else
1057: return (int) id;
1058: }
1059:
1060: public String toString() {
1061: StringBuffer sb = new StringBuffer();
1062: sb.append("type=").append(type2Str(type));
1063: if (sender != null)
1064: sb.append(", sender=").append(sender).append(" id=")
1065: .append(id);
1066: if (my_digest != null)
1067: sb.append(", digest=").append(my_digest);
1068: return sb.toString();
1069: }
1070:
1071: static String type2Str(int t) {
1072: switch (t) {
1073: case STATE_REQ:
1074: return "STATE_REQ";
1075: case STATE_RSP:
1076: return "STATE_RSP";
1077: case STATE_REMOVE_REQUESTER:
1078: return "STATE_REMOVE_REQUESTER";
1079: default:
1080: return "<unknown>";
1081: }
1082: }
1083:
1084: public void writeExternal(ObjectOutput out) throws IOException {
1085: out.writeObject(sender);
1086: out.writeLong(id);
1087: out.writeByte(type);
1088: out.writeObject(my_digest);
1089: out.writeObject(bind_addr);
1090: out.writeUTF(state_id);
1091: }
1092:
1093: public void readExternal(ObjectInput in) throws IOException,
1094: ClassNotFoundException {
1095: sender = (Address) in.readObject();
1096: id = in.readLong();
1097: type = in.readByte();
1098: my_digest = (Digest) in.readObject();
1099: bind_addr = (IpAddress) in.readObject();
1100: state_id = in.readUTF();
1101: }
1102:
1103: public void writeTo(DataOutputStream out) throws IOException {
1104: out.writeByte(type);
1105: out.writeLong(id);
1106: Util.writeAddress(sender, out);
1107: Util.writeStreamable(my_digest, out);
1108: Util.writeStreamable(bind_addr, out);
1109: Util.writeString(state_id, out);
1110: }
1111:
1112: public void readFrom(DataInputStream in) throws IOException,
1113: IllegalAccessException, InstantiationException {
1114: type = in.readByte();
1115: id = in.readLong();
1116: sender = Util.readAddress(in);
1117: my_digest = (Digest) Util.readStreamable(Digest.class, in);
1118: bind_addr = (IpAddress) Util.readStreamable(
1119: IpAddress.class, in);
1120: state_id = Util.readString(in);
1121: }
1122:
1123: public long size() {
1124: long retval = Global.LONG_SIZE + Global.BYTE_SIZE; // id and type
1125:
1126: retval += Util.size(sender);
1127:
1128: retval += Global.BYTE_SIZE; // presence byte for my_digest
1129: if (my_digest != null)
1130: retval += my_digest.serializedSize();
1131:
1132: retval += Global.BYTE_SIZE; // presence byte for state_id
1133: if (state_id != null)
1134: retval += state_id.length() + 2;
1135: return retval;
1136: }
1137:
1138: }
1139:
1140: }
|