001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)EndpointManager.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.jms;
030:
031: import com.sun.jbi.StringTranslator;
032:
033: import com.sun.jbi.binding.jms.deploy.EndpointRegistry;
034:
035: import com.sun.jbi.binding.jms.mq.MQConnection;
036: import com.sun.jbi.binding.jms.mq.MQDestination;
037: import com.sun.jbi.binding.jms.mq.MQManager;
038: import com.sun.jbi.binding.jms.mq.MQSession;
039:
040: import com.sun.jbi.binding.jms.config.ConfigConstants;
041: import com.sun.jbi.binding.jms.util.UtilBase;
042:
043: import java.util.Collection;
044: import java.util.Iterator;
045:
046: import java.util.logging.Logger;
047:
048: import javax.jbi.messaging.DeliveryChannel;
049:
050: import javax.jbi.servicedesc.ServiceEndpoint;
051:
052: import javax.xml.namespace.QName;
053:
054: /**
055: * Manages all the stopping and starting of endpoints.
056: *
057: * @author Sun Microsystems Inc.
058: */
059: public class EndpointManager extends UtilBase implements
060: JMSBindingResources {
061: /**
062: * Sleep time for thread.
063: */
064: private static final long SLEEP_TIME = 50;
065:
066: /**
067: * Binding Channel.
068: */
069: private DeliveryChannel mChannel;
070:
071: /**
072: * Deployment Registry.
073: */
074: private EndpointRegistry mRegistry;
075:
076: /**
077: * Logger object.
078: */
079: private Logger mLogger;
080: /**
081: * MQ Manager.
082: */
083: private MQManager mManager;
084:
085: /**
086: * Helper for i18n.
087: */
088: private StringTranslator mStringTranslator;
089: /**
090: * Session thread.
091: */
092: private Thread mTmpSessionThread;
093:
094: /**
095: * Creates a new EndpointManager object.
096: */
097: public EndpointManager() {
098: mRegistry = JMSBindingContext.getInstance().getRegistry();
099: mLogger = JMSBindingContext.getInstance().getLogger();
100: mStringTranslator = JMSBindingContext.getInstance()
101: .getStringTranslator();
102: }
103:
104: /**
105: * Sets the channel.
106: *
107: * @param bc binding channel.
108: */
109: public void setChannel(DeliveryChannel bc) {
110: mChannel = bc;
111: }
112:
113: /**
114: * Sets the connection manager.
115: *
116: * @param man manager.
117: */
118: public void setConnectionManager(MQManager man) {
119: mManager = man;
120: }
121:
122: /**
123: * Returns the number of endpoints actually running.
124: *
125: * @return int number of running endpoints.
126: */
127: public int getRunningEndpointCount() {
128: int count = 0;
129:
130: return count;
131: }
132:
133: /**
134: * Stops all running endpoints.
135: */
136: public void startAllEndpoints() {
137: /* We have to wait till stop timeout configured in
138: * the configuration file, if we cannot stop the endpoints within
139: * that time we quit
140: */
141: Collection eps = mRegistry.getAllEndpoints();
142:
143: Iterator iter = eps.iterator();
144:
145: while (iter.hasNext()) {
146: EndpointBean eb = (EndpointBean) iter.next();
147:
148: if (eb == null) {
149: mLogger.severe(mStringTranslator
150: .getString(JMS_START_ENDPOINT_FAILED));
151: setError(mStringTranslator
152: .getString(JMS_START_ENDPOINT_FAILED));
153:
154: continue;
155: }
156:
157: if (!startEndpoint(eb)) {
158: setError("Cannot start endpoint " + eb.getUniqueName());
159: stopEndpoint(eb);
160:
161: continue;
162: }
163:
164: if (eb.getRole() == ConfigConstants.PROVIDER) {
165: if (!startOutboundEndpoint(eb)) {
166: setError("Cannot start outbound endpoint "
167: + eb.getUniqueName() + " in deployment ");
168:
169: //stop endpoint
170: continue;
171: }
172: } else if (eb.getRole() == ConfigConstants.CONSUMER) {
173: if (!startInboundEndpoint(eb)) {
174: setError("Cannot start inbound endpoint "
175: + eb.getUniqueName() + " in deployment ");
176: stopInboundEndpoint(eb);
177:
178: // stop endpoint here
179: continue;
180: }
181: }
182: }
183: }
184:
185: /**
186: * Starts a deployment.
187: *
188: * @param asid service unit id.
189: */
190: public void startDeployment(String asid) {
191: mLogger.info(mStringTranslator.getString(JMS_START_DEPLOYMENT,
192: asid));
193: super .clear();
194:
195: Collection l = mRegistry.getEndpoints(asid);
196: Iterator iter = l.iterator();
197:
198: while (iter.hasNext()) {
199: EndpointBean eb = (EndpointBean) iter.next();
200:
201: if (eb == null) {
202: mLogger.severe(mStringTranslator.getString(
203: JMS_START_DEPLOYMENT_FAILED, asid));
204: mLogger.severe(mStringTranslator.getString(
205: JMS_START_DEPLOYMENT_FAILED_BEANNULL, asid));
206: setError(mStringTranslator.getString(
207: JMS_START_DEPLOYMENT_FAILED_BEANNULL, asid));
208:
209: continue;
210: }
211:
212: if (!startEndpoint(eb)) {
213: setError("Cannot start endpoint " + eb.getUniqueName());
214: stopEndpoint(eb);
215:
216: continue;
217: }
218:
219: if (eb.getRole() == ConfigConstants.PROVIDER) {
220: if (!startOutboundEndpoint(eb)) {
221: setError("Cannot start outbound endpoint "
222: + eb.getUniqueName() + " in deployment "
223: + asid);
224:
225: //stop endpoint
226: continue;
227: }
228: } else if (eb.getRole() == ConfigConstants.CONSUMER) {
229: if (!startInboundEndpoint(eb)) {
230: setError("Cannot start inbound endpoint "
231: + eb.getUniqueName() + " in deployment "
232: + asid);
233: stopInboundEndpoint(eb);
234:
235: // stop endpoint here
236: continue;
237: }
238:
239: mLogger.info(mStringTranslator.getString(
240: JMS_ACTIVATE_INBOUND_SUCCESS, eb
241: .getUniqueName()));
242: }
243: }
244:
245: mLogger.info(mStringTranslator.getString(
246: JMS_START_DEPLOYMENT_SUCCESS, asid));
247: }
248:
249: /**
250: * Sraterts the endpoint.
251: *
252: * @param eb endpoint bean.
253: *
254: * @return true of started.
255: */
256: public boolean startEndpoint(EndpointBean eb) {
257: MQConnection con = mManager.createConnection(eb
258: .getValue(ConfigConstants.CONNECTION_FACTORY), eb
259: .getValue(ConfigConstants.CONNECTION_USER_ID), eb
260: .getValue(ConfigConstants.CONNECTION_PASSWORD), eb
261: .getStyle());
262:
263: if (con == null) {
264: mLogger.severe("Cannot create connection using "
265: + eb.getValue(ConfigConstants.CONNECTION_FACTORY));
266: setError(mManager.getError()
267: + "\n"
268: + (String) eb
269: .getValue(ConfigConstants.CONNECTION_FACTORY));
270:
271: return false;
272: }
273:
274: if (mManager.destinationExists(eb
275: .getValue(ConfigConstants.DESTINATION_NAME))) {
276: if (eb.getStyle() == ConfigConstants.QUEUE) {
277: mLogger
278: .severe("Destination name already exisst for a different endpoint");
279: mManager.closeConnection(con);
280: setError("Desintination already exists ");
281:
282: return false;
283: }
284: }
285:
286: MQDestination destination = mManager.getDestination(eb
287: .getValue(ConfigConstants.DESTINATION_NAME));
288:
289: if (destination == null) {
290: mLogger.severe("Cannot get destination using "
291: + eb.getValue(ConfigConstants.DESTINATION_NAME));
292: setError("Desintination cannot be created ");
293: setError(mManager.getError());
294:
295: return false;
296: }
297:
298: eb.setConnection(con);
299: eb.setDestination(destination);
300: return true;
301: }
302:
303: /**
304: * Start inbound endpoint.
305: *
306: * @param eb endpoint bean.
307: *
308: * @return true if started.
309: */
310: public boolean startInboundEndpoint(EndpointBean eb) {
311: ServiceEndpoint ref = null;
312: MQSession session = mManager.getSession(false, eb);
313:
314: mLogger.fine("Starting inbound Endpoint " + eb.getUniqueName());
315: try {
316: session.init();
317: } catch (Exception e) {
318: setError("Cannot initialise session ");
319: setError(session.getError());
320:
321: return false;
322: }
323:
324: eb.getConnection().start();
325:
326: if (!eb.getConnection().isValid()) {
327: setError(eb.getConnection().getError());
328:
329: return false;
330: }
331:
332: if (!session.setReceiver()) {
333: setError(session.getError());
334: mManager.releaseSession(session);
335:
336: return false;
337: }
338:
339: Thread t = new Thread(session);
340: eb.setReceiverSession(session);
341: eb.setReceiverThread(t);
342: t.start();
343:
344: mLogger.fine("Started inbound Endpoint " + eb.getUniqueName());
345: return true;
346: }
347:
348: /**
349: * Starts the outbound endpoint.
350: *
351: * @param eb endpoint bean.
352: *
353: * @return true if started.
354: */
355: public boolean startOutboundEndpoint(EndpointBean eb) {
356: // TO DO other tasks
357: ServiceEndpoint ref = null;
358: mLogger
359: .fine("Starting Outbount Endpoint "
360: + eb.getUniqueName());
361:
362: try {
363: ref = JMSBindingContext
364: .getInstance()
365: .getContext()
366: .activateEndpoint(
367: new QName(
368: eb
369: .getValue(ConfigConstants.SERVICE_NAMESPACE),
370: eb
371: .getValue(ConfigConstants.SERVICENAME)),
372: eb.getValue(ConfigConstants.ENDPOINTNAME));
373: mLogger.info(mStringTranslator.getString(
374: JMS_ACTIVATE_OUTBOUND_SUCCESS, eb.getUniqueName()));
375: } catch (javax.jbi.JBIException me) {
376: mLogger.severe(mStringTranslator.getString(
377: JMS_ACTIVATE_OUTBOUND_FAILED, eb.getUniqueName()));
378: mLogger.severe(me.getMessage());
379: setError(me.getMessage());
380:
381: return false;
382: }
383:
384: if (mTmpSessionThread == null) {
385: MQSession tmpsession = mManager.getTemporarySession();
386:
387: mLogger.info("temp session is " + tmpsession);
388:
389: if ((!mManager.isValid()) || (tmpsession == null)) {
390: setError(mManager.getError());
391:
392: return false;
393: }
394:
395: try {
396: mLogger.info("initing temp session is " + tmpsession);
397: tmpsession.init();
398: } catch (Exception e) {
399: setError(mManager.getError());
400:
401: return false;
402: }
403:
404: if (!tmpsession.setReceiver()) {
405: mLogger.info("Setting receiver " + tmpsession);
406: setError(tmpsession.getError());
407:
408: return false;
409: }
410:
411: mTmpSessionThread = new Thread(tmpsession);
412: mTmpSessionThread.start();
413: }
414:
415: eb.setServiceEndpoint(ref);
416: eb.getConnection().start();
417:
418: mLogger.fine("Started Outbount Endpoint " + eb.getUniqueName());
419: return true;
420: }
421:
422: /**
423: * Stops all running endpoints.
424: */
425: public void stopAllEndpoints() {
426: /* We have to wait till stop timeout configured in
427: * the configuration file, if we cannot stop the endpoints within
428: * that time we quit
429: */
430: Collection eps = mRegistry.getAllEndpoints();
431:
432: Iterator iter = eps.iterator();
433:
434: while (iter.hasNext()) {
435: EndpointBean eb = (EndpointBean) iter.next();
436:
437: if (eb != null) {
438: stopEndpoint(eb);
439:
440: if (eb.getRole() == ConfigConstants.CONSUMER) {
441: stopInboundEndpoint(eb);
442: } else if (eb.getRole() == ConfigConstants.PROVIDER) {
443: stopOutboundEndpoint(eb);
444: }
445: }
446: }
447:
448: if (mTmpSessionThread != null) {
449: mManager.releaseTemporarySession();
450: mTmpSessionThread = null;
451: }
452: }
453:
454: /**
455: * Releases the temporary session when the binding stops
456: *
457: */
458:
459: public void releaseTemporarySession() {
460: if (mTmpSessionThread != null) {
461: mManager.releaseTemporarySession();
462: mTmpSessionThread = null;
463: }
464: }
465:
466: /**
467: * Stops a deployment.
468: *
469: * @param asid Application Sub assembly ID.
470: */
471: public void stopDeployment(String asid) {
472: mLogger.info(mStringTranslator.getString(JMS_STOP_DEPLOYMENT,
473: asid));
474: super .clear();
475:
476: Collection eps = mRegistry.getEndpoints(asid);
477:
478: Iterator iter = eps.iterator();
479:
480: while (iter.hasNext()) {
481: EndpointBean eb = (EndpointBean) iter.next();
482:
483: if (eb != null) {
484: stopEndpoint(eb);
485:
486: if (eb.getRole() == ConfigConstants.CONSUMER) {
487: stopInboundEndpoint(eb);
488: }
489: }
490: }
491:
492: mLogger.info(mStringTranslator.getString(
493: JMS_STOP_DEPLOYMENT_SUCCESS, asid));
494: }
495:
496: /**
497: * Stop an endpoint.
498: *
499: * @param eb endpoint bean.
500: */
501: private void stopEndpoint(EndpointBean eb) {
502: MQConnection con = eb.getConnection();
503:
504: if (con != null) {
505: mManager.closeConnection(con);
506:
507: if (!con.isValid()) {
508: setWarning(con.getError());
509: }
510: }
511:
512: MQDestination dest = eb.getDestination();
513: mManager.closeDestination(dest);
514: dest = null;
515: }
516:
517: /**
518: * Stops the inbound endpoint.
519: *
520: * @param eb endpoint bean.
521: */
522: private void stopInboundEndpoint(EndpointBean eb) {
523: MQSession session = eb.getReceiverSession();
524:
525: if (session != null) {
526: session.stopReceiving();
527: }
528:
529: mManager.releaseSession(session);
530: }
531:
532: /**
533: * Stops an outbound endpoint.
534: *
535: * @param eb endpoint bean.
536: */
537: private void stopOutboundEndpoint(EndpointBean eb) {
538: ServiceEndpoint ref = eb.getServiceEndpoint();
539:
540: try {
541: if (ref != null) {
542: JMSBindingContext.getInstance().getContext()
543: .deactivateEndpoint(ref);
544: }
545:
546: mLogger.info(mStringTranslator.getString(
547: JMS_STOP_ENDPOINT_SUCCESS, eb.getUniqueName()));
548: } catch (javax.jbi.JBIException me) {
549: mLogger.severe(mStringTranslator.getString(
550: JMS_STOP_ENDPOINT_FAILED, eb.getUniqueName()));
551: mLogger.severe(me.getMessage());
552: setWarning(me.getMessage());
553: }
554: }
555: }
|