001: package org.jacorb.concurrency;
002:
003: /*
004:
005: * JacORB concurrency control service - a free CCS for JacORB
006:
007: *
008:
009: * Copyright (C) 1999-2004 LogicLand group, Viacheslav Tararin.
010:
011: *
012:
013: * This library is free software; you can redistribute it and/or
014:
015: * modify it under the terms of the GNU Library General Public
016:
017: * License as published by the Free Software Foundation; either
018:
019: * version 2 of the License, or (at your option) any later version.
020:
021: *
022:
023: * This library is distributed in the hope that it will be useful,
024:
025: * but WITHOUT ANY WARRANTY; without even the implied warranty of
026:
027: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
028:
029: * Library General Public License for more details.
030:
031: *
032:
033: * You should have received a copy of the GNU Library General Public
034:
035: * License along with this library; if not, write to the Free
036:
037: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
038:
039: */
040:
041: import java.util.Enumeration;
042:
043: import java.util.Hashtable;
044:
045: import java.util.Vector;
046:
047: import org.omg.CosConcurrencyControl.LockCoordinator;
048:
049: import org.omg.CosConcurrencyControl.LockNotHeld;
050:
051: import org.omg.CosConcurrencyControl.TransactionalLockSet;
052:
053: import org.omg.CosConcurrencyControl.TransactionalLockSetPOA;
054:
055: import org.omg.CosConcurrencyControl.lock_mode;
056:
057: import org.omg.CosTransactions.Coordinator;
058:
059: import org.omg.CosTransactions.Status;
060:
061: class TransactionalLockSetImpl extends TransactionalLockSetPOA {
062:
063: private Hashtable locks = new Hashtable();
064:
065: private Vector queue = new Vector();
066:
067: private Vector related = new Vector();
068:
069: private LockSetFactoryImpl factory;
070:
071: private boolean is_active = true;
072:
073: TransactionalLockSetImpl(LockSetFactoryImpl factory) {
074:
075: this .factory = factory;
076:
077: };
078:
079: public void lock(Coordinator current, lock_mode mode) {
080:
081: synchronized (queue) {
082:
083: check_active();
084:
085: TransactionCoordinator tc = factory
086: .get_transaction_coordinator(current);
087:
088: Request rqst = null;
089:
090: synchronized (tc) {
091:
092: check_status(tc);
093:
094: if (attempt_lock(tc, mode)) {
095:
096: return;
097:
098: }
099:
100: rqst = new Request();
101:
102: rqst.state = LockSetFactoryImpl.REQUEST;
103:
104: rqst.current = tc;
105:
106: rqst.to_do = Request.LOCK;
107:
108: rqst.set_mode = mode;
109:
110: rqst.reset_mode = null;
111:
112: queue.addElement(rqst);
113:
114: tc.set_lock_coordinator(this );
115:
116: }
117:
118: while (rqst.state == LockSetFactoryImpl.REQUEST) {
119:
120: try {
121:
122: queue.wait();
123:
124: } catch (Exception e) {
125:
126: e.printStackTrace(System.out);
127:
128: throw new org.omg.CORBA.INTERNAL();
129:
130: }
131:
132: }
133: ;
134:
135: switch (rqst.state) {
136:
137: case LockSetFactoryImpl.COMMIT:
138:
139: case LockSetFactoryImpl.NO_TRANS:
140:
141: throw new org.omg.CORBA.INVALID_TRANSACTION();
142:
143: case LockSetFactoryImpl.ROLLBACK:
144:
145: throw new org.omg.CORBA.TRANSACTION_ROLLEDBACK();
146:
147: }
148:
149: }
150:
151: };
152:
153: public boolean try_lock(Coordinator current, lock_mode mode) {
154:
155: synchronized (queue) {
156:
157: check_active();
158:
159: TransactionCoordinator tc = factory
160: .get_transaction_coordinator(current);
161:
162: synchronized (tc) {
163:
164: check_status(tc);
165:
166: return attempt_lock(tc, mode);
167:
168: }
169:
170: }
171:
172: };
173:
174: public void unlock(Coordinator current, lock_mode mode)
175: throws LockNotHeld {
176:
177: synchronized (queue) {
178:
179: check_active();
180:
181: TransactionCoordinator tc = factory
182: .get_transaction_coordinator(current);
183:
184: synchronized (tc) {
185:
186: check_status(tc);
187:
188: TransactionLocks current_locks = (TransactionLocks) locks
189: .get(tc);
190:
191: if (current_locks == null) {
192:
193: throw new LockNotHeld();
194:
195: }
196:
197: current_locks.unlock(mode);
198:
199: }
200:
201: if (attempt_lock_from_queue()) {
202:
203: queue.notifyAll();
204:
205: }
206: ;
207:
208: }
209:
210: };
211:
212: public void change_mode(Coordinator current, lock_mode held_mode,
213: lock_mode new_mode) throws LockNotHeld {
214:
215: synchronized (queue) {
216:
217: check_active();
218:
219: TransactionCoordinator tc = factory
220: .get_transaction_coordinator(current);
221:
222: Request rqst = null;
223:
224: synchronized (tc) {
225:
226: check_status(tc);
227:
228: if (attempt_change(tc, new_mode, held_mode)) {
229:
230: return;
231:
232: }
233:
234: rqst = new Request();
235:
236: rqst.state = LockSetFactoryImpl.REQUEST;
237:
238: rqst.current = tc;
239:
240: rqst.to_do = Request.CHANGE;
241:
242: rqst.set_mode = new_mode;
243:
244: rqst.reset_mode = held_mode;
245:
246: queue.addElement(rqst);
247:
248: tc.get_lock_coordinator(this );
249:
250: }
251:
252: while (rqst.state == LockSetFactoryImpl.REQUEST) {
253:
254: try {
255:
256: queue.wait();
257:
258: } catch (Exception e) {
259:
260: e.printStackTrace(System.out);
261:
262: throw new org.omg.CORBA.INTERNAL();
263:
264: }
265:
266: }
267: ;
268:
269: switch (rqst.state) {
270:
271: case LockSetFactoryImpl.COMMIT:
272:
273: case LockSetFactoryImpl.NO_TRANS:
274:
275: throw new org.omg.CORBA.INVALID_TRANSACTION();
276:
277: case LockSetFactoryImpl.ROLLBACK:
278:
279: throw new org.omg.CORBA.TRANSACTION_ROLLEDBACK();
280:
281: case LockSetFactoryImpl.REJECT:
282:
283: throw new LockNotHeld();
284:
285: }
286:
287: if (attempt_lock_from_queue()) {
288:
289: queue.notifyAll();
290:
291: }
292: ;
293:
294: }
295:
296: };
297:
298: public LockCoordinator get_coordinator(Coordinator which) {
299:
300: TransactionCoordinator tc = factory
301: .get_transaction_coordinator(which);
302:
303: return tc.get_lock_coordinator(this );
304:
305: };
306:
307: synchronized void add_related(TransactionalLockSet which) {
308:
309: related.addElement(which);
310:
311: };
312:
313: synchronized boolean attempt_lock(TransactionCoordinator tc,
314: lock_mode mode) {
315:
316: Enumeration enumeration = locks.elements();
317:
318: TransactionLocks current_transaction_locks = null;
319:
320: while (enumeration.hasMoreElements()) {
321:
322: TransactionLocks lock = (TransactionLocks) enumeration
323: .nextElement();
324:
325: if (lock.current == tc) {
326:
327: current_transaction_locks = lock;
328:
329: continue;
330:
331: }
332:
333: if (!lock.no_conflict(mode)) {
334:
335: return false;
336:
337: }
338:
339: }
340: ;
341:
342: if (current_transaction_locks == null) {
343:
344: current_transaction_locks = new TransactionLocks(tc);
345:
346: tc.get_lock_coordinator(this );
347:
348: locks.put(tc, current_transaction_locks);
349:
350: }
351:
352: current_transaction_locks.lock(mode);
353:
354: return true;
355:
356: };
357:
358: private void check_status(TransactionCoordinator tc) {
359:
360: Status status = tc.get_state();
361:
362: if (status.equals(Status.StatusActive)) {
363:
364: return;
365:
366: } else if (status.equals(Status.StatusPrepared) ||
367:
368: status.equals(Status.StatusCommitted) ||
369:
370: status.equals(Status.StatusUnknown) ||
371:
372: status.equals(Status.StatusNoTransaction) ||
373:
374: status.equals(Status.StatusPreparing) ||
375:
376: status.equals(Status.StatusCommitting)) {
377:
378: throw new org.omg.CORBA.INVALID_TRANSACTION();
379:
380: } else if (status.equals(Status.StatusRollingBack) ||
381:
382: status.equals(Status.StatusMarkedRollback) ||
383:
384: status.equals(Status.StatusRolledBack)) {
385:
386: throw new org.omg.CORBA.TRANSACTION_ROLLEDBACK();
387:
388: }
389:
390: }
391:
392: private synchronized boolean attempt_lock_from_queue() {
393:
394: boolean do_recursive = false;
395:
396: Vector executed = new Vector();
397:
398: Enumeration enumeration = queue.elements();
399:
400: while (enumeration.hasMoreElements()) {
401:
402: Request r = (Request) enumeration.nextElement();
403:
404: synchronized (r.current) {
405:
406: try {
407:
408: check_status(r.current);
409:
410: } catch (org.omg.CORBA.INVALID_TRANSACTION e) {
411:
412: r.state = LockSetFactoryImpl.NO_TRANS;
413:
414: executed.addElement(r);
415:
416: continue;
417:
418: } catch (org.omg.CORBA.TRANSACTION_ROLLEDBACK e) {
419:
420: r.state = LockSetFactoryImpl.ROLLBACK;
421:
422: executed.addElement(r);
423:
424: continue;
425:
426: }
427:
428: switch (r.to_do) {
429:
430: case Request.LOCK:
431:
432: if (!attempt_lock(r.current, r.set_mode)) {
433:
434: continue;
435:
436: }
437:
438: r.state = LockSetFactoryImpl.SATISFIED;
439:
440: break;
441:
442: case Request.CHANGE:
443:
444: try {
445:
446: if (!attempt_change(r.current, r.set_mode,
447: r.reset_mode)) {
448:
449: continue;
450:
451: }
452:
453: r.state = LockSetFactoryImpl.SATISFIED;
454:
455: do_recursive = true;
456:
457: } catch (LockNotHeld e) {
458:
459: r.state = LockSetFactoryImpl.REJECT;
460:
461: }
462:
463: break;
464:
465: }
466:
467: executed.addElement(r);
468:
469: }
470:
471: }
472: ;
473:
474: enumeration = executed.elements();
475:
476: while (enumeration.hasMoreElements()) {
477:
478: queue.removeElement(enumeration.nextElement());
479:
480: }
481:
482: if (do_recursive) {
483:
484: attempt_lock_from_queue();
485:
486: }
487:
488: return executed.size() > 0;
489:
490: };
491:
492: private synchronized boolean attempt_change(
493: TransactionCoordinator tc, lock_mode set_mode,
494: lock_mode reset_mode) throws LockNotHeld {
495:
496: TransactionLocks current_locks = (TransactionLocks) locks
497: .get(tc);
498:
499: if (current_locks == null || !current_locks.is_held(reset_mode)) {
500:
501: throw new LockNotHeld();
502:
503: }
504:
505: if (attempt_lock(tc, set_mode)) {
506:
507: current_locks.unlock(reset_mode);
508:
509: return true;
510:
511: }
512:
513: return false;
514:
515: };
516:
517: synchronized void transaction_finished(TransactionCoordinator tc) {
518:
519: Vector executed = new Vector();
520:
521: Enumeration enumeration;
522:
523: boolean do_notify = false;
524:
525: synchronized (queue) {
526:
527: enumeration = queue.elements();
528:
529: while (enumeration.hasMoreElements()) {
530:
531: Request r = (Request) enumeration.nextElement();
532:
533: if (r.current == tc) {
534:
535: r.state = LockSetFactoryImpl.ROLLBACK;
536:
537: executed.addElement(r);
538:
539: }
540:
541: }
542:
543: if (executed.size() > 0) {
544:
545: enumeration = executed.elements();
546:
547: while (enumeration.hasMoreElements()) {
548:
549: queue.removeElement(enumeration.nextElement());
550:
551: }
552:
553: do_notify = true;
554:
555: }
556:
557: if (locks.remove(tc) != null) {
558:
559: do_notify = attempt_lock_from_queue() ? true
560: : do_notify;
561:
562: }
563:
564: if (do_notify) {
565:
566: queue.notifyAll();
567:
568: }
569:
570: }
571:
572: if (related.size() > 0) {
573:
574: enumeration = related.elements();
575:
576: while (enumeration.hasMoreElements()) {
577:
578: TransactionalLockSet ls = (TransactionalLockSet) enumeration
579: .nextElement();
580:
581: ls.get_coordinator(tc.get_coordinator()).drop_locks();
582:
583: }
584:
585: }
586:
587: };
588:
589: public void print() {
590:
591: Enumeration enumeration;
592:
593: System.out
594: .println("\n=============================================================================");
595:
596: System.out.println(" LOCKS" + locks.size());
597:
598: System.out
599: .println("-----------------------------------------------------------------------------");
600:
601: synchronized (queue) {
602:
603: enumeration = locks.elements();
604:
605: while (enumeration.hasMoreElements()) {
606:
607: TransactionLocks r = (TransactionLocks) enumeration
608: .nextElement();
609:
610: System.out.println(r.toString());
611:
612: }
613:
614: System.out
615: .println("\n-----------------------------------------------------------------------------");
616:
617: System.out.println(" QUEUE" + queue.size());
618:
619: System.out
620: .println("-----------------------------------------------------------------------------");
621:
622: enumeration = queue.elements();
623:
624: while (enumeration.hasMoreElements()) {
625:
626: Request r = (Request) enumeration.nextElement();
627:
628: System.out.println(r.toString());
629:
630: }
631:
632: }
633: ;
634:
635: System.out
636: .println("=============================================================================\n");
637:
638: };
639:
640: // public void destroy() throws LockExists {
641:
642: public void destroy() {
643:
644: synchronized (queue) {
645:
646: check_active();
647:
648: is_active = false;
649:
650: if (locks.size() > 0) {
651:
652: Enumeration enumeration = locks.elements();
653:
654: while (enumeration.hasMoreElements()) {
655:
656: TransactionLocks ls = (TransactionLocks) enumeration
657: .nextElement();
658:
659: if (ls.any_locks()) {
660:
661: throw new RuntimeException("LockExists");
662:
663: }
664:
665: }
666:
667: }
668:
669: factory.remove_me(this );
670:
671: }
672: ;
673:
674: };
675:
676: private void check_active() {
677:
678: if (!is_active) {
679:
680: throw new org.omg.CORBA.OBJECT_NOT_EXIST();
681:
682: }
683:
684: }
685:
686: };
|