Source Code Cross Referenced for STATE_TRANSFER.java in » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: STATE_TRANSFER.java,v 1.21.6.1 2007/04/27 08:03:50 belaban Exp $
002:
003:        package org.jgroups.protocols;
004:
005:        import org.jgroups.Address;
006:        import org.jgroups.Event;
007:        import org.jgroups.Message;
008:        import org.jgroups.View;
009:        import org.jgroups.blocks.GroupRequest;
010:        import org.jgroups.blocks.RequestCorrelator;
011:        import org.jgroups.blocks.RequestHandler;
012:        import org.jgroups.stack.Protocol;
013:        import org.jgroups.stack.StateTransferInfo;
014:        import org.jgroups.util.Rsp;
015:        import org.jgroups.util.RspList;
016:        import org.jgroups.util.Util;
017:
018:        import java.io.Serializable;
019:        import java.util.HashMap;
020:        import java.util.Properties;
021:        import java.util.Vector;
022:
/**
 * Immutable request object exchanged between STATE_TRANSFER layers.
 * It carries a request type ({@link #MAKE_COPY} or {@link #RETURN_STATE})
 * and a single argument, which for both types is the originator of the request.
 */
class StateTransferRequest implements Serializable {
    static final int MAKE_COPY = 1; // arg = originator of request
    static final int RETURN_STATE = 2; // arg = originator of request

    private static final long serialVersionUID = -7734608266762273116L;

    /** Request type, one of the constants above; assigned once in the constructor. */
    final int type;
    /** Request argument (the originator of the request). */
    final Object arg;

    /**
     * @param type one of {@link #MAKE_COPY} or {@link #RETURN_STATE}
     * @param arg  the originator of the request
     */
    StateTransferRequest(int type, Object arg) {
        this.type = type;
        this.arg = arg;
    }

    /** @return the request type passed to the constructor */
    public int getType() {
        return type;
    }

    /** @return the request argument passed to the constructor */
    public Object getArg() {
        return arg;
    }

    /** @return a printable form showing the symbolic type name and the argument */
    public String toString() {
        return "[StateTransferRequest: type=" + type2Str(type)
                + ", arg=" + arg + ']';
    }

    /**
     * Maps a request type constant to its symbolic name.
     *
     * @param t the type constant
     * @return "MAKE_COPY", "RETURN_STATE", or "&lt;unknown&gt;" for any other value
     */
    static String type2Str(int t) {
        switch (t) {
        case MAKE_COPY:
            return "MAKE_COPY";
        case RETURN_STATE:
            return "RETURN_STATE";
        default:
            return "<unknown>";
        }
    }
}
060:
061:        /**
062:         * State transfer layer. Upon receiving a GET_STATE event from JChannel, a MAKE_COPY message is
063:         * sent to all members. When the originator receives MAKE_COPY, it queues all messages to the
064:         * channel.
065:         * When another member receives the message, it asks the JChannel to provide it with a copy of
066:         * the current state (GetStateEvent is received by application, returnState() sends state down the
067:         * stack). Then the current layer sends a unicast RETURN_STATE message to the coordinator, which
068:         * returns the cached copy.
069:         * When the state is received by the originator, the GET_STATE sender is unblocked with a
070:         * GET_STATE_OK event up the stack (unless it already timed out).<p>
071:         * Requires QUEUE layer on top.
072:         * 
073:         * @author Bela Ban
074:         */
075:        public class STATE_TRANSFER extends Protocol implements  RequestHandler {
076:            Address local_addr = null;
077:            final Vector members = new Vector(11);
078:            final Message m = null;
079:            boolean is_server = false;
080:            byte[] cached_state = null;
081:            final Object state_xfer_mutex = new Object(); // get state from appl (via channel).
082:            long timeout_get_appl_state = 5000;
083:            long timeout_return_state = 5000;
084:            RequestCorrelator corr = null;
085:            final Vector observers = new Vector(5);
086:            final HashMap map = new HashMap(7);
087:
088:            /**
089:             * All protocol names have to be unique !
090:             */
091:            public String getName() {
092:                return "STATE_TRANSFER";
093:            }
094:
095:            public void init() throws Exception {
096:                map.put("state_transfer", Boolean.TRUE);
097:                map.put("protocol_class", getClass().getName());
098:
099:            }
100:
101:            public void start() throws Exception {
102:                corr = new RequestCorrelator(getName(), this , this );
103:                passUp(new Event(Event.CONFIG, map));
104:            }
105:
106:            public void stop() {
107:                if (corr != null) {
108:                    corr.stop();
109:                    corr = null;
110:                }
111:            }
112:
113:            public boolean setProperties(Properties props) {
114:                String str;
115:
116:                super .setProperties(props);
117:                // Milliseconds to wait for application to provide requested state, events are
118:                // STATE_TRANSFER up and STATE_TRANSFER_OK down
119:                str = props.getProperty("timeout_get_appl_state");
120:                if (str != null) {
121:                    timeout_get_appl_state = Long.parseLong(str);
122:                    props.remove("timeout_get_appl_state");
123:                }
124:
125:                // Milliseconds to wait for 1 or all members to return its/their state. 0 means wait
126:                // forever. States are retrieved using GroupRequest/RequestCorrelator
127:                str = props.getProperty("timeout_return_state");
128:                if (str != null) {
129:                    timeout_return_state = Long.parseLong(str);
130:                    props.remove("timeout_return_state");
131:                }
132:
133:                if (props.size() > 0) {
134:                    log
135:                            .error("STATE_TRANSFER.setProperties(): the following properties are not recognized: "
136:                                    + props);
137:
138:                    return false;
139:                }
140:                return true;
141:            }
142:
143:            public Vector requiredUpServices() {
144:                Vector ret = new Vector(2);
145:                ret.addElement(new Integer(Event.START_QUEUEING));
146:                ret.addElement(new Integer(Event.STOP_QUEUEING));
147:                return ret;
148:            }
149:
150:            public void up(Event evt) {
151:                switch (evt.getType()) {
152:
153:                case Event.BECOME_SERVER:
154:                    is_server = true;
155:                    break;
156:
157:                case Event.SET_LOCAL_ADDRESS:
158:                    local_addr = (Address) evt.getArg();
159:                    break;
160:
161:                case Event.TMP_VIEW:
162:                case Event.VIEW_CHANGE:
163:                    Vector new_members = ((View) evt.getArg()).getMembers();
164:                    synchronized (members) {
165:                        members.removeAllElements();
166:                        if (new_members != null && new_members.size() > 0)
167:                            for (int k = 0; k < new_members.size(); k++)
168:                                members.addElement(new_members.elementAt(k));
169:                    }
170:                    break;
171:                }
172:
173:                if (corr != null)
174:                    corr.receive(evt); // will consume or pass up, depending on header
175:                else
176:                    passUp(evt);
177:            }
178:
179:            public void down(Event evt) {
180:                Object coord, state;
181:                Vector event_list;
182:                StateTransferInfo info;
183:
184:                switch (evt.getType()) {
185:
186:                case Event.TMP_VIEW:
187:                case Event.VIEW_CHANGE:
188:                    Vector new_members = ((View) evt.getArg()).getMembers();
189:                    synchronized (members) {
190:                        members.removeAllElements();
191:                        if (new_members != null && new_members.size() > 0)
192:                            for (int k = 0; k < new_members.size(); k++)
193:                                members.addElement(new_members.elementAt(k));
194:                    }
195:                    break;
196:
197:                case Event.GET_STATE: // generated by JChannel.getState()
198:                    info = (StateTransferInfo) evt.getArg();
199:                    coord = determineCoordinator();
200:
201:                    if (coord == null || coord.equals(local_addr)) {
202:                        event_list = new Vector(1);
203:                        event_list.addElement(new Event(Event.GET_STATE_OK,
204:                                new StateTransferInfo()));
205:                        passUp(new Event(Event.STOP_QUEUEING, event_list));
206:                        return; // don't pass down any further !
207:                    }
208:
209:                    try {
210:                        sendMakeCopyMessage(); // multicast MAKE_COPY to all members (including me)
211:                        state = getStateFromSingle(info.target);
212:                    } catch (Throwable t) {
213:                        if (log.isErrorEnabled())
214:                            log.error("failed sending state request", t);
215:                        state = null;
216:                    }
217:
218:                    /* Pass up the state to the application layer (insert into JChannel's event queue */
219:                    event_list = new Vector(1);
220:                    event_list.addElement(new Event(Event.GET_STATE_OK,
221:                            new StateTransferInfo(null, info.state_id, 0L,
222:                                    (byte[]) state)));
223:
224:                    /* Now stop queueing */
225:                    passUp(new Event(Event.STOP_QUEUEING, event_list));
226:                    return; // don't pass down any further !
227:
228:                case Event.GET_APPLSTATE_OK:
229:                    synchronized (state_xfer_mutex) {
230:                        info = (StateTransferInfo) evt.getArg();
231:                        cached_state = info.state;
232:                        state_xfer_mutex.notifyAll();
233:                    }
234:                    return; // don't pass down any further !
235:
236:                }
237:
238:                passDown(evt); // pass on to the layer below us
239:            }
240:
241:            /* ---------------------- Interface RequestHandler -------------------------- */
242:            public Object handle(Message msg) {
243:                StateTransferRequest req;
244:
245:                try {
246:                    req = (StateTransferRequest) msg.getObject();
247:
248:                    switch (req.getType()) {
249:                    case StateTransferRequest.MAKE_COPY:
250:                        makeCopy(req.getArg());
251:                        return null;
252:                    case StateTransferRequest.RETURN_STATE:
253:                        if (is_server)
254:                            return cached_state;
255:                        else {
256:                            if (log.isWarnEnabled())
257:                                log
258:                                        .warn("RETURN_STATE: returning null"
259:                                                + "as I'm not yet an operational state server !");
260:                            return null;
261:                        }
262:                    default:
263:                        if (log.isErrorEnabled())
264:                            log.error("type " + req.getType()
265:                                    + "is unknown in StateTransferRequest !");
266:                        return null;
267:                    }
268:                } catch (Exception e) {
269:                    if (log.isErrorEnabled())
270:                        log.error("exception is " + e);
271:                    return null;
272:                }
273:            }
274:
275:            /* ------------------- End of Interface RequestHandler ---------------------- */
276:
277:            byte[] getStateFromSingle(Address target) throws Throwable {
278:                Vector dests = new Vector(11);
279:                Message msg;
280:                StateTransferRequest r = new StateTransferRequest(
281:                        StateTransferRequest.RETURN_STATE, local_addr);
282:                RspList rsp_list;
283:                Rsp rsp;
284:                Address dest;
285:                GroupRequest req;
286:                int num_tries = 0;
287:
288:                try {
289:                    msg = new Message(null, null, Util.objectToByteBuffer(r));
290:                } catch (Exception e) {
291:                    if (log.isErrorEnabled())
292:                        log.error("exception=" + e);
293:                    return null;
294:                }
295:
296:                while (members.size() > 1 && num_tries++ < 3) { // excluding myself
297:                    dest = target != null ? target : determineCoordinator();
298:                    if (dest == null)
299:                        return null;
300:                    msg.setDest(dest);
301:                    dests.removeAllElements();
302:                    dests.addElement(dest);
303:                    req = new GroupRequest(msg, corr, dests,
304:                            GroupRequest.GET_FIRST, timeout_return_state, 0);
305:                    req.execute();
306:                    rsp_list = req.getResults();
307:                    for (int i = 0; i < rsp_list.size(); i++) { // get the first non-suspected result
308:                        rsp = (Rsp) rsp_list.elementAt(i);
309:                        if (rsp.wasReceived())
310:                            return (byte[]) rsp.getValue();
311:                    }
312:                    Util.sleep(1000);
313:                }
314:
315:                return null;
316:            }
317:
318:            //    Vector getStateFromMany(Vector targets) {
319:            //        Vector dests=new Vector(11);
320:            //        Message msg;
321:            //        StateTransferRequest r=new StateTransferRequest(StateTransferRequest.RETURN_STATE, local_addr);
322:            //        RspList rsp_list;
323:            //        GroupRequest req;
324:            //        int i;
325:            //
326:            //
327:            //        if(targets != null) {
328:            //            for(i=0; i < targets.size(); i++)
329:            //                if(!local_addr.equals(targets.elementAt(i)))
330:            //                    dests.addElement(targets.elementAt(i));
331:            //        }
332:            //        else {
333:            //            for(i=0; i < members.size(); i++)
334:            //                if(!local_addr.equals(members.elementAt(i)))
335:            //                    dests.addElement(members.elementAt(i));
336:            //        }
337:            //
338:            //        if(dests.size() == 0)
339:            //            return null;
340:            //
341:            //        msg=new Message();
342:            //        try {
343:            //            msg.setBuffer(Util.objectToByteBuffer(r));
344:            //        }
345:            //        catch(Exception e) {
346:            //        }
347:            //
348:            //        req=new GroupRequest(msg, corr, dests, GroupRequest.GET_ALL, timeout_return_state, 0);
349:            //        req.execute();
350:            //        rsp_list=req.getResults();
351:            //        return rsp_list.getResults();
352:            //    }
353:
354:            void sendMakeCopyMessage() throws Throwable {
355:                GroupRequest req;
356:                Message msg = new Message();
357:                StateTransferRequest r = new StateTransferRequest(
358:                        StateTransferRequest.MAKE_COPY, local_addr);
359:                Vector dests = new Vector(11);
360:
361:                for (int i = 0; i < members.size(); i++)
362:                    dests.addElement(members.elementAt(i));
363:
364:                if (dests.size() == 0)
365:                    return;
366:
367:                try {
368:                    msg.setBuffer(Util.objectToByteBuffer(r));
369:                } catch (Exception e) {
370:                }
371:
372:                req = new GroupRequest(msg, corr, dests, GroupRequest.GET_ALL,
373:                        timeout_return_state, 0);
374:                req.execute();
375:            }
376:
377:            /**
378:             * Return the first element of members which is not me. Otherwise return null.
379:             */
380:            Address determineCoordinator() {
381:                Address ret = null;
382:                if (members != null && members.size() > 1) {
383:                    for (int i = 0; i < members.size(); i++)
384:                        if (!local_addr.equals(members.elementAt(i)))
385:                            return (Address) members.elementAt(i);
386:                }
387:                return ret;
388:            }
389:
390:            /**
391:             * If server, ask application to send us a copy of its state (STATE_TRANSFER up,
392:             * STATE_TRANSFER down). If client, start queueing events. Queuing will be stopped when
393:             * state has been retrieved (or not) from single or all member(s).
394:             */
395:            void makeCopy(Object sender) {
396:                if (sender.equals(local_addr)) { // was sent by us, has to start queueing
397:                    passUp(new Event(Event.START_QUEUEING));
398:                } else { // only retrieve state from appl when not in client state anymore
399:                    if (is_server) { // get state from application and store it locally
400:                        synchronized (state_xfer_mutex) {
401:                            cached_state = null;
402:                            StateTransferInfo info = new StateTransferInfo(
403:                                    local_addr);
404:                            passUp(new Event(Event.GET_APPLSTATE, info));
405:                            if (cached_state == null) {
406:                                try {
407:                                    state_xfer_mutex
408:                                            .wait(timeout_get_appl_state); // wait for STATE_TRANSFER_OK
409:                                } catch (Exception e) {
410:                                }
411:                            }
412:                        }
413:                    }
414:                }
415:            }
416:
417:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.