001: /*
002: * Copyright 1999-2004 The Apache Software Foundation
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.jk.common;
018:
019: import java.io.BufferedInputStream;
020: import java.io.BufferedOutputStream;
021: import java.io.IOException;
022: import java.io.InputStream;
023: import java.io.OutputStream;
024: import java.net.URLEncoder;
025: import java.net.InetAddress;
026: import java.net.ServerSocket;
027: import java.net.Socket;
028: import java.net.SocketException;
029:
030: import javax.management.ListenerNotFoundException;
031: import javax.management.MBeanNotificationInfo;
032: import javax.management.Notification;
033: import javax.management.NotificationBroadcaster;
034: import javax.management.NotificationBroadcasterSupport;
035: import javax.management.NotificationFilter;
036: import javax.management.NotificationListener;
037: import javax.management.ObjectName;
038:
039: import org.apache.commons.modeler.Registry;
040: import org.apache.jk.core.JkHandler;
041: import org.apache.jk.core.Msg;
042: import org.apache.jk.core.MsgContext;
043: import org.apache.jk.core.JkChannel;
044: import org.apache.jk.core.WorkerEnv;
045: import org.apache.coyote.Request;
046: import org.apache.coyote.RequestGroupInfo;
047: import org.apache.coyote.RequestInfo;
048: import org.apache.tomcat.util.threads.ThreadPool;
049: import org.apache.tomcat.util.threads.ThreadPoolRunnable;
050:
051: /* XXX Make the 'message type' pluggable
052: */
053:
054: /* A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol,
055: TCP, Ajp14 API etc.
056: As we add other protocols/transports/APIs this will change, the current goal
057: is to get the same level of functionality as in the original jk connector.
058: */
059:
060: /**
061: * Jk2 can use multiple protocols/transports.
062: * Various container adapters should load this object ( as a bean ),
063: * set configurations and use it. Note that the connector will handle
064: * all incoming protocols - it's not specific to ajp1x. The protocol
065: * is abstracted by MsgContext/Message/Channel.
066: */
067:
068: /** Accept ( and send ) TCP messages.
069: *
070: * @author Costin Manolache
071: * @jmx:mbean name="jk2:service=ChannelSocket"
072: * description="Accept socket connections"
073: * @jmx:notification name="org.apache.coyote.INVOKE
074: * @jmx:notification-handler name="org.apache.jk.JK_SEND_PACKET
075: * @jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET
076: * @jmx:notification-handler name="org.apache.jk.JK_FLUSH
077: */
078: public class ChannelSocket extends JkHandler implements
079: NotificationBroadcaster, JkChannel {
080: private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
081: .getLog(ChannelSocket.class);
082:
083: int startPort = 8009;
084: int maxPort = 8019; // 0 for backward compat.
085: int port = startPort;
086: InetAddress inet;
087: int serverTimeout;
088: boolean tcpNoDelay = true; // nodelay to true by default
089: int linger = 100;
090: int socketTimeout;
091:
092: long requestCount = 0;
093:
094: /* Turning this to true will reduce the latency with about 20%.
095: But it requires changes in tomcat to make sure client-requested
096: flush() is honored ( on my test, I got 367->433 RPS and
097: 52->35ms average time with a simple servlet )
098: */
099: static final boolean BUFFER_WRITE = false;
100:
101: ThreadPool tp = ThreadPool.createThreadPool(true);
102:
103: /* ==================== Tcp socket options ==================== */
104:
105: /**
106: * @jmx:managed-constructor description="default constructor"
107: */
108: public ChannelSocket() {
109: // This should be integrated with the domain setup
110: }
111:
112: public ThreadPool getThreadPool() {
113: return tp;
114: }
115:
116: public long getRequestCount() {
117: return requestCount;
118: }
119:
120: /** Set the port for the ajp13 channel.
121: * To support seemless load balancing and jni, we treat this
122: * as the 'base' port - we'll try up until we find one that is not
123: * used. We'll also provide the 'difference' to the main coyote
124: * handler - that will be our 'sessionID' and the position in
125: * the scoreboard and the suffix for the unix domain socket.
126: *
127: * @jmx:managed-attribute description="Port to listen" access="READ_WRITE"
128: */
129: public void setPort(int port) {
130: this .startPort = port;
131: this .port = port;
132: this .maxPort = port + 10;
133: }
134:
135: public int getPort() {
136: return port;
137: }
138:
139: public void setAddress(InetAddress inet) {
140: this .inet = inet;
141: }
142:
143: /**
144: * @jmx:managed-attribute description="Bind on a specified address" access="READ_WRITE"
145: */
146: public void setAddress(String inet) {
147: try {
148: this .inet = InetAddress.getByName(inet);
149: } catch (Exception ex) {
150: log.error("Error parsing " + inet, ex);
151: }
152: }
153:
154: public String getAddress() {
155: if (inet != null)
156: return inet.toString();
157: return "/0.0.0.0";
158: }
159:
160: /**
161: * Sets the timeout in ms of the server sockets created by this
162: * server. This method allows the developer to make servers
163: * more or less responsive to having their server sockets
164: * shut down.
165: *
166: * <p>By default this value is 1000ms.
167: */
168: public void setServerTimeout(int timeout) {
169: this .serverTimeout = timeout;
170: }
171:
172: public int getServerTimeout() {
173: return serverTimeout;
174: }
175:
176: public void setTcpNoDelay(boolean b) {
177: tcpNoDelay = b;
178: }
179:
180: public boolean getTcpNoDelay() {
181: return tcpNoDelay;
182: }
183:
184: public void setSoLinger(int i) {
185: linger = i;
186: }
187:
188: public int getSoLinger() {
189: return linger;
190: }
191:
192: public void setSoTimeout(int i) {
193: socketTimeout = i;
194: }
195:
196: public int getSoTimeout() {
197: return socketTimeout;
198: }
199:
200: public void setMaxPort(int i) {
201: maxPort = i;
202: }
203:
204: public int getMaxPort() {
205: return maxPort;
206: }
207:
208: /** At startup we'll look for the first free port in the range.
209: The difference between this port and the beggining of the range
210: is the 'id'.
211: This is usefull for lb cases ( less config ).
212: */
213: public int getInstanceId() {
214: return port - startPort;
215: }
216:
217: /** If set to false, the thread pool will be created in
218: * non-daemon mode, and will prevent main from exiting
219: */
220: public void setDaemon(boolean b) {
221: tp.setDaemon(b);
222: }
223:
224: public boolean getDaemon() {
225: return tp.getDaemon();
226: }
227:
228: public void setMaxThreads(int i) {
229: if (log.isDebugEnabled())
230: log.debug("Setting maxThreads " + i);
231: tp.setMaxThreads(i);
232: }
233:
234: public void setMinSpareThreads(int i) {
235: if (log.isDebugEnabled())
236: log.debug("Setting minSpareThreads " + i);
237: tp.setMinSpareThreads(i);
238: }
239:
240: public void setMaxSpareThreads(int i) {
241: if (log.isDebugEnabled())
242: log.debug("Setting maxSpareThreads " + i);
243: tp.setMaxSpareThreads(i);
244: }
245:
246: public int getMaxThreads() {
247: return tp.getMaxThreads();
248: }
249:
250: public int getMinSpareThreads() {
251: return tp.getMinSpareThreads();
252: }
253:
254: public int getMaxSpareThreads() {
255: return tp.getMaxSpareThreads();
256: }
257:
258: public void setBacklog(int i) {
259: }
260:
261: /* ==================== ==================== */
262: ServerSocket sSocket;
263: final int socketNote = 1;
264: final int isNote = 2;
265: final int osNote = 3;
266: final int notifNote = 4;
267: boolean paused = false;
268:
269: public void pause() throws Exception {
270: synchronized (this ) {
271: paused = true;
272: unLockSocket();
273: }
274: }
275:
276: public void resume() throws Exception {
277: synchronized (this ) {
278: paused = false;
279: notify();
280: }
281: }
282:
283: public void accept(MsgContext ep) throws IOException {
284: if (sSocket == null)
285: return;
286: synchronized (this ) {
287: while (paused) {
288: try {
289: wait();
290: } catch (InterruptedException ie) {
291: //Ignore, since can't happen
292: }
293: }
294: }
295: Socket s = sSocket.accept();
296: ep.setNote(socketNote, s);
297: if (log.isDebugEnabled())
298: log.debug("Accepted socket " + s);
299: if (linger > 0)
300: s.setSoLinger(true, linger);
301: if (socketTimeout > 0)
302: s.setSoTimeout(socketTimeout);
303:
304: s.setTcpNoDelay(tcpNoDelay); // set socket tcpnodelay state
305:
306: requestCount++;
307:
308: InputStream is = new BufferedInputStream(s.getInputStream());
309: OutputStream os;
310: if (BUFFER_WRITE)
311: os = new BufferedOutputStream(s.getOutputStream());
312: else
313: os = s.getOutputStream();
314: ep.setNote(isNote, is);
315: ep.setNote(osNote, os);
316: ep.setControl(tp);
317: }
318:
319: public void resetCounters() {
320: requestCount = 0;
321: }
322:
323: /** Called after you change some fields at runtime using jmx.
324: Experimental for now.
325: */
326: public void reinit() throws IOException {
327: destroy();
328: init();
329: }
330:
331: /**
332: * @jmx:managed-operation
333: */
334: public void init() throws IOException {
335: // Find a port.
336: if (startPort == 0) {
337: port = 0;
338: log.info("JK2: ajp13 disabling channelSocket");
339: running = true;
340: return;
341: }
342: if (maxPort < startPort)
343: maxPort = startPort;
344: for (int i = startPort; i <= maxPort; i++) {
345: try {
346: if (inet == null) {
347: sSocket = new ServerSocket(i, 0);
348: } else {
349: sSocket = new ServerSocket(i, 0, inet);
350: }
351: port = i;
352: break;
353: } catch (IOException ex) {
354: log.info("Port busy " + i + " " + ex.toString());
355: continue;
356: }
357: }
358:
359: if (sSocket == null) {
360: log.error("Can't find free port " + startPort + " "
361: + maxPort);
362: return;
363: }
364: log
365: .info("JK2: ajp13 listening on " + getAddress() + ":"
366: + port);
367:
368: // If this is not the base port and we are the 'main' channleSocket and
369: // SHM didn't already set the localId - we'll set the instance id
370: if ("channelSocket".equals(name) && port != startPort
371: && (wEnv.getLocalId() == 0)) {
372: wEnv.setLocalId(port - startPort);
373: }
374: if (serverTimeout > 0)
375: sSocket.setSoTimeout(serverTimeout);
376:
377: // XXX Reverse it -> this is a notification generator !!
378: if (next == null && wEnv != null) {
379: if (nextName != null)
380: setNext(wEnv.getHandler(nextName));
381: if (next == null)
382: next = wEnv.getHandler("dispatch");
383: if (next == null)
384: next = wEnv.getHandler("request");
385: }
386: JMXRequestNote = wEnv.getNoteId(WorkerEnv.ENDPOINT_NOTE,
387: "requestNote");
388: running = true;
389:
390: // Run a thread that will accept connections.
391: // XXX Try to find a thread first - not sure how...
392: if (this .domain != null) {
393: try {
394: tpOName = new ObjectName(domain
395: + ":type=ThreadPool,name=" + getChannelName());
396:
397: Registry.getRegistry(null, null).registerComponent(tp,
398: tpOName, null);
399:
400: rgOName = new ObjectName(domain
401: + ":type=GlobalRequestProcessor,name="
402: + getChannelName());
403: Registry.getRegistry(null, null).registerComponent(
404: global, rgOName, null);
405: } catch (Exception e) {
406: log.error("Can't register threadpool");
407: }
408: }
409:
410: tp.start();
411: SocketAcceptor acceptAjp = new SocketAcceptor(this );
412: tp.runIt(acceptAjp);
413:
414: }
415:
416: ObjectName tpOName;
417: ObjectName rgOName;
418: RequestGroupInfo global = new RequestGroupInfo();
419: int JMXRequestNote;
420:
421: public void start() throws IOException {
422: if (sSocket == null)
423: init();
424: }
425:
426: public void stop() throws IOException {
427: destroy();
428: }
429:
430: public void registerRequest(Request req, MsgContext ep, int count) {
431: if (this .domain != null) {
432: try {
433: RequestInfo rp = req.getRequestProcessor();
434: rp.setGlobalProcessor(global);
435: ObjectName roname = new ObjectName(getDomain()
436: + ":type=RequestProcessor,worker="
437: + getChannelName() + ",name=JkRequest" + count);
438: ep.setNote(JMXRequestNote, roname);
439:
440: Registry.getRegistry().registerComponent(rp, roname,
441: null);
442: } catch (Exception ex) {
443: log.warn("Error registering request");
444: }
445: }
446: }
447:
448: public void open(MsgContext ep) throws IOException {
449: }
450:
451: public void close(MsgContext ep) throws IOException {
452: Socket s = (Socket) ep.getNote(socketNote);
453: s.close();
454: }
455:
456: private void unLockSocket() throws IOException {
457: // Need to create a connection to unlock the accept();
458: Socket s;
459: InetAddress ladr = inet;
460:
461: if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
462: ladr = InetAddress.getLocalHost();
463: }
464: s = new Socket(ladr, port);
465: // setting soLinger to a small value will help shutdown the
466: // connection quicker
467: s.setSoLinger(true, 0);
468:
469: s.close();
470: }
471:
472: public void destroy() throws IOException {
473: running = false;
474: try {
475: /* If we disabled the channel return */
476: if (port == 0)
477: return;
478: tp.shutdown();
479:
480: if (!paused) {
481: unLockSocket();
482: }
483:
484: sSocket.close(); // XXX?
485:
486: if (tpOName != null) {
487: Registry.getRegistry().unregisterComponent(tpOName);
488: }
489: if (rgOName != null) {
490: Registry.getRegistry().unregisterComponent(rgOName);
491: }
492: } catch (Exception e) {
493: log.info("Error shutting down the channel " + port + " "
494: + e.toString());
495: if (log.isDebugEnabled())
496: log.debug("Trace", e);
497: }
498: }
499:
500: public int send(Msg msg, MsgContext ep) throws IOException {
501: msg.end(); // Write the packet header
502: byte buf[] = msg.getBuffer();
503: int len = msg.getLen();
504:
505: if (log.isTraceEnabled())
506: log.trace("send() " + len + " " + buf[4]);
507:
508: OutputStream os = (OutputStream) ep.getNote(osNote);
509: os.write(buf, 0, len);
510: return len;
511: }
512:
513: public int flush(Msg msg, MsgContext ep) throws IOException {
514: if (BUFFER_WRITE) {
515: OutputStream os = (OutputStream) ep.getNote(osNote);
516: os.flush();
517: }
518: return 0;
519: }
520:
521: public int receive(Msg msg, MsgContext ep) throws IOException {
522: if (log.isDebugEnabled()) {
523: log.debug("receive() ");
524: }
525:
526: byte buf[] = msg.getBuffer();
527: int hlen = msg.getHeaderLength();
528:
529: // XXX If the length in the packet header doesn't agree with the
530: // actual number of bytes read, it should probably return an error
531: // value. Also, callers of this method never use the length
532: // returned -- should probably return true/false instead.
533:
534: int rd = this .read(ep, buf, 0, hlen);
535:
536: if (rd < 0) {
537: // Most likely normal apache restart.
538: // log.warn("Wrong message " + rd );
539: return rd;
540: }
541:
542: msg.processHeader();
543:
544: /* After processing the header we know the body
545: length
546: */
547: int blen = msg.getLen();
548:
549: // XXX check if enough space - it's assert()-ed !!!
550:
551: int total_read = 0;
552:
553: total_read = this .read(ep, buf, hlen, blen);
554:
555: if ((total_read <= 0) && (blen > 0)) {
556: log.warn("can't read body, waited #" + blen);
557: return -1;
558: }
559:
560: if (total_read != blen) {
561: log.warn("incomplete read, waited #" + blen + " got only "
562: + total_read);
563: return -2;
564: }
565:
566: return total_read;
567: }
568:
569: /**
570: * Read N bytes from the InputStream, and ensure we got them all
571: * Under heavy load we could experience many fragmented packets
572: * just read Unix Network Programming to recall that a call to
573: * read didn't ensure you got all the data you want
574: *
575: * from read() Linux manual
576: *
577: * On success, the number of bytes read is returned (zero indicates end
578: * of file),and the file position is advanced by this number.
579: * It is not an error if this number is smaller than the number of bytes
580: * requested; this may happen for example because fewer bytes
581: * are actually available right now (maybe because we were close to
582: * end-of-file, or because we are reading from a pipe, or from a
583: * terminal), or because read() was interrupted by a signal.
584: * On error, -1 is returned, and errno is set appropriately. In this
585: * case it is left unspecified whether the file position (if any) changes.
586: *
587: **/
588: public int read(MsgContext ep, byte[] b, int offset, int len)
589: throws IOException {
590: InputStream is = (InputStream) ep.getNote(isNote);
591: int pos = 0;
592: int got;
593:
594: while (pos < len) {
595: try {
596: got = is.read(b, pos + offset, len - pos);
597: } catch (SocketException sex) {
598: if (pos > 0) {
599: log.info("Error reading data after " + pos
600: + "bytes", sex);
601: } else {
602: log.debug("Error reading data", sex);
603: }
604: got = -1;
605: }
606: if (log.isTraceEnabled()) {
607: log.trace("read() " + b + " "
608: + (b == null ? 0 : b.length) + " " + offset
609: + " " + len + " = " + got);
610: }
611:
612: // connection just closed by remote.
613: if (got <= 0) {
614: // This happens periodically, as apache restarts
615: // periodically.
616: // It should be more gracefull ! - another feature for Ajp14
617: // log.warn( "server has closed the current connection (-1)" );
618: return -3;
619: }
620:
621: pos += got;
622: }
623: return pos;
624: }
625:
626: protected boolean running = true;
627:
628: /** Accept incoming connections, dispatch to the thread pool
629: */
630: void acceptConnections() {
631: if (log.isDebugEnabled())
632: log.debug("Accepting ajp connections on " + port);
633: while (running) {
634: try {
635: MsgContext ep = new MsgContext();
636: ep.setSource(this );
637: ep.setWorkerEnv(wEnv);
638: this .accept(ep);
639:
640: if (!running)
641: break;
642:
643: // Since this is a long-running connection, we don't care
644: // about the small GC
645: SocketConnection ajpConn = new SocketConnection(this ,
646: ep);
647: tp.runIt(ajpConn);
648: } catch (Exception ex) {
649: if (running)
650: log.warn("Exception executing accept", ex);
651: }
652: }
653: }
654:
655: /** Process a single ajp connection.
656: */
657: void processConnection(MsgContext ep) {
658: try {
659: MsgAjp recv = new MsgAjp();
660: while (running) {
661: if (paused) { // Drop the connection on pause
662: break;
663: }
664: int status = this .receive(recv, ep);
665: if (status <= 0) {
666: if (status == -3)
667: log
668: .debug("server has been restarted or reset this connection");
669: else
670: log.warn("Closing ajp connection " + status);
671: break;
672: }
673: ep.setLong(MsgContext.TIMER_RECEIVED, System
674: .currentTimeMillis());
675:
676: ep.setType(0);
677: // Will call next
678: status = this .invoke(recv, ep);
679: if (status != JkHandler.OK) {
680: log.warn("processCallbacks status " + status);
681: break;
682: }
683: }
684: } catch (Exception ex) {
685: if (ex.getMessage().indexOf("Connection reset") >= 0)
686: log
687: .debug("Server has been restarted or reset this connection");
688: else if (ex.getMessage().indexOf("Read timed out") >= 0)
689: log.info("connection timeout reached");
690: else
691: log.error("Error, processing connection", ex);
692: } finally {
693: /*
694: * Whatever happened to this connection (remote closed it, timeout, read error)
695: * the socket SHOULD be closed, or we may be in situation where the webserver
696: * will continue to think the socket is still open and will forward request
697: * to tomcat without receiving ever a reply
698: */
699: try {
700: this .close(ep);
701: } catch (Exception e) {
702: log.error("Error, closing connection", e);
703: }
704: try {
705: Request req = (Request) ep.getRequest();
706: if (req != null) {
707: ObjectName roname = (ObjectName) ep
708: .getNote(JMXRequestNote);
709: Registry.getRegistry().unregisterComponent(roname);
710: req.getRequestProcessor().setGlobalProcessor(null);
711: }
712: } catch (Exception ee) {
713: log.error("Error, releasing connection", ee);
714: }
715: }
716: }
717:
718: // XXX This should become handleNotification
719: public int invoke(Msg msg, MsgContext ep) throws IOException {
720: int type = ep.getType();
721:
722: switch (type) {
723: case JkHandler.HANDLE_RECEIVE_PACKET:
724: if (log.isDebugEnabled())
725: log.debug("RECEIVE_PACKET ?? ");
726: return receive(msg, ep);
727: case JkHandler.HANDLE_SEND_PACKET:
728: return send(msg, ep);
729: case JkHandler.HANDLE_FLUSH:
730: return flush(msg, ep);
731: }
732:
733: if (log.isDebugEnabled())
734: log.debug("Call next " + type + " " + next);
735:
736: // Send notification
737: if (nSupport != null) {
738: Notification notif = (Notification) ep.getNote(notifNote);
739: if (notif == null) {
740: notif = new Notification("channelSocket.message", ep,
741: requestCount);
742: ep.setNote(notifNote, notif);
743: }
744: nSupport.sendNotification(notif);
745: }
746:
747: if (next != null) {
748: return next.invoke(msg, ep);
749: } else {
750: log.info("No next ");
751: }
752:
753: return OK;
754: }
755:
756: public boolean isSameAddress(MsgContext ep) {
757: Socket s = (Socket) ep.getNote(socketNote);
758: return isSameAddress(s.getLocalAddress(), s.getInetAddress());
759: }
760:
761: public String getChannelName() {
762: String encodedAddr = "";
763: if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
764: encodedAddr = getAddress();
765: if (encodedAddr.startsWith("/"))
766: encodedAddr = encodedAddr.substring(1);
767: encodedAddr = URLEncoder.encode(encodedAddr) + "-";
768: }
769: return ("jk-" + encodedAddr + port);
770: }
771:
772: /**
773: * Return <code>true</code> if the specified client and server addresses
774: * are the same. This method works around a bug in the IBM 1.1.8 JVM on
775: * Linux, where the address bytes are returned reversed in some
776: * circumstances.
777: *
778: * @param server The server's InetAddress
779: * @param client The client's InetAddress
780: */
781: public static boolean isSameAddress(InetAddress server,
782: InetAddress client) {
783: // Compare the byte array versions of the two addresses
784: byte serverAddr[] = server.getAddress();
785: byte clientAddr[] = client.getAddress();
786: if (serverAddr.length != clientAddr.length)
787: return (false);
788: boolean match = true;
789: for (int i = 0; i < serverAddr.length; i++) {
790: if (serverAddr[i] != clientAddr[i]) {
791: match = false;
792: break;
793: }
794: }
795: if (match)
796: return (true);
797:
798: // Compare the reversed form of the two addresses
799: for (int i = 0; i < serverAddr.length; i++) {
800: if (serverAddr[i] != clientAddr[(serverAddr.length - 1) - i])
801: return (false);
802: }
803: return (true);
804: }
805:
806: public void sendNewMessageNotification(Notification notification) {
807: if (nSupport != null)
808: nSupport.sendNotification(notification);
809: }
810:
811: private NotificationBroadcasterSupport nSupport = null;
812:
813: public void addNotificationListener(NotificationListener listener,
814: NotificationFilter filter, Object handback)
815: throws IllegalArgumentException {
816: if (nSupport == null)
817: nSupport = new NotificationBroadcasterSupport();
818: nSupport.addNotificationListener(listener, filter, handback);
819: }
820:
821: public void removeNotificationListener(NotificationListener listener)
822: throws ListenerNotFoundException {
823: if (nSupport != null)
824: nSupport.removeNotificationListener(listener);
825: }
826:
827: MBeanNotificationInfo notifInfo[] = new MBeanNotificationInfo[0];
828:
829: public void setNotificationInfo(MBeanNotificationInfo info[]) {
830: this .notifInfo = info;
831: }
832:
833: public MBeanNotificationInfo[] getNotificationInfo() {
834: return notifInfo;
835: }
836: }
837:
838: class SocketAcceptor implements ThreadPoolRunnable {
839: ChannelSocket wajp;
840:
841: SocketAcceptor(ChannelSocket wajp) {
842: this .wajp = wajp;
843: }
844:
845: public Object[] getInitData() {
846: return null;
847: }
848:
849: public void runIt(Object thD[]) {
850: wajp.acceptConnections();
851: }
852: }
853:
854: class SocketConnection implements ThreadPoolRunnable {
855: ChannelSocket wajp;
856: MsgContext ep;
857:
858: SocketConnection(ChannelSocket wajp, MsgContext ep) {
859: this .wajp = wajp;
860: this .ep = ep;
861: }
862:
863: public Object[] getInitData() {
864: return null;
865: }
866:
867: public void runIt(Object perTh[]) {
868: wajp.processConnection(ep);
869: }
870: }
|