001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019:
020: package org.apache.axis2.clustering.context;
021:
022: import org.apache.axis2.clustering.ClusterManager;
023: import org.apache.axis2.clustering.ClusteringFault;
024: import org.apache.axis2.clustering.ClusteringConstants;
025: import org.apache.axis2.context.AbstractContext;
026: import org.apache.axis2.context.ConfigurationContext;
027: import org.apache.axis2.context.MessageContext;
028: import org.apache.axis2.context.ServiceContext;
029: import org.apache.axis2.context.ServiceGroupContext;
030: import org.apache.axis2.engine.AxisConfiguration;
031: import org.apache.commons.logging.Log;
032: import org.apache.commons.logging.LogFactory;
033:
034: import java.util.ArrayList;
035: import java.util.List;
036:
037: public final class Replicator {
038:
039: private static final Log log = LogFactory.getLog(Replicator.class);
040:
041: public static void replicate(MessageContext msgContext)
042: throws ClusteringFault {
043: if (!doReplication(msgContext)) {
044: return;
045: }
046: log.debug("Going to replicate state...");
047: try {
048: replicateState(msgContext);
049: } catch (Exception e) {
050: String message = "Could not replicate the state";
051: log.error(message, e);
052: throw new ClusteringFault(message, e);
053: }
054: }
055:
056: public static void replicate(AbstractContext abstractContext)
057: throws ClusteringFault {
058: if (!doReplication(abstractContext)) {
059: return;
060: }
061: log.debug("Going to replicate state...");
062: try {
063: replicateState(abstractContext);
064: } catch (Exception e) {
065: String message = "Could not replicate the state";
066: log.error(message, e);
067: throw new ClusteringFault(message, e);
068: }
069: }
070:
071: /**
072: * Do replication only if context replication is enabled.
073: * Also note that if there are no members, we need not do any replication
074: *
075: * @param abstractContext
076: * @return true - State needs to be replicated
077: * false - otherwise
078: */
079: private static boolean doReplication(AbstractContext abstractContext) {
080: ClusterManager clusterManager = abstractContext
081: .getRootContext().getAxisConfiguration()
082: .getClusterManager();
083: return clusterManager != null
084: && clusterManager.getContextManager() != null;
085: }
086:
087: private static void replicateState(AbstractContext abstractContext)
088: throws ClusteringFault {
089: ClusterManager clusterManager = abstractContext
090: .getRootContext().getAxisConfiguration()
091: .getClusterManager();
092: if (clusterManager != null) {
093: ContextManager contextManager = clusterManager
094: .getContextManager();
095: if (contextManager == null) {
096: String msg = "Cannot replicate contexts since "
097: + "ContextManager is not specified in the axis2.xml file.";
098: throw new ClusteringFault(msg);
099: }
100: if (!abstractContext.getPropertyDifferences().isEmpty()) {
101: String msgUUID = contextManager
102: .updateContext(abstractContext);
103: waitForACKs(contextManager, msgUUID, abstractContext
104: .getRootContext());
105: }
106: } else {
107: String msg = "Cannot replicate contexts since "
108: + "ClusterManager is not specified in the axis2.xml file.";
109: throw new ClusteringFault(msg);
110: }
111: }
112:
113: private static void replicateState(MessageContext msgContext)
114: throws ClusteringFault {
115: ConfigurationContext configurationContext = msgContext
116: .getConfigurationContext();
117: AxisConfiguration axisConfiguration = configurationContext
118: .getAxisConfiguration();
119: ClusterManager clusterManager = axisConfiguration
120: .getClusterManager();
121:
122: if (clusterManager != null) {
123:
124: ContextManager contextManager = clusterManager
125: .getContextManager();
126: if (contextManager == null) {
127: String msg = "Cannot replicate contexts since "
128: + "ContextManager is not specified in the axis2.xml file.";
129: throw new ClusteringFault(msg);
130: }
131:
132: List contexts = new ArrayList();
133:
134: // Do we need to replicate state stored in ConfigurationContext?
135: if (!configurationContext.getPropertyDifferences()
136: .isEmpty()) {
137: contexts.add(configurationContext);
138: }
139:
140: // Do we need to replicate state stored in ServiceGroupContext?
141: ServiceGroupContext sgContext = msgContext
142: .getServiceGroupContext();
143: if (sgContext != null
144: && !sgContext.getPropertyDifferences().isEmpty()) {
145: contexts.add(sgContext);
146: }
147:
148: // Do we need to replicate state stored in ServiceContext?
149: ServiceContext serviceContext = msgContext
150: .getServiceContext();
151: if (serviceContext != null
152: && !serviceContext.getPropertyDifferences()
153: .isEmpty()) {
154: contexts.add(serviceContext);
155: }
156:
157: // Do the actual replication here
158: if (!contexts.isEmpty()) {
159: AbstractContext[] contextArray = (AbstractContext[]) contexts
160: .toArray(new AbstractContext[contexts.size()]);
161: String msgUUID = contextManager
162: .updateContexts(contextArray);
163: waitForACKs(contextManager, msgUUID, msgContext
164: .getRootContext());
165: }
166:
167: } else {
168: String msg = "Cannot replicate contexts since "
169: + "ClusterManager is not specified in the axis2.xml file.";
170: throw new ClusteringFault(msg);
171: }
172: }
173:
174: private static void waitForACKs(ContextManager contextManager,
175: String msgUUID, ConfigurationContext configCtx)
176: throws ClusteringFault {
177: long start = System.currentTimeMillis();
178:
179: // Wait till all members have ACKed receipt & successful processing of
180: // the message with UUID 'msgUUID'
181: do {
182:
183: // Wait sometime before checking whether message is ACKed
184: try {
185: Long tts = (Long) configCtx
186: .getPropertyNonReplicable(ClusteringConstants.TIME_TO_SEND);
187: if (tts == null) {
188: Thread.sleep(5);
189: } else if (tts.longValue() >= 0) {
190: Thread.sleep(tts.longValue() + 5); // Time to recv ACK + time in queue & processing replication request
191: }
192: } catch (InterruptedException ignored) {
193: }
194: if (System.currentTimeMillis() - start > 45000) {
195: throw new ClusteringFault(
196: "ACKs not received from all members within 45 sec. "
197: + "Aborting wait.");
198: }
199: } while (!contextManager.isMessageAcknowledged(msgUUID));
200: }
201: }
|