001: /*
002: * Copyright 2001-2007 Geert Bevin <gbevin[remove] at uwyn dot com>
003: * Distributed under the terms of either:
004: * - the common development and distribution license (CDDL), v1.0; or
005: * - the GNU Lesser General Public License, v2.1 or later
006: * $Id: ContinuationManager.java 3812 2007-06-25 17:04:34Z gbevin $
007: */
008: package com.uwyn.rife.continuations;
009:
010: import java.util.*;
011:
012: import com.uwyn.rife.tools.TerracottaUtils;
013:
014: /**
015: * Manages a collection of {@code ContinuationContext} instances.
016: * <p>A {@code ContinuationManager} instance is typically associated with
017: * a specific context, like for example a {@link com.uwyn.rife.engine.Site}
018: * for RIFE's web engine. It's up to you to provide an API to your users if
019: * you want them to be able to interact with the appropriate continuations
020: * manager. For instance, in RIFE, to gain access to the
021: * {@code ContinuationManager} of an active
022: * <code>{@link com.uwyn.rife.engine.ElementSupport}</code> instance, the
023: * following code can be used: {@code getElementInfo().getSite().getContinuationManager()}.
024: * Your application or library will have to provide its own.
025: *
026: * @author Geert Bevin (gbevin[remove] at uwyn dot com)
027: * @version $Revision: 3812 $
028: * @see ContinuationManager
029: * @since 1.6
030: */
031: public class ContinuationManager<T extends ContinuableObject> {
032: private final Map<String, ContinuationContext<T>> mContexts;
033:
034: private Random mRandom = new Random();
035:
036: private ContinuationConfigRuntime mConfig;
037:
038: /**
039: * Instantiates a new continuation manager and uses the default values for
040: * the continuations duration and purging.
041: *
042: * @param config the runtime configuration that will be used be this
043: * manager
044: * @since 1.6
045: */
046: public ContinuationManager(ContinuationConfigRuntime config) {
047: mConfig = config;
048:
049: if (TerracottaUtils.isTcPresent()) {
050: mContexts = new HashMap<String, ContinuationContext<T>>();
051: } else {
052: mContexts = new WeakHashMap<String, ContinuationContext<T>>();
053: }
054: }
055:
056: /**
057: * Retrieves the runtime configuration that was provided to the manager
058: * at instantiation.
059: *
060: * @return this manager's runtime configuration
061: * @since 1.6
062: */
063: public ContinuationConfigRuntime getConfigRuntime() {
064: return mConfig;
065: }
066:
067: /**
068: * Checks if a particular continuation context is expired.
069: *
070: * @param context the context that needs to be verified
071: * @return {@code true} if the continuation context is expired; and
072: * <p>{@code false} otherwise
073: * @see com.uwyn.rife.config.RifeConfig.Engine#getContinuationDuration
074: * @since 1.6
075: */
076: public boolean isExpired(ContinuationContext<T> context) {
077: if (context.getStart() <= System.currentTimeMillis()
078: - mConfig.getContinuationDuration()) {
079: return true;
080: }
081:
082: return false;
083: }
084:
085: /**
086: * Adds a particular {@code ContinuationContext} to this manager.
087: *
088: * @param context the context that will be added
089: * @since 1.6
090: */
091: public void addContext(ContinuationContext<T> context) {
092: if (null == context) {
093: return;
094: }
095:
096: synchronized (mContexts) {
097: mContexts.put(context.getId(), context);
098: }
099: }
100:
101: /**
102: * Removes a {@link ContinuationContext} instance from this continuation
103: * manager.
104: *
105: * @param id the unique string that identifies the
106: * {@code ContinuationContext} instance that will be removed
107: * @see #getContext
108: * @since 1.6
109: */
110: public void removeContext(String id) {
111: if (null == id) {
112: return;
113: }
114:
115: synchronized (mContexts) {
116: mContexts.remove(id);
117: }
118: }
119:
120: /**
121: * Creates a new {@code ContinuationContext} from an existing one so that
122: * the execution can be resumed.
123: * <p>If the existing continuation context couldn't be found, no new one
124: * can be created. However, if it could be found, the result of
125: * {@link ContinuationConfigRuntime#cloneContinuations} will determine
126: * whether the existing continuation context will be cloned to create
127: * the new one, or if its state will be reused.
128: * <p>The new continuation context will have its own unique ID.
129: *
130: * @param id the ID of the existing continuation context
131: * @return the new {@code ContinuationContext}; or
132: * <p>{@code null} if the existing continuation context couldn't be found
133: * @throws CloneNotSupportedException
134: * @since 1.6
135: */
136: public ContinuationContext<T> resumeContext(String id)
137: throws CloneNotSupportedException {
138: synchronized (mContexts) {
139: ContinuationContext<T> result = null;
140:
141: purgeContinuations();
142:
143: ContinuationContext<T> context = getContext(id);
144: if (context != null && context.isPaused()) {
145: if (mConfig
146: .cloneContinuations(context.getContinuable())) {
147: result = cloneContext(context);
148: } else {
149: result = reuseContext(context);
150: }
151: }
152:
153: return result;
154: }
155: }
156:
157: /**
158: * Retrieves a {@link ContinuationContext} instance from this continuation
159: * manager.
160: *
161: * @param id the unique string that identifies the
162: * {@code ContinuationContext} instance that has to be retrieved
163: * @return the {@code ContinuationContext} instance that corresponds
164: * to the provided identifier; or
165: * <p>{@code null} if the identifier isn't known by the continuation
166: * manager.
167: * @see #removeContext
168: * @since 1.6
169: */
170: public ContinuationContext<T> getContext(String id) {
171: ContinuationContext<T> context = mContexts.get(id);
172: if (context != null) {
173: if (isExpired(context)) {
174: context = null;
175: removeContext(id);
176: } else {
177: // always set the manager of the continuation context, it could have been
178: // cleared if the context was pulled in through Terracotta from another
179: // node
180: context.setManager(this );
181: }
182: }
183: return context;
184: }
185:
186: private ContinuationContext<T> reuseContext(
187: ContinuationContext<T> context) {
188: mContexts.remove(context.getId());
189: context.resetId();
190: addContext(context);
191:
192: return context;
193: }
194:
195: private ContinuationContext<T> cloneContext(
196: ContinuationContext<T> context)
197: throws CloneNotSupportedException {
198: ContinuationContext<T> new_context = context.clone();
199: new_context.resetId();
200: addContext(new_context);
201:
202: return new_context;
203: }
204:
205: private void purgeContinuations() {
206: int purge_decision = -1;
207: purge_decision = mRandom.nextInt(mConfig
208: .getContinuationPurgeScale());
209: if (purge_decision <= mConfig.getContinuationPurgeFrequency()) {
210: new PurgeContinuations().start();
211: }
212: }
213:
214: private class PurgeContinuations extends Thread {
215: public void run() {
216: purge();
217: }
218:
219: private void purge() {
220: ArrayList<String> stale_continuations = new ArrayList<String>();
221: try {
222: ContinuationContext<T> context = null;
223: for (ContinuationContext<T> reference : mContexts
224: .values()) {
225: if (reference != null) {
226: context = reference;
227: if (context != null && isExpired(context)) {
228: stale_continuations.add(context.getId());
229: }
230: }
231: }
232: } catch (ConcurrentModificationException e) {
233: // Oops something changed while we were looking.
234: // Lock the context and try again.
235: // Set our priority high while we have the sessions locked
236: int old_priority = Thread.currentThread().getPriority();
237: Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
238: try {
239: synchronized (mContexts) {
240: stale_continuations = null;
241: purge();
242: }
243: } finally {
244: Thread.currentThread().setPriority(old_priority);
245: }
246: }
247:
248: if (stale_continuations != null) {
249: synchronized (mContexts) {
250: for (String id : stale_continuations) {
251: mContexts.remove(id);
252: }
253: }
254: }
255: }
256: }
257: }
|