001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */package org.apache.geronimo.clustering.wadi;
017:
018: import java.util.Collection;
019: import java.util.Collections;
020: import java.util.HashMap;
021: import java.util.HashSet;
022: import java.util.Map;
023: import java.util.Set;
024: import java.util.concurrent.CopyOnWriteArrayList;
025:
026: import org.apache.commons.logging.Log;
027: import org.apache.commons.logging.LogFactory;
028: import org.apache.geronimo.clustering.Cluster;
029: import org.apache.geronimo.clustering.Node;
030: import org.apache.geronimo.clustering.Session;
031: import org.apache.geronimo.clustering.SessionAlreadyExistException;
032: import org.apache.geronimo.clustering.SessionListener;
033: import org.apache.geronimo.clustering.SessionManager;
034: import org.apache.geronimo.clustering.SessionManagerListener;
035: import org.apache.geronimo.gbean.GBeanInfo;
036: import org.apache.geronimo.gbean.GBeanInfoBuilder;
037: import org.apache.geronimo.gbean.GBeanLifecycle;
038: import org.apache.geronimo.j2ee.j2eeobjectnames.NameFactory;
039: import org.codehaus.wadi.aop.replication.AOPStackContext;
040: import org.codehaus.wadi.core.assembler.StackContext;
041: import org.codehaus.wadi.core.manager.Manager;
042: import org.codehaus.wadi.core.manager.SessionMonitor;
043: import org.codehaus.wadi.group.Dispatcher;
044: import org.codehaus.wadi.group.Peer;
045: import org.codehaus.wadi.replication.strategy.BackingStrategyFactory;
046: import org.codehaus.wadi.servicespace.LifecycleState;
047: import org.codehaus.wadi.servicespace.ServiceAlreadyRegisteredException;
048: import org.codehaus.wadi.servicespace.ServiceRegistry;
049: import org.codehaus.wadi.servicespace.ServiceSpace;
050: import org.codehaus.wadi.servicespace.ServiceSpaceLifecycleEvent;
051: import org.codehaus.wadi.servicespace.ServiceSpaceListener;
052: import org.codehaus.wadi.servicespace.ServiceSpaceName;
053:
054: /**
055: *
056: * @version $Rev$ $Date$
057: */
058: public class BasicWADISessionManager implements GBeanLifecycle,
059: SessionManager, WADISessionManager {
060: private static final Log log = LogFactory
061: .getLog(BasicWADISessionManager.class);
062:
063: protected final ClassLoader cl;
064: private final WADICluster cluster;
065: protected final WADISessionManagerConfigInfo configInfo;
066: protected final BackingStrategyFactory backingStrategyFactory;
067: private final Collection<ClusteredServiceHolder> serviceHolders;
068: private final CopyOnWriteArrayList<SessionListener> listeners;
069: private final Map<SessionManagerListener, ServiceSpaceListener> sessionManagerListenerToAdapter;
070:
071: private Manager manager;
072: private SessionMonitor sessionMonitor;
073: private ServiceSpace serviceSpace;
074:
075: public BasicWADISessionManager(ClassLoader cl,
076: WADISessionManagerConfigInfo configInfo,
077: WADICluster cluster,
078: BackingStrategyFactory backingStrategyFactory,
079: Collection<ClusteredServiceHolder> serviceHolders) {
080: if (null == cl) {
081: throw new IllegalArgumentException("cl is required");
082: } else if (null == configInfo) {
083: throw new IllegalArgumentException("configInfo is required");
084: } else if (null == cluster) {
085: throw new IllegalArgumentException("cluster is required");
086: } else if (null == backingStrategyFactory) {
087: throw new IllegalArgumentException(
088: "backingStrategyFactory is required");
089: }
090: this .cl = cl;
091: this .configInfo = configInfo;
092: this .cluster = cluster;
093: this .backingStrategyFactory = backingStrategyFactory;
094:
095: if (null == serviceHolders) {
096: serviceHolders = Collections.emptySet();
097: }
098: this .serviceHolders = serviceHolders;
099:
100: listeners = new CopyOnWriteArrayList<SessionListener>();
101: sessionManagerListenerToAdapter = new HashMap<SessionManagerListener, ServiceSpaceListener>();
102: }
103:
104: public void doStart() throws Exception {
105: Dispatcher underlyingDisp = cluster.getCluster()
106: .getDispatcher();
107:
108: ServiceSpaceName serviceSpaceName = new ServiceSpaceName(
109: configInfo.getServiceSpaceURI());
110: StackContext stackContext;
111: if (configInfo.isDeltaReplication()) {
112: stackContext = newAOPStackContext(underlyingDisp,
113: serviceSpaceName);
114: } else {
115: stackContext = newStackContext(underlyingDisp,
116: serviceSpaceName);
117: }
118: stackContext.setDisableReplication(configInfo
119: .isDisableReplication());
120: stackContext.build();
121:
122: serviceSpace = stackContext.getServiceSpace();
123:
124: manager = stackContext.getManager();
125:
126: sessionMonitor = stackContext.getSessionMonitor();
127: sessionMonitor.addSessionListener(new SessionListenerAdapter());
128:
129: registerClusteredServices();
130:
131: serviceSpace.start();
132: }
133:
134: public void doStop() throws Exception {
135: serviceSpace.stop();
136: }
137:
138: public void doFail() {
139: try {
140: serviceSpace.stop();
141: } catch (Exception e) {
142: log.error(e);
143: }
144: }
145:
146: public Session createSession(String sessionId)
147: throws SessionAlreadyExistException {
148: org.codehaus.wadi.core.session.Session session;
149: try {
150: session = manager.createWithName(sessionId);
151: } catch (org.codehaus.wadi.core.manager.SessionAlreadyExistException e) {
152: throw new SessionAlreadyExistException("Session "
153: + sessionId + " already exists", e);
154: }
155: return new WADISessionAdaptor(session);
156: }
157:
158: public Manager getManager() {
159: return manager;
160: }
161:
162: public Cluster getCluster() {
163: return cluster;
164: }
165:
166: public Node getNode() {
167: return cluster.getLocalNode();
168: }
169:
170: public Set<Node> getRemoteNodes() {
171: Set<Peer> peers = serviceSpace.getHostingPeers();
172: return mapToNodes(peers);
173: }
174:
175: public void registerListener(SessionListener listener) {
176: listeners.add(listener);
177: }
178:
179: public void unregisterListener(SessionListener listener) {
180: listeners.remove(listener);
181: }
182:
183: public void registerSessionManagerListener(
184: SessionManagerListener listener) {
185: ServiceSpaceListener adapter = new ServiceSpaceListenerAdapter(
186: listener);
187: serviceSpace.addServiceSpaceListener(adapter);
188: synchronized (sessionManagerListenerToAdapter) {
189: sessionManagerListenerToAdapter.put(listener, adapter);
190: }
191: }
192:
193: public void unregisterSessionManagerListener(
194: SessionManagerListener listener) {
195: ServiceSpaceListener adapter;
196: synchronized (sessionManagerListenerToAdapter) {
197: adapter = sessionManagerListenerToAdapter.remove(listener);
198: }
199: if (null == adapter) {
200: throw new IllegalArgumentException("Listener [" + listener
201: + "] is not registered");
202: }
203: serviceSpace.removeServiceSpaceListener(adapter);
204: }
205:
206: public ServiceSpace getServiceSpace() {
207: return serviceSpace;
208: }
209:
210: protected StackContext newStackContext(Dispatcher underlyingDisp,
211: ServiceSpaceName serviceSpaceName) {
212: return new StackContext(cl, serviceSpaceName, underlyingDisp,
213: configInfo.getSessionTimeoutSeconds(), configInfo
214: .getNumPartitions(), configInfo
215: .getSweepInterval(), backingStrategyFactory);
216: }
217:
218: protected AOPStackContext newAOPStackContext(
219: Dispatcher underlyingDisp, ServiceSpaceName serviceSpaceName) {
220: return new AOPStackContext(cl, serviceSpaceName,
221: underlyingDisp, configInfo.getSessionTimeoutSeconds(),
222: configInfo.getNumPartitions(), configInfo
223: .getSweepInterval(), backingStrategyFactory);
224: }
225:
226: protected void registerClusteredServices()
227: throws ServiceAlreadyRegisteredException {
228: ServiceRegistry serviceRegistry = serviceSpace
229: .getServiceRegistry();
230: for (ClusteredServiceHolder serviceHolder : serviceHolders) {
231: serviceRegistry.register(serviceHolder.getServiceName(),
232: serviceHolder.getService());
233: }
234: }
235:
236: protected Set<Node> mapToNodes(Set<Peer> peers)
237: throws AssertionError {
238: Set<Node> nodes = new HashSet<Node>();
239: for (Peer peer : peers) {
240: RemoteNode remoteNode = RemoteNode.retrieveAdaptor(peer);
241: nodes.add(remoteNode);
242: }
243: return nodes;
244: }
245:
246: protected Node mapToNode(Peer peer) throws AssertionError {
247: return RemoteNode.retrieveAdaptor(peer);
248: }
249:
250: protected void notifyInboundSessionMigration(
251: org.codehaus.wadi.core.session.Session session) {
252: for (SessionListener listener : listeners) {
253: listener
254: .notifyInboundSessionMigration(new WADISessionAdaptor(
255: session));
256: }
257: }
258:
259: protected void notifyOutboundSessionMigration(
260: org.codehaus.wadi.core.session.Session session) {
261: for (SessionListener listener : listeners) {
262: WADISessionAdaptor adaptor = WADISessionAdaptor
263: .retrieveAdaptor(session);
264: listener.notifyOutboundSessionMigration(adaptor);
265: }
266: }
267:
268: protected void notifySessionDestruction(
269: org.codehaus.wadi.core.session.Session session) {
270: for (SessionListener listener : listeners) {
271: WADISessionAdaptor adaptor = WADISessionAdaptor
272: .retrieveAdaptor(session);
273: listener.notifySessionDestruction(adaptor);
274: }
275: }
276:
277: protected class SessionListenerAdapter implements
278: org.codehaus.wadi.core.manager.SessionListener {
279:
280: public void onSessionCreation(
281: org.codehaus.wadi.core.session.Session session) {
282: }
283:
284: public void onSessionDestruction(
285: org.codehaus.wadi.core.session.Session session) {
286: notifySessionDestruction(session);
287: }
288:
289: public void onInboundSessionMigration(
290: org.codehaus.wadi.core.session.Session session) {
291: notifyInboundSessionMigration(session);
292: }
293:
294: public void onOutbountSessionMigration(
295: org.codehaus.wadi.core.session.Session session) {
296: notifyOutboundSessionMigration(session);
297: }
298:
299: }
300:
301: protected class ServiceSpaceListenerAdapter implements
302: ServiceSpaceListener {
303: private final SessionManagerListener listener;
304:
305: public ServiceSpaceListenerAdapter(
306: SessionManagerListener listener) {
307: this .listener = listener;
308: }
309:
310: public void receive(ServiceSpaceLifecycleEvent event,
311: Set<Peer> newHostingPeers) {
312: LifecycleState state = event.getState();
313: if (state == LifecycleState.STARTED) {
314: Set<Node> newHostingNodes = mapToNodes(newHostingPeers);
315: Node joiningNode = mapToNode(event.getHostingPeer());
316: listener.onJoin(joiningNode, newHostingNodes);
317: } else if (state == LifecycleState.STOPPED
318: || state == LifecycleState.FAILED) {
319: Set<Node> newHostingNodes = mapToNodes(newHostingPeers);
320: Node leavingNode = mapToNode(event.getHostingPeer());
321: listener.onLeave(leavingNode, newHostingNodes);
322: }
323: }
324: }
325:
326: public static final GBeanInfo GBEAN_INFO;
327:
328: public static final String GBEAN_ATTR_WADI_CONFIG_INFO = "wadiConfigInfo";
329:
330: public static final String GBEAN_REF_CLUSTER = "Cluster";
331: public static final String GBEAN_REF_BACKING_STRATEGY_FACTORY = "BackingStrategyFactory";
332: public static final String GBEAN_REF_SERVICE_HOLDERS = "ServiceHolders";
333:
334: static {
335: GBeanInfoBuilder infoBuilder = GBeanInfoBuilder.createStatic(
336: "WADI Session Manager", BasicWADISessionManager.class,
337: NameFactory.GERONIMO_SERVICE);
338:
339: infoBuilder.addAttribute("classLoader", ClassLoader.class,
340: false);
341: infoBuilder.addAttribute(GBEAN_ATTR_WADI_CONFIG_INFO,
342: WADISessionManagerConfigInfo.class, true);
343:
344: infoBuilder.addReference(GBEAN_REF_CLUSTER, WADICluster.class,
345: NameFactory.GERONIMO_SERVICE);
346: infoBuilder.addReference(GBEAN_REF_BACKING_STRATEGY_FACTORY,
347: BackingStrategyFactory.class,
348: NameFactory.GERONIMO_SERVICE);
349: infoBuilder.addReference(GBEAN_REF_SERVICE_HOLDERS,
350: ClusteredServiceHolder.class,
351: NameFactory.GERONIMO_SERVICE);
352:
353: infoBuilder.addInterface(SessionManager.class);
354: infoBuilder.addInterface(WADISessionManager.class);
355:
356: infoBuilder.setConstructor(new String[] { "classLoader",
357: GBEAN_ATTR_WADI_CONFIG_INFO, GBEAN_REF_CLUSTER,
358: GBEAN_REF_BACKING_STRATEGY_FACTORY,
359: GBEAN_REF_SERVICE_HOLDERS });
360:
361: GBEAN_INFO = infoBuilder.getBeanInfo();
362: }
363:
364: public static GBeanInfo getGBeanInfo() {
365: return GBEAN_INFO;
366: }
367:
368: }
|