001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019:
020: package org.apache.geronimo.openejb.cluster.stateful.container;
021:
022: import java.lang.reflect.Method;
023: import java.rmi.dgc.VMID;
024: import java.util.HashMap;
025: import java.util.Map;
026:
027: import javax.transaction.TransactionManager;
028:
029: import org.apache.commons.logging.Log;
030: import org.apache.commons.logging.LogFactory;
031: import org.apache.geronimo.clustering.Session;
032: import org.apache.geronimo.clustering.SessionAlreadyExistException;
033: import org.apache.geronimo.clustering.SessionListener;
034: import org.apache.geronimo.clustering.SessionManager;
035: import org.apache.geronimo.openejb.cluster.infra.SessionManagerTracker;
036: import org.apache.openejb.OpenEJBException;
037: import org.apache.openejb.SystemException;
038: import org.apache.openejb.core.CoreDeploymentInfo;
039: import org.apache.openejb.core.ThreadContext;
040: import org.apache.openejb.core.stateful.BeanEntry;
041: import org.apache.openejb.core.stateful.StatefulInstanceManager;
042: import org.apache.openejb.core.stateful.StatefulContainer.MethodType;
043: import org.apache.openejb.persistence.JtaEntityManagerRegistry;
044: import org.apache.openejb.spi.SecurityService;
045: import org.apache.openejb.util.Index;
046:
047: /**
048: *
049: * @version $Rev:$ $Date:$
050: */
051: public class ClusteredStatefulInstanceManager extends
052: StatefulInstanceManager implements SessionManagerTracker {
053: private final Map<Object, SessionManager> sessionManagersById;
054: private final Map<Object, CoreDeploymentInfo> deploymentsById;
055:
056: public ClusteredStatefulInstanceManager(
057: TransactionManager transactionManager,
058: SecurityService securityService,
059: JtaEntityManagerRegistry jtaEntityManagerRegistry,
060: Class passivatorClass, int timeout, int poolSize,
061: int bulkPassivate) throws OpenEJBException {
062: super (transactionManager, securityService,
063: jtaEntityManagerRegistry, passivatorClass, timeout,
064: poolSize, bulkPassivate);
065:
066: sessionManagersById = new HashMap<Object, SessionManager>();
067: deploymentsById = new HashMap<Object, CoreDeploymentInfo>();
068: }
069:
070: public void addSessionManager(Object deploymentId,
071: SessionManager sessionManager) {
072: synchronized (sessionManagersById) {
073: sessionManagersById.put(deploymentId, sessionManager);
074: }
075: sessionManager.registerListener(new MigrationListener());
076: }
077:
078: public void removeSessionManager(Object deploymentId,
079: SessionManager sessionManager) {
080: synchronized (sessionManagersById) {
081: sessionManagersById.remove(deploymentId);
082: }
083: }
084:
085: @Override
086: public void deploy(CoreDeploymentInfo deploymentInfo,
087: Index<Method, MethodType> index) throws OpenEJBException {
088: synchronized (deploymentsById) {
089: deploymentsById.put(deploymentInfo.getDeploymentID(),
090: deploymentInfo);
091: }
092: super .deploy(deploymentInfo, index);
093: }
094:
095: @Override
096: public void undeploy(CoreDeploymentInfo deploymentInfo)
097: throws OpenEJBException {
098: synchronized (deploymentsById) {
099: deploymentsById.remove(deploymentInfo.getDeploymentID());
100: }
101: super .undeploy(deploymentInfo);
102: }
103:
104: @Override
105: protected BeanEntry newBeanEntry(Object primaryKey, Object bean) {
106: ThreadContext threadContext = ThreadContext.getThreadContext();
107: if (null == threadContext) {
108: throw new IllegalStateException("No ThreadContext");
109: }
110: Object deploymentId = threadContext.getDeploymentInfo()
111: .getDeploymentID();
112:
113: SessionManager sessionManager;
114: synchronized (sessionManagersById) {
115: sessionManager = sessionManagersById.get(deploymentId);
116: }
117: if (null == sessionManager) {
118: throw new IllegalStateException(
119: "No SessionManager registered for deployment ["
120: + deploymentId + "]");
121: }
122:
123: Session session;
124: try {
125: if (!(primaryKey instanceof VMID)) {
126: // primaryKey.toString() must be an unique String representation for an unique identifier. Here, we
127: // check that primaryKey is a VMID as its Object.toString implementation returns an unique String
128: // representation. Other types may not implement Object.toString() "correctly".
129: throw new AssertionError("primaryKey MUST be a "
130: + VMID.class.getName());
131: }
132: session = sessionManager.createSession(primaryKey
133: .toString());
134: } catch (SessionAlreadyExistException e) {
135: throw (IllegalStateException) new IllegalStateException()
136: .initCause(e);
137: }
138:
139: return new ClusteredBeanEntry(session, deploymentId, bean,
140: primaryKey, timeOut);
141: }
142:
143: @Override
144: protected void onFreeBeanEntry(ThreadContext callContext,
145: BeanEntry entry) {
146: SessionOperation operation = callContext
147: .get(SessionOperation.class);
148: if (null != operation) {
149: if (SessionOperation.DESTRUCTION != operation
150: && SessionOperation.OUTBOUND_MIGRATION != operation) {
151: throw new AssertionError();
152: }
153: return;
154: }
155: ClusteredBeanEntry clusteredBeanEntry = (ClusteredBeanEntry) entry;
156: clusteredBeanEntry.release();
157: }
158:
159: @Override
160: protected void onPoolInstanceWithoutTransaction(
161: ThreadContext callContext, BeanEntry entry) {
162: SessionOperation operation = callContext
163: .get(SessionOperation.class);
164: if (null != operation) {
165: if (SessionOperation.INBOUND_MIGRATION != operation) {
166: throw new AssertionError();
167: }
168: return;
169: }
170: ClusteredBeanEntry clusteredBeanEntry = (ClusteredBeanEntry) entry;
171: clusteredBeanEntry.endAccess();
172: }
173:
174: protected enum SessionOperation {
175: INBOUND_MIGRATION, OUTBOUND_MIGRATION, DESTRUCTION
176: }
177:
178: protected class MigrationListener implements SessionListener {
179: private final Log log = LogFactory
180: .getLog(MigrationListener.class);
181:
182: public void notifyInboundSessionMigration(
183: org.apache.geronimo.clustering.Session session) {
184: ClusteredBeanEntry beanEntry = new ClusteredBeanEntry(
185: session);
186: ThreadContext context = newThreadContext(beanEntry);
187: if (null == context) {
188: return;
189: }
190: context.set(SessionOperation.class,
191: SessionOperation.INBOUND_MIGRATION);
192:
193: try {
194: activateInstance(context, beanEntry);
195: poolInstance(context, beanEntry.getBean());
196: } catch (Exception e) {
197: log.warn("Cannot activate migrated bean entry.", e);
198: }
199: }
200:
201: public void notifyOutboundSessionMigration(
202: org.apache.geronimo.clustering.Session session) {
203: ClusteredBeanEntry beanEntry = new ClusteredBeanEntry(
204: session);
205: ThreadContext context = newThreadContext(beanEntry);
206: if (null == context) {
207: return;
208: }
209: context.set(SessionOperation.class,
210: SessionOperation.OUTBOUND_MIGRATION);
211:
212: passivate(context, beanEntry);
213: try {
214: freeInstance(context);
215: } catch (SystemException e) {
216: log.warn("Cannot free bean entry", e);
217: }
218: }
219:
220: public void notifySessionDestruction(
221: org.apache.geronimo.clustering.Session session) {
222: ClusteredBeanEntry beanEntry = new ClusteredBeanEntry(
223: session);
224: ThreadContext context = newThreadContext(beanEntry);
225: if (null == context) {
226: return;
227: }
228: context.set(SessionOperation.class,
229: SessionOperation.DESTRUCTION);
230:
231: try {
232: freeInstance(context);
233: } catch (SystemException e) {
234: log.warn("Cannot free bean entry", e);
235: }
236: }
237:
238: protected ThreadContext newThreadContext(
239: ClusteredBeanEntry beanEntry) {
240: Object deploymentId = beanEntry.getDeploymentId();
241: CoreDeploymentInfo deploymentInfo;
242: synchronized (deploymentsById) {
243: deploymentInfo = deploymentsById.get(deploymentId);
244: }
245: if (null == deploymentInfo) {
246: log.warn("Deployment [" + deploymentId
247: + "] is unknown.");
248: return null;
249: }
250: return new ThreadContext(deploymentInfo, beanEntry
251: .getPrimaryKey());
252: }
253: }
254:
255: }
|