Source Code Cross Referenced for JODBServer.java in  » Database-DBMS » JODB » com » mobixess » jodb » core » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database DBMS » JODB » com.mobixess.jodb.core 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:        Copyright (C) 2007  Mobixess Inc. http://www.java-objects-database.com
003:
004:        This file is part of the JODB (Java Objects Database) open source project.
005:
006:        JODB is free software; you can redistribute it and/or modify it under
007:        the terms of version 2 of the GNU General Public License as published
008:        by the Free Software Foundation.
009:
010:        JODB is distributed in the hope that it will be useful, but WITHOUT ANY
011:        WARRANTY; without even the implied warranty of MERCHANTABILITY or
012:        FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
013:        for more details.
014:
015:        You should have received a copy of the GNU General Public License along
016:        with this program; if not, write to the Free Software Foundation, Inc.,
017:        59 Temple Place - Suite 330, Boston, MA  02111-1307, USA. 
018:         */
019:        package com.mobixess.jodb.core;
020:
021:        import java.io.DataInputStream;
022:        import java.io.IOException;
023:        import java.io.InputStream;
024:        import java.io.Serializable;
025:        import java.net.InetSocketAddress;
026:        import java.net.ServerSocket;
027:        import java.net.Socket;
028:        import java.net.URI;
029:        import java.nio.channels.ServerSocketChannel;
030:        import java.nio.channels.SocketChannel;
031:        import java.nio.channels.spi.SelectorProvider;
032:        import java.rmi.Naming;
033:        import java.rmi.NotBoundException;
034:        import java.rmi.RemoteException;
035:        import java.rmi.server.UnicastRemoteObject;
036:        import java.util.concurrent.ExecutionException;
037:        import java.util.concurrent.ExecutorService;
038:        import java.util.concurrent.Executors;
039:        import java.util.concurrent.Future;
040:        import java.util.concurrent.ScheduledThreadPoolExecutor;
041:        import java.util.concurrent.Semaphore;
042:        import java.util.concurrent.ThreadPoolExecutor;
043:        import java.util.concurrent.TimeUnit;
044:        import java.util.concurrent.TimeoutException;
045:        import java.util.concurrent.locks.ReentrantReadWriteLock;
046:        import java.util.logging.Level;
047:
048:        import com.mobixess.jodb.core.index.JODBIndexingRootAgent;
049:        import com.mobixess.jodb.core.io.IOTicket;
050:        import com.mobixess.jodb.core.io.IRandomAccessDataBuffer;
051:        import com.mobixess.jodb.core.io.JODBIOBase;
052:        import com.mobixess.jodb.core.io.IRandomAccessBufferFactory.BUFFER_TYPE;
053:        import com.mobixess.jodb.core.io.rmi.IOTicketRemoteInterface;
054:        import com.mobixess.jodb.core.io.rmi.IRemoteServer;
055:        import com.mobixess.jodb.core.io.rmi.IRemoteTransactionContainer;
056:        import com.mobixess.jodb.core.io.rmi.IRemoteServer.IServerQueryResult;
057:        import com.mobixess.jodb.core.query.QueryNode;
058:        import com.mobixess.jodb.core.transaction.ITranslatedDataSorce;
059:        import com.mobixess.jodb.core.transaction.JODBQueryList;
060:        import com.mobixess.jodb.core.transaction.JODBSession;
061:        import com.mobixess.jodb.core.transaction.TransactionContainer;
062:        import com.mobixess.jodb.util.LongVector;
063:        import com.mobixess.jodb.util.Utils;
064:
065:        public class JODBServer {
066:
067:            private JODBSessionContainer _sessionContainer;
068:            private JODBIOBase _ioBase;
069:            private String _serverId;
070:            private ServerSocketAcceptor _serverSocketAcceptor;
071:            //private int _rmiPort;
072:            //private int _dataPort;
073:            public final static String REMOTE_OBJECT_NAME = '/' + JODBRemoteObject.class
074:                    .getName();
075:
076:            JODBServer(JODBSessionContainer sessionContainer, String serverId)
077:                    throws Exception {
078:                _sessionContainer = sessionContainer;
079:                _ioBase = (JODBIOBase) _sessionContainer.getIoBase();
080:                _serverId = serverId;
081:                _serverSocketAcceptor = new ServerSocketAcceptor();
082:                installServerObject();
083:            }
084:
085:            private void installServerObject() throws Exception {
086:                Utils.ensureRegistryExist();
087:                String name = composeRemoteObjectBindName();
088:                Naming.rebind(name, new JODBRemoteObject());
089:            }
090:
091:            private String composeRemoteObjectBindName() {
092:                String name = REMOTE_OBJECT_NAME;
093:                if (_serverId != null && _serverId.length() > 0) {
094:                    name += "_" + _serverId;
095:                }
096:                return name;
097:            }
098:
099:            public synchronized void stop() throws IOException {
100:                _sessionContainer.close();
101:                _serverSocketAcceptor.close();
102:                try {
103:                    Naming.unbind(composeRemoteObjectBindName());
104:                } catch (NotBoundException e) {
105:                    throw new JodbIOException(e);
106:                }
107:            }
108:
109:            @SuppressWarnings("serial")
110:            private class JODBRemoteObject extends UnicastRemoteObject
111:                    implements  IRemoteServer {
112:
113:                protected JODBRemoteObject() throws RemoteException {
114:                    super ();
115:                }
116:
117:                public void applyTransaction(
118:                        TransactionContainer transactionContainer,
119:                        JODBSession session, IOTicket writeTicket,
120:                        JODBIndexingRootAgent indexingRootAgent)
121:                        throws IOException {
122:
123:                }
124:
125:                public void close() throws IOException, RemoteException {
126:                    _sessionContainer.close();
127:                }
128:
129:                public String getClassTypeForID(int id) {
130:                    return _ioBase.getClassTypeForID(id);
131:                }
132:
133:                public int getClassTypeSubstitutionID(String classType)
134:                        throws RemoteException {
135:                    return _ioBase.getClassTypeSubstitutionID(classType);
136:                }
137:
138:                public IDatabaseStatistics getDatabaseStatistics()
139:                        throws RemoteException {
140:                    return _ioBase.getDatabaseStatistics();
141:                }
142:
143:                public URI getDbIdentificator() {
144:                    return _ioBase.getDbIdentificator();
145:                }
146:
147:                public long getFirstObjectOffset() throws RemoteException {
148:                    return _ioBase.getFirstObjectOffset();
149:                }
150:
151:                public long[] getForAllObjects(int ioTicket) throws IOException {
152:                    IOTicket ticket = _ioBase.findTicket(ioTicket);
153:                    return _ioBase.getForAllObjects(ticket);
154:                }
155:
156:                public String getFullFieldNameForID(int id)
157:                        throws RemoteException {
158:                    return _ioBase.getFullFieldNameForID(id);
159:                }
160:
161:                public IOTicketRemoteInterface getIOTicket(boolean read,
162:                        boolean write) throws IOException {
163:                    try {
164:                        return (IOTicketRemoteInterface) _ioBase.getIOTicket(
165:                                read, write, true);
166:                    } catch (IOException e) {
167:                        e.printStackTrace();
168:                        throw e;
169:                    }
170:                }
171:
172:                public int getOrSetClassTypeSubstitutionID(Class clazz)
173:                        throws RemoteException {
174:                    return _ioBase.getOrSetClassTypeSubstitutionID(clazz);
175:                }
176:
177:                public int getOrSetClassTypeSubstitutionID(String classType)
178:                        throws RemoteException {
179:                    return _ioBase.getOrSetClassTypeSubstitutionID(classType);
180:                }
181:
182:                //        public int getOrSetFieldSubstitutionID(Field field) throws RemoteException {
183:                //            return _ioBase.getOrSetFieldSubstitutionID(field);
184:                //        }
185:
186:                public IPersistentObjectStatistics getPersistenceStatistics(
187:                        long offset, JODBSession session) throws IOException {
188:                    return _ioBase.getPersistenceStatistics(offset, session);
189:                }
190:
191:                public String getPrefixForID(int id) {
192:                    return _ioBase.getPrefixForID(id);
193:                }
194:
195:                public String getSimpleFieldNameForID(int id)
196:                        throws RemoteException {
197:                    return _ioBase.getSimpleFieldNameForID(id);
198:                }
199:
200:                public boolean isClosed() throws RemoteException {
201:                    return _ioBase.isClosed();
202:                }
203:
204:                public IRemoteTransactionContainer getRemoteTransactionContainer()
205:                        throws RemoteException {
206:                    try {
207:                        return new RemoteTransactionContainer();
208:                    } catch (IOException e) {
209:                        throw new RemoteException("", e);
210:                    }
211:                }
212:
213:                public IServerQueryResult runQuery(QueryNode query,
214:                        long[] localActiveObjects) throws IOException {
215:                    LongVector remoteActiveObjects = null;
216:                    if (localActiveObjects != null) {
217:                        remoteActiveObjects = new LongVector(localActiveObjects);
218:                    }
219:                    query.setSession(_sessionContainer.getSession());
220:                    try {
221:                        LongVector additionalRejected = new LongVector();
222:                        JODBQueryList objectSet = (JODBQueryList) query
223:                                .runQuery(remoteActiveObjects,
224:                                        additionalRejected);
225:
226:                        long[] result = objectSet.getAllObjectIds();
227:                        long[] additionalRejectedObjects = null;
228:                        if (additionalRejected.size() > 0) {
229:                            additionalRejectedObjects = additionalRejected
230:                                    .getDataAsArray();
231:                        }
232:                        return new ServerQueryResult(additionalRejectedObjects,
233:                                result);
234:                    } catch (IllegalClassTypeException e) {
235:                        throw new JodbIOException(e);
236:                    }
237:                }
238:
239:                public int getOrSetFieldSubstitutionID(int declaringClassID,
240:                        int fieldTypeID, String fieldName) throws IOException {
241:                    return _ioBase.getOrSetFieldSubstitutionID(
242:                            declaringClassID, fieldTypeID, fieldName);
243:                }
244:
245:                public IPersistentObjectStatistics getPersistenceStatistics(
246:                        long offset) throws IOException {
247:                    return _ioBase.getPersistenceStatistics(offset,
248:                            _sessionContainer.getSession());
249:                }
250:
251:                //        public boolean isNewDatabase() throws RemoteException {
252:                //            return _ioBase.isNewDatabase();
253:                //        }
254:
255:                //        public void printFileMap(JODBSession session, PrintStream printStream) throws IOException {
256:                //            // TODO Auto-generated method stub
257:                //            throw new RuntimeException("Not Implemented");
258:                //            //
259:                //        }
260:
261:            }
262:
263:            private static class ServerQueryResult implements 
264:                    IServerQueryResult, Serializable {
265:
266:                /**
267:                 * 
268:                 */
269:                private static final long serialVersionUID = 1L;
270:
271:                long[] _excludedObjects;
272:                long[] _searchResult;
273:
274:                /**
275:                 * @param excludedObjects
276:                 * @param searchResult
277:                 */
278:                public ServerQueryResult(long[] excludedObjects,
279:                        long[] searchResult) {
280:                    super ();
281:                    _excludedObjects = excludedObjects;
282:                    _searchResult = searchResult;
283:                }
284:
285:                public long[] getExcludedObjects() {
286:                    return _excludedObjects;
287:                }
288:
289:                public long[] getSearchResult() {
290:                    return _searchResult;
291:                }
292:
293:            }
294:
295:            private class RemoteTransactionContainer implements 
296:                    IRemoteTransactionContainer, ITranslatedDataSorce {
297:                private FileReceiver _newDataBuffer;
298:                private FileReceiver _replacementsBuffer;
299:                private FileReceiver _rollbackBuffer;
300:
301:                ReentrantReadWriteLock _lock;
302:                long _transactionOffset;
303:                ExecutorService _localTransactionExecutor = Executors
304:                        .newSingleThreadExecutor();
305:                Future _transactionFuture;
306:                Exception _transactionExecutionError;
307:                LockTransuctionRunnable _lockTransuctionRunnable = new LockTransuctionRunnable();
308:                CompleteTransactionRunnable _completeTransactionRunnable = new CompleteTransactionRunnable();
309:                UnlockTransactionRunnable _unlockTransactionRunnable = new UnlockTransactionRunnable();
310:
311:                public RemoteTransactionContainer() throws IOException {
312:                    _lock = _ioBase.getTransactionLock();
313:                    _newDataBuffer = new FileReceiver(BUFFER_TYPE.NEW_DATA);
314:                    _replacementsBuffer = new FileReceiver(
315:                            BUFFER_TYPE.REPLACEMENTS);
316:                    _rollbackBuffer = new FileReceiver(BUFFER_TYPE.ROLLBACK);
317:                    UnicastRemoteObject.exportObject(this ,
318:                            JODBConstants.DEFAULT_RMI_PORT);
319:                }
320:
321:                public int getNewDataBufferId() {
322:                    return _newDataBuffer.getFileId();
323:                }
324:
325:                public int getReplacementsBufferId() {
326:                    return _replacementsBuffer.getFileId();
327:                }
328:
329:                public int getRollbackBufferId() {
330:                    return _rollbackBuffer.getFileId();
331:                }
332:
333:                public FileReceiver getReceiverForId(int id) {
334:                    if (_newDataBuffer.getFileId() == id) {
335:                        return _newDataBuffer;
336:                    }
337:                    if (_replacementsBuffer.getFileId() == id) {
338:                        return _replacementsBuffer;
339:                    }
340:                    if (_rollbackBuffer.getFileId() == id) {
341:                        return _rollbackBuffer;
342:                    }
343:                    return null;
344:                }
345:
346:                boolean hasPendingFiles() {
347:                    return !_newDataBuffer._processed
348:                            || !_replacementsBuffer._processed
349:                            || !_rollbackBuffer._processed;
350:                }
351:
352:                void ensureFileTransferComplete() throws IOException {
353:                    ensureFileTransferComplete(
354:                            JODBConstants.FILE_TRANSFER_MAX_WAIT,
355:                            _newDataBuffer);
356:                    ensureFileTransferComplete(
357:                            JODBConstants.FILE_TRANSFER_MAX_WAIT,
358:                            _replacementsBuffer);
359:                    ensureFileTransferComplete(
360:                            JODBConstants.FILE_TRANSFER_MAX_WAIT,
361:                            _rollbackBuffer);
362:                }
363:
364:                void ensureFileTransferComplete(int timeout,
365:                        FileReceiver fileReceiver) throws IOException {
366:                    if (fileReceiver._error != null) {
367:                        throw new JodbIOException(fileReceiver._error);
368:                    }
369:
370:                    try {
371:                        fileReceiver.getTransferCompletionSynch(timeout).get(
372:                                timeout, TimeUnit.MILLISECONDS);
373:                    } catch (Exception e) {
374:                        throw new JodbIOException(e);
375:                    }
376:                    if (fileReceiver._error != null) {
377:                        throw new JodbIOException(fileReceiver._error);
378:                    }
379:                    if (!fileReceiver._transferComplete) {
380:                        throw new JodbIOException("Transfer timeout");
381:                    }
382:                }
383:
384:                public IRandomAccessDataBuffer getRollbackDataFile() {
385:                    return _rollbackBuffer._file;
386:                }
387:
388:                public IRandomAccessDataBuffer getTransactionNewDataFile() {
389:                    return _newDataBuffer._file;
390:                }
391:
392:                public IRandomAccessDataBuffer getTransactionReplacementsDataFile() {
393:                    return _replacementsBuffer._file;
394:                }
395:
396:                public void resetTransactionBufferToEnd() throws IOException {
397:                    getRollbackDataFile().resetToEnd();
398:                    getTransactionNewDataFile().resetToEnd();
399:                    getTransactionReplacementsDataFile().resetToEnd();
400:                }
401:
402:                public void resetTransactionBufferToStart() throws IOException {
403:                    getRollbackDataFile().resetToStart();
404:                    getTransactionNewDataFile().resetToStart();
405:                    getTransactionReplacementsDataFile().resetToStart();
406:                }
407:
408:                public void resetTranslatedObjects(JODBSession session,
409:                        long transactionShift) {
410:                    _lock.writeLock().unlock();
411:                }
412:
413:                public long initTransaction() throws RemoteException {
414:                    return initTransaction(JODBConstants.DEFAULT_TRANSACTION_LOCK_WAIT_TIMEOUT);
415:                }
416:
417:                public long initTransaction(int transactioLockTimeout)
418:                        throws RemoteException {
419:                    Future future = _localTransactionExecutor
420:                            .submit(_lockTransuctionRunnable);
421:                    try {
422:                        future
423:                                .get(transactioLockTimeout,
424:                                        TimeUnit.MILLISECONDS);
425:                    } catch (Exception e) {
426:                        throw new RemoteException("", e);
427:                    }
428:                    if (!_lockTransuctionRunnable._gotLock) {
429:                        throw new RemoteException(
430:                                "Unable to obtain transaction lock");
431:                    }
432:                    _serverSocketAcceptor.initRemoteData(this );
433:                    _transactionOffset = _ioBase.getTransactionOffset();
434:                    _transactionFuture = _localTransactionExecutor
435:                            .submit(_completeTransactionRunnable);
436:                    return _transactionOffset;
437:                }
438:
439:                public void setTransactionDataSizes(long newDataSize,
440:                        long replacementDataSize, long rollbackDataSize)
441:                        throws RemoteException {
442:                    _newDataBuffer.setTransactionDataLength(newDataSize);
443:                    _replacementsBuffer
444:                            .setTransactionDataLength(replacementDataSize);
445:                    _rollbackBuffer.setTransactionDataLength(rollbackDataSize);
446:                }
447:
448:                @Override
449:                protected void finalize() throws Throwable {
450:                    disposeRemoteContainer();
451:                }
452:
453:                public void disposeRemoteContainer() throws IOException {
454:                    _newDataBuffer.close();
455:                    _replacementsBuffer.close();
456:                    _rollbackBuffer.close();
457:                    Future unlockProcess = _localTransactionExecutor
458:                            .submit(_unlockTransactionRunnable);
459:                    try {
460:                        unlockProcess.get(JODBConstants.FILE_TRANSFER_MAX_WAIT,
461:                                TimeUnit.MILLISECONDS);
462:                    } catch (Exception e) {
463:                        throw new JodbIOException(e);//TODO should be critical error?
464:                    }
465:                    if (JODBConfig.DEBUG) {
466:                        Utils.getLogger(getClass().getName()).log(
467:                                Level.INFO,
468:                                "Executor shutdown "
469:                                        + _localTransactionExecutor.toString());
470:                    }
471:                    _localTransactionExecutor.shutdownNow();
472:                }
473:
474:                public void checkTransactionComplete() throws IOException {
475:                    try {
476:                        _transactionFuture
477:                                .get(
478:                                        JODBConstants.DEFAULT_TRANSACTION_LOCK_WAIT_TIMEOUT,
479:                                        TimeUnit.MILLISECONDS);
480:                    } catch (Exception e) {
481:                        throw new JodbIOException(e);
482:                    }
483:                    if (_transactionExecutionError != null) {
484:                        throw new JodbIOException(_transactionExecutionError);
485:                    }
486:                }
487:
488:                private class CompleteTransactionRunnable implements  Runnable {
489:
490:                    public void run() {
491:                        try {
492:                            ensureFileTransferComplete();
493:                            _ioBase.applyRemoteTransaction(
494:                                    RemoteTransactionContainer.this ,
495:                                    _transactionOffset);
496:                        } catch (Exception e) {//TODO check Throwable and exit as critical error?
497:                            _transactionExecutionError = e;
498:                        } finally {
499:                            if (JODBConfig.DEBUG) {
500:                                Utils.getLogger(getClass().getName()).log(
501:                                        Level.INFO,
502:                                        "Lock release "
503:                                                + Thread.currentThread());
504:                            }
505:                            _unlockTransactionRunnable.run();
506:                        }
507:                    }
508:
509:                }
510:
511:                private class UnlockTransactionRunnable implements  Runnable {
512:
513:                    public void run() {
514:                        if (_lock.isWriteLockedByCurrentThread()) {
515:                            _lock.writeLock().unlock();
516:                        }
517:                    }
518:
519:                }
520:
521:                private class LockTransuctionRunnable implements  Runnable {
522:
523:                    int _transactioLockTimeout = -1;
524:                    boolean _gotLock;
525:
526:                    void init(int transactioLockTimeout) {
527:                        _transactioLockTimeout = transactioLockTimeout;
528:                        _gotLock = false;
529:                    }
530:
531:                    public void run() {
532:                        try {
533:                            if (JODBConfig.DEBUG) {
534:                                Utils.getLogger(getClass().getName()).log(
535:                                        Level.INFO,
536:                                        "Lock set " + Thread.currentThread());
537:                            }
538:                            _gotLock = _lock.writeLock().tryLock(
539:                                    _transactioLockTimeout,
540:                                    TimeUnit.MILLISECONDS);
541:                        } catch (InterruptedException e) {
542:                            e.printStackTrace();//TODO log
543:                        }
544:                    }
545:                }
546:
547:            }
548:
549:            private static class FileReceiver implements  Runnable {
550:                IRandomAccessDataBuffer _file;
551:                int _id;
552:                long _length = -1;
553:                long _maxTransferTime;
554:                ServerSocketAcceptor _container;
555:                Socket _readSocket;
556:                Throwable _error;
557:                boolean _transferComplete;
558:                boolean _processed;
559:                Future _transferCompletionSynch;
560:                Semaphore _transferSubmitSemaphore = new Semaphore(1);
561:
562:                public FileReceiver(BUFFER_TYPE bufferType) throws IOException {
563:                    _file = JODBConfig.getRandomAccessBufferFactory()
564:                            .createBuffer("", bufferType, true);
565:                    _id = System.identityHashCode(_file);
566:                    try {
567:                        _transferSubmitSemaphore.acquire();
568:                    } catch (InterruptedException e) {
569:                        throw new JodbIOException(e);
570:                    }
571:                }
572:
573:                public int getFileId() {
574:                    return _id;
575:                }
576:
577:                public void setContainer(ServerSocketAcceptor container) {
578:                    _container = container;
579:                }
580:
581:                public void setTransactionDataLength(long length) {
582:                    _length = length;
583:                }
584:
585:                public void close() throws IOException {
586:                    if (_transferSubmitSemaphore.availablePermits() == 0) {
587:                        _transferSubmitSemaphore.release();
588:                    }
589:                    _file.close();
590:                    _file.delete();
591:                }
592:
593:                @Override
594:                protected void finalize() throws Throwable {
595:                    _file.delete();
596:                }
597:
598:                public void setReadSocket(Socket readSocket) {
599:                    _readSocket = readSocket;
600:                }
601:
602:                public void run() {
603:                    try {
604:                        if (_length == -1) {
605:                            throw new JodbIOException(
606:                                    "no client transaction data size available");
607:                        }
608:                        SocketChannel socketChannel = _readSocket.getChannel();
609:                        long transfered = _file.transferFrom(socketChannel, 0,
610:                                _length);
611:                        if (JODBConfig.DEBUG) {
612:                            Utils.getLogger(getClass().getName()).log(
613:                                    Level.INFO,
614:                                    "Transfered " + transfered + " to file "
615:                                            + _file);
616:                        }
617:                        /*
618:                        InputStream input = _readSocket.getInputStream();
619:                        byte[] outBuffer = new byte[_readSocket.getReceiveBufferSize()];
620:                        int bytesReceived = 0;
621:                        while ((bytesReceived = input.read(outBuffer)) >= 0) {
622:                            _file.write(outBuffer, 0, bytesReceived);
623:                        }
624:                        input.close();
625:                        if(_file.length() != _length){
626:                            throw new JodbIOException("Incorrect transfer size "+_file.length()+"!="+_length);
627:                        }*/
628:                    } catch (Exception e) {
629:                        _error = e;
630:                    }
631:                    _transferComplete = true;
632:                }
633:
634:                public void submit(ThreadPoolExecutor threadPoolExecutor,
635:                        long delay) throws InterruptedException,
636:                        ExecutionException, TimeoutException {
637:                    _transferCompletionSynch = threadPoolExecutor.submit(this );
638:                    _processed = true;
639:                    _transferSubmitSemaphore.release();
640:                    if (JODBConfig.DEBUG) {
641:                        Utils.getLogger(getClass().getName()).log(Level.INFO,
642:                                "Task submitted " + this );
643:                    }
644:                    _transferCompletionSynch.get(delay, TimeUnit.MILLISECONDS);
645:                }
646:
647:                public Future getTransferCompletionSynch(long waitForSynchObject)
648:                        throws InterruptedException, IOException {
649:                    if (!_transferSubmitSemaphore.tryAcquire(
650:                            waitForSynchObject, TimeUnit.MILLISECONDS)) {
651:                        throw new JodbIOException("Transfer wait timeout");
652:                    }
653:                    _transferSubmitSemaphore.release();
654:                    return _transferCompletionSynch;
655:                }
656:            }
657:
658:            private class ServerSocketAcceptor extends Thread {
659:
660:                Socket _lastAcceptedSocket;
661:                RemoteTransactionContainer _pendingFiles;
662:                ScheduledThreadPoolExecutor _threadPoolExecutor = new ScheduledThreadPoolExecutor(
663:                        1);
664:                boolean _closed = false;
665:
666:                public ServerSocketAcceptor() throws IOException {
667:                    setName("JodbServerSocketAcceptor");
668:                    setDaemon(true);
669:                    startDataServerSocket();
670:                }
671:
672:                private ServerSocket _serverSocket;
673:                private ServerSocketChannel _serverSocketChannel;
674:
675:                private void startDataServerSocket() throws IOException {
676:                    _serverSocketChannel = SelectorProvider.provider()
677:                            .openServerSocketChannel();
678:                    _serverSocket = _serverSocketChannel.socket();
679:                    _serverSocket.bind(new InetSocketAddress(
680:                            JODBConstants.DEFAULT_DATA_STREAM_PORT));
681:                    //new ServerSocket(JODBConstants.DEFAULT_DATA_STREAM_PORT);
682:                    start();
683:                }
684:
685:                public void initRemoteData(
686:                        RemoteTransactionContainer remoteTransactionData) {
687:                    _pendingFiles = remoteTransactionData;
688:                }
689:
690:                public void close() throws IOException {
691:                    _closed = true;
692:                    _serverSocket.close();
693:                    _threadPoolExecutor.shutdown();
694:                }
695:
696:                @Override
697:                public void run() {
698:                    while (true) {
699:                        Socket socket;
700:                        try {
701:                            socket = _serverSocket.accept();
702:                        } catch (IOException e) {
703:                            if (_closed) {
704:                                break;
705:                            }
706:                            Utils.fatalError(e);
707:                            continue;
708:                        }
709:                        if (_closed) {
710:                            break;
711:                        }
712:                        try {
713:                            handleConnection(socket);
714:                        } catch (IOException e) {
715:                            e.printStackTrace();//TODO log
716:                        }
717:                    }
718:                }
719:
720:                void handleConnection(Socket socket) throws IOException {
721:                    socket
722:                            .setSoTimeout(JODBConstants.FILE_TRANSFER_SOCKET_TIMEOUT);
723:                    DataInputStream is = new DataInputStream(socket
724:                            .getInputStream());
725:                    while (_pendingFiles.hasPendingFiles()) {
726:                        int id = new DataInputStream(is).readInt();
727:                        FileReceiver fileReceiver = _pendingFiles
728:                                .getReceiverForId(id);
729:                        if (fileReceiver == null) {
730:                            throw new JodbIOException(
731:                                    "File Id is not available " + id);
732:                        }
733:                        fileReceiver.setReadSocket(socket);
734:                        try {
735:                            fileReceiver.submit(_threadPoolExecutor,
736:                                    JODBConstants.FILE_TRANSFER_MAX_WAIT);
737:                            if (JODBConfig.DEBUG) {
738:                                Utils.getLogger(getClass().getName()).log(
739:                                        Level.INFO, "Task finished " + this );
740:                            }
741:                        } catch (Exception e) {
742:                            throw new JodbIOException(e);
743:                        }
744:                    }
745:                }
746:            }
747:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.