001: /*
002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003: *
004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
005: *
006: * The contents of this file are subject to the terms of either the GNU
007: * General Public License Version 2 only ("GPL") or the Common Development
008: * and Distribution License("CDDL") (collectively, the "License"). You
009: * may not use this file except in compliance with the License. You can obtain
010: * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
011: * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
012: * language governing permissions and limitations under the License.
013: *
014: * When distributing the software, include this License Header Notice in each
015: * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
016: * Sun designates this particular file as subject to the "Classpath" exception
017: * as provided by Sun in the GPL Version 2 section of the License file that
018: * accompanied this code. If applicable, add the following below the License
019: * Header, with the fields enclosed by brackets [] replaced by your own
020: * identifying information: "Portions Copyrighted [year]
021: * [name of copyright owner]"
022: *
023: * Contributor(s):
024: *
025: * If you wish your version of this file to be governed by only the CDDL or
026: * only the GPL Version 2, indicate your decision by adding "[Contributor]
027: * elects to include this software in this distribution under the [CDDL or GPL
028: * Version 2] license." If you don't indicate a single choice of license, a
029: * recipient has the option to distribute your version of this file under
030: * either the CDDL, the GPL Version 2 or to extend the choice of license to
031: * its licensees as provided above. However, if you add GPL Version 2 code
032: * and therefore, elected the GPL Version 2 license, then the option applies
033: * only if the new code is made subject to such option by the copyright
034: * holder.
035: */
036:
037: package com.sun.xml.ws.transport.tcp.server;
038:
039: import com.sun.istack.NotNull;
040: import com.sun.istack.Nullable;
041: import com.sun.xml.ws.transport.tcp.io.Connection;
042: import com.sun.xml.ws.transport.tcp.util.ChannelSettings;
043: import com.sun.xml.ws.transport.tcp.util.SessionCloseListener;
044: import com.sun.xml.ws.transport.tcp.resources.MessagesMessages;
045: import com.sun.xml.ws.transport.tcp.util.ChannelContext;
046: import com.sun.xml.ws.transport.tcp.util.ConnectionSession;
047: import com.sun.xml.ws.transport.tcp.util.TCPConstants;
048: import com.sun.xml.ws.transport.tcp.util.Version;
049: import com.sun.xml.ws.transport.tcp.util.VersionController;
050: import com.sun.xml.ws.transport.tcp.io.DataInOutUtils;
051: import com.sun.xml.ws.transport.tcp.util.WSTCPError;
052: import com.sun.xml.ws.transport.tcp.wsit.ConnectionManagementSettings;
053: import com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.InboundConnectionCache;
054: import com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.ConnectionCacheFactory;
055: import java.io.IOException;
056: import java.io.InputStream;
057: import java.io.OutputStream;
058: import java.nio.ByteBuffer;
059: import java.nio.channels.SocketChannel;
060: import java.util.Collections;
061: import java.util.HashMap;
062: import java.util.Map;
063: import java.util.Properties;
064: import java.util.WeakHashMap;
065: import java.util.logging.Level;
066: import java.util.logging.Logger;
067:
068: /**
069: * @author Alexey Stashok
070: */
071: @SuppressWarnings({"unchecked"})
072: public final class IncomeMessageProcessor implements
073: SessionCloseListener {
074: private static final int HIGH_WATER_MARK = 1500;
075: private static final int NUMBER_TO_RECLAIM = 1;
076:
077: private static final Logger logger = Logger
078: .getLogger(com.sun.xml.ws.transport.tcp.util.TCPConstants.LoggingDomain
079: + ".server");
080:
081: private final TCPMessageListener listener;
082:
083: // Properties passed to IncomeMessageProcessor by SOAP/TCP launcher
084: private final Properties properties;
085:
086: // Cache for inbound connections (orb). Initialized on first SOAP/TCP request
087: private volatile InboundConnectionCache<ServerConnectionSession> connectionCache;
088:
089: private static Map<Integer, IncomeMessageProcessor> portMessageProcessors = new HashMap<Integer, IncomeMessageProcessor>(
090: 1);
091:
092: public static IncomeMessageProcessor registerListener(
093: final int port, @NotNull
094: final TCPMessageListener listener, @NotNull
095: final Properties properties) {
096: IncomeMessageProcessor processor = new IncomeMessageProcessor(
097: listener, properties);
098: portMessageProcessors.put(port, processor);
099: return processor;
100: }
101:
102: public static void releaseListener(final int port) {
103: portMessageProcessors.remove(port);
104: }
105:
106: public static @Nullable
107: IncomeMessageProcessor getMessageProcessorForPort(final int port) {
108: return portMessageProcessors.get(port);
109: }
110:
111: private IncomeMessageProcessor(final @NotNull
112: TCPMessageListener listener) {
113: this (listener, null);
114: }
115:
116: private IncomeMessageProcessor(final @NotNull
117: TCPMessageListener listener, final @Nullable
118: Properties properties) {
119: this .listener = listener;
120: this .properties = properties;
121: }
122:
123: public void process(@NotNull
124: final ByteBuffer messageBuffer, @NotNull
125: final SocketChannel socketChannel) throws IOException {
126: // get TCPConnectionSession associated with SocketChannel
127: if (logger.isLoggable(Level.FINE)) {
128: logger.log(Level.FINE, MessagesMessages
129: .WSTCP_1080_INCOME_MSG_PROC_ENTER(Connection
130: .getHost(socketChannel), Connection
131: .getPort(socketChannel)));
132: }
133:
134: if (connectionCache == null) {
135: setupInboundConnectionCache();
136: }
137:
138: ServerConnectionSession connectionSession = getConnectionSession(socketChannel); //@TODO take it from nio framework?
139:
140: if (connectionSession == null) {
141: // First message on connection
142: if (logger.isLoggable(Level.FINE)) {
143: logger.log(Level.FINE, MessagesMessages
144: .WSTCP_1081_INCOME_MSG_CREATE_NEW_SESSION());
145: }
146: connectionSession = createConnectionSession(socketChannel,
147: messageBuffer);
148: if (connectionSession != null) {
149: // Connection is opened. Magic and version are compatible
150: connectionCache.requestReceived(connectionSession);
151: connectionCache.responseSent(connectionSession);
152: offerConnectionSession(connectionSession);
153: } else {
154: // Client's version is not supported
155: logger.log(Level.WARNING, MessagesMessages
156: .WSTCP_0006_VERSION_MISMATCH());
157: }
158: return;
159: }
160:
161: final Connection connection = connectionSession.getConnection();
162: connection.setInputStreamByteBuffer(messageBuffer);
163: connectionCache.requestReceived(connectionSession);
164:
165: try {
166: do {
167: connection.prepareForReading(); // Reading headers
168:
169: final int channelId = connection.getChannelId();
170: final ChannelContext channelContext = connectionSession
171: .findWSServiceContextByChannelId(channelId);
172:
173: if (channelContext != null) {
174: listener.onMessage(channelContext);
175: } else {
176: // Create fake channel context for received channel-id and session
177: ChannelContext fakeChannelContext = createFakeChannelContext(
178: channelId, connectionSession);
179: // Notify error on channel context
180: listener
181: .onError(
182: fakeChannelContext,
183: WSTCPError
184: .createNonCriticalError(
185: TCPConstants.UNKNOWN_CHANNEL_ID,
186: MessagesMessages
187: .WSTCP_0026_UNKNOWN_CHANNEL_ID(channelId)));
188: }
189: } while (messageBuffer.hasRemaining());
190: } finally {
191: offerConnectionSession(connectionSession);
192: connectionCache.responseSent(connectionSession);
193: }
194: }
195:
196: /**
197: * associative map for SocketChannel and correspondent ConnectionContext
198: * in future probably should be replaced, as could be handled by
199: * nio framework
200: */
201: private final Map<SocketChannel, ServerConnectionSession> connectionSessionMap = new WeakHashMap<SocketChannel, ServerConnectionSession>();
202:
203: private @Nullable
204: ServerConnectionSession getConnectionSession(@NotNull
205: final SocketChannel socketChannel) {
206:
207: final ServerConnectionSession connectionSession = connectionSessionMap
208: .get(socketChannel);
209: if (connectionSession == null) {
210: return null;
211: }
212:
213: // Restore socketChannel in connection
214: connectionSession.getConnection().setSocketChannel(
215: socketChannel);
216: return connectionSession;
217: }
218:
219: private void offerConnectionSession(@NotNull
220: final ServerConnectionSession connectionSession) {
221: connectionSessionMap.put(connectionSession.getConnection()
222: .getSocketChannel(), connectionSession);
223:
224: // to let WeakHashMap clean socketChannel if not use
225: connectionSession.getConnection().setSocketChannel(null);
226: }
227:
228: /**
229: * Remove session entry from session map
230: */
231: private void removeConnectionSessionBySocketChannel(@NotNull
232: final SocketChannel socketChannel) {
233: connectionSessionMap.remove(socketChannel);
234: }
235:
236: /**
237: * Create new ConnectionSession for just came request, but check
238: * version compatibilities before
239: */
240: private @Nullable
241: ServerConnectionSession createConnectionSession(@NotNull
242: final SocketChannel socketChannel, @NotNull
243: final ByteBuffer messageBuffer) throws IOException {
244:
245: final Connection connection = new Connection(socketChannel);
246: connection.setInputStreamByteBuffer(messageBuffer);
247: if (!checkMagicAndVersionCompatibility(connection)) {
248: connection.close();
249: return null;
250: }
251:
252: return new ServerConnectionSession(connection, this );
253: }
254:
255: private boolean checkMagicAndVersionCompatibility(@NotNull
256: final Connection connection) throws IOException {
257: logger.log(Level.FINE, MessagesMessages
258: .WSTCP_1082_INCOME_MSG_VERSION_CHECK_ENTER());
259:
260: connection.setDirectMode(true);
261: final InputStream inputStream = connection.openInputStream();
262:
263: final byte[] magicBuf = new byte[TCPConstants.PROTOCOL_SCHEMA
264: .length()];
265: DataInOutUtils.readFully(inputStream, magicBuf);
266: final String magic = new String(magicBuf, "US-ASCII");
267: if (!TCPConstants.PROTOCOL_SCHEMA.equals(magic)) {
268: logger.log(Level.WARNING, MessagesMessages
269: .WSTCP_0020_WRONG_MAGIC(magic));
270: return false;
271: }
272:
273: final int[] versionInfo = new int[4];
274:
275: DataInOutUtils.readInts4(inputStream, versionInfo, 4);
276:
277: final Version clientFramingVersion = new Version(
278: versionInfo[0], versionInfo[1]);
279: final Version clientConnectionManagementVersion = new Version(
280: versionInfo[2], versionInfo[3]);
281:
282: final VersionController versionController = VersionController
283: .getInstance();
284:
285: final boolean isSupported = versionController
286: .isVersionSupported(clientFramingVersion,
287: clientConnectionManagementVersion);
288:
289: final OutputStream outputStream = connection.openOutputStream();
290:
291: final Version framingVersion = isSupported ? clientFramingVersion
292: : versionController
293: .getClosestSupportedFramingVersion(clientFramingVersion);
294: final Version connectionManagementVersion = isSupported ? clientConnectionManagementVersion
295: : versionController
296: .getClosestSupportedConnectionManagementVersion(clientConnectionManagementVersion);
297:
298: DataInOutUtils.writeInts4(outputStream, framingVersion
299: .getMajor(), framingVersion.getMinor(),
300: connectionManagementVersion.getMajor(),
301: connectionManagementVersion.getMinor());
302: connection.flush();
303:
304: connection.setDirectMode(false);
305:
306: logger.log(Level.FINE, MessagesMessages
307: .WSTCP_1083_INCOME_MSG_VERSION_CHECK_RESULT(
308: clientFramingVersion,
309: clientConnectionManagementVersion,
310: framingVersion, connectionManagementVersion,
311: isSupported));
312: return isSupported;
313: }
314:
315: /**
316: * Close callback method
317: * Will be called by NIO framework, when it will decide to close connection
318: */
319: public void notifyClosed(@NotNull
320: final SocketChannel socketChannel) {
321: connectionCache.close(getConnectionSession(socketChannel));
322: }
323:
324: /**
325: * Close callback method
326: * Will be called by Connection.close() to let IncomeMessageProcessor
327: * remove the correspondent session from Map
328: */
329: public void notifySessionClose(@NotNull
330: final ConnectionSession connectionSession) {
331: removeConnectionSessionBySocketChannel(connectionSession
332: .getConnection().getSocketChannel());
333: }
334:
335: private synchronized void setupInboundConnectionCache() {
336: if (connectionCache == null) {
337: int highWatermark = HIGH_WATER_MARK;
338: int numberToReclaim = NUMBER_TO_RECLAIM;
339:
340: ConnectionManagementSettings policySettings = ConnectionManagementSettings
341: .getServerSettingsInstance();
342: if (policySettings != null) {
343: highWatermark = policySettings
344: .getHighWatermark(HIGH_WATER_MARK);
345: numberToReclaim = policySettings
346: .getNumberToReclaim(NUMBER_TO_RECLAIM);
347: } else if (properties != null) {
348: String highWaterMarkStr = properties.getProperty(
349: TCPConstants.HIGH_WATER_MARK, Integer
350: .toString(HIGH_WATER_MARK));
351: String numberToReclaimStr = properties.getProperty(
352: TCPConstants.NUMBER_TO_RECLAIM, Integer
353: .toString(NUMBER_TO_RECLAIM));
354: highWatermark = Integer.parseInt(highWaterMarkStr);
355: numberToReclaim = Integer.parseInt(numberToReclaimStr);
356: }
357:
358: connectionCache = ConnectionCacheFactory
359: .<ServerConnectionSession> makeBlockingInboundConnectionCache(
360: "SOAP/TCP server side cache",
361: highWatermark, numberToReclaim, logger);
362: }
363: }
364:
365: /**
366: * Method creates fake channel context for defined channel-id and ConnectionSession
367: * Normally channel context should be created only by Connection Management service
368: */
369: private ChannelContext createFakeChannelContext(int channelId,
370: @NotNull
371: ConnectionSession connectionSession) {
372: return new ChannelContext(connectionSession,
373: new ChannelSettings(Collections.<String> emptyList(),
374: Collections.<String> emptyList(), channelId,
375: null, null));
376: }
377: }
|