001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.yp;
028:
029: import java.util.Properties;
030: import java.util.Vector;
031:
032: import org.cougaar.core.service.ThreadService;
033: import org.cougaar.core.thread.Schedulable;
034: import org.cougaar.util.StackMachine;
035: import org.cougaar.util.log.Logger;
036: import org.cougaar.util.log.Logging;
037: import org.uddi4j.datatype.tmodel.TModel;
038: import org.uddi4j.response.AuthToken;
039: import org.uddi4j.response.TModelDetail;
040:
041: /**
042: **/
043:
044: public class YPStateMachine extends StackMachine {
045: private static final Logger log = Logging
046: .getLogger(YPStateMachine.class);
047:
048: private static String UDDI_USERID = "cougaar";
049: private static String UDDI_PASSWORD = "cougaarPass";
050:
051: static {
052: UDDI_USERID = System.getProperty(
053: "org.cougaar.yp.juddi-users.username",
054: YPProxy.DEFAULT_UDDI_USERNAME);
055: UDDI_PASSWORD = System.getProperty(
056: "org.cougaar.yp.juddi-users.password",
057: YPProxy.DEFAULT_UDDI_PASSWORD);
058: }
059:
060: private static int WARNING_SUPPRESSION_INTERVAL = 5;
061: protected long warningCutoffTime = 0;
062: protected static final String YP_GRACE_PERIOD_PROPERTY = "org.cougaar.servicediscovery.yp.YPGracePeriod";
063:
064: private final Properties ypproperties = new Properties();
065:
066: private final YPService yps;
067:
068: protected final YPService getYPService() {
069: return yps;
070: }
071:
072: private final YPProxy yp;
073:
074: protected final YPProxy getYPProxy() {
075: return yp;
076: }
077:
078: private final ThreadService threads;
079:
080: protected final ThreadService getThreadService() {
081: return threads;
082: }
083:
084: public YPStateMachine(YPService yps, YPProxy yp,
085: ThreadService threads) {
086: this .yps = yps;
087: this .yp = yp;
088: this .threads = threads;
089: ypproperties.put("username", UDDI_USERID);
090: ypproperties.put("password", UDDI_PASSWORD);
091: initThread();
092: init();
093: reset();
094: }
095:
096: /** set a property for just this instance of YPStateMachine.
097: * Current properties are "username" and "password".
098: * @note that it is undefined to change existing values after initialization.
099: **/
100: public void setProperty(String name, String value) {
101: ypproperties.put(name, value);
102: }
103:
104: private Schedulable thread = null;
105:
106: private AuthToken token = null;
107:
108: protected AuthToken getAuthToken() {
109: return token;
110: }
111:
112: /** restart the thread **/
113: protected synchronized void kick() {
114: thread.start();
115: }
116:
117: /** Sets the machine in motion. Will continue until DONE (or an error state) is achieved **/
118: public void start() {
119: kick();
120: }
121:
122: /** Set/Reset the machine to the starting state.
123: * By default, sets the state to "START", but subclasses may override.
124: **/
125: public void reset() {
126: set("START");
127: }
128:
129: /** initialize the states of the machine.
130: * called near the end of the constructor, followed by reset().
131: **/
132: protected void init() {
133: // called with call("saveTModel", tModels, nexttag);
134: // result (at nexttag) will be in Object SM.retval;
135: addYPQ("saveTModel", "POP", new YPQ() {
136: public YPFuture get(Frame f) {
137: Object arg = f.getArgument();
138: Vector tModels;
139: if (arg instanceof Vector) {
140: tModels = (Vector) arg;
141: } else {
142: tModels = new Vector(1);
143: tModels.addElement(arg);
144: }
145:
146: return yp.save_tModel(token.getAuthInfoString(),
147: tModels);
148: }
149:
150: public void set(Frame f, Object r) {
151: f.setRetval((TModelDetail) r);
152: }
153:
154: public void handle(Frame f, Exception e) {
155: f.setVar("YPErrorException", e);
156: f.setVar("YPErrorText", "Error in saveTModel");
157: transit("YPError");
158: }
159: });
160:
161: addYPQ("getAuthToken", "POP", new YPQ() {
162: public YPFuture get(Frame f) {
163: Properties p = (Properties) f.getArgument();
164: if (p == null)
165: p = ypproperties;
166: String username = p.getProperty("username");
167: String password = p.getProperty("password");
168: return yp.get_authToken(username, password);
169: }
170:
171: public void set(Frame f, Object r) {
172: token = (AuthToken) r;
173: f.setRetval(token);
174: }
175:
176: public void handle(Frame f, Exception e) {
177: f.setVar("YPErrorException", e);
178: f.setVar("YPErrorText", "Error in getAuthToken");
179: transit("YPError");
180: }
181: });
182:
183: addYPQ("discardAuthToken", "POP", new YPQ() {
184: public YPFuture get(Frame f) {
185: AuthToken t = (AuthToken) f.getArgument();
186: if (t == null)
187: t = token;
188: return yp.discard_authToken(t.getAuthInfoString());
189: }
190:
191: public void set(Frame f, Object r) {
192: AuthToken t = (AuthToken) f.getArgument();
193: if (t == token || t == null)
194: token = null;
195: }
196:
197: public void handle(Frame f, Exception e) {
198: AuthToken t = (AuthToken) f.getArgument();
199: if (t == null)
200: t = token;
201: logHandledError("Exception in discardAuthToken(" + t
202: + ")", e);
203: if (t == token || t == null)
204: token = null;
205: transit("POP");
206: }
207: });
208:
209: // this just sends logs the error, but extenders may override
210: addLink("YPError", "handleYPError");
211: add(new SState("handleYPError") {
212: public void invoke() {
213: logHandledError(
214: "Exception in " + getVar("YPErrorText"),
215: (Exception) getVar("YPErrorException"));
216: }
217: });
218:
219: }
220:
221: public synchronized void set(State s) { // sync to make sure we aren't still cleaning up
222: super .set(s);
223: }
224:
225: private void initThread() {
226: thread = getThreadService().getThread(this , new Runnable() {
227: public void run() {
228: try {
229: if (log.isDebugEnabled()) {
230: log.debug("running YPStateMachine.go()");
231: }
232: go();
233: } catch (RuntimeException e) {
234: handleException(e);
235: }
236: }
237: });
238: }
239:
240: protected void handleException(Exception e) {
241: logHandledError("Caught Exception - machine is dead", e);
242: }
243:
244: /** abstraction of an asynchronous YP Query **/
245: public interface YPQ {
246: /** return a YP Query to submit, in the form of a YPFuture **/
247: YPFuture get(Frame f);
248:
249: /** consume the result of a complete (isReady) YP query **/
250: void set(Frame f, Object result);
251:
252: /** consume an exception if one happens. Must do the proper transit **/
253: void handle(Frame f, Exception e);
254: }
255:
256: protected void addYPQ(String startTag, final String nextTag,
257: final YPQ ypq) {
258: add(new SState(startTag) {
259: public void invoke() {
260: final Frame frame = getFrame();
261: final YPFuture fut;
262: try {
263: fut = ypq.get(frame);
264: } catch (Exception e) {
265: logHandledError("Caught exception in YPQ.get() "
266: + ypq, e);
267: ypq.handle(frame, e);
268: return;
269: }
270:
271: YPFuture.Callback cab = new YPFuture.Callback() {
272: public void ready(YPFuture r) {
273: try {
274: if (r != fut) {
275: log.error(this .toString()
276: + " expected " + fut
277: + " instead of " + r);
278: }
279: if (r.isReady()) {
280: ypq.set(frame, r.get());
281: transit(nextTag);
282: kick();
283: } else {
284: ypq
285: .handle(
286: frame,
287: new RuntimeException(
288: "YPQ notified but not really ready!"));
289: }
290: } catch (Exception re) {
291: logHandledError(
292: "Caught exception from YP during kick() in "
293: + (YPStateMachine.this )
294: + " with YPFuture " + r, re);
295: ypq.handle(frame, re);
296: }
297: }
298: };
299: fut.setCallback(cab);
300: try {
301: yps.submit(fut);
302: } catch (RuntimeException e) {
303: logHandledError("Caught exception in YPQ.submit() "
304: + ypq, e);
305: ypq.handle(frame, e);
306: }
307: }
308: });
309: }
310:
311: /** Standard TModel publish interaction **/
312: public interface TModelThunk {
313: TModel make(Frame f);
314:
315: TModel update(Frame f, TModelDetail tmd);
316: }
317:
318: protected void addTModelPush(String tag, final String exit,
319: final TModelThunk thunk) {
320: final String t0 = tag;
321: final String t1 = tag + " (update)";
322:
323: add(new SState(t0) {
324: public void invoke() {
325: TModel tm = thunk.make(getFrame());
326:
327: // Only go on if the state has not been changed by thunk.make()
328: if (getState() == this ) {
329: call("saveTModel", tm, t1);
330: }
331: }
332: });
333: add(new SState(t1) {
334: public void invoke() {
335: TModelDetail tModelDetail = (TModelDetail) getResult();
336: try {
337: TModel nt = thunk.update(getFrame(), tModelDetail);
338:
339: // Only go on if the state has not been changed by thunk.update()
340: if (getState() == this ) {
341: call("saveTModel", nt, exit);
342: }
343: } catch (RuntimeException re) {
344: logHandledError("Caught exception", re);
345: transit("ERROR");
346: }
347: }
348: });
349: }
350:
351: protected long getWarningCutOffTime() {
352: if (warningCutoffTime == 0) {
353: WARNING_SUPPRESSION_INTERVAL = Integer.getInteger(
354: YP_GRACE_PERIOD_PROPERTY,
355: WARNING_SUPPRESSION_INTERVAL).intValue();
356: warningCutoffTime = System.currentTimeMillis()
357: + WARNING_SUPPRESSION_INTERVAL * 60000;
358: }
359:
360: return warningCutoffTime;
361: }
362:
363: // When we catch an error, log at DEBUG first. Change to logging at ERROR
364: // after After a while it becomes an error.
365: protected void logHandledError(String message) {
366: logHandledError(message, null);
367: }
368:
369: // When we catch an error, log at DEBUG first. Change to logging at ERROR
370: // after After a while it becomes an error.
371: protected void logHandledError(String message, Throwable e) {
372: if (System.currentTimeMillis() > getWarningCutOffTime()) {
373: if (e == null)
374: log.error(message);
375: else
376: log.error(message, e);
377: } else if (log.isDebugEnabled()) {
378: if (e == null)
379: log.debug(message);
380: else
381: log.debug(message, e);
382: }
383: }
384: }
|