001: /**
002: * $RCSfile$
003: * $Revision: 617 $
004: * $Date: 2004-12-03 05:59:50 -0300 (Fri, 03 Dec 2004) $
005: *
006: * Copyright (C) 2007 Jive Software. All rights reserved.
007: *
008: * This software is published under the terms of the GNU Public License (GPL),
009: * a copy of which is included in this distribution.
010: */package org.jivesoftware.openfire;
011:
012: import org.jivesoftware.openfire.session.Session;
013: import org.jivesoftware.util.LocaleUtils;
014: import org.jivesoftware.util.Log;
015: import org.xmpp.packet.Packet;
016:
017: import java.util.concurrent.LinkedBlockingQueue;
018: import java.util.concurrent.ThreadPoolExecutor;
019: import java.util.concurrent.TimeUnit;
020:
021: /**
022: * A channel provides a mechanism to queue work units for processing. Each work unit is
023: * encapsulated as a ChannelMessage, and processing of each message is performed by a
024: * ChannelHandler.<p>
025: *
026: * As a request is handled by the system, it will travel through a sequence of channels.
027: * This architecture has a number of advantages:
028: * <ul>
029: * <li> Each request doesn't need to correspond to a thread. Instead, a thread pool
030: * in each channel processes requests from a queue.
031: * <li> Due to the queue at each channel, the system is much better able to respond
032: * to load spikes.
033: * </ul><p>
034: *
035: * Channels are modeled after SEDA stages. For much much more in-depth architecture information,
036: * refer to the <a href="http://www.cs.berkeley.edu/~mdw/proj/sandstorm/">SEDA website</a>.
037: *
038: * @author Matt Tucker
039: */
040: public class Channel<T extends Packet> {
041:
042: private String name;
043: private ChannelHandler channelHandler;
044:
045: ThreadPoolExecutor executor;
046:
047: /**
048: * Creates a new channel. The channel should be registered after it's created.
049: *
050: * @param name the name of the channel.
051: * @param channelHandler the handler for this channel.
052: */
053: public Channel(String name, ChannelHandler<T> channelHandler) {
054: this .name = name;
055: this .channelHandler = channelHandler;
056:
057: executor = new ThreadPoolExecutor(1, 8, 15, TimeUnit.SECONDS,
058: new LinkedBlockingQueue());
059: }
060:
061: /**
062: * Returns the name of the channel.
063: *
064: * @return the name of the channel.
065: */
066: public String getName() {
067: return name;
068: }
069:
070: /**
071: * Enqueus a message to be handled by this channel. After the ChannelHandler is done
072: * processing the message, it will be sent to the next channel. Messages with a higher
073: * priority will be handled first.
074: *
075: * @param packet an XMPP packet to add to the channel for processing.
076: */
077: public void add(final T packet) {
078: Runnable r = new Runnable() {
079: public void run() {
080: try {
081: channelHandler.process(packet);
082: } catch (Exception e) {
083: Log.error(LocaleUtils
084: .getLocalizedString("admin.error"), e);
085:
086: try {
087: Session session = SessionManager.getInstance()
088: .getSession(packet.getFrom());
089: session.close();
090: } catch (Exception e1) {
091: Log.error(e1);
092: }
093: }
094: }
095: };
096: executor.execute(r);
097: }
098:
099: /**
100: * Returns true if the channel is currently running. The channel can be started and
101: * stopped by calling the start() and stop() methods.
102: *
103: * @return true if the channel is running.
104: */
105: public boolean isRunning() {
106: return !executor.isShutdown();
107: }
108:
109: /**
110: * Starts the channel, which means that worker threads will start processing messages
111: * from the queue. If the server isn't running, messages can still be enqueued.
112: */
113: public void start() {
114:
115: }
116:
117: /**
118: * Stops the channel, which means that worker threads will stop processing messages from
119: * the queue. If the server isn't running, messages can still be enqueued.
120: */
121: public synchronized void stop() {
122: executor.shutdown();
123: }
124:
125: /**
126: * Returns the number of currently active worker threads in the channel. This value
127: * will always fall in between the min a max thread count.
128: *
129: * @return the current number of worker threads.
130: */
131: public int getThreadCount() {
132: return executor.getPoolSize();
133: }
134:
135: /**
136: * Returns the min number of threads the channel will use for processing messages.
137: * The channel will automatically de-allocate worker threads as the queue load shrinks,
138: * down to the defined minimum. This lets the channel consume fewer resources when load
139: * is low.
140: *
141: * @return the min number of threads that can be used by the channel.
142: */
143: public int getMinThreadCount() {
144: return executor.getCorePoolSize();
145: }
146:
147: /**
148: * Sets the min number of threads the channel will use for processing messages.
149: * The channel will automatically de-allocate worker threads as the queue load shrinks,
150: * down to the defined minimum. This lets the channel consume fewer resources when load
151: * is low.
152: *
153: * @param minThreadCount the min number of threads that can be used by the channel.
154: */
155: public void setMinThreadCount(int minThreadCount) {
156: executor.setCorePoolSize(minThreadCount);
157: }
158:
159: /**
160: * Returns the max number of threads the channel will use for processing messages. The
161: * channel will automatically allocate new worker threads as the queue load grows, up to the
162: * defined maximum. This lets the channel meet higher concurrency needs, but prevents too
163: * many threads from being allocated, which decreases overall system performance.
164: *
165: * @return the max number of threads that can be used by the channel.
166: */
167: public int getMaxThreadCount() {
168: return executor.getMaximumPoolSize();
169: }
170:
171: /**
172: * Sets the max number of threads the channel will use for processing messages. The channel
173: * will automatically allocate new worker threads as the queue size grows, up to the defined
174: * maximum. This lets the channel meet higher concurrency needs, but prevents too many threads
175: * from being allocated, which decreases overall system performance.
176: *
177: * @param maxThreadCount the max number of threads that can be used by the channel.
178: */
179: public void setMaxThreadCount(int maxThreadCount) {
180: executor.setMaximumPoolSize(maxThreadCount);
181: }
182:
183: /**
184: * Returns the current number of ChannelMessage objects waiting to be processed by
185: * the channel.
186: *
187: * @return the current number of elements in the processing queue.
188: */
189: public int getQueueSize() {
190: return executor.getQueue().size();
191: }
192: }
|