001: //========================================================================
002: // Parts Copyright 2006 Mort Bay Consulting Pty. Ltd.
003: //------------------------------------------------------------------------
004: // Licensed under the Apache License, Version 2.0 (the "License");
005: // you may not use this file except in compliance with the License.
006: // You may obtain a copy of the License at
007: // http://www.apache.org/licenses/LICENSE-2.0
008: // Unless required by applicable law or agreed to in writing, software
009: // distributed under the License is distributed on an "AS IS" BASIS,
010: // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011: // See the License for the specific language governing permissions and
012: // limitations under the License.
013: //========================================================================
014:
015: package org.mortbay.jetty.grizzly;
016:
017: import com.sun.enterprise.web.connector.grizzly.MultiSelectorThread;
018: import com.sun.enterprise.web.connector.grizzly.ReadTask;
019: import java.io.IOException;
020: import java.util.ArrayList;
021: import java.util.logging.Level;
022: import java.nio.channels.ClosedChannelException;
023: import java.nio.channels.Selector;
024: import java.nio.channels.SelectionKey;
025: import java.nio.channels.SocketChannel;
026:
027: /**
028: * Specialized <code>SelectorThread</code> that only handle OP_READ.
029: *
030: * @author Jeanfrancois Arcand
031: */
032: public class JettyMultiSelectorThread extends JettySelectorThread
033: implements MultiSelectorThread {
034:
035: /**
036: * List of <code>Channel<code> to process.
037: */
038: ArrayList channels = new ArrayList();
039:
040: /**
041: * Int used to differenciate thsi instance
042: */
043: public static int countName;
044:
045: /**
046: * Add a <code>Channel</code> to be processed by this
047: * <code>Selector</code>
048: */
049: public synchronized void addChannel(SocketChannel channel)
050: throws IOException, ClosedChannelException {
051: channels.add(channel);
052: getSelector().wakeup();
053: }
054:
055: /**
056: * Register all <code>Channel</code> with an OP_READ opeation.
057: */
058: private synchronized void registerNewChannels() throws IOException {
059: int size = channels.size();
060: for (int i = 0; i < size; i++) {
061: SocketChannel sc = (SocketChannel) channels.get(i);
062: sc.configureBlocking(false);
063: try {
064: SelectionKey readKey = sc.register(getSelector(),
065: SelectionKey.OP_READ);
066: setSocketOptions(((SocketChannel) readKey.channel())
067: .socket());
068: } catch (ClosedChannelException cce) {
069: }
070: }
071: channels.clear();
072: }
073:
074: /**
075: * Initialize this <code>SelectorThread</code>
076: */
077: public void initEndpoint() throws IOException,
078: InstantiationException {
079: setName("JettyMultiSelectorThread-" + getPort());
080: initAlgorithm();
081: }
082:
083: /**
084: * Start and wait for incoming connection
085: */
086: public void startEndpoint() throws IOException,
087: InstantiationException {
088: setRunning(true);
089: while (isRunning()) {
090: try {
091: if (getSelector() == null) {
092: setSelector(Selector.open());
093: }
094:
095: registerNewChannels();
096: doSelect();
097: } catch (Throwable t) {
098: logger.log(Level.FINE, "selectorThread.errorOnRequest",
099: t);
100: }
101: }
102: }
103:
104: /**
105: * Return a <code>ReadTask</code> configured to use this instance.
106: */
107: public ReadTask getReadTask(SelectionKey key) throws IOException {
108: ReadTask task = super .getReadTask(key);
109: task.setSelectorThread(this );
110: return task;
111: }
112:
113: /**
114: * Provides the count of request threads that are currently
115: * being processed by the container
116: *
117: * @return Count of requests
118: */
119: public int getCurrentBusyProcessorThreads() {
120: return (getProcessorPipeline().getCurrentThreadsBusy());
121: }
122:
123: }
|