001: package org.jacorb.poa;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import org.jacorb.poa.except.*;
024:
025: import org.jacorb.util.*;
026: import org.jacorb.orb.dsi.ServerRequest;
027: import org.jacorb.orb.SystemExceptionHelper;
028: import org.jacorb.orb.portableInterceptor.*;
029: import org.jacorb.orb.giop.ReplyOutputStream;
030:
031: import java.util.*;
032:
033: import org.apache.avalon.framework.configuration.*;
034: import org.apache.avalon.framework.logger.Logger;
035:
036: import org.omg.PortableServer.Servant;
037: import org.omg.PortableServer.ServantManager;
038: import org.omg.PortableServer.ServantActivator;
039: import org.omg.PortableServer.ServantLocator;
040: import org.omg.PortableServer.DynamicImplementation;
041: import org.omg.PortableServer.ServantLocatorPackage.CookieHolder;
042:
043: import org.omg.CORBA.CompletionStatus;
044: import org.omg.CORBA.portable.InvokeHandler;
045: import org.omg.GIOP.ReplyStatusType_1_2;
046: import org.omg.PortableInterceptor.*;
047: import org.omg.IOP.ServiceContext;
048:
049: /**
050: * This thread performs the request processing, the actual method invocation and
051: * it returns the ServerRequest object to the ORB.
052: *
053: * @author Reimo Tiedemann, FU Berlin
054: * @version $Id: RequestProcessor.java,v 1.38 2007/02/15 12:56:06 andre.spiegel Exp $
055: */
056:
057: public class RequestProcessor extends Thread implements
058: InvocationContext, Configurable {
059: private boolean start;
060: private boolean terminate;
061: private final RPPoolManager poolManager;
062:
063: private RequestController controller;
064: private ServerRequest request;
065: private Servant servant;
066: private ServantManager servantManager;
067: private CookieHolder cookieHolder;
068:
069: /**
070: * Whether to check for expiry of any ReplyEndTimePolicy. Normally,
071: * it is sufficient to check this on the client side, but the additional
072: * check on the server side can save the server and the network some work.
073: * It requires that the clocks of the client and server machine are
074: * synchronized, though.
075: */
076: private boolean checkReplyEndTime = false;
077:
078: /** this processor's logger instance, obtained from the request controller */
079: private Logger logger;
080:
081: private final static Set specialOperations;
082: private static int count = 0;
083:
084: static {
085: specialOperations = new HashSet(50);
086: specialOperations.add("_is_a");
087: specialOperations.add("_interface");
088: specialOperations.add("_non_existent");
089:
090: specialOperations.add("_get_policy");
091: specialOperations.add("_set_policy_overrides");
092:
093: specialOperations.add("_get_component");
094: }
095:
096: RequestProcessor(RPPoolManager _poolManager) {
097: super ("RequestProcessor-" + (++count));
098: poolManager = _poolManager;
099: }
100:
101: public void configure(Configuration configuration)
102: throws ConfigurationException {
103: checkReplyEndTime = configuration.getAttributeAsBoolean(
104: "jacorb.poa.check_reply_end_time", false);
105: }
106:
107: /**
108: * starts the request processor
109: */
110:
111: synchronized void begin() {
112: start = true;
113: notify();
114: }
115:
116: /**
117: * terminates the request processor
118: */
119:
120: synchronized void end() {
121: terminate = true;
122: notify();
123: }
124:
125: /**
126: * returns the oid associated current servant invocation
127: */
128:
129: public byte[] getObjectId() {
130: if (!start)
131: throw new POAInternalError(
132: "error: RequestProcessor not started (getObjectId)");
133: return request.objectId();
134: }
135:
136: /**
137: * returns the orb that has received the request
138: */
139:
140: public org.omg.CORBA.ORB getORB() {
141: if (!start)
142: throw new POAInternalError(
143: "error: RequestProcessor not started (getORB)");
144: return controller.getORB();
145: }
146:
147: /**
148: * returns the poa that has dispatched the request
149: */
150:
151: public POA getPOA() {
152: if (!start)
153: throw new POAInternalError(
154: "error: RequestProcessor not started (getPOA)");
155: return controller.getPOA();
156: }
157:
158: /**
159: * returns the actual servant
160: */
161:
162: public Servant getServant() {
163: if (!start)
164: throw new POAInternalError(
165: "error: RequestProcessor not started (getServant)");
166: return servant;
167: }
168:
169: /**
170: * initializes the request processor
171: */
172:
173: void init(RequestController requestController,
174: ServerRequest serverRequest, Servant srvnt,
175: ServantManager manager) {
176: this .controller = requestController;
177: this .request = serverRequest;
178: this .servant = srvnt;
179: this .servantManager = manager;
180: cookieHolder = null;
181: logger = requestController.getLogger();
182: }
183:
184: private void clear() {
185: controller = null;
186: request = null;
187: servant = null;
188: servantManager = null;
189: cookieHolder = null;
190: }
191:
192: /**
193: * causes the aom to perform the incarnate call on a servant activator
194: */
195:
196: private void invokeIncarnate() {
197: if (logger.isDebugEnabled()) {
198: logger.debug("rid: " + request.requestId() + " opname: "
199: + request.operation()
200: + " invoke incarnate on servant activator");
201: }
202: try {
203:
204: servant = controller.getAOM().incarnate(request.objectId(),
205: (ServantActivator) servantManager,
206: controller.getPOA());
207: if (servant == null) {
208: if (logger.isWarnEnabled()) {
209: logger.warn("rid: " + request.requestId()
210: + " opname: " + request.operation()
211: + " incarnate: returns null");
212: }
213:
214: request
215: .setSystemException(new org.omg.CORBA.OBJ_ADAPTER());
216: }
217: } catch (org.omg.CORBA.SystemException e) {
218: if (logger.isWarnEnabled()) {
219: logger
220: .warn(
221: "rid: "
222: + request.requestId()
223: + " opname: "
224: + request.operation()
225: + " incarnate: system exception was thrown.",
226: e);
227: }
228: request.setSystemException(e);
229: } catch (org.omg.PortableServer.ForwardRequest e) {
230: if (logger.isWarnEnabled()) {
231: logger.warn("rid: " + request.requestId() + " opname: "
232: + request.operation()
233: + " incarnate: forward exception was thrown.",
234: e);
235: }
236: request.setLocationForward(e);
237:
238: } catch (Throwable e) {
239: /* not spec. */
240: if (logger.isErrorEnabled()) {
241: logger.error("rid: " + request.requestId()
242: + " opname: " + request.operation()
243: + " incarnate: throwable was thrown.", e);
244: }
245: request.setSystemException(new org.omg.CORBA.OBJ_ADAPTER(e
246: .toString()));
247: }
248: }
249:
250: /**
251: * invokes the operation on servant,
252: */
253:
254: private void invokeOperation() {
255: try {
256: if (servant instanceof org.omg.CORBA.portable.InvokeHandler) {
257: if (logger.isDebugEnabled()) {
258: logger
259: .debug("rid: "
260: + request.requestId()
261: + " opname: "
262: + request.operation()
263: + " invokeOperation on servant (stream based)");
264: }
265:
266: if (specialOperations.contains(request.operation())) {
267: ((org.jacorb.orb.ServantDelegate) servant
268: ._get_delegate())._invoke(servant, request
269: .operation(), request.getInputStream(),
270: request);
271: } else {
272: ((InvokeHandler) servant)._invoke(request
273: .operation(), request.getInputStream(),
274: request);
275: }
276:
277: } else if (servant instanceof org.omg.PortableServer.DynamicImplementation) {
278: if (logger.isDebugEnabled()) {
279: logger
280: .debug("rid: "
281: + request.requestId()
282: + " opname: "
283: + request.operation()
284: + " invoke operation on servant (dsi based)");
285: }
286: if (specialOperations.contains(request.operation())
287: && !(servant instanceof org.jacorb.orb.Forwarder)) {
288: ((org.jacorb.orb.ServantDelegate) servant
289: ._get_delegate())._invoke(servant, request
290: .operation(), request.getInputStream(),
291: request);
292: } else {
293: ((DynamicImplementation) servant).invoke(request);
294: }
295: } else {
296: if (logger.isWarnEnabled()) {
297: logger
298: .warn("rid: "
299: + request.requestId()
300: + " opname: "
301: + request.operation()
302: + " unknown servant type (neither stream nor dsi based)");
303: }
304: }
305:
306: } catch (org.omg.CORBA.SystemException e) {
307: if (logger.isInfoEnabled()) {
308: logger.info("rid: " + request.requestId() + " opname: "
309: + request.operation()
310: + " invocation: system exception was thrown.",
311: e);
312: }
313: request.setSystemException(e);
314: } catch (Throwable e) {
315: /* not spec. */
316: if (logger.isErrorEnabled()) {
317: logger.error("rid: " + request.requestId()
318: + " opname: " + request.operation()
319: + " invocation: throwable was thrown.", e);
320: }
321: request.setSystemException(new org.omg.CORBA.UNKNOWN(e
322: .toString()));
323: }
324: }
325:
326: /**
327: * performs the postinvoke call on a servant locator
328: */
329:
330: private void invokePostInvoke() {
331: try {
332: if (logger.isDebugEnabled()) {
333: logger.debug("rid: " + request.requestId()
334: + " opname: " + request.operation()
335: + " invoke postinvoke on servant locator");
336: }
337:
338: ((ServantLocator) servantManager).postinvoke(request
339: .objectId(), controller.getPOA(), request
340: .operation(), cookieHolder.value, servant);
341: } catch (org.omg.CORBA.SystemException e) {
342: if (logger.isInfoEnabled()) {
343: logger.info("rid: " + request.requestId() + " opname: "
344: + request.operation()
345: + " postinvoke: system exception was thrown.",
346: e);
347: }
348: request.setSystemException(e);
349:
350: } catch (Throwable e) {
351: /* not spec. */
352: if (logger.isWarnEnabled()) {
353: logger.warn("rid: " + request.requestId() + " opname: "
354: + request.operation()
355: + " postinvoke: throwable was thrown.", e);
356: }
357: request.setSystemException(new org.omg.CORBA.OBJ_ADAPTER(e
358: .toString()));
359: /* which system exception I should raise? */
360: }
361: }
362:
363: /**
364: * performs the preinvoke call on a servant locator
365: */
366:
367: private void invokePreInvoke() {
368: if (logger.isDebugEnabled()) {
369: logger.debug("rid: " + request.requestId() + " opname: "
370: + request.operation()
371: + " invoke preinvoke on servant locator");
372: }
373: try {
374: cookieHolder = new CookieHolder();
375: servant = ((ServantLocator) servantManager).preinvoke(
376: request.objectId(), controller.getPOA(), request
377: .operation(), cookieHolder);
378: if (servant == null) {
379: if (logger.isWarnEnabled()) {
380: logger.warn("rid: " + request.requestId()
381: + " opname: " + request.operation()
382: + " preinvoke: returns null");
383: }
384: request
385: .setSystemException(new org.omg.CORBA.OBJ_ADAPTER());
386: }
387: controller.getORB().set_delegate(servant); // set the orb
388:
389: } catch (org.omg.CORBA.SystemException e) {
390: if (logger.isInfoEnabled()) {
391: logger
392: .info(
393: "rid: "
394: + request.requestId()
395: + " opname: "
396: + request.operation()
397: + " preinvoke: system exception was thrown.",
398: e);
399: }
400: request.setSystemException(e);
401:
402: } catch (org.omg.PortableServer.ForwardRequest e) {
403: if (logger.isInfoEnabled()) {
404: logger.info("rid: " + request.requestId() + " opname: "
405: + request.operation()
406: + " preinvoke: forward exception was thrown.",
407: e);
408: }
409: request.setLocationForward(e);
410: } catch (Throwable e) {
411: /* not spec. */
412: if (logger.isWarnEnabled()) {
413: logger.warn("rid: " + request.requestId() + " opname: "
414: + request.operation()
415: + " preinvoke: throwable was thrown.", e);
416: }
417: request.setSystemException(new org.omg.CORBA.OBJ_ADAPTER(e
418: .toString()));
419: /* which system exception I should raise? */
420: }
421: }
422:
423: boolean isActive() {
424: return start;
425: }
426:
427: /**
428: * the main request processing routine
429: */
430:
431: private void process() {
432: ServerRequestInfoImpl info = null;
433:
434: // Notify parties interested in using information about a Transport
435: controller.getORB().notifyTransportListeners(
436: request.getConnection());
437:
438: if (controller.getORB().hasServerRequestInterceptors()) {
439: //RequestInfo attributes
440: info = new ServerRequestInfoImpl(controller.getORB(),
441: request, servant);
442:
443: InterceptorManager manager = controller.getORB()
444: .getInterceptorManager();
445: info.setCurrent(manager.getEmptyCurrent());
446:
447: if (!invokeInterceptors(
448: info,
449: ServerInterceptorIterator.RECEIVE_REQUEST_SERVICE_CONTEXTS)) {
450: //an interceptor bailed out, so don't continue request
451: //processing and return here. The service contexts for
452: //the result have to be set, of course.
453: ReplyOutputStream out = request.getReplyOutputStream();
454: Enumeration ctx = info.getReplyServiceContexts();
455:
456: while (ctx.hasMoreElements()) {
457: out.addServiceContext((ServiceContext) ctx
458: .nextElement());
459: }
460:
461: return;
462: }
463:
464: manager.setTSCurrent(info.current());
465: }
466:
467: // TODO: The exception replies below should also trigger interceptors.
468: // Requires some re-arranging of the entire method.
469: if (Time.hasPassed(request.getRequestEndTime())) {
470: request.setSystemException(new org.omg.CORBA.TIMEOUT(
471: "Request End Time exceeded", 0,
472: CompletionStatus.COMPLETED_NO));
473: return;
474: }
475: if (checkReplyEndTime
476: && Time.hasPassed(request.getReplyEndTime())) {
477: request.setSystemException(new org.omg.CORBA.TIMEOUT(
478: "Reply End Time exceeded", 0,
479: CompletionStatus.COMPLETED_NO));
480: return;
481: }
482:
483: Time.waitFor(request.getRequestStartTime());
484:
485: if (servantManager != null) {
486: if (servantManager instanceof org.omg.PortableServer.ServantActivator) {
487: invokeIncarnate();
488: } else {
489: invokePreInvoke();
490: }
491: }
492:
493: if (servant != null) {
494: if (info != null) {
495: info.setServant(servant);
496:
497: if (servant instanceof org.omg.CORBA.portable.InvokeHandler) {
498: if (!invokeInterceptors(info,
499: ServerInterceptorIterator.RECEIVE_REQUEST)) {
500: //an interceptor bailed out, so don't continue
501: //request processing and return here. The
502: //service contexts for the result have to be
503: //set, of course.
504:
505: if (cookieHolder != null) {
506: invokePostInvoke();
507: }
508:
509: ReplyOutputStream out = request
510: .getReplyOutputStream();
511: Enumeration ctx = info
512: .getReplyServiceContexts();
513:
514: while (ctx.hasMoreElements()) {
515: out.addServiceContext((ServiceContext) ctx
516: .nextElement());
517: }
518:
519: return;
520: }
521: } else if (servant instanceof org.omg.PortableServer.DynamicImplementation) {
522: request.setServerRequestInfo(info);
523: }
524: }
525:
526: invokeOperation();
527: }
528:
529: // preinvoke and postinvoke are always called in pairs
530: // but what happens if the servant is null
531:
532: if (cookieHolder != null) {
533: invokePostInvoke();
534: }
535:
536: if (checkReplyEndTime
537: && Time.hasPassed(request.getReplyEndTime())) {
538: request.setSystemException(new org.omg.CORBA.TIMEOUT(
539: "Reply End Time exceeded after invocation", 0,
540: CompletionStatus.COMPLETED_YES));
541: }
542:
543: if (info != null) {
544: InterceptorManager manager = controller.getORB()
545: .getInterceptorManager();
546: info.setCurrent(manager.getCurrent());
547:
548: short op = 0;
549: switch (request.status().value()) {
550: case ReplyStatusType_1_2._NO_EXCEPTION:
551: op = ServerInterceptorIterator.SEND_REPLY;
552: info.setReplyStatus(SUCCESSFUL.value);
553: break;
554:
555: case ReplyStatusType_1_2._USER_EXCEPTION:
556: info.setReplyStatus(USER_EXCEPTION.value);
557: SystemExceptionHelper
558: .insert(
559: info.sending_exception,
560: new org.omg.CORBA.UNKNOWN(
561: "Stream-based UserExceptions are not available!"));
562: op = ServerInterceptorIterator.SEND_EXCEPTION;
563: break;
564:
565: case ReplyStatusType_1_2._SYSTEM_EXCEPTION:
566: info.setReplyStatus(SYSTEM_EXCEPTION.value);
567: SystemExceptionHelper.insert(info.sending_exception,
568: request.getSystemException());
569: op = ServerInterceptorIterator.SEND_EXCEPTION;
570: break;
571:
572: case ReplyStatusType_1_2._LOCATION_FORWARD:
573: info.setReplyStatus(LOCATION_FORWARD.value);
574: op = ServerInterceptorIterator.SEND_OTHER;
575: break;
576: }
577:
578: invokeInterceptors(info, op);
579:
580: ReplyOutputStream out = request.get_out();
581: Enumeration ctx = info.getReplyServiceContexts();
582:
583: while (ctx.hasMoreElements()) {
584: out.addServiceContext((ServiceContext) ctx
585: .nextElement());
586: }
587:
588: manager.removeTSCurrent();
589: }
590: }
591:
592: private boolean invokeInterceptors(ServerRequestInfoImpl info,
593: short op) {
594:
595: ServerInterceptorIterator intercept_iter = controller.getORB()
596: .getInterceptorManager().getServerIterator();
597:
598: try {
599: intercept_iter.iterate(info, op);
600: } catch (org.omg.CORBA.UserException ue) {
601: if (ue instanceof org.omg.PortableInterceptor.ForwardRequest) {
602: org.omg.PortableInterceptor.ForwardRequest fwd = (org.omg.PortableInterceptor.ForwardRequest) ue;
603:
604: request
605: .setLocationForward(new org.omg.PortableServer.ForwardRequest(
606: fwd.forward));
607: }
608: return false;
609:
610: } catch (org.omg.CORBA.SystemException _sys_ex) {
611: request.setSystemException(_sys_ex);
612: return false;
613: }
614: return true;
615: }
616:
617: /**
618: * the main loop for request processing
619: */
620:
621: public void run() {
622: while (true) {
623: synchronized (this ) {
624: while (!terminate && !start) {
625: try {
626: wait(); /* waits for the next task */
627: } catch (InterruptedException e) {
628: // ignored
629: }
630: }
631:
632: if (terminate) {
633: return;
634: }
635: }
636:
637: if (logger.isDebugEnabled()) {
638: logger.debug("rid: " + request.requestId()
639: + " opname: " + request.operation()
640: + " starts with request processing");
641: }
642:
643: if (request.syncScope() == org.omg.Messaging.SYNC_WITH_SERVER.value) {
644: controller.returnResult(request);
645: process();
646: } else {
647: process();
648: controller.returnResult(request);
649: }
650:
651: // return the request to the request controller
652: if (logger.isDebugEnabled()) {
653: logger.debug("rid: " + request.requestId()
654: + " opname: " + request.operation()
655: + " ends with request processing");
656: }
657:
658: controller.finish(request);
659:
660: start = false;
661: clear();
662:
663: // give back the processor into the pool
664: poolManager.releaseProcessor(this);
665: }
666: }
667: }
|