Source Code Cross Referenced for ReplyReceiver.java in » Collaboration » JacORB » org » jacorb » orb » Java Source Code / Java Documentation - Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Collaboration » JacORB » org.jacorb.orb 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.jacorb.orb;
002:
003:        /*
004:         *        JacORB - a free Java ORB
005:         *
006:         *   Copyright (C) 1997-2004 Gerald Brose.
007:         *
008:         *   This library is free software; you can redistribute it and/or
009:         *   modify it under the terms of the GNU Library General Public
010:         *   License as published by the Free Software Foundation; either
011:         *   version 2 of the License, or (at your option) any later version.
012:         *
013:         *   This library is distributed in the hope that it will be useful,
014:         *   but WITHOUT ANY WARRANTY; without even the implied warranty of
015:         *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
016:         *   Library General Public License for more details.
017:         *
018:         *   You should have received a copy of the GNU Library General Public
019:         *   License along with this library; if not, write to the Free
020:         *   Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021:         */
022:
023:        import java.util.*;
024:
025:        import org.apache.avalon.framework.logger.Logger;
026:        import org.apache.avalon.framework.configuration.Configurable;
027:
028:        import org.jacorb.orb.giop.MessageInputStream;
029:        import org.jacorb.orb.giop.ReplyInputStream;
030:        import org.jacorb.orb.giop.ReplyPlaceholder;
031:        import org.jacorb.util.Time;
032:
033:        import org.omg.CORBA.MARSHAL;
034:        import org.omg.CORBA.SystemException;
035:        import org.omg.CORBA.portable.ApplicationException;
036:        import org.omg.CORBA.portable.InvokeHandler;
037:        import org.omg.CORBA.portable.RemarshalException;
038:        import org.omg.CORBA.portable.ServantObject;
039:        import org.omg.GIOP.ReplyStatusType_1_2;
040:        import org.omg.Messaging.ExceptionHolder;
041:        import org.omg.TimeBase.UtcT;
042:
043:        /**
044:         * A special ReplyPlaceholder that receives replies to normal requests,
045:         * either synchronously or asynchronously.  A ReplyReceiver
046:         * handles all ORB-internal work that needs to be done for the reply,
047:         * such as checking for exceptions and invoking the interceptors.
048:         * The client stub can either do a blocking wait on the ReplyReceiver
049:         * (via getReply()), or a ReplyHandler can be supplied when the
050:         * ReplyReceiver is created; then the reply is delivered to that
051:         * ReplyHandler.
052:         *
053:         * @author Andre Spiegel <spiegel@gnu.org>
054:         * @version $Id: ReplyReceiver.java,v 1.33 2006/08/29 15:02:17 alphonse.bendt Exp $
055:         */
056:
057:        public class ReplyReceiver extends ReplyPlaceholder implements 
058:                Configurable {
059:            private final org.jacorb.orb.Delegate delegate;
060:            private final ClientInterceptorHandler interceptors;
061:
062:            private final org.omg.Messaging.ReplyHandler replyHandler;
063:
064:            private final String operation;
065:            private final Timer timer;
066:
067:            private Logger logger;
068:
069:            /** configuration properties */
070:            private boolean retry_on_failure = false;
071:
072:            public ReplyReceiver(org.jacorb.orb.Delegate delegate,
073:                    String operation, org.omg.TimeBase.UtcT replyEndTime,
074:                    ClientInterceptorHandler interceptors,
075:                    org.omg.Messaging.ReplyHandler replyHandler) {
076:                super ((org.jacorb.orb.ORB) delegate.orb(null));
077:
078:                this .delegate = delegate;
079:                this .operation = operation;
080:                this .interceptors = interceptors;
081:                this .replyHandler = replyHandler;
082:
083:                if (replyEndTime != null) {
084:                    timer = new Timer(replyEndTime);
085:                    timer.setName("ReplyReceiver Timer");
086:                    timer.start();
087:                } else {
088:                    timer = null;
089:                }
090:
091:            }
092:
093:            public void configure(
094:                    org.apache.avalon.framework.configuration.Configuration configuration) {
095:                logger = ((org.jacorb.config.Configuration) configuration)
096:                        .getNamedLogger("jacorb.orb.rep_recv");
097:                retry_on_failure = configuration.getAttributeAsBoolean(
098:                        "jacorb.connection.client.retry_on_failure", false);
099:            }
100:
101:            public void replyReceived(MessageInputStream in) {
102:                if (timeoutException) {
103:                    return; // discard reply
104:                }
105:
106:                if (timer != null) {
107:                    timer.wakeup();
108:                }
109:
110:                Set pending_replies = delegate.get_pending_replies();
111:                // grab pending_replies lock BEFORE my own,
112:                // then I will already have it in the replyDone call below.
113:                synchronized (pending_replies) {
114:                    // This internal synchronization prevents a deadlock
115:                    // when a timeout and a reply coincide, suggested
116:                    // by Jimmy Wilson, 2005-01.  It is only a temporary
117:                    // work-around though, until I can simplify this entire
118:                    // logic much more thoroughly, AS.
119:                    synchronized (lock) {
120:                        if (timeoutException) {
121:                            return; // discard reply
122:                        }
123:
124:                        this .in = in;
125:                        delegate.replyDone(this );
126:
127:                        if (replyHandler != null) {
128:                            // asynchronous delivery
129:                            performCallback((ReplyInputStream) in);
130:                        } else {
131:                            // synchronous delivery
132:                            ready = true;
133:                            lock.notifyAll();
134:                        }
135:                    }
136:                }
137:            }
138:
139:            private void performCallback(ReplyInputStream reply) {
140:                // TODO: Call interceptors.
141:
142:                org.omg.CORBA.portable.Delegate replyHandlerDelegate = ((org.omg.CORBA.portable.ObjectImpl) replyHandler)
143:                        ._get_delegate();
144:
145:                ServantObject so = replyHandlerDelegate.servant_preinvoke(
146:                        replyHandler, operation, InvokeHandler.class);
147:                try {
148:                    switch (reply.getStatus().value()) {
149:                    case ReplyStatusType_1_2._NO_EXCEPTION: {
150:                        ((InvokeHandler) so.servant)._invoke(operation, reply,
151:                                new DummyResponseHandler());
152:                        break;
153:                    }
154:                    case ReplyStatusType_1_2._USER_EXCEPTION:
155:                    case ReplyStatusType_1_2._SYSTEM_EXCEPTION: {
156:                        ExceptionHolderImpl holder = new ExceptionHolderImpl(
157:                                reply);
158:
159:                        org.omg.CORBA_2_3.ORB orb = (org.omg.CORBA_2_3.ORB) replyHandlerDelegate
160:                                .orb(null);
161:                        orb.register_value_factory(
162:                                "IDL:omg.org/Messaging/ExceptionHolder:1.0",
163:                                new ExceptionHolderFactory());
164:
165:                        CDRInputStream input = new CDRInputStream(orb, holder
166:                                .marshal());
167:
168:                        ((InvokeHandler) so.servant)._invoke(operation
169:                                + "_excep", input, new DummyResponseHandler());
170:                        break;
171:                    }
172:                    }
173:                } catch (Exception e) {
174:                    logger.warn("Exception during callback", e);
175:                } finally {
176:                    replyHandlerDelegate.servant_postinvoke(replyHandler, so);
177:                }
178:            }
179:
180:            /**
181:             * There's a lot of code duplication in this method right now.
182:             * This should be merged with performCallback() above.
183:             */
184:            private void performExceptionCallback(ExceptionHolderImpl holder) {
185:                // TODO: Call interceptors.
186:
187:                org.omg.CORBA.portable.Delegate replyHandlerDelegate = ((org.omg.CORBA.portable.ObjectImpl) replyHandler)
188:                        ._get_delegate();
189:
190:                ServantObject so = replyHandlerDelegate.servant_preinvoke(
191:                        replyHandler, operation, InvokeHandler.class);
192:                try {
193:                    org.omg.CORBA_2_3.ORB orb = (org.omg.CORBA_2_3.ORB) replyHandlerDelegate
194:                            .orb(null);
195:                    orb.register_value_factory(
196:                            "IDL:omg.org/Messaging/ExceptionHolder:1.0",
197:                            new ExceptionHolderFactory());
198:
199:                    CDRInputStream input = new CDRInputStream(orb, holder
200:                            .marshal());
201:
202:                    ((InvokeHandler) so.servant)._invoke(operation + "_excep",
203:                            input, new DummyResponseHandler());
204:                } catch (Exception e) {
205:                    if (logger.isWarnEnabled()) {
206:                        logger.warn("Exception during callback: "
207:                                + e.toString());
208:                    }
209:                } finally {
210:                    replyHandlerDelegate.servant_postinvoke(replyHandler, so);
211:                }
212:            }
213:
214:            /**
215:             * This method blocks until a reply becomes available.
216:             * If the reply contains any exceptions, they are rethrown.
217:             */
218:            public synchronized ReplyInputStream getReply()
219:                    throws RemarshalException, ApplicationException {
220:                try {
221:                    // On NT connection closure due to service shutdown is not
222:                    // detected until this point, resulting in a COMM_FAILURE.
223:                    // Map to RemarshalException to force rebind attempt.
224:                    try {
225:                        getInputStream(timer != null); // block until reply is available
226:                    } catch (org.omg.CORBA.COMM_FAILURE ex) {
227:                        if (retry_on_failure) {
228:                            throw new RemarshalException();
229:                        }
230:                        //rethrow
231:                        throw ex;
232:                    }
233:                } catch (SystemException se) {
234:                    interceptors.handle_receive_exception(se);
235:                    throw se;
236:                } catch (RemarshalException re) {
237:                    // Wait until the thread that received the actual
238:                    // forward request rebound the Delegate
239:                    delegate.waitOnBarrier();
240:                    throw new RemarshalException();
241:                }
242:
243:                ReplyInputStream reply = (ReplyInputStream) in;
244:
245:                ReplyStatusType_1_2 status = delegate.doNotCheckExceptions() ? ReplyStatusType_1_2.NO_EXCEPTION
246:                        : reply.getStatus();
247:
248:                switch (status.value()) {
249:                case ReplyStatusType_1_2._NO_EXCEPTION: {
250:                    interceptors.handle_receive_reply(reply);
251:                    return reply;
252:                }
253:                case ReplyStatusType_1_2._USER_EXCEPTION: {
254:                    ApplicationException ae = getApplicationException(reply);
255:                    interceptors.handle_receive_exception(ae, reply);
256:                    throw ae;
257:                }
258:                case ReplyStatusType_1_2._SYSTEM_EXCEPTION: {
259:                    SystemException se = SystemExceptionHelper.read(reply);
260:                    interceptors.handle_receive_exception(se, reply);
261:                    throw se;
262:                }
263:                case ReplyStatusType_1_2._LOCATION_FORWARD:
264:                case ReplyStatusType_1_2._LOCATION_FORWARD_PERM: {
265:                    org.omg.CORBA.Object forward_reference = reply
266:                            .read_Object();
267:                    interceptors.handle_location_forward(reply,
268:                            forward_reference);
269:                    doRebind(forward_reference);
270:                    throw new RemarshalException();
271:                }
272:                case ReplyStatusType_1_2._NEEDS_ADDRESSING_MODE: {
273:                    throw new org.omg.CORBA.NO_IMPLEMENT(
274:                            "WARNING: Got reply status NEEDS_ADDRESSING_MODE "
275:                                    + "(not implemented).");
276:                }
277:                default: {
278:                    throw new MARSHAL("Received unexpected reply status: "
279:                            + status.value());
280:                }
281:                }
282:            }
283:
284:            private void doRebind(org.omg.CORBA.Object forward_reference) {
285:                // make other threads that have unreturned replies wait
286:                delegate.lockBarrier();
287:
288:                try {
289:                    // tell every pending request to remarshal
290:                    // they will be blocked on the barrier
291:                    Set pending_replies = delegate.get_pending_replies();
292:                    synchronized (pending_replies) {
293:                        for (Iterator i = pending_replies.iterator(); i
294:                                .hasNext();) {
295:                            ReplyPlaceholder p = (ReplyPlaceholder) i.next();
296:                            p.retry();
297:                        }
298:                    }
299:
300:                    // do the actual rebind
301:                    delegate.rebind(forward_reference);
302:                } finally {
303:                    // now other threads can safely remarshal
304:                    delegate.openBarrier();
305:                }
306:            }
307:
308:            private ApplicationException getApplicationException(
309:                    ReplyInputStream reply) {
310:                reply.mark(0);
311:                String id = reply.read_string();
312:
313:                try {
314:                    reply.reset();
315:                } catch (java.io.IOException ioe) {
316:                    logger.error("unexpected Exception in reset()", ioe);
317:                }
318:
319:                return new ApplicationException(id, reply);
320:            }
321:
322:            /**
323:             * A ResponseHandler that is passed to the ReplyHandler's POA
324:             * when we invoke it.  Since ReplyHandler operations never generate
325:             * replies, this ResponseHandler does nothing to this effect.
326:             * The createReply() method, however, is the last method that
327:             * is called before control goes to the ReplyHandler servant,
328:             * so we use it to check for timing constraints.
329:             */
330:            private class DummyResponseHandler implements 
331:                    org.omg.CORBA.portable.ResponseHandler {
332:                public org.omg.CORBA.portable.OutputStream createReply() {
333:                    // the latest possible time at which we can do this
334:                    Time.waitFor(delegate.getReplyStartTime());
335:                    return null;
336:                }
337:
338:                public org.omg.CORBA.portable.OutputStream createExceptionReply() {
339:                    return null;
340:                }
341:            }
342:
343:            private static class ExceptionHolderFactory implements 
344:                    org.omg.CORBA.portable.ValueFactory {
345:                public java.io.Serializable read_value(
346:                        org.omg.CORBA_2_3.portable.InputStream is) {
347:                    ExceptionHolder result = new ExceptionHolderImpl();
348:                    result._read(is);
349:                    return result;
350:                }
351:            }
352:
353:            /**
354:             * This class implements timeouts while we are waiting for
355:             * replies.  When it is instantiated, it takes a CORBA UtcT
356:             * constructor parameter that specifies the timeout expiration
357:             * time.  The timer starts running as soon as the Thread is
358:             * started.  When the timeout goes off, this Timer makes sure
359:             * that the enclosing ReplyReceiver is deactivated, and that
360:             * everybody associated with it is notified appropriately.
361:             * The timeout can be cancelled by calling wakeup() on a Timer.
362:             */
363:            private class Timer extends Thread {
364:                private final UtcT endTime;
365:                private boolean awakened = false;
366:
367:                public Timer(UtcT endTime) {
368:                    super ("ReplyReceiverTimer");
369:                    this .endTime = endTime;
370:                }
371:
372:                public void run() {
373:                    synchronized (lock) {
374:                        timeoutException = false;
375:                        if (!awakened) {
376:                            long time = org.jacorb.util.Time.millisTo(endTime);
377:                            if (time > 0) {
378:                                try {
379:                                    lock.wait(time);
380:                                } catch (InterruptedException ex) {
381:                                    logger
382:                                            .info("Interrupted while waiting for timeout");
383:                                }
384:                            }
385:                            if (!awakened) {
386:                                timeoutException = true;
387:
388:                                if (replyHandler != null) {
389:                                    ExceptionHolderImpl exHolder = new ExceptionHolderImpl(
390:                                            new org.omg.CORBA.TIMEOUT());
391:                                    performExceptionCallback(exHolder);
392:                                }
393:                                ready = true;
394:                                lock.notifyAll();
395:                            }
396:                        }
397:                    }
398:                }
399:
400:                public void wakeup() {
401:                    synchronized (lock) {
402:                        awakened = true;
403:                        timeoutException = false;
404:                        lock.notifyAll();
405:                    }
406:                }
407:            }
408:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.