001: /*
002: * Copyright 2007 The Kuali Foundation
003: *
004: * Licensed under the Educational Community License, Version 1.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.opensource.org/licenses/ecl1.php
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package edu.iu.uis.eden.messaging;
017:
018: import java.util.ArrayList;
019: import java.util.Collections;
020: import java.util.HashMap;
021: import java.util.Iterator;
022: import java.util.List;
023: import java.util.Map;
024:
025: import javax.servlet.http.HttpServletRequest;
026: import javax.servlet.http.HttpServletResponse;
027: import javax.xml.namespace.QName;
028:
029: import org.apache.commons.lang.StringUtils;
030: import org.apache.log4j.Logger;
031: import org.kuali.bus.services.KSBServiceLocator;
032: import org.kuali.rice.config.Config;
033: import org.kuali.rice.config.ConfigurationException;
034: import org.kuali.rice.core.Core;
035: import org.kuali.rice.util.RiceUtilities;
036: import org.springframework.web.servlet.mvc.Controller;
037:
038: import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
039: import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
040: import edu.iu.uis.eden.messaging.callforwarding.ForwardedCallHandler;
041: import edu.iu.uis.eden.messaging.callforwarding.ForwardedCallHandlerImpl;
042: import edu.iu.uis.eden.messaging.serviceexporters.ServiceExporterFactory;
043:
044: public class RemotedServiceRegistryImpl implements
045: RemotedServiceRegistry, Runnable {
046:
047: private static final Logger LOG = Logger
048: .getLogger(RemotedServiceRegistryImpl.class);
049:
050: private Map<QName, ServerSideRemotedServiceHolder> publishedServices = Collections
051: .synchronizedMap(new HashMap<QName, ServerSideRemotedServiceHolder>());
052:
053: private Map<QName, ServerSideRemotedServiceHolder> publishedTempServices = Collections
054: .synchronizedMap(new HashMap<QName, ServerSideRemotedServiceHolder>());
055:
056: private boolean started;
057:
058: private ScheduledFuture future;
059:
060: public void handle(HttpServletRequest request,
061: HttpServletResponse response, Object handler)
062: throws Exception {
063: ((Controller) handler).handleRequest(request, response);
064: }
065:
066: private void registerService(ServiceInfo entry, Object serviceImpl)
067: throws Exception {
068: ServerSideRemotedServiceHolder serviceHolder = ServiceExporterFactory
069: .getServiceExporter(entry).getServiceExporter(
070: serviceImpl);
071: this .publishedServices.put(entry.getQname(), serviceHolder);
072:
073: }
074:
075: private ServiceInfo getForwardHandlerServiceInfo(
076: ServiceDefinition serviceDef) {
077: ForwardedCallHandler callHandler = new ForwardedCallHandlerImpl();
078:
079: ServiceDefinition serviceDefinition = new JavaServiceDefinition();
080: serviceDefinition.setBusSecurity(serviceDef.getBusSecurity());
081: if (serviceDef.getLocalServiceName() == null
082: && serviceDef.getServiceName() != null) {
083: serviceDefinition.setServiceName(new QName(serviceDef
084: .getServiceName().getNamespaceURI(), serviceDef
085: .getServiceName().getLocalPart()
086: + FORWARD_HANDLER_SUFFIX));
087: } else {
088: serviceDefinition.setLocalServiceName(serviceDef
089: .getLocalServiceName()
090: + FORWARD_HANDLER_SUFFIX);
091: serviceDefinition.setServiceNameSpaceURI(serviceDef
092: .getServiceNameSpaceURI());
093: }
094: serviceDefinition.setMessageExceptionHandler(serviceDef
095: .getMessageExceptionHandler());
096: serviceDefinition.setMillisToLive(serviceDef.getMillisToLive());
097: serviceDefinition.setPriority(serviceDef.getPriority());
098: serviceDefinition.setQueue(serviceDef.getQueue());
099: serviceDefinition.setRetryAttempts(serviceDef
100: .getRetryAttempts());
101: serviceDefinition.setService(callHandler);
102: serviceDefinition.validate();
103:
104: return new ServiceInfo(serviceDefinition);
105: }
106:
107: @SuppressWarnings("unchecked")
108: public void registerService(ServiceDefinition serviceDefinition,
109: boolean forceRegistryRefresh) {
110: if (serviceDefinition == null) {
111: throw new RuntimeException("Service Definition is null");
112: }
113: List services = (List) Core.getCurrentContextConfig()
114: .getObject(Config.BUS_DEPLOYED_SERVICES);
115: if (services == null) {
116: services = new ArrayList();
117: Core.getCurrentContextConfig().getObjects().put(
118: Config.BUS_DEPLOYED_SERVICES, services);
119: }
120: services.add(serviceDefinition);
121: // force an immediate registry of the service
122: if (forceRegistryRefresh) {
123: run();
124: }
125: }
126:
127: public void registerTempService(
128: ServiceDefinition serviceDefinition, Object service) {
129: ServiceInfo serviceInfo = new ServiceInfo(serviceDefinition);
130: Object existingService = getService(serviceInfo.getQname());
131: if (existingService != null) {
132: throw new RuntimeException(
133: "Service with that name is already registered");
134: }
135: try {
136: ServerSideRemotedServiceHolder serviceHolder = ServiceExporterFactory
137: .getServiceExporter(serviceInfo)
138: .getServiceExporter(service);
139: this .publishedTempServices.put(serviceInfo.getQname(),
140: serviceHolder);
141: LOG.debug("Registered temp service "
142: + serviceDefinition.getServiceName());
143: } catch (Exception e) {
144: throw new RuntimeException(e);
145: }
146: }
147:
148: public ServerSideRemotedServiceHolder getRemotedServiceHolder(
149: QName qname) {
150: final ServerSideRemotedServiceHolder serviceHolder = this .publishedServices
151: .get(qname);
152: return serviceHolder != null ? serviceHolder
153: : this .publishedTempServices.get(qname);
154: }
155:
156: public Object getService(QName qName, String url) {
157: ServerSideRemotedServiceHolder serviceHolder = this .publishedServices
158: .get(qName);
159: if (serviceHolder != null
160: && serviceHolder.getServiceInfo().getEndpointUrl()
161: .equals(url)) {
162: return serviceHolder.getInjectedPojo();
163: }
164:
165: serviceHolder = this .publishedTempServices.get(qName);
166: if (serviceHolder != null
167: && serviceHolder.getServiceInfo().equals(url)) {
168: return serviceHolder.getInjectedPojo();
169: }
170:
171: return null;
172: }
173:
174: public Object getLocalService(QName serviceName) {
175: ServerSideRemotedServiceHolder serviceHolder = this .publishedServices
176: .get(serviceName);
177: if (serviceHolder == null) {
178: return null;
179: }
180: return serviceHolder.getInjectedPojo();
181: }
182:
183: public Object getService(QName qname) {
184: RemotedServiceHolder serviceHolder = getRemotedServiceHolder(qname);
185: if (serviceHolder != null) {
186: Object service = serviceHolder.getService();
187: if (service == null) {
188: throw new RuntimeException(
189: "Retreived null service using "
190: + qname
191: + ". This means the service exporter returned "
192: + "a null object to this servers service repository.");
193: }
194: return service;
195: }
196: if (!StringUtils.isEmpty(qname.getNamespaceURI())) {
197: return getService(new QName(qname.getLocalPart()));
198: }
199: return null;
200: }
201:
202: public void removeRemoteServiceFromRegistry(QName serviceName) {
203: this .publishedTempServices.remove(serviceName);
204: }
205:
206: public void refresh() {
207: run();
208: }
209:
210: public synchronized void run() {
211: String serviceServletUrl = (String) Core
212: .getObjectFromConfigHierarchy(Config.SERVICE_SERVLET_URL);
213: if (serviceServletUrl == null) {
214: throw new RuntimeException(
215: "No service url provided to locate services. This is configured in the KSBConfigurer.");
216: }
217: String messageEntity = Core.getCurrentContextConfig()
218: .getMessageEntity();
219: LOG
220: .debug("Checking for newly published services on message entity "
221: + messageEntity);
222:
223: List javaServices = (List) Core.getCurrentContextConfig()
224: .getObject(Config.BUS_DEPLOYED_SERVICES);
225: // convert the ServiceDefinitions into ServiceInfos for diff comparison
226: List<ServiceInfo> configuredJavaServices = new ArrayList<ServiceInfo>();
227: for (Iterator iter = javaServices.iterator(); iter.hasNext();) {
228: ServiceDefinition serviceDef = (ServiceDefinition) iter
229: .next();
230: configuredJavaServices.add(new ServiceInfo(serviceDef));
231: configuredJavaServices
232: .add(getForwardHandlerServiceInfo(serviceDef));
233: }
234:
235: List<ServiceInfo> configuredServices = new ArrayList<ServiceInfo>();
236: configuredServices.addAll(configuredJavaServices);
237: List<ServiceInfo> fetchedServices = null;
238:
239: if (Core.getCurrentContextConfig().getDevMode()) {
240: fetchedServices = new ArrayList<ServiceInfo>();
241: } else {
242: //TODO we are not verifying that this read is not being done in dev mode in a test
243: fetchedServices = this .getServiceInfoService()
244: .findLocallyPublishedServices(
245: RiceUtilities.getIpNumber(), messageEntity);
246: }
247:
248: RoutingTableDiffCalculator diffCalc = new RoutingTableDiffCalculator();
249: boolean needUpdated = diffCalc.calculateServerSideUpdateLists(
250: configuredServices, fetchedServices);
251: if (needUpdated) {
252: if (!Core.getCurrentContextConfig().getDevMode()) {
253: getServiceInfoService().save(
254: diffCalc.getServicesNeedUpdated());
255: getServiceInfoService().remove(
256: diffCalc.getServicesNeedRemoved());
257: }
258: this .publishedServices.clear();
259: publishServiceList(diffCalc.getMasterServiceList());
260: } else if (this .publishedServices.isEmpty()) {
261: publishServiceList(configuredServices);
262: }
263: LOG.info("Finished checking for remote services.");
264: }
265:
266: private void publishServiceList(List<ServiceInfo> services) {
267: for (ServiceInfo serviceInfo : services) {
268: try {
269: registerService(serviceInfo, serviceInfo
270: .getServiceDefinition().getService());
271: } catch (Exception e) {
272: LOG.error("Encountered error registering service "
273: + serviceInfo.getQname(), e);
274: this .publishedServices.remove(serviceInfo);
275: continue;
276: }
277: }
278: }
279:
280: public boolean isStarted() {
281: return this .started;
282: }
283:
284: public synchronized void start() throws Exception {
285: if (isStarted()) {
286: return;
287: }
288: run();
289: if (!Core.getCurrentContextConfig().getDevMode()) {
290: int refreshRate = Core.getCurrentContextConfig()
291: .getRefreshRate();
292: this .future = KSBServiceLocator.getScheduledPool()
293: .scheduleWithFixedDelay(this , 30, refreshRate,
294: TimeUnit.SECONDS);
295: }
296: this .started = true;
297: }
298:
299: public void stop() throws Exception {
300: // remove services from the bus
301: if (this .future != null) {
302: if (!this .future.cancel(false)) {
303: LOG
304: .warn("Failed to cancel the RemotedServiceRegistry.");
305: }
306: this .future = null;
307: }
308: List<ServiceInfo> fetchedServices = this
309: .getServiceInfoService().findLocallyPublishedServices(
310: RiceUtilities.getIpNumber(),
311: Core.getCurrentContextConfig()
312: .getMessageEntity());
313: this .getServiceInfoService().markServicesDead(fetchedServices);
314: this .publishedServices.clear();
315: this .getPublishedTempServices().clear();
316: this .started = false;
317: }
318:
319: public String getContents(String indent, boolean servicePerLine) {
320: String content = indent
321: + "RemotedServiceRegistryImpl services=";
322:
323: for (RemotedServiceHolder serviceHolder : this .publishedServices
324: .values()) {
325: if (servicePerLine) {
326: content += indent + "+++"
327: + serviceHolder.getServiceInfo().toString()
328: + "\n";
329: } else {
330: content += serviceHolder.getServiceInfo().toString()
331: + ", ";
332: }
333: }
334: return content;
335: }
336:
337: public ServiceRegistry getServiceInfoService() {
338: return KSBServiceLocator.getIPTableService();
339: }
340:
341: public Map<QName, ServerSideRemotedServiceHolder> getPublishedServices() {
342: if (!isStarted()) {
343: try {
344: start();
345: } catch (Exception e) {
346: throw new ConfigurationException(e);
347: }
348: }
349: return this .publishedServices;
350: }
351:
352: public Map<QName, ServerSideRemotedServiceHolder> getPublishedTempServices() {
353: return this.publishedTempServices;
354: }
355: }
|