001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.membership;
019:
020: import java.io.IOException;
021: import java.io.ObjectInput;
022: import java.io.ObjectOutput;
023: import java.util.Arrays;
024:
025: import org.apache.catalina.tribes.Member;
026: import org.apache.catalina.tribes.io.XByteBuffer;
027: import org.apache.catalina.tribes.transport.SenderState;
028:
029: /**
030: * A <b>membership</b> implementation using simple multicast.
031: * This is the representation of a multicast member.
032: * Carries the host, and port of the this or other cluster nodes.
033: *
034: * @author Filip Hanik
035: * @version $Revision: 538977 $, $Date: 2007-05-17 17:43:49 +0200 (jeu., 17 mai 2007) $
036: */
037: public class MemberImpl implements Member, java.io.Externalizable {
038:
039: /**
040: * Public properties specific to this implementation
041: */
042: public static final transient String TCP_LISTEN_PORT = "tcpListenPort";
043: public static final transient String TCP_LISTEN_HOST = "tcpListenHost";
044: public static final transient String MEMBER_NAME = "memberName";
045:
046: public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {
047: 84, 82, 73, 66, 69, 83, 45, 66 };
048: public static final transient byte[] TRIBES_MBR_END = new byte[] {
049: 84, 82, 73, 66, 69, 83, 45, 69 };
050:
051: /**
052: * The listen host for this member
053: */
054: protected byte[] host;
055: protected transient String hostname;
056: /**
057: * The tcp listen port for this member
058: */
059: protected int port;
060:
061: /**
062: * The tcp/SSL listen port for this member
063: */
064: protected int securePort = -1;
065:
066: /**
067: * Counter for how many broadcast messages have been sent from this member
068: */
069: protected int msgCount = 0;
070: /**
071: * The number of milliseconds since this members was
072: * created, is kept track of using the start time
073: */
074: protected long memberAliveTime = 0;
075:
076: /**
077: * For the local member only
078: */
079: protected transient long serviceStartTime;
080:
081: /**
082: * To avoid serialization over and over again, once the local dataPkg
083: * has been set, we use that to transmit data
084: */
085: protected transient byte[] dataPkg = null;
086:
087: /**
088: * Unique session Id for this member
089: */
090: protected byte[] uniqueId = new byte[16];
091:
092: /**
093: * Custom payload that an app framework can broadcast
094: * Also used to transport stop command.
095: */
096: protected byte[] payload = new byte[0];
097:
098: /**
099: * Command, so that the custom payload doesn't have to be used
100: * This is for internal tribes use, such as SHUTDOWN_COMMAND
101: */
102: protected byte[] command = new byte[0];
103:
104: /**
105: * Domain if we want to filter based on domain.
106: */
107: protected byte[] domain = new byte[0];
108:
109: /**
110: * Empty constructor for serialization
111: */
112: public MemberImpl() {
113:
114: }
115:
116: /**
117: * Construct a new member object
118: * @param name - the name of this member, cluster unique
119: * @param domain - the cluster domain name of this member
120: * @param host - the tcp listen host
121: * @param port - the tcp listen port
122: */
123: public MemberImpl(String host, int port, long aliveTime)
124: throws IOException {
125: setHostname(host);
126: this .port = port;
127: this .memberAliveTime = aliveTime;
128: }
129:
130: public MemberImpl(String host, int port, long aliveTime,
131: byte[] payload) throws IOException {
132: this (host, port, aliveTime);
133: setPayload(payload);
134: }
135:
136: public boolean isReady() {
137: return SenderState.getSenderState(this ).isReady();
138: }
139:
140: public boolean isSuspect() {
141: return SenderState.getSenderState(this ).isSuspect();
142: }
143:
144: public boolean isFailing() {
145: return SenderState.getSenderState(this ).isFailing();
146: }
147:
148: /**
149: * Increment the message count.
150: */
151: protected void inc() {
152: msgCount++;
153: }
154:
155: /**
156: * Create a data package to send over the wire representing this member.
157: * This is faster than serialization.
158: * @return - the bytes for this member deserialized
159: * @throws Exception
160: */
161: public byte[] getData() {
162: return getData(true);
163: }
164:
165: /**
166: * Highly optimized version of serializing a member into a byte array
167: * Returns a cached byte[] reference, do not modify this data
168: * @param getalive boolean
169: * @return byte[]
170: */
171: public byte[] getData(boolean getalive) {
172: return getData(getalive, false);
173: }
174:
175: public int getDataLength() {
176: return TRIBES_MBR_BEGIN.length + //start pkg
177: 4 + //data length
178: 8 + //alive time
179: 4 + //port
180: 4 + //secure port
181: 1 + //host length
182: host.length + //host
183: 4 + //command length
184: command.length + //command
185: 4 + //domain length
186: domain.length + //domain
187: 16 + //unique id
188: 4 + //payload length
189: payload.length + //payload
190: TRIBES_MBR_END.length; //end pkg
191: }
192:
193: /**
194: *
195: * @param getalive boolean - calculate memberAlive time
196: * @param reset boolean - reset the cached data package, and create a new one
197: * @return byte[]
198: */
199: public byte[] getData(boolean getalive, boolean reset) {
200: if (reset)
201: dataPkg = null;
202: //look in cache first
203: if (dataPkg != null) {
204: if (getalive) {
205: //you'd be surprised, but System.currentTimeMillis
206: //shows up on the profiler
207: long alive = System.currentTimeMillis()
208: - getServiceStartTime();
209: XByteBuffer.toBytes((long) alive, dataPkg,
210: TRIBES_MBR_BEGIN.length + 4);
211: }
212: return dataPkg;
213: }
214:
215: //package looks like
216: //start package TRIBES_MBR_BEGIN.length
217: //package length - 4 bytes
218: //alive - 8 bytes
219: //port - 4 bytes
220: //secure port - 4 bytes
221: //host length - 1 byte
222: //host - hl bytes
223: //clen - 4 bytes
224: //command - clen bytes
225: //dlen - 4 bytes
226: //domain - dlen bytes
227: //uniqueId - 16 bytes
228: //payload length - 4 bytes
229: //payload plen bytes
230: //end package TRIBES_MBR_END.length
231: byte[] addr = host;
232: long alive = System.currentTimeMillis() - getServiceStartTime();
233: byte hl = (byte) addr.length;
234: byte[] data = new byte[getDataLength()];
235:
236: int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length
237: - TRIBES_MBR_END.length - 4);
238:
239: int pos = 0;
240:
241: //TRIBES_MBR_BEGIN
242: System.arraycopy(TRIBES_MBR_BEGIN, 0, data, pos,
243: TRIBES_MBR_BEGIN.length);
244: pos += TRIBES_MBR_BEGIN.length;
245:
246: //body length
247: XByteBuffer.toBytes(bodylength, data, pos);
248: pos += 4;
249:
250: //alive data
251: XByteBuffer.toBytes((long) alive, data, pos);
252: pos += 8;
253: //port
254: XByteBuffer.toBytes(port, data, pos);
255: pos += 4;
256: //secure port
257: XByteBuffer.toBytes(securePort, data, pos);
258: pos += 4;
259: //host length
260: data[pos++] = hl;
261: //host
262: System.arraycopy(addr, 0, data, pos, addr.length);
263: pos += addr.length;
264: //clen - 4 bytes
265: XByteBuffer.toBytes(command.length, data, pos);
266: pos += 4;
267: //command - clen bytes
268: System.arraycopy(command, 0, data, pos, command.length);
269: pos += command.length;
270: //dlen - 4 bytes
271: XByteBuffer.toBytes(domain.length, data, pos);
272: pos += 4;
273: //domain - dlen bytes
274: System.arraycopy(domain, 0, data, pos, domain.length);
275: pos += domain.length;
276: //unique Id
277: System.arraycopy(uniqueId, 0, data, pos, uniqueId.length);
278: pos += uniqueId.length;
279: //payload
280: XByteBuffer.toBytes(payload.length, data, pos);
281: pos += 4;
282: System.arraycopy(payload, 0, data, pos, payload.length);
283: pos += payload.length;
284:
285: //TRIBES_MBR_END
286: System.arraycopy(TRIBES_MBR_END, 0, data, pos,
287: TRIBES_MBR_END.length);
288: pos += TRIBES_MBR_END.length;
289:
290: //create local data
291: dataPkg = data;
292: return data;
293: }
294:
295: /**
296: * Deserializes a member from data sent over the wire
297: * @param data - the bytes received
298: * @return a member object.
299: */
300: public static MemberImpl getMember(byte[] data, MemberImpl member) {
301: return getMember(data, 0, data.length, member);
302: }
303:
304: public static MemberImpl getMember(byte[] data, int offset,
305: int length, MemberImpl member) {
306: //package looks like
307: //start package TRIBES_MBR_BEGIN.length
308: //package length - 4 bytes
309: //alive - 8 bytes
310: //port - 4 bytes
311: //secure port - 4 bytes
312: //host length - 1 byte
313: //host - hl bytes
314: //clen - 4 bytes
315: //command - clen bytes
316: //dlen - 4 bytes
317: //domain - dlen bytes
318: //uniqueId - 16 bytes
319: //payload length - 4 bytes
320: //payload plen bytes
321: //end package TRIBES_MBR_END.length
322:
323: int pos = offset;
324:
325: if (XByteBuffer.firstIndexOf(data, offset, TRIBES_MBR_BEGIN) != pos) {
326: throw new IllegalArgumentException(
327: "Invalid package, should start with:"
328: + org.apache.catalina.tribes.util.Arrays
329: .toString(TRIBES_MBR_BEGIN));
330: }
331:
332: if (length < (TRIBES_MBR_BEGIN.length + 4)) {
333: throw new ArrayIndexOutOfBoundsException(
334: "Member package to small to validate.");
335: }
336:
337: pos += TRIBES_MBR_BEGIN.length;
338:
339: int bodylength = XByteBuffer.toInt(data, pos);
340: pos += 4;
341:
342: if (length < (bodylength + 4 + TRIBES_MBR_BEGIN.length + TRIBES_MBR_END.length)) {
343: throw new ArrayIndexOutOfBoundsException(
344: "Not enough bytes in member package.");
345: }
346:
347: int endpos = pos + bodylength;
348: if (XByteBuffer.firstIndexOf(data, endpos, TRIBES_MBR_END) != endpos) {
349: throw new IllegalArgumentException(
350: "Invalid package, should end with:"
351: + org.apache.catalina.tribes.util.Arrays
352: .toString(TRIBES_MBR_END));
353: }
354:
355: byte[] alived = new byte[8];
356: System.arraycopy(data, pos, alived, 0, 8);
357: pos += 8;
358: byte[] portd = new byte[4];
359: System.arraycopy(data, pos, portd, 0, 4);
360: pos += 4;
361:
362: byte[] sportd = new byte[4];
363: System.arraycopy(data, pos, sportd, 0, 4);
364: pos += 4;
365:
366: byte hl = data[pos++];
367: byte[] addr = new byte[hl];
368: System.arraycopy(data, pos, addr, 0, hl);
369: pos += hl;
370:
371: int cl = XByteBuffer.toInt(data, pos);
372: pos += 4;
373:
374: byte[] command = new byte[cl];
375: System.arraycopy(data, pos, command, 0, command.length);
376: pos += command.length;
377:
378: int dl = XByteBuffer.toInt(data, pos);
379: pos += 4;
380:
381: byte[] domain = new byte[dl];
382: System.arraycopy(data, pos, domain, 0, domain.length);
383: pos += domain.length;
384:
385: byte[] uniqueId = new byte[16];
386: System.arraycopy(data, pos, uniqueId, 0, 16);
387: pos += 16;
388:
389: int pl = XByteBuffer.toInt(data, pos);
390: pos += 4;
391:
392: byte[] payload = new byte[pl];
393: System.arraycopy(data, pos, payload, 0, payload.length);
394: pos += payload.length;
395:
396: member.setHost(addr);
397: member.setPort(XByteBuffer.toInt(portd, 0));
398: member.setSecurePort(XByteBuffer.toInt(sportd, 0));
399: member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));
400: member.setUniqueId(uniqueId);
401: member.payload = payload;
402: member.domain = domain;
403: member.command = command;
404:
405: member.dataPkg = new byte[length];
406: System.arraycopy(data, offset, member.dataPkg, 0, length);
407:
408: return member;
409: }
410:
411: public static MemberImpl getMember(byte[] data) {
412: return getMember(data, new MemberImpl());
413: }
414:
415: public static MemberImpl getMember(byte[] data, int offset,
416: int length) {
417: return getMember(data, offset, length, new MemberImpl());
418: }
419:
420: /**
421: * Return the name of this object
422: * @return a unique name to the cluster
423: */
424: public String getName() {
425: return "tcp://" + getHostname() + ":" + getPort();
426: }
427:
428: /**
429: * Return the listen port of this member
430: * @return - tcp listen port
431: */
432: public int getPort() {
433: return this .port;
434: }
435:
436: /**
437: * Return the TCP listen host for this member
438: * @return IP address or host name
439: */
440: public byte[] getHost() {
441: return host;
442: }
443:
444: public String getHostname() {
445: if (this .hostname != null)
446: return hostname;
447: else {
448: try {
449: this .hostname = java.net.InetAddress.getByAddress(host)
450: .getHostName();
451: return this .hostname;
452: } catch (IOException x) {
453: throw new RuntimeException("Unable to parse hostname.",
454: x);
455: }
456: }
457: }
458:
459: /**
460: * Contains information on how long this member has been online.
461: * The result is the number of milli seconds this member has been
462: * broadcasting its membership to the cluster.
463: * @return nr of milliseconds since this member started.
464: */
465: public long getMemberAliveTime() {
466: return memberAliveTime;
467: }
468:
469: public long getServiceStartTime() {
470: return serviceStartTime;
471: }
472:
473: public byte[] getUniqueId() {
474: return uniqueId;
475: }
476:
477: public byte[] getPayload() {
478: return payload;
479: }
480:
481: public byte[] getCommand() {
482: return command;
483: }
484:
485: public byte[] getDomain() {
486: return domain;
487: }
488:
489: public int getSecurePort() {
490: return securePort;
491: }
492:
493: public void setMemberAliveTime(long time) {
494: memberAliveTime = time;
495: }
496:
497: /**
498: * String representation of this object
499: */
500: public String toString() {
501: StringBuffer buf = new StringBuffer(
502: "org.apache.catalina.tribes.membership.MemberImpl[");
503: buf.append(getName()).append(",");
504: buf.append(getHostname()).append(",");
505: buf.append(port).append(", alive=");
506: buf.append(memberAliveTime).append(",");
507: buf.append("id=").append(bToS(this .uniqueId)).append(", ");
508: buf.append("payload=").append(bToS(this .payload, 8)).append(
509: ", ");
510: buf.append("command=").append(bToS(this .command, 8)).append(
511: ", ");
512: buf.append("domain=").append(bToS(this .domain, 8)).append(", ");
513: buf.append("]");
514: return buf.toString();
515: }
516:
517: public static String bToS(byte[] data) {
518: return bToS(data, data.length);
519: }
520:
521: public static String bToS(byte[] data, int max) {
522: StringBuffer buf = new StringBuffer(4 * 16);
523: buf.append("{");
524: for (int i = 0; data != null && i < data.length; i++) {
525: buf.append(String.valueOf(data[i])).append(" ");
526: if (i == max) {
527: buf.append("...(" + data.length + ")");
528: break;
529: }
530: }
531: buf.append("}");
532: return buf.toString();
533: }
534:
535: /**
536: * @see java.lang.Object#hashCode()
537: * @return The hash code
538: */
539: public int hashCode() {
540: return getHost()[0] + getHost()[1] + getHost()[2]
541: + getHost()[3];
542: }
543:
544: /**
545: * Returns true if the param o is a McastMember with the same name
546: * @param o
547: */
548: public boolean equals(Object o) {
549: if (o instanceof MemberImpl) {
550: return Arrays.equals(this .getHost(), ((MemberImpl) o)
551: .getHost())
552: && this .getPort() == ((MemberImpl) o).getPort()
553: && Arrays.equals(this .getUniqueId(),
554: ((MemberImpl) o).getUniqueId());
555: } else
556: return false;
557: }
558:
559: public void setHost(byte[] host) {
560: this .host = host;
561: }
562:
563: public void setHostname(String host) throws IOException {
564: hostname = host;
565: this .host = java.net.InetAddress.getByName(host).getAddress();
566: }
567:
568: public void setMsgCount(int msgCount) {
569: this .msgCount = msgCount;
570: }
571:
572: public void setPort(int port) {
573: this .port = port;
574: this .dataPkg = null;
575: }
576:
577: public void setServiceStartTime(long serviceStartTime) {
578: this .serviceStartTime = serviceStartTime;
579: }
580:
581: public void setUniqueId(byte[] uniqueId) {
582: this .uniqueId = uniqueId != null ? uniqueId : new byte[16];
583: getData(true, true);
584: }
585:
586: public void setPayload(byte[] payload) {
587: byte[] oldpayload = this .payload;
588: this .payload = payload != null ? payload : new byte[0];
589: if (this .getData(true, true).length > McastServiceImpl.MAX_PACKET_SIZE) {
590: this .payload = oldpayload;
591: throw new IllegalArgumentException(
592: "Payload is to large for tribes to handle.");
593: }
594:
595: }
596:
597: public void setCommand(byte[] command) {
598: this .command = command != null ? command : new byte[0];
599: getData(true, true);
600: }
601:
602: public void setDomain(byte[] domain) {
603: this .domain = domain != null ? domain : new byte[0];
604: getData(true, true);
605: }
606:
607: public void setSecurePort(int securePort) {
608: this .securePort = securePort;
609: }
610:
611: public void readExternal(ObjectInput in) throws IOException,
612: ClassNotFoundException {
613: int length = in.readInt();
614: byte[] message = new byte[length];
615: in.read(message);
616: getMember(message, this );
617:
618: }
619:
620: public void writeExternal(ObjectOutput out) throws IOException {
621: byte[] data = this.getData();
622: out.writeInt(data.length);
623: out.write(data);
624: }
625:
626: }
|