001: package org.jacorb.poa;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import org.jacorb.poa.util.*;
024: import org.jacorb.poa.except.*;
025:
026: import org.jacorb.orb.dsi.ServerRequest;
027:
028: import org.omg.PortableServer.Servant;
029: import org.omg.PortableServer.ServantManager;
030:
031: import org.apache.avalon.framework.logger.Logger;
032: import org.apache.avalon.framework.configuration.*;
033:
034: import java.util.*;
035:
036: /**
037: * This class manages all request processing affairs. The main thread takes the
038: * requests out from the queue and will see that the necessary steps are taken.
039: *
040: * @author Reimo Tiedemann
041: * @version $Id: RequestController.java,v 1.34 2006/06/27 10:51:19 alphonse.bendt Exp $
042: */
043:
044: public final class RequestController extends Thread implements
045: Configurable {
046: private POA poa;
047: private org.jacorb.orb.ORB orb;
048: private RequestQueue requestQueue;
049: private AOM aom;
050: private final RPPoolManager poolManager;
051: private int localRequests = 0;
052:
053: private static int count = 0;
054:
055: /** the configuration object for this controller */
056: private org.jacorb.config.Configuration configuration = null;
057:
058: /** this controller's logger instance */
059: private Logger logger;
060: private int threadPoolMax = 0;
061:
062: // stores all active requests
063: private Hashtable activeRequestTable;
064: // RequestProcessor -> oid
065: // for synchronisation with the object deactiviation process
066: private Vector deactivationList = new Vector();
067: // oid's
068:
069: // other synchronisation stuff
070: private boolean terminate;
071: private boolean waitForCompletionCalled;
072: private boolean waitForShutdownCalled;
073: private final java.lang.Object queueLog = new java.lang.Object();
074: private int threadPriority = Thread.MAX_PRIORITY;
075:
076: RequestController(POA _poa, org.jacorb.orb.ORB _orb, AOM _aom,
077: RPPoolManager _poolManager) {
078: super ("RequestController-" + (++count));
079: poa = _poa;
080: aom = _aom;
081: orb = _orb;
082:
083: requestQueue = new RequestQueue(this );
084:
085: poolManager = _poolManager;
086: }
087:
088: public void configure(Configuration myConfiguration)
089: throws ConfigurationException {
090: this .configuration = (org.jacorb.config.Configuration) myConfiguration;
091:
092: logger = configuration.getNamedLogger("jacorb.poa.controller");
093:
094: requestQueue.configure(myConfiguration);
095:
096: threadPoolMax = configuration.getAttributeAsInteger(
097: "jacorb.poa.thread_pool_max", 20);
098:
099: threadPriority = configuration.getAttributeAsInteger(
100: "jacorb.poa.thread_priority", Thread.MAX_PRIORITY);
101:
102: activeRequestTable = poa.isSingleThreadModel() ? new Hashtable(
103: 1) : new Hashtable(threadPoolMax);
104:
105: if (threadPriority < Thread.MIN_PRIORITY) {
106: threadPriority = Thread.MIN_PRIORITY;
107: } else if (threadPriority > Thread.MAX_PRIORITY) {
108: threadPriority = Thread.MAX_PRIORITY;
109: }
110:
111: setPriority(threadPriority);
112: setDaemon(true);
113:
114: start();
115: }
116:
117: void clearUpPool() {
118: poolManager.destroy();
119: }
120:
121: /**
122: * rejects all queued requests with specified system exception
123: */
124:
125: void clearUpQueue(org.omg.CORBA.SystemException exception) {
126: ServerRequest request;
127: while ((request = requestQueue.removeLast()) != null) {
128: rejectRequest(request, exception);
129: }
130: }
131:
132: /**
133: * indicates that the assumptions for blocking the
134: * request controller thread have changed,
135: * a waiting request controller thread will notified
136: */
137:
138: void continueToWork() {
139: synchronized (queueLog) {
140: queueLog.notifyAll();
141: }
142: }
143:
144: synchronized void end() {
145: terminate = true;
146: continueToWork();
147: }
148:
149: /**
150: * frees an object from the deactivation in progress state,
151: * a call indicates that the object deactivation process is complete
152: */
153:
154: synchronized void freeObject(byte[] oid) {
155: deactivationList.removeElement(new ByteArrayKey(oid));
156: }
157:
158: AOM getAOM() {
159: return aom;
160: }
161:
162: Logger getLogger() {
163: return logger;
164: }
165:
166: org.jacorb.orb.ORB getORB() {
167: return orb;
168: }
169:
170: POA getPOA() {
171: return poa;
172: }
173:
174: RPPoolManager getPoolManager() {
175: return poolManager;
176: }
177:
178: RequestQueue getRequestQueue() {
179: return requestQueue;
180: }
181:
182: boolean isDeactivating(ByteArrayKey oid) {
183: return deactivationList.contains(oid);
184: }
185:
186: /**
187: * requests will dispatched to request processors,
188: * attention, if the processor pool is empty, this method returns only
189: * if the getProcessor() method from RequestProcessorPool can satisfied
190: */
191:
192: private void processRequest(ServerRequest request)
193: throws ShutdownInProgressException,
194: CompletionRequestedException {
195: Servant servant = null;
196: ServantManager servantManager = null;
197: boolean invalid = false;
198: ByteArrayKey oid = new ByteArrayKey(request.objectId());
199:
200: synchronized (this ) {
201: if (waitForCompletionCalled) {
202: /* state has changed to holding, discarding or inactive */
203:
204: if (logger.isInfoEnabled()) {
205: logger
206: .info("rid: "
207: + request.requestId()
208: + " opname: "
209: + request.operation()
210: + " cannot process request because waitForCompletion was called");
211: }
212: throw new CompletionRequestedException();
213: }
214:
215: if (waitForShutdownCalled) {
216: /* poa goes down */
217: if (logger.isInfoEnabled()) {
218: logger
219: .info("rid: "
220: + request.requestId()
221: + " opname: "
222: + request.operation()
223: + " cannot process request because POA shutdown in progress");
224: }
225: throw new ShutdownInProgressException();
226: }
227:
228: /* below this point it's save that the poa is active */
229:
230: if ((aom != null && aom.isDeactivating(oid))
231: || deactivationList.contains(oid)) {
232: if (!poa.isUseServantManager()
233: && !poa.isUseDefaultServant()) {
234: if (logger.isInfoEnabled()) {
235: logger
236: .info("rid: "
237: + request.requestId()
238: + " opname: "
239: + request.operation()
240: + " cannot process request, because object is already in the deactivation process");
241: }
242:
243: throw new org.omg.CORBA.OBJECT_NOT_EXIST();
244: }
245: invalid = true;
246: }
247:
248: /* below this point it's save that the object is not in a
249: deactivation process */
250:
251: if (!invalid && poa.isRetain()) {
252: servant = aom.getServant(request.objectId());
253: }
254:
255: if (servant == null) {
256: if (poa.isUseDefaultServant()) {
257: if ((servant = poa.defaultServant) == null) {
258: if (logger.isWarnEnabled()) {
259: logger
260: .warn("rid: "
261: + request.requestId()
262: + " opname: "
263: + request.operation()
264: + " cannot process request because default servant is not set");
265: }
266: throw new org.omg.CORBA.OBJ_ADAPTER();
267: }
268:
269: } else if (poa.isUseServantManager()) {
270: if ((servantManager = poa.servantManager) == null) {
271: if (logger.isWarnEnabled()) {
272: logger
273: .warn("rid: "
274: + request.requestId()
275: + " opname: "
276: + request.operation()
277: + " cannot process request because servant manager is not set");
278: }
279: throw new org.omg.CORBA.OBJ_ADAPTER();
280: }
281: // USE_OBJECT_MAP_ONLY is in effect but object not exists
282: } else {
283: if (logger.isWarnEnabled()) {
284: logger
285: .warn("rid: "
286: + request.requestId()
287: + " opname: "
288: + request.operation()
289: + " cannot process request, because object doesn't exist");
290: }
291: throw new org.omg.CORBA.OBJECT_NOT_EXIST();
292: }
293: }
294: /* below this point it's save that the request is valid
295: (all preconditions can be met) */
296: activeRequestTable.put(request, oid);
297: }
298:
299: // get and initialize a processor for request processing
300: if (logger.isDebugEnabled()) {
301: logger.debug("rid: " + request.requestId() + " opname: "
302: + request.operation()
303: + " trying to get a RequestProcessor");
304: }
305:
306: RequestProcessor processor = poolManager.getProcessor();
307: processor.init(this , request, servant, servantManager);
308: processor.begin();
309: }
310:
311: void queueRequest(ServerRequest request)
312: throws ResourceLimitReachedException {
313: requestQueue.add(request);
314: }
315:
316: /**
317: * this method calls the basic adapter and hands out the request if
318: * something went wrong, the specified system exception will set
319: */
320:
321: void rejectRequest(ServerRequest request,
322: org.omg.CORBA.SystemException exception) {
323: if (exception != null)
324: request.setSystemException(exception);
325:
326: orb.getBasicAdapter().return_result(request);
327:
328: if (logger.isWarnEnabled()) {
329: logger.warn("rid: " + request.requestId() + " opname: "
330: + request.operation()
331: + " request rejected with exception: "
332: + exception.getMessage());
333: }
334: }
335:
336: /**
337: * resets a previous waitForCompletion call,
338: * everybody who is waiting will notified
339: */
340:
341: synchronized void resetPreviousCompletionCall() {
342: if (logger.isDebugEnabled())
343: logger.debug("reset a previous completion call");
344:
345: waitForCompletionCalled = false;
346: notifyAll(); /* maybe somebody waits for completion */
347: }
348:
349: /**
350: * Sends the reply of the given request via the BasicAdapter.
351: */
352: void returnResult(ServerRequest request) {
353: orb.getBasicAdapter().return_result(request);
354: }
355:
356: /**
357: * Called from RequestProcessor when the request has been handled.
358: * The request is removed from the active request table.
359: */
360: synchronized void finish(ServerRequest request) {
361: activeRequestTable.remove(request);
362: notifyAll();
363: }
364:
365: /**
366: * the main loop for dispatching requests to request processors
367: */
368:
369: public void run() {
370: org.omg.PortableServer.POAManagerPackage.State state;
371: ServerRequest request;
372: org.omg.CORBA.OBJ_ADAPTER closed_connection_exception = new org.omg.CORBA.OBJ_ADAPTER(
373: "connection closed: adapter inactive");
374:
375: org.omg.CORBA.TRANSIENT transient_exception = new org.omg.CORBA.TRANSIENT();
376: while (!terminate) {
377: state = poa.getState();
378: if (POAUtil.isActive(state)) {
379: request = requestQueue.getFirst();
380:
381: /* Request available */
382: if (request != null) {
383: if (request.remainingPOAName() != null) {
384: orb.getBasicAdapter().deliverRequest(request,
385: poa);
386: requestQueue.removeFirst();
387: } else {
388: try {
389: processRequest(request);
390: requestQueue.removeFirst();
391: } catch (CompletionRequestedException e) {
392: /* if waitForCompletion was called the poa
393: state was changed to holding,
394: discarding or inactive, the loop don't
395: block in waitForContinue, the loop
396: continues and will detect the changed
397: state in the next turn (for this turn
398: the request will not processed) */
399: } catch (ShutdownInProgressException e) {
400: /* waitForShutdown was called */
401: waitForQueue();
402: } catch (org.omg.CORBA.OBJ_ADAPTER e) {
403: requestQueue.removeFirst();
404: rejectRequest(request, e);
405: } catch (org.omg.CORBA.OBJECT_NOT_EXIST e) {
406: requestQueue.removeFirst();
407: rejectRequest(request, e);
408: }
409: }
410: continue;
411: }
412: } else {
413: if (!waitForShutdownCalled
414: && (POAUtil.isDiscarding(state) || POAUtil
415: .isInactive(state))) {
416: request = requestQueue.removeLast();
417:
418: /* Request available */
419: if (request != null) {
420: if (POAUtil.isDiscarding(state)) {
421: rejectRequest(request, transient_exception);
422: } else {
423: rejectRequest(request,
424: closed_connection_exception);
425: }
426: continue;
427: }
428: }
429: }
430: /* if waitForShutdown was called the RequestController
431: loop blocks for ALL TIME in waitForQueue (the poa
432: behaves as if he is in holding state now) ATTENTION,
433: it's a lazy synchronisation, a request could be
434: rejected if waitForShutdown was called but couldn't be
435: processed (it's save)
436: */
437: waitForQueue();
438: }
439: }
440:
441: /**
442: * called from external thread for synchronizing with the
443: * request controller thread,
444: * a caller waits for completion of all active requests,
445: * no new requests will started from now on
446: */
447:
448: synchronized void waitForCompletion() {
449: waitForCompletionCalled = true;
450:
451: while (waitForCompletionCalled && !activeRequestTable.isEmpty()) {
452: try {
453: if (logger.isDebugEnabled())
454: logger
455: .debug("somebody waits for completion and there are active processors");
456: wait();
457: } catch (InterruptedException e) {
458: }
459: }
460: }
461:
462: /**
463: * called from external thread for synchronizing with the request
464: * controller thread, a caller waits for completion of all active
465: * requests on this object. No new requests on this object will be
466: * started from now on because a steady stream of incomming
467: * requests could keep the object from being deactivated, a
468: * servant may invoke recursive method calls on the object it
469: * incarnates and deactivation should not necessarily prevent
470: * those invocations.
471: */
472:
473: synchronized void waitForObjectCompletion(byte[] oid) {
474: ByteArrayKey oidbak = new ByteArrayKey(oid);
475:
476: while (activeRequestTable.contains(oidbak)) {
477: try {
478: wait();
479: } catch (InterruptedException e) {
480: }
481: }
482: if (logger.isDebugEnabled()) {
483: logger
484: .debug(POAUtil.convert(oid)
485: + "all active processors for this object have finished");
486:
487: }
488:
489: deactivationList.addElement(oidbak);
490: }
491:
492: /**
493: * blocks the request controller thread if the queue is empty,
494: * the poa is in holding state or waitForShutdown was called,
495: * if waitForShutdown was called the RequestController loop blocks for
496: * ALL TIME in this method (the poa behaves as if he is in holding state now)
497: */
498:
499: private void waitForQueue() {
500: synchronized (queueLog) {
501: if (logger.isDebugEnabled()) {
502: logger.debug("waiting for queue");
503: }
504:
505: while ((requestQueue.isEmpty() || poa.isHolding() || waitForShutdownCalled)
506: && !terminate) {
507: try {
508: queueLog.wait();
509: } catch (java.lang.InterruptedException e) {
510: // ignored
511: }
512: }
513: }
514: }
515:
516: /**
517: * called from external thread for synchronizing with the
518: * request controller thread,
519: * a caller waits for completion of all active requests,
520: * no new requests will started for ALL TIME
521: */
522: synchronized void waitForShutdown() {
523: waitForShutdownCalled = true;
524:
525: while ((waitForShutdownCalled && !activeRequestTable.isEmpty())
526: || (localRequests != 0)) {
527: try {
528: if (logger.isDebugEnabled()) {
529: logger
530: .debug("somebody waits for shutdown and there are active processors");
531: }
532: wait();
533: } catch (InterruptedException e) {
534: }
535: }
536: }
537:
538: synchronized void addLocalRequest() {
539: localRequests++;
540: }
541:
542: synchronized void removeLocalRequest() {
543: localRequests--;
544: notifyAll();
545: }
546: }
|