001: /* Copyright (C) 2004 - 2007 db4objects Inc. http://www.db4o.com
002:
003: This file is part of the db4o open source object database.
004:
005: db4o is free software; you can redistribute it and/or modify it under
006: the terms of version 2 of the GNU General Public License as published
007: by the Free Software Foundation and as clarified by db4objects' GPL
008: interpretation policy, available at
009: http://www.db4o.com/about/company/legalpolicies/gplinterpretation/
010: Alternatively you can write to db4objects, Inc., 1900 S Norfolk Street,
011: Suite 350, San Mateo, CA 94403, USA.
012:
013: db4o is distributed in the hope that it will be useful, but WITHOUT ANY
014: WARRANTY; without even the implied warranty of MERCHANTABILITY or
015: FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
016: for more details.
017:
018: You should have received a copy of the GNU General Public License along
019: with this program; if not, write to the Free Software Foundation, Inc.,
020: 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
021: package com.db4o.internal.cs;
022:
023: import java.io.*;
024:
025: import com.db4o.*;
026: import com.db4o.config.*;
027: import com.db4o.ext.*;
028: import com.db4o.foundation.*;
029: import com.db4o.foundation.network.*;
030: import com.db4o.internal.*;
031: import com.db4o.internal.convert.*;
032: import com.db4o.internal.cs.messages.*;
033: import com.db4o.internal.query.processor.*;
034: import com.db4o.internal.query.result.*;
035: import com.db4o.internal.slots.*;
036: import com.db4o.reflect.*;
037:
038: /**
039: * @exclude
040: */
041: public class ClientObjectContainer extends ExternalObjectContainer
042: implements ExtClient, BlobTransport, ClientMessageDispatcher {
043:
044: final Object blobLock = new Object();
045:
046: private BlobProcessor blobThread;
047:
048: private Socket4 i_socket;
049:
050: private BlockingQueue _messageQueue = new BlockingQueue();
051:
052: private final String _password; // null denotes password not necessary
053:
054: int[] _prefetchedIDs;
055:
056: ClientMessageDispatcher _messageDispatcher;
057:
058: int remainingIDs;
059:
060: private String switchedToFile;
061:
062: private boolean _singleThreaded;
063:
064: private final String _userName;
065:
066: private Db4oDatabase i_db;
067:
068: protected boolean _doFinalize = true;
069:
070: private int _blockSize = 1;
071:
072: private Collection4 _batchedMessages = new Collection4();
073:
074: // initial value of _batchedQueueLength is YapConst.INT_LENGTH, which is
075: // used for to write the number of messages.
076: private int _batchedQueueLength = Const4.INT_LENGTH;
077:
078: private boolean _login;
079:
080: private final ClientHeartbeat _heartbeat;
081:
082: public ClientObjectContainer(Configuration config, Socket4 socket,
083: String user, String password, boolean login) {
084: super (config, null);
085: _userName = user;
086: _password = password;
087: _login = login;
088: _heartbeat = new ClientHeartbeat(this );
089: setAndConfigSocket(socket);
090: open();
091: }
092:
093: private void setAndConfigSocket(Socket4 socket) {
094: i_socket = socket;
095: i_socket.setSoTimeout(_config.timeoutClientSocket());
096: }
097:
098: protected final void openImpl() {
099: _singleThreaded = configImpl().singleThreadedClient();
100: // TODO: Experiment with packetsize and noDelay
101: // socket.setSendBufferSize(100);
102: // socket.setTcpNoDelay(true);
103: // System.out.println(socket.getSendBufferSize());
104: if (_login) {
105: loginToServer(i_socket);
106: }
107: if (!_singleThreaded) {
108: startDispatcherThread(i_socket, _userName);
109: }
110: logMsg(36, toString());
111: startHeartBeat();
112: readThis();
113: }
114:
115: private void startHeartBeat() {
116: _heartbeat.start();
117: }
118:
119: private void startDispatcherThread(Socket4 socket, String user) {
120: _messageDispatcher = new ClientMessageDispatcherImpl(this ,
121: socket, _messageQueue);
122: _messageDispatcher.setDispatcherName(user);
123: _messageDispatcher.startDispatcher();
124: }
125:
126: public void backup(String path) throws NotSupportedException {
127: throw new NotSupportedException();
128: }
129:
130: public void reserve(int byteCount) {
131: throw new NotSupportedException();
132: }
133:
134: public void blockSize(int blockSize) {
135: _blockSize = blockSize;
136: }
137:
138: public byte blockSize() {
139: return (byte) _blockSize;
140: }
141:
142: protected void close2() {
143: if ((!_singleThreaded)
144: && (_messageDispatcher == null || !_messageDispatcher
145: .isMessageDispatcherAlive())) {
146: stopHeartBeat();
147: shutdownObjectContainer();
148: return;
149: }
150: try {
151: commit1(_transaction);
152: } catch (Exception e) {
153: Exceptions4.catchAllExceptDb4oException(e);
154: }
155: try {
156: write(Msg.CLOSE);
157: } catch (Exception e) {
158: Exceptions4.catchAllExceptDb4oException(e);
159: }
160:
161: shutDownCommunicationRessources();
162:
163: try {
164: i_socket.close();
165: } catch (Exception e) {
166: Exceptions4.catchAllExceptDb4oException(e);
167: }
168:
169: shutdownObjectContainer();
170: }
171:
172: private void stopHeartBeat() {
173: _heartbeat.stop();
174: }
175:
176: private void closeMessageDispatcher() {
177: try {
178: if (!_singleThreaded) {
179: _messageDispatcher.close();
180: }
181: } catch (Exception e) {
182: Exceptions4.catchAllExceptDb4oException(e);
183: }
184: }
185:
186: public final void commit1(Transaction trans) {
187: trans.commit();
188: }
189:
190: public int converterVersion() {
191: return Converter.VERSION;
192: }
193:
194: Socket4 createParalellSocket() throws IOException {
195: write(Msg.GET_THREAD_ID);
196:
197: int serverThreadID = expectedByteResponse(Msg.ID_LIST)
198: .readInt();
199:
200: Socket4 sock = i_socket.openParalellSocket();
201:
202: if (!(i_socket instanceof LoopbackSocket)) {
203: loginToServer(sock);
204: }
205:
206: if (switchedToFile != null) {
207: MsgD message = Msg.SWITCH_TO_FILE.getWriterForString(
208: systemTransaction(), switchedToFile);
209: message.write(sock);
210: if (!(Msg.OK.equals(Msg.readMessage(this ,
211: systemTransaction(), sock)))) {
212: throw new IOException(Messages.get(42));
213: }
214: }
215: Msg.USE_TRANSACTION.getWriterForInt(_transaction,
216: serverThreadID).write(sock);
217: return sock;
218: }
219:
220: public AbstractQueryResult newQueryResult(Transaction trans,
221: QueryEvaluationMode mode) {
222: throw new IllegalStateException();
223: }
224:
225: final public Transaction newTransaction(
226: Transaction parentTransaction,
227: TransactionalReferenceSystem referenceSystem) {
228: return new ClientTransaction(this , parentTransaction,
229: referenceSystem);
230: }
231:
232: public boolean createClassMetadata(ClassMetadata a_yapClass,
233: ReflectClass a_class, ClassMetadata a_super YapClass) {
234: write(Msg.CREATE_CLASS.getWriterForString(systemTransaction(),
235: a_class.getName()));
236: Msg resp = getResponse();
237: if (resp == null) {
238: return false;
239: }
240:
241: if (resp.equals(Msg.FAILED)) {
242: // if the class can not be created on the server, send class meta to the server.
243: sendClassMeta(a_class);
244: resp = getResponse();
245: }
246:
247: if (resp.equals(Msg.FAILED)) {
248: if (configImpl().exceptionsOnNotStorable()) {
249: throw new ObjectNotStorableException(a_class);
250: }
251: return false;
252: }
253: if (!resp.equals(Msg.OBJECT_TO_CLIENT)) {
254: return false;
255: }
256:
257: MsgObject message = (MsgObject) resp;
258: StatefulBuffer bytes = message.unmarshall();
259: if (bytes == null) {
260: return false;
261: }
262: bytes.setTransaction(systemTransaction());
263: if (!super .createClassMetadata(a_yapClass, a_class,
264: a_super YapClass)) {
265: return false;
266: }
267: a_yapClass.setID(message.getId());
268: a_yapClass.readName1(systemTransaction(), bytes);
269: classCollection().addYapClass(a_yapClass);
270: classCollection().readClassMetadata(a_yapClass, a_class);
271: return true;
272: }
273:
274: private void sendClassMeta(ReflectClass reflectClass) {
275: ClassInfo classMeta = _classMetaHelper
276: .getClassMeta(reflectClass);
277: write(Msg.CLASS_META.getWriter(Serializer.marshall(
278: systemTransaction(), classMeta)));
279: }
280:
281: public long currentVersion() {
282: write(Msg.CURRENT_VERSION);
283: return ((MsgD) expectedResponse(Msg.ID_LIST)).readLong();
284: }
285:
286: public final boolean delete4(Transaction ta, ObjectReference yo,
287: int a_cascade, boolean userCall) {
288: MsgD msg = Msg.DELETE.getWriterForInts(_transaction, new int[] {
289: yo.getID(), userCall ? 1 : 0 });
290: writeBatchedMessage(msg);
291: return true;
292: }
293:
294: public boolean detectSchemaChanges() {
295: return false;
296: }
297:
298: protected boolean doFinalize() {
299: return _doFinalize;
300: }
301:
302: final Buffer expectedByteResponse(Msg expectedMessage) {
303: Msg msg = expectedResponse(expectedMessage);
304: if (msg == null) {
305: // TODO: throw Exception to allow
306: // smooth shutdown
307: return null;
308: }
309: return msg.getByteLoad();
310: }
311:
312: public final Msg expectedResponse(Msg expectedMessage) {
313: Msg message = getResponse();
314: if (expectedMessage.equals(message)) {
315: return message;
316: }
317: checkExceptionMessage(message);
318: throw new IllegalStateException("Unexpected Message:" + message
319: + " Expected:" + expectedMessage);
320: }
321:
322: private void checkExceptionMessage(Msg msg) {
323: if (msg instanceof MRuntimeException) {
324: ((MRuntimeException) msg).throwPayload();
325: }
326: }
327:
328: public AbstractQueryResult getAll(Transaction trans) {
329: int mode = config().queryEvaluationMode().asInt();
330: MsgD msg = Msg.GET_ALL.getWriterForInt(trans, mode);
331: write(msg);
332: return readQueryResult(trans);
333: }
334:
335: /**
336: * may return null, if no message is returned. Error handling is weak and
337: * should ideally be able to trigger some sort of state listener (connection
338: * dead) on the client.
339: */
340: public Msg getResponse() {
341: return _singleThreaded ? getResponseSingleThreaded()
342: : getResponseMultiThreaded();
343: }
344:
345: private Msg getResponseMultiThreaded() {
346: Msg msg;
347: try {
348: msg = (Msg) _messageQueue.next();
349: } catch (BlockingQueueStoppedException e) {
350: msg = Msg.ERROR;
351: }
352: if (msg == Msg.ERROR) {
353: onMsgError();
354: }
355: return msg;
356: }
357:
358: private void onMsgError() {
359: close();
360: throw new DatabaseClosedException();
361: }
362:
363: private Msg getResponseSingleThreaded() {
364: while (isMessageDispatcherAlive()) {
365: try {
366: final Msg message = Msg.readMessage(this , _transaction,
367: i_socket);
368: if (message instanceof ClientSideMessage) {
369: if (((ClientSideMessage) message).processAtClient()) {
370: continue;
371: }
372: }
373: return message;
374: } catch (Db4oIOException exc) {
375: onMsgError();
376: }
377: }
378: return null;
379: }
380:
381: public boolean isMessageDispatcherAlive() {
382: return i_socket != null;
383: }
384:
385: public ClassMetadata classMetadataForId(int a_id) {
386: if (a_id == 0) {
387: return null;
388: }
389: ClassMetadata yc = super .classMetadataForId(a_id);
390: if (yc != null) {
391: return yc;
392: }
393: MsgD msg = Msg.CLASS_NAME_FOR_ID.getWriterForInt(
394: systemTransaction(), a_id);
395: write(msg);
396: MsgD message = (MsgD) expectedResponse(Msg.CLASS_NAME_FOR_ID);
397: String className = message.readString();
398: if (className != null && className.length() > 0) {
399: ReflectClass claxx = reflector().forName(className);
400: if (claxx != null) {
401: return produceClassMetadata(claxx);
402: }
403: // TODO inform client class not present
404: }
405: return null;
406: }
407:
408: public boolean needsLockFileThread() {
409: return false;
410: }
411:
412: protected boolean hasShutDownHook() {
413: return false;
414: }
415:
416: public Db4oDatabase identity() {
417: if (i_db == null) {
418: write(Msg.IDENTITY);
419: Buffer reader = expectedByteResponse(Msg.ID_LIST);
420: showInternalClasses(true);
421: try {
422: i_db = (Db4oDatabase) getByID(reader.readInt());
423: activate(systemTransaction(), i_db, 3);
424: } finally {
425: showInternalClasses(false);
426: }
427: }
428: return i_db;
429: }
430:
431: public boolean isClient() {
432: return true;
433: }
434:
435: private void loginToServer(Socket4 socket)
436: throws InvalidPasswordException {
437: UnicodeStringIO stringWriter = new UnicodeStringIO();
438: int length = stringWriter.length(_userName)
439: + stringWriter.length(_password);
440: MsgD message = Msg.LOGIN.getWriterForLength(
441: systemTransaction(), length);
442: message.writeString(_userName);
443: message.writeString(_password);
444: message.write(socket);
445: Msg msg = readLoginMessage(socket);
446: Buffer payLoad = msg.payLoad();
447: _blockSize = payLoad.readInt();
448: int doEncrypt = payLoad.readInt();
449: if (doEncrypt == 0) {
450: _handlers.oldEncryptionOff();
451: }
452: }
453:
454: private Msg readLoginMessage(Socket4 socket) {
455: Msg msg = Msg.readMessage(this , systemTransaction(), socket);
456: while (Msg.PONG.equals(msg)) {
457: msg = Msg.readMessage(this , systemTransaction(), socket);
458: }
459: if (!Msg.LOGIN_OK.equals(msg)) {
460: throw new InvalidPasswordException();
461: }
462: return msg;
463: }
464:
465: public boolean maintainsIndices() {
466: return false;
467: }
468:
469: public final int newUserObject() {
470: int prefetchIDCount = config().prefetchIDCount();
471: ensureIDCacheAllocated(prefetchIDCount);
472: Buffer reader = null;
473: if (remainingIDs < 1) {
474: MsgD msg = Msg.PREFETCH_IDS.getWriterForInt(_transaction,
475: prefetchIDCount);
476: write(msg);
477: reader = expectedByteResponse(Msg.ID_LIST);
478: for (int i = prefetchIDCount - 1; i >= 0; i--) {
479: _prefetchedIDs[i] = reader.readInt();
480: }
481: remainingIDs = prefetchIDCount;
482: }
483: remainingIDs--;
484: return _prefetchedIDs[remainingIDs];
485: }
486:
487: void processBlobMessage(MsgBlob msg) {
488: synchronized (blobLock) {
489: boolean needStart = blobThread == null
490: || blobThread.isTerminated();
491: if (needStart) {
492: blobThread = new BlobProcessor(this );
493: }
494: blobThread.add(msg);
495: if (needStart) {
496: blobThread.start();
497: }
498: }
499: }
500:
501: public void raiseVersion(long a_minimumVersion) {
502: write(Msg.RAISE_VERSION.getWriterForLong(_transaction,
503: a_minimumVersion));
504: }
505:
506: public void readBytes(byte[] bytes, int address, int addressOffset,
507: int length) {
508: throw Exceptions4.virtualException();
509: }
510:
511: public void readBytes(byte[] a_bytes, int a_address, int a_length) {
512: MsgD msg = Msg.READ_BYTES.getWriterForInts(_transaction,
513: new int[] { a_address, a_length });
514: write(msg);
515: Buffer reader = expectedByteResponse(Msg.READ_BYTES);
516: System.arraycopy(reader._buffer, 0, a_bytes, 0, a_length);
517: }
518:
519: protected boolean rename1(Config4Impl config) {
520: logMsg(58, null);
521: return false;
522: }
523:
524: public final StatefulBuffer readWriterByID(Transaction a_ta,
525: int a_id) {
526: MsgD msg = Msg.READ_OBJECT.getWriterForInt(a_ta, a_id);
527: write(msg);
528: StatefulBuffer bytes = ((MsgObject) expectedResponse(Msg.OBJECT_TO_CLIENT))
529: .unmarshall();
530: if (bytes != null) {
531: bytes.setTransaction(a_ta);
532: }
533: return bytes;
534: }
535:
536: public final StatefulBuffer[] readWritersByIDs(Transaction a_ta,
537: int[] ids) {
538: MsgD msg = Msg.READ_MULTIPLE_OBJECTS.getWriterForIntArray(a_ta,
539: ids, ids.length);
540: write(msg);
541: MsgD response = (MsgD) expectedResponse(Msg.READ_MULTIPLE_OBJECTS);
542: int count = response.readInt();
543: StatefulBuffer[] yapWriters = new StatefulBuffer[count];
544: for (int i = 0; i < count; i++) {
545: MsgObject mso = (MsgObject) Msg.OBJECT_TO_CLIENT
546: .publicClone();
547: mso.setTransaction(a_ta);
548: mso.payLoad(response.payLoad().readYapBytes());
549: if (mso.payLoad() != null) {
550: mso.payLoad().incrementOffset(Const4.MESSAGE_LENGTH);
551: yapWriters[i] = mso.unmarshall(Const4.MESSAGE_LENGTH);
552: yapWriters[i].setTransaction(a_ta);
553: }
554: }
555: return yapWriters;
556: }
557:
558: public final Buffer readReaderByID(Transaction a_ta, int a_id) {
559: // TODO: read lightweight reader instead
560: return readWriterByID(a_ta, a_id);
561: }
562:
563: private AbstractQueryResult readQueryResult(Transaction trans) {
564: AbstractQueryResult queryResult = null;
565: Buffer reader = expectedByteResponse(Msg.QUERY_RESULT);
566: int queryResultID = reader.readInt();
567: if (queryResultID > 0) {
568: queryResult = new LazyClientQueryResult(trans, this ,
569: queryResultID);
570: } else {
571: queryResult = new ClientQueryResult(trans);
572: }
573: queryResult.loadFromIdReader(reader);
574: return queryResult;
575: }
576:
577: void readThis() {
578: write(Msg.GET_CLASSES.getWriter(systemTransaction()));
579: Buffer bytes = expectedByteResponse(Msg.GET_CLASSES);
580: classCollection().setID(bytes.readInt());
581: createStringIO(bytes.readByte());
582: classCollection().read(systemTransaction());
583: classCollection().refreshClasses();
584: }
585:
586: public void releaseSemaphore(String name) {
587: synchronized (_lock) {
588: checkClosed();
589: if (name == null) {
590: throw new NullPointerException();
591: }
592: write(Msg.RELEASE_SEMAPHORE.getWriterForString(
593: _transaction, name));
594: }
595: }
596:
597: public void releaseSemaphores(Transaction ta) {
598: // do nothing
599: }
600:
601: private void reReadAll(Configuration config) {
602: remainingIDs = 0;
603: initialize1(config);
604: initializeTransactions();
605: readThis();
606: }
607:
608: public final void rollback1(Transaction trans) {
609: if (_config.batchMessages()) {
610: clearBatchedObjects();
611: }
612: write(Msg.ROLLBACK);
613: trans.rollback();
614: }
615:
616: public void send(Object obj) {
617: synchronized (_lock) {
618: if (obj != null) {
619: write(Msg.USER_MESSAGE.getWriter(Serializer.marshall(
620: _transaction, obj)));
621: }
622: }
623: }
624:
625: public final void setDirtyInSystemTransaction(
626: PersistentBase a_object) {
627: // do nothing
628: }
629:
630: public boolean setSemaphore(String name, int timeout) {
631: synchronized (_lock) {
632: checkClosed();
633: if (name == null) {
634: throw new NullPointerException();
635: }
636: MsgD msg = Msg.SET_SEMAPHORE.getWriterForIntString(
637: _transaction, timeout, name);
638: write(msg);
639: Msg message = getResponse();
640: return (message.equals(Msg.SUCCESS));
641: }
642: }
643:
644: public void switchToFile(String fileName) {
645: synchronized (_lock) {
646: commit();
647: MsgD msg = Msg.SWITCH_TO_FILE.getWriterForString(
648: _transaction, fileName);
649: write(msg);
650: expectedResponse(Msg.OK);
651: // FIXME NSC
652: reReadAll(Db4o.cloneConfiguration());
653: switchedToFile = fileName;
654: }
655: }
656:
657: public void switchToMainFile() {
658: synchronized (_lock) {
659: commit();
660: write(Msg.SWITCH_TO_MAIN_FILE);
661: expectedResponse(Msg.OK);
662: // FIXME NSC
663: reReadAll(Db4o.cloneConfiguration());
664: switchedToFile = null;
665: }
666: }
667:
668: public String name() {
669: return toString();
670: }
671:
672: public String toString() {
673: // if(i_classCollection != null){
674: // return i_classCollection.toString();
675: // }
676: return "Client Connection " + _userName;
677: }
678:
679: public void shutdown() {
680: // do nothing
681: }
682:
683: public final void writeDirty() {
684: // do nothing
685: }
686:
687: public final boolean write(Msg msg) {
688: writeMsg(msg, true);
689: return true;
690: }
691:
692: public final void writeBatchedMessage(Msg msg) {
693: writeMsg(msg, false);
694: }
695:
696: private final void writeMsg(Msg msg, boolean flush) {
697: if (_config.batchMessages()) {
698: if (flush && _batchedMessages.isEmpty()) {
699: // if there's nothing batched, just send this message directly
700: writeMessageToSocket(msg);
701: } else {
702: addToBatch(msg);
703: if (flush
704: || _batchedQueueLength > _config
705: .maxBatchQueueSize()) {
706: writeBatchedMessages();
707: }
708: }
709: } else {
710: writeMessageToSocket(msg);
711: }
712: }
713:
714: public boolean writeMessageToSocket(Msg msg) {
715: return msg.write(i_socket);
716: }
717:
718: public final void writeNew(Transaction trans, Pointer4 pointer,
719: ClassMetadata classMetadata, Buffer buffer) {
720: MsgD msg = Msg.WRITE_NEW.getWriter(trans, pointer,
721: classMetadata, buffer);
722: writeBatchedMessage(msg);
723: }
724:
725: public final void writeTransactionPointer(int a_address) {
726: // do nothing
727: }
728:
729: public final void writeUpdate(Transaction trans, Pointer4 pointer,
730: ClassMetadata classMetadata, Buffer buffer) {
731: MsgD msg = Msg.WRITE_UPDATE.getWriter(trans, pointer,
732: classMetadata, buffer);
733: writeBatchedMessage(msg);
734: }
735:
736: public boolean isAlive() {
737: try {
738: write(Msg.IS_ALIVE);
739: return expectedResponse(Msg.IS_ALIVE) != null;
740: } catch (Db4oException exc) {
741: return false;
742: }
743: }
744:
745: // Remove, for testing purposes only
746: public Socket4 socket() {
747: return i_socket;
748: }
749:
750: private void ensureIDCacheAllocated(int prefetchIDCount) {
751: if (_prefetchedIDs == null) {
752: _prefetchedIDs = new int[prefetchIDCount];
753: return;
754: }
755: if (prefetchIDCount > _prefetchedIDs.length) {
756: int[] newPrefetchedIDs = new int[prefetchIDCount];
757: System.arraycopy(_prefetchedIDs, 0, newPrefetchedIDs, 0,
758: _prefetchedIDs.length);
759: _prefetchedIDs = newPrefetchedIDs;
760: }
761: }
762:
763: public SystemInfo systemInfo() {
764: throw new NotImplementedException(
765: "Functionality not availble on clients.");
766: }
767:
768: public void writeBlobTo(Transaction trans, BlobImpl blob, File file)
769: throws IOException {
770: MsgBlob msg = (MsgBlob) Msg.READ_BLOB.getWriterForInt(trans,
771: (int) getID(blob));
772: msg._blob = blob;
773: processBlobMessage(msg);
774: }
775:
776: public void readBlobFrom(Transaction trans, BlobImpl blob, File file)
777: throws IOException {
778: MsgBlob msg = null;
779: synchronized (lock()) {
780: set(blob);
781: int id = (int) getID(blob);
782: msg = (MsgBlob) Msg.WRITE_BLOB.getWriterForInt(trans, id);
783: msg._blob = blob;
784: blob.setStatus(Status.QUEUED);
785: }
786: processBlobMessage(msg);
787: }
788:
789: public void deleteBlobFile(Transaction trans, BlobImpl blob) {
790: MDeleteBlobFile msg = (MDeleteBlobFile) Msg.DELETE_BLOB_FILE
791: .getWriterForInt(trans, (int) getID(blob));
792: writeMsg(msg, false);
793: }
794:
795: public long[] getIDsForClass(Transaction trans, ClassMetadata clazz) {
796: MsgD msg = Msg.GET_INTERNAL_IDS.getWriterForInt(trans, clazz
797: .getID());
798: write(msg);
799: Buffer reader = expectedByteResponse(Msg.ID_LIST);
800: int size = reader.readInt();
801: final long[] ids = new long[size];
802: for (int i = 0; i < size; i++) {
803: ids[i] = reader.readInt();
804: }
805: return ids;
806: }
807:
808: public QueryResult classOnlyQuery(Transaction trans,
809: ClassMetadata clazz) {
810: long[] ids = clazz.getIDs(trans);
811: ClientQueryResult resClient = new ClientQueryResult(trans,
812: ids.length);
813: for (int i = 0; i < ids.length; i++) {
814: resClient.add((int) ids[i]);
815: }
816: return resClient;
817: }
818:
819: public QueryResult executeQuery(QQuery query) {
820: Transaction trans = query.getTransaction();
821: query.evaluationMode(config().queryEvaluationMode());
822: query.marshall();
823: MsgD msg = Msg.QUERY_EXECUTE.getWriter(Serializer.marshall(
824: trans, query));
825: write(msg);
826: return readQueryResult(trans);
827: }
828:
829: public final void writeBatchedMessages() {
830: if (_batchedMessages.isEmpty()) {
831: return;
832: }
833:
834: Msg msg;
835: MsgD multibytes = Msg.WRITE_BATCHED_MESSAGES
836: .getWriterForLength(transaction(), _batchedQueueLength);
837: multibytes.writeInt(_batchedMessages.size());
838: Iterator4 iter = _batchedMessages.iterator();
839: while (iter.moveNext()) {
840: msg = (Msg) iter.current();
841: if (msg == null) {
842: multibytes.writeInt(0);
843: } else {
844: multibytes.writeInt(msg.payLoad().length());
845: multibytes.payLoad().append(msg.payLoad()._buffer);
846: }
847: }
848: writeMessageToSocket(multibytes);
849: clearBatchedObjects();
850: }
851:
852: public final void addToBatch(Msg msg) {
853: _batchedMessages.add(msg);
854: // the first INT_LENGTH is for buffer.length, and then buffer content.
855: _batchedQueueLength += Const4.INT_LENGTH
856: + msg.payLoad().length();
857: }
858:
859: private final void clearBatchedObjects() {
860: _batchedMessages.clear();
861: // initial value of _batchedQueueLength is YapConst.INT_LENGTH, which is
862: // used for to write the number of messages.
863: _batchedQueueLength = Const4.INT_LENGTH;
864: }
865:
866: int timeout() {
867: return configImpl().timeoutClientSocket();
868: }
869:
870: protected void shutdownDataStorage() {
871: shutDownCommunicationRessources();
872: }
873:
874: private void shutDownCommunicationRessources() {
875: stopHeartBeat();
876: closeMessageDispatcher();
877: _messageQueue.stop();
878: }
879:
880: public void setDispatcherName(String name) {
881: // do nothing here
882: }
883:
884: public void startDispatcher() {
885: // do nothing here for single thread, ClientObjectContainer is already running
886: }
887:
888: public ClientMessageDispatcher messageDispatcher() {
889: return _singleThreaded ? this : _messageDispatcher;
890: }
891:
892: public void onCommittedListener() {
893: if (_singleThreaded) {
894: return;
895: }
896: write(Msg.COMMITTED_CALLBACK_REGISTER);
897: }
898:
899: public int classMetadataIdForName(String name) {
900: MsgD msg = Msg.CLASS_METADATA_ID_FOR_NAME.getWriterForString(
901: systemTransaction(), name);
902: msg.write(i_socket);
903: MsgD response = (MsgD) expectedResponse(Msg.CLASS_ID);
904: return response.readInt();
905: }
906:
907: }
|