001: /*
002: Copyright (C) 2007 Mobixess Inc. http://www.java-objects-database.com
003:
004: This file is part of the JODB (Java Objects Database) open source project.
005:
006: JODB is free software; you can redistribute it and/or modify it under
007: the terms of version 2 of the GNU General Public License as published
008: by the Free Software Foundation.
009:
010: JODB is distributed in the hope that it will be useful, but WITHOUT ANY
011: WARRANTY; without even the implied warranty of MERCHANTABILITY or
012: FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
013: for more details.
014:
015: You should have received a copy of the GNU General Public License along
016: with this program; if not, write to the Free Software Foundation, Inc.,
017: 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
018: */
019: package com.mobixess.jodb.core;
020:
021: import java.io.DataInputStream;
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.io.Serializable;
025: import java.net.InetSocketAddress;
026: import java.net.ServerSocket;
027: import java.net.Socket;
028: import java.net.URI;
029: import java.nio.channels.ServerSocketChannel;
030: import java.nio.channels.SocketChannel;
031: import java.nio.channels.spi.SelectorProvider;
032: import java.rmi.Naming;
033: import java.rmi.NotBoundException;
034: import java.rmi.RemoteException;
035: import java.rmi.server.UnicastRemoteObject;
036: import java.util.concurrent.ExecutionException;
037: import java.util.concurrent.ExecutorService;
038: import java.util.concurrent.Executors;
039: import java.util.concurrent.Future;
040: import java.util.concurrent.ScheduledThreadPoolExecutor;
041: import java.util.concurrent.Semaphore;
042: import java.util.concurrent.ThreadPoolExecutor;
043: import java.util.concurrent.TimeUnit;
044: import java.util.concurrent.TimeoutException;
045: import java.util.concurrent.locks.ReentrantReadWriteLock;
046: import java.util.logging.Level;
047:
048: import com.mobixess.jodb.core.index.JODBIndexingRootAgent;
049: import com.mobixess.jodb.core.io.IOTicket;
050: import com.mobixess.jodb.core.io.IRandomAccessDataBuffer;
051: import com.mobixess.jodb.core.io.JODBIOBase;
052: import com.mobixess.jodb.core.io.IRandomAccessBufferFactory.BUFFER_TYPE;
053: import com.mobixess.jodb.core.io.rmi.IOTicketRemoteInterface;
054: import com.mobixess.jodb.core.io.rmi.IRemoteServer;
055: import com.mobixess.jodb.core.io.rmi.IRemoteTransactionContainer;
056: import com.mobixess.jodb.core.io.rmi.IRemoteServer.IServerQueryResult;
057: import com.mobixess.jodb.core.query.QueryNode;
058: import com.mobixess.jodb.core.transaction.ITranslatedDataSorce;
059: import com.mobixess.jodb.core.transaction.JODBQueryList;
060: import com.mobixess.jodb.core.transaction.JODBSession;
061: import com.mobixess.jodb.core.transaction.TransactionContainer;
062: import com.mobixess.jodb.util.LongVector;
063: import com.mobixess.jodb.util.Utils;
064:
065: public class JODBServer {
066:
067: private JODBSessionContainer _sessionContainer;
068: private JODBIOBase _ioBase;
069: private String _serverId;
070: private ServerSocketAcceptor _serverSocketAcceptor;
071: //private int _rmiPort;
072: //private int _dataPort;
073: public final static String REMOTE_OBJECT_NAME = '/' + JODBRemoteObject.class
074: .getName();
075:
076: JODBServer(JODBSessionContainer sessionContainer, String serverId)
077: throws Exception {
078: _sessionContainer = sessionContainer;
079: _ioBase = (JODBIOBase) _sessionContainer.getIoBase();
080: _serverId = serverId;
081: _serverSocketAcceptor = new ServerSocketAcceptor();
082: installServerObject();
083: }
084:
085: private void installServerObject() throws Exception {
086: Utils.ensureRegistryExist();
087: String name = composeRemoteObjectBindName();
088: Naming.rebind(name, new JODBRemoteObject());
089: }
090:
091: private String composeRemoteObjectBindName() {
092: String name = REMOTE_OBJECT_NAME;
093: if (_serverId != null && _serverId.length() > 0) {
094: name += "_" + _serverId;
095: }
096: return name;
097: }
098:
099: public synchronized void stop() throws IOException {
100: _sessionContainer.close();
101: _serverSocketAcceptor.close();
102: try {
103: Naming.unbind(composeRemoteObjectBindName());
104: } catch (NotBoundException e) {
105: throw new JodbIOException(e);
106: }
107: }
108:
109: @SuppressWarnings("serial")
110: private class JODBRemoteObject extends UnicastRemoteObject
111: implements IRemoteServer {
112:
113: protected JODBRemoteObject() throws RemoteException {
114: super ();
115: }
116:
117: public void applyTransaction(
118: TransactionContainer transactionContainer,
119: JODBSession session, IOTicket writeTicket,
120: JODBIndexingRootAgent indexingRootAgent)
121: throws IOException {
122:
123: }
124:
125: public void close() throws IOException, RemoteException {
126: _sessionContainer.close();
127: }
128:
129: public String getClassTypeForID(int id) {
130: return _ioBase.getClassTypeForID(id);
131: }
132:
133: public int getClassTypeSubstitutionID(String classType)
134: throws RemoteException {
135: return _ioBase.getClassTypeSubstitutionID(classType);
136: }
137:
138: public IDatabaseStatistics getDatabaseStatistics()
139: throws RemoteException {
140: return _ioBase.getDatabaseStatistics();
141: }
142:
143: public URI getDbIdentificator() {
144: return _ioBase.getDbIdentificator();
145: }
146:
147: public long getFirstObjectOffset() throws RemoteException {
148: return _ioBase.getFirstObjectOffset();
149: }
150:
151: public long[] getForAllObjects(int ioTicket) throws IOException {
152: IOTicket ticket = _ioBase.findTicket(ioTicket);
153: return _ioBase.getForAllObjects(ticket);
154: }
155:
156: public String getFullFieldNameForID(int id)
157: throws RemoteException {
158: return _ioBase.getFullFieldNameForID(id);
159: }
160:
161: public IOTicketRemoteInterface getIOTicket(boolean read,
162: boolean write) throws IOException {
163: try {
164: return (IOTicketRemoteInterface) _ioBase.getIOTicket(
165: read, write, true);
166: } catch (IOException e) {
167: e.printStackTrace();
168: throw e;
169: }
170: }
171:
172: public int getOrSetClassTypeSubstitutionID(Class clazz)
173: throws RemoteException {
174: return _ioBase.getOrSetClassTypeSubstitutionID(clazz);
175: }
176:
177: public int getOrSetClassTypeSubstitutionID(String classType)
178: throws RemoteException {
179: return _ioBase.getOrSetClassTypeSubstitutionID(classType);
180: }
181:
182: // public int getOrSetFieldSubstitutionID(Field field) throws RemoteException {
183: // return _ioBase.getOrSetFieldSubstitutionID(field);
184: // }
185:
186: public IPersistentObjectStatistics getPersistenceStatistics(
187: long offset, JODBSession session) throws IOException {
188: return _ioBase.getPersistenceStatistics(offset, session);
189: }
190:
191: public String getPrefixForID(int id) {
192: return _ioBase.getPrefixForID(id);
193: }
194:
195: public String getSimpleFieldNameForID(int id)
196: throws RemoteException {
197: return _ioBase.getSimpleFieldNameForID(id);
198: }
199:
200: public boolean isClosed() throws RemoteException {
201: return _ioBase.isClosed();
202: }
203:
204: public IRemoteTransactionContainer getRemoteTransactionContainer()
205: throws RemoteException {
206: try {
207: return new RemoteTransactionContainer();
208: } catch (IOException e) {
209: throw new RemoteException("", e);
210: }
211: }
212:
213: public IServerQueryResult runQuery(QueryNode query,
214: long[] localActiveObjects) throws IOException {
215: LongVector remoteActiveObjects = null;
216: if (localActiveObjects != null) {
217: remoteActiveObjects = new LongVector(localActiveObjects);
218: }
219: query.setSession(_sessionContainer.getSession());
220: try {
221: LongVector additionalRejected = new LongVector();
222: JODBQueryList objectSet = (JODBQueryList) query
223: .runQuery(remoteActiveObjects,
224: additionalRejected);
225:
226: long[] result = objectSet.getAllObjectIds();
227: long[] additionalRejectedObjects = null;
228: if (additionalRejected.size() > 0) {
229: additionalRejectedObjects = additionalRejected
230: .getDataAsArray();
231: }
232: return new ServerQueryResult(additionalRejectedObjects,
233: result);
234: } catch (IllegalClassTypeException e) {
235: throw new JodbIOException(e);
236: }
237: }
238:
239: public int getOrSetFieldSubstitutionID(int declaringClassID,
240: int fieldTypeID, String fieldName) throws IOException {
241: return _ioBase.getOrSetFieldSubstitutionID(
242: declaringClassID, fieldTypeID, fieldName);
243: }
244:
245: public IPersistentObjectStatistics getPersistenceStatistics(
246: long offset) throws IOException {
247: return _ioBase.getPersistenceStatistics(offset,
248: _sessionContainer.getSession());
249: }
250:
251: // public boolean isNewDatabase() throws RemoteException {
252: // return _ioBase.isNewDatabase();
253: // }
254:
255: // public void printFileMap(JODBSession session, PrintStream printStream) throws IOException {
256: // // TODO Auto-generated method stub
257: // throw new RuntimeException("Not Implemented");
258: // //
259: // }
260:
261: }
262:
263: private static class ServerQueryResult implements
264: IServerQueryResult, Serializable {
265:
266: /**
267: *
268: */
269: private static final long serialVersionUID = 1L;
270:
271: long[] _excludedObjects;
272: long[] _searchResult;
273:
274: /**
275: * @param excludedObjects
276: * @param searchResult
277: */
278: public ServerQueryResult(long[] excludedObjects,
279: long[] searchResult) {
280: super ();
281: _excludedObjects = excludedObjects;
282: _searchResult = searchResult;
283: }
284:
285: public long[] getExcludedObjects() {
286: return _excludedObjects;
287: }
288:
289: public long[] getSearchResult() {
290: return _searchResult;
291: }
292:
293: }
294:
295: private class RemoteTransactionContainer implements
296: IRemoteTransactionContainer, ITranslatedDataSorce {
297: private FileReceiver _newDataBuffer;
298: private FileReceiver _replacementsBuffer;
299: private FileReceiver _rollbackBuffer;
300:
301: ReentrantReadWriteLock _lock;
302: long _transactionOffset;
303: ExecutorService _localTransactionExecutor = Executors
304: .newSingleThreadExecutor();
305: Future _transactionFuture;
306: Exception _transactionExecutionError;
307: LockTransuctionRunnable _lockTransuctionRunnable = new LockTransuctionRunnable();
308: CompleteTransactionRunnable _completeTransactionRunnable = new CompleteTransactionRunnable();
309: UnlockTransactionRunnable _unlockTransactionRunnable = new UnlockTransactionRunnable();
310:
311: public RemoteTransactionContainer() throws IOException {
312: _lock = _ioBase.getTransactionLock();
313: _newDataBuffer = new FileReceiver(BUFFER_TYPE.NEW_DATA);
314: _replacementsBuffer = new FileReceiver(
315: BUFFER_TYPE.REPLACEMENTS);
316: _rollbackBuffer = new FileReceiver(BUFFER_TYPE.ROLLBACK);
317: UnicastRemoteObject.exportObject(this ,
318: JODBConstants.DEFAULT_RMI_PORT);
319: }
320:
321: public int getNewDataBufferId() {
322: return _newDataBuffer.getFileId();
323: }
324:
325: public int getReplacementsBufferId() {
326: return _replacementsBuffer.getFileId();
327: }
328:
329: public int getRollbackBufferId() {
330: return _rollbackBuffer.getFileId();
331: }
332:
333: public FileReceiver getReceiverForId(int id) {
334: if (_newDataBuffer.getFileId() == id) {
335: return _newDataBuffer;
336: }
337: if (_replacementsBuffer.getFileId() == id) {
338: return _replacementsBuffer;
339: }
340: if (_rollbackBuffer.getFileId() == id) {
341: return _rollbackBuffer;
342: }
343: return null;
344: }
345:
346: boolean hasPendingFiles() {
347: return !_newDataBuffer._processed
348: || !_replacementsBuffer._processed
349: || !_rollbackBuffer._processed;
350: }
351:
352: void ensureFileTransferComplete() throws IOException {
353: ensureFileTransferComplete(
354: JODBConstants.FILE_TRANSFER_MAX_WAIT,
355: _newDataBuffer);
356: ensureFileTransferComplete(
357: JODBConstants.FILE_TRANSFER_MAX_WAIT,
358: _replacementsBuffer);
359: ensureFileTransferComplete(
360: JODBConstants.FILE_TRANSFER_MAX_WAIT,
361: _rollbackBuffer);
362: }
363:
364: void ensureFileTransferComplete(int timeout,
365: FileReceiver fileReceiver) throws IOException {
366: if (fileReceiver._error != null) {
367: throw new JodbIOException(fileReceiver._error);
368: }
369:
370: try {
371: fileReceiver.getTransferCompletionSynch(timeout).get(
372: timeout, TimeUnit.MILLISECONDS);
373: } catch (Exception e) {
374: throw new JodbIOException(e);
375: }
376: if (fileReceiver._error != null) {
377: throw new JodbIOException(fileReceiver._error);
378: }
379: if (!fileReceiver._transferComplete) {
380: throw new JodbIOException("Transfer timeout");
381: }
382: }
383:
384: public IRandomAccessDataBuffer getRollbackDataFile() {
385: return _rollbackBuffer._file;
386: }
387:
388: public IRandomAccessDataBuffer getTransactionNewDataFile() {
389: return _newDataBuffer._file;
390: }
391:
392: public IRandomAccessDataBuffer getTransactionReplacementsDataFile() {
393: return _replacementsBuffer._file;
394: }
395:
396: public void resetTransactionBufferToEnd() throws IOException {
397: getRollbackDataFile().resetToEnd();
398: getTransactionNewDataFile().resetToEnd();
399: getTransactionReplacementsDataFile().resetToEnd();
400: }
401:
402: public void resetTransactionBufferToStart() throws IOException {
403: getRollbackDataFile().resetToStart();
404: getTransactionNewDataFile().resetToStart();
405: getTransactionReplacementsDataFile().resetToStart();
406: }
407:
408: public void resetTranslatedObjects(JODBSession session,
409: long transactionShift) {
410: _lock.writeLock().unlock();
411: }
412:
413: public long initTransaction() throws RemoteException {
414: return initTransaction(JODBConstants.DEFAULT_TRANSACTION_LOCK_WAIT_TIMEOUT);
415: }
416:
417: public long initTransaction(int transactioLockTimeout)
418: throws RemoteException {
419: Future future = _localTransactionExecutor
420: .submit(_lockTransuctionRunnable);
421: try {
422: future
423: .get(transactioLockTimeout,
424: TimeUnit.MILLISECONDS);
425: } catch (Exception e) {
426: throw new RemoteException("", e);
427: }
428: if (!_lockTransuctionRunnable._gotLock) {
429: throw new RemoteException(
430: "Unable to obtain transaction lock");
431: }
432: _serverSocketAcceptor.initRemoteData(this );
433: _transactionOffset = _ioBase.getTransactionOffset();
434: _transactionFuture = _localTransactionExecutor
435: .submit(_completeTransactionRunnable);
436: return _transactionOffset;
437: }
438:
439: public void setTransactionDataSizes(long newDataSize,
440: long replacementDataSize, long rollbackDataSize)
441: throws RemoteException {
442: _newDataBuffer.setTransactionDataLength(newDataSize);
443: _replacementsBuffer
444: .setTransactionDataLength(replacementDataSize);
445: _rollbackBuffer.setTransactionDataLength(rollbackDataSize);
446: }
447:
448: @Override
449: protected void finalize() throws Throwable {
450: disposeRemoteContainer();
451: }
452:
453: public void disposeRemoteContainer() throws IOException {
454: _newDataBuffer.close();
455: _replacementsBuffer.close();
456: _rollbackBuffer.close();
457: Future unlockProcess = _localTransactionExecutor
458: .submit(_unlockTransactionRunnable);
459: try {
460: unlockProcess.get(JODBConstants.FILE_TRANSFER_MAX_WAIT,
461: TimeUnit.MILLISECONDS);
462: } catch (Exception e) {
463: throw new JodbIOException(e);//TODO should be critical error?
464: }
465: if (JODBConfig.DEBUG) {
466: Utils.getLogger(getClass().getName()).log(
467: Level.INFO,
468: "Executor shutdown "
469: + _localTransactionExecutor.toString());
470: }
471: _localTransactionExecutor.shutdownNow();
472: }
473:
474: public void checkTransactionComplete() throws IOException {
475: try {
476: _transactionFuture
477: .get(
478: JODBConstants.DEFAULT_TRANSACTION_LOCK_WAIT_TIMEOUT,
479: TimeUnit.MILLISECONDS);
480: } catch (Exception e) {
481: throw new JodbIOException(e);
482: }
483: if (_transactionExecutionError != null) {
484: throw new JodbIOException(_transactionExecutionError);
485: }
486: }
487:
488: private class CompleteTransactionRunnable implements Runnable {
489:
490: public void run() {
491: try {
492: ensureFileTransferComplete();
493: _ioBase.applyRemoteTransaction(
494: RemoteTransactionContainer.this ,
495: _transactionOffset);
496: } catch (Exception e) {//TODO check Throwable and exit as critical error?
497: _transactionExecutionError = e;
498: } finally {
499: if (JODBConfig.DEBUG) {
500: Utils.getLogger(getClass().getName()).log(
501: Level.INFO,
502: "Lock release "
503: + Thread.currentThread());
504: }
505: _unlockTransactionRunnable.run();
506: }
507: }
508:
509: }
510:
511: private class UnlockTransactionRunnable implements Runnable {
512:
513: public void run() {
514: if (_lock.isWriteLockedByCurrentThread()) {
515: _lock.writeLock().unlock();
516: }
517: }
518:
519: }
520:
521: private class LockTransuctionRunnable implements Runnable {
522:
523: int _transactioLockTimeout = -1;
524: boolean _gotLock;
525:
526: void init(int transactioLockTimeout) {
527: _transactioLockTimeout = transactioLockTimeout;
528: _gotLock = false;
529: }
530:
531: public void run() {
532: try {
533: if (JODBConfig.DEBUG) {
534: Utils.getLogger(getClass().getName()).log(
535: Level.INFO,
536: "Lock set " + Thread.currentThread());
537: }
538: _gotLock = _lock.writeLock().tryLock(
539: _transactioLockTimeout,
540: TimeUnit.MILLISECONDS);
541: } catch (InterruptedException e) {
542: e.printStackTrace();//TODO log
543: }
544: }
545: }
546:
547: }
548:
549: private static class FileReceiver implements Runnable {
550: IRandomAccessDataBuffer _file;
551: int _id;
552: long _length = -1;
553: long _maxTransferTime;
554: ServerSocketAcceptor _container;
555: Socket _readSocket;
556: Throwable _error;
557: boolean _transferComplete;
558: boolean _processed;
559: Future _transferCompletionSynch;
560: Semaphore _transferSubmitSemaphore = new Semaphore(1);
561:
562: public FileReceiver(BUFFER_TYPE bufferType) throws IOException {
563: _file = JODBConfig.getRandomAccessBufferFactory()
564: .createBuffer("", bufferType, true);
565: _id = System.identityHashCode(_file);
566: try {
567: _transferSubmitSemaphore.acquire();
568: } catch (InterruptedException e) {
569: throw new JodbIOException(e);
570: }
571: }
572:
573: public int getFileId() {
574: return _id;
575: }
576:
577: public void setContainer(ServerSocketAcceptor container) {
578: _container = container;
579: }
580:
581: public void setTransactionDataLength(long length) {
582: _length = length;
583: }
584:
585: public void close() throws IOException {
586: if (_transferSubmitSemaphore.availablePermits() == 0) {
587: _transferSubmitSemaphore.release();
588: }
589: _file.close();
590: _file.delete();
591: }
592:
593: @Override
594: protected void finalize() throws Throwable {
595: _file.delete();
596: }
597:
598: public void setReadSocket(Socket readSocket) {
599: _readSocket = readSocket;
600: }
601:
602: public void run() {
603: try {
604: if (_length == -1) {
605: throw new JodbIOException(
606: "no client transaction data size available");
607: }
608: SocketChannel socketChannel = _readSocket.getChannel();
609: long transfered = _file.transferFrom(socketChannel, 0,
610: _length);
611: if (JODBConfig.DEBUG) {
612: Utils.getLogger(getClass().getName()).log(
613: Level.INFO,
614: "Transfered " + transfered + " to file "
615: + _file);
616: }
617: /*
618: InputStream input = _readSocket.getInputStream();
619: byte[] outBuffer = new byte[_readSocket.getReceiveBufferSize()];
620: int bytesReceived = 0;
621: while ((bytesReceived = input.read(outBuffer)) >= 0) {
622: _file.write(outBuffer, 0, bytesReceived);
623: }
624: input.close();
625: if(_file.length() != _length){
626: throw new JodbIOException("Incorrect transfer size "+_file.length()+"!="+_length);
627: }*/
628: } catch (Exception e) {
629: _error = e;
630: }
631: _transferComplete = true;
632: }
633:
634: public void submit(ThreadPoolExecutor threadPoolExecutor,
635: long delay) throws InterruptedException,
636: ExecutionException, TimeoutException {
637: _transferCompletionSynch = threadPoolExecutor.submit(this );
638: _processed = true;
639: _transferSubmitSemaphore.release();
640: if (JODBConfig.DEBUG) {
641: Utils.getLogger(getClass().getName()).log(Level.INFO,
642: "Task submitted " + this );
643: }
644: _transferCompletionSynch.get(delay, TimeUnit.MILLISECONDS);
645: }
646:
647: public Future getTransferCompletionSynch(long waitForSynchObject)
648: throws InterruptedException, IOException {
649: if (!_transferSubmitSemaphore.tryAcquire(
650: waitForSynchObject, TimeUnit.MILLISECONDS)) {
651: throw new JodbIOException("Transfer wait timeout");
652: }
653: _transferSubmitSemaphore.release();
654: return _transferCompletionSynch;
655: }
656: }
657:
658: private class ServerSocketAcceptor extends Thread {
659:
660: Socket _lastAcceptedSocket;
661: RemoteTransactionContainer _pendingFiles;
662: ScheduledThreadPoolExecutor _threadPoolExecutor = new ScheduledThreadPoolExecutor(
663: 1);
664: boolean _closed = false;
665:
666: public ServerSocketAcceptor() throws IOException {
667: setName("JodbServerSocketAcceptor");
668: setDaemon(true);
669: startDataServerSocket();
670: }
671:
672: private ServerSocket _serverSocket;
673: private ServerSocketChannel _serverSocketChannel;
674:
675: private void startDataServerSocket() throws IOException {
676: _serverSocketChannel = SelectorProvider.provider()
677: .openServerSocketChannel();
678: _serverSocket = _serverSocketChannel.socket();
679: _serverSocket.bind(new InetSocketAddress(
680: JODBConstants.DEFAULT_DATA_STREAM_PORT));
681: //new ServerSocket(JODBConstants.DEFAULT_DATA_STREAM_PORT);
682: start();
683: }
684:
685: public void initRemoteData(
686: RemoteTransactionContainer remoteTransactionData) {
687: _pendingFiles = remoteTransactionData;
688: }
689:
690: public void close() throws IOException {
691: _closed = true;
692: _serverSocket.close();
693: _threadPoolExecutor.shutdown();
694: }
695:
696: @Override
697: public void run() {
698: while (true) {
699: Socket socket;
700: try {
701: socket = _serverSocket.accept();
702: } catch (IOException e) {
703: if (_closed) {
704: break;
705: }
706: Utils.fatalError(e);
707: continue;
708: }
709: if (_closed) {
710: break;
711: }
712: try {
713: handleConnection(socket);
714: } catch (IOException e) {
715: e.printStackTrace();//TODO log
716: }
717: }
718: }
719:
720: void handleConnection(Socket socket) throws IOException {
721: socket
722: .setSoTimeout(JODBConstants.FILE_TRANSFER_SOCKET_TIMEOUT);
723: DataInputStream is = new DataInputStream(socket
724: .getInputStream());
725: while (_pendingFiles.hasPendingFiles()) {
726: int id = new DataInputStream(is).readInt();
727: FileReceiver fileReceiver = _pendingFiles
728: .getReceiverForId(id);
729: if (fileReceiver == null) {
730: throw new JodbIOException(
731: "File Id is not available " + id);
732: }
733: fileReceiver.setReadSocket(socket);
734: try {
735: fileReceiver.submit(_threadPoolExecutor,
736: JODBConstants.FILE_TRANSFER_MAX_WAIT);
737: if (JODBConfig.DEBUG) {
738: Utils.getLogger(getClass().getName()).log(
739: Level.INFO, "Task finished " + this );
740: }
741: } catch (Exception e) {
742: throw new JodbIOException(e);
743: }
744: }
745: }
746: }
747: }
|