001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)EventProcessor.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.proxy;
030:
031: import com.sun.jbi.binding.proxy.connection.ConnectionManager;
032: import com.sun.jbi.binding.proxy.connection.ConnectionManagerFactory;
033: import com.sun.jbi.binding.proxy.connection.ClientConnection;
034: import com.sun.jbi.binding.proxy.connection.EventConnection;
035: import com.sun.jbi.binding.proxy.connection.ServerConnection;
036: import com.sun.jbi.binding.proxy.connection.EventInfo;
037: import com.sun.jbi.binding.proxy.connection.EventInfoFactory;
038:
039: import com.sun.jbi.binding.proxy.util.MEPInputStream;
040: import com.sun.jbi.binding.proxy.util.MEPOutputStream;
041: import com.sun.jbi.binding.proxy.util.Translator;
042:
043: import com.sun.jbi.messaging.DeliveryChannel;
044: import com.sun.jbi.messaging.EndpointListener;
045: import com.sun.jbi.messaging.MessageExchange;
046:
047: import java.io.ByteArrayInputStream;
048: import java.io.ByteArrayOutputStream;
049:
050: import java.util.Date;
051: import java.util.HashMap;
052: import java.util.Iterator;
053: import java.util.LinkedList;
054: import java.util.Map;
055: import java.util.logging.Logger;
056:
057: import java.text.SimpleDateFormat;
058:
059: import javax.jbi.messaging.ExchangeStatus;
060: import javax.jbi.messaging.InOnly;
061: import javax.jbi.messaging.NormalizedMessage;
062:
063: import javax.jbi.servicedesc.ServiceEndpoint;
064:
065: import javax.xml.namespace.QName;
066:
067: /**
068: * Performs BC-related work for ProxyBinding.
069: * @author Sun Microsystems, Inc
070: */
071: class EventProcessor implements java.lang.Runnable {
072: private int mState;
073: private static int STATE_HELLO = 1;
074: private static int STATE_RUNNING = 2;
075: private static int STATE_RUNNING_MASTER = 3;
076: private static int STATE_LEAVE = 4;
077:
078: private HashMap mInstances;
079: private HashMap mWaitInstances;
080: private String mId;
081: private String mMasterId;
082: private long mBirthTime;
083: private long mEventTime;
084: private long mNextTime;
085: private long mNextHeartBeatTime;
086: private long mLastTransitionTime;
087: private boolean mLargerHelloIdSeen;
088: private boolean mOtherTrafficSeen;
089: private boolean mSendHeartBeats;
090: private long mDefaultHeartBeatTime;
091: private long mDefaultHelloTime;
092: private int mHelloCount;
093:
094: /**
095: * Our logger.
096: */
097: private Logger mLog;
098:
099: private ProxyBinding mPB;
100: private ConnectionManager mCM;
101: private ServiceEndpoint mService;
102: private EventConnection mEC;
103: private boolean mRunning = true;
104: int mEventsReceived;
105: int mEventsSent;
106:
107: /**
108: * Name of our ACTIVATE endpoint operation.
109: */
110: static final String ACTIVATE = "Activate";
111:
112: /**
113: * Name of our DEACTIVATE endpoint operation.
114: */
115: static final String DEACTIVATE = "Deactivate";
116:
117: private ProxyBindingStatistics mProxyBindingStatistics;
118:
119: /**
120: * Constructor for the EventProcessor.
121: * @param proxyBinding that we are running under
122: */
123: EventProcessor(ProxyBinding proxyBinding)
124: throws com.sun.jbi.binding.proxy.connection.ConnectionException {
125: EventInfoFactory efi;
126:
127: mLargerHelloIdSeen = false;
128: mSendHeartBeats = false;
129: mOtherTrafficSeen = false;
130:
131: mPB = proxyBinding;
132: mLog = mPB.getLogger("event");
133: mService = proxyBinding.getService();
134: mCM = proxyBinding.getConnectionManager();
135: mEC = mCM.getEventConnection();
136: mInstances = new HashMap();
137: mWaitInstances = new HashMap();
138:
139: //
140: // Register the events that we can accept.
141: //
142: efi = EventInfoFactory.getInstance();
143: efi.registerEventInfo(RegistrationInfo.class,
144: RegistrationInfo.EVENTNAME);
145: efi.registerEventInfo(HelloInfo.class, HelloInfo.EVENTNAME);
146: efi.registerEventInfo(HelloAckInfo.class,
147: HelloAckInfo.EVENTNAME);
148: efi.registerEventInfo(LeaveInfo.class, LeaveInfo.EVENTNAME);
149: efi.registerEventInfo(LeaveAckInfo.class,
150: LeaveAckInfo.EVENTNAME);
151: efi.registerEventInfo(HeartBeatInfo.class,
152: HeartBeatInfo.EVENTNAME);
153: //
154: // Get some configuration values.
155: //
156: mDefaultHeartBeatTime = getProperty(
157: "com.sun.jbi.binding.proxy.heartbeattime", "120000");
158: mDefaultHelloTime = getProperty(
159: "com.sun.jbi.binding.proxy.hellotime", "5000");
160: // get handle to ProxyBinding stats
161: mProxyBindingStatistics = proxyBinding.getPBStatistics();
162: }
163:
164: private long getProperty(String prop, String defaultValue) {
165: long value;
166:
167: try {
168: value = new Integer(System.getProperty(prop, defaultValue))
169: .longValue();
170: } catch (java.lang.NumberFormatException nfEx) {
171: value = new Integer(defaultValue).longValue();
172: }
173: return (value);
174: }
175:
176: /**
177: * Stop the event processor on the next event or timeout.
178: */
179: void stop() {
180: if (mState == STATE_RUNNING || mState == STATE_RUNNING_MASTER) {
181: sendLeaveEvent();
182: }
183: mRunning = false;
184: }
185:
186: HashMap getInstances() {
187: return ((HashMap) mInstances.clone());
188: }
189:
190: /**
191: * Main processing loop
192: * Basic flow:
193: * Read next write event or timeout.
194: * Process event (if available)
195: * Handle timeout (if happened)
196: */
197: public void run() {
198: mLog.info("PB:EventProcessor starting.");
199:
200: //
201: // Basic processing loop.
202: // Prime the machinery with a Hello event.
203: //
204: for (sendHelloEvent(); mRunning;) {
205: try {
206: EventInfo ei = null;
207: long timeout;
208: long startTime;
209:
210: //
211: // Setup timeout based on expected time of next timed event.
212: // Make sure that we don't go negative or set timeout to zero (which would wait
213: // for ever.)
214: //
215: startTime = System.currentTimeMillis();
216: if ((timeout = mNextTime - startTime) <= 0) {
217: timeout = 1;
218: }
219:
220: //
221: // Wait for next event or timeout.
222: //
223: ei = mEC.receiveEvent(timeout);
224: mEventTime = System.currentTimeMillis();
225:
226: //
227: // Process event if we got one.
228: //
229: if (ei != null) {
230: mEventsReceived++;
231: if ((mProxyBindingStatistics != null)
232: && (mProxyBindingStatistics.isEnabled())) {
233: mProxyBindingStatistics
234: .incrementReceivedEvents();
235: }
236: processEvent(ei);
237: }
238:
239: //
240: // Handle timeout processing if it has expired.
241: //
242: if (mEventTime >= mNextTime) {
243: mNextTime = handleTimeout();
244: }
245: } catch (com.sun.jbi.binding.proxy.connection.EventException eEx) {
246: mLog.info("PB:EventReceiver EventException: " + eEx);
247: mPB.stop();
248: } catch (Exception ex) {
249: mLog.info("PB:EventProcessor Exception: " + ex);
250: ex.printStackTrace();
251: }
252: }
253: }
254:
255: //
256: // ------------------- Methods handling event generation and processing ------------------
257: //
258:
259: void processEvent(EventInfo ei) {
260: String sender = ei.getSender();
261:
262: //
263: // Skip any events that we sent.
264: //
265: if (sender == null || !sender.equals(mId)) {
266: if (ei instanceof RegistrationInfo) {
267: handleRegistrationEvent((RegistrationInfo) ei);
268: } else if (ei instanceof HeartBeatInfo) {
269: handleHeartBeatEvent((HeartBeatInfo) ei);
270: } else if (ei instanceof HelloInfo) {
271: handleHelloEvent((HelloInfo) ei);
272: } else if (ei instanceof HelloAckInfo) {
273: handleHelloAckEvent((HelloAckInfo) ei);
274: } else if (ei instanceof LeaveInfo) {
275: handleLeaveEvent((LeaveInfo) ei);
276: } else if (ei instanceof LeaveAckInfo) {
277: handleLeaveAckEvent((LeaveAckInfo) ei);
278: }
279: }
280: }
281:
282: long handleTimeout() {
283: long nextEventTime;
284: String nextMaster;
285: boolean changeMaster;
286:
287: //
288: // The HELLO protocol requires 3 HELLO's to be sent before any MASTER takeover can happen.
289: // If a MASTER returns a HELLO_ACK the HELLO protocol terminates early. In the case where
290: // no MASTER exists yet, the HELLO broadcaster that possess the highest Id sent in a HELLO
291: // message is allowed to be the new MASTER. Any other traffic detected by a HELLO broadcaster
292: // will cause the HELLO broadcaster to assume a MASTER will be selected from existing members.
293: //
294: if (mState == STATE_HELLO) {
295: if (mEventTime > mNextHeartBeatTime) {
296: if (mHelloCount < 6) {
297: if (mHelloCount >= 3
298: && (!mOtherTrafficSeen || !mLargerHelloIdSeen)) {
299: mLog.info("PB:became master by default (" + mId
300: + ")");
301: sendHelloAckEvent(mId, mBirthTime);
302: mInstances.put(mId, new InstanceEntry(mId,
303: mBirthTime, 0));
304: mState = STATE_RUNNING_MASTER;
305: mPB.startComplete();
306: } else {
307: resendHelloEvent();
308: }
309: } else {
310: sendHelloEvent();
311: }
312: }
313: } else if (mState == STATE_RUNNING
314: || mState == STATE_RUNNING_MASTER) {
315: if (mEventTime > mNextHeartBeatTime) {
316: sendHeartBeatEvent(false);
317: }
318: } else if (mState == STATE_LEAVE) {
319: }
320:
321: //
322: // Compute when the next heart beat event needs to be sent or received.
323: // When running as the master it will sent LeaveAck events for any expired heartbeats.
324: // If non-masters think that the master has expired they elect a new master
325: // (highest instanceId name is the winner.)
326: //
327: nextEventTime = mNextHeartBeatTime;
328: nextMaster = mId;
329: changeMaster = false;
330: for (Iterator i = mInstances.values().iterator(); i.hasNext();) {
331: InstanceEntry ie = (InstanceEntry) i.next();
332: long hbt = ie.getHeartbeatTime();
333:
334: if (!ie.getInstanceId().equals(mId)) {
335: if (mEventTime > hbt) {
336: if (mState == STATE_RUNNING_MASTER) {
337: mLog.info("PB:no heartbeat ("
338: + ie.getInstanceId() + "," + hbt + ")");
339: i.remove();
340: if (mInstances.size() == 1) {
341: mSendHeartBeats = false;
342: }
343: sendLeaveAckEvent(ie);
344: } else if (ie.getInstanceId().equals(mMasterId)) {
345: changeMaster = true;
346: }
347: } else {
348: if (hbt < nextEventTime) {
349: nextEventTime = ie.getHeartbeatTime();
350: }
351: if (ie.getInstanceId().compareTo(nextMaster) > 0) {
352: nextMaster = ie.getInstanceId();
353: }
354: }
355: }
356: }
357:
358: //
359: // If the master needs to change and we are the master than take over.
360: //
361: if (changeMaster && nextMaster.equals(mId)) {
362: mLog.info("PB:became master (" + mId + ")");
363: mState = STATE_RUNNING_MASTER;
364: mInstances.remove(mMasterId);
365: mMasterId = mId;
366: if (mInstances.size() == 1) {
367: mSendHeartBeats = false;
368: }
369: }
370:
371: //
372: // Change timeout to 1 minute when no other members exist.
373: //
374: if (mState == STATE_RUNNING_MASTER && !mSendHeartBeats) {
375: nextEventTime = mEventTime + 60000;
376: }
377: //mLog.info("Next timeout (" + nextEventTime + ")");
378:
379: return (nextEventTime);
380: }
381:
382: void sendHelloEvent() {
383: HelloInfo hi;
384:
385: mId = mPB.getConnectionManager().getInstanceId();
386: mBirthTime = System.currentTimeMillis();
387: mNextTime = mBirthTime + mDefaultHelloTime;
388: mNextHeartBeatTime = mBirthTime + mDefaultHelloTime;
389: mState = STATE_HELLO;
390: mLargerHelloIdSeen = false;
391: mOtherTrafficSeen = false;
392: mHelloCount = 0;
393: mLog.info("PB:sending HelloEvent (" + mId + "," + mBirthTime
394: + ")");
395: hi = new HelloInfo(mId, mBirthTime);
396: sendEvent(hi);
397: }
398:
399: void resendHelloEvent() {
400: HelloInfo hi;
401:
402: mHelloCount++;
403: mNextHeartBeatTime = mEventTime + mDefaultHelloTime;
404: hi = new HelloInfo(mId, mBirthTime);
405: mLog.info("PB:resending HelloEvent (" + mId + "," + mBirthTime
406: + ")");
407: sendEvent(hi);
408: }
409:
410: void handleHelloEvent(HelloInfo hi) {
411: if (mState == STATE_RUNNING_MASTER) {
412: mLog.info("PB:received HelloEvent (" + hi.getInstanceId()
413: + "," + hi.getBirthtime() + ")");
414: mSendHeartBeats = true;
415: mInstances.put(hi.getInstanceId(), new InstanceEntry(hi
416: .getInstanceId(), hi.getBirthtime(), mEventTime
417: + mDefaultHeartBeatTime * 2));
418: sendHelloAckEvent(hi.getInstanceId(), hi.getBirthtime());
419: sendHeartBeatEvent(true);
420: } else if (mState == STATE_HELLO) {
421: mLog.info("PB:received other HelloEvent ("
422: + hi.getInstanceId() + "," + hi.getBirthtime()
423: + ")");
424: if (hi.getInstanceId().compareTo(mId) > 0) {
425: mLargerHelloIdSeen = true;
426: }
427: }
428: }
429:
430: void sendHelloAckEvent(String id, long birthtime) {
431: HelloAckInfo hai;
432: LinkedList ll = new LinkedList();
433:
434: ll.addAll(mInstances.values());
435: hai = new HelloAckInfo(id, birthtime, setLastTransitionTime(),
436: mMasterId, mBirthTime, ll);
437: mNextHeartBeatTime = mEventTime + mDefaultHeartBeatTime;
438: mLog.info("PB:sending HelloAckEvent (" + id + "," + birthtime
439: + "," + mLastTransitionTime + ")");
440: sendEvent(hai);
441: mPB.postNotification(ProxyBindingLifeCycle.ESB_MEMBER_JOIN, id);
442: }
443:
444: void handleHelloAckEvent(HelloAckInfo hai) {
445: if (mState == STATE_HELLO) {
446: if (hai.getInstanceId().equals(mId)) {
447: LinkedList ll;
448: InstanceEntry ie;
449:
450: mLog.info("PB:received matching HelloAckEvent (" + mId
451: + "," + hai.getBirthtime() + ","
452: + hai.getLastTransitionTime() + ","
453: + hai.getMasterId() + ","
454: + hai.getMasterBirthTime() + ")");
455:
456: //
457: // Populate our expected view of the world.
458: //
459: ll = hai.getMembers();
460: for (; !ll.isEmpty();) {
461: ie = (InstanceEntry) ll.remove(0);
462: mLog.info("PB: add member (" + ie.getInstanceId()
463: + "," + ie.getBirthTime() + ")");
464: ie.setHeartbeatTime(mEventTime + mDefaultHelloTime);
465: mInstances.put(ie.getInstanceId(), ie);
466: if (!ie.getInstanceId().equals(mId)) {
467: mWaitInstances.put(ie.getInstanceId(), ie);
468: }
469: }
470: mState = STATE_RUNNING;
471: mNextHeartBeatTime = mEventTime + mDefaultHeartBeatTime;
472: mMasterId = hai.getMasterId();
473: mLastTransitionTime = hai.getLastTransitionTime();
474: mSendHeartBeats = true;
475: } else {
476: mLog.info("PB:received non-matching HelloAckEvent ("
477: + hai.getInstanceId() + ","
478: + hai.getBirthtime() + ","
479: + hai.getLastTransitionTime() + ")");
480: mOtherTrafficSeen = true;
481: }
482: } else if (mState == STATE_RUNNING) {
483: InstanceEntry ie = (InstanceEntry) mInstances.get(hai
484: .getInstanceId());
485:
486: mLog.info("PB:Received HelloAckEvent ("
487: + hai.getInstanceId() + "," + hai.getBirthtime()
488: + "," + hai.getLastTransitionTime() + ")");
489: mLastTransitionTime = hai.getLastTransitionTime();
490: if (ie != null) {
491: mLog
492: .info("PB:received HelloAckEvent for different birthTime("
493: + ie.getBirthTime() + ")");
494: mInstances.remove(hai.getInstanceId());
495: }
496: mInstances.put(hai.getInstanceId(), new InstanceEntry(hai
497: .getInstanceId(), hai.getBirthtime(), mEventTime
498: + mDefaultHeartBeatTime * 2 + 1000));
499: sendHeartBeatEvent(true);
500: } else if (mState == STATE_RUNNING_MASTER) {
501: mLog.info("PB: master received HelloAckEvent ("
502: + hai.getInstanceId() + "," + hai.getBirthtime()
503: + "," + hai.getLastTransitionTime() + ")");
504: }
505: }
506:
507: void sendHeartBeatEvent(boolean includeEndpoints) {
508: HeartBeatInfo hbi;
509:
510: if (mSendHeartBeats) {
511: hbi = new HeartBeatInfo(mId, mBirthTime,
512: mLastTransitionTime);
513: mNextHeartBeatTime = mEventTime + mDefaultHeartBeatTime;
514: if (includeEndpoints) {
515: hbi.setEndpoints(mPB.getLocalEndpoints());
516: }
517: mLog.info("PB:sending HeartBeatEvent (" + mId + ","
518: + mBirthTime + "," + mLastTransitionTime + ","
519: + includeEndpoints + ")");
520: sendEvent(hbi);
521: }
522: }
523:
524: void handleHeartBeatEvent(HeartBeatInfo hbi) {
525: if (mState == STATE_RUNNING || mState == STATE_RUNNING_MASTER) {
526: InstanceEntry ie;
527:
528: ie = (InstanceEntry) mInstances.get(hbi.getInstanceId());
529: if (ie != null && ie.getBirthTime() == hbi.getBirthtime()) {
530: mLog.info("PB:received HeartBeatEvent ("
531: + hbi.getInstanceId() + ","
532: + hbi.getBirthtime() + ","
533: + hbi.getLastTransitionTime() + ")");
534: ie.setHeartbeatTime(mEventTime + mDefaultHeartBeatTime
535: * 2 + 1000);
536:
537: //
538: // Handle endpoint information iff:
539: // We are waiting for endpoint info.
540: // Endpoint information is encoded in the message
541: // The message is related to us joining (not a previous join.)
542: //
543: if (!mWaitInstances.isEmpty()
544: && hbi.getEndpoints() != null
545: && hbi.getLastTransitionTime() == mLastTransitionTime) {
546: for (Iterator i = hbi.getEndpoints().iterator(); i
547: .hasNext();) {
548: RegistrationInfo ri = (RegistrationInfo) i
549: .next();
550:
551: try {
552: mLog
553: .info("PB:HeartBeatEvent registration ("
554: + ri.getServiceName()
555: + ","
556: + ri.getEndpointName()
557: + ")");
558: mPB.addRemoteEndpoint(ri);
559: } catch (javax.jbi.JBIException mEx) {
560: }
561: }
562:
563: //
564: // See if we have seen all members yet.
565: //
566: mWaitInstances.remove(hbi.getInstanceId());
567: if (mWaitInstances.isEmpty()) {
568: mPB.startComplete();
569: }
570: }
571: } else {
572: mLog.info("PB:received unknown HeartBeatEvent ("
573: + hbi.getInstanceId() + ","
574: + hbi.getBirthtime() + ")");
575: }
576: } else if (mState == STATE_HELLO) {
577: mOtherTrafficSeen = true;
578: }
579: }
580:
581: void sendLeaveEvent() {
582: LeaveInfo li;
583:
584: li = new LeaveInfo(mId, mBirthTime);
585: mLog.info("PB:sending LeaveEvent (" + mId + "," + mBirthTime
586: + ")");
587: sendEvent(li);
588: mState = STATE_LEAVE;
589: if (mInstances.size() == 1) {
590: mRunning = false;
591: }
592: }
593:
594: void handleLeaveEvent(LeaveInfo li) {
595: if (mState == STATE_RUNNING_MASTER) {
596: InstanceEntry ie;
597:
598: ie = (InstanceEntry) mInstances.get(li.getInstanceId());
599: if (ie != null && ie.getBirthTime() == li.getBirthtime()) {
600: mLog.info("PB:received LeaveEvent ("
601: + li.getInstanceId() + "," + li.getBirthtime()
602: + ")");
603: mInstances.remove(ie.getInstanceId());
604: mPB.purgeRemoteEndpointsForInstance(ie.getInstanceId());
605: sendLeaveAckEvent(ie);
606: if (mInstances.size() == 1) {
607: mSendHeartBeats = false;
608: }
609: } else {
610: mLog.info("PB:received unknown LeaveEvent ("
611: + li.getInstanceId() + "," + li.getBirthtime()
612: + ")");
613: }
614: } else if (mState == STATE_HELLO) {
615: mOtherTrafficSeen = true;
616: }
617: }
618:
619: void sendLeaveAckEvent(InstanceEntry ie) {
620: LeaveAckInfo lai;
621:
622: lai = new LeaveAckInfo(ie.getInstanceId(),
623: setLastTransitionTime());
624: mLog.info("PB:sending LeaveAckEvent (" + ie.getInstanceId()
625: + "," + lai.getLastTransitionTime() + ")");
626: sendEvent(lai);
627: mPB.postNotification(ProxyBindingLifeCycle.ESB_MEMBER_LEAVE, ie
628: .getInstanceId());
629: }
630:
631: void handleLeaveAckEvent(LeaveAckInfo lai) {
632: if (mState == STATE_LEAVE) {
633: mRunning = false;
634: } else if (mState == STATE_HELLO) {
635: mOtherTrafficSeen = true;
636: } else {
637: mLog.info("PB:received LeaveAckEvent ("
638: + lai.getInstanceId() + ","
639: + lai.getLastTransitionTime() + ")");
640: mLastTransitionTime = lai.getLastTransitionTime();
641: mInstances.remove(lai.getInstanceId());
642: mPB.purgeRemoteEndpointsForInstance(lai.getInstanceId());
643:
644: //
645: // See if we are waiting for instance which died.
646: //
647: mWaitInstances.remove(lai.getInstanceId());
648: if (mWaitInstances.isEmpty()) {
649: mPB.startComplete();
650: }
651: }
652: }
653:
654: void handleRegistrationEvent(RegistrationInfo ri) {
655: if (mState == STATE_RUNNING || mState == STATE_RUNNING_MASTER) {
656: try {
657: if (ri.getAction().equals(RegistrationInfo.ACTION_ADD)) {
658: mLog.info("PB:add registration ("
659: + ri.getServiceName() + ","
660: + ri.getEndpointName() + ","
661: + ri.getInstanceId() + ")");
662: mPB.addRemoteEndpoint(ri);
663: } else if (ri.getAction().equals(
664: RegistrationInfo.ACTION_REMOVE)) {
665: mLog.info("PB:remove registration ("
666: + ri.getServiceName() + ","
667: + ri.getEndpointName() + ","
668: + ri.getInstanceId() + ")");
669: mPB.removeRemoteEndpoint(ri);
670: } else {
671: mLog.info("PB:unknown registration event action ("
672: + ri.getAction() + ")");
673: }
674: } catch (javax.jbi.JBIException mEx) {
675: mLog.info("PB:error handling registration event: "
676: + mEx);
677: }
678: } else {
679: mOtherTrafficSeen = true;
680: }
681: }
682:
683: //
684: // Because multiple systems can run with skewed clocks and in different time zones we just
685: // want to make sure that time is monotonically increasing.
686: //
687: long setLastTransitionTime() {
688: long time = System.currentTimeMillis();
689:
690: if (time < mLastTransitionTime) {
691: mLastTransitionTime += 13;
692: } else {
693: mLastTransitionTime = time;
694: }
695: return (mLastTransitionTime);
696: }
697:
698: void sendEvent(EventInfo ei) {
699: try {
700: mEventsSent++;
701: mEC.sendEvent(ei);
702: if ((mProxyBindingStatistics != null)
703: && (mProxyBindingStatistics.isEnabled())) {
704: mProxyBindingStatistics.incrementSentEvents();
705: }
706: } catch (com.sun.jbi.binding.proxy.connection.EventException eEx) {
707: mLog.info("PB:sendEvent failed: " + eEx);
708: }
709: }
710:
711: public String toString() {
712: StringBuffer sb = new StringBuffer();
713: SimpleDateFormat sdf = new SimpleDateFormat(
714: "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
715:
716: sb.append(" Event Processor Id(");
717: sb.append(mId);
718: sb.append(")\n State (");
719: sb
720: .append(mState == STATE_HELLO ? "HELLO"
721: : (mState == STATE_RUNNING ? "RUNNING"
722: : (mState == STATE_LEAVE ? "LEAVE"
723: : "MASTER")));
724: sb.append(")\n BirthTime (");
725: sb.append(sdf.format(new Date(mBirthTime)));
726: sb.append(")\n NextTime (");
727: sb.append(sdf.format(new Date(mNextTime)));
728: sb.append(")\n LastTransition (");
729: sb.append(sdf.format(new Date(mLastTransitionTime)));
730: sb.append(")\n Master (");
731: sb.append(mMasterId);
732: sb.append(")\n Events Sent(");
733: sb.append(mEventsSent);
734: sb.append(") Received(");
735: sb.append(mEventsReceived);
736: sb.append(")\n Instances Count(");
737: sb.append(mInstances.size());
738: sb.append(")\n");
739: for (Iterator i = mInstances.values().iterator(); i.hasNext();) {
740: sb.append(i.next().toString());
741: }
742: sb.append(" Wait Instances Count(");
743: sb.append(mWaitInstances.size());
744: sb.append(")\n");
745: if (mWaitInstances.size() > 0) {
746: for (Iterator i = mWaitInstances.values().iterator(); i
747: .hasNext();) {
748: sb.append(i.next().toString());
749: }
750: }
751: return (sb.toString());
752: }
753: }
|