001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.synapse.endpoints;
020:
021: import org.apache.axis2.clustering.ClusteringFault;
022: import org.apache.axis2.clustering.context.Replicator;
023: import org.apache.axis2.context.ConfigurationContext;
024: import org.apache.commons.logging.Log;
025: import org.apache.commons.logging.LogFactory;
026: import org.apache.synapse.SynapseException;
027:
028: /**
029: * Keeps the states of the endpoint.This hides where those states are kept .For a cluster environment
030: * ,all states are kept in the axis2 configuration context in order to replicate those states so that
031: * other synapse instance in the same cluster can see those changes . This class can be evolved to
032: * keep any run time states related to the endpoint .For a non-clustered environment ,
033: * all data are kept locally.
034: * <p/>
035: * This class provide the abstraction need to separate the dynamic data from the static data and
036: * improve the high cohesion and provides capability to replicate only required state at
037: * a given time. This improves the performance when replicate data.
038: */
039: public class EndpointContext {
040:
041: private static final Log log = LogFactory
042: .getLog(EndpointContext.class);
043:
044: /* The static constant only for construct key prefix for each property in endpoint context
045: *as it is need when those property state going to replicate in a cluster env. */
046: private static final String ACTIVE = "active";
047: private static final String RECOVER_ON = "recover_on";
048: private static final String UNDERSCORE_STRING = "_";
049:
050: /* Determines if this endpoint is active or not. This variable have to be loaded always from the
051: * memory as multiple threads could access it.*/
052: private boolean active = true;
053:
054: /* Time to recover a failed endpoint.*/
055: private long recoverOn = Long.MAX_VALUE;
056:
057: /*The axis configuration context- this will hold the all callers states
058: * when doing throttling in a clustered environment. */
059: private ConfigurationContext configCtx;
060:
061: /*The key for 'active' attribute and this is used when this attribute value being replicated */
062: private String activePropertyKey;
063: /*The key for 'recoverOn' attribute and this is used when this attribute value being replicated */
064: private String recoverOnPropertyKey;
065:
066: /* Is this env. support clustering*/
067: private boolean isClusteringEnable = false;
068:
069: /**
070: * Checks if the endpoint is active (failed or not)
071: *
072: * @return Returns true if the endpoint is active , otherwise , false will be returned
073: */
074: public boolean isActive() {
075:
076: if (this .isClusteringEnable) { // if this is a clustering env.
077:
078: if (this .activePropertyKey == null
079: || "".equals(this .activePropertyKey)) {
080: handleException("Cannot find the required key to find the "
081: + "shared state of 'active' attribute");
082: }
083:
084: // gets the value from configuration context (The shared state across all instances )
085: Object value = this .configCtx
086: .getPropertyNonReplicable(this .activePropertyKey);
087: if (value == null) {
088: return true;
089: }
090: if (value instanceof Boolean) {
091: return ((Boolean) value).booleanValue();
092: } else if (value instanceof String) {
093: return Boolean.parseBoolean((String) value);
094: } else {
095: handleException("Unsupported object type for value"
096: + value);
097: }
098:
099: } else {
100: return active;
101: }
102:
103: throw new SynapseException("Invalid states in endpoint context");
104:
105: }
106:
107: /**
108: * Sets if endpoint active or not.
109: *
110: * @param active True for make endpoint active , false for make it inactive
111: */
112: public synchronized void setActive(boolean active) {
113:
114: if (this .isClusteringEnable) { // if this is a clustering env.
115: // replicates the state so that all instances across cluster can see this state
116: setAndReplicateState(this .activePropertyKey, active);
117: } else {
118: this .active = active;
119: }
120:
121: }
122:
123: /**
124: * Time to recover a failed endpoint.
125: *
126: * @return Returns time to recover a failed endpoint.
127: */
128: public long getRecoverOn() {
129:
130: if (this .isClusteringEnable) { // if this is a clustering env.
131:
132: if (this .recoverOnPropertyKey == null
133: || "".equals(this .recoverOnPropertyKey)) {
134: handleException("Cannot find the required key to find the "
135: + "shared state of 'recoveOn' attribute");
136: }
137:
138: // gets the value from configuration context (The shared state across all instances )
139: Object value = this .configCtx
140: .getPropertyNonReplicable(this .recoverOnPropertyKey);
141: if (value == null) {
142: return Long.MAX_VALUE;
143: }
144: if (value instanceof Long) {
145: return ((Long) value).longValue();
146: } else if (value instanceof String) {
147: try {
148: return Long.parseLong((String) value);
149: } catch (NumberFormatException e) {
150: return Long.MAX_VALUE;
151: }
152: } else {
153: handleException("Unsupported object type for value"
154: + value);
155: }
156:
157: } else {
158: return recoverOn;
159: }
160: throw new SynapseException("Invalid states in endpoint context");
161: }
162:
163: /**
164: * Sets time to recover a failed endpoint.
165: *
166: * @param recoverOn The value for recover time
167: */
168: public void setRecoverOn(long recoverOn) {
169:
170: if (this .isClusteringEnable) { // if this is a clustering env.
171: // replicates the state so that all instances across cluster can see this state
172: setAndReplicateState(this .recoverOnPropertyKey, recoverOn);
173: } else {
174: this .recoverOn = recoverOn;
175: }
176: }
177:
178: /**
179: * Get the configuration context instance . This is only available for cluster env.
180: *
181: * @return Returns the ConfigurationContext instance
182: */
183: public ConfigurationContext getConfigurationContext() {
184: return configCtx;
185: }
186:
187: /**
188: * Sets the ConfigurationContext instance . This is only used for cluster env.
189: * By setting this , indicates that this is a cluster env.
190: *
191: * @param configCtx The ConfigurationContext instance
192: */
193: public void setConfigurationContext(ConfigurationContext configCtx) {
194:
195: if (configCtx == null) {
196: handleException("The ConfigurationContext cannot be null"
197: + " when system in a cluster environment");
198: }
199:
200: this .configCtx = configCtx;
201: this .isClusteringEnable = true; // Now, the environment is considered as a cluster
202: }
203:
204: /**
205: * Sets the identifier for this endpoint context , so that , this can be identified
206: * uniquely across the cluster. The id will be the name of the endpoint
207: *
208: * @param contextID The Id for this endpoint context
209: */
210: public void setContextID(String contextID) {
211:
212: if (contextID == null || "".equals(contextID)) {
213: handleException("The Context ID cannot be null when system in a cluster environment");
214: }
215:
216: //Making required key for each property in the endpoint context - Those will be used when
217: //replicating states
218: StringBuffer buffer = new StringBuffer();
219: buffer.append(contextID);
220: buffer.append(UNDERSCORE_STRING);
221: String prefix = buffer.toString();
222:
223: this .recoverOnPropertyKey = prefix + RECOVER_ON;
224: this .activePropertyKey = prefix + ACTIVE;
225:
226: }
227:
228: /**
229: * Helper method to replicates states of the property with given key
230: * replicates the given state so that all instances across cluster can see this state
231: *
232: * @param key The key of the property
233: * @param value The value of the property
234: */
235: private void setAndReplicateState(String key, Object value) {
236:
237: if (configCtx != null && key != null && value != null) {
238:
239: try {
240: if (log.isDebugEnabled()) {
241: log
242: .debug("Going to replicate the property with key : "
243: + key + " value : " + value);
244: }
245:
246: configCtx.setProperty(key, value);
247: Replicator.replicate(configCtx, new String[] { key });
248:
249: } catch (ClusteringFault clusteringFault) {
250: handleException("Error during the replicating states ",
251: clusteringFault);
252: }
253: }
254: }
255:
256: /**
257: * Helper methods for handle errors.
258: *
259: * @param msg The error message
260: */
261: protected void handleException(String msg) {
262:
263: log.error(msg);
264: throw new SynapseException(msg);
265: }
266:
267: /**
268: * Helper methods for handle errors.
269: *
270: * @param msg The error message
271: * @param e The exception
272: */
273: protected void handleException(String msg, Exception e) {
274:
275: log.error(msg, e);
276: throw new SynapseException(msg, e);
277: }
278: }
|