001: // $Id: ReplicationManager.java,v 1.8 2006/09/29 21:49:02 bstansberry Exp $
002:
003: package org.jgroups.blocks;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.*;
008: import org.jgroups.util.RspList;
009:
010: import java.io.Serializable;
011:
012: /**
013: * Class to propagate updates to a number of nodes in various ways:
014: * <ol>
015: * <li>Asynchronous
016: * <li>Synchronous
017: * <li>Synchronous with locking
018: * </ol>
019: *
020: * <br/><em>Note: This class is experimental as of Oct 2002</em>
021: *
022: * @author Bela Ban Oct 2002
023: */
024: public class ReplicationManager implements RequestHandler {
025: Address local_addr = null;
026: ReplicationReceiver receiver = null;
027:
028: /** Used to broadcast updates and receive responses (latter only in synchronous case) */
029: protected MessageDispatcher disp = null;
030:
031: protected final Log log = LogFactory.getLog(this .getClass());
032:
033: /**
034: * Creates an instance of ReplicationManager on top of a Channel
035: */
036: public ReplicationManager(Channel channel, MessageListener ml,
037: MembershipListener l, ReplicationReceiver receiver) {
038: setReplicationReceiver(receiver);
039: if (channel != null) {
040: local_addr = channel.getLocalAddress();
041: disp = new MessageDispatcher(channel, ml, l, this , // ReplicationManager is RequestHandler
042: true); // use deadlock detection
043: }
044: }
045:
046: /**
047: * Creates an instance of ReplicationManager on top of a PullPushAdapter
048: */
049: public ReplicationManager(PullPushAdapter adapter, Serializable id,
050: MessageListener ml, MembershipListener l,
051: ReplicationReceiver receiver) {
052: if (adapter != null && adapter.getTransport() != null
053: && adapter.getTransport() instanceof Channel)
054: local_addr = ((Channel) adapter.getTransport())
055: .getLocalAddress();
056: setReplicationReceiver(receiver);
057: disp = new MessageDispatcher(adapter, id, // FIXME
058: ml, l, this ); // ReplicationManager is RequestHandler
059: disp.setDeadlockDetection(true);
060: }
061:
062: public void stop() {
063: if (disp != null)
064: disp.stop();
065: }
066:
067: /**
068: * Create a new transaction. The transaction will be used to send updates, identify updates in the same transaction,
069: * and eventually commit or rollback the changes associated with the transaction.
070: * @return Xid A unique transaction
071: * @exception Exception Thrown when local_addr is null
072: */
073: public Xid begin() throws Exception {
074: return begin(Xid.DIRTY_READS);
075: }
076:
077: /**
078: * Create a new transaction. The tracsion will be used to send updates, identify updates in the same transaction,
079: * and eventually commit or rollback the changes associated with the transaction.
080: * @param transaction_mode Mode in which the transaction should run. Possible values are Xid.DIRTY_READS,
081: * Xid.READ_COMMITTED, Xid.REPEATABLE_READ and Xid.SERIALIZABLE
082: * @return Xid A unique transaction
083: * @exception Exception Thrown when local_addr is null
084: */
085: public Xid begin(int transaction_mode) throws Exception {
086: return Xid.create(local_addr, transaction_mode);
087: }
088:
089: public void setReplicationReceiver(ReplicationReceiver handler) {
090: this .receiver = handler;
091: }
092:
093: public void setMembershipListener(MembershipListener l) {
094: if (l == null)
095: return;
096: if (disp == null) {
097: if (log.isErrorEnabled())
098: log
099: .error("dispatcher is null, cannot set MembershipListener");
100: } else {
101: disp.setMembershipListener(l);
102: }
103: }
104:
105: /**
106: * Sends a request to all members of the group. Sending is asynchronous (return immediately) or
107: * synchronous (wait for all members to respond). If <code>use_locking</code> is true, then locking
108: * will be used at the receivers to acquire locks before accessing/updating a resource. Locks can be
109: * explicitly set using <code>lock_info</code> or implicitly through <code>data</code>. In the latter
110: * case, locks are induced from the data sent, e.g. if the data is a request for updating a certain row
111: * in a table, then we need to acquire a lock for that table.<p>
112: * In case of using locks, if the transaction associated with update already has a lock for a given resource,
113: * we will return. Otherwise, we will wait for <code>lock_acquisition_timeout</code> milliseconds. If the lock
114: * is not granted within that time a <code>LockingException</code> will be thrown. (<em>We hope to
115: * replace this timeout with a distributed deadlock detection algorithm in the future.</em>)<p>
116: * We have 3 main use case for this method:
117: * <ol>
118: * <li><b>Asynchronous</b>: sends the message and returns immediately. Argument <code>asynchronous</code>
119: * needs to be true. All other arguments except <code>data</code> are ignored and can be null. Will call
120: * <code>update()</code> on the registered ReplicationReceiver at each receiver.
121: * <li><b>Synchronous without locks</b>: sends the message, but returns only after responses from all members
122: * have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes
123: * first). Argument <code>asynchronous</code> needs to be false. Argument <code>synchronous_timeout</code>
124: * needs to be >= 0. If it is null the call will not time out, but wait for all responses.
125: * All other arguments (besides <code>data</code> are ignored).
126: * <li><b>Synchronous with locks</b>: sends the message, but returns only after responses from all members
127: * have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes
128: * first). At the receiver's side we have to acquire a lock for the resource to be updated, if the
129: * acquisition fails a LockingException will be thrown. The resource to be locked can be found in two ways:
130: * either <code>data</code> contains the resource(c) to be acquired implicitly, or <code>lock_info</code>
131: * lists the resources explicitly, or both. All the locks acquired at the receiver's side should be associated
132: * with <code>transaction</code>. When a <code>commit()</code> is received, the receiver should commit
133: * the modifications to the resource and release all locks. When a <code>rollback()</code> is received,
134: * the receiver should remove all (temporary) modifications and release all locks associated with
135: * <code>transaction</code>.
136: * </ol>
137: * In both the synchronous cases a List of byte[] will be returned if the data was sent to all receivers
138: * successfully, cointaining byte buffers. The list may be empty.
139: * @param dest The destination to which to send the message. Will be sent to all members if null.
140: * @param data The data to be sent to all members. It may contain information about the resource to be locked.
141: * @param synchronous If false the call is asynchronous, ie. non-blocking. If true, the method will wait
142: * until responses from all members have been received (unless a timeout is defined, see below)
143: * @param synchronous_timeout In a synchronous call, we will wait for responses from all members or until
144: * <code>synchronous_timeout</code> have elapsed (whichever comes first). 0 means
145: * to wait forever.
146: * @param transaction The transaction under which all locks for resources should be acquired. The receiver
147: * will probably maintain a lock table with resources as keys and transactions as values.
148: * When an update is received, the receiver checks its lock table: if the resource is
149: * not yet taken, the resource/transaction pair will be added to the lock table. Otherwise,
150: * we check if the transaction's owner associated with the resource is the same as the caller.
151: * If this is the case, the lock will be considered granted, otherwise we will wait for the
152: * resource to become available (for a certain amount of time). When a transaction is
153: * committed or rolled back, all resources associated with this transaction will be released.
154: * @param lock_info Information about resource(s) to be acquired. This may be null, e.g. if this information
155: * is already implied in <code>data</code>. Both <code>data</code> and <code>lock_info</code>
156: * may be used to define the set of resources to be acquired.
157: * @param lock_acquisition_timeout The number of milliseconds to wait until a lock acquisition request is
158: * considered failed (causing a LockingException). If 0 we will wait forever.
159: * (Note that this may lead to deadlocks).
160: * @param lock_lease_timeout The number of milliseconds we want to keep the lock for a resource. After
161: * this time has elapsed, the lock will be released. If 0 we won't release the lock(s)
162: * @param use_locks If this is false, we will ignore all lock information (even if it is specified) and
163: * not use locks at all.
164: * @return RspList A list of Rsps ({@link org.jgroups.util.Rsp}), one for each member. Each one is the result of
165: * {@link ReplicationReceiver#receive}. If a member didn't send a response, the <code>received</code>
166: * field will be false. If the member was suspected while waiting for a response, the <code>
167: * suspected</code> field will be true. If the <code>receive()</code> method in the receiver returned
168: * a value it will be in field <code>retval</code>. If the receiver threw an exception it will also
169: * be in this field.
170: */
171: public RspList send(Address dest, byte[] data, boolean synchronous,
172: long synchronous_timeout, Xid transaction,
173: byte[] lock_info, long lock_acquisition_timeout,
174: long lock_lease_timeout, boolean use_locks) { // throws UpdateException, TimeoutException, LockingException {
175:
176: Message msg = null;
177: ReplicationData d = new ReplicationData(ReplicationData.SEND,
178: data, transaction, lock_info, lock_acquisition_timeout,
179: lock_lease_timeout, use_locks);
180:
181: if (log.isInfoEnabled())
182: log.info("data is " + d + " (synchronous=" + synchronous
183: + ')');
184: msg = new Message(dest, null, d);
185: if (synchronous) {
186: return disp.castMessage(null, msg, GroupRequest.GET_ALL,
187: synchronous_timeout);
188: } else {
189: disp.castMessage(null, msg, GroupRequest.GET_NONE, 0);
190: return null;
191: }
192: }
193:
194: /**
195: * Commits all modifications sent to the receivers via {@link #send} and releases all locks associated with
196: * this transaction. If modifications were made to stable storage (but not to resource), those modifications
197: * would now need to be transferred to the resource (e.g. database).
198: */
199: public void commit(Xid transaction) {
200: sendMessage(ReplicationData.COMMIT, transaction);
201: }
202:
203: /**
204: * Discards all modifications sent to the receivers via {@link #send} and releases all locks associated with
205: * this transaction.
206: */
207: public void rollback(Xid transaction) {
208: sendMessage(ReplicationData.ROLLBACK, transaction);
209: }
210:
211: /* ------------------------------- RequestHandler interface ------------------------------ */
212:
213: public Object handle(Message msg) {
214: Object retval = null;
215: ReplicationData data;
216:
217: if (msg == null) {
218: if (log.isErrorEnabled())
219: log.error("received message was null");
220: return null;
221: }
222:
223: if (msg.getLength() == 0) {
224: if (log.isErrorEnabled())
225: log.error("payload of received message was null");
226: return null;
227: }
228:
229: try {
230: data = (ReplicationData) msg.getObject();
231: } catch (Throwable ex) {
232: if (log.isErrorEnabled())
233: log.error("failure unmarshalling message: " + ex);
234: return null;
235: }
236:
237: switch (data.getType()) {
238: case ReplicationData.SEND:
239: try {
240: return handleSend(data);
241: } catch (Throwable ex) {
242: if (log.isErrorEnabled())
243: log.error("failed handling update: " + ex);
244: return ex;
245: }
246: case ReplicationData.COMMIT:
247: handleCommit(data.getTransaction());
248: break;
249: case ReplicationData.ROLLBACK:
250: handleRollback(data.getTransaction());
251: break;
252: default:
253: if (log.isErrorEnabled())
254: log.error("received incorrect replication message: "
255: + data);
256: return null;
257: }
258:
259: return retval;
260: }
261:
262: /* --------------------------- End of RequestHandler interface---------------------------- */
263:
264: protected Object handleSend(ReplicationData data)
265: throws UpdateException, LockingException {
266: try {
267: if (receiver == null) {
268: if (log.isWarnEnabled())
269: log.warn("receiver is not set");
270: return null;
271: }
272: return receiver.receive(data.getTransaction(), data
273: .getData(), data.getLockInfo(), data
274: .getLockAcquisitionTimeout(), data
275: .getLockLeaseTimeout(), data.useLocks());
276: } catch (Throwable ex) {
277: return ex;
278: }
279: }
280:
281: protected void handleCommit(Xid transaction) {
282: if (receiver == null) {
283: if (log.isWarnEnabled())
284: log.warn("receiver is not set");
285: } else
286: receiver.commit(transaction);
287: }
288:
289: protected void handleRollback(Xid transaction) {
290: if (receiver == null) {
291: if (log.isWarnEnabled())
292: log.warn("receiver is not set");
293: } else
294: receiver.rollback(transaction);
295: }
296:
297: /* -------------------------------------- Private methods ------------------------------------ */
298:
299: void sendMessage(int type, Xid transaction) {
300: ReplicationData data = new ReplicationData(type, null,
301: transaction, null, 0, 0, false);
302: Message msg = new Message(null, null, data);
303: disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); // send commit message asynchronously
304: }
305:
306: /* ---------------------------------- End of Private methods --------------------------------- */
307:
308: }
|