001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2006 Continuent, Inc.
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: */package org.continuent.sequoia.controller.virtualdatabase;
019:
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.continuent.hedera.adapters.MessageListener;
import org.continuent.hedera.adapters.MulticastRequestAdapter;
import org.continuent.hedera.adapters.MulticastRequestListener;
import org.continuent.hedera.adapters.MulticastResponse;
import org.continuent.hedera.channel.AbstractReliableGroupChannel;
import org.continuent.hedera.common.GroupIdentifier;
import org.continuent.hedera.common.Member;
import org.continuent.hedera.factory.AbstractGroupCommunicationFactory;
import org.continuent.sequoia.common.log.Trace;
import org.continuent.sequoia.controller.virtualdatabase.activity.ActivityService;
037:
038: /**
039: * A PartitionReconciler can be used to "reconcile" members of a vdb after a
040: * network partition has been detected. This class creates a reconciliation
041: * channel, separate from the vdb channel, to exchange activity information
042: * between the reconciling members
043: *
044: * @see DistributedVirtualDatabase#networkPartition(GroupIdentifier, List)
045: * @see ActivityService
046: */
047: class PartitionReconciler implements MessageListener,
048: MulticastRequestListener {
049: private static final Trace logger = Trace
050: .getLogger("org.continuent.sequoia.controller.virtualdatabase");
051: private String vdbName;
052: private String groupName;
053: private Member member;
054: private String hederaPropertiesFile;
055: private AbstractReliableGroupChannel reconciliationChannel;
056: private MulticastRequestAdapter reconciliationMulticastRequestAdapter;
057:
058: PartitionReconciler(String vdbName, String groupName,
059: Member member, String hederaPropertiesFile) {
060: this .vdbName = vdbName;
061: this .groupName = groupName;
062: this .member = member;
063: this .hederaPropertiesFile = hederaPropertiesFile;
064: }
065:
066: void dispose() {
067: if (reconciliationChannel != null) {
068: reconciliationChannel.close();
069: }
070: if (reconciliationMulticastRequestAdapter != null) {
071: reconciliationMulticastRequestAdapter.stop();
072: }
073: }
074:
075: synchronized PartitionReconciliationStatus reconcileWith(
076: final Member other) throws Exception {
077: boolean activity = ActivityService.getInstance()
078: .hasActivitySinceUnreachable(vdbName,
079: other.getAddress());
080: if (logger.isInfoEnabled()) {
081: logger
082: .info("there has been "
083: + (activity ? "some " : "no ")
084: + "activity since member has been detected as unreachable ("
085: + other + ")");
086: }
087:
088: Properties p = new Properties();
089: InputStream is = this .getClass().getResourceAsStream(
090: hederaPropertiesFile);
091: p.load(is);
092: is.close();
093:
094: AbstractGroupCommunicationFactory groupCommFactory = (AbstractGroupCommunicationFactory) Class
095: .forName(p.getProperty("hedera.factory")).newInstance();
096: Object[] ret = groupCommFactory
097: .createChannelAndGroupMembershipService(p,
098: new GroupIdentifier(groupName
099: + "-reconciliation"));
100: reconciliationChannel = (AbstractReliableGroupChannel) ret[0];
101:
102: if (logger.isDebugEnabled()) {
103: logger.debug("join channel " + reconciliationChannel);
104: }
105: reconciliationChannel.join();
106: if (logger.isDebugEnabled()) {
107: logger.debug("joined channel " + reconciliationChannel);
108: }
109:
110: int timeToWaitForOtherMemberToJoinReconciliationChannel = 20 * 1000; // in ms
111: long start = System.currentTimeMillis();
112: while (reconciliationChannel.getGroup().getMembers().size() < 2) {
113: logger.debug("waiting to be 2 in the group "
114: + reconciliationChannel);
115: Thread.sleep(1000);
116: long now = System.currentTimeMillis();
117: if ((now - start) > timeToWaitForOtherMemberToJoinReconciliationChannel) {
118: break;
119: }
120: }
121: if (reconciliationChannel.getGroup().getMembers().size() < 2) {
122: logger
123: .error("waited"
124: + timeToWaitForOtherMemberToJoinReconciliationChannel
125: + "ms for other member"
126: + " to join the reconciliation group without any success. Spurious network partition is likely.");
127: // in that case we can do nothing else than treat it as a split brain
128: return PartitionReconciliationStatus.SPLIT_BRAIN;
129: }
130:
131: Thread.sleep(5 * 1000);
132:
133: reconciliationMulticastRequestAdapter = new MulticastRequestAdapter(
134: reconciliationChannel, // group channel
135: this , // MessageListener
136: this // MulticastRequestListener
137: );
138: reconciliationMulticastRequestAdapter.start();
139: if (logger.isDebugEnabled()) {
140: logger.debug("started multicast request adapter for group "
141: + reconciliationChannel.getGroup()
142: .getGroupIdentifier().getGroupName());
143: }
144:
145: PartitionReconciliationMessage msg = new PartitionReconciliationMessage(
146: activity, other);
147:
148: List members = reconciliationChannel.getGroup().getMembers();
149:
150: if (logger.isDebugEnabled()) {
151: logger.debug("send " + msg + " to " + members);
152: }
153: MulticastResponse response = reconciliationMulticastRequestAdapter
154: .multicastMessage(members, msg,
155: MulticastRequestAdapter.WAIT_ALL, 30 * 1000);
156: Map results = response.getResults();
157:
158: if (logger.isDebugEnabled()) {
159: logger.debug("results from " + msg + " : " + results);
160: }
161:
162: PartitionReconciliationStatus negociationStatus = decide(results);
163:
164: return negociationStatus;
165: }
166:
167: private PartitionReconciliationStatus decide(
168: Map/* <Member, Boolean> */results) {
169: if (results.size() == 0) {
170: // no other members
171: return PartitionReconciliationStatus.OTHER_ALONE_IN_THE_WORLD;
172: }
173: Iterator iter = results.entrySet().iterator();
174: while (iter.hasNext()) {
175: Map.Entry entry = (Map.Entry) iter.next();
176: Member m = (Member) entry.getKey();
177: if (entry.getValue() == null) {
178: continue;
179: }
180: boolean otherActivity = ((Boolean) entry.getValue())
181: .booleanValue();
182: boolean ownActivity = ActivityService.getInstance()
183: .hasActivitySinceUnreachable(vdbName,
184: m.getAddress());
185: if (!otherActivity && !ownActivity) {
186: return PartitionReconciliationStatus.NO_ACTIVITY;
187: }
188: if (otherActivity && ownActivity) {
189: return PartitionReconciliationStatus.SPLIT_BRAIN;
190: }
191: if (otherActivity && !ownActivity) {
192: return PartitionReconciliationStatus.ALONE_IN_THE_WORLD;
193: }
194: if (!otherActivity && ownActivity) {
195: return PartitionReconciliationStatus.OTHER_ALONE_IN_THE_WORLD;
196: }
197: }
198: throw new IllegalStateException(
199: "Unable to decide of a possible reconciliation status");
200: }
201:
202: /**
203: * {@inheritDoc}
204: */
205: public void receive(Serializable ser) {
206: if (logger.isWarnEnabled()) {
207: logger.warn("Unexpected message received: " + ser);
208: }
209: }
210:
211: /**
212: * {@inheritDoc}
213: */
214: public Serializable handleMessageMultiThreaded(Serializable obj,
215: Member member, Object result) {
216: return (Serializable) result;
217: }
218:
219: /**
220: * {@inheritDoc}
221: */
222: public Object handleMessageSingleThreaded(Serializable obj, Member m) {
223: if (obj instanceof PartitionReconciliationMessage) {
224: PartitionReconciliationMessage msg = (PartitionReconciliationMessage) obj;
225: if (logger.isDebugEnabled()) {
226: logger.debug("received " + msg + " from " + m);
227: }
228: if (!msg.other.getAddress()
229: .equals(this .member.getAddress())) {
230: if (logger.isDebugEnabled()) {
231: logger
232: .debug("reconciliation message is not for member "
233: + m.getAddress());
234: }
235: return null;
236: }
237: boolean hadActivity = ActivityService.getInstance()
238: .hasActivitySinceUnreachable(vdbName,
239: m.getAddress());
240: if (logger.isInfoEnabled()) {
241: StringBuffer buff = new StringBuffer("own activity = "
242: + hadActivity);
243: buff.append(", activity = " + msg.activity + " on "
244: + m.getAddress());
245: logger.info(buff);
246: }
247: return Boolean.valueOf(hadActivity);
248: }
249: return null;
250: }
251:
252: /**
253: * {@inheritDoc}
254: */
255: public void cancelMessage(Serializable msg) {
256: // Does not do anything special.
257: }
258:
259: }
260:
261: class PartitionReconciliationMessage implements Serializable {
262: private static final long serialVersionUID = 1L;
263:
264: final boolean activity;
265: final Member other;
266:
267: PartitionReconciliationMessage(boolean activity, Member other) {
268: this .activity = activity;
269: this .other = other;
270: }
271:
272: public String toString() {
273: return "PartitionReconciliationMessage[activity=" + activity
274: + ", other=" + other + "]";
275: }
276: }
|