001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016: package org.apache.catalina.tribes.group.interceptors;
017:
018: import java.net.InetAddress;
019: import java.net.InetSocketAddress;
020: import java.net.Socket;
021: import java.net.SocketTimeoutException;
022: import java.util.Arrays;
023: import java.util.HashMap;
024:
025: import org.apache.catalina.tribes.Channel;
026: import org.apache.catalina.tribes.ChannelException;
027: import org.apache.catalina.tribes.ChannelException.FaultyMember;
028: import org.apache.catalina.tribes.ChannelMessage;
029: import org.apache.catalina.tribes.Member;
030: import org.apache.catalina.tribes.RemoteProcessException;
031: import org.apache.catalina.tribes.group.ChannelInterceptorBase;
032: import org.apache.catalina.tribes.group.InterceptorPayload;
033: import org.apache.catalina.tribes.io.ChannelData;
034: import org.apache.catalina.tribes.io.XByteBuffer;
035: import org.apache.catalina.tribes.membership.MemberImpl;
036: import org.apache.catalina.tribes.membership.Membership;
037: import java.net.ConnectException;
038:
039: /**
040: * <p>Title: A perfect failure detector </p>
041: *
042: * <p>Description: The TcpFailureDetector is a useful interceptor
043: * that adds reliability to the membership layer.</p>
044: * <p>
045: * If the network is busy, or the system is busy so that the membership receiver thread
046: * is not getting enough time to update its table, members can be "timed out"
047: * This failure detector will intercept the memberDisappeared message(unless its a true shutdown message)
048: * and connect to the member using TCP.
049: * </p>
050: * <p>
051: * The TcpFailureDetector works in two ways. <br>
052: * 1. It intercepts memberDisappeared events
053: * 2. It catches send errors
054: * </p>
055: *
056: * @author Filip Hanik
057: * @version 1.0
058: */
059: public class TcpFailureDetector extends ChannelInterceptorBase {
060:
061: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
062: .getLog(TcpFailureDetector.class);
063:
064: protected static byte[] TCP_FAIL_DETECT = new byte[] { 79, -89,
065: 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91,
066: 7, 20, 125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66,
067: -113, -127, 103, 30, -74, 55, 21, -66, -121, 69, 126, 76,
068: -88, -65, 10, 77, 19, 83, 56, 21, 50, 85, -10, -108, -73,
069: 58, -6, 64, 120, -111, 4, 125, -41, 114, -124, -64, -43 };
070:
071: protected boolean performConnectTest = true;
072:
073: protected long connectTimeout = 1000;//1 second default
074:
075: protected boolean performSendTest = true;
076:
077: protected boolean performReadTest = false;
078:
079: protected long readTestTimeout = 5000;//5 seconds
080:
081: protected Membership membership = null;
082:
083: protected HashMap removeSuspects = new HashMap();
084:
085: protected HashMap addSuspects = new HashMap();
086:
087: public void sendMessage(Member[] destination, ChannelMessage msg,
088: InterceptorPayload payload) throws ChannelException {
089: try {
090: super .sendMessage(destination, msg, payload);
091: } catch (ChannelException cx) {
092: FaultyMember[] mbrs = cx.getFaultyMembers();
093: for (int i = 0; i < mbrs.length; i++) {
094: if (mbrs[i].getCause() != null
095: && (!(mbrs[i].getCause() instanceof RemoteProcessException))) {//RemoteProcessException's are ok
096: this .memberDisappeared(mbrs[i].getMember());
097: }//end if
098: }//for
099: throw cx;
100: }
101: }
102:
103: public void messageReceived(ChannelMessage msg) {
104: //catch incoming
105: boolean process = true;
106: if (okToProcess(msg.getOptions())) {
107: //check to see if it is a testMessage, if so, process = false
108: process = ((msg.getMessage().getLength() != TCP_FAIL_DETECT.length) || (!Arrays
109: .equals(TCP_FAIL_DETECT, msg.getMessage()
110: .getBytes())));
111: }//end if
112:
113: //ignore the message, it doesnt have the flag set
114: if (process)
115: super .messageReceived(msg);
116: else if (log.isDebugEnabled())
117: log.debug("Received a failure detector packet:" + msg);
118: }//messageReceived
119:
120: public void memberAdded(Member member) {
121: if (membership == null)
122: setupMembership();
123: boolean notify = false;
124: synchronized (membership) {
125: if (removeSuspects.containsKey(member)) {
126: //previously marked suspect, system below picked up the member again
127: removeSuspects.remove(member);
128: } else if (membership.getMember((MemberImpl) member) == null) {
129: //if we add it here, then add it upwards too
130: //check to see if it is alive
131: if (memberAlive(member)) {
132: membership.memberAlive((MemberImpl) member);
133: notify = true;
134: } else {
135: addSuspects.put(member, new Long(System
136: .currentTimeMillis()));
137: }
138: }
139: }
140: if (notify)
141: super .memberAdded(member);
142: }
143:
144: public void memberDisappeared(Member member) {
145: if (membership == null)
146: setupMembership();
147: boolean notify = false;
148: boolean shutdown = Arrays.equals(member.getCommand(),
149: Member.SHUTDOWN_PAYLOAD);
150: if (!shutdown)
151: if (log.isInfoEnabled())
152: log.info("Received memberDisappeared[" + member
153: + "] message. Will verify.");
154: synchronized (membership) {
155: //check to see if the member really is gone
156: //if the payload is not a shutdown message
157: if (shutdown || !memberAlive(member)) {
158: //not correct, we need to maintain the map
159: membership.removeMember((MemberImpl) member);
160: removeSuspects.remove(member);
161: notify = true;
162: } else {
163: //add the member as suspect
164: removeSuspects.put(member, new Long(System
165: .currentTimeMillis()));
166: }
167: }
168: if (notify) {
169: if (log.isInfoEnabled())
170: log.info("Verification complete. Member disappeared["
171: + member + "]");
172: super .memberDisappeared(member);
173: } else {
174: if (log.isInfoEnabled())
175: log.info("Verification complete. Member still alive["
176: + member + "]");
177:
178: }
179: }
180:
181: public boolean hasMembers() {
182: if (membership == null)
183: setupMembership();
184: return membership.hasMembers();
185: }
186:
187: public Member[] getMembers() {
188: if (membership == null)
189: setupMembership();
190: return membership.getMembers();
191: }
192:
193: public Member getMember(Member mbr) {
194: if (membership == null)
195: setupMembership();
196: return membership.getMember(mbr);
197: }
198:
199: public Member getLocalMember(boolean incAlive) {
200: return super .getLocalMember(incAlive);
201: }
202:
203: public void heartbeat() {
204: super .heartbeat();
205: checkMembers(false);
206: }
207:
208: public void checkMembers(boolean checkAll) {
209:
210: try {
211: if (membership == null)
212: setupMembership();
213: synchronized (membership) {
214: if (!checkAll)
215: performBasicCheck();
216: else
217: performForcedCheck();
218: }
219: } catch (Exception x) {
220: log
221: .warn(
222: "Unable to perform heartbeat on the TcpFailureDetector.",
223: x);
224: } finally {
225:
226: }
227: }
228:
229: protected void performForcedCheck() {
230: //update all alive times
231: Member[] members = super .getMembers();
232: for (int i = 0; members != null && i < members.length; i++) {
233: if (memberAlive(members[i])) {
234: if (membership.memberAlive((MemberImpl) members[i]))
235: super .memberAdded(members[i]);
236: addSuspects.remove(members[i]);
237: } else {
238: if (membership.getMember(members[i]) != null) {
239: membership.removeMember((MemberImpl) members[i]);
240: removeSuspects.remove(members[i]);
241: super .memberDisappeared((MemberImpl) members[i]);
242: }
243: } //end if
244: } //for
245:
246: }
247:
248: protected void performBasicCheck() {
249: //update all alive times
250: Member[] members = super .getMembers();
251: for (int i = 0; members != null && i < members.length; i++) {
252: if (membership.memberAlive((MemberImpl) members[i])) {
253: //we don't have this one in our membership, check to see if he/she is alive
254: if (memberAlive(members[i])) {
255: log
256: .warn("Member added, even though we werent notified:"
257: + members[i]);
258: super .memberAdded(members[i]);
259: } else {
260: membership.removeMember((MemberImpl) members[i]);
261: } //end if
262: } //end if
263: } //for
264:
265: //check suspect members if they are still alive,
266: //if not, simply issue the memberDisappeared message
267: MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet()
268: .toArray(new MemberImpl[removeSuspects.size()]);
269: for (int i = 0; i < keys.length; i++) {
270: MemberImpl m = (MemberImpl) keys[i];
271: if (membership.getMember(m) != null && (!memberAlive(m))) {
272: membership.removeMember(m);
273: super .memberDisappeared(m);
274: removeSuspects.remove(m);
275: if (log.isInfoEnabled())
276: log.info("Suspect member, confirmed dead.[" + m
277: + "]");
278: } //end if
279: }
280:
281: //check add suspects members if they are alive now,
282: //if they are, simply issue the memberAdded message
283: keys = (MemberImpl[]) addSuspects.keySet().toArray(
284: new MemberImpl[addSuspects.size()]);
285: for (int i = 0; i < keys.length; i++) {
286: MemberImpl m = (MemberImpl) keys[i];
287: if (membership.getMember(m) == null && (memberAlive(m))) {
288: membership.memberAlive(m);
289: super .memberAdded(m);
290: addSuspects.remove(m);
291: if (log.isInfoEnabled())
292: log.info("Suspect member, confirmed alive.[" + m
293: + "]");
294: } //end if
295: }
296: }
297:
298: protected synchronized void setupMembership() {
299: if (membership == null) {
300: membership = new Membership((MemberImpl) super
301: .getLocalMember(true));
302: }
303:
304: }
305:
306: protected boolean memberAlive(Member mbr) {
307: return memberAlive(mbr, TCP_FAIL_DETECT, performSendTest,
308: performReadTest, readTestTimeout, connectTimeout,
309: getOptionFlag());
310: }
311:
312: protected static boolean memberAlive(Member mbr, byte[] msgData,
313: boolean sendTest, boolean readTest, long readTimeout,
314: long conTimeout, int optionFlag) {
315: //could be a shutdown notification
316: if (Arrays.equals(mbr.getCommand(), Member.SHUTDOWN_PAYLOAD))
317: return false;
318:
319: Socket socket = new Socket();
320: try {
321: InetAddress ia = InetAddress.getByAddress(mbr.getHost());
322: InetSocketAddress addr = new InetSocketAddress(ia, mbr
323: .getPort());
324: socket.setSoTimeout((int) readTimeout);
325: socket.connect(addr, (int) conTimeout);
326: if (sendTest) {
327: ChannelData data = new ChannelData(true);
328: data.setAddress(mbr);
329: data.setMessage(new XByteBuffer(msgData, false));
330: data.setTimestamp(System.currentTimeMillis());
331: int options = optionFlag
332: | Channel.SEND_OPTIONS_BYTE_MESSAGE;
333: if (readTest)
334: options = (options | Channel.SEND_OPTIONS_USE_ACK);
335: else
336: options = (options & (~Channel.SEND_OPTIONS_USE_ACK));
337: data.setOptions(options);
338: byte[] message = XByteBuffer.createDataPackage(data);
339: socket.getOutputStream().write(message);
340: if (readTest) {
341: int length = socket.getInputStream().read(message);
342: return length > 0;
343: }
344: }//end if
345: return true;
346: } catch (SocketTimeoutException sx) {
347: //do nothing, we couldn't connect
348: } catch (ConnectException cx) {
349: //do nothing, we couldn't connect
350: } catch (Exception x) {
351: log
352: .error(
353: "Unable to perform failure detection check, assuming member down.",
354: x);
355: } finally {
356: try {
357: socket.close();
358: } catch (Exception ignore) {
359: }
360: }
361: return false;
362: }
363:
364: }
|