001: /*
002: Copyright (C) 2007 Mobixess Inc. http://www.java-objects-database.com
003:
004: This file is part of the JODB (Java Objects Database) open source project.
005:
006: JODB is free software; you can redistribute it and/or modify it under
007: the terms of version 2 of the GNU General Public License as published
008: by the Free Software Foundation.
009:
010: JODB is distributed in the hope that it will be useful, but WITHOUT ANY
011: WARRANTY; without even the implied warranty of MERCHANTABILITY or
012: FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
013: for more details.
014:
015: You should have received a copy of the GNU General Public License along
016: with this program; if not, write to the Free Software Foundation, Inc.,
017: 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
018: */
019: package com.mobixess.jodb.core.io.rmi;
020:
021: import java.io.DataOutputStream;
022: import java.io.IOException;
023: import java.io.PrintStream;
024: import java.lang.reflect.Field;
025: import java.net.InetAddress;
026: import java.net.InetSocketAddress;
027: import java.net.Socket;
028: import java.net.SocketAddress;
029: import java.net.URI;
030: import java.net.URISyntaxException;
031: import java.nio.ByteBuffer;
032: import java.nio.channels.SocketChannel;
033: import java.rmi.Naming;
034: import java.rmi.RemoteException;
035: import java.util.concurrent.locks.ReentrantReadWriteLock;
036:
037: import com.mobixess.jodb.core.IDatabaseStatistics;
038: import com.mobixess.jodb.core.IPersistentObjectStatistics;
039: import com.mobixess.jodb.core.IllegalClassTypeException;
040: import com.mobixess.jodb.core.JODBConstants;
041: import com.mobixess.jodb.core.JODBServer;
042: import com.mobixess.jodb.core.JodbIOException;
043: import com.mobixess.jodb.core.index.IndexDataIterator;
044: import com.mobixess.jodb.core.index.JODBIndexingRootAgent;
045: import com.mobixess.jodb.core.io.IOBase;
046: import com.mobixess.jodb.core.io.IOTicket;
047: import com.mobixess.jodb.core.io.IRandomAccessDataBuffer;
048: import com.mobixess.jodb.core.io.JODBOperationContext;
049: import com.mobixess.jodb.core.io.rmi.IRemoteServer.IServerQueryResult;
050: import com.mobixess.jodb.core.query.QueryNode;
051: import com.mobixess.jodb.core.transaction.JODBQueryList;
052: import com.mobixess.jodb.core.transaction.JODBSession;
053: import com.mobixess.jodb.core.transaction.TransactionAssembler;
054: import com.mobixess.jodb.core.transaction.TransactionContainer;
055:
056: public class JODBIOBaseProxy implements IOBase {
057:
058: private IRemoteServer _server;
059: private URI _serverId;
060:
061: private ReentrantReadWriteLock _transactionLock = new ReentrantReadWriteLock();
062:
063: public JODBIOBaseProxy(URI serverURI) throws JodbIOException {
064: connect(serverURI);
065: }
066:
067: private void connect(URI serverURI) throws JodbIOException {
068: _serverId = serverURI;
069: try {
070: _server = (IRemoteServer) Naming.lookup(_serverId
071: .toString());
072: } catch (Exception e) {
073: throw new JodbIOException(e);
074: }
075: }
076:
077: public static URI composeURI(String host, String serverName)
078: throws JodbIOException {
079: String id = host + JODBServer.REMOTE_OBJECT_NAME;
080: if (serverName != null && serverName.length() > 0) {
081: id += "_" + serverName;
082: }
083: try {
084: return new URI(id).normalize();
085: } catch (URISyntaxException e) {
086: throw new JodbIOException(e);
087: }
088: }
089:
090: public void applyTransaction(
091: TransactionContainer transactionContainer,
092: JODBSession session, IOTicket writeTicket,
093: JODBIndexingRootAgent indexingRootAgent) throws IOException {
094: transactionContainer.lockTransaction();
095: IRemoteTransactionContainer remoteTransactionContainer = _server
096: .getRemoteTransactionContainer();
097: long transactionOffset = remoteTransactionContainer
098: .initTransaction();
099: IOTicket ticket = getIOTicket(true, false);
100: try {
101: applyTransaction(transactionContainer, ticket, session,
102: transactionOffset, indexingRootAgent);
103: dispatchTranslatedData(transactionContainer,
104: remoteTransactionContainer);
105: remoteTransactionContainer.checkTransactionComplete();
106: transactionContainer.resetTranslatedObjects(session,
107: transactionOffset);
108: } catch (Exception e) {
109: throw new JodbIOException(e);
110: } finally {
111: transactionContainer.reset();
112: ticket.close();
113: remoteTransactionContainer.disposeRemoteContainer();
114: }
115: }
116:
117: private void applyTransaction(
118: TransactionContainer transactionContainer, IOTicket ticket,
119: JODBSession session, long transactionOffset,
120: JODBIndexingRootAgent indexingRootAgent) throws Exception {
121:
122: JODBOperationContext context = new JODBOperationContext(
123: session, ticket, null, transactionContainer,
124: indexingRootAgent);
125: context.setTransactionOffset(transactionOffset);
126:
127: TransactionAssembler.assembleTransactionData(context,
128: transactionContainer);
129:
130: transactionContainer.processTranslatedObjectsIndex(context,
131: transactionOffset);
132: transactionContainer.resetTransactionBufferToEnd();
133: try {
134: transactionContainer.enableAgentMode();
135: TransactionAssembler.assembleTransactionData(context,
136: transactionContainer);
137: } finally {
138: transactionContainer.disableAgentMode();
139: }
140: }
141:
142: private void dispatchTranslatedData(
143: TransactionContainer transactionContainer,
144: IRemoteTransactionContainer remoteTransactionContainer)
145: throws IOException {
146: //Socket socket = new Socket(_serverId.getHost(),JODBConstants.DEFAULT_DATA_STREAM_PORT);
147: SocketChannel socketChannel = SocketChannel
148: .open(new InetSocketAddress(_serverId.getHost(),
149: JODBConstants.DEFAULT_DATA_STREAM_PORT));
150: Socket socket = socketChannel.socket();
151: socketChannel.configureBlocking(true);
152: transactionContainer.resetTransactionBufferToStart();
153: DataOutputStream dos = new DataOutputStream(socket
154: .getOutputStream());
155: remoteTransactionContainer.setTransactionDataSizes(
156: transactionContainer.getTransactionNewDataFile()
157: .length(), transactionContainer
158: .getTransactionReplacementsDataFile().length(),
159: transactionContainer.getRollbackDataFile().length());
160: dispatchTranslatedData(dos, socketChannel, transactionContainer
161: .getTransactionNewDataFile(),
162: remoteTransactionContainer.getNewDataBufferId());
163: dispatchTranslatedData(dos, socketChannel, transactionContainer
164: .getTransactionReplacementsDataFile(),
165: remoteTransactionContainer.getReplacementsBufferId());
166: dispatchTranslatedData(dos, socketChannel, transactionContainer
167: .getRollbackDataFile(), remoteTransactionContainer
168: .getRollbackBufferId());
169: }
170:
171: private void dispatchTranslatedData(DataOutputStream dos,
172: SocketChannel socketChannel,
173: IRandomAccessDataBuffer randomAccessDataBuffer, int dataId)
174: throws IOException {
175: dos.writeInt(dataId);
176: long len = randomAccessDataBuffer.length();
177: randomAccessDataBuffer.transferTo(0, len, socketChannel);
178: }
179:
180: public void close() throws IOException {
181: _server.close();
182: }
183:
184: public String getClassTypeForID(int id) throws JodbIOException {
185: try {
186: return _server.getClassTypeForID(id);
187: } catch (IOException e) {
188: throw new JodbIOException(e);
189: }
190: }
191:
192: public int getClassTypeSubstitutionID(String classType)
193: throws JodbIOException {
194: try {
195: return _server.getClassTypeSubstitutionID(classType);
196: } catch (IOException e) {
197: throw new JodbIOException(e);
198: }
199: }
200:
201: public IDatabaseStatistics getDatabaseStatistics()
202: throws IOException {
203: return _server.getDatabaseStatistics();
204: }
205:
206: public URI getDbIdentificator() {
207: return _serverId;
208: }
209:
210: public int getFieldSubstitutionID(Field field) {
211: // TODO Auto-generated method stub
212: throw new RuntimeException("Not Implemented");
213: //return 0;
214: }
215:
216: public long getFirstObjectOffset() throws IOException {
217: return _server.getFirstObjectOffset();
218: }
219:
220: public long[] getForAllObjects(IOTicket ioTicket)
221: throws IOException {
222: return _server.getForAllObjects(ioTicket.getTicketIdentity());
223: }
224:
225: public String getFullFieldNameForID(int id) throws IOException {
226: return _server.getFullFieldNameForID(id);
227: }
228:
229: public IOTicket getIOTicket(boolean read, boolean write)
230: throws IOException {
231: return new IOTicketProxy(_server.getIOTicket(read, write));
232: }
233:
234: public int getOrSetClassTypeSubstitutionID(Class clazz)
235: throws IOException {
236: return getOrSetClassTypeSubstitutionID(clazz.getName());
237: }
238:
239: public int getOrSetClassTypeSubstitutionID(String classType)
240: throws IOException {
241: return _server.getOrSetClassTypeSubstitutionID(classType);
242: }
243:
244: public int getOrSetFieldSubstitutionID(Field field) {
245: try {
246: int declaringClassID = getOrSetClassTypeSubstitutionID(field
247: .getDeclaringClass().getName());
248: int fieldTypeID = getOrSetClassTypeSubstitutionID(field
249: .getType().getName());
250: return _server.getOrSetFieldSubstitutionID(
251: declaringClassID, fieldTypeID, field.getName());
252: } catch (Exception e) {
253: throw new RuntimeException(e);
254: }
255: }
256:
257: public IPersistentObjectStatistics getPersistenceStatistics(
258: long offset, JODBSession session) throws IOException {
259: return _server.getPersistenceStatistics(offset);
260: }
261:
262: public String getPrefixForID(int id) throws IOException {
263: return _server.getPrefixForID(id);
264: }
265:
266: public String getSimpleFieldNameForID(int id) throws IOException {
267: return _server.getSimpleFieldNameForID(id);
268: }
269:
270: public boolean isClosed() throws IOException {
271: return _server.isClosed();
272: }
273:
274: public boolean isNewDatabase() {
275: // TODO Auto-generated method stub
276: throw new RuntimeException("Not Implemented");
277: //return false;
278: }
279:
280: public ReentrantReadWriteLock getTransactionLock() {
281: return _transactionLock;
282: }
283:
284: public void printFileMap(JODBSession session,
285: PrintStream printStream) throws IOException {
286:
287: }
288:
289: public JODBQueryList executeQuery(QueryNode query)
290: throws IOException, IllegalClassTypeException {
291: JODBQueryList result = null;
292: IServerQueryResult queryResult = _server.runQuery(query, null);
293: if (query != null) {
294: long[] resultingOffsets = queryResult.getSearchResult();
295: if (resultingOffsets != null) {
296: result = new JODBQueryList(resultingOffsets, query
297: .getSession());
298: }
299: }
300: return result;
301: }
302:
303: private static class UnprocessedObjectsIterator implements
304: IndexDataIterator {
305:
306: long[] _ids1;
307: long[] _ids2;
308: int _total;
309: int _index = 0;
310:
311: /**
312: * @param ids1
313: * @param ids2
314: */
315: public UnprocessedObjectsIterator(long[] ids1, long[] ids2) {
316: super ();
317: _ids1 = ids1;
318: _ids2 = ids2;
319: if (_ids1 != null) {
320: _total += _ids1.length;
321: }
322: if (_ids2 != null) {
323: _total += _ids2.length;
324: }
325: }
326:
327: public long next(ByteBuffer result) {
328: return next();
329: }
330:
331: public boolean hasNext() {
332: return _index < _total;
333: }
334:
335: public int length() {
336: return _total;
337: }
338:
339: public long next() {
340: int offset = _index++;
341: if (offset < _ids1.length) {
342: return _ids1[offset];
343: } else {
344: return _ids2[offset - _ids1.length];
345: }
346: }
347:
348: }
349:
350: private class IOTicketProxy implements IOTicket {
351: IOTicketRemoteInterface _remoteTicket;
352: RandomAccessDataBufferProxy _randomAccessDataBufferProxy;
353:
354: /**
355: * @param remoteTicket
356: * @throws IOException
357: */
358: public IOTicketProxy(IOTicketRemoteInterface remoteTicket)
359: throws IOException {
360: super ();
361: _remoteTicket = remoteTicket;
362: _randomAccessDataBufferProxy = new RandomAccessDataBufferProxy(
363: _remoteTicket);
364: }
365:
366: public void close() throws IOException {
367: _randomAccessDataBufferProxy.delete();
368: _remoteTicket.close();
369: }
370:
371: public IOBase getBase() {
372: return JODBIOBaseProxy.this ;
373: }
374:
375: public IRandomAccessDataBuffer getRandomAccessBuffer() {
376: return _randomAccessDataBufferProxy;
377: }
378:
379: public void lock(boolean write) throws IOException {
380: // TODO Auto-generated method stub
381: //throw new RuntimeException("Not Implemented");
382: //
383: }
384:
385: public void lock(boolean write, long offset) throws IOException {
386: // TODO Auto-generated method stub
387: //throw new RuntimeException("Not Implemented");
388: //
389: }
390:
391: public void unlock() {
392: // TODO Auto-generated method stub
393: //throw new RuntimeException("Not Implemented");
394: //
395: }
396:
397: public int getTicketIdentity() throws IOException {
398: return _remoteTicket.getTicketIdentity();
399: }
400:
401: }
402: }
|