001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.mts.rmi;
027:
028: import java.net.URI;
029: import java.rmi.Remote;
030: import java.rmi.server.UnicastRemoteObject;
031:
032: import org.cougaar.core.component.ServiceBroker;
033: import org.cougaar.core.mts.MessageAddress;
034: import org.cougaar.core.mts.MessageAttributes;
035: import org.cougaar.core.thread.SchedulableStatus;
036: import org.cougaar.mts.base.CommFailureException;
037: import org.cougaar.mts.base.CougaarIOException;
038: import org.cougaar.mts.base.DestinationLink;
039: import org.cougaar.mts.base.LinkProtocol;
040: import org.cougaar.mts.base.MisdeliveredMessageException;
041: import org.cougaar.mts.base.NameLookupException;
042: import org.cougaar.mts.base.RPCLinkProtocol;
043: import org.cougaar.mts.base.SocketFactory;
044: import org.cougaar.mts.base.UnregisteredNameException;
045: import org.cougaar.mts.std.AttributedMessage;
046: import org.cougaar.util.StateModelException;
047:
048: /**
049: * This {@link LinkProtocol} handles message passing via RMI, one
050: * example RPC-like communication. The interface is {@link MT}.
051: *
052: * The cost function of the DestinationLink inner subclass is
053: * currently hardwired to an arbitrary value of 1000. This should be
054: * made smarter eventually.
055: *
056: */
057: public class RMILinkProtocol extends RPCLinkProtocol {
058:
059: // private MessageAddress myAddress;
060: private MT myProxy;
061: private SocketFactory socfac;
062: private RMISocketControlService controlService;
063:
064: public RMILinkProtocol() {
065: super ();
066: socfac = getSocketFactory();
067: }
068:
069: // If LinkProtocols classes want to define this method, eg in
070: // order to provide a service, they should not in general invoke
071: // super.load(), since if they do they'll end up clobbering any
072: // services defined by super classes service. Instead they should
073: // use super_load(), defined in LinkProtocol, which runs the
074: // standard load() method without running any intervening ones.
075: public void load() {
076: super .load();
077: ServiceBroker sb = getServiceBroker();
078:
079: // RMISocketControlService could be null
080: controlService = (RMISocketControlService) sb.getService(this ,
081: RMISocketControlService.class, null);
082: }
083:
084: /**
085: * @see org.cougaar.util.GenericStateModelAdapter#unload()
086: */
087: public synchronized void unload() throws StateModelException {
088: super .unload();
089: if (controlService != null) {
090: ServiceBroker sb = getServiceBroker();
091: sb.releaseService(this , RMISocketControlService.class,
092: controlService);
093: }
094: controlService = null;
095: }
096:
097: protected String getProtocolType() {
098: return "-RMI";
099: }
100:
101: protected SocketFactory getSocketFactory() {
102: return new SocketFactory(false, true);
103: }
104:
105: // If this is called, we've already found the remote reference.
106: protected int computeCost(AttributedMessage message) {
107: return 1000;
108: }
109:
110: protected MTImpl makeMTImpl(MessageAddress myAddress,
111: SocketFactory socfac) throws java.rmi.RemoteException {
112: return new MTImpl(myAddress, getServiceBroker(), socfac);
113: }
114:
115: // Even though MisdeliveredMessageExceptions are
116: // RemoteExceptions, nonethless they'll be wrapped. Check for
117: // this case here. Also look for IllegalArgumentExceptions,
118: // which can also occur as a side-effect of mobility.
119: protected void checkForMisdelivery(Throwable ex,
120: AttributedMessage message)
121: throws MisdeliveredMessageException {
122: if (ex instanceof MisdeliveredMessageException) {
123: throw ((MisdeliveredMessageException) ex);
124: } else if (ex instanceof IllegalArgumentException) {
125: // Probably a misdelivered message that failed during
126: // deserialization. Try to check with a string match...
127: String msg = ex.getMessage();
128: int match = msg.indexOf("is not an Agent on this node");
129: if (match > 0) {
130: // pretend this is a MisdeliveredMessageException
131: throw new MisdeliveredMessageException(message);
132: }
133: }
134: // If we get here, the caller is responsible for rethrowing
135: // the exception.
136: }
137:
138: protected MessageAttributes doForwarding(MT remote,
139: AttributedMessage message)
140: throws MisdeliveredMessageException,
141: java.rmi.RemoteException, CommFailureException
142: // Declare CommFailureException because the signature needs to
143: // match SerializedRMILinkProtocol's doForwarding method. That
144: // exception will never be thrown here.
145: {
146: MessageAttributes result = null;
147: try {
148: SchedulableStatus.beginNetIO("RMI call");
149: result = remote.rerouteMessage(message); // **** RMI-specific
150: } catch (java.rmi.RemoteException remote_ex) {
151: Throwable cause = remote_ex.getCause();
152: checkForMisdelivery(cause, message);
153: // Not a misdelivery - rethrow the remote exception
154: throw remote_ex;
155: } catch (IllegalArgumentException illegal_arg) {
156: checkForMisdelivery(illegal_arg, message);
157: // Not a misdelivery - rethrow the exception
158: throw illegal_arg;
159: } finally {
160: SchedulableStatus.endBlocking();
161: }
162: return result;
163: }
164:
165: protected Boolean usesEncryptedSocket() {
166: return Boolean.FALSE;
167: }
168:
169: protected DestinationLink createDestinationLink(
170: MessageAddress address) {
171: return new RMILink(address);
172: }
173:
174: // Standard RMI handling of security and other cougaar-specific io
175: // exceptions. Subclasses may need to do something different (see
176: // CORBALinkProtocol).
177: //
178: // If the argument itself is a MarshalException whose cause is a
179: // CougaarIOException, a local cougaar-specific error has occured.
180: //
181: // If the argument is some other RemoteException whose cause is an
182: // UnmarshalException whose cause in turn is a CougaarIOException,
183: // a remote cougaar-specific error has occured.
184: //
185: // Otherwise this is some other kind of remote error.
186: protected void handleSecurityException(Exception ex)
187: throws CommFailureException {
188: Throwable cause = ex.getCause();
189: if (ex instanceof java.rmi.MarshalException) {
190: if (cause instanceof CougaarIOException) {
191: throw new CommFailureException((Exception) cause);
192: }
193: // When a TransientIOException is thrown sometimes it
194: // triggers different exception on the socket, which gets
195: // through instead of the TransientIOException. For now we
196: // will catch these and treat them as if they were
197: // transient (though other kinds of SocketExceptions
198: // really shouldn't be).
199: else if (cause instanceof java.net.SocketException) {
200: // Throwing a CommFailureException doesn't seem right
201: // anymore (as of 1.4.2). So don't do it anymore,
202: // but log it.
203: if (loggingService.isDebugEnabled())
204: loggingService.debug(
205: "Got a SocketException as the cause of a MarshallException: "
206: + cause.getMessage(), ex);
207: // cause = new TransientIOException(cause.getMessage());
208: // throw new CommFailureException((Exception) cause);
209: }
210: } else if (cause instanceof java.rmi.UnmarshalException) {
211: Throwable remote_cause = cause.getCause();
212: if (remote_cause instanceof CougaarIOException) {
213: throw new CommFailureException((Exception) remote_cause);
214: }
215: }
216: }
217:
218: protected void findOrMakeNodeServant() {
219: if (myProxy != null)
220: return;
221: try {
222: MessageAddress myAddress = getNameSupport()
223: .getNodeMessageAddress();
224: myProxy = makeMTImpl(myAddress, socfac);
225: Remote remote = UnicastRemoteObject.exportObject(myProxy,
226: 0, socfac, socfac);
227: setNodeURI(RMIRemoteObjectEncoder.encode(remote));
228: } catch (java.rmi.RemoteException ex) {
229: loggingService.error(null, ex);
230: } catch (Exception other) {
231: loggingService.error(null, other);
232: }
233: }
234:
235: protected void releaseNodeServant() {
236: try {
237: UnicastRemoteObject.unexportObject(myProxy, true);
238: } catch (java.rmi.NoSuchObjectException ex) {
239: // don't care
240: }
241: myProxy = null;
242: }
243:
244: protected void remakeNodeServant() {
245: try {
246: UnicastRemoteObject.unexportObject(myProxy, true);
247: } catch (java.rmi.NoSuchObjectException ex) {
248: // don't care
249: }
250: myProxy = null;
251: findOrMakeNodeServant();
252: }
253:
254: protected class RMILink extends Link {
255:
256: protected RMILink(MessageAddress destination) {
257: super (destination);
258: }
259:
260: protected Object decodeRemoteRef(URI ref) throws Exception {
261: MessageAddress target = getDestination();
262: if (getRegistry().isLocalClient(target)) {
263: // myself as an RMI stub
264: return myProxy;
265: }
266:
267: if (ref == null)
268: return null;
269:
270: Object object = null;
271: try {
272: // This call can block in net i/o
273: SchedulableStatus.beginNetIO("RMI reference decode");
274: object = RMIRemoteObjectDecoder.decode(ref);
275: } catch (Throwable ex) {
276: loggingService.error("Can't decode URI " + ref, ex);
277: } finally {
278: SchedulableStatus.endBlocking();
279: }
280:
281: if (object == null)
282: return null;
283:
284: if (controlService != null)
285: controlService.setReferenceAddress((Remote) object,
286: target);
287:
288: if (object instanceof MT) {
289: return (MT) object;
290: } else {
291: throw new RuntimeException("Object " + object
292: + " is not a MessageTransport!");
293: }
294: }
295:
296: public Class getProtocolClass() {
297: return RMILinkProtocol.this .getClass();
298: }
299:
300: protected MessageAttributes forwardByProtocol(
301: Object remote_ref, AttributedMessage message)
302:
303: throws NameLookupException, UnregisteredNameException,
304: CommFailureException, MisdeliveredMessageException {
305: try {
306: return doForwarding((MT) remote_ref, message);
307: } catch (MisdeliveredMessageException mis) {
308: // force recache of remote
309: decache();
310: throw mis;
311: }
312: // RMILinkProtocol won't throw this but subclasses might.
313: catch (CommFailureException cfe) {
314: // force recache of remote
315: decache();
316: throw cfe;
317: } catch (java.rmi.RemoteException ex) {
318: if (loggingService.isDebugEnabled()) {
319: loggingService.debug("RemoteException", ex);
320: }
321: handleSecurityException(ex);
322: // If we get here it wasn't a security exception
323: decache();
324: throw new CommFailureException(ex);
325: } catch (Exception ex) {
326: // Ordinary comm failure. Force recache of remote
327: if (loggingService.isDebugEnabled()) {
328: loggingService.debug("Ordinary comm failure", ex);
329: }
330: decache();
331: // Ordinary comm failure
332: throw new CommFailureException(ex);
333: }
334: }
335:
336: }
337:
338: }
|