001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.group.interceptors;
018:
019: import java.util.HashMap;
020:
021: import org.apache.catalina.tribes.ChannelException;
022: import org.apache.catalina.tribes.ChannelMessage;
023: import org.apache.catalina.tribes.Member;
024: import org.apache.catalina.tribes.group.ChannelInterceptorBase;
025: import org.apache.catalina.tribes.group.InterceptorPayload;
026: import org.apache.catalina.tribes.util.UUIDGenerator;
027: import org.apache.catalina.tribes.util.Arrays;
028: import org.apache.catalina.tribes.UniqueId;
029: import java.util.Map;
030:
031: /**
032: * <p>Title: </p>
033: *
034: * <p>Description: </p>
035: *
036: * <p>Company: </p>
037: *
038: * @author not attributable
039: * @version 1.0
040: */
041: public class TwoPhaseCommitInterceptor extends ChannelInterceptorBase {
042:
043: public static final byte[] START_DATA = new byte[] { 113, 1, -58,
044: 2, -34, -60, 75, -78, -101, -12, 32, -29, 32, 111, -40, 4 };
045: public static final byte[] END_DATA = new byte[] { 54, -13, 90,
046: 110, 47, -31, 75, -24, -81, -29, 36, 52, -58, 77, -110, 56 };
047: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
048: .getLog(TwoPhaseCommitInterceptor.class);
049:
050: protected HashMap messages = new HashMap();
051: protected long expire = 1000 * 60; //one minute expiration
052: protected boolean deepclone = true;
053:
054: public void sendMessage(Member[] destination, ChannelMessage msg,
055: InterceptorPayload payload) throws ChannelException {
056: //todo, optimize, if destination.length==1, then we can do
057: //msg.setOptions(msg.getOptions() & (~getOptionFlag())
058: //and just send one message
059: if (okToProcess(msg.getOptions())) {
060: super .sendMessage(destination, msg, null);
061: ChannelMessage confirmation = null;
062: if (deepclone)
063: confirmation = (ChannelMessage) msg.deepclone();
064: else
065: confirmation = (ChannelMessage) msg.clone();
066: confirmation.getMessage().reset();
067: UUIDGenerator.randomUUID(false, confirmation.getUniqueId(),
068: 0);
069: confirmation.getMessage().append(START_DATA, 0,
070: START_DATA.length);
071: confirmation.getMessage().append(msg.getUniqueId(), 0,
072: msg.getUniqueId().length);
073: confirmation.getMessage().append(END_DATA, 0,
074: END_DATA.length);
075: super .sendMessage(destination, confirmation, payload);
076: } else {
077: //turn off two phase commit
078: //this wont work if the interceptor has 0 as a flag
079: //since there is no flag to turn off
080: //msg.setOptions(msg.getOptions() & (~getOptionFlag()));
081: super .sendMessage(destination, msg, payload);
082: }
083: }
084:
085: public void messageReceived(ChannelMessage msg) {
086: if (okToProcess(msg.getOptions())) {
087: if (msg.getMessage().getLength() == (START_DATA.length
088: + msg.getUniqueId().length + END_DATA.length)
089: && Arrays.contains(msg.getMessage()
090: .getBytesDirect(), 0, START_DATA, 0,
091: START_DATA.length)
092: && Arrays.contains(msg.getMessage()
093: .getBytesDirect(), START_DATA.length
094: + msg.getUniqueId().length, END_DATA, 0,
095: END_DATA.length)) {
096: UniqueId id = new UniqueId(msg.getMessage()
097: .getBytesDirect(), START_DATA.length, msg
098: .getUniqueId().length);
099: MapEntry original = (MapEntry) messages.get(id);
100: if (original != null) {
101: super .messageReceived(original.msg);
102: messages.remove(id);
103: } else
104: log
105: .warn("Received a confirmation, but original message is missing. Id:"
106: + Arrays.toString(id.getBytes()));
107: } else {
108: UniqueId id = new UniqueId(msg.getUniqueId());
109: MapEntry entry = new MapEntry((ChannelMessage) msg
110: .deepclone(), id, System.currentTimeMillis());
111: messages.put(id, entry);
112: }
113: } else {
114: super .messageReceived(msg);
115: }
116: }
117:
118: public boolean getDeepclone() {
119: return deepclone;
120: }
121:
122: public long getExpire() {
123: return expire;
124: }
125:
126: public void setDeepclone(boolean deepclone) {
127: this .deepclone = deepclone;
128: }
129:
130: public void setExpire(long expire) {
131: this .expire = expire;
132: }
133:
134: public void heartbeat() {
135: try {
136: long now = System.currentTimeMillis();
137: Map.Entry[] entries = (Map.Entry[]) messages.entrySet()
138: .toArray(new Map.Entry[messages.size()]);
139: for (int i = 0; i < entries.length; i++) {
140: MapEntry entry = (MapEntry) entries[i].getValue();
141: if (entry.expired(now, expire)) {
142: if (log.isInfoEnabled())
143: log.info("Message [" + entry.id
144: + "] has expired. Removing.");
145: messages.remove(entry.id);
146: }//end if
147: }
148: } catch (Exception x) {
149: log
150: .warn(
151: "Unable to perform heartbeat on the TwoPhaseCommit interceptor.",
152: x);
153: } finally {
154: super .heartbeat();
155: }
156: }
157:
158: public static class MapEntry {
159: public ChannelMessage msg;
160: public UniqueId id;
161: public long timestamp;
162:
163: public MapEntry(ChannelMessage msg, UniqueId id, long timestamp) {
164: this .msg = msg;
165: this .id = id;
166: this .timestamp = timestamp;
167: }
168:
169: public boolean expired(long now, long expiration) {
170: return (now - timestamp) > expiration;
171: }
172:
173: }
174:
175: }
|