001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.cocoon.transformation.helpers;
018:
019: import java.io.IOException;
020: import java.net.URL;
021:
022: import org.apache.avalon.framework.activity.Disposable;
023: import org.apache.avalon.framework.component.Component;
024: import org.apache.avalon.framework.logger.AbstractLogEnabled;
025: import org.apache.avalon.framework.parameters.ParameterException;
026: import org.apache.avalon.framework.parameters.Parameterizable;
027: import org.apache.avalon.framework.parameters.Parameters;
028: import org.apache.avalon.framework.service.ServiceException;
029: import org.apache.avalon.framework.service.ServiceManager;
030: import org.apache.avalon.framework.service.Serviceable;
031: import org.apache.avalon.framework.thread.ThreadSafe;
032: import org.apache.cocoon.ProcessingException;
033: import org.apache.cocoon.caching.CachedResponse;
034: import org.apache.cocoon.components.sax.XMLDeserializer;
035: import org.apache.cocoon.components.sax.XMLSerializer;
036: import org.apache.cocoon.components.sax.XMLTeePipe;
037: import org.apache.cocoon.components.source.SourceUtil;
038: import org.apache.cocoon.components.thread.RunnableManager;
039: import org.apache.cocoon.environment.CocoonRunnable;
040: import org.apache.cocoon.xml.XMLConsumer;
041: import org.apache.excalibur.source.Source;
042: import org.apache.excalibur.source.SourceException;
043: import org.apache.excalibur.source.SourceResolver;
044: import org.apache.excalibur.source.SourceValidity;
045: import org.apache.excalibur.store.Store;
046: import org.xml.sax.SAXException;
047: import EDU.oswego.cs.dl.util.concurrent.CountDown;
048:
049: /**
050: * Default implementation of a {@link IncludeCacheManager}.
051: *
052: * This implementation requires a configuration, if preemptive
053: * loading is used:
054: * <parameter name="preemptive-loader-url" value="some url"/>
055: *
056: * This is a url inside cocoon, that contains the preemptive loader
057: * url; it must be specified absolute (with http://...)
058: * If this loader cannot be started, only an error is logged into the
059: * log, so actually cached content is never updated!
060: *
061: * @author <a href="mailto:cziegeler@apache.org">Carsten Ziegeler</a>
062: * @version CVS $Id: DefaultIncludeCacheManager.java 433543 2006-08-22 06:22:54Z crossley $
063: * @since 2.1
064: */
065: public final class DefaultIncludeCacheManager extends
066: AbstractLogEnabled implements IncludeCacheManager, ThreadSafe,
067: Serviceable, Disposable, Parameterizable, Component {
068:
069: private ServiceManager manager;
070:
071: private SourceResolver resolver;
072:
073: private Store store;
074:
075: private IncludeCacheStorageProxy defaultCacheStorage;
076:
077: private String preemptiveLoaderURI;
078:
079: /**
080: * @see IncludeCacheManager#getSession(org.apache.avalon.framework.parameters.Parameters)
081: */
082: public IncludeCacheManagerSession getSession(Parameters pars) {
083: String sourceURI = pars.getParameter("source", null);
084: IncludeCacheManagerSession session;
085: if (null == sourceURI) {
086: session = new IncludeCacheManagerSession(pars,
087: this .defaultCacheStorage);
088: } else {
089: Source source = null;
090: try {
091: source = this .resolver.resolveURI(sourceURI);
092: IncludeCacheStorageProxy proxy = new ModifiableSourceIncludeCacheStorageProxy(
093: this .resolver, source.getURI(), this
094: .getLogger());
095: session = new IncludeCacheManagerSession(pars, proxy);
096: } catch (Exception local) {
097: session = new IncludeCacheManagerSession(pars,
098: this .defaultCacheStorage);
099: this .getLogger().warn(
100: "Error creating writeable source.", local);
101: } finally {
102: this .resolver.release(source);
103: }
104: }
105: if (session.isPreemptive()) {
106: if (null == this .preemptiveLoaderURI) {
107: this
108: .getLogger()
109: .error(
110: "Preemptive loading is turned off because the preemptive-loader-url is not configured.");
111: session.setPreemptive(false);
112: } else {
113: if (!PreemptiveLoader.getInstance().alive) {
114:
115: if (this .getLogger().isDebugEnabled()) {
116: this .getLogger().debug(
117: "Booting preemptive loader: "
118: + this .preemptiveLoaderURI);
119: }
120: PreemptiveBooter thread = new PreemptiveBooter(
121: this .preemptiveLoaderURI);
122: try {
123: final RunnableManager runnableManager = (RunnableManager) this .manager
124: .lookup(RunnableManager.ROLE);
125: runnableManager.execute(thread);
126: this .manager.release(runnableManager);
127: } catch (final ServiceException se) {
128: getLogger().error(
129: "Cannot lookup RunnableManager", se);
130: }
131: }
132: }
133: }
134: if (this .getLogger().isDebugEnabled()) {
135: this .getLogger().debug(
136: "Creating cache manager session: " + session);
137: }
138: return session;
139: }
140:
141: /**
142: * @see IncludeCacheManager#load(java.lang.String, IncludeCacheManagerSession)
143: */
144: public String load(String uri, IncludeCacheManagerSession session)
145: throws IOException, SourceException {
146: if (this .getLogger().isDebugEnabled()) {
147: this .getLogger().debug(
148: "Load " + uri + " for session " + session);
149: }
150:
151: // first make the URI absolute
152: if (uri.indexOf("://") == -1) {
153: final Source source = session
154: .resolveURI(uri, this .resolver);
155: uri = source.getURI();
156: }
157:
158: // if we are not processing in parallel (or do preemptive)
159: // then we don't have to do anything in this method - everything
160: // is done in the stream method.
161:
162: // if we are processing in parallel (and not preemptive) then....
163: if (session.isParallel() && !session.isPreemptive()) {
164:
165: // first look-up if we have a valid stored response
166: IncludeCacheStorageProxy storage = session
167: .getCacheStorageProxy();
168: CachedResponse response = (CachedResponse) storage.get(uri);
169: if (null != response) {
170: SourceValidity[] validities = response
171: .getValidityObjects();
172:
173: // if we are valid and do not purging
174: if (!session.isPurging()
175: && validities[0].isValid() == SourceValidity.VALID) {
176: if (this .getLogger().isDebugEnabled()) {
177: this
178: .getLogger()
179: .debug(
180: "Using cached response for parallel processing.");
181: }
182: session.add(uri, response.getResponse());
183: return uri;
184: } else {
185: // response is not used
186: storage.remove(uri);
187: }
188: }
189:
190: if (this .getLogger().isDebugEnabled()) {
191: this .getLogger().debug(
192: "Starting parallel thread for loading " + uri);
193: }
194: // now we start a parallel thread, this thread gets all required avalon components
195: // so it does not have to lookup them by itself
196: try {
197: XMLSerializer serializer = (XMLSerializer) this .manager
198: .lookup(XMLSerializer.ROLE);
199: Source source = session.resolveURI(uri, this .resolver);
200:
201: LoaderThread loader = new LoaderThread(source,
202: serializer, this .manager);
203: // Load the included content in a thread that inherits the current thread's environment
204: Thread thread = new Thread(new CocoonRunnable(loader));
205: session.add(uri, loader);
206: thread.start();
207: if (this .getLogger().isDebugEnabled()) {
208: this .getLogger().debug("Thread started for " + uri);
209: }
210: } catch (ServiceException ce) {
211: throw new SourceException(
212: "Unable to lookup thread pool or xml serializer.",
213: ce);
214: } catch (Exception e) {
215: throw new SourceException(
216: "Unable to get pooled thread.", e);
217: }
218: }
219: return uri;
220: }
221:
222: /**
223: * @see IncludeCacheManager#stream(java.lang.String, IncludeCacheManagerSession, XMLConsumer)
224: */
225: public void stream(String uri, IncludeCacheManagerSession session,
226: XMLConsumer handler) throws IOException, SourceException,
227: SAXException {
228:
229: if (this .getLogger().isDebugEnabled()) {
230: this .getLogger().debug(
231: "Stream " + uri + " for session " + session);
232: }
233:
234: // if we are processing in parallel (and not preemptive) then....
235: if (session.isParallel() && !session.isPreemptive()) {
236:
237: // get either the cached content or the pooled thread
238: Object object = session.get(uri);
239:
240: if (null == object) {
241: // this should never happen!
242: throw new SAXException("No pooled thread found for "
243: + uri);
244: }
245: byte[] result;
246:
247: // is this a pooled thread?
248: if (object instanceof LoaderThread) {
249: LoaderThread loader = (LoaderThread) object;
250:
251: if (this .getLogger().isDebugEnabled()) {
252: this
253: .getLogger()
254: .debug(
255: "Waiting for pooled thread to finish loading.");
256: }
257:
258: // wait for it
259: loader.join();
260:
261: if (this .getLogger().isDebugEnabled()) {
262: this .getLogger().debug(
263: "Pooled thread finished loading.");
264: }
265:
266: // did an exception occur? Then reraise it
267: if (null != loader.exception) {
268: if (loader.exception instanceof SAXException) {
269: throw (SAXException) loader.exception;
270: } else if (loader.exception instanceof SourceException) {
271: throw (SourceException) loader.exception;
272: } else if (loader.exception instanceof IOException) {
273: throw (IOException) loader.exception;
274: } else {
275: throw new SAXException("Exception.",
276: loader.exception);
277: }
278: }
279:
280: if (this .getLogger().isDebugEnabled()) {
281: this .getLogger().debug(
282: "Streaming from pooled thread.");
283: }
284: result = loader.content;
285:
286: // cache the response (remember preemptive is off)
287: if (session.getExpires() > 0) {
288: SourceValidity[] validities = new SourceValidity[1];
289: validities[0] = session.getExpiresValidity();
290: CachedResponse response = new CachedResponse(
291: validities, result);
292: session.getCacheStorageProxy().put(uri, response);
293: }
294: } else {
295: if (this .getLogger().isDebugEnabled()) {
296: this .getLogger().debug(
297: "Streaming from cached response.");
298: }
299:
300: // use the response from the cache
301: result = (byte[]) object;
302: }
303:
304: // stream the content
305: XMLDeserializer deserializer = null;
306: try {
307: deserializer = (XMLDeserializer) this .manager
308: .lookup(XMLDeserializer.ROLE);
309: deserializer.setConsumer(handler);
310: deserializer.deserialize(result);
311: } catch (ServiceException ce) {
312: throw new SAXException(
313: "Unable to lookup xml deserializer.", ce);
314: } finally {
315: this .manager.release(deserializer);
316: }
317: return;
318:
319: } else {
320: // we are not processing parallel
321:
322: // first: test for a cached response
323: IncludeCacheStorageProxy storage = session
324: .getCacheStorageProxy();
325: CachedResponse response = (CachedResponse) storage.get(uri);
326: if (null != response) {
327: SourceValidity[] validities = response
328: .getValidityObjects();
329: // if purging is turned off and either the cached response is valid or
330: // we are loading preemptive, then use the cached response
331: if (!session.isPurging()
332: && (session.isPreemptive() || validities[0]
333: .isValid() == SourceValidity.VALID)) {
334:
335: // stream the content
336: if (this .getLogger().isDebugEnabled()) {
337: this .getLogger().debug(
338: "Streaming from cached response.");
339: }
340: XMLDeserializer deserializer = null;
341: try {
342: deserializer = (XMLDeserializer) this .manager
343: .lookup(XMLDeserializer.ROLE);
344: deserializer.setConsumer(handler);
345: deserializer
346: .deserialize(response.getResponse());
347: } catch (ServiceException ce) {
348: throw new SAXException(
349: "Unable to lookup xml deserializer.",
350: ce);
351: } finally {
352: this .manager.release(deserializer);
353: }
354:
355: // load preemptive if the response is not valid
356: if (session.getExpires() > 0
357: && session.isPreemptive()
358: && validities[0].isValid() != SourceValidity.VALID) {
359: if (this .getLogger().isDebugEnabled()) {
360: this .getLogger().debug(
361: "Add uri to preemptive loader list "
362: + uri);
363: }
364: if (!PreemptiveLoader.getInstance().alive) {
365: this
366: .getLogger()
367: .error(
368: "Preemptive loader has not started yet.");
369: }
370: PreemptiveLoader.getInstance().add(
371: session.getCacheStorageProxy(), uri,
372: session.getExpires());
373: }
374: return;
375:
376: } else {
377: // cached response is not valid
378: storage.remove(uri);
379: }
380: }
381: }
382:
383: // we are not processing in parallel and have no (valid) cached response
384: XMLSerializer serializer = null;
385: try {
386: final Source source = session
387: .resolveURI(uri, this .resolver);
388:
389: // stream directly (and cache the response)
390: if (this .getLogger().isDebugEnabled()) {
391: this .getLogger().debug(
392: "Streaming directly from source.");
393: }
394: if (session.getExpires() > 0) {
395: serializer = (XMLSerializer) this .manager
396: .lookup(XMLSerializer.ROLE);
397: XMLTeePipe tee = new XMLTeePipe(handler, serializer);
398:
399: SourceUtil.toSAX(source, tee);
400:
401: SourceValidity[] validities = new SourceValidity[1];
402: validities[0] = session.getExpiresValidity();
403: CachedResponse response = new CachedResponse(
404: validities, (byte[]) serializer
405: .getSAXFragment());
406: session.getCacheStorageProxy().put(uri, response);
407: } else {
408: SourceUtil.toSAX(source, handler);
409: }
410:
411: } catch (ProcessingException pe) {
412: throw new SAXException("ProcessingException", pe);
413: } catch (ServiceException e) {
414: throw new SAXException("Unable to lookup xml serializer.",
415: e);
416: } finally {
417: this .manager.release(serializer);
418: }
419: }
420:
421: /**
422: * @see IncludeCacheManager#terminateSession(IncludeCacheManagerSession)
423: */
424: public void terminateSession(IncludeCacheManagerSession session) {
425: if (this .getLogger().isDebugEnabled()) {
426: this .getLogger().debug(
427: "Terminating cache manager session " + session);
428: }
429: session.cleanup(this .resolver);
430: }
431:
432: /**
433: * @see org.apache.avalon.framework.service.Serviceable#service(org.apache.avalon.framework.service.ServiceManager)
434: */
435: public void service(ServiceManager manager) throws ServiceException {
436: this .manager = manager;
437: this .resolver = (SourceResolver) this .manager
438: .lookup(SourceResolver.ROLE);
439: }
440:
441: /**
442: * @see org.apache.avalon.framework.activity.Disposable#dispose()
443: */
444: public void dispose() {
445: // stop preemptive loader (if running)
446: PreemptiveLoader.getInstance().stop();
447: if (null != this .manager) {
448: this .manager.release(this .resolver);
449: this .manager.release(this .store);
450: this .store = null;
451: this .resolver = null;
452: this .manager = null;
453: this .defaultCacheStorage = null;
454: }
455: }
456:
457: /**
458: * @see org.apache.avalon.framework.parameters.Parameterizable#parameterize(org.apache.avalon.framework.parameters.Parameters)
459: */
460: public void parameterize(Parameters parameters)
461: throws ParameterException {
462: this .preemptiveLoaderURI = parameters.getParameter(
463: "preemptive-loader-url", null);
464: if (null != this .preemptiveLoaderURI
465: && this .preemptiveLoaderURI.indexOf("://") == -1) {
466: throw new ParameterException(
467: "The preemptive-loader-url must be absolute: "
468: + this .preemptiveLoaderURI);
469: }
470: final String storeRole = parameters.getParameter("use-store",
471: Store.ROLE);
472: try {
473: this .store = (Store) this .manager.lookup(storeRole);
474: } catch (ServiceException e) {
475: throw new ParameterException(
476: "Unable to lookup store with role " + storeRole, e);
477: }
478: this .defaultCacheStorage = new StoreIncludeCacheStorageProxy(
479: this .store, this .getLogger());
480: }
481:
482: final private static class LoaderThread implements Runnable {
483:
484: final private Source source;
485: final private XMLSerializer serializer;
486: final private CountDown finished;
487: Exception exception;
488: byte[] content;
489: final private ServiceManager manager;
490:
491: public LoaderThread(Source source, XMLSerializer serializer,
492: ServiceManager manager) {
493: this .source = source;
494: this .serializer = serializer;
495: this .finished = new CountDown(1);
496: this .manager = manager;
497: }
498:
499: public void run() {
500: try {
501: SourceUtil.toSAX(this .source, this .serializer);
502: this .content = (byte[]) this .serializer
503: .getSAXFragment();
504: } catch (Exception local) {
505: this .exception = local;
506: } finally {
507: this .manager.release(this .serializer);
508: this .finished.release();
509: }
510: }
511:
512: void join() {
513: try {
514: this .finished.acquire();
515: } catch (final InterruptedException ie) {
516: // ignore
517: }
518: }
519: }
520:
521: final static class PreemptiveBooter implements Runnable {
522:
523: private final String uri;
524:
525: public PreemptiveBooter(final String uri) {
526: this .uri = uri;
527: }
528:
529: public void run() {
530: try {
531: URL url = new URL(this .uri);
532: url.getContent();
533: } catch (Exception ignore) {
534: }
535: }
536: }
537: }
|