001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ejb3.stateful;
023:
024: import org.jboss.aop.metadata.SimpleMetaData;
025: import org.jboss.ejb3.BaseContext;
026: import org.jboss.ejb3.Container;
027: import org.jboss.ejb3.Ejb3Registry;
028: import org.jboss.ejb3.ProxyFactoryHelper;
029: import org.jboss.ejb3.ThreadLocalStack;
030: import org.jboss.ejb3.cache.StatefulCache;
031: import org.jboss.ejb3.interceptor.InterceptorInfo;
032: import org.jboss.ejb3.tx.TxUtil;
033: import org.jboss.serial.io.MarshalledObject;
034: import org.jboss.tm.TxUtils;
035:
036: import javax.ejb.EJBLocalObject;
037: import javax.naming.NamingException;
038: import javax.persistence.EntityManager;
039: import javax.transaction.Synchronization;
040: import javax.transaction.Transaction;
041: import java.io.Externalizable;
042: import java.io.IOException;
043: import java.io.ObjectInput;
044: import java.io.ObjectOutput;
045: import java.util.ArrayList;
046: import java.util.HashMap;
047: import java.util.Iterator;
048: import java.util.List;
049: import java.util.Map;
050: import java.util.concurrent.locks.ReentrantLock;
051:
052: /**
053: * BeanContext for a stateful session bean.
054: *
055: * @author <a href="mailto:bill@jboss.org">Bill Burke</a>
056: * @author Brian Stansberry
057: *
058: * @version $Revision: 61595 $
059: */
060: public class StatefulBeanContext extends BaseContext implements
061: Externalizable {
062: /** The serialVersionUID */
063: private static final long serialVersionUID = -102470788178912606L;
064:
065: protected Object id;
066:
067: protected boolean txSynchronized = false;
068:
069: protected boolean inInvocation = false;
070:
071: protected MarshalledObject beanMO;
072:
073: protected ReentrantLock lock = new ReentrantLock();
074:
075: protected boolean discarded;
076:
077: // these two are needed for propagated extended persistence contexts when one
078: // SFSB injects another.
079: public static ThreadLocalStack<StatefulBeanContext> propagatedContainedIn = new ThreadLocalStack<StatefulBeanContext>();
080:
081: public static ThreadLocalStack<StatefulBeanContext> currentBean = new ThreadLocalStack<StatefulBeanContext>();
082:
083: protected StatefulBeanContext containedIn;
084:
085: protected List<StatefulBeanContext> contains;
086:
087: protected HashMap<String, EntityManager> persistenceContexts;
088:
089: protected boolean removed;
090:
091: protected String containerName;
092:
093: protected boolean replicationIsPassivation = true;
094:
095: protected transient boolean passivated = false;
096:
097: public StatefulBeanContext() {
098:
099: }
100:
101: public List<StatefulBeanContext> getContains() {
102: if (bean == null)
103: extractBeanAndInterceptors();
104: return contains;
105: }
106:
107: /**
108: * Makes a copy of the contains list so nested callback iterators
109: * can iterate over it without concern that another thread will
110: * remove the context.
111: *
112: * TODO replace contains list with a concurrent collection
113: */
114: private List<StatefulBeanContext> getThreadSafeContains() {
115: // Call getContains() to ensure unmarshalling
116: List<StatefulBeanContext> orig = getContains();
117: List<StatefulBeanContext> copy = null;
118: if (orig != null) {
119: synchronized (orig) {
120: copy = new ArrayList<StatefulBeanContext>(orig);
121: }
122: }
123: return copy;
124: }
125:
126: public EntityManager getExtendedPersistenceContext(String id) {
127: EntityManager found = null;
128: Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
129: if (extendedPCS != null) {
130: found = extendedPCS.get(id);
131: }
132: if (found != null)
133: return found;
134: if (containedIn != null) {
135: found = containedIn.getExtendedPersistenceContext(id);
136: }
137: return found;
138: }
139:
140: public void addExtendedPersistenceContext(String id,
141: EntityManager pc) {
142: Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
143: if (extendedPCS == null) {
144: extendedPCS = persistenceContexts = new HashMap<String, EntityManager>();
145: }
146: extendedPCS.put(id, pc);
147: }
148:
149: public boolean scanForExtendedPersistenceContext(String id,
150: StatefulBeanContext ignore) {
151: if (this .equals(ignore))
152: return false;
153:
154: if (!removed) {
155: Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
156: if (extendedPCS != null && extendedPCS.containsKey(id))
157: return true;
158: }
159:
160: if (getContains() != null) {
161: synchronized (contains) {
162: for (StatefulBeanContext contained : contains) {
163: if (!contained.equals(ignore)) {
164: if (contained
165: .scanForExtendedPersistenceContext(id,
166: ignore))
167: return true;
168: }
169: }
170: }
171: }
172:
173: return false;
174: }
175:
176: public void removeExtendedPersistenceContext(String id) {
177: Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
178: if (extendedPCS != null) {
179: extendedPCS.remove(id);
180: }
181:
182: if (getContains() != null) {
183: synchronized (contains) {
184: for (StatefulBeanContext contained : contains) {
185: contained.removeExtendedPersistenceContext(id);
186: }
187: }
188: }
189: }
190:
191: public Map<String, EntityManager> getExtendedPersistenceContexts() {
192: if (persistenceContexts == null) {
193: if (bean == null)
194: extractBeanAndInterceptors(); // unmarshall
195: }
196: return persistenceContexts;
197: }
198:
199: public StatefulBeanContext getContainedIn() {
200: return containedIn;
201: }
202:
203: public StatefulBeanContext getUltimateContainedIn() {
204: StatefulBeanContext child = this ;
205: StatefulBeanContext parent = containedIn;
206:
207: while (parent != null) {
208: child = parent;
209: parent = parent.getContainedIn();
210: }
211:
212: if (parent == null && this != child) {
213: // Don't hand out a ref to our parent obtained by walking the
214: // tree. Rather, get it from its cache. This gives the cache
215: // a chance to activate it if it hasn't been. We don't want
216: // to mark the parent as in use though.
217: StatefulCache ultimateCache = ((StatefulContainer) child
218: .getContainer()).getCache();
219: child = ultimateCache.get(child.getId(), false);
220: }
221:
222: return child;
223: }
224:
225: public void addContains(StatefulBeanContext ctx) {
226: if (getContains() == null)
227: contains = new ArrayList<StatefulBeanContext>();
228:
229: synchronized (contains) {
230: contains.add(ctx);
231: ctx.containedIn = this ;
232: }
233: }
234:
235: public void removeContains(StatefulBeanContext ctx) {
236: if (getContains() != null) // call getContains() to ensure unmarshalling
237: {
238: // Need to be thread safe
239: synchronized (contains) {
240: if (contains.remove(ctx)) {
241: ctx.containedIn = null;
242: }
243: }
244:
245: if (removed) {
246: // Close out any XPCs that are no longer referenced
247: cleanExtendedPCs();
248: }
249:
250: if (getCanRemoveFromCache()) {
251: if (containedIn != null) {
252: containedIn.removeContains(this );
253: }
254:
255: // Notify our cache to remove us as we no longer have children
256: ((StatefulContainer) getContainer()).getCache().remove(
257: getId());
258: }
259: }
260: }
261:
262: public StatefulBeanContext pushContainedIn() {
263: StatefulBeanContext this Ptr = this ;
264: if (propagatedContainedIn.getList() != null) {
265: // This is a nested stateful bean, within another stateful bean.
266: // We need to create a nested bean context. The nested one will
267: // be put in the parent's list and owned by it. It is a special
268: // class because we do not want to put its state in a separate
269: // marshalled object as we want to maintain object references
270: // between it and its parent.
271:
272: // We also do not want to put the nested context within its container's
273: // cache. If placed in the cache, it could be independently passivated,
274: // activated and replicated, again breaking object references due to
275: // independent marshalling. Instead, we return a proxy to it that will
276: // be stored in its container's cache
277: containedIn = propagatedContainedIn.get();
278: NestedStatefulBeanContext nested = new NestedStatefulBeanContext();
279: nested.id = id;
280: nested.container = getContainer();
281: nested.containerName = containerName;
282: nested.bean = bean;
283: nested.replicationIsPassivation = replicationIsPassivation;
284: containedIn.addContains(nested);
285: this Ptr = new ProxiedStatefulBeanContext(nested);
286: }
287: propagatedContainedIn.push(this Ptr);
288: return this Ptr;
289: }
290:
291: /**
292: * Checks whether this context or any of its children are in use.
293: */
294: public boolean getCanPassivate() {
295: boolean canPassivate = (removed || !inUse);
296:
297: // Just check contains directly; don't call getContains() since
298: // getContains() will deserialize the beanMO. If the beanMO isn't
299: // deserialized it's safe to assume the children aren't in use
300: if (canPassivate && contains != null) {
301: synchronized (contains) {
302: for (StatefulBeanContext contained : contains) {
303: if (!contained.getCanPassivate()) {
304: canPassivate = false;
305: break;
306: }
307: }
308: }
309: }
310:
311: return canPassivate;
312: }
313:
314: /**
315: * Notification from a non-clustered StatefulCache to inform
316: * that we are about to be passivated.
317: */
318: public void prePassivate() {
319: if (!removed && !passivated) {
320: getContainer().invokePrePassivate(this );
321: passivated = true;
322: }
323:
324: // Pass the call on to any nested children
325: List<StatefulBeanContext> children = getThreadSafeContains();
326: if (children != null) {
327: for (StatefulBeanContext contained : children) {
328: contained.prePassivate();
329: }
330: }
331: }
332:
333: /**
334: * Notification from a non-clustered StatefulCache to inform
335: * that we have been activated.
336: */
337: public void postActivate() {
338: if (!removed && passivated) {
339: getContainer().invokePostActivate(this );
340: passivated = false;
341: }
342:
343: // Pass the call on to any nested children
344: List<StatefulBeanContext> children = getThreadSafeContains();
345: if (children != null) {
346: for (StatefulBeanContext contained : children) {
347: contained.postActivate();
348: }
349: }
350: }
351:
352: /**
353: * Notification from a ClusteredStatefulCache to inform
354: * that a bean that is stored in the distributed cache is now
355: * being passivated as well. Something of a misnomer
356: * as it is possible the bean wasn't replicated (if it implements
357: * Optimized it may have been activated and then a reference left
358: * in the cache without the bean ever being replicated).
359: */
360: public void passivateAfterReplication() {
361: if (!removed && !passivated) {
362: getInstance(); // make sure we're unmarshalled
363: getContainer().invokePrePassivate(this );
364: passivated = true;
365: }
366:
367: // Only bother informing children if we aren't already serialized.
368: // If we're serialized, so are they and there's no point.
369: // Notifying them would cause us to deserialize a beanMO to no purpose.
370: if (contains != null) {
371: // Pass the call on to any nested children
372: List<StatefulBeanContext> children = getThreadSafeContains();
373: if (children != null) {
374: for (StatefulBeanContext contained : children) {
375: contained.passivateAfterReplication();
376: }
377: }
378: }
379: }
380:
381: public void activateAfterReplication() {
382: if (!removed && passivated) {
383: getInstance(); // make sure we're unmarshalled
384: getContainer().invokePostActivate(this );
385: passivated = false;
386: }
387:
388: // Pass the call on to any nested children
389: List<StatefulBeanContext> children = getThreadSafeContains();
390: if (children != null) {
391: for (StatefulBeanContext contained : children) {
392: contained.activateAfterReplication();
393: }
394: }
395: }
396:
397: public boolean getReplicationIsPassivation() {
398: return replicationIsPassivation;
399: }
400:
401: public void setReplicationIsPassivation(
402: boolean replicationIsPassivation) {
403: this .replicationIsPassivation = replicationIsPassivation;
404: }
405:
406: /**
407: * Notification from a ClusteredStatefulCache before a bean is
408: * replicated.
409: */
410: public void preReplicate() {
411: if (!removed && replicationIsPassivation && !passivated) {
412: getContainer().invokePrePassivate(this );
413: passivated = true;
414: }
415:
416: // Pass the call on to any nested children
417: List<StatefulBeanContext> children = getThreadSafeContains();
418: if (children != null) {
419: for (StatefulBeanContext contained : children) {
420: contained.preReplicate();
421: }
422: }
423: }
424:
425: /**
426: * Notification from a ClusteredStatefulCache after the bean
427: * is fetched from the distributed cache. Something of a misnomer
428: * as it is possible the bean wasn't replicated (if it implements
429: * Optimized it can be fetched from the cache twice without ever
430: * being replicated).
431: */
432: public void postReplicate() {
433: // We may not have been replicated, so only invoke @PostActivate
434: // if we are marked as passivated
435: if (!removed && passivated) {
436: getContainer().invokePostActivate(this );
437: passivated = false;
438: }
439:
440: // Pass the call on to any nested children
441: List<StatefulBeanContext> children = getThreadSafeContains();
442: if (children != null) {
443: for (StatefulBeanContext contained : children) {
444: contained.postReplicate();
445: }
446: }
447: }
448:
449: public void popContainedIn() {
450: propagatedContainedIn.pop();
451: }
452:
453: public boolean isInUse() {
454: return inUse;
455: }
456:
457: public void setInUse(boolean inUse) {
458: this .inUse = inUse;
459: }
460:
461: public boolean isDiscarded() {
462: return discarded;
463: }
464:
465: public void setDiscarded(boolean discarded) {
466: this .discarded = discarded;
467: }
468:
469: public ReentrantLock getLock() {
470: return lock;
471: }
472:
473: public boolean isInInvocation() {
474: return inInvocation;
475: }
476:
477: public void setInInvocation(boolean inInvocation) {
478: this .inInvocation = inInvocation;
479: }
480:
481: public Object getId() {
482: return id;
483: }
484:
485: public void setId(Object id) {
486: this .id = id;
487: }
488:
489: public boolean isTxSynchronized() {
490: return txSynchronized;
491: }
492:
493: public void setTxSynchronized(boolean txSynchronized) {
494: this .txSynchronized = txSynchronized;
495: }
496:
497: public boolean isRemoved() {
498: return removed;
499: }
500:
501: public void remove() {
502: if (removed)
503: return;
504: removed = true;
505: RuntimeException exceptionThrown = null;
506:
507: // Close any XPCs that haven't been injected into live
508: // beans in our family
509: try {
510: cleanExtendedPCs();
511: } catch (RuntimeException e) {
512: // we still need to remove ourself from any parent, so save
513: // the thrown exception and rethrow it after we have cleaned up.
514: if (exceptionThrown == null)
515: exceptionThrown = e;
516: }
517:
518: if (containedIn != null && getCanRemoveFromCache()) {
519: try {
520: containedIn.removeContains(this );
521: } catch (RuntimeException e) {
522: // we still need to clean internal state, so save the
523: // thrown exception and rethrow it after we have cleaned up.
524: if (exceptionThrown == null)
525: exceptionThrown = e;
526: }
527: }
528:
529: // Clear out refs to our bean and interceptors, to reduce our footprint
530: // in case we are still cached for our refs to any XPCs
531: bean = null;
532: interceptorInstances = null;
533:
534: if (exceptionThrown != null)
535: throw new RuntimeException(
536: "exception thrown while removing SFSB",
537: exceptionThrown);
538: }
539:
540: public boolean getCanRemoveFromCache() {
541: boolean canRemove = removed;
542:
543: if (canRemove && getContains() != null) // call getContains() to ensure unmarshalling
544: {
545: synchronized (contains) {
546: canRemove = (contains.size() == 0);
547: }
548: }
549:
550: return canRemove;
551: }
552:
553: private void cleanExtendedPCs() {
554: try {
555: Transaction tx = TxUtil.getTransactionManager()
556: .getTransaction();
557: if (tx != null && TxUtils.isActive(tx)) {
558: tx.registerSynchronization(new XPCCloseSynchronization(
559: this ));
560: } else {
561: closeExtendedPCs();
562: }
563: } catch (RuntimeException e) {
564: throw e;
565: } catch (Exception e) {
566: throw new RuntimeException(
567: "Error cleaning PersistenceContexts in SFSB removal",
568: e);
569: }
570: }
571:
572: private void closeExtendedPCs() {
573: Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
574: if (extendedPCS != null) {
575: RuntimeException exceptionThrown = null;
576:
577: List<String> closedXPCs = new ArrayList<String>();
578: StatefulBeanContext topCtx = getUltimateContainedIn();
579:
580: for (Iterator<Map.Entry<String, EntityManager>> iter = extendedPCS
581: .entrySet().iterator(); iter.hasNext();) {
582: Map.Entry<String, EntityManager> entry = iter.next();
583: String id = entry.getKey();
584: EntityManager xpc = entry.getValue();
585:
586: // Only close the XPC if our live parent(s) or cousins
587: // don't also have a ref to it
588: boolean canClose = topCtx
589: .scanForExtendedPersistenceContext(id, this );
590:
591: if (canClose && getContains() != null) {
592: // Only close the XPC if our live childrenScan don't have a ref
593: synchronized (contains) {
594: for (StatefulBeanContext contained : contains) {
595: if (contained
596: .scanForExtendedPersistenceContext(
597: id, null)) {
598: canClose = false;
599: break;
600: }
601: }
602: }
603: }
604:
605: if (canClose) {
606: try {
607: xpc.close();
608: closedXPCs.add(id);
609: } catch (RuntimeException e) {
610: exceptionThrown = e;
611: }
612: }
613: }
614:
615: // Clean all refs to the closed XPCs from the tree
616: for (String id : closedXPCs) {
617: topCtx.removeExtendedPersistenceContext(id);
618: }
619:
620: if (exceptionThrown != null)
621: throw new RuntimeException(
622: "Error closing PersistenceContexts in SFSB removal",
623: exceptionThrown);
624: }
625: }
626:
627: public void setContainer(Container container) {
628: super .setContainer(container);
629: containerName = container.getObjectName().getCanonicalName();
630: }
631:
632: public Container getContainer() {
633: if (container == null) {
634: container = Ejb3Registry.getContainer(containerName);
635: }
636: return container;
637: }
638:
639: @Override
640: public Object getInstance() {
641: if (bean == null) {
642: extractBeanAndInterceptors();
643: }
644: return bean;
645: }
646:
647: @Override
648: public SimpleMetaData getMetaData() {
649: return super .getMetaData();
650: }
651:
652: // these are public for fast concurrent access/update
653: public volatile boolean markedForPassivation = false;
654:
655: public volatile boolean markedForReplication = false;
656:
657: // BES 2007/02/16 make private and use a getter/setter as
658: // ProxiedStatefulBeanContext needs to pass the value on
659: // to its NestedStatefulBeanContext
660: private volatile boolean inUse = false;
661:
662: public long lastUsed = System.currentTimeMillis();
663:
664: @Override
665: public Object[] getInterceptorInstances(
666: InterceptorInfo[] interceptorInfos) {
667: if (bean == null) {
668: extractBeanAndInterceptors();
669: }
670: return super .getInterceptorInstances(interceptorInfos);
671: }
672:
673: protected synchronized void extractBeanAndInterceptors() {
674: if (beanMO == null)
675: return;
676:
677: try {
678: Object[] beanAndInterceptors = (Object[]) beanMO.get();
679: bean = beanAndInterceptors[0];
680: persistenceContexts = (HashMap<String, EntityManager>) beanAndInterceptors[1];
681: ArrayList list = (ArrayList) beanAndInterceptors[2];
682: interceptorInstances = new HashMap<Class, Object>();
683: if (list != null) {
684: for (Object o : list) {
685: interceptorInstances.put(o.getClass(), o);
686: }
687: }
688: contains = (List<StatefulBeanContext>) beanAndInterceptors[3];
689: // Reestablish links to our children; if they serialize a link
690: // to us for some reason serialization blows up
691: if (contains != null) {
692: for (StatefulBeanContext contained : contains) {
693: contained.containedIn = this ;
694: }
695: }
696:
697: // Don't hold onto the beanMo, as its contents are mutable
698: // and we don't want to serialize a stale version of them
699: beanMO = null;
700: } catch (IOException e) {
701: throw new RuntimeException(e);
702: } catch (ClassNotFoundException e) {
703: throw new RuntimeException(e);
704: }
705: }
706:
707: public void writeExternal(ObjectOutput out) throws IOException {
708: out.writeUTF(containerName);
709: out.writeObject(id);
710: out.writeObject(metadata);
711: out.writeLong(lastUsed);
712:
713: if (beanMO == null) {
714: Object[] beanAndInterceptors = new Object[4];
715: beanAndInterceptors[0] = bean;
716: beanAndInterceptors[1] = persistenceContexts;
717: if (interceptorInstances != null
718: && interceptorInstances.size() > 0) {
719: ArrayList list = new ArrayList();
720: list.addAll(interceptorInstances.values());
721: beanAndInterceptors[2] = list;
722: }
723: beanAndInterceptors[3] = contains;
724:
725: // BES 2007/02/12 Previously we were trying to hold a ref to
726: // beanMO after we created it, but that exposes the risk of
727: // two different versions of the constituent state that
728: // can fall out of sync. So now we just write a local variable.
729:
730: MarshalledObject mo = new MarshalledObject(
731: beanAndInterceptors);
732: out.writeObject(mo);
733: } else {
734: // We've been deserialized and are now being re-serialized, but
735: // extractBeanAndInterceptors hasn't been called in between.
736: // This can happen if a passivated session is involved in a
737: // JBoss Cache state transfer to a newly deployed node.
738: out.writeObject(beanMO);
739: }
740:
741: out.writeBoolean(removed);
742: out.writeBoolean(replicationIsPassivation);
743: }
744:
745: public void readExternal(ObjectInput in) throws IOException,
746: ClassNotFoundException {
747: containerName = in.readUTF();
748: id = in.readObject();
749: metadata = (SimpleMetaData) in.readObject();
750: lastUsed = in.readLong();
751:
752: beanMO = (MarshalledObject) in.readObject();
753: removed = in.readBoolean();
754: replicationIsPassivation = in.readBoolean();
755:
756: // If we've just been deserialized, we are passivated
757: passivated = true;
758: }
759:
760: public Object getInvokedMethodKey() {
761: return this .getId();
762: }
763:
764: public EJBLocalObject getEJBLocalObject()
765: throws IllegalStateException {
766: try {
767: Object proxy = ((StatefulContainer) container)
768: .createLocalProxy(getId());
769: return (EJBLocalObject) proxy;
770: } catch (Exception e) {
771: throw new IllegalStateException(e);
772: }
773: }
774:
775: private static class XPCCloseSynchronization implements
776: Synchronization {
777: private StatefulBeanContext ctx;
778:
779: private XPCCloseSynchronization(StatefulBeanContext context) {
780: ctx = context;
781: }
782:
783: public void beforeCompletion() {
784: }
785:
786: public void afterCompletion(int status) {
787: ctx.closeExtendedPCs();
788: // Clean ref to ctx, as some TMs leak Synchronization refs
789: ctx = null;
790: }
791: }
792: }
|