001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Contact: sequoia@continuent.org
007: *
008: * Licensed under the Apache License, Version 2.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.apache.org/licenses/LICENSE-2.0
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: * Initial developer(s): Emmanuel Cecchet.
021: * Contributor(s): Duncan Smith.
022: */package org.continuent.sequoia.controller.core;
023:
024: import java.io.IOException;
025: import java.net.InetAddress;
026: import java.net.ServerSocket;
027: import java.net.Socket;
028: import java.util.ArrayList;
029:
030: import javax.net.ServerSocketFactory;
031:
032: import org.continuent.sequoia.common.i18n.Translate;
033: import org.continuent.sequoia.common.log.Trace;
034: import org.continuent.sequoia.common.net.SSLException;
035: import org.continuent.sequoia.common.net.SocketFactoryFactory;
036: import org.continuent.sequoia.common.stream.DriverBufferedOutputStream;
037:
038: /**
039: * A <code>ControllerServerThread</code> listens for Sequoia driver
040: * connections. It accepts the connection and give them to
041: * <code>ControllerWorkerThreads</code>.
042: *
043: * @see org.continuent.sequoia.controller.core.ControllerWorkerThread
044: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
045: * @author <a href="mailto:duncan@mightybot.com">Duncan Smith </a>
046: * @version 1.0
047: */
048: public class ControllerServerThread extends Thread {
049: private ServerSocket serverSocket;
050: private boolean isShuttingDown = false;
051: protected Controller controller;
052: /** Thread that responds to client pings */
053: private PingResponder pingResponder;
054: /** Pending queue of client (driver) socket connections */
055: protected ArrayList controllerServerThreadPendingQueue = new ArrayList();
056: /**
057: * Number of idle <code>ControllerWorkerThread</code>. Access to this
058: * variable must be synchronized using pendingQueue.
059: */
060: protected int idleWorkerThreads = 0;
061:
062: /** Logger instance. */
063: static Trace logger = Trace
064: .getLogger("org.continuent.sequoia.controller.core.Controller");
065:
066: static Trace endUserLogger = Trace
067: .getLogger("org.continuent.sequoia.enduser");
068:
069: /**
070: * Creates a new ControllerServerThread that listens on the given port.
071: *
072: * @param controller The controller which created this thread.
073: */
074: public ControllerServerThread(Controller controller) {
075: super ("ControllerServerThread");
076: this .controller = controller;
077:
078: try {
079: InetAddress bindAddress = null;
080:
081: // Determine if a specific IP address has been requested.
082: /**
083: * @see org.continuent.sequoia.controller.xml.ControllerParser#configureController(Attributes)
084: * for how controller's IPAddress is set by default.
085: */
086: if (!controller.getIPAddress().equals(
087: InetAddress.getLocalHost().getHostAddress())) {
088: // Non-default value: an IP has been specified that is not localhost...
089: // If the user has *asked for* getLocalHost().getHostAddress(), bad luck
090: // we lose.
091: bindAddress = InetAddress.getByName(controller
092: .getIPAddress());
093: }
094: // else, default value: no specific IP was requested or was left as the
095: // default. Create a basic local socket by specifying null bind address
096:
097: // Build an InetAddress by passing the requested IP address to the
098: // InetAddress class constructor. This will validate the sanity of the
099: // IP by either accepting the requested value or throwing a
100: // BindException.
101:
102: if (controller.isSecurityEnabled()
103: && controller.getSecurity().isSSLEnabled()) {
104: ServerSocketFactory sslFact = SocketFactoryFactory
105: .createServerFactory(controller.getSecurity()
106: .getSslConfig());
107: serverSocket = sslFact.createServerSocket(controller
108: .getPortNumber(), controller.getBacklogSize(),
109: bindAddress);
110: } else {
111: serverSocket = new ServerSocket(controller
112: .getPortNumber(), controller.getBacklogSize(),
113: bindAddress);
114: }
115: } catch (java.net.BindException e) { // Thrown if an illegal IP address was specified.
116: String msg = Translate.get(
117: "controller.server.thread.illegal.ip",
118: new String[] { controller.getIPAddress(),
119: e.getMessage() });
120: logger.fatal(msg);
121: endUserLogger.fatal(msg);
122: controller.endOfController(e);
123: } catch (IOException e) {
124: String msg = Translate.get(
125: "controller.server.thread.socket.failed",
126: new String[] {
127: String.valueOf(controller.getPortNumber()),
128: e.getMessage() });
129: if (logger.isDebugEnabled())
130: logger.fatal(msg, e);
131: else
132: logger.fatal(msg);
133: endUserLogger.fatal(msg);
134: controller.endOfController(e);
135: } catch (SSLException e) {
136: String msg = Translate.get(
137: "controller.server.thread.socket.failed",
138: new String[] {
139: String.valueOf(controller.getPortNumber()),
140: e.getMessage() });
141: if (logger.isDebugEnabled())
142: logger.fatal(msg, e);
143: else
144: logger.fatal(msg);
145: endUserLogger.fatal(msg);
146: controller.endOfController(e);
147: }
148:
149: // Create the UDP ping responder with the same port
150: try {
151: pingResponder = new PingResponder(controller
152: .getPortNumber());
153: // no special clean-up to be done, let the jvm kill the thread at exit
154: pingResponder.setDaemon(true);
155: } catch (IOException e) {
156: pingResponder = null;
157: if (logger.isErrorEnabled()) {
158: logger.error(Translate.get(
159: "controller.ping.responder.creation.failed", e
160: .getLocalizedMessage()));
161: }
162: controller.endOfController(e);
163: }
164:
165: if (logger.isInfoEnabled()) {
166: logger
167: .info(Translate
168: .get(
169: "controller.server.thread.waiting.connections",
170: new String[] {
171: serverSocket
172: .getInetAddress()
173: .getHostAddress(),
174: String.valueOf(serverSocket
175: .getLocalPort()) }));
176: logger.debug(Translate.get(
177: "controller.server.thread.backlog.size", ""
178: + controller.getBacklogSize()));
179: }
180: }
181:
182: /**
183: * Accepts connections from drivers, read the virtual database name and
184: * returns the connection point.
185: */
186: public void run() {
187: if (controller == null) {
188: logger.error(Translate
189: .get("controller.server.thread.controller.null"));
190: isShuttingDown = true;
191: }
192: // Start the ping responder
193: if (pingResponder != null)
194: pingResponder.start();
195:
196: // Start processing connections
197: Socket clientSocket = null;
198: while (!isShuttingDown) {
199: try { // Accept a connection
200: clientSocket = serverSocket.accept();
201: if (isShuttingDown)
202: break;
203: if (controller.isSecurityEnabled()
204: && !controller.getSecurity().allowConnection(
205: clientSocket)) {
206: String errmsg = Translate
207: .get(
208: "controller.server.thread.connection.refused",
209: clientSocket.getInetAddress()
210: .getHostName());
211: logger.warn(errmsg);
212: DriverBufferedOutputStream out = new DriverBufferedOutputStream(
213: clientSocket);
214: out.writeBoolean(false);
215: out.writeLongUTF(errmsg);
216: out.flush(); // FIXME: should we .close() instead ?
217: clientSocket = null;
218: continue;
219: } else {
220: if (logger.isDebugEnabled())
221: logger
222: .debug(Translate
223: .get(
224: "controller.server.thread.connection.accept",
225: clientSocket
226: .getInetAddress()
227: .getHostName()));
228: }
229: boolean createThread = false;
230: if (isShuttingDown)
231: break;
232: synchronized (controllerServerThreadPendingQueue) {
233: // Add the connection to the queue
234: controllerServerThreadPendingQueue
235: .add(clientSocket);
236: // Check if we need to create a new thread or just wake up an
237: // existing one
238: if (idleWorkerThreads == 0)
239: createThread = true;
240: else
241: // Here we notify all threads else if one thread doesn't wake up
242: // after the first notify() we will send a second notify() and
243: // one signal will be lost. So the safe way is to wake up everybody
244: // and that worker threads go back to sleep if there is no job.
245: controllerServerThreadPendingQueue.notifyAll();
246: }
247: if (createThread) { // Start a new worker thread if needed
248: ControllerWorkerThread thread = new ControllerWorkerThread(
249: this );
250: thread.start();
251: if (logger.isDebugEnabled())
252: logger
253: .debug(Translate
254: .get("controller.server.thread.starting"));
255: }
256: } catch (IOException e) {
257: if (!isShuttingDown) {
258: logger
259: .warn(
260: Translate
261: .get(
262: "controller.server.thread.new.connection.error",
263: e), e);
264: }
265: }
266: }
267: if (logger.isInfoEnabled())
268: logger.info(Translate
269: .get("controller.server.thread.terminating"));
270: }
271:
272: /**
273: * Refuse new connection to clients and finish transaction
274: */
275: public void shutdown() {
276: isShuttingDown = true;
277: // Shutting down server thread
278: try {
279: serverSocket.close();
280: } catch (Exception e) {
281: logger
282: .warn(
283: Translate
284: .get("controller.shutdown.server.socket.exception"),
285: e);
286: }
287: /*
288: * Close pending connections (not yet served by any ControllerWorkerThread)
289: * and wake up idle ControllerWorkerThreads.
290: */
291: Object lock = controllerServerThreadPendingQueue;
292: synchronized (lock) {
293: // close pending connections
294: int nbSockets = controllerServerThreadPendingQueue.size();
295: Socket socket = null;
296: for (int i = 0; i < nbSockets; i++) {
297: socket = (Socket) controllerServerThreadPendingQueue
298: .get(i);
299: logger.info(Translate.get(
300: "controller.shutdown.client.socket", socket
301: .getInetAddress().toString()));
302:
303: try {
304: socket.close();
305: } catch (Exception e) {
306: logger
307: .warn(
308: Translate
309: .get("controller.shutdown.client.socket.exception"),
310: e);
311: }
312: }
313:
314: // wake up idle ControllerWorkerThreads,
315: // asking them to die (controllerServerThreadPendingQueue=null)
316: this .controllerServerThreadPendingQueue = null;
317: lock.notifyAll();
318: }
319: }
320:
321: /**
322: * @return Returns the controllerServerThreadPendingQueue size.
323: */
324: public int getControllerServerThreadPendingQueueSize() {
325: synchronized (controllerServerThreadPendingQueue) {
326: return controllerServerThreadPendingQueue.size();
327: }
328: }
329:
330: /**
331: * @return Returns the idleWorkerThreads.
332: */
333: public int getIdleWorkerThreads() {
334: synchronized (controllerServerThreadPendingQueue) {
335: return idleWorkerThreads;
336: }
337: }
338:
339: /**
340: * Returns the isShuttingDown value.
341: *
342: * @return Returns the isShuttingDown.
343: */
344: public boolean isShuttingDown() {
345: return isShuttingDown;
346: }
347: }
|