001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.group.interceptors;
019:
020: import java.lang.ref.WeakReference;
021: import java.util.Arrays;
022: import java.util.concurrent.atomic.AtomicInteger;
023:
024: import org.apache.catalina.tribes.ChannelException;
025: import org.apache.catalina.tribes.ChannelInterceptor;
026: import org.apache.catalina.tribes.ChannelMessage;
027: import org.apache.catalina.tribes.Member;
028: import org.apache.catalina.tribes.group.ChannelInterceptorBase;
029: import org.apache.catalina.tribes.io.ChannelData;
030:
031: /**
032: *
033: * Sends a ping to all members.
034: * Configure this interceptor with the TcpFailureDetector below it,
035: * and the TcpFailureDetector will act as the membership guide.
036: * @author Filip Hanik
037: * @version 1.0
038: */
039:
040: public class TcpPingInterceptor extends ChannelInterceptorBase {
041:
042: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
043: .getLog(TcpPingInterceptor.class);
044:
045: protected static byte[] TCP_PING_DATA = new byte[] { 79, -89, 115,
046: 72, 121, -33, 67, -55, -97, 111, -119, -128, -95, 91, 7,
047: 20, 125, -39, 82, 91, -21, -33, 67, -102, -73, 126, -66,
048: -113, -127, 103, 30, -74, 55, 21, -66, -121, 69, 33, 76,
049: -88, -65, 10, 77, 19, 83, 56, 21, 50, 85, -10, -108, -73,
050: 58, -33, 33, 120, -111, 4, 125, -41, 114, -124, -64, -43 };
051:
052: protected long interval = 1000; //1 second
053:
054: protected boolean useThread = false;
055: protected boolean staticOnly = false;
056: protected boolean running = true;
057: protected PingThread thread = null;
058: protected static AtomicInteger cnt = new AtomicInteger(0);
059:
060: WeakReference<TcpFailureDetector> failureDetector = null;
061: WeakReference<StaticMembershipInterceptor> staticMembers = null;
062:
063: public synchronized void start(int svc) throws ChannelException {
064: super .start(svc);
065: running = true;
066: if (thread == null) {
067: thread = new PingThread();
068: thread.setDaemon(true);
069: thread.setName("TcpPingInterceptor.PingThread-"
070: + cnt.addAndGet(1));
071: thread.start();
072: }
073:
074: //acquire the interceptors to invoke on send ping events
075: ChannelInterceptor next = getNext();
076: while (next != null) {
077: if (next instanceof TcpFailureDetector)
078: failureDetector = new WeakReference<TcpFailureDetector>(
079: (TcpFailureDetector) next);
080: if (next instanceof StaticMembershipInterceptor)
081: staticMembers = new WeakReference<StaticMembershipInterceptor>(
082: (StaticMembershipInterceptor) next);
083: next = next.getNext();
084: }
085:
086: }
087:
088: public void stop(int svc) throws ChannelException {
089: running = false;
090: if (thread != null)
091: thread.interrupt();
092: thread = null;
093: super .stop(svc);
094: }
095:
096: public void heartbeat() {
097: super .heartbeat();
098: if (!getUseThread())
099: sendPing();
100: }
101:
102: public long getInterval() {
103: return interval;
104: }
105:
106: public void setInterval(long interval) {
107: this .interval = interval;
108: }
109:
110: public void setUseThread(boolean useThread) {
111: this .useThread = useThread;
112: }
113:
114: public void setStaticOnly(boolean staticOnly) {
115: this .staticOnly = staticOnly;
116: }
117:
118: public boolean getUseThread() {
119: return useThread;
120: }
121:
122: public boolean getStaticOnly() {
123: return staticOnly;
124: }
125:
126: protected void sendPing() {
127: if (failureDetector.get() != null) {
128: //we have a reference to the failure detector
129: //piggy back on that dude
130: failureDetector.get().checkMembers(true);
131: } else {
132: if (staticOnly && staticMembers.get() != null) {
133: sendPingMessage(staticMembers.get().getMembers());
134: } else {
135: sendPingMessage(getMembers());
136: }
137: }
138: }
139:
140: protected void sendPingMessage(Member[] members) {
141: if (members == null || members.length == 0)
142: return;
143: ChannelData data = new ChannelData(true);//generates a unique Id
144: data.setAddress(getLocalMember(false));
145: data.setTimestamp(System.currentTimeMillis());
146: data.setOptions(getOptionFlag());
147: try {
148: super .sendMessage(members, data, null);
149: } catch (ChannelException x) {
150: log.warn("Unable to send TCP ping.", x);
151: }
152: }
153:
154: public void messageReceived(ChannelMessage msg) {
155: //catch incoming
156: boolean process = true;
157: if (okToProcess(msg.getOptions())) {
158: //check to see if it is a ping message, if so, process = false
159: process = ((msg.getMessage().getLength() != TCP_PING_DATA.length) || (!Arrays
160: .equals(TCP_PING_DATA, msg.getMessage().getBytes())));
161: }//end if
162:
163: //ignore the message, it doesnt have the flag set
164: if (process)
165: super .messageReceived(msg);
166: else if (log.isDebugEnabled())
167: log.debug("Received a TCP ping packet:" + msg);
168: }//messageReceived
169:
170: protected class PingThread extends Thread {
171: public void run() {
172: while (running) {
173: try {
174: sleep(interval);
175: sendPing();
176: } catch (InterruptedException ix) {
177: interrupted();
178: } catch (Exception x) {
179: log
180: .warn(
181: "Unable to send ping from TCP ping thread.",
182: x);
183: }
184: }
185: }
186: }
187:
188: }
|