0001: /*
0002: * Copyright 1999-2005 Sun Microsystems, Inc. All Rights Reserved.
0003: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
0004: *
0005: * This code is free software; you can redistribute it and/or modify it
0006: * under the terms of the GNU General Public License version 2 only, as
0007: * published by the Free Software Foundation. Sun designates this
0008: * particular file as subject to the "Classpath" exception as provided
0009: * by Sun in the LICENSE file that accompanied this code.
0010: *
0011: * This code is distributed in the hope that it will be useful, but WITHOUT
0012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
0013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
0014: * version 2 for more details (a copy is included in the LICENSE file that
0015: * accompanied this code).
0016: *
0017: * You should have received a copy of the GNU General Public License version
0018: * 2 along with this work; if not, write to the Free Software Foundation,
0019: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
0020: *
0021: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
0022: * CA 95054 USA or visit www.sun.com if you need additional information or
0023: * have any questions.
0024: */
0025:
0026: package com.sun.jndi.ldap;
0027:
0028: import java.io.BufferedInputStream;
0029: import java.io.BufferedOutputStream;
0030: import java.io.InterruptedIOException;
0031: import java.io.IOException;
0032: import java.io.OutputStream;
0033: import java.io.InputStream;
0034: import java.net.Socket;
0035: import java.util.Vector;
0036: import java.util.Hashtable;
0037:
0038: import javax.naming.CommunicationException;
0039: import javax.naming.AuthenticationException;
0040: import javax.naming.AuthenticationNotSupportedException;
0041: import javax.naming.ServiceUnavailableException;
0042: import javax.naming.NamingException;
0043: import javax.naming.InterruptedNamingException;
0044:
0045: import javax.naming.ldap.Control;
0046:
0047: import java.lang.reflect.Method;
0048: import java.lang.reflect.Constructor;
0049: import java.lang.reflect.InvocationTargetException;
0050:
0051: //import javax.net.SocketFactory;
0052:
0053: /**
0054: * A thread that creates a connection to an LDAP server.
0055: * After the connection, the thread reads from the connection.
0056: * A caller can invoke methods on the instance to read LDAP responses
0057: * and to send LDAP requests.
0058: * <p>
0059: * There is a one-to-one correspondence between an LdapClient and
0060: * a Connection. Access to Connection and its methods is only via
0061: * LdapClient with two exceptions: SASL authentication and StartTLS.
0062: * SASL needs to access Connection's socket IO streams (in order to do encryption
* of the security layer). StartTLS needs to replace the IO streams
0064: * and close the IO streams on nonfatal close. The code for SASL
0065: * authentication can be treated as being the same as from LdapClient
0066: * because the SASL code is only ever called from LdapClient, from
0067: * inside LdapClient's synchronized authenticate() method. StartTLS is called
0068: * directly by the application but should only occur when the underlying
0069: * connection is quiet.
0070: * <p>
0071: * In terms of synchronization, worry about data structures
0072: * used by the Connection thread because that usage might contend
0073: * with calls by the main threads (i.e., those that call LdapClient).
0074: * Main threads need to worry about contention with each other.
0075: * Fields that Connection thread uses:
0076: * inStream - synced access and update; initialized in constructor;
0077: * referenced outside class unsync'ed (by LdapSasl) only
0078: * when connection is quiet
0079: * traceFile, traceTagIn, traceTagOut - no sync; debugging only
0080: * parent - no sync; initialized in constructor; no updates
0081: * pendingRequests - sync
0082: * pauseLock - per-instance lock;
0083: * paused - sync via pauseLock (pauseReader())
0084: * Members used by main threads (LdapClient):
0085: * host, port - unsync; read-only access for StartTLS and debug messages
0086: * setBound(), setV3() - no sync; called only by LdapClient.authenticate(),
0087: * which is a sync method called only when connection is "quiet"
0088: * getMsgId() - sync
0089: * writeRequest(), removeRequest(),findRequest(), abandonOutstandingReqs() -
0090: * access to shared pendingRequests is sync
0091: * writeRequest(), abandonRequest(), ldapUnbind() - access to outStream sync
0092: * cleanup() - sync
0093: * readReply() - access to sock sync
0094: * unpauseReader() - (indirectly via writeRequest) sync on pauseLock
0095: * Members used by SASL auth (main thread):
0096: * inStream, outStream - no sync; used to construct new stream; accessed
0097: * only when conn is "quiet" and not shared
0098: * replaceStreams() - sync method
0099: * Members used by StartTLS:
0100: * inStream, outStream - no sync; used to record the existing streams;
0101: * accessed only when conn is "quiet" and not shared
0102: * replaceStreams() - sync method
0103: * <p>
0104: * Handles anonymous, simple, and SASL bind for v3; anonymous and simple
0105: * for v2.
0106: * %%% made public for access by LdapSasl %%%
0107: *
0108: * @author Vincent Ryan
0109: * @author Rosanna Lee
0110: * @author Jagane Sundar
0111: */
0112: public final class Connection implements Runnable {
0113:
// Compile-time switch: when true, diagnostic messages go to System.err.
private static final boolean debug = false;
private static final int dump = 0; // > 0 r, > 1 rw

// Reader thread that executes run(); created and started by the constructor.
final private Thread worker; // Initialized in constructor

// Protocol version: true means LDAP v3, false means v2. Controls are only
// encoded into abandon/unbind requests when this is true.
private boolean v3 = true; // Set in setV3()

final public String host; // used by LdapClient for generating exception messages
// used by StartTlsResponse when creating an SSL socket
final public int port; // used by LdapClient for generating exception messages
// used by StartTlsResponse when creating an SSL socket

// When true, cleanup() sends an UNBIND request before closing the socket.
private boolean bound = false; // Set in setBound()

// All three are initialized in constructor and read-only afterwards.
// traceFile stays null (tracing disabled) unless a trace stream was supplied.
private OutputStream traceFile = null;
private String traceTagIn = null;
private String traceTagOut = null;

// Initialized in constructor; read and used externally (LdapSasl);
// Updated in replaceStreams() during "quiet", unshared, period
public InputStream inStream; // must be public; used by LdapSasl

// Initialized in constructor; read and used externally (LdapSasl);
// Updated in replaceStreams() during "quiet", unshared, period
public OutputStream outStream; // must be public; used by LdapSasl

// Initialized in constructor; read and used externally (TLS) to
// get new IO streams; closed and set to null during cleanup
public Socket sock; // for TLS

// For processing "disconnect" unsolicited notification
// Initialized in constructor
final private LdapClient parent;

// Incremented and returned in sync getMsgId()
private int outMsgId = 0;

//
// The list of ldapRequests pending on this binding:
// head of a singly linked list chained through LdapRequest.next.
//
// Accessed only within sync methods
private LdapRequest pendingRequests = null;

// The IOException that ended the connection, if any; assigned in
// writeRequest() when a write fails and in run() when the reader fails.
volatile IOException closureReason = null;
// Set to false at the start of cleanup().
volatile boolean useable = true; // is Connection still useable

// Milliseconds readReply() waits for a response. A value <= 0 makes
// readReply() poll in 15-second waits instead of timing out.
private int readTimeout;
0162:
0163: // true means v3; false means v2
0164: // Called in LdapClient.authenticate() (which is synchronized)
0165: // when connection is "quiet" and not shared; no need to synchronize
0166: void setV3(boolean v) {
0167: v3 = v;
0168: }
0169:
0170: // A BIND request has been successfully made on this connection
0171: // When cleaning up, remember to do an UNBIND
0172: // Called in LdapClient.authenticate() (which is synchronized)
0173: // when connection is "quiet" and not shared; no need to synchronize
0174: void setBound() {
0175: bound = true;
0176: }
0177:
0178: ////////////////////////////////////////////////////////////////////////////
0179: //
0180: // Create an LDAP Binding object and bind to a particular server
0181: //
0182: ////////////////////////////////////////////////////////////////////////////
0183:
/**
 * Connects to the LDAP server, wraps the socket streams in buffered
 * streams and starts the daemon reader thread that runs {@link #run()}.
 *
 * @param parent         the owning LdapClient; called back on unsolicited
 *                       notifications and on connection closure
 * @param host           server host name
 * @param port           server port
 * @param socketFactory  class name of a socket factory, or null to use a
 *                       plain java.net.Socket
 * @param connectTimeout connect timeout in milliseconds; values <= 0 mean
 *                       no timeout is applied
 * @param readTimeout    response timeout in milliseconds used by
 *                       readReply(); values <= 0 disable the timeout
 * @param trace          stream that receives a BER dump of all traffic, or
 *                       null to disable tracing
 * @throws NamingException a CommunicationException (root cause set to the
 *                       underlying failure) if the connection cannot be set up
 */
Connection(LdapClient parent, String host, int port,
        String socketFactory, int connectTimeout, int readTimeout,
        OutputStream trace) throws NamingException {

    this.host = host;
    this.port = port;
    this.parent = parent;
    this.readTimeout = readTimeout;

    if (trace != null) {
        traceFile = trace;
        traceTagIn = "<- " + host + ":" + port + "\n\n";
        traceTagOut = "-> " + host + ":" + port + "\n\n";
    }

    //
    // Connect to server
    //
    try {
        sock = createSocket(host, port, socketFactory, connectTimeout);

        if (debug) {
            System.err.println("Connection: opening socket: "
                    + host + "," + port);
        }

        inStream = new BufferedInputStream(sock.getInputStream());
        outStream = new BufferedOutputStream(sock.getOutputStream());

    } catch (InvocationTargetException e) {
        // Reflection wrapper: report the exception the target really threw.
        throw connectFailure(e.getTargetException());
    } catch (Exception e) {
        // Class.forName() seems to do more error checking
        // and will throw IllegalArgumentException and such.
        // That's why we need to have a catch all here.
        // Also catches all IO errors generated by socket creation.
        throw connectFailure(e);
    }

    // Only start reading once the streams are in place.
    worker = Obj.helper.createThread(this);
    worker.setDaemon(true);
    worker.start();
}

/*
 * Builds the CommunicationException reported for a failed connection
 * attempt. If the socket was already opened (i.e. the failure happened
 * while obtaining its streams) it is closed first so that it does not leak.
 */
private CommunicationException connectFailure(Throwable cause) {
    if (sock != null) {
        try {
            sock.close();
        } catch (IOException ignored) {
            // best effort; the original failure is what gets reported
        }
        sock = null;
    }
    CommunicationException ce = new CommunicationException(host + ":" + port);
    ce.setRootCause(cause);
    return ce;
}
0238:
0239: /*
0240: * Create an InetSocketAddress using the specified hostname and port number.
0241: */
0242: private Object createInetSocketAddress(String host, int port)
0243: throws NoSuchMethodException {
0244:
0245: try {
0246: Class inetSocketAddressClass = Class
0247: .forName("java.net.InetSocketAddress");
0248:
0249: Constructor inetSocketAddressCons = inetSocketAddressClass
0250: .getConstructor(new Class[] { String.class,
0251: int.class });
0252:
0253: return inetSocketAddressCons.newInstance(new Object[] {
0254: host, new Integer(port) });
0255:
0256: } catch (ClassNotFoundException e) {
0257: throw new NoSuchMethodException();
0258:
0259: } catch (InstantiationException e) {
0260: throw new NoSuchMethodException();
0261:
0262: } catch (InvocationTargetException e) {
0263: throw new NoSuchMethodException();
0264:
0265: } catch (IllegalAccessException e) {
0266: throw new NoSuchMethodException();
0267: }
0268: }
0269:
0270: /*
0271: * Create a Socket object using the specified socket factory and time limit.
0272: *
0273: * If a timeout is supplied and unconnected sockets are supported then
0274: * an unconnected socket is created and the timeout is applied when
0275: * connecting the socket. If a timeout is supplied but unconnected sockets
0276: * are not supported then the timeout is ignored and a connected socket
0277: * is created.
0278: */
0279: private Socket createSocket(String host, int port,
0280: String socketFactory, int connectTimeout) throws Exception {
0281:
0282: Socket socket = null;
0283:
0284: if (socketFactory != null) {
0285:
0286: // create the factory
0287:
0288: Class socketFactoryClass = Obj.helper
0289: .loadClass(socketFactory);
0290: Method getDefault = socketFactoryClass.getMethod(
0291: "getDefault", new Class[] {});
0292: Object factory = getDefault.invoke(null, new Object[] {});
0293:
0294: // create the socket
0295:
0296: Method createSocket = null;
0297:
0298: if (connectTimeout > 0) {
0299:
0300: try {
0301: createSocket = socketFactoryClass.getMethod(
0302: "createSocket", new Class[] {});
0303:
0304: Method connect = Socket.class
0305: .getMethod(
0306: "connect",
0307: new Class[] {
0308: Class
0309: .forName("java.net.SocketAddress"),
0310: int.class });
0311: Object endpoint = createInetSocketAddress(host,
0312: port);
0313:
0314: // unconnected socket
0315: socket = (Socket) createSocket.invoke(factory,
0316: new Object[] {});
0317:
0318: if (debug) {
0319: System.err
0320: .println("Connection: creating socket with "
0321: + "a timeout using supplied socket factory");
0322: }
0323:
0324: // connected socket
0325: connect.invoke(socket, new Object[] { endpoint,
0326: new Integer(connectTimeout) });
0327:
0328: } catch (NoSuchMethodException e) {
0329: // continue (but ignore connectTimeout)
0330: }
0331: }
0332:
0333: if (socket == null) {
0334: createSocket = socketFactoryClass.getMethod(
0335: "createSocket", new Class[] { String.class,
0336: int.class });
0337:
0338: if (debug) {
0339: System.err
0340: .println("Connection: creating socket using "
0341: + "supplied socket factory");
0342: }
0343: // connected socket
0344: socket = (Socket) createSocket.invoke(factory,
0345: new Object[] { host, new Integer(port) });
0346: }
0347: } else {
0348:
0349: if (connectTimeout > 0) {
0350:
0351: try {
0352: Constructor socketCons = Socket.class
0353: .getConstructor(new Class[] {});
0354:
0355: Method connect = Socket.class
0356: .getMethod(
0357: "connect",
0358: new Class[] {
0359: Class
0360: .forName("java.net.SocketAddress"),
0361: int.class });
0362: Object endpoint = createInetSocketAddress(host,
0363: port);
0364:
0365: socket = (Socket) socketCons
0366: .newInstance(new Object[] {});
0367:
0368: if (debug) {
0369: System.err
0370: .println("Connection: creating socket with "
0371: + "a timeout");
0372: }
0373: connect.invoke(socket, new Object[] { endpoint,
0374: new Integer(connectTimeout) });
0375:
0376: } catch (NoSuchMethodException e) {
0377: // continue (but ignore connectTimeout)
0378: }
0379: }
0380:
0381: if (socket == null) {
0382: if (debug) {
0383: System.err.println("Connection: creating socket");
0384: }
0385: // connected socket
0386: socket = new Socket(host, port);
0387: }
0388: }
0389:
0390: return socket;
0391: }
0392:
0393: ////////////////////////////////////////////////////////////////////////////
0394: //
0395: // Methods to IO to the LDAP server
0396: //
0397: ////////////////////////////////////////////////////////////////////////////
0398:
0399: synchronized int getMsgId() {
0400: return ++outMsgId;
0401: }
0402:
0403: LdapRequest writeRequest(BerEncoder ber, int msgId)
0404: throws IOException {
0405: return writeRequest(ber, msgId, false /* pauseAfterReceipt */);
0406: }
0407:
0408: LdapRequest writeRequest(BerEncoder ber, int msgId,
0409: boolean pauseAfterReceipt) throws IOException {
0410:
0411: LdapRequest req = new LdapRequest(msgId, pauseAfterReceipt);
0412: addRequest(req);
0413:
0414: if (traceFile != null) {
0415: Ber.dumpBER(traceFile, traceTagOut, ber.getBuf(), 0, ber
0416: .getDataLen());
0417: }
0418:
0419: // unpause reader so that it can get response
0420: // NOTE: Must do this before writing request, otherwise might
0421: // create a race condition where the writer unblocks its own response
0422: unpauseReader();
0423:
0424: if (debug) {
0425: System.err.println("Writing request to: " + outStream);
0426: }
0427:
0428: try {
0429: synchronized (this ) {
0430: outStream.write(ber.getBuf(), 0, ber.getDataLen());
0431: outStream.flush();
0432: }
0433: } catch (IOException e) {
0434: cleanup(null, true);
0435: throw (closureReason = e); // rethrow
0436: }
0437:
0438: return req;
0439: }
0440:
0441: /**
0442: * Reads a reply; waits until one is ready.
0443: */
0444: BerDecoder readReply(LdapRequest ldr) throws IOException,
0445: NamingException {
0446: BerDecoder rber;
0447: boolean waited = false;
0448:
0449: while (((rber = ldr.getReplyBer()) == null) && !waited) {
0450: try {
0451: // If socket closed, don't even try
0452: synchronized (this ) {
0453: if (sock == null) {
0454: throw new ServiceUnavailableException(host
0455: + ":" + port + "; socket closed");
0456: }
0457: }
0458: synchronized (ldr) {
0459: // check if condition has changed since our last check
0460: rber = ldr.getReplyBer();
0461: if (rber == null) {
0462: if (readTimeout > 0) { // Socket read timeout is specified
0463:
0464: // will be woken up before readTimeout only if reply is
0465: // available
0466: ldr.wait(readTimeout);
0467: waited = true;
0468: } else {
0469: ldr.wait(15 * 1000); // 15 second timeout
0470: }
0471: } else {
0472: break;
0473: }
0474: }
0475: } catch (InterruptedException ex) {
0476: throw new InterruptedNamingException(
0477: "Interrupted during LDAP operation");
0478: }
0479: }
0480:
0481: if ((rber == null) && waited) {
0482: removeRequest(ldr);
0483: throw new NamingException(
0484: "LDAP response read timed out, timeout used:"
0485: + readTimeout + "ms.");
0486:
0487: }
0488: return rber;
0489: }
0490:
0491: ////////////////////////////////////////////////////////////////////////////
0492: //
0493: // Methods to add, find, delete, and abandon requests made to server
0494: //
0495: ////////////////////////////////////////////////////////////////////////////
0496:
0497: private synchronized void addRequest(LdapRequest ldapRequest) {
0498:
0499: LdapRequest ldr = pendingRequests;
0500: if (ldr == null) {
0501: pendingRequests = ldapRequest;
0502: ldapRequest.next = null;
0503: } else {
0504: ldapRequest.next = pendingRequests;
0505: pendingRequests = ldapRequest;
0506: }
0507: }
0508:
0509: synchronized LdapRequest findRequest(int msgId) {
0510:
0511: LdapRequest ldr = pendingRequests;
0512: while (ldr != null) {
0513: if (ldr.msgId == msgId) {
0514: return ldr;
0515: }
0516: ldr = ldr.next;
0517: }
0518: return null;
0519:
0520: }
0521:
0522: synchronized void removeRequest(LdapRequest req) {
0523: LdapRequest ldr = pendingRequests;
0524: LdapRequest ldrprev = null;
0525:
0526: while (ldr != null) {
0527: if (ldr == req) {
0528: ldr.cancel();
0529:
0530: if (ldrprev != null) {
0531: ldrprev.next = ldr.next;
0532: } else {
0533: pendingRequests = ldr.next;
0534: }
0535: ldr.next = null;
0536: }
0537: ldrprev = ldr;
0538: ldr = ldr.next;
0539: }
0540: }
0541:
0542: void abandonRequest(LdapRequest ldr, Control[] reqCtls) {
0543: // Remove from queue
0544: removeRequest(ldr);
0545:
0546: BerEncoder ber = new BerEncoder(256);
0547: int abandonMsgId = getMsgId();
0548:
0549: //
0550: // build the abandon request.
0551: //
0552: try {
0553: ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
0554: ber.encodeInt(abandonMsgId);
0555: ber.encodeInt(ldr.msgId, LdapClient.LDAP_REQ_ABANDON);
0556:
0557: if (v3) {
0558: LdapClient.encodeControls(ber, reqCtls);
0559: }
0560: ber.endSeq();
0561:
0562: if (traceFile != null) {
0563: Ber.dumpBER(traceFile, traceTagOut, ber.getBuf(), 0,
0564: ber.getDataLen());
0565: }
0566:
0567: synchronized (this ) {
0568: outStream.write(ber.getBuf(), 0, ber.getDataLen());
0569: outStream.flush();
0570: }
0571:
0572: } catch (IOException ex) {
0573: //System.err.println("ldap.abandon: " + ex);
0574: }
0575:
0576: // Dont expect any response for the abandon request.
0577: }
0578:
0579: synchronized void abandonOutstandingReqs(Control[] reqCtls) {
0580: LdapRequest ldr = pendingRequests;
0581:
0582: while (ldr != null) {
0583: abandonRequest(ldr, reqCtls);
0584: pendingRequests = ldr = ldr.next;
0585: }
0586: }
0587:
0588: ////////////////////////////////////////////////////////////////////////////
0589: //
0590: // Methods to unbind from server and clear up resources when object is
0591: // destroyed.
0592: //
0593: ////////////////////////////////////////////////////////////////////////////
0594:
0595: private void ldapUnbind(Control[] reqCtls) {
0596:
0597: BerEncoder ber = new BerEncoder(256);
0598: int unbindMsgId = getMsgId();
0599:
0600: //
0601: // build the unbind request.
0602: //
0603:
0604: try {
0605:
0606: ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
0607: ber.encodeInt(unbindMsgId);
0608: // IMPLICIT TAGS
0609: ber.encodeByte(LdapClient.LDAP_REQ_UNBIND);
0610: ber.encodeByte(0);
0611:
0612: if (v3) {
0613: LdapClient.encodeControls(ber, reqCtls);
0614: }
0615: ber.endSeq();
0616:
0617: if (traceFile != null) {
0618: Ber.dumpBER(traceFile, traceTagOut, ber.getBuf(), 0,
0619: ber.getDataLen());
0620: }
0621:
0622: synchronized (this ) {
0623: outStream.write(ber.getBuf(), 0, ber.getDataLen());
0624: outStream.flush();
0625: }
0626:
0627: } catch (IOException ex) {
0628: //System.err.println("ldap.unbind: " + ex);
0629: }
0630:
0631: // Dont expect any response for the unbind request.
0632: }
0633:
0634: /**
0635: * @param reqCtls Possibly null request controls that accompanies the
0636: * abandon and unbind LDAP request.
0637: * @param notifyParent true means to call parent LdapClient back, notifying
0638: * it that the connection has been closed; false means not to notify
0639: * parent. If LdapClient invokes cleanup(), notifyParent should be set to
0640: * false because LdapClient already knows that it is closing
0641: * the connection. If Connection invokes cleanup(), notifyParent should be
0642: * set to true because LdapClient needs to know about the closure.
0643: */
0644: void cleanup(Control[] reqCtls, boolean notifyParent) {
0645: boolean nparent = false;
0646:
0647: synchronized (this ) {
0648: useable = false;
0649:
0650: if (sock != null) {
0651: if (debug) {
0652: System.err.println("Connection: closing socket: "
0653: + host + "," + port);
0654: }
0655: try {
0656: if (!notifyParent) {
0657: abandonOutstandingReqs(reqCtls);
0658: }
0659: if (bound) {
0660: ldapUnbind(reqCtls);
0661: }
0662: } finally {
0663: try {
0664: outStream.flush();
0665: sock.close();
0666: unpauseReader();
0667: } catch (IOException ie) {
0668: if (debug)
0669: System.err
0670: .println("Connection: problem closing socket: "
0671: + ie);
0672: }
0673: if (!notifyParent) {
0674: LdapRequest ldr = pendingRequests;
0675: while (ldr != null) {
0676: ldr.cancel();
0677: ldr = ldr.next;
0678: }
0679: }
0680: sock = null;
0681: }
0682: nparent = notifyParent;
0683: }
0684: }
0685: if (nparent) {
0686: parent.processConnectionClosure();
0687: }
0688: }
0689:
0690: // Assume everything is "quiet"
0691: // "synchronize" might lead to deadlock so don't synchronize method
0692: // Use streamLock instead for synchronizing update to stream
0693:
0694: synchronized public void replaceStreams(InputStream newIn,
0695: OutputStream newOut) {
0696: if (debug) {
0697: System.err.println("Replacing " + inStream + " with: "
0698: + newIn);
0699: System.err.println("Replacing " + outStream + " with: "
0700: + newOut);
0701: }
0702:
0703: inStream = newIn;
0704:
0705: // Cleanup old stream
0706: try {
0707: outStream.flush();
0708: } catch (IOException ie) {
0709: if (debug)
0710: System.err
0711: .println("Connection: cannot flush outstream: "
0712: + ie);
0713: }
0714:
0715: // Replace stream
0716: outStream = newOut;
0717: }
0718:
0719: /**
0720: * Used by Connection thread to read inStream into a local variable.
0721: * This ensures that there is no contention between the main thread
0722: * and the Connection thread when the main thread updates inStream.
0723: */
0724: synchronized private InputStream getInputStream() {
0725: return inStream;
0726: }
0727:
0728: ////////////////////////////////////////////////////////////////////////////
0729: //
0730: // Code for pausing/unpausing the reader thread ('worker')
0731: //
0732: ////////////////////////////////////////////////////////////////////////////
0733:
0734: /*
0735: * The main idea is to mark requests that need the reader thread to
0736: * pause after getting the response. When the reader thread gets the response,
0737: * it waits on a lock instead of returning to the read(). The next time a
0738: * request is sent, the reader is automatically unblocked if necessary.
0739: * Note that the reader must be unblocked BEFORE the request is sent.
0740: * Otherwise, there is a race condition where the request is sent and
0741: * the reader thread might read the response and be unblocked
0742: * by writeRequest().
0743: *
0744: * This pause gives the main thread (StartTLS or SASL) an opportunity to
0745: * update the reader's state (e.g., its streams) if necessary.
0746: * The assumption is that the connection will remain quiet during this pause
0747: * (i.e., no intervening requests being sent).
0748: *<p>
0749: * For dealing with StartTLS close,
0750: * when the read() exits either due to EOF or an exception,
0751: * the reader thread checks whether there is a new stream to read from.
0752: * If so, then it reattempts the read. Otherwise, the EOF or exception
0753: * is processed and the reader thread terminates.
0754: * In a StartTLS close, the client first replaces the SSL IO streams with
0755: * plain ones and then closes the SSL socket.
0756: * If the reader thread attempts to read, or was reading, from
0757: * the SSL socket (that is, it got to the read BEFORE replaceStreams()),
0758: * the SSL socket close will cause the reader thread to
0759: * get an EOF/exception and reexamine the input stream.
0760: * If the reader thread sees a new stream, it reattempts the read.
0761: * If the underlying socket is still alive, then the new read will succeed.
0762: * If the underlying socket has been closed also, then the new read will
0763: * fail and the reader thread exits.
0764: * If the reader thread attempts to read, or was reading, from the plain
0765: * socket (that is, it got to the read AFTER replaceStreams()), the
0766: * SSL socket close will have no effect on the reader thread.
0767: *
0768: * The check for new stream is made only
0769: * in the first attempt at reading a BER buffer; the reader should
0770: * never be in midst of reading a buffer when a nonfatal close occurs.
0771: * If this occurs, then the connection is in an inconsistent state and
0772: * the safest thing to do is to shut it down.
0773: */
0774:
0775: private Object pauseLock = new Object(); // lock for reader to wait on while paused
0776: private boolean paused = false; // paused state of reader
0777:
0778: /*
0779: * Unpauses reader thread if it was paused
0780: */
0781: private void unpauseReader() throws IOException {
0782: synchronized (pauseLock) {
0783: if (paused) {
0784: if (debug) {
0785: System.err.println("Unpausing reader; read from: "
0786: + inStream);
0787: }
0788: paused = false;
0789: pauseLock.notify();
0790: }
0791: }
0792: }
0793:
0794: /*
0795: * Pauses reader so that it stops reading from the input stream.
0796: * Reader blocks on pauseLock instead of read().
0797: * MUST be called from within synchronized (pauseLock) clause.
0798: */
0799: private void pauseReader() throws IOException {
0800: if (debug) {
0801: System.err.println("Pausing reader; was reading from: "
0802: + inStream);
0803: }
0804: paused = true;
0805: try {
0806: while (paused) {
0807: pauseLock.wait(); // notified by unpauseReader
0808: }
0809: } catch (InterruptedException e) {
0810: throw new InterruptedIOException(
0811: "Pause/unpause reader has problems.");
0812: }
0813: }
0814:
0815: ////////////////////////////////////////////////////////////////////////////
0816: //
0817: // The LDAP Binding thread. It does the mux/demux of multiple requests
0818: // on the same TCP connection.
0819: //
0820: ////////////////////////////////////////////////////////////////////////////
0821:
0822: public void run() {
0823: byte inbuf[]; // Buffer for reading incoming bytes
0824: int inMsgId; // Message id of incoming response
0825: int bytesread; // Number of bytes in inbuf
0826: int bytesleft; // Number of bytes that need to read for completing resp
0827: int br; // Temp; number of bytes read from stream
0828: int offset; // Offset of where to store bytes in inbuf
0829: int seqlen; // Length of ASN sequence
0830: int seqlenlen; // Number of sequence length bytes
0831: boolean eos; // End of stream
0832: BerDecoder retBer; // Decoder for ASN.1 BER data from inbuf
0833: InputStream in = null;
0834:
0835: try {
0836: while (true) {
0837: try {
0838: inbuf = new byte[2048];
0839:
0840: offset = 0;
0841: seqlen = 0;
0842: seqlenlen = 0;
0843:
0844: in = getInputStream();
0845:
0846: // check that it is the beginning of a sequence
0847: bytesread = in.read(inbuf, offset, 1);
0848: if (bytesread < 0) {
0849: if (in != getInputStream()) {
0850: continue; // a new stream to try
0851: } else {
0852: break; // EOF
0853: }
0854: }
0855:
0856: if (inbuf[offset++] != (Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR))
0857: continue;
0858:
0859: // get length of sequence
0860: bytesread = in.read(inbuf, offset, 1);
0861: if (bytesread < 0)
0862: break; // EOF
0863: seqlen = inbuf[offset++];
0864:
0865: // if high bit is on, length is encoded in the
0866: // subsequent length bytes and the number of length bytes
0867: // is equal to & 0x80 (i.e. length byte with high bit off).
0868: if ((seqlen & 0x80) == 0x80) {
0869: seqlenlen = seqlen & 0x7f; // number of length bytes
0870:
0871: bytesread = 0;
0872: eos = false;
0873:
0874: // Read all length bytes
0875: while (bytesread < seqlenlen) {
0876: br = in.read(inbuf, offset + bytesread,
0877: seqlenlen - bytesread);
0878: if (br < 0) {
0879: eos = true;
0880: break; // EOF
0881: }
0882: bytesread += br;
0883: }
0884:
0885: // end-of-stream reached before length bytes are read
0886: if (eos)
0887: break; // EOF
0888:
0889: // Add contents of length bytes to determine length
0890: seqlen = 0;
0891: for (int i = 0; i < seqlenlen; i++) {
0892: seqlen = (seqlen << 8)
0893: + (inbuf[offset + i] & 0xff);
0894: }
0895: offset += bytesread;
0896: }
0897:
0898: // read in seqlen bytes
0899: bytesleft = seqlen;
0900: if ((offset + bytesleft) > inbuf.length) {
0901: byte nbuf[] = new byte[offset + bytesleft];
0902: System.arraycopy(inbuf, 0, nbuf, 0, offset);
0903: inbuf = nbuf;
0904: }
0905: while (bytesleft > 0) {
0906: bytesread = in.read(inbuf, offset, bytesleft);
0907: if (bytesread < 0)
0908: break; // EOF
0909: offset += bytesread;
0910: bytesleft -= bytesread;
0911: }
0912: /*
0913: if (dump > 0) {
0914: System.err.println("seqlen: " + seqlen);
0915: System.err.println("bufsize: " + offset);
0916: System.err.println("bytesleft: " + bytesleft);
0917: System.err.println("bytesread: " + bytesread);
0918: }
0919: */
0920:
0921: try {
0922: retBer = new BerDecoder(inbuf, 0, offset);
0923:
0924: if (traceFile != null) {
0925: Ber.dumpBER(traceFile, traceTagIn, inbuf,
0926: 0, offset);
0927: }
0928:
0929: retBer.parseSeq(null);
0930: inMsgId = retBer.parseInt();
0931: retBer.reset(); // reset offset
0932:
0933: boolean needPause = false;
0934:
0935: if (inMsgId == 0) {
0936: // Unsolicited Notification
0937: parent.processUnsolicited(retBer);
0938: } else {
0939: LdapRequest ldr = findRequest(inMsgId);
0940:
0941: if (ldr != null) {
0942:
0943: /**
0944: * Grab pauseLock before making reply available
0945: * to ensure that reader goes into paused state
0946: * before writer can attempt to unpause reader
0947: */
0948: synchronized (pauseLock) {
0949: needPause = ldr.addReplyBer(retBer);
0950: if (needPause) {
0951: /*
0952: * Go into paused state; release
0953: * pauseLock
0954: */
0955: pauseReader();
0956: }
0957:
0958: // else release pauseLock
0959: }
0960: } else {
0961: // System.err.println("Cannot find" +
0962: // "LdapRequest for " + inMsgId);
0963: }
0964: }
0965: } catch (Ber.DecodeException e) {
0966: //System.err.println("Cannot parse Ber");
0967: }
0968: } catch (IOException ie) {
0969: if (debug) {
0970: System.err.println("Connection: Inside Caught "
0971: + ie);
0972: ie.printStackTrace();
0973: }
0974:
0975: if (in != getInputStream()) {
0976: // A new stream to try
0977: // Go to top of loop and continue
0978: } else {
0979: if (debug) {
0980: System.err
0981: .println("Connection: rethrowing "
0982: + ie);
0983: }
0984: throw ie; // rethrow exception
0985: }
0986: }
0987: }
0988:
0989: if (debug) {
0990: System.err
0991: .println("Connection: end-of-stream detected: "
0992: + in);
0993: }
0994: } catch (IOException ex) {
0995: if (debug) {
0996: System.err.println("Connection: Caught " + ex);
0997: }
0998: closureReason = ex;
0999: } finally {
1000: cleanup(null, true); // cleanup
1001: }
1002: if (debug) {
1003: System.err.println("Connection: Thread Exiting");
1004: }
1005: }
1006:
1007: // This code must be uncommented to run the LdapAbandonTest.
1008: /*public void sendSearchReqs(String dn, int numReqs) {
1009: int i;
1010: String attrs[] = null;
1011: for(i = 1; i <= numReqs; i++) {
1012: BerEncoder ber = new BerEncoder(2048);
1013:
1014: try {
1015: ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
1016: ber.encodeInt(i);
1017: ber.beginSeq(LdapClient.LDAP_REQ_SEARCH);
1018: ber.encodeString(dn == null ? "" : dn);
1019: ber.encodeInt(0, LdapClient.LBER_ENUMERATED);
1020: ber.encodeInt(3, LdapClient.LBER_ENUMERATED);
1021: ber.encodeInt(0);
1022: ber.encodeInt(0);
1023: ber.encodeBoolean(true);
1024: LdapClient.encodeFilter(ber, "");
1025: ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
1026: ber.encodeStringArray(attrs);
1027: ber.endSeq();
1028: ber.endSeq();
1029: ber.endSeq();
1030: writeRequest(ber, i);
1031: //System.err.println("wrote request " + i);
1032: } catch (Exception ex) {
1033: //System.err.println("ldap.search: Caught " + ex + " building req");
1034: }
1035:
1036: }
1037: } */
1038: }
|