001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.base;
028:
029: import java.net.InetAddress;
030: import java.net.URI;
031: import java.util.ArrayList;
032: import java.util.HashMap;
033: import java.util.Map;
034:
035: import org.cougaar.bootstrap.SystemProperties;
036: import org.cougaar.core.component.ServiceBroker;
037: import org.cougaar.core.mts.MessageAddress;
038: import org.cougaar.core.mts.MessageAttributes;
039: import org.cougaar.core.mts.MessageTransportClient;
040: import org.cougaar.core.service.IncarnationService;
041: import org.cougaar.core.service.wp.AddressEntry;
042: import org.cougaar.core.service.wp.Callback;
043: import org.cougaar.core.service.wp.Response;
044: import org.cougaar.core.service.wp.WhitePagesService;
045: import org.cougaar.mts.std.AttributedMessage;
046:
/**
 * This is the parent class of all {@link LinkProtocol}s that use
 * remote-procedure-call (RPC) semantics. Instantiable extensions
 * must provide implementations for a small set of abstract methods,
 * covering protocol specifics.
 */
053: abstract public class RPCLinkProtocol extends LinkProtocol {
054:
055: private URI ref;
056: private IncarnationService incarnationService;
057: private WhitePagesService wpService;
058: private HashMap links;
059: private Object ipAddrLock = new Object();
060: private ArrayList clients = new ArrayList();
061:
/**
 * Just for testing -- an example of supplying a service from a
 * LinkProtocol. The interface declares no methods of its own; it is
 * the service class that load() adds to the ServiceBroker and that
 * getService() answers.
 */
public interface Service extends LinkProtocolService {
    // protocol-specific methods would go here.
}
068:
069: public Object getService(ServiceBroker sb, Object requestor,
070: Class serviceClass) {
071: if (serviceClass == Service.class) {
072: return new ServiceProxy();
073: } else {
074: return null;
075: }
076: }
077:
078: public void releaseService(ServiceBroker sb, Object requestor,
079: Class serviceClass, Object service) {
080:
081: if (serviceClass == Service.class) {
082: // no-op for this example
083: }
084: }
085:
086: // If LinkProtocols classes want to define this method, eg in
087: // order to provide a service, they should not in general invoke
088: // super.load(), since if they do they'll end up clobbering any
089: // services defined by super classes service. Instead they should
090: // use super_load(), defined in LinkProtocol, which runs the
091: // standard load() method without running any intervening ones.
092: public void load() {
093: super _load();
094: links = new HashMap();
095:
096: ServiceBroker sb = getServiceBroker();
097: sb.addService(Service.class, this );
098:
099: incarnationService = (IncarnationService) sb.getService(this ,
100: IncarnationService.class, null);
101: if (incarnationService == null
102: && loggingService.isWarnEnabled())
103: loggingService.warn("Couldn't load IncarnationService");
104:
105: wpService = (WhitePagesService) sb.getService(this ,
106: WhitePagesService.class, null);
107: if (wpService == null && loggingService.isWarnEnabled())
108: loggingService.warn("Couldn't load WhitePagesService");
109: }
110:
111: // subclass responsibility
112:
113: /**
114: * @return an identifier of the protocol that can be used by the
115: * WP to distinguish them from one another.
116: */
117: abstract protected String getProtocolType();
118:
119: /**
120: * @return a boolean indicating where or not the protocol uses ssl
121: */
122: abstract protected Boolean usesEncryptedSocket();
123:
124: /**
125: * @return the cost of transmitting the message over this
126: * protocol.
127: */
128: abstract protected int computeCost(AttributedMessage message);
129:
130: /**
131: * Return a protocol-specific {@link DestinationLink} for the
132: * target address. The name should not be taken to mean that a
133: * new link will be created for every call.
134: */
135: abstract protected DestinationLink createDestinationLink(
136: MessageAddress address);
137:
138: /**
139: * Ensure that some abstract form of 'servant' object exists for
140: * this protocol that will allow other Nodes to send messages to his
141: * one.
142: */
143: abstract protected void findOrMakeNodeServant();
144:
145: /**
146: * Releases all resources associated with this link protocol.
147: * <p>
148: *
149: * This method is invoked when the MTS is unloaded so the garbage
150: * collector can reclaim all resources that had been allocated.
151: *
152: * Fix for bug 3965:
153: * http://bugs.cougaar.org/show_bug.cgi?id=3965
154: */
155: abstract protected void releaseNodeServant();
156:
157: /**
158: * Force the proticol to remake its 'servant', typically because
159: * the address of the Host on which the Node is running has changed.
160: * Some protoocol (eg HTTP) can ignore this.
161: */
162: abstract protected void remakeNodeServant();
163:
/**
 * Stores the URI that is handed to the name server when clients are
 * registered or unregistered. A non-null value is also what makes
 * isServantAlive() report true.
 *
 * @param ref the new node URI; may be null
 */
protected void setNodeURI(URI ref) {
    this.ref = ref;
}
167:
168: public boolean addressKnown(MessageAddress address) {
169: if (loggingService.isErrorEnabled())
170: loggingService
171: .error("The addressKnown method of RMILinkProtocol is no longer supported");
172: Link link = (Link) links.get(address);
173: return link != null && link.remote_ref != null;
174: }
175:
/**
 * @return {@code true} when a node URI has been set (see setNodeURI);
 *         registerClient() only contacts the name server in that case.
 */
protected boolean isServantAlive() {
    return ref != null;
}
179:
180: public final void registerClient(MessageTransportClient client) {
181: synchronized (ipAddrLock) {
182: findOrMakeNodeServant();
183: if (isServantAlive()) {
184: try {
185: // Assume node-redirect
186: MessageAddress addr = client.getMessageAddress();
187: getNameSupport().registerAgentInNameServer(ref,
188: addr, getProtocolType());
189: } catch (Exception e) {
190: if (loggingService.isErrorEnabled())
191: loggingService.error(
192: "Error registering client", e);
193: }
194: }
195: clients.add(client);
196: }
197: }
198:
199: public final void unregisterClient(MessageTransportClient client) {
200: synchronized (ipAddrLock) {
201: try {
202:
203: // Assume node-redirect
204: MessageAddress addr = client.getMessageAddress();
205: getNameSupport().unregisterAgentInNameServer(ref, addr,
206: getProtocolType());
207: clients.remove(client);
208:
209: if (clients.isEmpty()) {
210: // Fix for bug 3965: Release RMI stub on node shutdown.
211: releaseNodeServant();
212: }
213:
214: } catch (Exception e) {
215: if (loggingService.isErrorEnabled())
216: loggingService.error("Error unregistering client",
217: e);
218: }
219: }
220: }
221:
222: public final void reregisterClients() {
223: synchronized (ipAddrLock) {
224: if (ref != null) {
225: String protocolType = getProtocolType();
226: for (int i = 0; i < clients.size(); i++) {
227: MessageTransportClient client = (MessageTransportClient) clients
228: .get(i);
229: MessageAddress addr = client.getMessageAddress();
230: getNameSupport().registerAgentInNameServer(ref,
231: addr, protocolType);
232: }
233: }
234: }
235: }
236:
237: // Unregister all current clients (inform the WP); close the RMI
238: // listener; remake the RMI impl; re-register clients (WP).
239: public void ipAddressChanged() {
240: synchronized (ipAddrLock) {
241: // update hostname property
242: try {
243: InetAddress local = InetAddress.getLocalHost();
244: String hostaddr = local.getHostAddress();
245: SystemProperties.setProperty(
246: "java.rmi.server.hostname", hostaddr);
247: } catch (java.net.UnknownHostException ex) {
248: // log something
249: if (loggingService.isWarnEnabled())
250: loggingService.warn("Couldn't get localhost: "
251: + ex.getMessage());
252: }
253: NameSupport ns = getNameSupport();
254: String type = getProtocolType();
255: MessageTransportClient client;
256: for (int i = 0; i < clients.size(); i++) {
257: client = (MessageTransportClient) clients.get(i);
258: ns.unregisterAgentInNameServer(ref, client
259: .getMessageAddress(), type);
260: }
261:
262: remakeNodeServant();
263: for (int i = 0; i < clients.size(); i++) {
264: client = (MessageTransportClient) clients.get(i);
265: ns.registerAgentInNameServer(ref, client
266: .getMessageAddress(), type);
267: }
268: }
269: }
270:
271: // Factory methods:
272:
273: public DestinationLink getDestinationLink(MessageAddress address) {
274: DestinationLink link = null;
275: synchronized (links) {
276: link = (DestinationLink) links.get(address);
277: if (link == null) {
278: link = createDestinationLink(address);
279: link = (DestinationLink) attachAspects(link,
280: DestinationLink.class);
281: links.put(address, link);
282: }
283: }
284:
285: return link;
286: }
287:
288: private class WPCallback implements Callback {
289: Link link;
290:
291: WPCallback(Link link) {
292: this .link = link;
293: }
294:
295: private long extractIncarnation(Map entries) {
296: // parse "(.. type=version uri=version:///1234/blah)"
297: if (entries == null)
298: return 0;
299:
300: AddressEntry ae = (AddressEntry) entries.get("version");
301: if (ae == null)
302: return 0;
303:
304: try {
305: String path = ae.getURI().getPath();
306: int end = path.indexOf('/', 1);
307: String incn_str = path.substring(1, end);
308: return Long.parseLong(incn_str);
309: } catch (Exception e) {
310: if (loggingService.isDetailEnabled()) {
311: loggingService
312: .detail("ignoring invalid version entry: "
313: + ae);
314: }
315: return 0;
316: }
317: }
318:
319: public void execute(Response response) {
320: Response.GetAll rg = (Response.GetAll) response;
321: Map entries = rg.getAddressEntries();
322: AddressEntry entry = null;
323: long incn = 0;
324: if (entries != null) {
325: entry = (AddressEntry) entries.get(getProtocolType());
326: incn = extractIncarnation(entries);
327: }
328: if (loggingService.isDebugEnabled())
329: loggingService.debug("Brand spanking new WP callback: "
330: + entry + " incarnation = " + incn);
331: link.handleWPCallback(entry, incn);
332: }
333: }
334:
335: abstract protected class Link implements DestinationLink,
336: IncarnationService.Callback {
337:
338: private MessageAddress target;
339: private boolean lookup_pending = false;
340: private AddressEntry lookup_result = null;
341: private Object lookup_lock = new Object();
342: private long incarnation;
343: private Callback lookup_cb = new WPCallback(this );
344: private Object remote_lock = new Object();
345: private Object remote_ref; // URI, URL etc
346:
347: protected Link(MessageAddress destination) {
348: this .target = destination;
349: // subscribe to IncarnationService
350: if (incarnationService != null) {
351: incarnation = incarnationService.getIncarnation(target);
352: incarnationService.subscribe(destination, this );
353: }
354: }
355:
356: abstract protected Object decodeRemoteRef(URI ref)
357: throws Exception;
358:
359: abstract protected MessageAttributes forwardByProtocol(
360: Object remote, AttributedMessage message)
361: throws NameLookupException, UnregisteredNameException,
362: CommFailureException, MisdeliveredMessageException;
363:
364: // WP callback
365: private void handleWPCallback(AddressEntry entry, long incn) {
366: synchronized (lookup_lock) {
367: lookup_pending = false;
368: if (incn > incarnation) {
369: // tell the incarnation service
370: incarnationService.updateIncarnation(target, incn);
371: incarnation = incn;
372: } else if (incn == 0) {
373: // Reset the incarnation number, for nameserver bootstrap
374: // don't tell incarnation service, it will ignore 0;
375: incarnation = 0;
376: }
377: lookup_result = (entry != null && incn == incarnation) ? entry
378: : null;
379: }
380: }
381:
382: protected void decache() {
383: synchronized (remote_lock) {
384: remote_ref = null;
385: }
386: synchronized (lookup_lock) {
387: if (!lookup_pending)
388: lookup_result = null;
389: }
390: }
391:
392: protected URI getRemoteURI() {
393: synchronized (lookup_lock) {
394: if (lookup_result != null) {
395: return lookup_result.getURI();
396: } else if (lookup_pending) {
397: return null;
398: } else {
399: lookup_pending = true;
400: wpService.getAll(target.getAddress(), lookup_cb);
401: // The results may have arrived as part of
402: // registering callback
403: if (lookup_result != null) {
404: return lookup_result.getURI();
405: } else {
406: return null;
407: }
408: }
409: }
410: }
411:
412: // IncarnationService callback
413: public void incarnationChanged(MessageAddress addr, long incn) {
414: synchronized (lookup_lock) {
415: if (incn > incarnation) {
416: // newer value -- decache the stub
417: incarnation = incn;
418: decache();
419: } else if (incn == 0) {
420: // incarnation number reset for nameserver bootstrap
421: if (loggingService.isWarnEnabled())
422: loggingService
423: .warn("Incarnation service callback has zero incarnation number for "
424: + addr);
425: incarnation = 0;
426: decache();
427: } else if (incn < incarnation) {
428: // out-of-date info
429: if (loggingService.isWarnEnabled())
430: loggingService
431: .warn("Incarnation service callback has out of date incarnation number "
432: + incn + " for " + addr);
433: }
434: }
435: }
436:
437: // NB: Intentionally not synchronized on remote_lock because a
438: // network invocation can happen in lookupRMIObject(). Worst
439: // case scenario: normal return (no exception) but 'remote' has
440: // been reset to null in the meantime. The two callers
441: // (isValid and forwardMessage) deal with this case.
442: protected void cacheRemote() throws NameLookupException,
443: UnregisteredNameException {
444: if (remote_ref == null) {
445: try {
446: URI ref = getRemoteURI();
447: remote_ref = decodeRemoteRef(ref);
448: } catch (Exception lookup_failure) {
449: throw new NameLookupException(lookup_failure);
450: }
451:
452: if (remote_ref == null)
453: throw new UnregisteredNameException(target);
454:
455: }
456: }
457:
458: public MessageAttributes forwardMessage(
459: AttributedMessage message) throws NameLookupException,
460: UnregisteredNameException, CommFailureException,
461: MisdeliveredMessageException {
462: cacheRemote();
463: // Ordinarily cacheRemote either throws an Exception or
464: // caches a non-null reference. But with the addition of
465: // the IncarnationService callbacks, the reference can now
466: // be clobbered subsequently by another thread. Deal with
467: // that here.
468:
469: // JAZ remote lock is no longer around the remote call.
470: // The look isonly be held long enough to get the reference
471: // and "commit to the remote call"
472: // Why was it around a remote call an not just the remote ref?
473: // Does this have something to do with detecting that the message is intransiant
474:
475: Object committedRemoteRef; // URI, URL etc
476: synchronized (remote_lock) {
477: committedRemoteRef = remote_ref;
478: }
479: if (committedRemoteRef == null) {
480: Exception cause = new Exception(
481: "Inconsistent remote reference cache");
482: throw new NameLookupException(cause);
483: } else {
484: return forwardByProtocol(committedRemoteRef, message);
485: }
486:
487: }
488:
489: public boolean retryFailedMessage(AttributedMessage message,
490: int retryCount) {
491: return true;
492: }
493:
494: public boolean isValid() {
495: try {
496: cacheRemote();
497: // Ordinarily cacheRemote either throws an Exception
498: // or caches a non-null reference. But with the
499: // addition of the IncarnationService callback, the
500: // reference can now be clobbered subsequently by
501: // another thread. Deal with that here.
502: synchronized (remote_lock) {
503: return remote_ref != null;
504: }
505: } catch (NameLookupException name_ex) {
506: return false;
507: } catch (UnregisteredNameException unknown_ex) {
508: // still waiting?
509: return false;
510: } catch (Throwable th) {
511: loggingService.error("Can't compute RMI cost", th);
512: return false;
513: }
514: }
515:
516: public int cost(AttributedMessage message) {
517: synchronized (remote_lock) {
518: if (remote_ref == null)
519: return Integer.MAX_VALUE;
520: else
521: return computeCost(message);
522: }
523: }
524:
525: public MessageAddress getDestination() {
526: return target;
527: }
528:
529: public Object getRemoteReference() {
530: return remote_ref;
531: }
532:
533: public void addMessageAttributes(MessageAttributes attrs) {
534: attrs.addValue(MessageAttributes.IS_STREAMING_ATTRIBUTE,
535: Boolean.TRUE);
536:
537: attrs.addValue(
538: MessageAttributes.ENCRYPTED_SOCKET_ATTRIBUTE,
539: usesEncryptedSocket());
540:
541: }
542:
543: }
544:
545: }
|