001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jbi.framework;
018:
019: import java.util.ArrayList;
020: import java.util.Collection;
021: import java.util.HashSet;
022: import java.util.Iterator;
023: import java.util.List;
024: import java.util.Map;
025: import java.util.Set;
026: import java.util.concurrent.ConcurrentHashMap;
027:
028: import javax.jbi.JBIException;
029: import javax.jbi.component.ComponentContext;
030: import javax.jbi.servicedesc.ServiceEndpoint;
031: import javax.management.JMException;
032: import javax.management.ObjectName;
033: import javax.xml.namespace.QName;
034:
035: import org.apache.commons.logging.Log;
036: import org.apache.commons.logging.LogFactory;
037: import org.apache.servicemix.jbi.event.EndpointEvent;
038: import org.apache.servicemix.jbi.event.EndpointListener;
039: import org.apache.servicemix.jbi.framework.support.EndpointProcessor;
040: import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
041: import org.apache.servicemix.jbi.servicedesc.ExternalEndpoint;
042: import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
043: import org.apache.servicemix.jbi.servicedesc.LinkedEndpoint;
044:
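/**
 * Registry for service endpoints: keeps track of the internal, external and
 * linked endpoints, of interface connections, and of the MBean created for
 * each registered endpoint.
 *
 * @version $Revision: 564900 $
 */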
050: public class EndpointRegistry {
051:
052: private static final Log LOG = LogFactory
053: .getLog(EndpointRegistry.class);
054:
055: private Registry registry;
056:
057: private Map<AbstractServiceEndpoint, Endpoint> endpointMBeans;
058:
059: private Map<String, ServiceEndpoint> internalEndpoints;
060:
061: private Map<String, ServiceEndpoint> externalEndpoints;
062:
063: private Map<String, ServiceEndpoint> linkedEndpoints;
064:
065: private Map<QName, InterfaceConnection> interfaceConnections;
066:
067: private List<EndpointProcessor> endpointProcessors;
068:
/**
 * Creates an endpoint registry attached to the given parent registry.
 *
 * @param registry the parent registry; it is passed to every endpoint
 *                 processor and used to reach the container
 */
public EndpointRegistry(Registry registry) {
    this.registry = registry;

    // All lookup tables are concurrent maps.
    internalEndpoints = new ConcurrentHashMap<String, ServiceEndpoint>();
    externalEndpoints = new ConcurrentHashMap<String, ServiceEndpoint>();
    linkedEndpoints = new ConcurrentHashMap<String, ServiceEndpoint>();
    interfaceConnections = new ConcurrentHashMap<QName, InterfaceConnection>();
    endpointMBeans = new ConcurrentHashMap<AbstractServiceEndpoint, Endpoint>();

    // Must come after this.registry is set: the processors are initialised with it.
    endpointProcessors = getEndpointProcessors();
}
083:
084: private List<EndpointProcessor> getEndpointProcessors() {
085: List<EndpointProcessor> l = new ArrayList<EndpointProcessor>();
086: String[] classes = {
087: "org.apache.servicemix.jbi.framework.support.SUDescriptorProcessor",
088: "org.apache.servicemix.jbi.framework.support.WSDL1Processor",
089: "org.apache.servicemix.jbi.framework.support.WSDL2Processor" };
090: for (int i = 0; i < classes.length; i++) {
091: try {
092: EndpointProcessor p = (EndpointProcessor) Class
093: .forName(classes[i]).newInstance();
094: p.init(registry);
095: l.add(p);
096: } catch (Throwable e) {
097: LOG.warn("Disabled endpoint processor '" + classes[i]
098: + "': " + e);
099: }
100: }
101: return l;
102: }
103:
104: public ServiceEndpoint[] getEndpointsForComponent(
105: ComponentNameSpace cns) {
106: Collection<ServiceEndpoint> endpoints = new ArrayList<ServiceEndpoint>();
107: for (Iterator<ServiceEndpoint> iter = getInternalEndpoints()
108: .iterator(); iter.hasNext();) {
109: InternalEndpoint endpoint = (InternalEndpoint) iter.next();
110: if (cns.equals(endpoint.getComponentNameSpace())) {
111: endpoints.add(endpoint);
112: }
113: }
114: return asEndpointArray(endpoints);
115: }
116:
117: public ServiceEndpoint[] getAllEndpointsForComponent(
118: ComponentNameSpace cns) {
119: Collection<ServiceEndpoint> endpoints = new ArrayList<ServiceEndpoint>();
120: for (Iterator<ServiceEndpoint> iter = getInternalEndpoints()
121: .iterator(); iter.hasNext();) {
122: InternalEndpoint endpoint = (InternalEndpoint) iter.next();
123: if (cns.equals(endpoint.getComponentNameSpace())) {
124: endpoints.add(endpoint);
125: }
126: }
127: for (Iterator<ServiceEndpoint> iter = getExternalEndpoints()
128: .iterator(); iter.hasNext();) {
129: ExternalEndpoint endpoint = (ExternalEndpoint) iter.next();
130: if (cns.equals(endpoint.getComponentNameSpace())) {
131: endpoints.add(endpoint);
132: }
133: }
134: return asEndpointArray(endpoints);
135: }
136:
137: /**
138: * Returns a collection of Endpoint objects
139: */
140: public Collection<Endpoint> getEndpointMBeans() {
141: return endpointMBeans.values();
142: }
143:
144: /**
145: * Get all endpoints for a given service
146: *
147: * @param serviceName
148: * @return array of endpoints
149: */
150: public ServiceEndpoint[] getEndpointsForService(QName serviceName) {
151: Collection<ServiceEndpoint> collection = getEndpointsByService(
152: serviceName, getInternalEndpoints());
153: return asEndpointArray(collection);
154: }
155:
156: /**
157: * This will return the endpoints for all services and endpoints that implement the named interface (portType in
158: * WSDL 1.1). This method does NOT include external endpoints.
159: *
160: * @param interfaceName qualified name of interface/portType that is implemented by the endpoint; if
161: * <code>null</code> then all activated endpoints in the JBI environment must be returned.
162: * @return an array of available endpoints for the specified interface name; must be non-null; may be empty.
163: */
164: public ServiceEndpoint[] getEndpointsForInterface(
165: QName interfaceName) {
166: if (interfaceName == null) {
167: return asEndpointArray(internalEndpoints.values());
168: }
169: InterfaceConnection conn = interfaceConnections
170: .get(interfaceName);
171: if (conn != null) {
172: String key = getKey(conn.service, conn.endpoint);
173: ServiceEndpoint ep = internalEndpoints.get(key);
174: if (ep == null) {
175: LOG.warn("Connection for interface " + interfaceName
176: + " could not find target for service "
177: + conn.service + " and endpoint "
178: + conn.endpoint);
179: return new ServiceEndpoint[0];
180: } else {
181: return new ServiceEndpoint[] { ep };
182: }
183: }
184: Collection<ServiceEndpoint> result = getEndpointsByInterface(
185: interfaceName, getInternalEndpoints());
186: return asEndpointArray(result);
187: }
188:
189: /**
190: * Activate an endpoint
191: *
192: * @param provider
193: * @param serviceName
194: * @param endpointName
195: * @return the endpoint
196: * @throws JBIException
197: */
198: public InternalEndpoint registerInternalEndpoint(
199: ComponentContextImpl provider, QName serviceName,
200: String endpointName) throws JBIException {
201: // Create endpoint
202: String key = getKey(serviceName, endpointName);
203: InternalEndpoint registered = (InternalEndpoint) internalEndpoints
204: .get(key);
205: // Check if the endpoint has already been activated by another component
206: if (registered != null && registered.isLocal()) {
207: throw new JBIException("An internal endpoint for service "
208: + serviceName + " and endpoint " + endpointName
209: + " is already registered");
210: }
211: // Create a new endpoint
212: InternalEndpoint serviceEndpoint = new InternalEndpoint(
213: provider.getComponentNameSpace(), endpointName,
214: serviceName);
215: // Get interface from activationSpec
216: if (provider.getActivationSpec().getInterfaceName() != null) {
217: serviceEndpoint.addInterface(provider.getActivationSpec()
218: .getInterfaceName());
219: }
220: // Get interfaces
221: for (Iterator<EndpointProcessor> it = endpointProcessors
222: .iterator(); it.hasNext();) {
223: EndpointProcessor p = it.next();
224: p.process(serviceEndpoint);
225: }
226: // Set remote namespaces
227: if (registered != null) {
228: InternalEndpoint[] remote = registered.getRemoteEndpoints();
229: for (int i = 0; i < remote.length; i++) {
230: serviceEndpoint.addRemoteEndpoint(remote[i]);
231: }
232: }
233: // Register endpoint
234: internalEndpoints.put(key, serviceEndpoint);
235: registerEndpoint(serviceEndpoint);
236: fireEvent(serviceEndpoint,
237: EndpointEvent.INTERNAL_ENDPOINT_REGISTERED);
238: return serviceEndpoint;
239: }
240:
241: /**
242: * Called by component context when endpoints are being deactivated.
243: *
244: * @param provider
245: * @param serviceEndpoint
246: */
247: public void unregisterInternalEndpoint(ComponentContext provider,
248: InternalEndpoint serviceEndpoint) {
249: if (serviceEndpoint.isClustered()) {
250: fireEvent(serviceEndpoint,
251: EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED);
252: // set endpoint to be no more local
253: serviceEndpoint.setComponentName(null);
254: } else {
255: String key = getKey(serviceEndpoint);
256: internalEndpoints.remove(key);
257: unregisterEndpoint(serviceEndpoint);
258: fireEvent(serviceEndpoint,
259: EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED);
260: }
261: }
262:
263: /**
264: * Registers a remote endpoint
265: *
266: * @param remote
267: */
268: public void registerRemoteEndpoint(InternalEndpoint remote) {
269: InternalEndpoint endpoint = (InternalEndpoint) internalEndpoints
270: .get(getKey(remote));
271: // Create endpoint if not already existing
272: if (endpoint == null) {
273: endpoint = new InternalEndpoint(null, remote
274: .getEndpointName(), remote.getServiceName());
275: internalEndpoints.put(getKey(endpoint), endpoint);
276: }
277: // Add remote endpoint
278: endpoint.addRemoteEndpoint(remote);
279: fireEvent(remote, EndpointEvent.REMOTE_ENDPOINT_REGISTERED);
280: }
281:
282: /**
283: * Unregisters a remote endpoint
284: *
285: * @param remote
286: */
287: public void unregisterRemoteEndpoint(InternalEndpoint remote) {
288: String key = getKey(remote);
289: InternalEndpoint endpoint = (InternalEndpoint) internalEndpoints
290: .get(key);
291: if (endpoint != null) {
292: endpoint.removeRemoteEndpoint(remote);
293: if (!endpoint.isClustered() && !endpoint.isLocal()) {
294: internalEndpoints.remove(key);
295: unregisterEndpoint(endpoint);
296: }
297: fireEvent(remote,
298: EndpointEvent.REMOTE_ENDPOINT_UNREGISTERED);
299: }
300: }
301:
302: /**
303: * Get the named ServiceEndpoint, if activated
304: *
305: * @param service
306: * @param name
307: * @return the activated ServiceEndpoint or null
308: */
309: public ServiceEndpoint getEndpoint(QName service, String name) {
310: String key = getKey(service, name);
311: ServiceEndpoint ep = linkedEndpoints.get(key);
312: if (ep == null) {
313: ep = internalEndpoints.get(key);
314: }
315: return ep;
316: }
317:
318: public ServiceEndpoint getInternalEndpoint(QName service,
319: String name) {
320: return internalEndpoints.get(getKey(service, name));
321: }
322:
323: /**
324: * Registers the given external endpoint with the NMR. This indicates to the NMR that the given endpoint is used as
325: * a proxy for external service consumers to access an internal service of the same service name (but a different
326: * endpoint name).
327: *
328: * @param provider
329: * @param externalEndpoint the external endpoint to be registered, must be non-null.
330: * @throws JBIException
331: */
332: public void registerExternalEndpoint(ComponentNameSpace cns,
333: ServiceEndpoint externalEndpoint) throws JBIException {
334: ExternalEndpoint serviceEndpoint = new ExternalEndpoint(cns,
335: externalEndpoint);
336: if (externalEndpoints.get(getKey(serviceEndpoint)) != null) {
337: throw new JBIException("An external endpoint for service "
338: + externalEndpoint.getServiceName()
339: + " and endpoint "
340: + externalEndpoint.getEndpointName()
341: + " is already registered");
342: }
343: registerEndpoint(serviceEndpoint);
344: externalEndpoints.put(getKey(serviceEndpoint), serviceEndpoint);
345: fireEvent(serviceEndpoint,
346: EndpointEvent.EXTERNAL_ENDPOINT_REGISTERED);
347: }
348:
349: /**
350: * Deregisters the given external endpoint with the NMR. This indicates to the NMR that the given external endpoint
351: * can no longer be used as a proxy for external service consumers to access an internal service of the same service
352: * name.
353: *
354: * @param provider
355: * @param externalEndpoint the external endpoint to be deregistered; must be non-null.
356: */
357: public void unregisterExternalEndpoint(ComponentNameSpace cns,
358: ServiceEndpoint externalEndpoint) {
359: ExternalEndpoint ep = (ExternalEndpoint) externalEndpoints
360: .remove(getKey(externalEndpoint));
361: unregisterEndpoint(ep);
362: fireEvent(ep, EndpointEvent.EXTERNAL_ENDPOINT_UNREGISTERED);
363: }
364:
365: /**
366: * This methods returns only registered external endpoints
367: *
368: * @param interfaceName qualified name of interface implemented by the endpoints; must be non-null.
369: * @return an array of available external endpoints for the specified interface name; must be non-null; may be
370: * empty.
371: */
372: public ServiceEndpoint[] getExternalEndpointsForInterface(
373: QName interfaceName) {
374: Collection<ServiceEndpoint> endpoints = getEndpointsByInterface(
375: interfaceName, getExternalEndpoints());
376: return asEndpointArray(endpoints);
377: }
378:
379: /**
380: * Get external endpoints for the service
381: *
382: * @param serviceName qualified name of service that contains the endpoints; must be non-null.
383: * @return an array of available external endpoints for the specified service name; must be non-null; may be empty.
384: */
385: public ServiceEndpoint[] getExternalEndpointsForService(
386: QName serviceName) {
387: Collection<ServiceEndpoint> endpoints = getEndpointsByService(
388: serviceName, getExternalEndpoints());
389: return asEndpointArray(endpoints);
390: }
391:
392: /**
393: * Helper method to convert the given collection into an array of endpoints
394: *
395: * @param collection
396: * @return array of endpoints
397: */
398: protected ServiceEndpoint[] asEndpointArray(
399: Collection<ServiceEndpoint> collection) {
400: if (collection == null) {
401: return new ServiceEndpoint[0];
402: }
403: ServiceEndpoint[] answer = new ServiceEndpoint[collection
404: .size()];
405: answer = collection.toArray(answer);
406: return answer;
407: }
408:
409: /**
410: * return a collection of endpoints
411: *
412: * @param serviceName
413: * @param endpoints
414: * @return collection of endpoints
415: */
416: protected Collection<ServiceEndpoint> getEndpointsByService(
417: QName serviceName, Collection<ServiceEndpoint> endpoints) {
418: Collection<ServiceEndpoint> answer = new ArrayList<ServiceEndpoint>();
419: for (Iterator<ServiceEndpoint> i = endpoints.iterator(); i
420: .hasNext();) {
421: ServiceEndpoint endpoint = i.next();
422: if (endpoint.getServiceName().equals(serviceName)) {
423: answer.add(endpoint);
424: }
425: }
426: return answer;
427: }
428:
429: /**
430: * Filters the given endpoints and returns those implementing the
431: * given interface name. If interfaceName is null, then no filter
432: * is applied.
433: *
434: */
435: protected Collection<ServiceEndpoint> getEndpointsByInterface(
436: QName interfaceName, Collection<ServiceEndpoint> endpoints) {
437: if (interfaceName == null) {
438: return endpoints;
439: }
440: Set<ServiceEndpoint> answer = new HashSet<ServiceEndpoint>();
441: for (Iterator<ServiceEndpoint> i = endpoints.iterator(); i
442: .hasNext();) {
443: ServiceEndpoint endpoint = i.next();
444: QName[] interfaces = endpoint.getInterfaces();
445: if (interfaces != null) {
446: for (int k = 0; k < interfaces.length; k++) {
447: QName qn = interfaces[k];
448: if (qn != null && qn.equals(interfaceName)) {
449: answer.add(endpoint);
450: break;
451: }
452: }
453: }
454: }
455: return answer;
456: }
457:
458: /**
459: * @return all default endpoints
460: */
461: protected Collection<ServiceEndpoint> getInternalEndpoints() {
462: return internalEndpoints.values();
463: }
464:
465: /**
466: * @return all external endpoints
467: */
468: protected Collection<ServiceEndpoint> getExternalEndpoints() {
469: return externalEndpoints.values();
470: }
471:
472: /**
473: * Registers an endpoint connection.
474: *
475: * @param fromSvc
476: * @param fromEp
477: * @param toSvc
478: * @param toEp
479: * @param link
480: * @throws JBIException
481: */
482: public void registerEndpointConnection(QName fromSvc,
483: String fromEp, QName toSvc, String toEp, String link)
484: throws JBIException {
485: LinkedEndpoint ep = new LinkedEndpoint(fromSvc, fromEp, toSvc,
486: toEp, link);
487: if (linkedEndpoints.get(getKey(ep)) != null) {
488: throw new JBIException(
489: "An endpoint connection for service "
490: + ep.getServiceName() + " and name "
491: + ep.getEndpointName()
492: + " is already registered");
493: }
494: linkedEndpoints.put(getKey(ep), ep);
495: registerEndpoint(ep);
496: fireEvent(ep, EndpointEvent.LINKED_ENDPOINT_REGISTERED);
497: }
498:
499: /**
500: * Unregister an endpoint connection.
501: *
502: * @param fromSvc
503: * @param fromEp
504: */
505: public void unregisterEndpointConnection(QName fromSvc,
506: String fromEp) {
507: LinkedEndpoint ep = (LinkedEndpoint) linkedEndpoints
508: .remove(getKey(fromSvc, fromEp));
509: unregisterEndpoint(ep);
510: fireEvent(ep, EndpointEvent.LINKED_ENDPOINT_UNREGISTERED);
511: }
512:
513: /**
514: * Registers an interface connection.
515: *
516: * @param fromItf
517: * @param toSvc
518: * @param toEp
519: * @throws JBIException
520: */
521: public void registerInterfaceConnection(QName fromItf, QName toSvc,
522: String toEp) throws JBIException {
523: if (interfaceConnections.get(fromItf) != null) {
524: throw new JBIException("An interface connection for "
525: + fromItf + " is already registered");
526: }
527: interfaceConnections.put(fromItf, new InterfaceConnection(
528: toSvc, toEp));
529: }
530:
531: /**
532: * Unregisters an interface connection.
533: *
534: * @param fromItf
535: */
536: public void unregisterInterfaceConnection(QName fromItf) {
537: interfaceConnections.remove(fromItf);
538:
539: }
540:
541: private void registerEndpoint(
542: AbstractServiceEndpoint serviceEndpoint) {
543: try {
544: Endpoint endpoint = new Endpoint(serviceEndpoint, registry);
545: ObjectName objectName = registry.getContainer()
546: .getManagementContext().createObjectName(endpoint);
547: registry.getContainer().getManagementContext()
548: .registerMBean(objectName, endpoint,
549: EndpointMBean.class);
550: endpointMBeans.put(serviceEndpoint, endpoint);
551: } catch (JMException e) {
552: LOG.error("Could not register MBean for endpoint", e);
553: }
554: }
555:
556: private void unregisterEndpoint(AbstractServiceEndpoint se) {
557: Endpoint ep = endpointMBeans.remove(se);
558: try {
559: registry.getContainer().getManagementContext()
560: .unregisterMBean(ep);
561: } catch (JBIException e) {
562: LOG.error("Could not unregister MBean for endpoint", e);
563: }
564: }
565:
566: private String getKey(ServiceEndpoint ep) {
567: return getKey(ep.getServiceName(), ep.getEndpointName());
568: }
569:
570: private String getKey(QName svcName, String epName) {
571: return svcName + epName;
572: }
573:
574: private static class InterfaceConnection {
575: QName service;
576: String endpoint;
577:
578: InterfaceConnection(QName service, String endpoint) {
579: this .service = service;
580: this .endpoint = endpoint;
581: }
582: }
583:
584: protected void fireEvent(ServiceEndpoint ep, int type) {
585: EndpointEvent event = new EndpointEvent(ep, type);
586: EndpointListener[] listeners = (EndpointListener[]) registry
587: .getContainer().getListeners(EndpointListener.class);
588: for (int i = 0; i < listeners.length; i++) {
589: switch (type) {
590: case EndpointEvent.INTERNAL_ENDPOINT_REGISTERED:
591: listeners[i].internalEndpointRegistered(event);
592: break;
593: case EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED:
594: listeners[i].internalEndpointUnregistered(event);
595: break;
596: case EndpointEvent.EXTERNAL_ENDPOINT_REGISTERED:
597: listeners[i].externalEndpointRegistered(event);
598: break;
599: case EndpointEvent.EXTERNAL_ENDPOINT_UNREGISTERED:
600: listeners[i].externalEndpointUnregistered(event);
601: break;
602: case EndpointEvent.LINKED_ENDPOINT_REGISTERED:
603: listeners[i].linkedEndpointRegistered(event);
604: break;
605: case EndpointEvent.LINKED_ENDPOINT_UNREGISTERED:
606: listeners[i].linkedEndpointUnregistered(event);
607: break;
608: case EndpointEvent.REMOTE_ENDPOINT_REGISTERED:
609: listeners[i].remoteEndpointRegistered(event);
610: break;
611: case EndpointEvent.REMOTE_ENDPOINT_UNREGISTERED:
612: listeners[i].remoteEndpointUnregistered(event);
613: break;
614: default:
615: break;
616: }
617: }
618:
619: }
620:
621: }
|