001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. The ASF licenses this file to You
004: * under the Apache License, Version 2.0 (the "License"); you may not
005: * use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License. For additional information regarding
015: * copyright in this work, please see the NOTICE file in the top level
016: * directory of this distribution.
017: */
018: /*
019: * ReferrerQueueManagerImpl.java
020: *
021: * Created on December 16, 2005, 5:06 PM
022: */
023:
024: package org.apache.roller.business.referrers;
025:
026: import java.util.ArrayList;
027: import java.util.Collections;
028: import java.util.HashMap;
029: import java.util.Iterator;
030: import java.util.List;
031: import org.apache.commons.logging.Log;
032: import org.apache.commons.logging.LogFactory;
033: import org.apache.roller.RollerException;
034: import org.apache.roller.business.runnable.ContinuousWorkerThread;
035: import org.apache.roller.business.runnable.WorkerThread;
036: import org.apache.roller.config.RollerConfig;
037: import org.apache.roller.business.RollerFactory;
038:
039: /**
040: * The base implementation of the ReferrerQueueManager.
041: *
042: * This class is implemented using the singleton pattern to ensure that only
043: * one instance exists at any given time.
044: *
045: * This implementation can be configured to handle referrers in 2 ways ...
046: * 1. synchronously. referrers are processed immediately.
047: * 2. asynchronously. referrers are queued for later processing.
048: *
049: * Users can control the referrer queue mode via properties in the static
050: * roller.properties configuration file.
051: *
052: * In asynchronous processing mode we start some number of worker threads which
053: * run continously to process any referrers that have been queued. Each worker
054: * processes queued referrers until the queue is empty, then sleeps for a given
055: * amount of time. The number of workers used and their sleep time can be set
056: * via properties of the static roller.properties file.
057: *
058: * @author Allen Gilliland
059: */
060: public class ReferrerQueueManagerImpl implements ReferrerQueueManager {
061:
062: private static Log mLogger = LogFactory
063: .getLog(ReferrerQueueManagerImpl.class);
064:
065: private static ReferrerQueueManager instance = null;
066:
067: private boolean asyncMode = false;
068: private int numWorkers = 1;
069: private int sleepTime = 10000;
070: private List workers = null;
071: private List referrerQueue = null;
072:
073: static {
074: instance = new ReferrerQueueManagerImpl();
075: }
076:
077: // private because we are a singleton
078: private ReferrerQueueManagerImpl() {
079: mLogger.info("Initializing Referrer Queue Manager");
080:
081: // lookup config options
082: this .asyncMode = RollerConfig
083: .getBooleanProperty("referrers.asyncProcessing.enabled");
084:
085: mLogger.info("Asynchronous referrer processing = "
086: + this .asyncMode);
087:
088: if (this .asyncMode) {
089:
090: String num = RollerConfig
091: .getProperty("referrers.queue.numWorkers");
092: String sleep = RollerConfig
093: .getProperty("referrers.queue.sleepTime");
094:
095: try {
096: this .numWorkers = Integer.parseInt(num);
097:
098: if (numWorkers < 1)
099: this .numWorkers = 1;
100:
101: } catch (NumberFormatException nfe) {
102: mLogger.warn("Invalid num workers [" + num
103: + "], using default");
104: }
105:
106: try {
107: // multiply by 1000 because we expect input in seconds
108: this .sleepTime = Integer.parseInt(sleep) * 1000;
109: } catch (NumberFormatException nfe) {
110: mLogger.warn("Invalid sleep time [" + sleep
111: + "], using default");
112: }
113:
114: // create the processing queue
115: this .referrerQueue = Collections
116: .synchronizedList(new ArrayList());
117:
118: // start up workers
119: this .workers = new ArrayList();
120: ContinuousWorkerThread worker = null;
121: QueuedReferrerProcessingJob job = null;
122: for (int i = 0; i < this .numWorkers; i++) {
123: job = new QueuedReferrerProcessingJob();
124: worker = new ContinuousWorkerThread("ReferrerWorker"
125: + i, job, this .sleepTime);
126: workers.add(worker);
127: worker.start();
128: }
129: }
130: }
131:
132: /**
133: * Get access to the singleton instance.
134: */
135: public static ReferrerQueueManager getInstance() {
136: return instance;
137: }
138:
139: /**
140: * Process an incoming referrer.
141: *
142: * If we are doing asynchronous referrer processing then the referrer will
143: * just go into the queue for later processing. If not then we process it
144: * now.
145: */
146: public void processReferrer(IncomingReferrer referrer) {
147:
148: if (this .asyncMode) {
149: mLogger.debug("QUEUING: " + referrer.getRequestUrl());
150:
151: // add to queue
152: this .enqueue(referrer);
153: } else {
154: // process now
155: ReferrerProcessingJob job = new ReferrerProcessingJob();
156:
157: // setup input
158: HashMap inputs = new HashMap();
159: inputs.put("referrer", referrer);
160: job.input(inputs);
161:
162: // execute
163: job.execute();
164:
165: try {
166: // flush changes
167: RollerFactory.getRoller().flush();
168: } catch (RollerException ex) {
169: mLogger.error("ERROR commiting referrer", ex);
170: }
171: }
172:
173: }
174:
175: /**
176: * Place a referrer in the queue.
177: */
178: public void enqueue(IncomingReferrer referrer) {
179: this .referrerQueue.add(referrer);
180:
181: if (this .referrerQueue.size() > 250) {
182: mLogger.warn("Referrer queue is rather full. queued="
183: + this .referrerQueue.size());
184: }
185: }
186:
187: /**
188: * Retrieve the next referrer in the queue.
189: */
190: public synchronized IncomingReferrer dequeue() {
191:
192: if (!this .referrerQueue.isEmpty()) {
193: return (IncomingReferrer) this .referrerQueue.remove(0);
194: }
195:
196: return null;
197: }
198:
199: /**
200: * clean up.
201: */
202: public void shutdown() {
203:
204: if (this .workers != null && this .workers.size() > 0) {
205: mLogger.info("stopping all ReferrerQueue worker threads");
206:
207: // kill all of our threads
208: WorkerThread worker = null;
209: Iterator it = this .workers.iterator();
210: while (it.hasNext()) {
211: worker = (WorkerThread) it.next();
212: worker.interrupt();
213: }
214: }
215:
216: }
217:
218: }
|