001: //========================================================================
002: // Parts Copyright 2006 Mort Bay Consulting Pty. Ltd.
003: //------------------------------------------------------------------------
004: // Licensed under the Apache License, Version 2.0 (the "License");
005: // you may not use this file except in compliance with the License.
006: // You may obtain a copy of the License at
007: // http://www.apache.org/licenses/LICENSE-2.0
008: // Unless required by applicable law or agreed to in writing, software
009: // distributed under the License is distributed on an "AS IS" BASIS,
010: // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011: // See the License for the specific language governing permissions and
012: // limitations under the License.
013: //========================================================================
014:
015: package org.mortbay.jetty.grizzly;
016:
017: import com.sun.enterprise.web.connector.grizzly.Pipeline;
018: import com.sun.enterprise.web.connector.grizzly.ProcessorTask;
019: import com.sun.enterprise.web.connector.grizzly.ReadTask;
020: import com.sun.enterprise.web.connector.grizzly.SelectorThread;
021: import com.sun.enterprise.web.connector.grizzly.StreamAlgorithm;
022: import com.sun.enterprise.web.connector.grizzly.XAReadTask;
023: import com.sun.enterprise.web.connector.grizzly.algorithms.NoParsingAlgorithm;
024: import java.io.IOException;
025: import java.nio.channels.SelectionKey;
026: import java.util.Iterator;
027: import java.util.Set;
028: import org.mortbay.thread.BoundedThreadPool;
029: import org.mortbay.thread.ThreadPool;
030:
031: /**
032: * Extend the default Grizzly implementation to allow the customization of
033: * <code>Task</code> used by the Jetty back-end
034: *
035: * @author Jeanfrancois Arcand
036: */
037: public class JettySelectorThread extends SelectorThread {
038:
039: /**
040: * The <code>AbstractNIOConnector</code> implementation for Grizzly.
041: */
042: private GrizzlyConnector grizzlyConnector;
043:
044: /**
045: * The Jetty thread pool implementation.
046: */
047: private ThreadPool _threadPool;
048:
049: private static String JETTY_GRIZZLY_STRATEGY = "org.mortbay.jetty.grizzly.useTemporarySelector";
050:
051: private boolean useTemporarySelector = false;
052:
053: public JettySelectorThread() {
054: super ();
055: if (System.getProperty(JETTY_GRIZZLY_STRATEGY) != null) {
056: useTemporarySelector = Boolean.valueOf(
057: System.getProperty(JETTY_GRIZZLY_STRATEGY))
058: .booleanValue();
059: System.out.println("Using temporary Selectors strategy: "
060: + useTemporarySelector);
061: }
062: }
063:
064: /**
065: * Initialize <code>JeetySelectorReadThread</code> used to process
066: * OP_READ operations.
067: */
068: protected void initMultiSelectors() throws IOException,
069: InstantiationException {
070: for (int i = 0; i < readThreads.length; i++) {
071: readThreads[i] = new JettyMultiSelectorThread();
072: ((JettyMultiSelectorThread) readThreads[i]).countName = i;
073: configureReadThread((JettyMultiSelectorThread) readThreads[i]);
074: }
075: }
076:
077: /**
078: * Force Grizzly to use the <code>JettyStreamAlgorithm</code>
079: * implementation by default.
080: */
081: protected void initAlgorithm() {
082: algorithmClass = JettyStreamAlgorithm.class;
083: algorithmClassName = algorithmClass.getName();
084:
085: // Tell the Selector to not clear the SelectionKey.attach(..)
086: if (!useTemporarySelector) {
087: defaultAlgorithmInstalled = false;
088: }
089: }
090:
091: /**
092: * Create a new <code>Pipeline</code> instance using the
093: * <code>pipelineClassName</code> value. If the pipeline is an instance of
094: * <code>JettyPipeline</code>, use the Jetty thread pool implementation
095: * (wrapped inside a Pipeline).
096: */
097: protected Pipeline newPipeline(int maxThreads, int minThreads,
098: String name, int port, int priority) {
099: //System.err.println("JettySelectorThread.newPipeline");
100: Pipeline pipeline = super .newPipeline(maxThreads, minThreads,
101: name, port, priority);
102: if (pipeline instanceof JettyPipeline) {
103: ((JettyPipeline) pipeline)
104: .setThreadPool((BoundedThreadPool) _threadPool);
105: }
106: return pipeline;
107: }
108:
109: /**
110: * Return a <code>JettyProcessorTask</code> implementation.
111: */
112: public ProcessorTask newProcessorTask(boolean initialize) {
113: JettyProcessorTask task = new JettyProcessorTask();
114: task.setMaxHttpHeaderSize(maxHttpHeaderSize);
115: task.setBufferSize(requestBufferSize);
116: task.setSelectorThread(this );
117: task.setRecycle(recycleTasks);
118:
119: task.initialize();
120:
121: if (keepAlivePipeline.dropConnection()) {
122: task.setDropConnection(true);
123: }
124:
125: task.setPipeline(processorPipeline);
126: return task;
127: }
128:
129: /**
130: * Enable all registered interestOps. Due a a NIO bug, all interestOps
131: * invokation needs to occurs on the same thread as the selector thread.
132: */
133: public void enableSelectionKeys() {
134: SelectionKey selectionKey;
135: int size = getKeysToEnable().size();
136: long currentTime = System.currentTimeMillis();
137: for (int i = 0; i < size; i++) {
138: selectionKey = (SelectionKey) getKeysToEnable().poll();
139:
140: selectionKey.interestOps(selectionKey.interestOps()
141: | SelectionKey.OP_READ);
142:
143: if (selectionKey.attachment() != null) {
144: ((XAReadTask) selectionKey.attachment())
145: .setIdleTime(currentTime);
146: }
147: keepAlivePipeline.trap(selectionKey);
148: }
149: }
150:
151: /**
152: * Cancel keep-alive connections.
153: */
154: protected void expireIdleKeys() {
155: if (keepAliveTimeoutInSeconds <= 0 || !selector.isOpen())
156: return;
157: long current = System.currentTimeMillis();
158:
159: if (current < getNextKeysExpiration()) {
160: return;
161: }
162: setNextKeysExpiration(current + getKaTimeout());
163:
164: Set readyKeys = selector.keys();
165: if (readyKeys.isEmpty()) {
166: return;
167: }
168: Iterator iterator = readyKeys.iterator();
169: SelectionKey key;
170: while (iterator.hasNext()) {
171: key = (SelectionKey) iterator.next();
172: if (!key.isValid()) {
173: keepAlivePipeline.untrap(key);
174: continue;
175: }
176:
177: // Keep-alive expired
178: if (key.attachment() != null) {
179:
180: long expire = ((XAReadTask) key.attachment())
181: .getIdleTime();
182: if (current - expire >= getKaTimeout()) {
183: cancelKey(key);
184: } else if (expire + getKaTimeout() < getNextKeysExpiration()) {
185: setNextKeysExpiration(expire + getKaTimeout());
186: }
187: }
188: }
189: }
190:
191: /**
192: * Return a new <code>JettyReadTask</code> instance
193: */
194: protected ReadTask newReadTask() {
195: StreamAlgorithm streamAlgorithm = new JettyStreamAlgorithm();
196: streamAlgorithm.setPort(port);
197:
198: // TODO: For now, hardcode the JettyReadTask
199: ReadTask task = new JettyReadTask();
200: task.initialize(streamAlgorithm, useDirectByteBuffer,
201: useByteBufferView);
202: task.setPipeline(readPipeline);
203: task.setSelectorThread(this );
204: task.setRecycle(recycleTasks);
205:
206: return task;
207: }
208:
209: public ReadTask getReadTask() throws IOException {
210: return getReadTask(null);
211: }
212:
213: public void setGrizzlyConnector(GrizzlyConnector grizzlyConnector) {
214: this .grizzlyConnector = grizzlyConnector;
215: }
216:
217: public GrizzlyConnector getGrizzlyConnector() {
218: return grizzlyConnector;
219: }
220:
221: public void setThreadPool(ThreadPool threadPool) {
222: _threadPool = threadPool;
223: }
224:
225: public boolean isUseTemporarySelector() {
226: return useTemporarySelector;
227: }
228:
229: public void setUseTemporarySelector(boolean useTemporarySelector) {
230: this.useTemporarySelector = useTemporarySelector;
231: }
232:
233: }
|