001: /*
002: * Copyright 2007 The Kuali Foundation
003: *
004: * Licensed under the Educational Community License, Version 1.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.opensource.org/licenses/ecl1.php
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package edu.iu.uis.eden.messaging;
017:
018: import java.util.ArrayList;
019: import java.util.Collections;
020: import java.util.HashMap;
021: import java.util.Iterator;
022: import java.util.List;
023: import java.util.Map;
024: import java.util.Random;
025:
026: import javax.xml.namespace.QName;
027:
028: import org.apache.commons.lang.StringUtils;
029: import org.apache.log4j.Logger;
030: import org.kuali.bus.services.KSBServiceLocator;
031: import org.kuali.rice.core.Core;
032: import org.kuali.rice.definition.ObjectDefinition;
033: import org.kuali.rice.exceptions.RiceRuntimeException;
034: import org.kuali.rice.resourceloader.GlobalResourceLoader;
035: import org.kuali.rice.resourceloader.ResourceLoaderContainer;
036:
037: import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
038: import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
039: import edu.iu.uis.eden.messaging.exceptionhandling.DefaultMessageExceptionHandler;
040: import edu.iu.uis.eden.messaging.exceptionhandling.MessageExceptionHandler;
041: import edu.iu.uis.eden.messaging.objectremoting.ObjectRemoterService;
042: import edu.iu.uis.eden.messaging.objectremoting.RemoteObjectCleanup;
043: import edu.iu.uis.eden.messaging.serviceconnectors.ServiceConnectorFactory;
044:
045: public class RemoteResourceServiceLocatorImpl extends
046: ResourceLoaderContainer implements Runnable,
047: RemoteResourceServiceLocator {
048:
049: private static final Logger LOG = Logger
050: .getLogger(RemoteResourceServiceLocatorImpl.class);
051:
052: private Random randomNumber = new Random();
053:
054: private boolean started;
055:
056: private ScheduledFuture future;
057:
058: private Map<QName, List<RemotedServiceHolder>> clients = Collections
059: .synchronizedMap(new HashMap<QName, List<RemotedServiceHolder>>());
060:
061: public RemoteResourceServiceLocatorImpl(QName name) {
062: super (name);
063: }
064:
065: public void removeService(ServiceInfo serviceInfo, Object service) {
066: QName serviceName = serviceInfo.getQname();
067: LOG.info("Removing service '" + serviceName + "'...");
068: List<RemotedServiceHolder> clientProxies = this .getClients()
069: .get(serviceName);
070: // these could be null in the case that they were removed by another
071: // thread (the thread pool) prior to entry into this method
072: if (clientProxies != null) {
073: boolean removed = removeServiceFromCollection(service,
074: clientProxies);
075: if (!removed) {
076: LOG
077: .info("There was no client proxy removed for the given service: "
078: + serviceName);
079: }
080: if (clientProxies.isEmpty()) {
081: List<RemotedServiceHolder> removedList = this
082: .getClients().remove(serviceName);
083: if (!removedList.isEmpty()) {
084: LOG
085: .warn("No client proxy was removed for the given service "
086: + serviceName);
087: }
088: }
089: }
090: }
091:
092: /**
093: * Removes a service (it's RemotedServiceHolder wrapper) from the list of
094: * services. This isn't very effecient but for time reasons hashcode and
095: * equals wasn't implemented on the RemotedServiceHolder and IPTable, which
096: * is a member of te RemotedServiceHolder.
097: *
098: * @param service
099: * @param serviceList
100: * @return boolean indicating if the entry was removed from the list
101: */
102: private boolean removeServiceFromCollection(Object service,
103: List<RemotedServiceHolder> serviceList) {
104: RemotedServiceHolder serviceToRemove = null;
105: for (RemotedServiceHolder remotedServiceHolder : serviceList) {
106: if (remotedServiceHolder.getService().equals(service)) {
107: serviceToRemove = remotedServiceHolder;
108: }
109: }
110: if (serviceToRemove != null) {
111:
112: serviceToRemove.getServiceInfo().setAlive(false);
113: List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
114: serviceInfos.add(serviceToRemove.getServiceInfo());
115: KSBServiceLocator.getIPTableService().markServicesDead(
116: serviceInfos);
117: return serviceList.remove(serviceToRemove);
118: }
119:
120: return false;
121: }
122:
123: /**
124: * Fetches a service from the client proxies configured in this resource
125: * loader.
126: */
127: public Object getService(QName serviceName) {
128: LOG.debug("ResourceLoader " + getName() + " fetching service "
129: + serviceName);
130:
131: //go to our remotely deployed services first
132: RemotedServiceRegistry remoteRegistry = KSBServiceLocator
133: .getServiceDeployer();
134: Object service = remoteRegistry.getLocalService(serviceName);
135: if (service != null) {
136: return service;
137: }
138:
139: List<RemotedServiceHolder> clientProxies = getAllServices(serviceName);
140: if (clientProxies == null || clientProxies.isEmpty()) {
141: return null;
142: }
143: // randomly get a proxy for 'load balancing'
144: service = getRemotedServiceHolderFromList(clientProxies)
145: .getService();
146: if (service != null) {
147: LOG.debug("Located a remote proxy to service "
148: + serviceName);
149: }
150: return service;
151: }
152:
153: public Object getService(QName qName, String url) {
154: List<RemotedServiceHolder> clientProxies = getAllServices(qName);
155: if (clientProxies == null || clientProxies.isEmpty()) {
156: return null;
157: }
158: for (RemotedServiceHolder holder : clientProxies) {
159: if (holder.getServiceInfo().getEndpointUrl().equals(url)) {
160: return holder.getService();
161: }
162: }
163: return null;
164: }
165:
166: public RemotedServiceHolder getRemotedServiceHolderFromList(
167: List<RemotedServiceHolder> remotedServices) {
168: return remotedServices.get(this .randomNumber
169: .nextInt(remotedServices.size()));
170: }
171:
172: public List<RemotedServiceHolder> getAllServices(QName qName) {
173: List<RemotedServiceHolder> clientProxies = this .getClients()
174: .get(qName);
175: if (clientProxies == null) {
176: LOG
177: .debug("Client proxies are null, Re-aquiring services. Message Entity "
178: + Core.getCurrentContextConfig()
179: .getMessageEntity());
180: run();
181: clientProxies = this .getClients().get(qName);
182: if (clientProxies == null || clientProxies.size() == 0) {
183: throw new RiceRuntimeException(
184: "No remote services available for client access when attempting to lookup '"
185: + qName + "'");
186: }
187: }
188: return clientProxies;
189: }
190:
191: public void refresh() {
192: run();
193: }
194:
195: public void run() {
196: if (!isStarted()) {
197: return;
198: }
199: LOG.debug("Checking for new services on the bus");
200: List<ServiceInfo> servicesOnBus = null;
201: if (Core.getCurrentContextConfig().getDevMode()) {
202: servicesOnBus = new ArrayList<ServiceInfo>();
203: for (RemotedServiceHolder remoteServiceHolder : KSBServiceLocator
204: .getServiceDeployer().getPublishedServices()
205: .values()) {
206: servicesOnBus.add(remoteServiceHolder.getServiceInfo());
207: }
208: } else {
209: servicesOnBus = KSBServiceLocator.getIPTableService()
210: .fetchAllActive();
211: }
212:
213: synchronized (getClients()) {
214: if (new RoutingTableDiffCalculator()
215: .calculateClientSideUpdate(this .getClients(),
216: servicesOnBus)) {
217: LOG
218: .debug("Located new services on the bus, numServices="
219: + servicesOnBus.size());
220: Map<QName, List<RemotedServiceHolder>> updatedRemoteServicesMap = new HashMap<QName, List<RemotedServiceHolder>>();
221: for (Iterator<ServiceInfo> iter = servicesOnBus
222: .iterator(); iter.hasNext();) {
223: ServiceInfo entry = iter.next();
224: if (entry.getAlive()) {
225: try {
226: registerClient(entry,
227: updatedRemoteServicesMap);
228: } catch (Exception e) {
229: LOG.error("Unable to register client "
230: + entry, e);
231: }
232: }
233: }
234: this .setClients(updatedRemoteServicesMap);
235: } else {
236: LOG.debug("No new services on the bus.");
237: }
238: }
239: }
240:
241: private void registerClient(ServiceInfo serviceInfo,
242: Map<QName, List<RemotedServiceHolder>> clientMap) {
243:
244: RemotedServiceHolder serviceHolder;
245: try {
246: serviceHolder = ServiceConnectorFactory
247: .getServiceConnector(serviceInfo)
248: .getServiceHolder();
249: } catch (Exception e) {
250: LOG.error("Failed to register client service "
251: + serviceInfo.getQname(), e);
252: this .removeService(serviceInfo, null);
253: return;
254: }
255:
256: if (clientMap.get(serviceInfo.getQname()) == null) {
257: clientMap.put(serviceInfo.getQname(),
258: new ArrayList<RemotedServiceHolder>());
259: }
260: clientMap.get(serviceInfo.getQname()).add(serviceHolder);
261: }
262:
263: public boolean isStarted() {
264: return this .started;
265: }
266:
267: public void start() throws Exception {
268: LOG.info("Starting the RemoteResourceServiceLocator...");
269:
270: int refreshRate = Core.getCurrentContextConfig()
271: .getRefreshRate();
272: this .future = KSBServiceLocator.getScheduledPool()
273: .scheduleWithFixedDelay(this , 30, refreshRate,
274: TimeUnit.SECONDS);
275: this .started = true;
276: run();
277: LOG.info("...RemoteResourceServiceLocator started.");
278: }
279:
280: public void stop() throws Exception {
281: LOG.info("Stopping the RemoteResourceServiceLocator...");
282: if (this .future != null) {
283: if (!this .future.cancel(true)) {
284: LOG
285: .warn("Failed to cancel the RemoteResourceServiceLocator service.");
286: }
287: this .future = null;
288: }
289: this .started = false;
290: LOG.info("...RemoteResourceServiceLocator stopped.");
291: }
292:
293: public Object getObject(ObjectDefinition definition) {
294: if (definition.isAtRemotingLayer()) {
295: return null;
296: }
297: if (StringUtils.isEmpty(definition.getMessageEntity())) {
298: return null;
299: }
300: QName objectRemoterName = new QName(definition
301: .getMessageEntity(), KSBServiceLocator.OBJECT_REMOTER);
302: ObjectRemoterService classRemoter = (ObjectRemoterService) GlobalResourceLoader
303: .getService(objectRemoterName);
304: ServiceInfo serviceInfo = classRemoter
305: .getRemotedClassURL(definition);
306:
307: if (serviceInfo == null) {
308: return null;
309: }
310:
311: try {
312: RemoteObjectCleanup remoteCleanup = new RemoteObjectCleanup(
313: objectRemoterName, serviceInfo.getQname());
314: KSBServiceLocator.getJtaTransactionManager()
315: .getTransaction().registerSynchronization(
316: remoteCleanup);
317:
318: return ServiceConnectorFactory.getServiceConnector(
319: serviceInfo).getServiceHolder().getService();
320: } catch (Exception e) {
321: throw new RiceRuntimeException(e);
322: }
323: }
324:
325: public MessageExceptionHandler getMessageExceptionHandler(
326: QName qname) {
327: List<RemotedServiceHolder> remotedServices = getAllServices(qname);
328: if (remotedServices == null || remotedServices.isEmpty()) {
329: throw new RiceRuntimeException(
330: "No services found for name " + qname);
331: }
332: RemotedServiceHolder serviceHolder = getRemotedServiceHolderFromList(remotedServices);
333: if (serviceHolder != null) {
334: String messageExceptionHandlerName = serviceHolder
335: .getServiceInfo().getServiceDefinition()
336: .getMessageExceptionHandler();
337: if (messageExceptionHandlerName == null) {
338: messageExceptionHandlerName = DefaultMessageExceptionHandler.class
339: .getName();
340: }
341: return (MessageExceptionHandler) GlobalResourceLoader
342: .getObject(new ObjectDefinition(
343: messageExceptionHandlerName));
344: }
345: throw new RiceRuntimeException("No service with QName " + qname
346: + " found");
347: }
348:
349: public Map<QName, List<RemotedServiceHolder>> getClients() {
350: return this .clients;
351: }
352:
353: public void setClients(
354: Map<QName, List<RemotedServiceHolder>> clients) {
355: this.clients = clients;
356: }
357: }
|