Source Code Cross Referenced for EventProcessor.java in  » ESB » open-esb » com » sun » jbi » binding » proxy » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » open esb » com.sun.jbi.binding.proxy 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * BEGIN_HEADER - DO NOT EDIT
003:         *
004:         * The contents of this file are subject to the terms
005:         * of the Common Development and Distribution License
006:         * (the "License").  You may not use this file except
007:         * in compliance with the License.
008:         *
009:         * You can obtain a copy of the license at
010:         * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011:         * See the License for the specific language governing
012:         * permissions and limitations under the License.
013:         *
014:         * When distributing Covered Code, include this CDDL
015:         * HEADER in each file and include the License file at
016:         * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017:         * If applicable add the following below this CDDL HEADER,
018:         * with the fields enclosed by brackets "[]" replaced with
019:         * your own identifying information: Portions Copyright
020:         * [year] [name of copyright owner]
021:         */
022:
023:        /*
024:         * @(#)EventProcessor.java
025:         * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026:         *
027:         * END_HEADER - DO NOT EDIT
028:         */
029:        package com.sun.jbi.binding.proxy;
030:
031:        import com.sun.jbi.binding.proxy.connection.ConnectionManager;
032:        import com.sun.jbi.binding.proxy.connection.ConnectionManagerFactory;
033:        import com.sun.jbi.binding.proxy.connection.ClientConnection;
034:        import com.sun.jbi.binding.proxy.connection.EventConnection;
035:        import com.sun.jbi.binding.proxy.connection.ServerConnection;
036:        import com.sun.jbi.binding.proxy.connection.EventInfo;
037:        import com.sun.jbi.binding.proxy.connection.EventInfoFactory;
038:
039:        import com.sun.jbi.binding.proxy.util.MEPInputStream;
040:        import com.sun.jbi.binding.proxy.util.MEPOutputStream;
041:        import com.sun.jbi.binding.proxy.util.Translator;
042:
043:        import com.sun.jbi.messaging.DeliveryChannel;
044:        import com.sun.jbi.messaging.EndpointListener;
045:        import com.sun.jbi.messaging.MessageExchange;
046:
047:        import java.io.ByteArrayInputStream;
048:        import java.io.ByteArrayOutputStream;
049:
050:        import java.util.Date;
051:        import java.util.HashMap;
052:        import java.util.Iterator;
053:        import java.util.LinkedList;
054:        import java.util.Map;
055:        import java.util.logging.Logger;
056:
057:        import java.text.SimpleDateFormat;
058:
059:        import javax.jbi.messaging.ExchangeStatus;
060:        import javax.jbi.messaging.InOnly;
061:        import javax.jbi.messaging.NormalizedMessage;
062:
063:        import javax.jbi.servicedesc.ServiceEndpoint;
064:
065:        import javax.xml.namespace.QName;
066:
067:        /**
068:         * Performs BC-related work for ProxyBinding.
069:         * @author Sun Microsystems, Inc
070:         */
071:        class EventProcessor implements  java.lang.Runnable {
072:            private int mState;
073:            private static int STATE_HELLO = 1;
074:            private static int STATE_RUNNING = 2;
075:            private static int STATE_RUNNING_MASTER = 3;
076:            private static int STATE_LEAVE = 4;
077:
078:            private HashMap mInstances;
079:            private HashMap mWaitInstances;
080:            private String mId;
081:            private String mMasterId;
082:            private long mBirthTime;
083:            private long mEventTime;
084:            private long mNextTime;
085:            private long mNextHeartBeatTime;
086:            private long mLastTransitionTime;
087:            private boolean mLargerHelloIdSeen;
088:            private boolean mOtherTrafficSeen;
089:            private boolean mSendHeartBeats;
090:            private long mDefaultHeartBeatTime;
091:            private long mDefaultHelloTime;
092:            private int mHelloCount;
093:
094:            /**
095:             * Our logger.
096:             */
097:            private Logger mLog;
098:
099:            private ProxyBinding mPB;
100:            private ConnectionManager mCM;
101:            private ServiceEndpoint mService;
102:            private EventConnection mEC;
103:            private boolean mRunning = true;
104:            int mEventsReceived;
105:            int mEventsSent;
106:
107:            /**
108:             * Name of our ACTIVATE endpoint operation.
109:             */
110:            static final String ACTIVATE = "Activate";
111:
112:            /**
113:             * Name of our DEACTIVATE endpoint operation.
114:             */
115:            static final String DEACTIVATE = "Deactivate";
116:
117:            private ProxyBindingStatistics mProxyBindingStatistics;
118:
119:            /**
120:             * Constructor for the EventProcessor.
121:             * @param proxyBinding that we are running under
122:             */
123:            EventProcessor(ProxyBinding proxyBinding)
124:                    throws com.sun.jbi.binding.proxy.connection.ConnectionException {
125:                EventInfoFactory efi;
126:
127:                mLargerHelloIdSeen = false;
128:                mSendHeartBeats = false;
129:                mOtherTrafficSeen = false;
130:
131:                mPB = proxyBinding;
132:                mLog = mPB.getLogger("event");
133:                mService = proxyBinding.getService();
134:                mCM = proxyBinding.getConnectionManager();
135:                mEC = mCM.getEventConnection();
136:                mInstances = new HashMap();
137:                mWaitInstances = new HashMap();
138:
139:                //
140:                //  Register the events that we can accept.
141:                //
142:                efi = EventInfoFactory.getInstance();
143:                efi.registerEventInfo(RegistrationInfo.class,
144:                        RegistrationInfo.EVENTNAME);
145:                efi.registerEventInfo(HelloInfo.class, HelloInfo.EVENTNAME);
146:                efi.registerEventInfo(HelloAckInfo.class,
147:                        HelloAckInfo.EVENTNAME);
148:                efi.registerEventInfo(LeaveInfo.class, LeaveInfo.EVENTNAME);
149:                efi.registerEventInfo(LeaveAckInfo.class,
150:                        LeaveAckInfo.EVENTNAME);
151:                efi.registerEventInfo(HeartBeatInfo.class,
152:                        HeartBeatInfo.EVENTNAME);
153:                //
154:                //  Get some configuration values.
155:                //
156:                mDefaultHeartBeatTime = getProperty(
157:                        "com.sun.jbi.binding.proxy.heartbeattime", "120000");
158:                mDefaultHelloTime = getProperty(
159:                        "com.sun.jbi.binding.proxy.hellotime", "5000");
160:                // get handle to ProxyBinding stats
161:                mProxyBindingStatistics = proxyBinding.getPBStatistics();
162:            }
163:
164:            private long getProperty(String prop, String defaultValue) {
165:                long value;
166:
167:                try {
168:                    value = new Integer(System.getProperty(prop, defaultValue))
169:                            .longValue();
170:                } catch (java.lang.NumberFormatException nfEx) {
171:                    value = new Integer(defaultValue).longValue();
172:                }
173:                return (value);
174:            }
175:
176:            /**
177:             * Stop the event processor on the next event or timeout.
178:             */
179:            void stop() {
180:                if (mState == STATE_RUNNING || mState == STATE_RUNNING_MASTER) {
181:                    sendLeaveEvent();
182:                }
183:                mRunning = false;
184:            }
185:
186:            HashMap getInstances() {
187:                return ((HashMap) mInstances.clone());
188:            }
189:
190:            /**
191:             * Main processing loop
192:             * Basic flow:
193:             *      Read next write event or timeout.
194:             *      Process event (if available)
195:             *      Handle timeout (if happened)
196:             */
197:            public void run() {
198:                mLog.info("PB:EventProcessor starting.");
199:
200:                //
201:                // Basic processing loop.
202:                // Prime the machinery with a Hello event.
203:                //
204:                for (sendHelloEvent(); mRunning;) {
205:                    try {
206:                        EventInfo ei = null;
207:                        long timeout;
208:                        long startTime;
209:
210:                        //
211:                        // Setup timeout based on expected time of next timed event.
212:                        // Make sure that we don't go negative or set timeout to zero (which would wait
213:                        // for ever.)
214:                        //
215:                        startTime = System.currentTimeMillis();
216:                        if ((timeout = mNextTime - startTime) <= 0) {
217:                            timeout = 1;
218:                        }
219:
220:                        //
221:                        //  Wait for next event or timeout.
222:                        //
223:                        ei = mEC.receiveEvent(timeout);
224:                        mEventTime = System.currentTimeMillis();
225:
226:                        //
227:                        //  Process event if we got one.
228:                        //
229:                        if (ei != null) {
230:                            mEventsReceived++;
231:                            if ((mProxyBindingStatistics != null)
232:                                    && (mProxyBindingStatistics.isEnabled())) {
233:                                mProxyBindingStatistics
234:                                        .incrementReceivedEvents();
235:                            }
236:                            processEvent(ei);
237:                        }
238:
239:                        //
240:                        //  Handle timeout processing if it has expired.
241:                        //
242:                        if (mEventTime >= mNextTime) {
243:                            mNextTime = handleTimeout();
244:                        }
245:                    } catch (com.sun.jbi.binding.proxy.connection.EventException eEx) {
246:                        mLog.info("PB:EventReceiver EventException: " + eEx);
247:                        mPB.stop();
248:                    } catch (Exception ex) {
249:                        mLog.info("PB:EventProcessor Exception: " + ex);
250:                        ex.printStackTrace();
251:                    }
252:                }
253:            }
254:
255:            //
256:            // ------------------- Methods handling event generation and processing  ------------------
257:            //    
258:
259:            void processEvent(EventInfo ei) {
260:                String sender = ei.getSender();
261:
262:                //
263:                //  Skip any events that we sent.
264:                //
265:                if (sender == null || !sender.equals(mId)) {
266:                    if (ei instanceof  RegistrationInfo) {
267:                        handleRegistrationEvent((RegistrationInfo) ei);
268:                    } else if (ei instanceof  HeartBeatInfo) {
269:                        handleHeartBeatEvent((HeartBeatInfo) ei);
270:                    } else if (ei instanceof  HelloInfo) {
271:                        handleHelloEvent((HelloInfo) ei);
272:                    } else if (ei instanceof  HelloAckInfo) {
273:                        handleHelloAckEvent((HelloAckInfo) ei);
274:                    } else if (ei instanceof  LeaveInfo) {
275:                        handleLeaveEvent((LeaveInfo) ei);
276:                    } else if (ei instanceof  LeaveAckInfo) {
277:                        handleLeaveAckEvent((LeaveAckInfo) ei);
278:                    }
279:                }
280:            }
281:
282:            long handleTimeout() {
283:                long nextEventTime;
284:                String nextMaster;
285:                boolean changeMaster;
286:
287:                //
288:                //  The HELLO protocol requires 3 HELLO's to be sent before any MASTER takeover can happen.
289:                //  If a MASTER returns a HELLO_ACK the HELLO protocol terminates early. In the case where
290:                //  no MASTER exists yet, the HELLO broadcaster that possess the highest Id sent in a HELLO
291:                //  message is allowed to be the new MASTER. Any other traffic detected by a HELLO broadcaster
292:                //  will cause the HELLO broadcaster to assume a MASTER will be selected from existing members.
293:                //
294:                if (mState == STATE_HELLO) {
295:                    if (mEventTime > mNextHeartBeatTime) {
296:                        if (mHelloCount < 6) {
297:                            if (mHelloCount >= 3
298:                                    && (!mOtherTrafficSeen || !mLargerHelloIdSeen)) {
299:                                mLog.info("PB:became master by default (" + mId
300:                                        + ")");
301:                                sendHelloAckEvent(mId, mBirthTime);
302:                                mInstances.put(mId, new InstanceEntry(mId,
303:                                        mBirthTime, 0));
304:                                mState = STATE_RUNNING_MASTER;
305:                                mPB.startComplete();
306:                            } else {
307:                                resendHelloEvent();
308:                            }
309:                        } else {
310:                            sendHelloEvent();
311:                        }
312:                    }
313:                } else if (mState == STATE_RUNNING
314:                        || mState == STATE_RUNNING_MASTER) {
315:                    if (mEventTime > mNextHeartBeatTime) {
316:                        sendHeartBeatEvent(false);
317:                    }
318:                } else if (mState == STATE_LEAVE) {
319:                }
320:
321:                //
322:                //  Compute when the next heart beat event needs to be sent or received.
323:                //  When running as the master it will sent LeaveAck events for any expired heartbeats.
324:                //  If non-masters think that the master has expired they elect a new master 
325:                //  (highest instanceId name is the winner.)
326:                //
327:                nextEventTime = mNextHeartBeatTime;
328:                nextMaster = mId;
329:                changeMaster = false;
330:                for (Iterator i = mInstances.values().iterator(); i.hasNext();) {
331:                    InstanceEntry ie = (InstanceEntry) i.next();
332:                    long hbt = ie.getHeartbeatTime();
333:
334:                    if (!ie.getInstanceId().equals(mId)) {
335:                        if (mEventTime > hbt) {
336:                            if (mState == STATE_RUNNING_MASTER) {
337:                                mLog.info("PB:no heartbeat ("
338:                                        + ie.getInstanceId() + "," + hbt + ")");
339:                                i.remove();
340:                                if (mInstances.size() == 1) {
341:                                    mSendHeartBeats = false;
342:                                }
343:                                sendLeaveAckEvent(ie);
344:                            } else if (ie.getInstanceId().equals(mMasterId)) {
345:                                changeMaster = true;
346:                            }
347:                        } else {
348:                            if (hbt < nextEventTime) {
349:                                nextEventTime = ie.getHeartbeatTime();
350:                            }
351:                            if (ie.getInstanceId().compareTo(nextMaster) > 0) {
352:                                nextMaster = ie.getInstanceId();
353:                            }
354:                        }
355:                    }
356:                }
357:
358:                //
359:                //  If the master needs to change and we are the master than take over.
360:                //
361:                if (changeMaster && nextMaster.equals(mId)) {
362:                    mLog.info("PB:became master (" + mId + ")");
363:                    mState = STATE_RUNNING_MASTER;
364:                    mInstances.remove(mMasterId);
365:                    mMasterId = mId;
366:                    if (mInstances.size() == 1) {
367:                        mSendHeartBeats = false;
368:                    }
369:                }
370:
371:                //
372:                //  Change timeout to 1 minute when no other members exist.
373:                //
374:                if (mState == STATE_RUNNING_MASTER && !mSendHeartBeats) {
375:                    nextEventTime = mEventTime + 60000;
376:                }
377:                //mLog.info("Next timeout (" + nextEventTime + ")");
378:
379:                return (nextEventTime);
380:            }
381:
382:            void sendHelloEvent() {
383:                HelloInfo hi;
384:
385:                mId = mPB.getConnectionManager().getInstanceId();
386:                mBirthTime = System.currentTimeMillis();
387:                mNextTime = mBirthTime + mDefaultHelloTime;
388:                mNextHeartBeatTime = mBirthTime + mDefaultHelloTime;
389:                mState = STATE_HELLO;
390:                mLargerHelloIdSeen = false;
391:                mOtherTrafficSeen = false;
392:                mHelloCount = 0;
393:                mLog.info("PB:sending HelloEvent (" + mId + "," + mBirthTime
394:                        + ")");
395:                hi = new HelloInfo(mId, mBirthTime);
396:                sendEvent(hi);
397:            }
398:
399:            void resendHelloEvent() {
400:                HelloInfo hi;
401:
402:                mHelloCount++;
403:                mNextHeartBeatTime = mEventTime + mDefaultHelloTime;
404:                hi = new HelloInfo(mId, mBirthTime);
405:                mLog.info("PB:resending HelloEvent (" + mId + "," + mBirthTime
406:                        + ")");
407:                sendEvent(hi);
408:            }
409:
410:            void handleHelloEvent(HelloInfo hi) {
411:                if (mState == STATE_RUNNING_MASTER) {
412:                    mLog.info("PB:received HelloEvent (" + hi.getInstanceId()
413:                            + "," + hi.getBirthtime() + ")");
414:                    mSendHeartBeats = true;
415:                    mInstances.put(hi.getInstanceId(), new InstanceEntry(hi
416:                            .getInstanceId(), hi.getBirthtime(), mEventTime
417:                            + mDefaultHeartBeatTime * 2));
418:                    sendHelloAckEvent(hi.getInstanceId(), hi.getBirthtime());
419:                    sendHeartBeatEvent(true);
420:                } else if (mState == STATE_HELLO) {
421:                    mLog.info("PB:received other HelloEvent ("
422:                            + hi.getInstanceId() + "," + hi.getBirthtime()
423:                            + ")");
424:                    if (hi.getInstanceId().compareTo(mId) > 0) {
425:                        mLargerHelloIdSeen = true;
426:                    }
427:                }
428:            }
429:
430:            void sendHelloAckEvent(String id, long birthtime) {
431:                HelloAckInfo hai;
432:                LinkedList ll = new LinkedList();
433:
434:                ll.addAll(mInstances.values());
435:                hai = new HelloAckInfo(id, birthtime, setLastTransitionTime(),
436:                        mMasterId, mBirthTime, ll);
437:                mNextHeartBeatTime = mEventTime + mDefaultHeartBeatTime;
438:                mLog.info("PB:sending HelloAckEvent (" + id + "," + birthtime
439:                        + "," + mLastTransitionTime + ")");
440:                sendEvent(hai);
441:                mPB.postNotification(ProxyBindingLifeCycle.ESB_MEMBER_JOIN, id);
442:            }
443:
444:            void handleHelloAckEvent(HelloAckInfo hai) {
445:                if (mState == STATE_HELLO) {
446:                    if (hai.getInstanceId().equals(mId)) {
447:                        LinkedList ll;
448:                        InstanceEntry ie;
449:
450:                        mLog.info("PB:received matching HelloAckEvent (" + mId
451:                                + "," + hai.getBirthtime() + ","
452:                                + hai.getLastTransitionTime() + ","
453:                                + hai.getMasterId() + ","
454:                                + hai.getMasterBirthTime() + ")");
455:
456:                        //
457:                        //  Populate our expected view of the world.
458:                        //
459:                        ll = hai.getMembers();
460:                        for (; !ll.isEmpty();) {
461:                            ie = (InstanceEntry) ll.remove(0);
462:                            mLog.info("PB: add member (" + ie.getInstanceId()
463:                                    + "," + ie.getBirthTime() + ")");
464:                            ie.setHeartbeatTime(mEventTime + mDefaultHelloTime);
465:                            mInstances.put(ie.getInstanceId(), ie);
466:                            if (!ie.getInstanceId().equals(mId)) {
467:                                mWaitInstances.put(ie.getInstanceId(), ie);
468:                            }
469:                        }
470:                        mState = STATE_RUNNING;
471:                        mNextHeartBeatTime = mEventTime + mDefaultHeartBeatTime;
472:                        mMasterId = hai.getMasterId();
473:                        mLastTransitionTime = hai.getLastTransitionTime();
474:                        mSendHeartBeats = true;
475:                    } else {
476:                        mLog.info("PB:received non-matching HelloAckEvent ("
477:                                + hai.getInstanceId() + ","
478:                                + hai.getBirthtime() + ","
479:                                + hai.getLastTransitionTime() + ")");
480:                        mOtherTrafficSeen = true;
481:                    }
482:                } else if (mState == STATE_RUNNING) {
483:                    InstanceEntry ie = (InstanceEntry) mInstances.get(hai
484:                            .getInstanceId());
485:
486:                    mLog.info("PB:Received HelloAckEvent ("
487:                            + hai.getInstanceId() + "," + hai.getBirthtime()
488:                            + "," + hai.getLastTransitionTime() + ")");
489:                    mLastTransitionTime = hai.getLastTransitionTime();
490:                    if (ie != null) {
491:                        mLog
492:                                .info("PB:received HelloAckEvent for different birthTime("
493:                                        + ie.getBirthTime() + ")");
494:                        mInstances.remove(hai.getInstanceId());
495:                    }
496:                    mInstances.put(hai.getInstanceId(), new InstanceEntry(hai
497:                            .getInstanceId(), hai.getBirthtime(), mEventTime
498:                            + mDefaultHeartBeatTime * 2 + 1000));
499:                    sendHeartBeatEvent(true);
500:                } else if (mState == STATE_RUNNING_MASTER) {
501:                    mLog.info("PB: master received HelloAckEvent ("
502:                            + hai.getInstanceId() + "," + hai.getBirthtime()
503:                            + "," + hai.getLastTransitionTime() + ")");
504:                }
505:            }
506:
507:            void sendHeartBeatEvent(boolean includeEndpoints) {
508:                HeartBeatInfo hbi;
509:
510:                if (mSendHeartBeats) {
511:                    hbi = new HeartBeatInfo(mId, mBirthTime,
512:                            mLastTransitionTime);
513:                    mNextHeartBeatTime = mEventTime + mDefaultHeartBeatTime;
514:                    if (includeEndpoints) {
515:                        hbi.setEndpoints(mPB.getLocalEndpoints());
516:                    }
517:                    mLog.info("PB:sending HeartBeatEvent (" + mId + ","
518:                            + mBirthTime + "," + mLastTransitionTime + ","
519:                            + includeEndpoints + ")");
520:                    sendEvent(hbi);
521:                }
522:            }
523:
524:            void handleHeartBeatEvent(HeartBeatInfo hbi) {
525:                if (mState == STATE_RUNNING || mState == STATE_RUNNING_MASTER) {
526:                    InstanceEntry ie;
527:
528:                    ie = (InstanceEntry) mInstances.get(hbi.getInstanceId());
529:                    if (ie != null && ie.getBirthTime() == hbi.getBirthtime()) {
530:                        mLog.info("PB:received HeartBeatEvent ("
531:                                + hbi.getInstanceId() + ","
532:                                + hbi.getBirthtime() + ","
533:                                + hbi.getLastTransitionTime() + ")");
534:                        ie.setHeartbeatTime(mEventTime + mDefaultHeartBeatTime
535:                                * 2 + 1000);
536:
537:                        //
538:                        //  Handle endpoint information iff:
539:                        //      We are waiting for endpoint info.
540:                        //      Endpoint information is encoded in the message
541:                        //      The message is related to us joining (not a previous join.)
542:                        //
543:                        if (!mWaitInstances.isEmpty()
544:                                && hbi.getEndpoints() != null
545:                                && hbi.getLastTransitionTime() == mLastTransitionTime) {
546:                            for (Iterator i = hbi.getEndpoints().iterator(); i
547:                                    .hasNext();) {
548:                                RegistrationInfo ri = (RegistrationInfo) i
549:                                        .next();
550:
551:                                try {
552:                                    mLog
553:                                            .info("PB:HeartBeatEvent registration ("
554:                                                    + ri.getServiceName()
555:                                                    + ","
556:                                                    + ri.getEndpointName()
557:                                                    + ")");
558:                                    mPB.addRemoteEndpoint(ri);
559:                                } catch (javax.jbi.JBIException mEx) {
560:                                }
561:                            }
562:
563:                            //
564:                            //  See if we have seen all members yet.
565:                            //
566:                            mWaitInstances.remove(hbi.getInstanceId());
567:                            if (mWaitInstances.isEmpty()) {
568:                                mPB.startComplete();
569:                            }
570:                        }
571:                    } else {
572:                        mLog.info("PB:received unknown HeartBeatEvent ("
573:                                + hbi.getInstanceId() + ","
574:                                + hbi.getBirthtime() + ")");
575:                    }
576:                } else if (mState == STATE_HELLO) {
577:                    mOtherTrafficSeen = true;
578:                }
579:            }
580:
581:            void sendLeaveEvent() {
582:                LeaveInfo li;
583:
584:                li = new LeaveInfo(mId, mBirthTime);
585:                mLog.info("PB:sending LeaveEvent (" + mId + "," + mBirthTime
586:                        + ")");
587:                sendEvent(li);
588:                mState = STATE_LEAVE;
589:                if (mInstances.size() == 1) {
590:                    mRunning = false;
591:                }
592:            }
593:
594:            void handleLeaveEvent(LeaveInfo li) {
595:                if (mState == STATE_RUNNING_MASTER) {
596:                    InstanceEntry ie;
597:
598:                    ie = (InstanceEntry) mInstances.get(li.getInstanceId());
599:                    if (ie != null && ie.getBirthTime() == li.getBirthtime()) {
600:                        mLog.info("PB:received LeaveEvent ("
601:                                + li.getInstanceId() + "," + li.getBirthtime()
602:                                + ")");
603:                        mInstances.remove(ie.getInstanceId());
604:                        mPB.purgeRemoteEndpointsForInstance(ie.getInstanceId());
605:                        sendLeaveAckEvent(ie);
606:                        if (mInstances.size() == 1) {
607:                            mSendHeartBeats = false;
608:                        }
609:                    } else {
610:                        mLog.info("PB:received unknown LeaveEvent ("
611:                                + li.getInstanceId() + "," + li.getBirthtime()
612:                                + ")");
613:                    }
614:                } else if (mState == STATE_HELLO) {
615:                    mOtherTrafficSeen = true;
616:                }
617:            }
618:
619:            void sendLeaveAckEvent(InstanceEntry ie) {
620:                LeaveAckInfo lai;
621:
622:                lai = new LeaveAckInfo(ie.getInstanceId(),
623:                        setLastTransitionTime());
624:                mLog.info("PB:sending LeaveAckEvent (" + ie.getInstanceId()
625:                        + "," + lai.getLastTransitionTime() + ")");
626:                sendEvent(lai);
627:                mPB.postNotification(ProxyBindingLifeCycle.ESB_MEMBER_LEAVE, ie
628:                        .getInstanceId());
629:            }
630:
631:            void handleLeaveAckEvent(LeaveAckInfo lai) {
632:                if (mState == STATE_LEAVE) {
633:                    mRunning = false;
634:                } else if (mState == STATE_HELLO) {
635:                    mOtherTrafficSeen = true;
636:                } else {
637:                    mLog.info("PB:received LeaveAckEvent ("
638:                            + lai.getInstanceId() + ","
639:                            + lai.getLastTransitionTime() + ")");
640:                    mLastTransitionTime = lai.getLastTransitionTime();
641:                    mInstances.remove(lai.getInstanceId());
642:                    mPB.purgeRemoteEndpointsForInstance(lai.getInstanceId());
643:
644:                    //
645:                    //  See if we are waiting for instance which died.
646:                    //
647:                    mWaitInstances.remove(lai.getInstanceId());
648:                    if (mWaitInstances.isEmpty()) {
649:                        mPB.startComplete();
650:                    }
651:                }
652:            }
653:
654:            void handleRegistrationEvent(RegistrationInfo ri) {
655:                if (mState == STATE_RUNNING || mState == STATE_RUNNING_MASTER) {
656:                    try {
657:                        if (ri.getAction().equals(RegistrationInfo.ACTION_ADD)) {
658:                            mLog.info("PB:add registration ("
659:                                    + ri.getServiceName() + ","
660:                                    + ri.getEndpointName() + ","
661:                                    + ri.getInstanceId() + ")");
662:                            mPB.addRemoteEndpoint(ri);
663:                        } else if (ri.getAction().equals(
664:                                RegistrationInfo.ACTION_REMOVE)) {
665:                            mLog.info("PB:remove registration ("
666:                                    + ri.getServiceName() + ","
667:                                    + ri.getEndpointName() + ","
668:                                    + ri.getInstanceId() + ")");
669:                            mPB.removeRemoteEndpoint(ri);
670:                        } else {
671:                            mLog.info("PB:unknown registration event action ("
672:                                    + ri.getAction() + ")");
673:                        }
674:                    } catch (javax.jbi.JBIException mEx) {
675:                        mLog.info("PB:error handling registration event: "
676:                                + mEx);
677:                    }
678:                } else {
679:                    mOtherTrafficSeen = true;
680:                }
681:            }
682:
683:            //
684:            //  Because multiple systems can run with skewed clocks and in different time zones we just
685:            //  want to make sure that time is monotonically increasing.
686:            //
687:            long setLastTransitionTime() {
688:                long time = System.currentTimeMillis();
689:
690:                if (time < mLastTransitionTime) {
691:                    mLastTransitionTime += 13;
692:                } else {
693:                    mLastTransitionTime = time;
694:                }
695:                return (mLastTransitionTime);
696:            }
697:
698:            void sendEvent(EventInfo ei) {
699:                try {
700:                    mEventsSent++;
701:                    mEC.sendEvent(ei);
702:                    if ((mProxyBindingStatistics != null)
703:                            && (mProxyBindingStatistics.isEnabled())) {
704:                        mProxyBindingStatistics.incrementSentEvents();
705:                    }
706:                } catch (com.sun.jbi.binding.proxy.connection.EventException eEx) {
707:                    mLog.info("PB:sendEvent failed: " + eEx);
708:                }
709:            }
710:
711:            public String toString() {
712:                StringBuffer sb = new StringBuffer();
713:                SimpleDateFormat sdf = new SimpleDateFormat(
714:                        "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
715:
716:                sb.append("  Event Processor  Id(");
717:                sb.append(mId);
718:                sb.append(")\n    State          (");
719:                sb
720:                        .append(mState == STATE_HELLO ? "HELLO"
721:                                : (mState == STATE_RUNNING ? "RUNNING"
722:                                        : (mState == STATE_LEAVE ? "LEAVE"
723:                                                : "MASTER")));
724:                sb.append(")\n    BirthTime      (");
725:                sb.append(sdf.format(new Date(mBirthTime)));
726:                sb.append(")\n    NextTime       (");
727:                sb.append(sdf.format(new Date(mNextTime)));
728:                sb.append(")\n    LastTransition (");
729:                sb.append(sdf.format(new Date(mLastTransitionTime)));
730:                sb.append(")\n    Master         (");
731:                sb.append(mMasterId);
732:                sb.append(")\n    Events         Sent(");
733:                sb.append(mEventsSent);
734:                sb.append(")  Received(");
735:                sb.append(mEventsReceived);
736:                sb.append(")\n    Instances Count(");
737:                sb.append(mInstances.size());
738:                sb.append(")\n");
739:                for (Iterator i = mInstances.values().iterator(); i.hasNext();) {
740:                    sb.append(i.next().toString());
741:                }
742:                sb.append("    Wait Instances Count(");
743:                sb.append(mWaitInstances.size());
744:                sb.append(")\n");
745:                if (mWaitInstances.size() > 0) {
746:                    for (Iterator i = mWaitInstances.values().iterator(); i
747:                            .hasNext();) {
748:                        sb.append(i.next().toString());
749:                    }
750:                }
751:                return (sb.toString());
752:            }
753:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.