001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.membership;
019:
020: import java.util.ArrayList;
021: import java.util.Arrays;
022: import java.util.HashMap;
023: import java.util.Iterator;
024:
025: import org.apache.catalina.tribes.Member;
026: import java.util.Comparator;
027:
028: /**
029: * A <b>membership</b> implementation using simple multicast.
030: * This is the representation of a multicast membership.
031: * This class is responsible for maintaining a list of active cluster nodes in the cluster.
032: * If a node fails to send out a heartbeat, the node will be dismissed.
033: *
034: * @author Filip Hanik
035: * @author Peter Rossbach
036: * @version $Revision: 500684 $, $Date: 2007-01-28 00:27:18 +0100 (dim., 28 janv. 2007) $
037: */
038: public class Membership {
039: protected static final MemberImpl[] EMPTY_MEMBERS = new MemberImpl[0];
040:
041: /**
042: * The name of this membership, has to be the same as the name for the local
043: * member
044: */
045: protected MemberImpl local;
046:
047: /**
048: * A map of all the members in the cluster.
049: */
050: protected HashMap map = new HashMap();
051:
052: /**
053: * A list of all the members in the cluster.
054: */
055: protected MemberImpl[] members = EMPTY_MEMBERS;
056:
057: /**
058: * sort members by alive time
059: */
060: protected Comparator memberComparator = new MemberComparator();
061:
062: public Object clone() {
063: synchronized (members) {
064: Membership clone = new Membership(local, memberComparator);
065: clone.map = (HashMap) map.clone();
066: clone.members = new MemberImpl[members.length];
067: System.arraycopy(members, 0, clone.members, 0,
068: members.length);
069: return clone;
070: }
071: }
072:
073: /**
074: * Constructs a new membership
075: * @param name - has to be the name of the local member. Used to filter the local member from the cluster membership
076: */
077: public Membership(MemberImpl local, boolean includeLocal) {
078: this .local = local;
079: if (includeLocal)
080: addMember(local);
081: }
082:
083: public Membership(MemberImpl local) {
084: this (local, false);
085: }
086:
087: public Membership(MemberImpl local, Comparator comp) {
088: this (local, comp, false);
089: }
090:
091: public Membership(MemberImpl local, Comparator comp,
092: boolean includeLocal) {
093: this (local, includeLocal);
094: this .memberComparator = comp;
095: }
096:
097: /**
098: * Reset the membership and start over fresh.
099: * Ie, delete all the members and wait for them to ping again and join this membership
100: */
101: public synchronized void reset() {
102: map.clear();
103: members = EMPTY_MEMBERS;
104: }
105:
106: /**
107: * Notify the membership that this member has announced itself.
108: *
109: * @param member - the member that just pinged us
110: * @return - true if this member is new to the cluster, false otherwise.
111: * @return - false if this member is the local member or updated.
112: */
113: public synchronized boolean memberAlive(MemberImpl member) {
114: boolean result = false;
115: //ignore ourselves
116: if (member.equals(local))
117: return result;
118:
119: //return true if the membership has changed
120: MbrEntry entry = (MbrEntry) map.get(member);
121: if (entry == null) {
122: entry = addMember(member);
123: result = true;
124: } else {
125: //update the member alive time
126: MemberImpl updateMember = entry.getMember();
127: if (updateMember.getMemberAliveTime() != member
128: .getMemberAliveTime()) {
129: //update fields that can change
130: updateMember.setMemberAliveTime(member
131: .getMemberAliveTime());
132: updateMember.setPayload(member.getPayload());
133: updateMember.setCommand(member.getCommand());
134: Arrays.sort(members, memberComparator);
135: }
136: }
137: entry.accessed();
138: return result;
139: }
140:
141: /**
142: * Add a member to this component and sort array with memberComparator
143: * @param member The member to add
144: */
145: public synchronized MbrEntry addMember(MemberImpl member) {
146: synchronized (members) {
147: MbrEntry entry = new MbrEntry(member);
148: if (!map.containsKey(member)) {
149: map.put(member, entry);
150: MemberImpl results[] = new MemberImpl[members.length + 1];
151: for (int i = 0; i < members.length; i++)
152: results[i] = members[i];
153: results[members.length] = member;
154: members = results;
155: Arrays.sort(members, memberComparator);
156: }
157: return entry;
158: }
159: }
160:
161: /**
162: * Remove a member from this component.
163: *
164: * @param member The member to remove
165: */
166: public void removeMember(MemberImpl member) {
167: map.remove(member);
168: synchronized (members) {
169: int n = -1;
170: for (int i = 0; i < members.length; i++) {
171: if (members[i] == member || members[i].equals(member)) {
172: n = i;
173: break;
174: }
175: }
176: if (n < 0)
177: return;
178: MemberImpl results[] = new MemberImpl[members.length - 1];
179: int j = 0;
180: for (int i = 0; i < members.length; i++) {
181: if (i != n)
182: results[j++] = members[i];
183: }
184: members = results;
185: }
186: }
187:
188: /**
189: * Runs a refresh cycle and returns a list of members that has expired.
190: * This also removes the members from the membership, in such a way that
191: * getMembers() = getMembers() - expire()
192: * @param maxtime - the max time a member can remain unannounced before it is considered dead.
193: * @return the list of expired members
194: */
195: public synchronized MemberImpl[] expire(long maxtime) {
196: if (!hasMembers())
197: return EMPTY_MEMBERS;
198:
199: ArrayList list = null;
200: Iterator i = map.values().iterator();
201: while (i.hasNext()) {
202: MbrEntry entry = (MbrEntry) i.next();
203: if (entry.hasExpired(maxtime)) {
204: if (list == null) // only need a list when members are expired (smaller gc)
205: list = new java.util.ArrayList();
206: list.add(entry.getMember());
207: }
208: }
209:
210: if (list != null) {
211: MemberImpl[] result = new MemberImpl[list.size()];
212: list.toArray(result);
213: for (int j = 0; j < result.length; j++) {
214: removeMember(result[j]);
215: }
216: return result;
217: } else {
218: return EMPTY_MEMBERS;
219: }
220: }
221:
222: /**
223: * Returning that service has members or not
224: */
225: public boolean hasMembers() {
226: return members.length > 0;
227: }
228:
229: public MemberImpl getMember(Member mbr) {
230: if (hasMembers()) {
231: MemberImpl result = null;
232: for (int i = 0; i < this .members.length && result == null; i++) {
233: if (members[i].equals(mbr))
234: result = members[i];
235: }//for
236: return result;
237: } else {
238: return null;
239: }
240: }
241:
242: public boolean contains(Member mbr) {
243: return getMember(mbr) != null;
244: }
245:
246: /**
247: * Returning a list of all the members in the membership
248: * We not need a copy: add and remove generate new arrays.
249: */
250: public MemberImpl[] getMembers() {
251: if (hasMembers()) {
252: return members;
253: } else {
254: return EMPTY_MEMBERS;
255: }
256: }
257:
258: /**
259: * get a copy from all member entries
260: */
261: protected synchronized MbrEntry[] getMemberEntries() {
262: MbrEntry[] result = new MbrEntry[map.size()];
263: java.util.Iterator i = map.entrySet().iterator();
264: int pos = 0;
265: while (i.hasNext())
266: result[pos++] = ((MbrEntry) ((java.util.Map.Entry) i.next())
267: .getValue());
268: return result;
269: }
270:
271: // --------------------------------------------- Inner Class
272:
273: private class MemberComparator implements java.util.Comparator {
274:
275: public int compare(Object o1, Object o2) {
276: try {
277: return compare((MemberImpl) o1, (MemberImpl) o2);
278: } catch (ClassCastException x) {
279: return 0;
280: }
281: }
282:
283: public int compare(MemberImpl m1, MemberImpl m2) {
284: //longer alive time, means sort first
285: long result = m2.getMemberAliveTime()
286: - m1.getMemberAliveTime();
287: if (result < 0)
288: return -1;
289: else if (result == 0)
290: return 0;
291: else
292: return 1;
293: }
294: }
295:
296: /**
297: * Inner class that represents a member entry
298: */
299: protected static class MbrEntry {
300:
301: protected MemberImpl mbr;
302: protected long lastHeardFrom;
303:
304: public MbrEntry(MemberImpl mbr) {
305: this .mbr = mbr;
306: }
307:
308: /**
309: * Indicate that this member has been accessed.
310: */
311: public void accessed() {
312: lastHeardFrom = System.currentTimeMillis();
313: }
314:
315: /**
316: * Return the actual Member object
317: */
318: public MemberImpl getMember() {
319: return mbr;
320: }
321:
322: /**
323: * Check if this dude has expired
324: * @param maxtime The time threshold
325: */
326: public boolean hasExpired(long maxtime) {
327: long delta = System.currentTimeMillis() - lastHeardFrom;
328: return delta > maxtime;
329: }
330: }
331: }
|