001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.yp;
028:
029: import java.util.Arrays;
030: import java.util.Collection;
031: import java.util.Collections;
032: import java.util.HashMap;
033: import java.util.Iterator;
034: import java.util.Set;
035:
036: import javax.naming.directory.Attribute;
037: import javax.naming.directory.Attributes;
038:
039: import org.cougaar.core.agent.service.MessageSwitchService;
040: import org.cougaar.core.blackboard.BlackboardClient;
041: import org.cougaar.core.blackboard.IncrementalSubscription;
042: import org.cougaar.core.blackboard.SubscriptionWatcher;
043: import org.cougaar.core.component.ComponentSupport;
044: import org.cougaar.core.component.ServiceBroker;
045: import org.cougaar.core.component.ServiceProvider;
046: import org.cougaar.core.mts.Message;
047: import org.cougaar.core.mts.MessageAddress;
048: import org.cougaar.core.mts.MessageHandler;
049: import org.cougaar.core.service.BlackboardService;
050: import org.cougaar.core.service.ThreadService;
051: import org.cougaar.core.service.community.Community;
052: import org.cougaar.core.service.community.CommunityResponse;
053: import org.cougaar.core.service.community.CommunityResponseListener;
054: import org.cougaar.core.service.community.CommunityService;
055: import org.cougaar.core.service.community.Entity;
056: import org.cougaar.core.thread.Schedulable;
057: import org.cougaar.util.UnaryPredicate;
058: import org.cougaar.util.log.Logger;
059: import org.cougaar.util.log.Logging;
060: import org.uddi4j.transport.TransportException;
061: import org.w3c.dom.Element;
062:
063: /** An Agent-level Component which implements the client-side of the Cougaar
064: * yellowpages application, supporting both synchronous and asychronous and
065: * both service-based and blackboard-based queries.
066: **/
067:
068: public class YPClientComponent extends ComponentSupport {
069: private static final Logger logger = Logging
070: .getLogger(YPClientComponent.class);
071:
072: private MessageSwitchService mss = null;
073: private YPServiceProvider ypsp;
074: private MessageAddress originMA;
075: private BlackboardService blackboard;;
076: private YPLP lp;
077: private ThreadService threadService;
078: private CommunityService communityService;
079:
080: public void setThreadService(ThreadService ts) {
081: this .threadService = ts;
082: }
083:
084: public void setCommunityService(CommunityService cs) {
085: this .communityService = cs;
086: }
087:
088: public void load() {
089: super .load();
090:
091: ServiceBroker sb = getServiceBroker();
092: mss = (MessageSwitchService) sb.getService(this ,
093: MessageSwitchService.class, null);
094: this .originMA = mss.getMessageAddress();
095:
096: startServiceThread();
097:
098: // need to hook into the Agent MessageHandler protocol
099: MessageHandler mh = new MessageHandler() {
100: public boolean handleMessage(Message message) {
101: if (message instanceof YPResponseMessage) {
102: getServiceThread().addMessage(message);
103: return true;
104: }
105: return false;
106: }
107: };
108:
109: mss.addMessageHandler(mh);
110:
111: lp = new YPLP();
112: blackboard = (BlackboardService) sb.getService(lp,
113: BlackboardService.class, null);
114: lp.start();
115:
116: ypsp = new YPServiceProvider();
117: sb.addService(YPService.class, ypsp);
118: }
119:
120: //
121: // Service thread for incoming (response) messages
122: //
123: private ServiceThread serviceThread = null;
124:
125: private void startServiceThread() {
126: serviceThread = new ServiceThread(new ServiceThread.Callback() {
127: public void dispatch(Message m) {
128: dispatchResponse((YPResponseMessage) m);
129: }
130: }, logger, "YPClient(" + originMA + ")");
131: serviceThread.start(threadService);
132: }
133:
134: private ServiceThread getServiceThread() {
135: return serviceThread;
136: }
137:
138: //
139: // resolver
140: //
141:
142: /** send a message via the right service **/
143: private void sendMessage(Message m) {
144: mss.sendMessage(m);
145: }
146:
147: /** Convert a YP context to a MessageAddress supporting the YP application **/
148: protected MessageAddress lookup(Object context) {
149: if (context instanceof MessageAddress) {
150: return ((MessageAddress) context);
151: } else if (context instanceof Community) {
152: Set ypAgents = ypServers((Community) context);
153:
154: if (logger.isDebugEnabled()) {
155: logger.debug("lookup: ypAgents " + ypAgents
156: + " size = " + ypAgents.size());
157: }
158:
159: for (Iterator iterator = ypAgents.iterator(); iterator
160: .hasNext();) {
161: MessageAddress ma = MessageAddress
162: .getMessageAddress(((Entity) (iterator.next()))
163: .getName());
164: if ((iterator.hasNext()) && (logger.isDebugEnabled())) {
165: logger
166: .debug(context
167: + " Community has multiple YP servers. Using "
168: + ma.toString());
169: }
170: return ma;
171: }
172:
173: // If we got to here => no YPServer
174: if (logger.isDebugEnabled()) {
175: logger.debug(context
176: + " Community does not have any YP servers.");
177: }
178: throw new NoYPServerException(originMA
179: + ": unable to find YPServer community for "
180: + context);
181: } else {
182: throw new IllegalArgumentException(
183: "Unrecognized context type "
184: + context.getClass()
185: + ". Must be either MessageAddress or Community.");
186: }
187: }
188:
189: /** Find the YP server context for the specified agent
190: * @param agentName agent
191: * @param callback callback.invoke(Object) called with the YP server context
192: * for the specified agent
193: * Next context will be null if there is no next context.
194: * @note callback.invoke may be called from within getYPServerContext
195: **/
196: private void getYPServerContext(final String agentName,
197: final YPService.NextContextCallback callback) {
198:
199: if ((agentName == null) || (agentName.equals(""))) {
200: // nowhere to go
201: callback.setNextContext(null);
202: return;
203: }
204:
205: CommunityResponseListener crl = new CommunityResponseListener() {
206: public void getResponse(CommunityResponse resp) {
207: // Found the parents so reenter with the same context
208: getYPServerContext(agentName, callback);
209: }
210: };
211:
212: Collection parents = communityService.listParentCommunities(
213: agentName, crl);
214:
215: if (logger.isDebugEnabled()) {
216: logger.debug("getYPServerContext: listParentCommunities("
217: + agentName + ") returned " + parents);
218: }
219: ;
220:
221: if (parents == null) {
222: // waiting on community callback so let callbacks handle it
223: return;
224: } else if (parents.size() == 0) {
225: // No more parents
226: callback.setNextContext(null);
227: return;
228: }
229:
230: boolean waiting = false;
231: boolean ypCommunity = false;
232:
233: for (Iterator iterator = parents.iterator(); iterator.hasNext();) {
234: String parentName = (String) iterator.next();
235:
236: crl = new CommunityResponseListener() {
237: public void getResponse(CommunityResponse resp) {
238: Community parent = (Community) resp.getContent();
239:
240: if (ypCommunity(parent)) {
241: if (!ypServers(parent).isEmpty()) {
242: callback.setNextContext(parent);
243: }
244: }
245: }
246: };
247:
248: Community parent = communityService.getCommunity(
249: parentName, crl);
250:
251: if (logger.isDebugEnabled()) {
252: logger.debug("getYPServerContext: getCommunity("
253: + parentName + ") returned " + parent);
254: }
255: ;
256:
257: if (parent != null) {
258: if (ypCommunity(parent)) {
259: ypCommunity = true;
260: if (!ypServers(parent).isEmpty()) {
261: callback.setNextContext(parent);
262: return;
263: }
264: }
265: } else {
266: waiting = true;
267: if (logger.isDebugEnabled()) {
268: logger
269: .debug("getYPServerContext: waiting on community for "
270: + parentName);
271: }
272: }
273: }
274:
275: // List of parents did not include any yp communities
276: if (!waiting && !ypCommunity) {
277: if (logger.isDebugEnabled()) {
278: logger
279: .debug("getYPServerContext: no parent YPCommunity for "
280: + agentName);
281: }
282: callback.setNextContext(null);
283: return;
284: }
285: }
286:
287: /** Find the next context to search.
288: * @param currentContext current YP context
289: * @param callback callback.invoke(Object) called with the next context.
290: * Next context will be null if there is no next context.
291: * @note callback.invoke may be called from within nextYPServerContext
292: **/
293: private void nextYPServerContext(final Object currentContext,
294: final YPService.NextContextCallback callback) {
295:
296: if ((currentContext != null)
297: && (!(currentContext instanceof Community))) {
298: // nowhere to go if not using a community context
299: callback.setNextContext(null);
300: return;
301: }
302:
303: CommunityResponseListener crl = new CommunityResponseListener() {
304: public void getResponse(CommunityResponse resp) {
305: // Found the parents so reenter with the same context
306: nextYPServerContext(currentContext, callback);
307: }
308: };
309:
310: Collection parents;
311:
312: if (currentContext == null) {
313: parents = communityService.listParentCommunities(null, crl);
314: } else {
315: if (logger.isDebugEnabled()) {
316: logger.debug("nextYPServerContext: attributes for "
317: + currentContext + " "
318: + ((Community) currentContext).getAttributes());
319: }
320:
321: parents = communityService.listParentCommunities(
322: ((Community) currentContext).getName(), crl);
323: }
324:
325: if (logger.isDebugEnabled()) {
326: logger.debug("nextYPServerContext: listParentCommunities("
327: + currentContext + ") returned " + parents);
328: }
329: ;
330:
331: if (parents == null) {
332: // waiting on community callback so let callbacks handle it
333: return;
334: } else if (parents.size() == 0) {
335: // No more parents
336: callback.setNextContext(null);
337: return;
338: }
339:
340: boolean waiting = false;
341: boolean ypCommunity = false;
342:
343: for (Iterator iterator = parents.iterator(); iterator.hasNext();) {
344: String parentName = (String) iterator.next();
345:
346: crl = new CommunityResponseListener() {
347: public void getResponse(CommunityResponse resp) {
348: Community parent = (Community) resp.getContent();
349:
350: if (ypCommunity(parent)) {
351: if (!ypServers(parent).isEmpty()) {
352: callback.setNextContext(parent);
353: } else {
354: nextYPServerContext(parent, callback);
355: }
356: }
357: }
358: };
359:
360: Community parent = communityService.getCommunity(
361: parentName, crl);
362:
363: if (logger.isDebugEnabled()) {
364: logger.debug("nextYPServerContext: getCommunity("
365: + parentName + ") returned " + parent);
366: }
367: ;
368:
369: if (parent != null) {
370: if (ypCommunity(parent)) {
371: ypCommunity = true;
372: if (!ypServers(parent).isEmpty()) {
373: callback.setNextContext(parent);
374: return;
375: } else {
376: if (logger.isDebugEnabled()) {
377: logger.debug("nextYPServerContext for "
378: + currentContext
379: + " recursing to " + parent);
380: }
381: nextYPServerContext(parent, callback);
382: return;
383: }
384: }
385: } else {
386: waiting = true;
387: if (logger.isDebugEnabled()) {
388: logger
389: .debug("nextYPServerContext: waiting on community for "
390: + parentName);
391: }
392: }
393: }
394:
395: // List of parents did not include any yp communities
396: if (!waiting && !ypCommunity) {
397: if (logger.isDebugEnabled()) {
398: logger
399: .debug("nextYPServerContext: no parent YPCommunity for "
400: + currentContext);
401: }
402: callback.setNextContext(null);
403: return;
404: }
405: }
406:
407: private void handleNextContext(YPFutureImpl query,
408: Object currentContext, Object nextContext) {
409: if (nextContext != null) {
410: if (logger.isDebugEnabled()) {
411: logger
412: .debug("Continuing resolver search, next context = "
413: + nextContext);
414: }
415: try {
416: track(query, nextContext);
417: } catch (TransportException ex) {
418: if (logger.isDebugEnabled()) {
419: logger.debug(
420: "Failing resolver search, next context = "
421: + nextContext, ex);
422: }
423: // really shouldn't happen unless there is something broken with the context tree
424: query.setFinalContext(currentContext);
425: query.setException(ex);
426: kickLP(query);
427: }
428: } else {
429: if (logger.isDebugEnabled()) {
430: logger.debug("Failed search with no result");
431: }
432: query.setFinalContext(currentContext); // last queried context
433: query.set(null); // nobody answered
434: kickLP(query);
435: }
436: }
437:
438: /** @deprecated use #isYPCommunity(community) instead **/
439: public static boolean ypCommunity(Community community) {
440: return isYPCommunity(community);
441: }
442:
443: /** test if the specified community supports YP operations **/
444: public static boolean isYPCommunity(Community community) {
445: Attributes attributes = community.getAttributes();
446: Attribute communityType = attributes.get("CommunityType");
447:
448: if (communityType == null) {
449: logger.error("ypCommunity: " + community
450: + " does not have a CommunityType attribute");
451: logger.error(community + " attributes " + attributes);
452: return false;
453: }
454:
455: if (logger.isDebugEnabled()) {
456: logger.debug("ypCommunity: returning "
457: + communityType.contains("YPCommunity") + " for "
458: + community);
459: }
460:
461: return communityType.contains("YPCommunity");
462: }
463:
464: private static final String YP_AGENT_FILTER = "(Role=YPServer)";
465:
466: /** Get the set of yp servers for a given community.
467: */
468: public static Set ypServers(Community community) {
469: if (!isYPCommunity(community)) {
470: return Collections.EMPTY_SET;
471: }
472:
473: Set ypAgents = (community).search(YP_AGENT_FILTER,
474: Community.AGENTS_ONLY);
475: if (logger.isDebugEnabled()) {
476: logger.debug("ypServers: " + ypAgents + " size = "
477: + ypAgents.size());
478: }
479:
480: return ypAgents;
481: }
482:
483: /** Return true IFF the element represents an actual answer (or positive failure) **/
484: private boolean isResponseComplete(YPFuture r, Element e) {
485: return ((r.getSearchMode() != YPProxy.SearchMode.HIERARCHICAL_COMMUNITY_SEARCH) || (e != null));
486: }
487:
488: /** (maybe) Tell the appropriate LogicProvider that the query is complete
489: * and any subscribers need to be told to wake up
490: **/
491: private void kickLP(YPFuture r) {
492: if (logger.isDebugEnabled()) {
493: logger.debug("kickLP(" + r + ")");
494: }
495: if (((YPFutureImpl) r).isFromBlackboard()) {
496: // only invoke the subscription kicker if it was submitted that way.
497: lp.kickFuture(r);
498: }
499: }
500:
501: /** Keep track of outstanding requests. Also used as sync lock for counter and selects itself. **/
502: private final HashMap selects = new HashMap(11); // assume not too many at a time
503:
504: /** Key counter. Access locked by selects. **/
505: private long counter = System.currentTimeMillis();
506:
507: private void submitFromBlackboard(YPFuture r)
508: throws TransportException {
509: if (logger.isDebugEnabled()) {
510: logger.debug("submitFromBlackboard(" + r + ")");
511: }
512: ((YPFutureImpl) r).setIsFromBlackboard(true);
513: submit(r);
514: }
515:
516: private void submitFromService(YPFuture r)
517: throws TransportException {
518: if (logger.isDebugEnabled()) {
519: logger.debug("submitFromService(" + r + ")");
520: }
521: submit(r);
522: }
523:
524: /** Submit a request.
525: */
526: private void submit(final YPFuture r) throws TransportException {
527: ((YPFutureImpl) r).submitted();
528:
529: if (!(r.getInitialContext() == null)) {
530: // Assume we know where to start
531: track(r, r.getInitialContext());
532: } else {
533: YPService.NextContextCallback callback = new YPService.NextContextCallback() {
534: public void setNextContext(Object context) {
535: if (context != null) {
536: try {
537: // Found the ypserver community so go on
538: if (logger.isDebugEnabled()) {
539: logger
540: .debug("YPService.NextClientCallback.setNextContext- query - "
541: + r
542: + " context - "
543: + context);
544: }
545: track(r, context);
546: } catch (TransportException te) {
547: logger.error(
548: "Unable to submit YP interaction to "
549: + context, te);
550: ((YPFutureImpl) r).setFinalContext(r
551: .getInitialContext());
552: ((YPFutureImpl) r).setException(te);
553: kickLP(r);
554: }
555: } else {
556: NoYPServerException nypse = new NoYPServerException(
557: originMA
558: + ": unable to find YPServer community");
559: ((YPFutureImpl) r).setFinalContext(r
560: .getInitialContext());
561: ((YPFutureImpl) r).setException(nypse);
562: kickLP(r);
563: }
564: }
565: };
566:
567: nextYPServerContext(r.getInitialContext(), callback);
568: }
569: }
570:
571: /** Track a single message, implicitly watching the whole resolver chain **/
572: void track(YPFuture r, Object context) throws TransportException {
573: Tracker t;
574:
575: synchronized (selects) {
576: Object key = new Long(counter++);
577: t = new Tracker(r, context, key);
578: selects.put(key, t);
579: }
580:
581: t.send();
582: }
583:
584: /** dispatch the response to the appropriate listener **/
585: private void dispatchResponse(YPResponseMessage r) {
586: Object key = r.getKey();
587: Element el = r.getElement();
588:
589: Tracker tracker;
590: synchronized (selects) {
591: tracker = (Tracker) selects.remove(key);
592: }
593:
594: if (tracker == null) {
595: logger
596: .warn("dispatchResponse(): Cannot find tracker for key "
597: + key
598: + " el "
599: + r.getElement()
600: + " source "
601: + r.getOriginator()
602: + " destination " + r.getTarget());
603: } else {
604: if (logger.isDebugEnabled()) {
605: logger
606: .debug("dispatchResponse(): YPResponseMessage - key "
607: + key
608: + " el "
609: + r.getElement()
610: + " source "
611: + r.getOriginator()
612: + " destination "
613: + r.getTarget()
614: + " = " + el);
615: }
616:
617: tracker.receiveResponse(el);
618: }
619: }
620:
621: /**
622: * Allow matching a recieved response to the right query.
623: **/
624:
625: private class Tracker {
626: private final YPFutureImpl query;
627: private final Object context;
628: private final Object key;
629:
630: Tracker(YPFuture r, Object context, Object key) {
631: this .query = (YPFutureImpl) r; // always an impl
632: this .context = context;
633: this .key = key;
634: }
635:
636: void send() throws TransportException {
637: try {
638: if (logger.isDebugEnabled()) {
639: logger
640: .debug("Tracker.send: sending YPQueryMessage - origin "
641: + originMA + " context " + context);
642: if (context == null) {
643: logger.error("Null context in Tracker.send.",
644: new Throwable());
645: }
646: }
647: MessageAddress ma = lookup(context);
648:
649: if (logger.isDebugEnabled()) {
650: logger.debug(originMA + " lookup(" + context
651: + " return ma ");
652: }
653: Element el = query.getElement();
654: boolean iqp = query.isInquiry();
655: YPQueryMessage m = new YPQueryMessage(originMA, ma, el,
656: iqp, key);
657: if (logger.isDebugEnabled()) {
658: logger
659: .debug("Tracker.send: sending YPQueryMessage - origin "
660: + originMA
661: + " target "
662: + ma
663: + " el " + el + " key " + key);
664: }
665: sendMessage(m);
666: } catch (RuntimeException re) {
667: if (logger.isDebugEnabled()) {
668: re.printStackTrace();
669: }
670: throw new TransportException(re);
671: }
672: }
673:
674: void receiveResponse(Element result) {
675: if (isResponseComplete(query, result)) {
676: if (logger.isDebugEnabled()) {
677: logger.debug("Tracker " + key + " waking with "
678: + result);
679: }
680: // we got THE answer. deal with it.
681: query.setFinalContext(context); // where did the answer come from?
682: query.set(result);
683: kickLP(query);
684: } else {
685: if (logger.isDebugEnabled()) {
686: logger.debug("Tracker " + key
687: + " continuing resolver search");
688: }
689:
690: YPService.NextContextCallback callback = new YPService.NextContextCallback() {
691: public void setNextContext(Object nextContext) {
692: handleNextContext(query, context, nextContext);
693: }
694: };
695:
696: // keep going
697: nextYPServerContext(context, callback);
698: }
699: }
700:
701: }
702:
703: //
704: // YPService
705: //
706:
707: private class YPServiceProvider implements ServiceProvider {
708: public YPServiceProvider() {
709: }
710:
711: public Object getService(ServiceBroker sb, Object requestor,
712: Class serviceClass) {
713: if (YPService.class.isAssignableFrom(serviceClass)) {
714: return new YPServiceImpl();
715: } else {
716: return null;
717: }
718: }
719:
720: public void releaseService(ServiceBroker sb, Object requestor,
721: Class serviceClass, Object service) {
722: // drop the service.
723: }
724: }
725:
726: private class YPServiceImpl implements YPService {
727: public YPProxy getYP(String ypAgent) {
728: return new YPProxyImpl(MessageAddress
729: .getMessageAddress(ypAgent), this , false);
730: }
731:
732: public YPProxy getYP(MessageAddress ypAgent) {
733: return new YPProxyImpl(ypAgent, this , false);
734: }
735:
736: public YPProxy getYP(Community community) {
737: return new YPProxyImpl(community, this , false,
738: YPProxy.SearchMode.HIERARCHICAL_COMMUNITY_SEARCH);
739: }
740:
741: public YPProxy getYP(Community community, int searchMode) {
742: return new YPProxyImpl(community, this , false, searchMode);
743: }
744:
745: public YPProxy getYP() {
746: return new YPProxyImpl(this , false,
747: YPProxy.SearchMode.HIERARCHICAL_COMMUNITY_SEARCH);
748: }
749:
750: public YPProxy getYP(int searchMode) {
751: return new YPProxyImpl(this , false, searchMode);
752: }
753:
754: public YPProxy getAutoYP(String ypAgent) {
755: return new YPProxyImpl(MessageAddress
756: .getMessageAddress(ypAgent), this , true);
757: }
758:
759: public YPProxy getAutoYP(MessageAddress ypAgent) {
760: return new YPProxyImpl(ypAgent, this , true);
761: }
762:
763: public YPProxy getAutoYP(Community community) {
764: return new YPProxyImpl(community, this , true,
765: YPProxy.SearchMode.HIERARCHICAL_COMMUNITY_SEARCH);
766: }
767:
768: public YPFuture submit(YPFuture r) {
769: try {
770: YPClientComponent.this .submitFromService(r);
771: return r;
772: } catch (TransportException te) {
773: throw new RuntimeException("submit nested exception",
774: te);
775: }
776: }
777:
778: public void nextYPServerContext(final Object currentContext,
779: final NextContextCallback callback) {
780: YPClientComponent.this .nextYPServerContext(currentContext,
781: callback);
782: }
783:
784: public void getYPServerContext(final String agentName,
785: final NextContextCallback callback) {
786: YPClientComponent.this .getYPServerContext(agentName,
787: callback);
788: }
789: }
790:
791: /**
792: * Act as a LogicProvider to watch blackboard activity for YP Queries
793: **/
794:
795: private class YPLP implements BlackboardClient {
796: public String getBlackboardClientName() {
797: return "YPClient";
798: }
799:
800: public long currentTimeMillis() {
801: return System.currentTimeMillis();
802: }
803:
804: IncrementalSubscription futures;
805: SubscriptionWatcher watcher;
806: Schedulable thread = null;
807:
808: private void signal() {
809: //if (logger.isDebugEnabled()) { logger.debug("LogicProvider signal()"); }
810: thread.start();
811: }
812:
813: void start() {
814: thread = threadService.getThread(this , new Runnable() {
815: public void run() {
816: cycle();
817: }
818: }, "YPLP(" + originMA.toString() + ")");
819: init();
820: signal();
821: }
822:
823: void init() {
824: watcher = new SubscriptionWatcher() {
825: public void signalNotify(int event) {
826: super .signalNotify(event);
827: //if (logger.isDebugEnabled()) { logger.debug("LogicProvider signalNotify()"); }
828: requestCycle();
829: }
830:
831: public String toString() {
832: return "YPWatcher(" + originMA + ")";
833: }
834: };
835:
836: blackboard.registerInterest(watcher);
837:
838: try {
839: blackboard.openTransaction();
840:
841: futures = (IncrementalSubscription) blackboard
842: .subscribe(new UnaryPredicate() {
843: public boolean execute(Object o) {
844: return (o instanceof YPFuture);
845: }
846: });
847: scan();
848: } finally {
849: blackboard.closeTransaction();
850: }
851: signal();
852: }
853:
854: void requestCycle() {
855: //if (logger.isDebugEnabled()) { logger.debug("LogicProvider requestCycle()"); }
856: signal();
857: }
858:
859: void cycle() {
860: //if (logger.isDebugEnabled()) { logger.debug("LogicProvider cycle()"); }
861: try {
862: blackboard.openTransaction();
863: scan();
864: } finally {
865: blackboard.closeTransaction();
866: }
867: }
868:
869: // must be called within transaction - e.g. only from cycle or init
870: void scan() {
871: for (Iterator it = futures.getAddedCollection().iterator(); it
872: .hasNext();) {
873: YPFuture fut = (YPFuture) it.next();
874: try {
875: if (logger.isDebugEnabled()) {
876: logger.debug("LogicProvider scan() submitting "
877: + fut);
878: }
879: YPClientComponent.this .submitFromBlackboard(fut);
880: } catch (TransportException te) {
881: logger.error(
882: "YPFuture submit failed (" + fut + ")", te);
883: blackboard.publishChange(fut);
884: }
885: }
886: }
887:
888: void kickFuture(YPFuture fut) {
889: if (logger.isDebugEnabled()) {
890: logger.debug("LogicProvider kickFuture(" + fut + ")");
891: }
892: try {
893: blackboard.openTransaction();
894: scan();
895: blackboard.publishChange(fut);
896: } finally {
897: blackboard.closeTransaction();
898: }
899: }
900: }
901:
902: }
|