001: /*
002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003: *
004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
005: *
006: * The contents of this file are subject to the terms of either the GNU
007: * General Public License Version 2 only ("GPL") or the Common Development
008: * and Distribution License("CDDL") (collectively, the "License"). You
009: * may not use this file except in compliance with the License. You can obtain
010: * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
011: * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
012: * language governing permissions and limitations under the License.
013: *
014: * When distributing the software, include this License Header Notice in each
015: * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
016: * Sun designates this particular file as subject to the "Classpath" exception
017: * as provided by Sun in the GPL Version 2 section of the License file that
018: * accompanied this code. If applicable, add the following below the License
019: * Header, with the fields enclosed by brackets [] replaced by your own
020: * identifying information: "Portions Copyrighted [year]
021: * [name of copyright owner]"
022: *
023: * Contributor(s):
024: *
025: * If you wish your version of this file to be governed by only the CDDL or
026: * only the GPL Version 2, indicate your decision by adding "[Contributor]
027: * elects to include this software in this distribution under the [CDDL or GPL
028: * Version 2] license." If you don't indicate a single choice of license, a
029: * recipient has the option to distribute your version of this file under
030: * either the CDDL, the GPL Version 2 or to extend the choice of license to
031: * its licensees as provided above. However, if you add GPL Version 2 code
032: * and therefore, elected the GPL Version 2 license, then the option applies
033: * only if the new code is made subject to such option by the copyright
034: * holder.
035: */
036:
037: package com.sun.xml.ws.api.pipe;
038:
039: import com.sun.istack.NotNull;
040: import com.sun.istack.Nullable;
041: import com.sun.xml.ws.api.message.Packet;
042: import com.sun.xml.ws.api.pipe.helper.AbstractFilterTubeImpl;
043: import com.sun.xml.ws.api.server.Adapter;
044:
045: import java.util.ArrayList;
046: import java.util.List;
047: import java.util.concurrent.atomic.AtomicInteger;
048: import java.util.concurrent.locks.ReentrantLock;
049: import java.util.logging.Level;
050: import java.util.logging.Logger;
051:
052: /**
053: * User-level thread. Represents the execution of one request/response processing.
054: *
055: * <p>
056: * JAX-WS RI is capable of running a large number of request/response concurrently by
057: * using a relatively small number of threads. This is made possible by utilizing
058: * a {@link Fiber} — a user-level thread that gets created for each request/response
059: * processing.
060: *
061: * <p>
062: * A fiber remembers where in the pipeline the processing is at, what needs to be
063: * executed on the way out (when processing response), and other additional information
064: * specific to the execution of a particular request/response.
065: *
066: * <h2>Suspend/Resume</h2>
067: * <p>
068: * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}.
069: * When a fiber is suspended, it will be kept on the side until it is
070: * {@link #resume(Packet) resumed}. This allows threads to go execute
071: * other runnable fibers, allowing efficient utilization of smaller number of
072: * threads.
073: *
074: * <h2>Context-switch Interception</h2>
075: * <p>
076: * {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s
077: * to perform additional processing every time a thread starts running a fiber
078: * and stops running it.
079: *
080: * <h2>Context ClassLoader</h2>
081: * <p>
082: * Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL
083: * becomes the thread's CCL when it's executing the fiber. The original CCL
084: * of the thread will be restored when the thread leaves the fiber execution.
085: *
086: *
087: * <h2>Debugging Aid</h2>
088: * <p>
089: * Because {@link Fiber} doesn't keep much in the call stack, and instead use
090: * {@link #conts} to store the continuation, debugging fiber related activities
091: * could be harder.
092: *
093: * <p>
094: * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend
095: * level logging. Using FINER would cause more detailed logging, which includes
096: * what tubes are executed in what order and how they behaved.
097: *
098: * <p>
099: * When you debug the server side, consider setting {@link Fiber#serializeExecution}
100: * to true, so that execution of fibers are serialized. Debugging a server
101: * with more than one running threads is very tricky, and this switch will
102: * prevent that. This can be also enabled by setting the system property on.
103: * See the source code.
104: *
105: * @author Kohsuke Kawaguchi
106: * @author Jitendra Kotamraju
107: */
108: public final class Fiber implements Runnable {
109: /**
110: * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs
111: * to be invoked on the way back.
112: */
113: private Tube[] conts = new Tube[16];
114: private int contsSize;
115:
116: /**
117: * If this field is non-null, the next instruction to execute is
118: * to call its {@link Tube#processRequest(Packet)}. Otherwise
119: * the instruction is to call {@link #conts}.
120: */
121: private Tube next;
122:
123: private Packet packet;
124:
125: private Throwable/*but really it's either RuntimeException or Error*/throwable;
126:
127: public final Engine owner;
128:
129: /**
130: * Is this thread suspended? 0=not suspended, 1=suspended.
131: *
132: * <p>
133: * Logically this is just a boolean, but we need to prepare for the case
134: * where the thread is {@link #resume(Packet) resumed} before we get to the {@link #suspend()}.
135: * This happens when things happen in the following order:
136: *
137: * <ol>
138: * <li>Tube decides that the fiber needs to be suspended to wait for the external event.
139: * <li>Tube hooks up fiber with some external mechanism (like NIO channel selector)
140: * <li>Tube returns with {@link NextAction#suspend()}.
141: * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)}
142: * to wake up fiber
143: * <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}.
144: * </ol>
145: *
146: * <p>
147: * Using int, this will work OK because {@link #suspendedCount} becomes -1 when
148: * {@link #resume(Packet)} occurs before {@link #suspend()}.
149: *
150: * <p>
151: * Increment and decrement is guarded by 'this' object.
152: */
153: private volatile int suspendedCount = 0;
154:
155: /**
156: * Is this fiber completed?
157: */
158: private volatile boolean completed;
159:
160: /**
161: * Is this {@link Fiber} currently running in the synchronous mode?
162: */
163: private boolean synchronous;
164:
165: private boolean interrupted;
166:
167: private final int id;
168:
169: /**
170: * Active {@link FiberContextSwitchInterceptor}s for this fiber.
171: */
172: private List<FiberContextSwitchInterceptor> interceptors;
173:
174: /**
175: * Not null when {@link #interceptors} is not null.
176: */
177: private InterceptorHandler interceptorHandler;
178:
179: /**
180: * This flag is set to true when a new interceptor is added.
181: *
182: * When that happens, we need to first exit the current interceptors
183: * and then reenter them, so that the newly added interceptors start
184: * taking effect. This flag is used to control that flow.
185: */
186: private boolean needsToReenter;
187:
188: /**
189: * Fiber's context {@link ClassLoader}.
190: */
191: private @Nullable
192: ClassLoader contextClassLoader;
193:
194: private @Nullable
195: CompletionCallback completionCallback;
196:
197: /**
198: * Set to true if this fiber is started asynchronously, to avoid
199: * doubly-invoking completion code.
200: */
201: private boolean started;
202:
203: /**
204: * Callback to be invoked when a {@link Fiber} finishs execution.
205: */
206: public interface CompletionCallback {
207: /**
208: * Indicates that the fiber has finished its execution.
209: *
210: * <p>
211: * Since the JAX-WS RI runs asynchronously,
212: * this method maybe invoked by a different thread
213: * than any of the threads that started it or run a part of tubeline.
214: */
215: void onCompletion(@NotNull
216: Packet response);
217:
218: /**
219: * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
220: */
221: void onCompletion(@NotNull
222: Throwable error);
223: }
224:
225: Fiber(Engine engine) {
226: this .owner = engine;
227: if (isTraceEnabled()) {
228: id = iotaGen.incrementAndGet();
229: LOGGER.fine(getName() + " created");
230: } else {
231: id = -1;
232: }
233:
234: // if this is run from another fiber, then we naturally inherit its context classloader,
235: // so this code works for fiber->fiber inheritance just fine.
236: contextClassLoader = Thread.currentThread()
237: .getContextClassLoader();
238: }
239:
240: /**
241: * Starts the execution of this fiber asynchronously.
242: *
243: * <p>
244: * This method works like {@link Thread#start()}.
245: *
246: * @param tubeline
247: * The first tube of the tubeline that will act on the packet.
248: * @param request
249: * The request packet to be passed to <tt>startPoint.processRequest()</tt>.
250: * @param completionCallback
251: * The callback to be invoked when the processing is finished and the
252: * final response packet is available.
253: *
254: * @see #runSync(Tube,Packet)
255: */
256: public void start(@NotNull
257: Tube tubeline, @NotNull
258: Packet request, @Nullable
259: CompletionCallback completionCallback) {
260: next = tubeline;
261: this .packet = request;
262: this .completionCallback = completionCallback;
263: this .started = true;
264: owner.addRunnable(this );
265: }
266:
267: /**
268: * Wakes up a suspended fiber.
269: *
270: * <p>
271: * If a fiber was suspended from the {@link Tube#processRequest(Packet)} method,
272: * then the execution will be resumed from the corresponding
273: * {@link Tube#processResponse(Packet)} method with the specified response packet
274: * as the parameter.
275: *
276: * <p>
277: * If a fiber was suspended from the {@link Tube#processResponse(Packet)} method,
278: * then the execution will be resumed from the next tube's
279: * {@link Tube#processResponse(Packet)} method with the specified response packet
280: * as the parameter.
281: *
282: * <p>
283: * This method is implemented in a race-free way. Another thread can invoke
284: * this method even before this fiber goes into the suspension mode. So the caller
285: * need not worry about synchronizing {@link NextAction#suspend()} and this method.
286: */
287: public synchronized void resume(@NotNull
288: Packet response) {
289: if (isTraceEnabled())
290: LOGGER.fine(getName() + " resumed");
291: packet = response;
292: if (--suspendedCount == 0) {
293: if (synchronous) {
294: notifyAll();
295: } else {
296: owner.addRunnable(this );
297: }
298: }
299: }
300:
301: /**
302: * Suspends this fiber's execution until the resume method is invoked.
303: *
304: * The call returns immediately, and when the fiber is resumed
305: * the execution picks up from the last scheduled continuation.
306: */
307: private synchronized void suspend() {
308: if (isTraceEnabled())
309: LOGGER.fine(getName() + " suspended");
310: suspendedCount++;
311: }
312:
313: /**
314: * Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
315: *
316: * <p>
317: * The newly installed fiber will take effect immediately after the current
318: * tube returns from its {@link Tube#processRequest(Packet)} or
319: * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
320: *
321: * <p>
322: * So when the tubeline consists of X and Y, and when X installs an interceptor,
323: * the order of execution will be as follows:
324: *
325: * <ol>
326: * <li>X.processRequest()
327: * <li>interceptor gets installed
328: * <li>interceptor.execute() is invoked
329: * <li>Y.processRequest()
330: * </ol>
331: */
332: public void addInterceptor(@NotNull
333: FiberContextSwitchInterceptor interceptor) {
334: if (interceptors == null) {
335: interceptors = new ArrayList<FiberContextSwitchInterceptor>();
336: interceptorHandler = new InterceptorHandler();
337: }
338: interceptors.add(interceptor);
339: needsToReenter = true;
340: }
341:
342: /**
343: * Removes a {@link FiberContextSwitchInterceptor} from this fiber.
344: *
345: * <p>
346: * The removal of the interceptor takes effect immediately after the current
347: * tube returns from its {@link Tube#processRequest(Packet)} or
348: * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
349: *
350: *
351: * <p>
352: * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor
353: * on the way out, then the order of execution will be as follows:
354: *
355: * <ol>
356: * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack)
357: * <li>interceptor gets uninstalled
358: * <li>interceptor.execute() returns
359: * <li>X.processResponse()
360: * </ol>
361: *
362: * @return
363: * true if the specified interceptor was removed. False if
364: * the specified interceptor was not registered with this fiber to begin with.
365: */
366: public boolean removeInterceptor(@NotNull
367: FiberContextSwitchInterceptor interceptor) {
368: if (interceptors != null && interceptors.remove(interceptor)) {
369: needsToReenter = true;
370: return true;
371: }
372: return false;
373: }
374:
375: /**
376: * Gets the context {@link ClassLoader} of this fiber.
377: */
378: public @Nullable
379: ClassLoader getContextClassLoader() {
380: return contextClassLoader;
381: }
382:
383: /**
384: * Sets the context {@link ClassLoader} of this fiber.
385: */
386: public ClassLoader setContextClassLoader(@Nullable
387: ClassLoader contextClassLoader) {
388: ClassLoader r = this .contextClassLoader;
389: this .contextClassLoader = contextClassLoader;
390: return r;
391: }
392:
393: /**
394: * DO NOT CALL THIS METHOD. This is an implementation detail
395: * of {@link Fiber}.
396: */
397: @Deprecated
398: public void run() {
399: assert !synchronous;
400: next = doRun(next);
401: completionCheck();
402: }
403:
404: /**
405: * Runs a given {@link Tube} (and everything thereafter) synchronously.
406: *
407: * <p>
408: * This method blocks and returns only when all the successive {@link Tube}s
409: * complete their request/response processing. This method can be used
410: * if a {@link Tube} needs to fallback to synchronous processing.
411: *
412: * <h3>Example:</h3>
413: * <pre>
414: * class FooTube extends {@link AbstractFilterTubeImpl} {
415: * NextAction processRequest(Packet request) {
416: * // run everything synchronously and return with the response packet
417: * return doReturnWith(Fiber.current().runSync(next,request));
418: * }
419: * NextAction processResponse(Packet response) {
420: * // never be invoked
421: * }
422: * }
423: * </pre>
424: *
425: * @param tubeline
426: * The first tube of the tubeline that will act on the packet.
427: * @param request
428: * The request packet to be passed to <tt>startPoint.processRequest()</tt>.
429: * @return
430: * The response packet to the <tt>request</tt>.
431: *
432: * @see #start(Tube, Packet, CompletionCallback)
433: */
434: public synchronized @NotNull
435: Packet runSync(@NotNull
436: Tube tubeline, @NotNull
437: Packet request) {
438: // save the current continuation, so that we return runSync() without executing them.
439: final Tube[] oldCont = conts;
440: final int oldContSize = contsSize;
441: final boolean oldSynchronous = synchronous;
442:
443: if (oldContSize > 0) {
444: conts = new Tube[16];
445: contsSize = 0;
446: }
447:
448: try {
449: synchronous = true;
450: this .packet = request;
451: doRun(tubeline);
452: if (throwable != null) {
453: if (throwable instanceof RuntimeException) {
454: throw (RuntimeException) throwable;
455: }
456: if (throwable instanceof Error) {
457: throw (Error) throwable;
458: }
459: // our system is supposed to only accept Error or RuntimeException
460: throw new AssertionError(throwable);
461: }
462: return this .packet;
463: } finally {
464: conts = oldCont;
465: contsSize = oldContSize;
466: synchronous = oldSynchronous;
467: if (interrupted) {
468: Thread.currentThread().interrupt();
469: interrupted = false;
470: }
471: if (!started)
472: completionCheck();
473: }
474: }
475:
476: private synchronized void completionCheck() {
477: if (contsSize == 0) {
478: if (isTraceEnabled())
479: LOGGER.fine(getName() + " completed");
480: completed = true;
481: notifyAll();
482: if (completionCallback != null) {
483: if (throwable != null)
484: completionCallback.onCompletion(throwable);
485: else
486: completionCallback.onCompletion(packet);
487: }
488: }
489: }
490:
491: ///**
492: // * Blocks until the fiber completes.
493: // */
494: //public synchronized void join() throws InterruptedException {
495: // while(!completed)
496: // wait();
497: //}
498:
499: /**
500: * Invokes all registered {@link InterceptorHandler}s and then call into
501: * {@link Fiber#__doRun(Tube)}.
502: */
503: private class InterceptorHandler implements
504: FiberContextSwitchInterceptor.Work<Tube, Tube> {
505: /**
506: * Index in {@link Fiber#interceptors} to invoke next.
507: */
508: private int idx;
509:
510: /**
511: * Initiate the interception, and eventually invokes {@link Fiber#__doRun(Tube)}.
512: */
513: Tube invoke(Tube next) {
514: idx = 0;
515: return execute(next);
516: }
517:
518: public Tube execute(Tube next) {
519: if (idx == interceptors.size()) {
520: return __doRun(next);
521: } else {
522: FiberContextSwitchInterceptor interceptor = interceptors
523: .get(idx++);
524: return interceptor.execute(Fiber.this , next, this );
525: }
526: }
527: }
528:
529: /**
530: * Executes the fiber as much as possible.
531: *
532: * @param next
533: * The next tube whose {@link Tube#processRequest(Packet)} is to be invoked. If null,
534: * that means we'll just call {@link Tube#processResponse(Packet)} on the continuation.
535: *
536: * @return
537: * If non-null, the next time execution resumes, it should resume from calling
538: * the {@link Tube#processRequest(Packet)}. Otherwise it means just finishing up
539: * the continuation.
540: */
541: @SuppressWarnings({"LoopStatementThatDoesntLoop"})
542: // IntelliJ reports this bogus error
543: private Tube doRun(Tube next) {
544: Thread currentThread = Thread.currentThread();
545:
546: if (isTraceEnabled())
547: LOGGER.fine(getName() + " running by "
548: + currentThread.getName());
549:
550: if (serializeExecution) {
551: serializedExecutionLock.lock();
552: try {
553: return _doRun(next);
554: } finally {
555: serializedExecutionLock.unlock();
556: }
557: } else {
558: return _doRun(next);
559: }
560: }
561:
562: private Tube _doRun(Tube next) {
563: Thread currentThread = Thread.currentThread();
564:
565: ClassLoader old = currentThread.getContextClassLoader();
566: currentThread.setContextClassLoader(contextClassLoader);
567: try {
568: do {
569: needsToReenter = false;
570:
571: // if interceptors are set, go through the interceptors.
572: if (interceptorHandler == null)
573: next = __doRun(next);
574: else
575: next = interceptorHandler.invoke(next);
576: } while (needsToReenter);
577:
578: return next;
579: } finally {
580: currentThread.setContextClassLoader(old);
581: }
582: }
583:
584: /**
585: * To be invoked from {@link #doRun(Tube)}.
586: *
587: * @see #doRun(Tube)
588: */
589: private Tube __doRun(Tube next) {
590: final Fiber old = CURRENT_FIBER.get();
591: CURRENT_FIBER.set(this );
592:
593: // if true, lots of debug messages to show what's being executed
594: final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);
595:
596: try {
597: while (!isBlocking() && !needsToReenter) {
598: try {
599: NextAction na;
600: Tube last;
601: if (throwable != null) {
602: if (contsSize == 0) {
603: // nothing else to execute. we are done.
604: return null;
605: }
606: last = popCont();
607: if (traceEnabled)
608: LOGGER.finer(getName() + ' ' + last
609: + ".processException(" + throwable
610: + ')');
611: na = last.processException(throwable);
612: } else {
613: if (next != null) {
614: if (traceEnabled)
615: LOGGER.finer(getName() + ' ' + next
616: + ".processRequest(" + packet
617: + ')');
618: na = next.processRequest(packet);
619: last = next;
620: } else {
621: if (contsSize == 0) {
622: // nothing else to execute. we are done.
623: return null;
624: }
625: last = popCont();
626: if (traceEnabled)
627: LOGGER.finer(getName() + ' ' + last
628: + ".processResponse(" + packet
629: + ')');
630: na = last.processResponse(packet);
631: }
632: }
633:
634: if (traceEnabled)
635: LOGGER.finer(getName() + ' ' + last
636: + " returned with " + na);
637:
638: // If resume is called before suspend, then make sure
639: // resume(Packet) is not lost
640: if (na.kind != NextAction.SUSPEND) {
641: packet = na.packet;
642: throwable = na.throwable;
643: }
644:
645: switch (na.kind) {
646: case NextAction.INVOKE:
647: pushCont(last);
648: // fall through next
649: case NextAction.INVOKE_AND_FORGET:
650: next = na.next;
651: break;
652: case NextAction.RETURN:
653: case NextAction.THROW:
654: next = null;
655: break;
656: case NextAction.SUSPEND:
657: pushCont(last);
658: next = null;
659: suspend();
660: break;
661: default:
662: throw new AssertionError();
663: }
664: } catch (RuntimeException t) {
665: if (traceEnabled)
666: LOGGER.log(Level.FINER, getName() + " Caught "
667: + t + ". Start stack unwinding", t);
668: throwable = t;
669: } catch (Error t) {
670: if (traceEnabled)
671: LOGGER.log(Level.FINER, getName() + " Caught "
672: + t + ". Start stack unwinding", t);
673: throwable = t;
674: }
675: }
676: // there's nothing we can execute right away.
677: // we'll be back when this fiber is resumed.
678: return next;
679: } finally {
680: CURRENT_FIBER.set(old);
681: }
682: }
683:
684: private void pushCont(Tube tube) {
685: conts[contsSize++] = tube;
686:
687: // expand if needed
688: int len = conts.length;
689: if (contsSize == len) {
690: Tube[] newBuf = new Tube[len * 2];
691: System.arraycopy(conts, 0, newBuf, 0, len);
692: conts = newBuf;
693: }
694: }
695:
696: private Tube popCont() {
697: return conts[--contsSize];
698: }
699:
700: /**
701: * Returns true if the fiber needs to block its execution.
702: */
703: // TODO: synchronization on synchronous case is wrong.
704: private boolean isBlocking() {
705: if (synchronous) {
706: while (suspendedCount == 1)
707: try {
708: if (isTraceEnabled()) {
709: LOGGER.fine(getName() + " is blocking thread "
710: + Thread.currentThread().getName());
711: }
712: wait(); // the synchronized block is the whole runSync method.
713: } catch (InterruptedException e) {
714: // remember that we are interrupted, but don't respond to it
715: // right away. This behavior is in line with what happens
716: // when you are actually running the whole thing synchronously.
717: interrupted = true;
718: }
719: return false;
720: } else
721: return suspendedCount == 1;
722: }
723:
724: private String getName() {
725: return "engine-" + owner.id + "fiber-" + id;
726: }
727:
728: public String toString() {
729: return getName();
730: }
731:
732: /**
733: * Gets the current {@link Packet} associated with this fiber.
734: *
735: * <p>
736: * This method returns null if no packet has been associated with the fiber yet.
737: */
738: public @Nullable
739: Packet getPacket() {
740: return packet;
741: }
742:
743: /**
744: * Returns true if this fiber is still running or suspended.
745: */
746: public boolean isAlive() {
747: return !completed;
748: }
749:
750: /**
751: * (ADVANCED) Returns true if the current fiber is being executed synchronously.
752: *
753: * <p>
754: * Fiber may run synchronously for various reasons. Perhaps this is
755: * on client side and application has invoked a synchronous method call.
756: * Perhaps this is on server side and we have deployed on a synchronous
757: * transport (like servlet.)
758: *
759: * <p>
760: * When a fiber is run synchronously (IOW by {@link #runSync(Tube, Packet)}),
761: * further invocations to {@link #runSync(Tube, Packet)} can be done
762: * without degrading the performance.
763: *
764: * <p>
765: * So this value can be used as a further optimization hint for
766: * advanced {@link Tube}s to choose the best strategy to invoke
767: * the next {@link Tube}. For example, a tube may want to install
768: * a {@link FiberContextSwitchInterceptor} if running async, yet
769: * it might find it faster to do {@link #runSync(Tube, Packet)}
770: * if it's already running synchronously.
771: */
772: public static boolean isSynchronous() {
773: return current().synchronous;
774: }
775:
776: /**
777: * Gets the current fiber that's running.
778: *
779: * <p>
780: * This works like {@link Thread#currentThread()}.
781: * This method only works when invoked from {@link Tube}.
782: */
783: public static @NotNull
784: Fiber current() {
785: Fiber fiber = CURRENT_FIBER.get();
786: if (fiber == null)
787: throw new IllegalStateException(
788: "Can be only used from fibers");
789: return fiber;
790: }
791:
792: private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>();
793:
794: /**
795: * Used to allocate unique number for each fiber.
796: */
797: private static final AtomicInteger iotaGen = new AtomicInteger();
798:
799: private static boolean isTraceEnabled() {
800: return LOGGER.isLoggable(Level.FINE);
801: }
802:
803: private static final Logger LOGGER = Logger.getLogger(Fiber.class
804: .getName());
805:
806: private static final ReentrantLock serializedExecutionLock = new ReentrantLock();
807:
808: /**
809: * Set this boolean to true to execute fibers sequentially one by one.
810: * See class javadoc.
811: */
812: public static volatile boolean serializeExecution = Boolean
813: .getBoolean(Fiber.class.getName() + ".serialize");
814: }
|