001: /*
002: * Copyright 1996-2001 Sun Microsystems, Inc. All Rights Reserved.
003: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004: *
005: * This code is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU General Public License version 2 only, as
007: * published by the Free Software Foundation. Sun designates this
008: * particular file as subject to the "Classpath" exception as provided
009: * by Sun in the LICENSE file that accompanied this code.
010: *
011: * This code is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
014: * version 2 for more details (a copy is included in the LICENSE file that
015: * accompanied this code).
016: *
017: * You should have received a copy of the GNU General Public License version
018: * 2 along with this work; if not, write to the Free Software Foundation,
019: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020: *
021: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022: * CA 95054 USA or visit www.sun.com if you need additional information or
023: * have any questions.
024: */
025: package sun.rmi.transport;
026:
027: import java.io.*;
028: import java.util.*;
029: import java.rmi.RemoteException;
030: import java.rmi.server.UID;
031: import sun.rmi.server.MarshalInputStream;
032: import sun.rmi.runtime.Log;
033:
034: /**
035: * Special stream to keep track of refs being unmarshaled so that
036: * refs can be ref-counted locally.
037: *
038: * @author Ann Wollrath
039: */
040: class ConnectionInputStream extends MarshalInputStream {
041:
042: /** indicates whether ack is required for DGC */
043: private boolean dgcAckNeeded = false;
044:
045: /** Hashtable mapping Endpoints to lists of LiveRefs to register */
046: private Map incomingRefTable = new HashMap(5);
047:
048: /** identifier for gc ack*/
049: private UID ackID;
050:
051: /**
052: * Constructs a marshal input stream using the underlying
053: * stream "in".
054: */
055: ConnectionInputStream(InputStream in) throws IOException {
056: super (in);
057: }
058:
059: void readID() throws IOException {
060: ackID = UID.read((DataInput) this );
061: }
062:
063: /**
064: * Save reference in order to send "dirty" call after all args/returns
065: * have been unmarshaled. Save in hashtable incomingRefTable. This
066: * table is keyed on endpoints, and holds objects of type
067: * IncomingRefTableEntry.
068: */
069: void saveRef(LiveRef ref) {
070: Endpoint ep = ref.getEndpoint();
071:
072: // check whether endpoint is already in the hashtable
073: List refList = (List) incomingRefTable.get(ep);
074:
075: if (refList == null) {
076: refList = new ArrayList();
077: incomingRefTable.put(ep, refList);
078: }
079:
080: // add ref to list of refs for endpoint ep
081: refList.add(ref);
082: }
083:
084: /**
085: * Add references to DGC table (and possibly send dirty call).
086: * RegisterRefs now calls DGCClient.referenced on all
087: * refs with the same endpoint at once to achieve batching of
088: * calls to the DGC
089: */
090: void registerRefs() throws IOException {
091: if (!incomingRefTable.isEmpty()) {
092: Set entrySet = incomingRefTable.entrySet();
093: Iterator iter = entrySet.iterator();
094: while (iter.hasNext()) {
095: Map.Entry entry = (Map.Entry) iter.next();
096: Endpoint ep = (Endpoint) entry.getKey();
097: List refList = (List) entry.getValue();
098: DGCClient.registerRefs(ep, refList);
099: }
100: }
101: }
102:
103: /**
104: * Indicate that an ack is required to the distributed
105: * collector.
106: */
107: void setAckNeeded() {
108: dgcAckNeeded = true;
109: }
110:
111: /**
112: * Done with input stream for remote call. Send DGC ack if necessary.
113: * Allow sending of ack to fail without flagging an error.
114: */
115: void done(Connection c) {
116: /*
117: * WARNING: The connection c may have already been freed. It
118: * is only be safe to use c to obtain c's channel.
119: */
120:
121: if (dgcAckNeeded) {
122: Connection conn = null;
123: Channel ch = null;
124: boolean reuse = true;
125:
126: DGCImpl.dgcLog.log(Log.VERBOSE, "send ack");
127:
128: try {
129: ch = c.getChannel();
130: conn = ch.newConnection();
131: DataOutputStream out = new DataOutputStream(conn
132: .getOutputStream());
133: out.writeByte(TransportConstants.DGCAck);
134: if (ackID == null) {
135: ackID = new UID();
136: }
137: ackID.write((DataOutput) out);
138: conn.releaseOutputStream();
139:
140: /*
141: * Fix for 4221173: if this connection is on top of an
142: * HttpSendSocket, the DGCAck won't actually get sent until a
143: * read operation is attempted on the socket. Calling
144: * available() is the most innocuous way of triggering the
145: * write.
146: */
147: conn.getInputStream().available();
148: conn.releaseInputStream();
149: } catch (RemoteException e) {
150: reuse = false;
151: } catch (IOException e) {
152: reuse = false;
153: }
154: try {
155: if (conn != null)
156: ch.free(conn, reuse);
157: } catch (RemoteException e) {
158: // eat exception
159: }
160: }
161: }
162: }
|