Source Code Cross Referenced for Fiber.java in 6.0-JDK-Modules » jax-ws-runtime » com » sun » xml » ws » api » pipe


/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 *
 * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
 *
 * The contents of this file are subject to the terms of either the GNU
 * General Public License Version 2 only ("GPL") or the Common Development
 * and Distribution License("CDDL") (collectively, the "License").  You
 * may not use this file except in compliance with the License. You can obtain
 * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 * language governing permissions and limitations under the License.
 *
 * When distributing the software, include this License Header Notice in each
 * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 * Sun designates this particular file as subject to the "Classpath" exception
 * as provided by Sun in the GPL Version 2 section of the License file that
 * accompanied this code.  If applicable, add the following below the License
 * Header, with the fields enclosed by brackets [] replaced by your own
 * identifying information: "Portions Copyrighted [year]
 * [name of copyright owner]"
 *
 * Contributor(s):
 *
 * If you wish your version of this file to be governed by only the CDDL or
 * only the GPL Version 2, indicate your decision by adding "[Contributor]
 * elects to include this software in this distribution under the [CDDL or GPL
 * Version 2] license."  If you don't indicate a single choice of license, a
 * recipient has the option to distribute your version of this file under
 * either the CDDL, the GPL Version 2 or to extend the choice of license to
 * its licensees as provided above.  However, if you add GPL Version 2 code
 * and therefore, elected the GPL Version 2 license, then the option applies
 * only if the new code is made subject to such option by the copyright
 * holder.
 */

package com.sun.xml.ws.api.pipe;

import com.sun.istack.NotNull;
import com.sun.istack.Nullable;
import com.sun.xml.ws.api.message.Packet;
import com.sun.xml.ws.api.pipe.helper.AbstractFilterTubeImpl;
import com.sun.xml.ws.api.server.Adapter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * User-level thread. Represents the execution of one request/response processing.
 *
 * <p>
 * JAX-WS RI is capable of running a large number of requests/responses concurrently by
 * using a relatively small number of threads. This is made possible by utilizing
 * a {@link Fiber} &mdash; a user-level thread that gets created for each request/response
 * processing.
 *
 * <p>
 * A fiber remembers where in the pipeline the processing is, what needs to be
 * executed on the way out (when processing the response), and other additional information
 * specific to the execution of a particular request/response.
 *
 * <h2>Suspend/Resume</h2>
 * <p>
 * A fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}.
 * When a fiber is suspended, it is kept on the side until it is
 * {@link #resume(Packet) resumed}. This frees threads to execute
 * other runnable fibers, allowing efficient utilization of a smaller number of
 * threads.
 *
 * <h2>Context-switch Interception</h2>
 * <p>
 * {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s
 * to perform additional processing every time a thread starts running a fiber
 * and stops running it.
 *
 * <h2>Context ClassLoader</h2>
 * <p>
 * Just like a thread, a fiber has a context class loader (CCL). A fiber's CCL
 * becomes the thread's CCL while the thread is executing the fiber. The original CCL
 * of the thread is restored when the thread leaves the fiber execution.
 *
 *
 * <h2>Debugging Aid</h2>
 * <p>
 * Because {@link Fiber} doesn't keep much in the call stack, and instead uses
 * {@link #conts} to store the continuation, debugging fiber-related activities
 * can be harder.
 *
 * <p>
 * Setting the {@link #LOGGER} to FINE gives you basic start/stop/resume/suspend
 * level logging. Using FINER produces more detailed logging, which includes
 * which tubes are executed in what order and how they behaved.
 *
 * <p>
 * When you debug the server side, consider setting {@link Fiber#serializeExecution}
 * to true, so that the execution of fibers is serialized. Debugging a server
 * with more than one running thread is very tricky, and this switch
 * prevents that. This can also be enabled by setting a system property.
 * See the source code.
 *
 * @author Kohsuke Kawaguchi
 * @author Jitendra Kotamraju
 */
public final class Fiber implements Runnable {
    /**
     * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs
     * to be invoked on the way back.
     */
    private Tube[] conts = new Tube[16];
    private int contsSize;

    /**
     * If this field is non-null, the next instruction to execute is
     * to call its {@link Tube#processRequest(Packet)}. Otherwise
     * the instruction is to call {@link #conts}.
     */
    private Tube next;

    private Packet packet;

    private Throwable /* but really it's either RuntimeException or Error */ throwable;

    public final Engine owner;

    /**
     * Is this fiber suspended? 0=not suspended, 1=suspended.
     *
     * <p>
     * Logically this is just a boolean, but we need to prepare for the case
     * where the fiber is {@link #resume(Packet) resumed} before we get to the {@link #suspend()}.
     * This happens when things occur in the following order:
     *
     * <ol>
     *  <li>Tube decides that the fiber needs to be suspended to wait for the external event.
     *  <li>Tube hooks up fiber with some external mechanism (like NIO channel selector)
     *  <li>Tube returns with {@link NextAction#suspend()}.
     *  <li>"External mechanism" becomes signaled and invokes {@link Fiber#resume(Packet)}
     *      to wake up fiber
     *  <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}.
     * </ol>
     *
     * <p>
     * Using an int, this works because {@link #suspendedCount} becomes -1 when
     * {@link #resume(Packet)} occurs before {@link #suspend()}, and the later
     * {@link #suspend()} brings it back to 0.
     *
     * <p>
     * Increment and decrement are guarded by 'this' object.
     */
    private volatile int suspendedCount = 0;

    /**
     * Is this fiber completed?
     */
    private volatile boolean completed;

    /**
     * Is this {@link Fiber} currently running in the synchronous mode?
     */
    private boolean synchronous;

    private boolean interrupted;

    private final int id;

    /**
     * Active {@link FiberContextSwitchInterceptor}s for this fiber.
     */
    private List<FiberContextSwitchInterceptor> interceptors;

    /**
     * Not null when {@link #interceptors} is not null.
     */
    private InterceptorHandler interceptorHandler;

    /**
     * This flag is set to true when a new interceptor is added.
     *
     * When that happens, we need to first exit the current interceptors
     * and then reenter them, so that the newly added interceptors start
     * taking effect. This flag is used to control that flow.
     */
    private boolean needsToReenter;

    /**
     * Fiber's context {@link ClassLoader}.
     */
    private @Nullable ClassLoader contextClassLoader;

    private @Nullable CompletionCallback completionCallback;

    /**
     * Set to true if this fiber is started asynchronously, to avoid
     * doubly-invoking completion code.
     */
    private boolean started;

    /**
     * Callback to be invoked when a {@link Fiber} finishes execution.
     */
    public interface CompletionCallback {
        /**
         * Indicates that the fiber has finished its execution.
         *
         * <p>
         * Since the JAX-WS RI runs asynchronously,
         * this method may be invoked by a different thread
         * than any of the threads that started it or ran a part of the tubeline.
         */
        void onCompletion(@NotNull Packet response);

        /**
         * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
         */
        void onCompletion(@NotNull Throwable error);
    }

    Fiber(Engine engine) {
        this.owner = engine;
        if (isTraceEnabled()) {
            id = iotaGen.incrementAndGet();
            LOGGER.fine(getName() + " created");
        } else {
            id = -1;
        }

        // if this is run from another fiber, then we naturally inherit its context classloader,
        // so this code works for fiber->fiber inheritance just fine.
        contextClassLoader = Thread.currentThread().getContextClassLoader();
    }

    /**
     * Starts the execution of this fiber asynchronously.
     *
     * <p>
     * This method works like {@link Thread#start()}.
     *
     * @param tubeline
     *      The first tube of the tubeline that will act on the packet.
     * @param request
     *      The request packet to be passed to <tt>tubeline.processRequest()</tt>.
     * @param completionCallback
     *      The callback to be invoked when the processing is finished and the
     *      final response packet is available.
     *
     * @see #runSync(Tube,Packet)
     */
    public void start(@NotNull Tube tubeline, @NotNull Packet request,
            @Nullable CompletionCallback completionCallback) {
        next = tubeline;
        this.packet = request;
        this.completionCallback = completionCallback;
        this.started = true;
        owner.addRunnable(this);
    }

    /**
     * Wakes up a suspended fiber.
     *
     * <p>
     * If a fiber was suspended from the {@link Tube#processRequest(Packet)} method,
     * then the execution will be resumed from the corresponding
     * {@link Tube#processResponse(Packet)} method with the specified response packet
     * as the parameter.
     *
     * <p>
     * If a fiber was suspended from the {@link Tube#processResponse(Packet)} method,
     * then the execution will be resumed from the next tube's
     * {@link Tube#processResponse(Packet)} method with the specified response packet
     * as the parameter.
     *
     * <p>
     * This method is implemented in a race-free way. Another thread can invoke
     * this method even before this fiber goes into the suspension mode. So the caller
     * need not worry about synchronizing {@link NextAction#suspend()} and this method.
     */
    public synchronized void resume(@NotNull Packet response) {
        if (isTraceEnabled())
            LOGGER.fine(getName() + " resumed");
        packet = response;
        if (--suspendedCount == 0) {
            if (synchronous) {
                notifyAll();
            } else {
                owner.addRunnable(this);
            }
        }
    }

    /**
     * Suspends this fiber's execution until the resume method is invoked.
     *
     * The call returns immediately, and when the fiber is resumed
     * the execution picks up from the last scheduled continuation.
     */
    private synchronized void suspend() {
        if (isTraceEnabled())
            LOGGER.fine(getName() + " suspended");
        suspendedCount++;
    }

    /**
     * Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
     *
     * <p>
     * The newly installed interceptor will take effect immediately after the current
     * tube returns from its {@link Tube#processRequest(Packet)} or
     * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
     *
     * <p>
     * So when the tubeline consists of X and Y, and when X installs an interceptor,
     * the order of execution will be as follows:
     *
     * <ol>
     *  <li>X.processRequest()
     *  <li>interceptor gets installed
     *  <li>interceptor.execute() is invoked
     *  <li>Y.processRequest()
     * </ol>
     */
    public void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
        if (interceptors == null) {
            interceptors = new ArrayList<FiberContextSwitchInterceptor>();
            interceptorHandler = new InterceptorHandler();
        }
        interceptors.add(interceptor);
        needsToReenter = true;
    }

    /**
     * Removes a {@link FiberContextSwitchInterceptor} from this fiber.
     *
     * <p>
     * The removal of the interceptor takes effect immediately after the current
     * tube returns from its {@link Tube#processRequest(Packet)} or
     * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
     *
     * <p>
     * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor
     * on the way out, then the order of execution will be as follows:
     *
     * <ol>
     *  <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack)
     *  <li>interceptor gets uninstalled
     *  <li>interceptor.execute() returns
     *  <li>X.processResponse()
     * </ol>
     *
     * @return
     *      true if the specified interceptor was removed. False if
     *      the specified interceptor was not registered with this fiber to begin with.
     */
    public boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
        if (interceptors != null && interceptors.remove(interceptor)) {
            needsToReenter = true;
            return true;
        }
        return false;
    }

    /**
     * Gets the context {@link ClassLoader} of this fiber.
     */
    public @Nullable ClassLoader getContextClassLoader() {
        return contextClassLoader;
    }

    /**
     * Sets the context {@link ClassLoader} of this fiber.
     *
     * @return
     *      The context class loader that was set on this fiber before this call.
     */
    public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) {
        ClassLoader r = this.contextClassLoader;
        this.contextClassLoader = contextClassLoader;
        return r;
    }

    /**
     * DO NOT CALL THIS METHOD. This is an implementation detail
     * of {@link Fiber}.
     */
    @Deprecated
    public void run() {
        assert !synchronous;
        next = doRun(next);
        completionCheck();
    }

    /**
     * Runs a given {@link Tube} (and everything thereafter) synchronously.
     *
     * <p>
     * This method blocks and returns only when all the successive {@link Tube}s
     * complete their request/response processing. This method can be used
     * if a {@link Tube} needs to fall back to synchronous processing.
     *
     * <h3>Example:</h3>
     * <pre>
     * class FooTube extends {@link AbstractFilterTubeImpl} {
     *   NextAction processRequest(Packet request) {
     *     // run everything synchronously and return with the response packet
     *     return doReturnWith(Fiber.current().runSync(next,request));
     *   }
     *   NextAction processResponse(Packet response) {
     *     // never invoked
     *   }
     * }
     * </pre>
     *
     * @param tubeline
     *      The first tube of the tubeline that will act on the packet.
     * @param request
     *      The request packet to be passed to <tt>tubeline.processRequest()</tt>.
     * @return
     *      The response packet to the <tt>request</tt>.
     *
     * @see #start(Tube, Packet, CompletionCallback)
     */
    public synchronized @NotNull Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
        // save the current continuation, so that we return from runSync() without executing it.
        final Tube[] oldCont = conts;
        final int oldContSize = contsSize;
        final boolean oldSynchronous = synchronous;

        if (oldContSize > 0) {
            conts = new Tube[16];
            contsSize = 0;
        }

        try {
            synchronous = true;
            this.packet = request;
            doRun(tubeline);
            if (throwable != null) {
                if (throwable instanceof RuntimeException) {
                    throw (RuntimeException) throwable;
                }
                if (throwable instanceof Error) {
                    throw (Error) throwable;
                }
                // our system is supposed to only accept Error or RuntimeException
                throw new AssertionError(throwable);
            }
            return this.packet;
        } finally {
            conts = oldCont;
            contsSize = oldContSize;
            synchronous = oldSynchronous;
            if (interrupted) {
                Thread.currentThread().interrupt();
                interrupted = false;
            }
            if (!started)
                completionCheck();
        }
    }

    private synchronized void completionCheck() {
        if (contsSize == 0) {
            if (isTraceEnabled())
                LOGGER.fine(getName() + " completed");
            completed = true;
            notifyAll();
            if (completionCallback != null) {
                if (throwable != null)
                    completionCallback.onCompletion(throwable);
                else
                    completionCallback.onCompletion(packet);
            }
        }
    }

    ///**
    // * Blocks until the fiber completes.
    // */
    //public synchronized void join() throws InterruptedException {
    //    while(!completed)
    //        wait();
    //}

    /**
     * Invokes all registered {@link FiberContextSwitchInterceptor}s and then calls into
     * {@link Fiber#__doRun(Tube)}.
     */
    private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
        /**
         * Index in {@link Fiber#interceptors} to invoke next.
         */
        private int idx;

        /**
         * Initiates the interception, and eventually invokes {@link Fiber#__doRun(Tube)}.
         */
        Tube invoke(Tube next) {
            idx = 0;
            return execute(next);
        }

        public Tube execute(Tube next) {
            if (idx == interceptors.size()) {
                return __doRun(next);
            } else {
                FiberContextSwitchInterceptor interceptor = interceptors.get(idx++);
                return interceptor.execute(Fiber.this, next, this);
            }
        }
    }

    /**
     * Executes the fiber as much as possible.
     *
     * @param next
     *      The next tube whose {@link Tube#processRequest(Packet)} is to be invoked. If null,
     *      that means we'll just call {@link Tube#processResponse(Packet)} on the continuation.
     *
     * @return
     *      If non-null, the next time execution resumes, it should resume from calling
     *      the {@link Tube#processRequest(Packet)}. Otherwise it means just finishing up
     *      the continuation.
     */
    @SuppressWarnings({"LoopStatementThatDoesntLoop"})
    // IntelliJ reports this bogus error
    private Tube doRun(Tube next) {
        Thread currentThread = Thread.currentThread();

        if (isTraceEnabled())
            LOGGER.fine(getName() + " running by " + currentThread.getName());

        if (serializeExecution) {
            serializedExecutionLock.lock();
            try {
                return _doRun(next);
            } finally {
                serializedExecutionLock.unlock();
            }
        } else {
            return _doRun(next);
        }
    }

    private Tube _doRun(Tube next) {
        Thread currentThread = Thread.currentThread();

        ClassLoader old = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(contextClassLoader);
        try {
            do {
                needsToReenter = false;

                // if interceptors are set, go through the interceptors.
                if (interceptorHandler == null)
                    next = __doRun(next);
                else
                    next = interceptorHandler.invoke(next);
            } while (needsToReenter);

            return next;
        } finally {
            currentThread.setContextClassLoader(old);
        }
    }

    /**
     * To be invoked from {@link #doRun(Tube)}.
     *
     * @see #doRun(Tube)
     */
    private Tube __doRun(Tube next) {
        final Fiber old = CURRENT_FIBER.get();
        CURRENT_FIBER.set(this);

        // if true, lots of debug messages to show what's being executed
        final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);

        try {
            while (!isBlocking() && !needsToReenter) {
                try {
                    NextAction na;
                    Tube last;
                    if (throwable != null) {
                        if (contsSize == 0) {
                            // nothing else to execute. we are done.
                            return null;
                        }
                        last = popCont();
                        if (traceEnabled)
                            LOGGER.finer(getName() + ' ' + last
                                    + ".processException(" + throwable + ')');
                        na = last.processException(throwable);
                    } else {
                        if (next != null) {
                            if (traceEnabled)
                                LOGGER.finer(getName() + ' ' + next
                                        + ".processRequest(" + packet + ')');
                            na = next.processRequest(packet);
                            last = next;
                        } else {
                            if (contsSize == 0) {
                                // nothing else to execute. we are done.
                                return null;
                            }
                            last = popCont();
                            if (traceEnabled)
                                LOGGER.finer(getName() + ' ' + last
                                        + ".processResponse(" + packet + ')');
                            na = last.processResponse(packet);
                        }
                    }

                    if (traceEnabled)
                        LOGGER.finer(getName() + ' ' + last + " returned with " + na);

                    // If resume is called before suspend, then make sure
                    // resume(Packet) is not lost
                    if (na.kind != NextAction.SUSPEND) {
                        packet = na.packet;
                        throwable = na.throwable;
                    }

                    switch (na.kind) {
                    case NextAction.INVOKE:
                        pushCont(last);
                        // fall through next
                    case NextAction.INVOKE_AND_FORGET:
                        next = na.next;
                        break;
                    case NextAction.RETURN:
                    case NextAction.THROW:
                        next = null;
                        break;
                    case NextAction.SUSPEND:
                        pushCont(last);
                        next = null;
                        suspend();
                        break;
                    default:
                        throw new AssertionError();
                    }
                } catch (RuntimeException t) {
                    if (traceEnabled)
                        LOGGER.log(Level.FINER, getName() + " Caught "
                                + t + ". Start stack unwinding", t);
                    throwable = t;
                } catch (Error t) {
                    if (traceEnabled)
                        LOGGER.log(Level.FINER, getName() + " Caught "
672:                                        + t + ". Start stack unwinding", t);
673:                            throwable = t;
674:                        }
675:                    }
676:                    // there's nothing we can execute right away.
677:                    // we'll be back when this fiber is resumed.
678:                    return next;
679:                } finally {
680:                    CURRENT_FIBER.set(old);
681:                }
682:            }
683:
684:            private void pushCont(Tube tube) {
685:                conts[contsSize++] = tube;
686:
687:                // expand if needed
688:                int len = conts.length;
689:                if (contsSize == len) {
690:                    Tube[] newBuf = new Tube[len * 2];
691:                    System.arraycopy(conts, 0, newBuf, 0, len);
692:                    conts = newBuf;
693:                }
694:            }
695:
696:            private Tube popCont() {
697:                return conts[--contsSize];
698:            }
699:
700:            /**
701:             * Returns true if the fiber needs to block its execution.
702:             */
703:            // TODO: synchronization on synchronous case is wrong.
704:            private boolean isBlocking() {
705:                if (synchronous) {
706:                    while (suspendedCount == 1)
707:                        try {
708:                            if (isTraceEnabled()) {
709:                                LOGGER.fine(getName() + " is blocking thread "
710:                                        + Thread.currentThread().getName());
711:                            }
712:                            wait(); // the synchronized block is the whole runSync method.
713:                        } catch (InterruptedException e) {
714:                            // remember that we are interrupted, but don't respond to it
715:                            // right away. This behavior is in line with what happens
716:                            // when you are actually running the whole thing synchronously.
717:                            interrupted = true;
718:                        }
719:                    return false;
720:                } else
721:                    return suspendedCount == 1;
722:            }
723:
724:            private String getName() {
725:                return "engine-" + owner.id + "fiber-" + id;
726:            }
727:
728:            public String toString() {
729:                return getName();
730:            }
731:
732:            /**
733:             * Gets the current {@link Packet} associated with this fiber.
734:             *
735:             * <p>
736:             * This method returns null if no packet has been associated with the fiber yet.
737:             */
738:            public @Nullable
739:            Packet getPacket() {
740:                return packet;
741:            }
742:
743:            /**
744:             * Returns true if this fiber is still running or suspended.
745:             */
746:            public boolean isAlive() {
747:                return !completed;
748:            }
749:
750:            /**
751:             * (ADVANCED) Returns true if the current fiber is being executed synchronously.
752:             *
753:             * <p>
754:             * A fiber may run synchronously for various reasons. Perhaps this is
755:             * on the client side and the application has invoked a synchronous method call.
756:             * Perhaps this is on the server side and we have deployed on a synchronous
757:             * transport (like servlet).
758:             *
759:             * <p>
760:             * When a fiber is run synchronously (in other words, by {@link #runSync(Tube, Packet)}),
761:             * further invocations to {@link #runSync(Tube, Packet)} can be done
762:             * without degrading the performance.
763:             *
764:             * <p>
765:             * So this value can be used as a further optimization hint for
766:             * advanced {@link Tube}s to choose the best strategy to invoke
767:             * the next {@link Tube}. For example, a tube may want to install
768:             * a {@link FiberContextSwitchInterceptor} if running async, yet
769:             * it might find it faster to do {@link #runSync(Tube, Packet)}
770:             * if it's already running synchronously.
771:             */
772:            public static boolean isSynchronous() {
773:                return current().synchronous;
774:            }
775:
776:            /**
777:             * Gets the current fiber that's running.
778:             *
779:             * <p>
780:             * This works like {@link Thread#currentThread()}.
781:             * This method only works when invoked from {@link Tube}.
782:             */
783:            public static @NotNull
784:            Fiber current() {
785:                Fiber fiber = CURRENT_FIBER.get();
786:                if (fiber == null)
787:                    throw new IllegalStateException(
788:                            "Can be only used from fibers");
789:                return fiber;
790:            }
791:
792:            private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>();
793:
794:            /**
795:             * Used to allocate a unique number for each fiber.
796:             */
797:            private static final AtomicInteger iotaGen = new AtomicInteger();
798:
799:            private static boolean isTraceEnabled() {
800:                return LOGGER.isLoggable(Level.FINE);
801:            }
802:
803:            private static final Logger LOGGER = Logger.getLogger(Fiber.class
804:                    .getName());
805:
806:            private static final ReentrantLock serializedExecutionLock = new ReentrantLock();
807:
808:            /**
809:             * Set this boolean to true to execute fibers sequentially one by one.
810:             * See class javadoc.
811:             */
812:            public static volatile boolean serializeExecution = Boolean
813:                    .getBoolean(Fiber.class.getName() + ".serialize");
814:        }
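
The `pushCont`/`popCont` pair in the listing keeps the pending continuations in a plain array that doubles when it fills up. The following is a minimal stand-alone sketch of that growth strategy, not the code from `Fiber` itself. The class name `ContStack`, its generic parameter, the initial capacity of 2, and the `size()`/`capacity()` accessors are all hypothetical and exist only for illustration. Unlike `popCont`, this sketch also guards against popping an empty stack and clears the vacated slot.

```java
import java.util.Arrays;

/**
 * Hypothetical illustration of an array-backed stack that doubles its
 * buffer eagerly, in the style of pushCont/popCont. Not part of Fiber.
 */
final class ContStack<T> {
    // Assumed initial capacity; the real Fiber chooses its own.
    private Object[] items = new Object[2];
    private int size;

    void push(T item) {
        items[size++] = item;
        // Grow right after the write, so there is always at least one
        // free slot before the next push.
        if (size == items.length) {
            items = Arrays.copyOf(items, items.length * 2);
        }
    }

    @SuppressWarnings("unchecked")
    T pop() {
        // Extra safety check that the listing's popCont does not perform.
        if (size == 0) {
            throw new IllegalStateException("stack is empty");
        }
        T top = (T) items[--size];
        items[size] = null; // drop the reference so it can be collected
        return top;
    }

    int size() {
        return size;
    }

    int capacity() {
        return items.length;
    }
}
```

Growing immediately after the write, rather than before it, means the slot for the next push always exists, at the cost of sometimes allocating a larger buffer that is never used.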