001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.mts.rmi;
027:
028: import java.net.Socket;
029: import java.rmi.Remote;
030: import java.util.ArrayList;
031: import java.util.HashMap;
032: import java.util.Iterator;
033: import java.util.Map;
034:
035: import org.cougaar.core.component.ServiceBroker;
036: import org.cougaar.core.component.ServiceProvider;
037: import org.cougaar.core.mts.MessageAddress;
038: import org.cougaar.core.service.ThreadService;
039: import org.cougaar.core.thread.Schedulable;
040: import org.cougaar.mts.base.StandardAspect;
041:
042: /**
043: * This Aspect creates a ServiceProvider for and implementation of the
044: * RMISocketControlService. As currently defined, this service is
045: * mostly for setting socket timeouts. It can also be used to get a
046: * List of open sockets from a given MessageAddress.
047: */
048: public class RMISocketControlAspect extends StandardAspect
049:
050: {
051: private Impl impl;
052:
053: public RMISocketControlAspect() {
054: }
055:
056: public void load() {
057: super .load();
058:
059: Provider provider = new Provider();
060: impl = new Impl();
061: getServiceBroker().addService(RMISocketControlService.class,
062: provider);
063: }
064:
065: public Object getDelegate(Object object, Class type) {
066: if (type == Socket.class) {
067: impl.cacheSocket((Socket) object);
068: }
069: return null;
070: }
071:
072: private class Provider implements ServiceProvider {
073: public Object getService(ServiceBroker sb, Object requestor,
074: Class serviceClass) {
075: if (serviceClass == RMISocketControlService.class) {
076: return impl;
077: } else {
078: return null;
079: }
080: }
081:
082: public void releaseService(ServiceBroker sb, Object requestor,
083: Class serviceClass, Object service) {
084: }
085: }
086:
087: private class Impl implements RMISocketControlService {
088: HashMap sockets, // host:port -> list of sockets
089: references, // MessageAddress -> Remote stub
090: addresses, // Remote stub -> MessageAddress
091: default_timeouts, // MessageAddress -> timeout
092: referencesByKey; // host:port -> Remote stub
093:
094: private Impl() {
095: sockets = new HashMap();
096: references = new HashMap();
097: addresses = new HashMap();
098: default_timeouts = new HashMap();
099: referencesByKey = new HashMap();
100:
101: Runnable reaper = new Runnable() {
102: public void run() {
103: reapClosedSockets();
104: }
105: };
106: ThreadService tsvc = getThreadService();
107: Schedulable sched = tsvc.getThread(this , reaper,
108: "Socket Reaper");
109:
110: sched.schedule(0, 5000);
111: }
112:
113: private String getKey(String host, int port) {
114: return getKey(host, Integer.toString(port));
115: }
116:
117: private String getKey(String host, String port) {
118: // May need to canonicalize the host
119: return host + ":" + port;
120: }
121:
122: private String getKey(Remote ref) {
123: // Dig out the host and port, then look it up in 'sockets'.
124: // form is
125: // classname[RemoteStub [ref: [endpoint:[host:port](local),objID:[0]]]]
126: String refString = ref.toString();
127: int host_start = refString.indexOf("[endpoint:[");
128: if (host_start < 0)
129: return null;
130: host_start += 11;
131: int host_end = refString.indexOf(':', host_start);
132: if (host_end < 0)
133: return null;
134: String host = refString.substring(host_start, host_end);
135: int port_start = 1 + host_end;
136: int port_end = port_start;
137: int port_end_1 = refString.indexOf(',', host_end);
138: int port_end_2 = refString.indexOf(']', host_end);
139: if (port_end_1 < 0 && port_end_2 < 0)
140: return null;
141: if (port_end_1 < 0)
142: port_end = port_end_2;
143: else if (port_end_2 < 0)
144: port_end = port_end_1;
145: else
146: port_end = Math.min(port_end_1, port_end_2);
147: String portString = refString.substring(port_start,
148: port_end);
149:
150: String key = getKey(host, portString);
151: referencesByKey.put(key, ref);
152:
153: return key;
154: }
155:
156: private String getKey(Socket skt) {
157: String host = skt.getInetAddress().getHostAddress();
158: int port = skt.getPort();
159: return getKey(host, port);
160: }
161:
162: private Integer getDefaultTimeout(String key) {
163: Object ref = referencesByKey.get(key);
164: Object addr = addresses.get(ref);
165: Integer result = (Integer) default_timeouts.get(addr);
166: return result;
167: }
168:
169: private void cacheSocket(Socket skt) {
170: String key = getKey(skt);
171: Integer timeout = (Integer) getDefaultTimeout(key);
172: if (timeout != null) {
173: try {
174: skt.setSoTimeout(timeout.intValue());
175: } catch (java.net.SocketException ex) {
176: // Don't care
177: }
178: }
179: synchronized (this ) {
180: ArrayList skt_list = (ArrayList) sockets.get(key);
181: if (skt_list == null) {
182: skt_list = new ArrayList();
183: sockets.put(key, skt_list);
184: }
185: skt_list.add(skt);
186: }
187: }
188:
189: synchronized void reapClosedSockets() {
190: // Prune closed sockets
191: Map.Entry entry;
192: Socket socket;
193: Iterator itr2;
194: Iterator itr = sockets.entrySet().iterator();
195: while (itr.hasNext()) {
196: entry = (Map.Entry) itr.next();
197: itr2 = ((ArrayList) entry.getValue()).iterator();
198: while (itr2.hasNext()) {
199: socket = (Socket) itr2.next();
200: if (socket.isClosed())
201: itr2.remove();
202: }
203: }
204: }
205:
206: synchronized boolean setSoTimeout(Remote reference, int timeout) {
207: String key = getKey(reference);
208: ArrayList skt_list = (ArrayList) sockets.get(key);
209: if (skt_list == null)
210: return false;
211: boolean success = false;
212: Iterator itr = skt_list.iterator();
213: while (itr.hasNext()) {
214: Socket skt = (Socket) itr.next();
215: try {
216: skt.setSoTimeout(timeout);
217: success = true;
218: } catch (java.net.SocketException ex) {
219: itr.remove();
220: }
221: }
222: return success;
223: }
224:
225: public boolean setSoTimeout(MessageAddress addr, int timeout) {
226: // Could use the NameService to lookup the Reference from
227: // the address.
228: default_timeouts.put(addr, new Integer(timeout));
229: Remote reference = (Remote) references.get(addr);
230: if (reference != null) {
231: return setSoTimeout(reference, timeout);
232: } else {
233: return false;
234: }
235: }
236:
237: public synchronized void setReferenceAddress(Remote reference,
238: MessageAddress addr) {
239: references.put(addr, reference);
240: addresses.put(reference, addr);
241: Integer timeout = (Integer) default_timeouts.get(addr);
242: if (timeout != null)
243: setSoTimeout(reference, timeout.intValue());
244: }
245:
246: public ArrayList getSocket(MessageAddress addr) {
247: Remote ref = (Remote) references.get(addr);
248: if (ref == null)
249: return null;
250: String key = getKey(ref);
251: ArrayList skt_list = (ArrayList) sockets.get(key);
252: return skt_list;
253: }
254:
255: }
256:
257: }
|