001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.core;
006:
007: import com.tc.logging.TCLogger;
008: import com.tc.logging.TCLogging;
009: import com.tc.net.TCSocketAddress;
010: import com.tc.net.core.event.TCConnectionErrorEvent;
011: import com.tc.net.core.event.TCConnectionEvent;
012: import com.tc.net.core.event.TCConnectionEventListener;
013: import com.tc.net.core.event.TCListenerEvent;
014: import com.tc.net.core.event.TCListenerEventListener;
015: import com.tc.net.protocol.ProtocolAdaptorFactory;
016: import com.tc.net.protocol.TCProtocolAdaptor;
017: import com.tc.util.concurrent.SetOnceFlag;
018:
019: import java.io.IOException;
020: import java.net.InetSocketAddress;
021: import java.net.ServerSocket;
022: import java.nio.channels.ServerSocketChannel;
023: import java.util.HashSet;
024: import java.util.Set;
025:
026: /**
027: * JDK 1.4 implementation of TCConnectionManager interface
028: *
029: * @author teck
030: */
031: public class TCConnectionManagerJDK14 implements TCConnectionManager {
032: protected static final TCConnection[] EMPTY_CONNECTION_ARRAY = new TCConnection[] {};
033: protected static final TCListener[] EMPTY_LISTENER_ARRAY = new TCListener[] {};
034: protected static final TCLogger logger = TCLogging
035: .getLogger(TCConnectionManager.class);
036:
037: protected final TCCommJDK14 comm;
038: private final Set connections = new HashSet();
039: private final Set listeners = new HashSet();
040: private final SetOnceFlag shutdown = new SetOnceFlag();
041: private final ConnectionEvents connEvents;
042: private final ListenerEvents listenerEvents;
043: private final SocketParams socketParams;
044:
045: public TCConnectionManagerJDK14() {
046: this .connEvents = new ConnectionEvents();
047: this .listenerEvents = new ListenerEvents();
048: this .socketParams = new SocketParams();
049: this .comm = new TCCommJDK14(socketParams);
050: this .comm.start();
051: }
052:
053: protected TCConnection createConnectionImpl(
054: TCProtocolAdaptor adaptor,
055: TCConnectionEventListener listener) {
056: return new TCConnectionJDK14(listener, comm, adaptor, this ,
057: socketParams);
058: }
059:
060: protected TCListener createListenerImpl(TCSocketAddress addr,
061: ProtocolAdaptorFactory factory, int backlog,
062: boolean reuseAddr) throws IOException {
063: ServerSocketChannel ssc = ServerSocketChannel.open();
064: ssc.configureBlocking(false);
065: ServerSocket serverSocket = ssc.socket();
066: serverSocket.setReuseAddress(reuseAddr);
067: serverSocket.setReceiveBufferSize(64 * 1024);
068:
069: try {
070: serverSocket.bind(new InetSocketAddress(addr.getAddress(),
071: addr.getPort()), backlog);
072: } catch (IOException ioe) {
073: logger.warn("Unable to bind socket on address "
074: + addr.getAddress() + ", port " + addr.getPort()
075: + ", " + ioe.getMessage());
076: throw ioe;
077: }
078:
079: if (logger.isDebugEnabled()) {
080: logger.debug("Bind: "
081: + serverSocket.getLocalSocketAddress());
082: }
083:
084: TCListenerJDK14 rv = new TCListenerJDK14(ssc, factory, comm,
085: getConnectionListener(), this );
086: comm.requestAcceptInterest(rv, ssc);
087:
088: return rv;
089: }
090:
091: public TCConnection[] getAllConnections() {
092: synchronized (connections) {
093: return (TCConnection[]) connections
094: .toArray(EMPTY_CONNECTION_ARRAY);
095: }
096: }
097:
098: public TCListener[] getAllListeners() {
099: synchronized (listeners) {
100: return (TCListener[]) listeners
101: .toArray(EMPTY_LISTENER_ARRAY);
102: }
103: }
104:
105: public final synchronized TCListener createListener(
106: TCSocketAddress addr, ProtocolAdaptorFactory factory)
107: throws IOException {
108: return createListener(addr, factory,
109: Constants.DEFAULT_ACCEPT_QUEUE_DEPTH, true);
110: }
111:
112: public final synchronized TCListener createListener(
113: TCSocketAddress addr, ProtocolAdaptorFactory factory,
114: int backlog, boolean reuseAddr) throws IOException {
115: checkShutdown();
116:
117: TCListener rv = createListenerImpl(addr, factory, backlog,
118: reuseAddr);
119: rv.addEventListener(listenerEvents);
120: rv.addEventListener(comm);
121: comm.listenerAdded(rv);
122:
123: synchronized (listeners) {
124: listeners.add(rv);
125: }
126:
127: return rv;
128: }
129:
130: public final synchronized TCConnection createConnection(
131: TCProtocolAdaptor adaptor) {
132: checkShutdown();
133:
134: TCConnection rv = createConnectionImpl(adaptor, connEvents);
135: newConnection(rv);
136:
137: return rv;
138: }
139:
140: public synchronized void closeAllConnections(long timeout) {
141: closeAllConnections(false, timeout);
142: }
143:
144: public synchronized void asynchCloseAllConnections() {
145: closeAllConnections(true, 0);
146: }
147:
148: private void closeAllConnections(boolean async, long timeout) {
149: TCConnection[] conns;
150:
151: synchronized (connections) {
152: conns = (TCConnection[]) connections
153: .toArray(EMPTY_CONNECTION_ARRAY);
154: }
155:
156: for (int i = 0; i < conns.length; i++) {
157: TCConnection conn = conns[i];
158:
159: try {
160: if (async) {
161: conn.asynchClose();
162: } else {
163: conn.close(timeout);
164: }
165: } catch (Exception e) {
166: logger.error("Exception trying to close " + conn, e);
167: }
168: }
169: }
170:
171: public synchronized void closeAllListeners() {
172: TCListener[] list;
173:
174: synchronized (listeners) {
175: list = (TCListener[]) listeners
176: .toArray(EMPTY_LISTENER_ARRAY);
177: }
178:
179: for (int i = 0; i < list.length; i++) {
180: TCListener lsnr = list[i];
181:
182: try {
183: lsnr.stop();
184: } catch (Exception e) {
185: logger.error("Exception trying to close " + lsnr, e);
186: }
187: }
188: }
189:
190: public final synchronized void shutdown() {
191: if (shutdown.attemptSet()) {
192: closeAllListeners();
193: asynchCloseAllConnections();
194: comm.stop();
195: }
196: }
197:
198: void connectionClosed(TCConnection conn) {
199: synchronized (connections) {
200: connections.remove(conn);
201: }
202: }
203:
204: void newConnection(TCConnection conn) {
205: synchronized (connections) {
206: connections.add(conn);
207: }
208: }
209:
210: void removeConnection(TCConnection connection) {
211: synchronized (connections) {
212: connections.remove(connection);
213: }
214: }
215:
216: protected TCConnectionEventListener getConnectionListener() {
217: return connEvents;
218: }
219:
220: private final void checkShutdown() {
221: if (shutdown.isSet()) {
222: throw new IllegalStateException(
223: "connection manager shutdown");
224: }
225: }
226:
227: class ConnectionEvents implements TCConnectionEventListener {
228: public final void connectEvent(TCConnectionEvent event) {
229: if (logger.isDebugEnabled()) {
230: logger.debug("connect event: " + event.toString());
231: }
232: }
233:
234: public final void closeEvent(TCConnectionEvent event) {
235: if (logger.isDebugEnabled()) {
236: logger.debug("close event: " + event.toString());
237: }
238: }
239:
240: public final void errorEvent(TCConnectionErrorEvent event) {
241: try {
242: final Throwable err = event.getException();
243:
244: if (err != null) {
245: if (err instanceof IOException) {
246: if (logger.isInfoEnabled()) {
247: logger.info("error event on connection "
248: + event.getSource() + ": "
249: + err.getMessage());
250: }
251: } else {
252: logger.error(err);
253: }
254: }
255: } finally {
256: event.getSource().asynchClose();
257: }
258: }
259:
260: public final void endOfFileEvent(TCConnectionEvent event) {
261: if (logger.isDebugEnabled()) {
262: logger.debug("EOF event: " + event.toString());
263: }
264:
265: event.getSource().asynchClose();
266: }
267: }
268:
269: class ListenerEvents implements TCListenerEventListener {
270: public void closeEvent(TCListenerEvent event) {
271: synchronized (listeners) {
272: listeners.remove(event.getSource());
273: }
274: }
275: }
276:
277: }
|