Source Code Cross Referenced for OnceAndOnlyOnceProtocolNetworkLayerImpl.java in » Net » Terracotta » com » tc » net » protocol » delivery » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » Terracotta » com.tc.net.protocol.delivery 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003:         * notice. All rights reserved.
004:         */
005:        package com.tc.net.protocol.delivery;
006:
007:        import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
008:
009:        import com.tc.async.api.Sink;
010:        import com.tc.bytes.TCByteBuffer;
011:        import com.tc.exception.TCRuntimeException;
012:        import com.tc.logging.TCLogger;
013:        import com.tc.logging.TCLogging;
014:        import com.tc.net.MaxConnectionsExceededException;
015:        import com.tc.net.TCSocketAddress;
016:        import com.tc.net.core.TCConnection;
017:        import com.tc.net.protocol.NetworkLayer;
018:        import com.tc.net.protocol.NetworkStackID;
019:        import com.tc.net.protocol.TCNetworkMessage;
020:        import com.tc.net.protocol.TCProtocolException;
021:        import com.tc.net.protocol.tcm.MessageChannelInternal;
022:        import com.tc.net.protocol.transport.AbstractMessageTransport;
023:        import com.tc.net.protocol.transport.ConnectionID;
024:        import com.tc.net.protocol.transport.MessageTransport;
025:        import com.tc.net.protocol.transport.WireProtocolMessage;
026:        import com.tc.util.Assert;
027:        import com.tc.util.DebugUtil;
028:        import com.tc.util.TCTimeoutException;
029:
030:        import java.io.IOException;
031:        import java.net.UnknownHostException;
032:        import java.util.Random;
033:
034:        /**
035:         * NetworkLayer implementation for once and only once message delivery protocol.
036:         */
037:        public class OnceAndOnlyOnceProtocolNetworkLayerImpl extends
038:                AbstractMessageTransport implements 
039:                OnceAndOnlyOnceProtocolNetworkLayer, OOOProtocolMessageDelivery {
040:            private static final TCLogger logger = TCLogging
041:                    .getLogger(OnceAndOnlyOnceProtocolNetworkLayerImpl.class);
042:            private final OOOProtocolMessageFactory messageFactory;
043:            private final OOOProtocolMessageParser messageParser;
044:            boolean wasConnected = false;
045:            private MessageChannelInternal receiveLayer;
046:            private MessageTransport sendLayer;
047:            private GuaranteedDeliveryProtocol delivery;
048:            private final SynchronizedBoolean reconnectMode = new SynchronizedBoolean(
049:                    false);
050:            private final SynchronizedBoolean handshakeMode = new SynchronizedBoolean(
051:                    false);
052:            private final SynchronizedBoolean channelConnected = new SynchronizedBoolean(
053:                    false);
054:            private boolean isClosed = false;
055:            private final boolean isClient;
056:            private final String debugId;
057:            private short sessionId = -1;
058:            private static final boolean debug = false;
059:
060:            public OnceAndOnlyOnceProtocolNetworkLayerImpl(
061:                    OOOProtocolMessageFactory messageFactory,
062:                    OOOProtocolMessageParser messageParser, Sink workSink,
063:                    boolean isClient) {
064:                super (logger);
065:                this .messageFactory = messageFactory;
066:                this .messageParser = messageParser;
067:                this .isClient = isClient;
068:                this .delivery = new GuaranteedDeliveryProtocol(this , workSink,
069:                        isClient);
070:                this .delivery.start();
071:                this .delivery.pause();
072:                this .sessionId = (this .isClient) ? -1 : newRandomSessionId();
073:                this .debugId = (this .isClient) ? "CLIENT" : "SERVER";
074:            }
075:
076:            /*********************************************************************************************************************
077:             * Network layer interface...
078:             */
079:
080:            public void setSendLayer(NetworkLayer layer) {
081:                if (!(layer instanceof  MessageTransport)) {
082:                    throw new IllegalArgumentException(
083:                            "Error: send layer must be MessageTransport!");
084:                }
085:                this .setSendLayer((MessageTransport) layer);
086:            }
087:
088:            public void setSendLayer(MessageTransport transport) {
089:                this .sendLayer = transport;
090:            }
091:
092:            public void setReceiveLayer(NetworkLayer layer) {
093:                if (!(layer instanceof  MessageChannelInternal)) {
094:                    throw new IllegalArgumentException(
095:                            "Error: receive layer must be MessageChannelInternal, was "
096:                                    + layer.getClass().getName());
097:                }
098:                this .receiveLayer = (MessageChannelInternal) layer;
099:            }
100:
101:            public void send(TCNetworkMessage message) {
102:                delivery.send(message);
103:            }
104:
105:            public void receive(TCByteBuffer[] msgData) {
106:                OOOProtocolMessage msg = createProtocolMessage(msgData);
107:                debugLog("receive -> " + msg.getHeader().toString());
108:                if (msg.isSend() || msg.isAck()) {
109:                    Assert.inv(!handshakeMode.get());
110:                    Assert.inv(channelConnected.get());
111:                    if (sessionId != msg.getSessionId())
112:                        return; // drop bad message
113:                    delivery.receive(msg);
114:                } else if (msg.isHandshake()) {
115:                    Assert.inv(!isClient);
116:                    debugLog("Got Handshake message...");
117:                    if (msg.getSessionId() == -1) {
118:                        debugLog("A brand new client is trying to connect - reply OK");
119:                        OOOProtocolMessage reply = createHandshakeReplyOkMessage(delivery
120:                                .getReceiver().getReceived().get());
121:                        sendMessage(reply);
122:                        delivery.resume();
123:                        delivery.receive(createHandshakeReplyOkMessage(-1));
124:                        handshakeMode.set(false);
125:                        if (!channelConnected.get()) {
126:                            channelConnected.set(true);
127:                            receiveLayer.notifyTransportConnected(this );
128:                        }
129:                        reconnectMode.set(false);
130:                    } else if (msg.getSessionId() == getSessionId()) {
131:                        debugLog("A same-session client is trying to connect - reply OK");
132:                        OOOProtocolMessage reply = createHandshakeReplyOkMessage(delivery
133:                                .getReceiver().getReceived().get());
134:                        sendMessage(reply);
135:                        handshakeMode.set(false);
136:                        delivery.resume();
137:                        // tell local sender the ackseq of client
138:                        delivery.receive(createHandshakeReplyOkMessage(msg
139:                                .getAckSequence()));
140:                        if (!channelConnected.get()) {
141:                            channelConnected.set(true);
142:                            receiveLayer.notifyTransportConnected(this );
143:                        }
144:                        reconnectMode.set(false);
145:                    } else {
146:                        debugLog("A DIFF-session client is trying to connect - reply FAIL");
147:                        OOOProtocolMessage reply = createHandshakeReplyFailMessage(delivery
148:                                .getReceiver().getReceived().get());
149:                        sendMessage(reply);
150:                        handshakeMode.set(false);
151:                        if (channelConnected.get())
152:                            receiveLayer.notifyTransportDisconnected(this );
153:                        channelConnected.set(false);
154:                        resetStack();
155:                        delivery.resume();
156:                        delivery.receive(reply);
157:                        if (!channelConnected.get()) {
158:                            channelConnected.set(true);
159:                            receiveLayer.notifyTransportConnected(this );
160:                        }
161:                        reconnectMode.set(false);
162:                    }
163:                } else if (msg.isHandshakeReplyOk()) {
164:                    Assert.inv(isClient);
165:                    Assert.inv(handshakeMode.get());
166:                    debugLog("Got reply OK");
167:                    // current session is still ok:
168:                    // 1. might have to resend some messages
169:                    // 2. no need to signal to Higher Level
170:                    handshakeMode.set(false);
171:                    sessionId = msg.getSessionId();
172:                    delivery.resume();
173:                    delivery.receive(msg);
174:                    if (!channelConnected.get()) {
175:                        channelConnected.set(true);
176:                        receiveLayer.notifyTransportConnected(this );
177:                    }
178:                    reconnectMode.set(false);
179:                } else if (msg.isHandshakeReplyFail()) {
180:                    debugLog("Received handshake fail reply");
181:                    Assert.inv(isClient);
182:                    Assert.inv(handshakeMode.get());
183:                    // we did not synch'ed the existing session.
184:                    // 1. clear OOO state (drop messages, clear counters, etc)
185:                    // 2. set the new session
186:                    // 3. signal Higher Lever to re-synch
187:                    if (channelConnected.get())
188:                        receiveLayer.notifyTransportDisconnected(this );
189:                    channelConnected.set(false);
190:                    resetStack();
191:                    sessionId = msg.getSessionId();
192:                    handshakeMode.set(false);
193:                    delivery.resume();
194:                    delivery.receive(msg);
195:                    if (!channelConnected.get()) {
196:                        channelConnected.set(true);
197:                        receiveLayer.notifyTransportConnected(this );
198:                    }
199:                } else if (msg.isGoodbye()) {
200:                    debugLog("Got GoodBye message - shutting down");
201:                    isClosed = true;
202:                    sendLayer.close();
203:                    receiveLayer.close();
204:                    delivery.pause();
205:                } else {
206:                    Assert.inv(false);
207:                }
208:            }
209:
210:            private void debugLog(String msg) {
211:                if (debug) {
212:                    DebugUtil.trace("OOOLayer-" + debugId + "-"
213:                            + sendLayer.getConnectionId() + " -> " + msg);
214:                }
215:            }
216:
217:            public boolean isConnected() {
218:                return (channelConnected.get() && !delivery.isPaused());
219:            }
220:
221:            public NetworkStackID open() throws TCTimeoutException,
222:                    UnknownHostException, IOException,
223:                    MaxConnectionsExceededException {
224:                Assert.assertNotNull(sendLayer);
225:                return sendLayer.open();
226:            }
227:
228:            public void close() {
229:                Assert.assertNotNull(sendLayer);
230:                // send goobye message with session-id on it
231:                OOOProtocolMessage opm = messageFactory
232:                        .createNewGoodbyeMessage(getSessionId());
233:                sendLayer.send(opm);
234:                sendLayer.close();
235:            }
236:
237:            /*********************************************************************************************************************
238:             * Transport listener interface...
239:             */
240:
241:            public void notifyTransportConnected(MessageTransport transport) {
242:                handshakeMode.set(true);
243:                if (isClient) {
244:                    OOOProtocolMessage handshake = createHandshakeMessage(delivery
245:                            .getReceiver().getReceived().get());
246:                    debugLog("Sending Handshake message...");
247:                    sendMessage(handshake);
248:                } else {
249:                    // resue for missing transportDisconnected events
250:                    if (!delivery.isPaused()) {
251:                        notifyTransportDisconnected(null);
252:                    }
253:                }
254:                reconnectMode.set(false);
255:            }
256:
257:            public void notifyTransportDisconnected(MessageTransport transport) {
258:                final boolean restoreConnectionMode = reconnectMode.get();
259:                debugLog("Transport Disconnected - pausing delivery, restoreConnection = "
260:                        + restoreConnectionMode);
261:                this .delivery.pause();
262:                if (!restoreConnectionMode) {
263:                    if (channelConnected.get())
264:                        receiveLayer.notifyTransportDisconnected(this );
265:                    channelConnected.set(false);
266:                }
267:            }
268:
269:            public void start() {
270:                //
271:            }
272:
273:            public void pause() {
274:                this .delivery.pause();
275:            }
276:
277:            public void resume() {
278:                this .delivery.resume();
279:            }
280:
281:            public void notifyTransportConnectAttempt(MessageTransport transport) {
282:                if (!reconnectMode.get()) {
283:                    receiveLayer.notifyTransportConnectAttempt(this );
284:                }
285:            }
286:
287:            public void notifyTransportClosed(MessageTransport transport) {
288:                // XXX: do we do anything here? We've probably done everything we need to do when close() was called.
289:                debugLog("Transport Closed - notifying higher layer");
290:                receiveLayer.notifyTransportClosed(this );
291:                channelConnected.set(false);
292:            }
293:
294:            /*********************************************************************************************************************
295:             * Protocol Message Delivery interface
296:             */
297:
298:            public OOOProtocolMessage createHandshakeMessage(long ack) {
299:                OOOProtocolMessage rv = this .messageFactory
300:                        .createNewHandshakeMessage(getSessionId(), ack);
301:                return rv;
302:            }
303:
304:            public OOOProtocolMessage createHandshakeReplyOkMessage(long ack) {
305:                // FIXME: need to use correct ack
306:                OOOProtocolMessage rv = this .messageFactory
307:                        .createNewHandshakeReplyOkMessage(getSessionId(), ack);
308:                return rv;
309:            }
310:
311:            public OOOProtocolMessage createHandshakeReplyFailMessage(long ack) {
312:                // FIXME: need to use correct ack
313:                OOOProtocolMessage rv = this .messageFactory
314:                        .createNewHandshakeReplyFailMessage(getSessionId(), ack);
315:                return rv;
316:            }
317:
318:            private short getSessionId() {
319:                return sessionId;
320:            }
321:
322:            public OOOProtocolMessage createAckMessage(long ack) {
323:                return (this .messageFactory.createNewAckMessage(getSessionId(),
324:                        ack));
325:            }
326:
327:            public boolean sendMessage(OOOProtocolMessage msg) {
328:                // this method doesn't do anything at the moment, but it is a good spot to plug in things you might want to do
329:                // every message flowing down from the layer (like logging for example)
330:                if (this .sendLayer.isConnected()) {
331:                    this .sendLayer.send(msg);
332:                    return (true);
333:                } else {
334:                    return (false);
335:                }
336:            }
337:
338:            public void receiveMessage(OOOProtocolMessage msg) {
339:                Assert.assertNotNull("Receive layer is null.",
340:                        this .receiveLayer);
341:                Assert.assertNotNull("Attempt to null msg", msg);
342:                Assert.eval(msg.isSend());
343:
344:                this .receiveLayer.receive(msg.getPayload());
345:            }
346:
347:            public OOOProtocolMessage createProtocolMessage(long sequence,
348:                    final TCNetworkMessage msg) {
349:                OOOProtocolMessage rv = messageFactory.createNewSendMessage(
350:                        getSessionId(), sequence, msg);
351:                final Runnable callback = msg.getSentCallback();
352:                if (callback != null) {
353:                    rv.setSentCallback(new Runnable() {
354:                        public void run() {
355:                            callback.run();
356:                        }
357:                    });
358:                }
359:
360:                return rv;
361:            }
362:
363:            private OOOProtocolMessage createProtocolMessage(
364:                    TCByteBuffer[] msgData) {
365:                try {
366:                    return messageParser.parseMessage(msgData);
367:                } catch (TCProtocolException e) {
368:                    // XXX: this isn't the right thing to do here
369:                    throw new TCRuntimeException(e);
370:                }
371:            }
372:
373:            public void attachNewConnection(TCConnection connection) {
374:                throw new AssertionError("Must not call!");
375:            }
376:
377:            public void setAllowConnectionReplace(boolean allow) {
378:                throw new AssertionError("Must not call!");
379:            }
380:
381:            public ConnectionID getConnectionId() {
382:                return sendLayer != null ? sendLayer.getConnectionId() : null;
383:            }
384:
385:            public TCSocketAddress getLocalAddress() {
386:                return sendLayer.getLocalAddress();
387:            }
388:
389:            public TCSocketAddress getRemoteAddress() {
390:                return sendLayer.getRemoteAddress();
391:            }
392:
393:            public void receiveTransportMessage(WireProtocolMessage message) {
394:                throw new AssertionError("Must not call!");
395:            }
396:
397:            public void sendToConnection(TCNetworkMessage message) {
398:                throw new AssertionError("Must not call!");
399:            }
400:
401:            public void startRestoringConnection() {
402:                debugLog("Switched to restoreConnection mode");
403:                reconnectMode.set(true);
404:            }
405:
406:            public void connectionRestoreFailed() {
407:                debugLog("RestoreConnectionFailed - resetting stack");
408:                if (channelConnected.get()) {
409:                    receiveLayer.notifyTransportDisconnected(this );
410:                    channelConnected.set(false);
411:                }
412:                reconnectMode.set(false);
413:                delivery.pause();
414:                delivery.reset();
415:                sessionId = newRandomSessionId();
416:            }
417:
418:            private void resetStack() {
419:                // we need to reset because we are talking to a new stack on the other side
420:                reconnectMode.set(false);
421:                delivery.pause();
422:                delivery.reset();
423:            }
424:
425:            public boolean isClosed() {
426:                return isClosed;
427:            }
428:
429:            private short newRandomSessionId() {
430:                // generate a random session id
431:                Random r = new Random();
432:                r.setSeed(System.currentTimeMillis());
433:                return ((short) r.nextInt(Short.MAX_VALUE));
434:            }
435:
436:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.