Source Code Cross Referenced for NodeServer.java in  » Database-ORM » beankeeper » hu » netmind » persistence » node » Java Source Code / Java Documentation - Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database ORM » beankeeper » hu.netmind.persistence.node 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /**
002:         * Copyright (C) 2006 NetMind Consulting Bt.
003:         *
004:         * This library is free software; you can redistribute it and/or
005:         * modify it under the terms of the GNU Lesser General Public
006:         * License as published by the Free Software Foundation; either
007:         * version 3 of the License, or (at your option) any later version.
008:         *
009:         * This library is distributed in the hope that it will be useful,
010:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
011:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
012:         * Lesser General Public License for more details.
013:         *
014:         * You should have received a copy of the GNU Lesser General Public
015:         * License along with this library; if not, write to the Free Software
016:         * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
017:         */package hu.netmind.persistence.node;
018:
019:        import hu.netmind.persistence.*;
020:        import hu.netmind.persistence.event.*;
021:        import java.io.*;
022:        import java.net.*;
023:        import java.util.*;
024:        import org.apache.log4j.Logger;
025:
026:        /**
027:         * Implementation of a server node. This is a synchronization
028:         * point for all nodes connected to the same database.
029:         * @author Brautigam Robert
030:         * @version Revision: $Revision$
031:         */
032:        public class NodeServer implements  ServiceProvider, Runnable {
033:            private static Logger logger = Logger.getLogger(NodeServer.class);
034:            private StoreContext context;
035:            private ServerSocket serverSocket;
036:            private Vector clientHandlers = new Vector();
037:            private boolean running = true;
038:            private boolean active = false;
039:
040:            private RemoteLockTracker remoteLockTracker;
041:            private CommitAndQueryTracker commitAndQueryTracker;
042:            private ModificationTracker modificationTracker;
043:
/**
 * Create a server node for the given store context. All three trackers
 * are created here; the lock tracker is built on top of the
 * modification tracker.
 *
 * @param context the store context this server works with
 */
public NodeServer(StoreContext context) {
    this.context = context;
    this.commitAndQueryTracker = new CommitAndQueryTracker(context);
    this.modificationTracker = new ModificationTracker(context);
    this.remoteLockTracker = new RemoteLockTracker(this.modificationTracker);
}
050:
051:            /**
052:             *  Activate server, this means read the maximal serial from 
053:             *  database and set the active flag.
054:             */
055:            public void activate() {
056:                active = true;
057:            }
058:
059:            public boolean isActive() {
060:                return active;
061:            }
062:
063:            public ServerSocket getServerSocket() {
064:                return serverSocket;
065:            }
066:
067:            public void setServerSocket(ServerSocket serverSocket) {
068:                this .serverSocket = serverSocket;
069:            }
070:
071:            /**
072:             * Broadcast a message to all connected clients.
073:             */
074:            public void broadcastObject(CommObject obj) {
075:                // Safe-copy all clients
076:                Vector handlers = null;
077:                synchronized (clientHandlers) {
078:                    handlers = new Vector(clientHandlers);
079:                }
080:                // Broadcast message
081:                for (int i = 0; i < handlers.size(); i++) {
082:                    ClientHandler handler = (ClientHandler) handlers.get(i);
083:                    try {
084:                        handler.sendObject(obj);
085:                    } catch (Exception e) {
086:                        logger.warn("could not deliver message '" + obj
087:                                + "' to client.", e);
088:                    }
089:                }
090:            }
091:
092:            /**
093:             * Setup server and bind to a random port.
094:             */
095:            public void bind() {
096:                try {
097:                    // Setup channel
098:                    serverSocket = new ServerSocket(0); // Choose an ip:port
099:                    // Start listening
100:                    Thread listenerThread = new Thread(this );
101:                    listenerThread.setName("Persistence-node");
102:                    listenerThread.setDaemon(true);
103:                    listenerThread.start();
104:                } catch (Exception e) {
105:                    throw new StoreException(
106:                            "exception while binding to server port", e);
107:                }
108:            }
109:
110:            /**
111:             * Get the server addresses from interfaces.
112:             */
113:            public static String getHostAddresses() {
114:                try {
115:                    Enumeration interfaceEnumeration = NetworkInterface
116:                            .getNetworkInterfaces();
117:                    // Copy from enumeration to addresses vector, but filter loopback addresses
118:                    Vector addresses = new Vector();
119:                    while (interfaceEnumeration.hasMoreElements()) {
120:                        NetworkInterface intf = (NetworkInterface) interfaceEnumeration
121:                                .nextElement();
122:                        // Remove loopback addresses
123:                        Enumeration addressEnumeration = intf
124:                                .getInetAddresses();
125:                        while (addressEnumeration.hasMoreElements()) {
126:                            InetAddress address = (InetAddress) addressEnumeration
127:                                    .nextElement();
128:                            // Insert to addresses only if not loopback
129:                            if (!address.isLoopbackAddress())
130:                                addresses.add(address);
131:                        }
132:                    }
133:                    // Pick one address from the remaining address space
134:                    logger.debug("server available local addresses: "
135:                            + addresses);
136:                    // Now, multiple addresses are in the list, so copy all of them
137:                    // into the result string.
138:                    StringBuffer ips = new StringBuffer();
139:                    for (int i = 0; i < addresses.size(); i++) {
140:                        InetAddress address = (InetAddress) addresses.get(i);
141:                        if (ips.length() > 0)
142:                            ips.append(",");
143:                        ips.append(address.getHostAddress());
144:                    }
145:                    return ips.toString();
146:                } catch (StoreException e) {
147:                    throw e;
148:                } catch (Exception e) {
149:                    throw new StoreException(
150:                            "exception while determining server address", e);
151:                }
152:            }
153:
154:            /**
155:             * Disconnect the server.
156:             */
157:            public void disconnect() {
158:                // Set to not running
159:                active = false;
160:                running = false;
161:                // Close server socket
162:                try {
163:                    serverSocket.close();
164:                } catch (Exception e) {
165:                    logger.error("error while disconnecting server socket", e);
166:                }
167:                // Close all client sockets
168:                synchronized (clientHandlers) {
169:                    for (int i = 0; i < clientHandlers.size(); i++) {
170:                        ClientHandler handler = (ClientHandler) clientHandlers
171:                                .get(i);
172:                        handler.disconnect();
173:                    }
174:                }
175:            }
176:
177:            /**
178:             * Run listener thread. This thread accepts incoming connections,
179:             * and incoming socket data.
180:             */
181:            public void run() {
182:                try {
183:                    while (running) {
184:                        // Listen for socket
185:                        Socket socket = serverSocket.accept();
186:                        logger.debug("server received connect from client...");
187:                        // Enter into client sockets
188:                        ClientHandler handler = new ClientHandler(socket);
189:                        synchronized (clientHandlers) {
190:                            clientHandlers.add(handler);
191:                        }
192:                        // Run thread to handle socket
193:                        Thread handlerThread = new Thread(handler);
194:                        handlerThread.setName("Persistence-handler");
195:                        handlerThread.setDaemon(true);
196:                        handlerThread.start();
197:                    }
198:                } catch (Exception e) {
199:                    if (running) // If it should be running, set node manager to new state
200:                    {
201:                        logger.warn("server socket threw error", e);
202:                        context.getNodeManager().ensureState(
203:                                NodeManager.STATE_UNINITIALIZED);
204:                    } else {
205:                        logger.debug("server socket was shutdown: "
206:                                + e.getMessage());
207:                    }
208:                } finally {
209:                    try {
210:                        serverSocket.close();
211:                    } catch (Exception e) {
212:                        logger
213:                                .warn(
214:                                        "error while closing socket, may already been closed",
215:                                        e);
216:                    }
217:                }
218:            }
219:
220:            /**
221:             * Client handler class runs on a separate thread, and handles
222:             * communication to a specific client. Note that the whole
223:             * communication could be implemented using nio, but the client
224:             * <strong>should</strong> be handled on another thread, because
225:             * the communication is a synchronous request-response protocol.
226:             */
227:            public class ClientHandler implements  Runnable {
228:                private Socket socket;
229:                private ObjectInputStream oInput;
230:                private ObjectOutputStream oOutput;
231:                private int index;
232:
/**
 * Create a handler for an accepted connection. The object streams are
 * not opened here, but when the handler thread starts running.
 *
 * @param socket the accepted client connection
 */
public ClientHandler(Socket socket) {
    this.socket = socket;
}
236:
237:                /**
238:                 * Send an object to the client.
239:                 */
240:                public synchronized void sendObject(CommObject obj)
241:                        throws IOException {
242:                    if (logger.isDebugEnabled())
243:                        logger.debug("outgoing message to: " + index
244:                                + ", object: " + obj);
245:                    // Write to output
246:                    oOutput.writeObject(obj);
247:                }
248:
249:                /**
250:                 * Handle a single request object.
251:                 */
252:                public CommObject handleObject(CommObject obj) {
253:                    if (logger.isDebugEnabled())
254:                        logger.debug("incoming message from: " + index
255:                                + ", object: " + obj);
256:                    try {
257:                        // Initialization message
258:                        if (obj instanceof  InitMessage) {
259:                            index = ((InitMessage) obj).getNodeIndex();
260:                            // If not active, maybe the client node detected earlier
261:                            // that the current server node is out, so check with
262:                            // a re-connect. If the server node did not die, we loose
263:                            // our cache and current transactions! But if the server
264:                            // node really died, we reconnect and accept the client's
265:                            // init request immediately.
266:                            if (!active) {
267:                                logger
268:                                        .info("server got init request from client: "
269:                                                + index
270:                                                + ", try to reconnect to server to check, whether it's available");
271:                                context.getNodeManager().ensureState(
272:                                        NodeManager.STATE_INITIALIZED);
273:                                context.getNodeManager().ensureState(
274:                                        NodeManager.STATE_CONNECTED);
275:                            }
276:                            // If active, accept init request
277:                            if (!active)
278:                                return new GenericResponse(
279:                                        GenericResponse.SERVER_INACTIVE); // Server not active
280:                            else
281:                                return new SerialResponse(
282:                                        GenericResponse.ACTION_SUCCESS,
283:                                        getNextSerial()); // Ok
284:                        }
285:                        // Serial request
286:                        if (obj instanceof  SerialRequest) {
287:                            return new SerialResponse(
288:                                    GenericResponse.ACTION_SUCCESS,
289:                                    getNextSerial());
290:                        }
291:                        // Update cache message
292:                        if (obj instanceof  CacheUpdateRequest) {
293:                            CacheUpdateRequest req = (CacheUpdateRequest) obj;
294:                            context.getCache().updateEntries(
295:                                    req.getTableName(), req.getSerial());
296:                            broadcastObject(new CacheUpdateRequest(req
297:                                    .getTableName(), req.getSerial()));
298:                        }
299:                        // Lock message
300:                        if (obj instanceof  LockRequest) {
301:                            // Only "remote" lock tracker needs to be asked, because
302:                            // all tracked objects (local also) will be present
303:                            LockRequest req = (LockRequest) obj;
304:                            SessionInfo info = remoteLockTracker.lock(index,
305:                                    req.getThreadId(), req.getTxSerial(), req
306:                                            .getMetas(), req.getSessionInfo(),
307:                                    req.getWait(), req.getEnsureCurrent());
308:                            if (info != null)
309:                                return new LockResponse(
310:                                        GenericResponse.ALREADY_LOCKED, info);
311:                            else
312:                                return new LockResponse(
313:                                        GenericResponse.ACTION_SUCCESS, null);
314:                        }
315:                        // Unlock
316:                        if (obj instanceof  UnlockRequest) {
317:                            UnlockRequest req = (UnlockRequest) obj;
318:                            remoteLockTracker.unlock(index, req.getThreadId(),
319:                                    req.getTxSerial(), req.getMetas());
320:                        }
321:                        // Wait for query
322:                        if (obj instanceof  QueryRequest) {
323:                            QueryRequest req = (QueryRequest) obj;
324:                            commitAndQueryTracker.waitForQuery(req.getSerial());
325:                        }
326:                        // Start query
327:                        if (obj instanceof  CommitStartRequest) {
328:                            CommitStartRequest req = (CommitStartRequest) obj;
329:                            return new SerialResponse(
330:                                    GenericResponse.ACTION_SUCCESS,
331:                                    commitAndQueryTracker.startCommit(index));
332:                        }
333:                        // End query
334:                        if (obj instanceof  CommitEndRequest) {
335:                            CommitEndRequest req = (CommitEndRequest) obj;
336:                            endCommit(index, req.getSerial());
337:                        }
338:                        // Change management
339:                        if (obj instanceof  NotifyChangeRequest) {
340:                            NotifyChangeRequest req = (NotifyChangeRequest) obj;
341:                            notifyChange(req.getMetas(), req.getEndSerial(),
342:                                    req.getTxSerial());
343:                        }
344:                    } catch (Exception e) {
345:                        logger.warn("message request was not successful.", e);
346:                        return new GenericResponse(
347:                                GenericResponse.UNEXPECTED_ERROR);
348:                    }
349:                    return null;
350:                }
351:
352:                /**
353:                 * Disconnect socket.
354:                 */
355:                public void disconnect() {
356:                    try {
357:                        oInput.close();
358:                        oOutput.close();
359:                        socket.close();
360:                    } catch (Exception e) {
361:                        logger.warn("error while disconnecting client", e);
362:                    }
363:                }
364:
365:                /**
366:                 * Handle communication in this thread.
367:                 */
368:                public void run() {
369:                    try {
370:                        // Allocate streams
371:                        oInput = new ObjectInputStream(socket.getInputStream());
372:                        oOutput = new ObjectOutputStream(socket
373:                                .getOutputStream());
374:                        // Wait for objects and send responses
375:                        CommObject obj = null;
376:                        while ((obj = (CommObject) oInput.readObject()) != null) {
377:                            CommObject response = handleObject(obj);
378:                            if (response != null)
379:                                sendObject(response);
380:                            else
381:                                sendObject(new GenericResponse(0));
382:                        }
383:                    } catch (Exception e) {
384:                        if (index == 0)
385:                            logger
386:                                    .debug(
387:                                            "there was a connection with no communication, probably an empty connect for testing",
388:                                            e);
389:                        else
390:                            logger.error(
391:                                    "error while communication with client: "
392:                                            + index, e);
393:                        // Close socket
394:                        try {
395:                            socket.close();
396:                        } catch (Exception f) {
397:                            logger.error(
398:                                    "error while closing client socket, client: "
399:                                            + index, f);
400:                        }
401:                    } finally {
402:                        // Remove from socket handlers
403:                        synchronized (clientHandlers) {
404:                            clientHandlers.remove(this );
405:                        }
406:                        // Relinquish all locks associated with this client
407:                        remoteLockTracker.unlockAll(index);
408:                        commitAndQueryTracker.endAllCommits(index);
409:                    }
410:                }
411:            }
412:
413:            /*
414:             * Implementing the service provider interface.
415:             */
416:
417:            /**
418:             * Get a new serial number.
419:             */
420:            public Long getNextSerial() {
421:                return context.getSerialTracker().getNextSerial();
422:            }
423:
424:            /**
425:             * Update entries of client caches according to received parameters.
426:             */
427:            public void updateEntries(String tableName, Long serial) {
428:                // Remote caches
429:                broadcastObject(new CacheUpdateRequest(tableName, serial));
430:            }
431:
432:            /**
433:             * Lock an object.
434:             */
435:            public SessionInfo lock(int index, long threadId, long txSerial,
436:                    List metas, SessionInfo info, int wait,
437:                    boolean ensureCurrent) {
438:                logger.debug("server locking with: " + metas.size()
439:                        + " objects, wait: " + wait);
440:                SessionInfo result = remoteLockTracker.lock(index, threadId,
441:                        txSerial, metas, info, wait, ensureCurrent);
442:                return result;
443:            }
444:
445:            /**
446:             * Unlock object.
447:             */
448:            public void unlock(int index, long threadId, long txSerial,
449:                    List metas) {
450:                remoteLockTracker.unlock(index, threadId, txSerial, metas);
451:            }
452:
453:            /**
454:             * Wait for a query to execute with the given serial.
455:             * This method returns, if all commits before the given serial
456:             * are finished.
457:             */
458:            public void waitForQuery(Long serial) {
459:                commitAndQueryTracker.waitForQuery(serial);
460:            }
461:
462:            /**
463:             * Wait for starting a commit. The commit can start if
464:             * there are no queries executed with greater serial.
465:             * @return The serial the commit can run with.
466:             */
467:            public Long startCommit(int index) {
468:                return commitAndQueryTracker.startCommit(index);
469:            }
470:
471:            /**
472:             * Mark the end of a commit phase.
473:             */
474:            public void endCommit(int index, Long serial) {
475:                commitAndQueryTracker.endCommit(index, serial);
476:                modificationTracker.endTransaction(serial);
477:            }
478:
479:            /**
480:             * Add a change notification to change tracker.
481:             */
482:            public void notifyChange(List metas, Long endSerial, Long txSerial) {
483:                modificationTracker
484:                        .changeCandidates(metas, endSerial, txSerial);
485:            }
486:
487:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.