Source code for ReplicationManager.java (JGroups 2.4.1 sp3, package org.jgroups.blocks)

// $Id: ReplicationManager.java,v 1.8 2006/09/29 21:49:02 bstansberry Exp $

package org.jgroups.blocks;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.*;
import org.jgroups.util.RspList;

import java.io.Serializable;

/**
 * Class to propagate updates to a number of nodes in various ways:
 * <ol>
 * <li>Asynchronous
 * <li>Synchronous
 * <li>Synchronous with locking
 * </ol>
 *
 * <br/><em>Note: This class is experimental as of Oct 2002</em>
 *
 * @author Bela Ban Oct 2002
 */
public class ReplicationManager implements RequestHandler {
    Address local_addr = null;
    ReplicationReceiver receiver = null;

    /** Used to broadcast updates and receive responses (latter only in synchronous case) */
    protected MessageDispatcher disp = null;

    protected final Log log = LogFactory.getLog(this.getClass());

    /**
     * Creates an instance of ReplicationManager on top of a Channel
     */
    public ReplicationManager(Channel channel, MessageListener ml,
            MembershipListener l, ReplicationReceiver receiver) {
        setReplicationReceiver(receiver);
        if (channel != null) {
            local_addr = channel.getLocalAddress();
            disp = new MessageDispatcher(channel, ml, l, this, // ReplicationManager is RequestHandler
                    true); // use deadlock detection
        }
    }

    /**
     * Creates an instance of ReplicationManager on top of a PullPushAdapter
     */
    public ReplicationManager(PullPushAdapter adapter, Serializable id,
            MessageListener ml, MembershipListener l,
            ReplicationReceiver receiver) {
        if (adapter != null && adapter.getTransport() != null
                && adapter.getTransport() instanceof Channel)
            local_addr = ((Channel) adapter.getTransport()).getLocalAddress();
        setReplicationReceiver(receiver);
        disp = new MessageDispatcher(adapter, id, // FIXME
                ml, l, this); // ReplicationManager is RequestHandler
        disp.setDeadlockDetection(true);
    }

    public void stop() {
        if (disp != null)
            disp.stop();
    }

    /**
     * Create a new transaction. The transaction will be used to send updates, identify updates in the same transaction,
     * and eventually commit or rollback the changes associated with the transaction.
     * @return Xid A unique transaction
     * @exception Exception Thrown when local_addr is null
     */
    public Xid begin() throws Exception {
        return begin(Xid.DIRTY_READS);
    }

    /**
     * Create a new transaction. The transaction will be used to send updates, identify updates in the same transaction,
     * and eventually commit or rollback the changes associated with the transaction.
     * @param transaction_mode Mode in which the transaction should run. Possible values are Xid.DIRTY_READS,
     *                         Xid.READ_COMMITTED, Xid.REPEATABLE_READ and Xid.SERIALIZABLE
     * @return Xid A unique transaction
     * @exception Exception Thrown when local_addr is null
     */
    public Xid begin(int transaction_mode) throws Exception {
        return Xid.create(local_addr, transaction_mode);
    }

    public void setReplicationReceiver(ReplicationReceiver handler) {
        this.receiver = handler;
    }

    public void setMembershipListener(MembershipListener l) {
        if (l == null)
            return;
        if (disp == null) {
            if (log.isErrorEnabled())
                log.error("dispatcher is null, cannot set MembershipListener");
        } else {
            disp.setMembershipListener(l);
        }
    }

    /**
     * Sends a request to all members of the group. Sending is asynchronous (return immediately) or
     * synchronous (wait for all members to respond). If <code>use_locks</code> is true, then locking
     * will be used at the receivers to acquire locks before accessing/updating a resource. Locks can be
     * explicitly set using <code>lock_info</code> or implicitly through <code>data</code>. In the latter
     * case, locks are induced from the data sent, e.g. if the data is a request for updating a certain row
     * in a table, then we need to acquire a lock for that table.<p>
     * In case of using locks, if the transaction associated with the update already has a lock for a given resource,
     * we will return. Otherwise, we will wait for <code>lock_acquisition_timeout</code> milliseconds. If the lock
     * is not granted within that time a <code>LockingException</code> will be thrown. (<em>We hope to
     * replace this timeout with a distributed deadlock detection algorithm in the future.</em>)<p>
     * There are three main use cases for this method:
     * <ol>
     * <li><b>Asynchronous</b>: sends the message and returns immediately. Argument <code>synchronous</code>
     *     needs to be false. All other arguments except <code>data</code> are ignored and can be null. Will call
     *     <code>update()</code> on the registered ReplicationReceiver at each receiver.
     * <li><b>Synchronous without locks</b>: sends the message, but returns only after responses from all members
     *     have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes
     *     first). Argument <code>synchronous</code> needs to be true. Argument <code>synchronous_timeout</code>
     *     needs to be >= 0. If it is 0 the call will not time out, but wait for all responses.
     *     All other arguments (besides <code>data</code>) are ignored.
     * <li><b>Synchronous with locks</b>: sends the message, but returns only after responses from all members
     *     have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes
     *     first). At the receiver's side we have to acquire a lock for the resource to be updated; if the
     *     acquisition fails a LockingException will be thrown. The resource to be locked can be found in two ways:
     *     either <code>data</code> contains the resource(s) to be acquired implicitly, or <code>lock_info</code>
     *     lists the resources explicitly, or both. All the locks acquired at the receiver's side should be associated
     *     with <code>transaction</code>. When a <code>commit()</code> is received, the receiver should commit
     *     the modifications to the resource and release all locks. When a <code>rollback()</code> is received,
     *     the receiver should remove all (temporary) modifications and release all locks associated with
     *     <code>transaction</code>.
     * </ol>
     * In both synchronous cases an <code>RspList</code> is returned, containing one response per member;
     * the list may be empty. In the asynchronous case <code>null</code> is returned.
     * @param dest The destination to which to send the message. Will be sent to all members if null.
     * @param data The data to be sent to all members. It may contain information about the resource to be locked.
     * @param synchronous If false the call is asynchronous, i.e. non-blocking. If true, the method will wait
     *                    until responses from all members have been received (unless a timeout is defined, see below)
     * @param synchronous_timeout In a synchronous call, we will wait for responses from all members or until
     *                            <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes first).
     *                            0 means to wait forever.
     * @param transaction The transaction under which all locks for resources should be acquired. The receiver
     *                    will probably maintain a lock table with resources as keys and transactions as values.
     *                    When an update is received, the receiver checks its lock table: if the resource is
     *                    not yet taken, the resource/transaction pair will be added to the lock table. Otherwise,
     *                    we check if the transaction's owner associated with the resource is the same as the caller.
     *                    If this is the case, the lock will be considered granted, otherwise we will wait for the
     *                    resource to become available (for a certain amount of time). When a transaction is
     *                    committed or rolled back, all resources associated with this transaction will be released.
     * @param lock_info Information about resource(s) to be acquired. This may be null, e.g. if this information
     *                  is already implied in <code>data</code>. Both <code>data</code> and <code>lock_info</code>
     *                  may be used to define the set of resources to be acquired.
     * @param lock_acquisition_timeout The number of milliseconds to wait until a lock acquisition request is
     *                                 considered failed (causing a LockingException). If 0 we will wait forever.
     *                                 (Note that this may lead to deadlocks).
     * @param lock_lease_timeout The number of milliseconds we want to keep the lock for a resource. After
     *                           this time has elapsed, the lock will be released. If 0 we won't release the lock(s)
     * @param use_locks If this is false, we will ignore all lock information (even if it is specified) and
     *                  not use locks at all.
     * @return RspList A list of Rsps ({@link org.jgroups.util.Rsp}), one for each member. Each one is the result of
     *                 {@link ReplicationReceiver#receive}. If a member didn't send a response, the <code>received</code>
     *                 field will be false. If the member was suspected while waiting for a response, the
     *                 <code>suspected</code> field will be true. If the <code>receive()</code> method in the receiver
     *                 returned a value it will be in field <code>retval</code>. If the receiver threw an exception
     *                 it will also be in this field.
     */
    public RspList send(Address dest, byte[] data, boolean synchronous,
            long synchronous_timeout, Xid transaction,
            byte[] lock_info, long lock_acquisition_timeout,
            long lock_lease_timeout, boolean use_locks) { // throws UpdateException, TimeoutException, LockingException {

        Message msg = null;
        ReplicationData d = new ReplicationData(ReplicationData.SEND,
                data, transaction, lock_info, lock_acquisition_timeout,
                lock_lease_timeout, use_locks);

        if (log.isInfoEnabled())
            log.info("data is " + d + " (synchronous=" + synchronous + ')');
        msg = new Message(dest, null, d);
        if (synchronous) {
            return disp.castMessage(null, msg, GroupRequest.GET_ALL,
                    synchronous_timeout);
        } else {
            disp.castMessage(null, msg, GroupRequest.GET_NONE, 0);
            return null;
        }
    }

    /**
     * Commits all modifications sent to the receivers via {@link #send} and releases all locks associated with
     * this transaction. If modifications were made to stable storage (but not to resource), those modifications
     * would now need to be transferred to the resource (e.g. database).
     */
    public void commit(Xid transaction) {
        sendMessage(ReplicationData.COMMIT, transaction);
    }

    /**
     * Discards all modifications sent to the receivers via {@link #send} and releases all locks associated with
     * this transaction.
     */
    public void rollback(Xid transaction) {
        sendMessage(ReplicationData.ROLLBACK, transaction);
    }

    /* ------------------------------- RequestHandler interface ------------------------------ */

    public Object handle(Message msg) {
        Object retval = null;
        ReplicationData data;

        if (msg == null) {
            if (log.isErrorEnabled())
                log.error("received message was null");
            return null;
        }

        if (msg.getLength() == 0) {
            if (log.isErrorEnabled())
                log.error("payload of received message was null");
            return null;
        }

        try {
            data = (ReplicationData) msg.getObject();
        } catch (Throwable ex) {
            if (log.isErrorEnabled())
                log.error("failure unmarshalling message: " + ex);
            return null;
        }

        switch (data.getType()) {
        case ReplicationData.SEND:
            try {
                return handleSend(data);
            } catch (Throwable ex) {
                if (log.isErrorEnabled())
                    log.error("failed handling update: " + ex);
                return ex;
            }
        case ReplicationData.COMMIT:
            handleCommit(data.getTransaction());
            break;
        case ReplicationData.ROLLBACK:
            handleRollback(data.getTransaction());
            break;
        default:
            if (log.isErrorEnabled())
                log.error("received incorrect replication message: " + data);
            return null;
        }

        return retval;
    }

    /* --------------------------- End of RequestHandler interface---------------------------- */

    protected Object handleSend(ReplicationData data)
            throws UpdateException, LockingException {
        try {
            if (receiver == null) {
                if (log.isWarnEnabled())
                    log.warn("receiver is not set");
                return null;
            }
            return receiver.receive(data.getTransaction(), data.getData(),
                    data.getLockInfo(), data.getLockAcquisitionTimeout(),
                    data.getLockLeaseTimeout(), data.useLocks());
        } catch (Throwable ex) {
            return ex;
        }
    }

    protected void handleCommit(Xid transaction) {
        if (receiver == null) {
            if (log.isWarnEnabled())
                log.warn("receiver is not set");
        } else
            receiver.commit(transaction);
    }

    protected void handleRollback(Xid transaction) {
        if (receiver == null) {
            if (log.isWarnEnabled())
                log.warn("receiver is not set");
        } else
            receiver.rollback(transaction);
    }

    /* -------------------------------------- Private methods ------------------------------------ */

    void sendMessage(int type, Xid transaction) {
        ReplicationData data = new ReplicationData(type, null,
                transaction, null, 0, 0, false);
        Message msg = new Message(null, null, data);
        disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); // send commit/rollback message asynchronously
    }

    /* ---------------------------------- End of Private methods --------------------------------- */

}
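
The Javadoc for the `transaction` parameter of `send()` says a receiver "will probably maintain a lock table with resources as keys and transactions as values". The listing does not show such a table, so the following is only a minimal sketch of that idea under stated assumptions. The class `LockTable` and its methods `acquire` and `releaseAll` are hypothetical and not part of JGroups. Resources and transactions are plain strings here rather than `byte[]` lock info and `Xid`. A failed acquisition returns `false` instead of throwing a `LockingException`, and lease timeouts are not modelled.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical receiver-side lock table (illustration only, not a JGroups class). */
class LockTable {
    /** Maps a resource name to the id of the transaction that currently owns it. */
    private final Map<String, String> owners = new HashMap<>();

    /**
     * Tries to lock a resource for a transaction.
     * A free resource is granted at once, and so is a resource already owned by the
     * same transaction. Otherwise the caller waits up to timeoutMs milliseconds
     * (0 means wait forever) for the resource to be released.
     * Returns true if the lock was granted, false on timeout or interruption.
     */
    public synchronized boolean acquire(String resource, String tx, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            String owner = owners.get(resource);
            if (owner == null) {
                owners.put(resource, tx); // resource not yet taken: record the pair
                return true;
            }
            if (owner.equals(tx)) {
                return true; // same transaction already holds the lock
            }
            try {
                if (timeoutMs == 0) {
                    wait(); // no timeout configured: block until some release
                } else {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return false; // acquisition timed out
                    }
                    wait(remaining);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }

    /** Releases every resource held by the transaction, as on commit or rollback. */
    public synchronized void releaseAll(String tx) {
        owners.values().removeIf(tx::equals);
        notifyAll(); // wake up transactions waiting for a resource
    }
}
```

In this sketch a `ReplicationReceiver` implementation could call `acquire` from its `receive()` method when `use_locks` is true and call `releaseAll` from both `commit()` and `rollback()`. That matches the Javadoc's statement that all resources of a transaction are released when it is committed or rolled back.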