001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.jk.common;
019:
020: import java.io.BufferedInputStream;
021: import java.io.BufferedOutputStream;
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.io.OutputStream;
025: import java.net.URLEncoder;
026: import java.net.InetAddress;
027: import java.net.ServerSocket;
028: import java.net.Socket;
029: import java.net.SocketException;
030:
031: import javax.management.ListenerNotFoundException;
032: import javax.management.MBeanNotificationInfo;
033: import javax.management.Notification;
034: import javax.management.NotificationBroadcaster;
035: import javax.management.NotificationBroadcasterSupport;
036: import javax.management.NotificationFilter;
037: import javax.management.NotificationListener;
038: import javax.management.ObjectName;
039:
040: import org.apache.jk.core.JkHandler;
041: import org.apache.jk.core.Msg;
042: import org.apache.jk.core.MsgContext;
043: import org.apache.jk.core.JkChannel;
044: import org.apache.jk.core.WorkerEnv;
045: import org.apache.coyote.Request;
046: import org.apache.coyote.RequestGroupInfo;
047: import org.apache.coyote.RequestInfo;
048: import org.apache.tomcat.util.modeler.Registry;
049: import org.apache.tomcat.util.threads.ThreadPool;
050: import org.apache.tomcat.util.threads.ThreadPoolRunnable;
051:
052: /**
053: * Accept ( and send ) TCP messages.
054: *
055: * @author Costin Manolache
056: * @author Bill Barker
057: * jmx:mbean name="jk:service=ChannelNioSocket"
058: * description="Accept socket connections"
059: * jmx:notification name="org.apache.coyote.INVOKE
060: * jmx:notification-handler name="org.apache.jk.JK_SEND_PACKET
061: * jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET
062: * jmx:notification-handler name="org.apache.jk.JK_FLUSH
063: *
064: * Jk can use multiple protocols/transports.
065: * Various container adapters should load this object ( as a bean ),
066: * set configurations and use it. Note that the connector will handle
067: * all incoming protocols - it's not specific to ajp1x. The protocol
068: * is abstracted by MsgContext/Message/Channel.
069: *
070: * A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol,
071: * TCP, Ajp14 API etc.
072: * As we add other protocols/transports/APIs this will change, the current goal
073: * is to get the same level of functionality as in the original jk connector.
074: *
075: * XXX Make the 'message type' pluggable
076: */
077: public class ChannelSocket extends JkHandler implements
078: NotificationBroadcaster, JkChannel {
079: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
080: .getLog(ChannelSocket.class);
081:
082: private int startPort = 8009;
083: private int maxPort = 8019; // 0 for backward compat.
084: private int port = startPort;
085: private InetAddress inet;
086: private int serverTimeout;
087: private boolean tcpNoDelay = true; // nodelay to true by default
088: private int linger = 100;
089: private int socketTimeout;
090: private int bufferSize = -1;
091: private int packetSize = 8 * 1024;
092:
093: private long requestCount = 0;
094:
095: ThreadPool tp = ThreadPool.createThreadPool(true);
096:
097: /* ==================== Tcp socket options ==================== */
098:
099: /**
100: * jmx:managed-constructor description="default constructor"
101: */
102: public ChannelSocket() {
103: // This should be integrated with the domain setup
104: }
105:
106: public ThreadPool getThreadPool() {
107: return tp;
108: }
109:
110: public long getRequestCount() {
111: return requestCount;
112: }
113:
114: /** Set the port for the ajp13 channel.
115: * To support seemless load balancing and jni, we treat this
116: * as the 'base' port - we'll try up until we find one that is not
117: * used. We'll also provide the 'difference' to the main coyote
118: * handler - that will be our 'sessionID' and the position in
119: * the scoreboard and the suffix for the unix domain socket.
120: *
121: * jmx:managed-attribute description="Port to listen" access="READ_WRITE"
122: */
123: public void setPort(int port) {
124: this .startPort = port;
125: this .port = port;
126: this .maxPort = port + 10;
127: }
128:
129: public int getPort() {
130: return port;
131: }
132:
133: public void setAddress(InetAddress inet) {
134: this .inet = inet;
135: }
136:
137: /**
138: * jmx:managed-attribute description="Bind on a specified address" access="READ_WRITE"
139: */
140: public void setAddress(String inet) {
141: try {
142: this .inet = InetAddress.getByName(inet);
143: } catch (Exception ex) {
144: log.error("Error parsing " + inet, ex);
145: }
146: }
147:
148: public String getAddress() {
149: if (inet != null)
150: return inet.toString();
151: return "/0.0.0.0";
152: }
153:
154: /**
155: * Sets the timeout in ms of the server sockets created by this
156: * server. This method allows the developer to make servers
157: * more or less responsive to having their server sockets
158: * shut down.
159: *
160: * <p>By default this value is 1000ms.
161: */
162: public void setServerTimeout(int timeout) {
163: this .serverTimeout = timeout;
164: }
165:
166: public int getServerTimeout() {
167: return serverTimeout;
168: }
169:
170: public void setTcpNoDelay(boolean b) {
171: tcpNoDelay = b;
172: }
173:
174: public boolean getTcpNoDelay() {
175: return tcpNoDelay;
176: }
177:
178: public void setSoLinger(int i) {
179: linger = i;
180: }
181:
182: public int getSoLinger() {
183: return linger;
184: }
185:
186: public void setSoTimeout(int i) {
187: socketTimeout = i;
188: }
189:
190: public int getSoTimeout() {
191: return socketTimeout;
192: }
193:
194: public void setMaxPort(int i) {
195: maxPort = i;
196: }
197:
198: public int getMaxPort() {
199: return maxPort;
200: }
201:
202: public void setBufferSize(int bs) {
203: bufferSize = bs;
204: }
205:
206: public int getBufferSize() {
207: return bufferSize;
208: }
209:
210: public void setPacketSize(int ps) {
211: if (ps < 8 * 1024) {
212: ps = 8 * 1024;
213: }
214: packetSize = ps;
215: }
216:
217: public int getPacketSize() {
218: return packetSize;
219: }
220:
221: /** At startup we'll look for the first free port in the range.
222: The difference between this port and the beggining of the range
223: is the 'id'.
224: This is usefull for lb cases ( less config ).
225: */
226: public int getInstanceId() {
227: return port - startPort;
228: }
229:
230: /** If set to false, the thread pool will be created in
231: * non-daemon mode, and will prevent main from exiting
232: */
233: public void setDaemon(boolean b) {
234: tp.setDaemon(b);
235: }
236:
237: public boolean getDaemon() {
238: return tp.getDaemon();
239: }
240:
241: public void setMaxThreads(int i) {
242: if (log.isDebugEnabled())
243: log.debug("Setting maxThreads " + i);
244: tp.setMaxThreads(i);
245: }
246:
247: public void setMinSpareThreads(int i) {
248: if (log.isDebugEnabled())
249: log.debug("Setting minSpareThreads " + i);
250: tp.setMinSpareThreads(i);
251: }
252:
253: public void setMaxSpareThreads(int i) {
254: if (log.isDebugEnabled())
255: log.debug("Setting maxSpareThreads " + i);
256: tp.setMaxSpareThreads(i);
257: }
258:
259: public int getMaxThreads() {
260: return tp.getMaxThreads();
261: }
262:
263: public int getMinSpareThreads() {
264: return tp.getMinSpareThreads();
265: }
266:
267: public int getMaxSpareThreads() {
268: return tp.getMaxSpareThreads();
269: }
270:
271: public void setBacklog(int i) {
272: }
273:
274: /* ==================== ==================== */
275: ServerSocket sSocket;
276: final int socketNote = 1;
277: final int isNote = 2;
278: final int osNote = 3;
279: final int notifNote = 4;
280: boolean paused = false;
281:
282: public void pause() throws Exception {
283: synchronized (this ) {
284: paused = true;
285: unLockSocket();
286: }
287: }
288:
289: public void resume() throws Exception {
290: synchronized (this ) {
291: paused = false;
292: notify();
293: }
294: }
295:
296: public void accept(MsgContext ep) throws IOException {
297: if (sSocket == null)
298: return;
299: synchronized (this ) {
300: while (paused) {
301: try {
302: wait();
303: } catch (InterruptedException ie) {
304: //Ignore, since can't happen
305: }
306: }
307: }
308: Socket s = sSocket.accept();
309: ep.setNote(socketNote, s);
310: if (log.isDebugEnabled())
311: log.debug("Accepted socket " + s);
312:
313: try {
314: setSocketOptions(s);
315: } catch (SocketException sex) {
316: log.debug("Error initializing Socket Options", sex);
317: }
318:
319: requestCount++;
320:
321: InputStream is = new BufferedInputStream(s.getInputStream());
322: OutputStream os;
323: if (bufferSize > 0)
324: os = new BufferedOutputStream(s.getOutputStream(),
325: bufferSize);
326: else
327: os = s.getOutputStream();
328: ep.setNote(isNote, is);
329: ep.setNote(osNote, os);
330: ep.setControl(tp);
331: }
332:
333: private void setSocketOptions(Socket s) throws SocketException {
334: if (socketTimeout > 0)
335: s.setSoTimeout(socketTimeout);
336:
337: s.setTcpNoDelay(tcpNoDelay); // set socket tcpnodelay state
338:
339: if (linger > 0)
340: s.setSoLinger(true, linger);
341: }
342:
343: public void resetCounters() {
344: requestCount = 0;
345: }
346:
347: /** Called after you change some fields at runtime using jmx.
348: Experimental for now.
349: */
350: public void reinit() throws IOException {
351: destroy();
352: init();
353: }
354:
355: /**
356: * jmx:managed-operation
357: */
358: public void init() throws IOException {
359: // Find a port.
360: if (startPort == 0) {
361: port = 0;
362: if (log.isInfoEnabled())
363: log.info("JK: ajp13 disabling channelSocket");
364: running = true;
365: return;
366: }
367: if (maxPort < startPort)
368: maxPort = startPort;
369: for (int i = startPort; i <= maxPort; i++) {
370: try {
371: if (inet == null) {
372: sSocket = new ServerSocket(i, 0);
373: } else {
374: sSocket = new ServerSocket(i, 0, inet);
375: }
376: port = i;
377: break;
378: } catch (IOException ex) {
379: if (log.isInfoEnabled())
380: log.info("Port busy " + i + " " + ex.toString());
381: continue;
382: }
383: }
384:
385: if (sSocket == null) {
386: log.error("Can't find free port " + startPort + " "
387: + maxPort);
388: return;
389: }
390: if (log.isInfoEnabled())
391: log.info("JK: ajp13 listening on " + getAddress() + ":"
392: + port);
393:
394: // If this is not the base port and we are the 'main' channleSocket and
395: // SHM didn't already set the localId - we'll set the instance id
396: if ("channelSocket".equals(name) && port != startPort
397: && (wEnv.getLocalId() == 0)) {
398: wEnv.setLocalId(port - startPort);
399: }
400: if (serverTimeout > 0)
401: sSocket.setSoTimeout(serverTimeout);
402:
403: // XXX Reverse it -> this is a notification generator !!
404: if (next == null && wEnv != null) {
405: if (nextName != null)
406: setNext(wEnv.getHandler(nextName));
407: if (next == null)
408: next = wEnv.getHandler("dispatch");
409: if (next == null)
410: next = wEnv.getHandler("request");
411: }
412: JMXRequestNote = wEnv.getNoteId(WorkerEnv.ENDPOINT_NOTE,
413: "requestNote");
414: running = true;
415:
416: // Run a thread that will accept connections.
417: // XXX Try to find a thread first - not sure how...
418: if (this .domain != null) {
419: try {
420: tpOName = new ObjectName(domain
421: + ":type=ThreadPool,name=" + getChannelName());
422:
423: Registry.getRegistry(null, null).registerComponent(tp,
424: tpOName, null);
425:
426: rgOName = new ObjectName(domain
427: + ":type=GlobalRequestProcessor,name="
428: + getChannelName());
429: Registry.getRegistry(null, null).registerComponent(
430: global, rgOName, null);
431: } catch (Exception e) {
432: log.error("Can't register threadpool");
433: }
434: }
435:
436: tp.start();
437: SocketAcceptor acceptAjp = new SocketAcceptor(this );
438: tp.runIt(acceptAjp);
439:
440: }
441:
442: ObjectName tpOName;
443: ObjectName rgOName;
444: RequestGroupInfo global = new RequestGroupInfo();
445: int JMXRequestNote;
446:
447: public void start() throws IOException {
448: if (sSocket == null)
449: init();
450: }
451:
452: public void stop() throws IOException {
453: destroy();
454: }
455:
456: public void registerRequest(Request req, MsgContext ep, int count) {
457: if (this .domain != null) {
458: try {
459: RequestInfo rp = req.getRequestProcessor();
460: rp.setGlobalProcessor(global);
461: ObjectName roname = new ObjectName(getDomain()
462: + ":type=RequestProcessor,worker="
463: + getChannelName() + ",name=JkRequest" + count);
464: ep.setNote(JMXRequestNote, roname);
465:
466: Registry.getRegistry(null, null).registerComponent(rp,
467: roname, null);
468: } catch (Exception ex) {
469: log.warn("Error registering request");
470: }
471: }
472: }
473:
474: public void open(MsgContext ep) throws IOException {
475: }
476:
477: public void close(MsgContext ep) throws IOException {
478: Socket s = (Socket) ep.getNote(socketNote);
479: s.close();
480: }
481:
482: private void unLockSocket() throws IOException {
483: // Need to create a connection to unlock the accept();
484: Socket s;
485: InetAddress ladr = inet;
486:
487: if (port == 0)
488: return;
489: if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
490: ladr = InetAddress.getLocalHost();
491: }
492: s = new Socket(ladr, port);
493: // setting soLinger to a small value will help shutdown the
494: // connection quicker
495: s.setSoLinger(true, 0);
496:
497: s.close();
498: }
499:
500: public void destroy() throws IOException {
501: running = false;
502: try {
503: /* If we disabled the channel return */
504: if (port == 0)
505: return;
506: tp.shutdown();
507:
508: if (!paused) {
509: unLockSocket();
510: }
511:
512: sSocket.close(); // XXX?
513:
514: if (tpOName != null) {
515: Registry.getRegistry(null, null).unregisterComponent(
516: tpOName);
517: }
518: if (rgOName != null) {
519: Registry.getRegistry(null, null).unregisterComponent(
520: rgOName);
521: }
522: } catch (Exception e) {
523: log.info("Error shutting down the channel " + port + " "
524: + e.toString());
525: if (log.isDebugEnabled())
526: log.debug("Trace", e);
527: }
528: }
529:
530: public int send(Msg msg, MsgContext ep) throws IOException {
531: msg.end(); // Write the packet header
532: byte buf[] = msg.getBuffer();
533: int len = msg.getLen();
534:
535: if (log.isTraceEnabled())
536: log.trace("send() " + len + " " + buf[4]);
537:
538: OutputStream os = (OutputStream) ep.getNote(osNote);
539: os.write(buf, 0, len);
540: return len;
541: }
542:
543: public int flush(Msg msg, MsgContext ep) throws IOException {
544: if (bufferSize > 0) {
545: OutputStream os = (OutputStream) ep.getNote(osNote);
546: os.flush();
547: }
548: return 0;
549: }
550:
551: public int receive(Msg msg, MsgContext ep) throws IOException {
552: if (log.isDebugEnabled()) {
553: log.debug("receive() ");
554: }
555:
556: byte buf[] = msg.getBuffer();
557: int hlen = msg.getHeaderLength();
558:
559: // XXX If the length in the packet header doesn't agree with the
560: // actual number of bytes read, it should probably return an error
561: // value. Also, callers of this method never use the length
562: // returned -- should probably return true/false instead.
563:
564: int rd = this .read(ep, buf, 0, hlen);
565:
566: if (rd < 0) {
567: // Most likely normal apache restart.
568: // log.warn("Wrong message " + rd );
569: return rd;
570: }
571:
572: msg.processHeader();
573:
574: /* After processing the header we know the body
575: length
576: */
577: int blen = msg.getLen();
578:
579: // XXX check if enough space - it's assert()-ed !!!
580:
581: int total_read = 0;
582:
583: total_read = this .read(ep, buf, hlen, blen);
584:
585: if ((total_read <= 0) && (blen > 0)) {
586: log.warn("can't read body, waited #" + blen);
587: return -1;
588: }
589:
590: if (total_read != blen) {
591: log.warn("incomplete read, waited #" + blen + " got only "
592: + total_read);
593: return -2;
594: }
595:
596: return total_read;
597: }
598:
599: /**
600: * Read N bytes from the InputStream, and ensure we got them all
601: * Under heavy load we could experience many fragmented packets
602: * just read Unix Network Programming to recall that a call to
603: * read didn't ensure you got all the data you want
604: *
605: * from read() Linux manual
606: *
607: * On success, the number of bytes read is returned (zero indicates end
608: * of file),and the file position is advanced by this number.
609: * It is not an error if this number is smaller than the number of bytes
610: * requested; this may happen for example because fewer bytes
611: * are actually available right now (maybe because we were close to
612: * end-of-file, or because we are reading from a pipe, or from a
613: * terminal), or because read() was interrupted by a signal.
614: * On error, -1 is returned, and errno is set appropriately. In this
615: * case it is left unspecified whether the file position (if any) changes.
616: *
617: **/
618: public int read(MsgContext ep, byte[] b, int offset, int len)
619: throws IOException {
620: InputStream is = (InputStream) ep.getNote(isNote);
621: int pos = 0;
622: int got;
623:
624: while (pos < len) {
625: try {
626: got = is.read(b, pos + offset, len - pos);
627: } catch (SocketException sex) {
628: if (pos > 0) {
629: log.info("Error reading data after " + pos
630: + "bytes", sex);
631: } else {
632: log.debug("Error reading data", sex);
633: }
634: got = -1;
635: }
636: if (log.isTraceEnabled()) {
637: log.trace("read() " + b + " "
638: + (b == null ? 0 : b.length) + " " + offset
639: + " " + len + " = " + got);
640: }
641:
642: // connection just closed by remote.
643: if (got <= 0) {
644: // This happens periodically, as apache restarts
645: // periodically.
646: // It should be more gracefull ! - another feature for Ajp14
647: // log.warn( "server has closed the current connection (-1)" );
648: return -3;
649: }
650:
651: pos += got;
652: }
653: return pos;
654: }
655:
656: protected boolean running = true;
657:
658: /** Accept incoming connections, dispatch to the thread pool
659: */
660: void acceptConnections() {
661: if (log.isDebugEnabled())
662: log.debug("Accepting ajp connections on " + port);
663: while (running) {
664: try {
665: MsgContext ep = createMsgContext(packetSize);
666: ep.setSource(this );
667: ep.setWorkerEnv(wEnv);
668: this .accept(ep);
669:
670: if (!running)
671: break;
672:
673: // Since this is a long-running connection, we don't care
674: // about the small GC
675: SocketConnection ajpConn = new SocketConnection(this ,
676: ep);
677: tp.runIt(ajpConn);
678: } catch (Exception ex) {
679: if (running)
680: log.warn("Exception executing accept", ex);
681: }
682: }
683: }
684:
685: /** Process a single ajp connection.
686: */
687: void processConnection(MsgContext ep) {
688: try {
689: MsgAjp recv = new MsgAjp(packetSize);
690: while (running) {
691: if (paused) { // Drop the connection on pause
692: break;
693: }
694: int status = this .receive(recv, ep);
695: if (status <= 0) {
696: if (status == -3)
697: log
698: .debug("server has been restarted or reset this connection");
699: else
700: log.warn("Closing ajp connection " + status);
701: break;
702: }
703: ep.setLong(MsgContext.TIMER_RECEIVED, System
704: .currentTimeMillis());
705:
706: ep.setType(0);
707: // Will call next
708: status = this .invoke(recv, ep);
709: if (status != JkHandler.OK) {
710: log.warn("processCallbacks status " + status);
711: break;
712: }
713: }
714: } catch (Exception ex) {
715: String msg = ex.getMessage();
716: if (msg != null && msg.indexOf("Connection reset") >= 0)
717: log
718: .debug("Server has been restarted or reset this connection");
719: else if (msg != null && msg.indexOf("Read timed out") >= 0)
720: log.debug("connection timeout reached");
721: else
722: log.error("Error, processing connection", ex);
723: } finally {
724: /*
725: * Whatever happened to this connection (remote closed it, timeout, read error)
726: * the socket SHOULD be closed, or we may be in situation where the webserver
727: * will continue to think the socket is still open and will forward request
728: * to tomcat without receiving ever a reply
729: */
730: try {
731: this .close(ep);
732: } catch (Exception e) {
733: log.error("Error, closing connection", e);
734: }
735: try {
736: Request req = (Request) ep.getRequest();
737: if (req != null) {
738: ObjectName roname = (ObjectName) ep
739: .getNote(JMXRequestNote);
740: if (roname != null) {
741: Registry.getRegistry(null, null)
742: .unregisterComponent(roname);
743: }
744: req.getRequestProcessor().setGlobalProcessor(null);
745: }
746: } catch (Exception ee) {
747: log.error("Error, releasing connection", ee);
748: }
749: }
750: }
751:
752: // XXX This should become handleNotification
753: public int invoke(Msg msg, MsgContext ep) throws IOException {
754: int type = ep.getType();
755:
756: switch (type) {
757: case JkHandler.HANDLE_RECEIVE_PACKET:
758: if (log.isDebugEnabled())
759: log.debug("RECEIVE_PACKET ?? ");
760: return receive(msg, ep);
761: case JkHandler.HANDLE_SEND_PACKET:
762: return send(msg, ep);
763: case JkHandler.HANDLE_FLUSH:
764: return flush(msg, ep);
765: }
766:
767: if (log.isDebugEnabled())
768: log.debug("Call next " + type + " " + next);
769:
770: // Send notification
771: if (nSupport != null) {
772: Notification notif = (Notification) ep.getNote(notifNote);
773: if (notif == null) {
774: notif = new Notification("channelSocket.message", ep,
775: requestCount);
776: ep.setNote(notifNote, notif);
777: }
778: nSupport.sendNotification(notif);
779: }
780:
781: if (next != null) {
782: return next.invoke(msg, ep);
783: } else {
784: log.info("No next ");
785: }
786:
787: return OK;
788: }
789:
790: public boolean isSameAddress(MsgContext ep) {
791: Socket s = (Socket) ep.getNote(socketNote);
792: return isSameAddress(s.getLocalAddress(), s.getInetAddress());
793: }
794:
795: public String getChannelName() {
796: String encodedAddr = "";
797: if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
798: encodedAddr = getAddress();
799: if (encodedAddr.startsWith("/"))
800: encodedAddr = encodedAddr.substring(1);
801: encodedAddr = URLEncoder.encode(encodedAddr) + "-";
802: }
803: return ("jk-" + encodedAddr + port);
804: }
805:
806: /**
807: * Return <code>true</code> if the specified client and server addresses
808: * are the same. This method works around a bug in the IBM 1.1.8 JVM on
809: * Linux, where the address bytes are returned reversed in some
810: * circumstances.
811: *
812: * @param server The server's InetAddress
813: * @param client The client's InetAddress
814: */
815: public static boolean isSameAddress(InetAddress server,
816: InetAddress client) {
817: // Compare the byte array versions of the two addresses
818: byte serverAddr[] = server.getAddress();
819: byte clientAddr[] = client.getAddress();
820: if (serverAddr.length != clientAddr.length)
821: return (false);
822: boolean match = true;
823: for (int i = 0; i < serverAddr.length; i++) {
824: if (serverAddr[i] != clientAddr[i]) {
825: match = false;
826: break;
827: }
828: }
829: if (match)
830: return (true);
831:
832: // Compare the reversed form of the two addresses
833: for (int i = 0; i < serverAddr.length; i++) {
834: if (serverAddr[i] != clientAddr[(serverAddr.length - 1) - i])
835: return (false);
836: }
837: return (true);
838: }
839:
840: public void sendNewMessageNotification(Notification notification) {
841: if (nSupport != null)
842: nSupport.sendNotification(notification);
843: }
844:
845: private NotificationBroadcasterSupport nSupport = null;
846:
847: public void addNotificationListener(NotificationListener listener,
848: NotificationFilter filter, Object handback)
849: throws IllegalArgumentException {
850: if (nSupport == null)
851: nSupport = new NotificationBroadcasterSupport();
852: nSupport.addNotificationListener(listener, filter, handback);
853: }
854:
855: public void removeNotificationListener(NotificationListener listener)
856: throws ListenerNotFoundException {
857: if (nSupport != null)
858: nSupport.removeNotificationListener(listener);
859: }
860:
861: MBeanNotificationInfo notifInfo[] = new MBeanNotificationInfo[0];
862:
863: public void setNotificationInfo(MBeanNotificationInfo info[]) {
864: this .notifInfo = info;
865: }
866:
867: public MBeanNotificationInfo[] getNotificationInfo() {
868: return notifInfo;
869: }
870:
871: static class SocketAcceptor implements ThreadPoolRunnable {
872: ChannelSocket wajp;
873:
874: SocketAcceptor(ChannelSocket wajp) {
875: this .wajp = wajp;
876: }
877:
878: public Object[] getInitData() {
879: return null;
880: }
881:
882: public void runIt(Object thD[]) {
883: wajp.acceptConnections();
884: }
885: }
886:
887: static class SocketConnection implements ThreadPoolRunnable {
888: ChannelSocket wajp;
889: MsgContext ep;
890:
891: SocketConnection(ChannelSocket wajp, MsgContext ep) {
892: this .wajp = wajp;
893: this .ep = ep;
894: }
895:
896: public Object[] getInitData() {
897: return null;
898: }
899:
900: public void runIt(Object perTh[]) {
901: wajp.processConnection(ep);
902: ep = null;
903: }
904: }
905:
906: }
|