001: // You can redistribute this software and/or modify it under the terms of
002: // the Ozone Library License version 1 published by ozone-db.org.
003: //
004: // The original code and portions created by SMB are
005: // Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
006: //
007: // $Id: ClientCacheDatabase.java,v 1.2 2002/06/08 00:49:38 mediumnet Exp $
008:
009: package org.ozoneDB;
010:
011: import java.io.*;
012: import java.util.*;
013: import org.ozoneDB.DxLib.*;
014: import org.ozoneDB.DxLib.net.*;
015: import org.ozoneDB.util.*;
016: import org.ozoneDB.core.*;
017: import org.ozoneDB.core.DbRemote.*;
018:
019: /**
020: * This is an {@link ExternalDatabase} that implements a client side cache on
021: * top of another {@link ExternalDatabase}.<p>
022: *
023: * <i> In contrast to {@link LocalDatabase} and {@link RemoteDatabase} which
024: * produce the exactly same results for the same code, this implementation of
025: * {@link ExternalDatabase} is not guaranteed to do so.</i><p>
026: *
027: * Note: The method parameters of type {@link OzoneRemote} are supposed to by
028: * proxy objects (of type {@link OzoneProxy}). However, it's possible to pass
029: * a database objects (of type {@link OzoneCompatible}). In this case the
030: * parameter should be substituted. Currently this is done by the invoke()
031: * method only.<p>
032: *
033: * Impl. Note: All interface methods are synchronized because they have to be
034: * executed as an atomar operation.
035: *
036: *
037: * @author <a href="http://www.softwarebuero.de/">SMB</a>
038: * @version $Revision: 1.2 $Date: 2002/06/08 00:49:38 $
039: * @see OzoneInterface
040: */
041: public class ClientCacheDatabase extends ExternalDatabase {
042:
043: /**
044: * Holds all currently cached target objects. Maps ObjectID into
045: * CacheObjectContainer.
046: */
047: private DxMap idTable;
048:
049: /**
050: * Holds the names that are known to this database instance. Maps String
051: * into ObjectID.
052: */
053: private DxMap nameTable;
054:
055: private long totalMemory;
056:
057: private ExternalDatabase delegate;
058:
059: private long idCount;
060: private long idBorder;
061: private long idRange = 1000;
062:
063: private boolean debug;
064:
065: /**
066: * Constructs a new ClientCacheDatabase with the given delegate as
067: * back-end.<p>
068: *
069: * Note: The size of the client side cache can be adjusted via the heap
070: * size of the VM (parameter -Xmx). The cache uses all available heap in
071: * its VM.
072: *
073: *
074: * @param _delegate The back-end database.
075: */
076: public ClientCacheDatabase(ExternalDatabase _delegate) {
077: this (_delegate, false);
078: }
079:
080: /**
081: * Constructs a new ClientCacheDatabase with the given delegate as
082: * back-end and the given debug option.<p>
083: *
084: * Note: The size of the client side cache can be adjusted via the heap
085: * size of the VM (parameter -Xmx). The cache uses all available heap in
086: * its VM.
087: *
088: *
089: * @param _delegate The back-end database.
090: * @param _debug
091: */
092: public ClientCacheDatabase(ExternalDatabase _delegate,
093: boolean _debug) {
094: delegate = _delegate;
095: delegate.setWrapper(this );
096:
097: debug = _debug;
098: idTable = new DxHashMap(1000);
099: nameTable = new DxHashMap(100);
100:
101: calcMemory();
102: }
103:
104: protected void open(Hashtable _props) throws Exception {
105: throw new RuntimeException(
106: "Method open() must not be called for this class.");
107: }
108:
109: protected synchronized ObjectID nextID() throws Exception {
110: if (idCount >= idBorder) {
111: ObjectID id = (ObjectID) delegate.sendCommand(new DbNextID(
112: idRange), true);
113: idCount = id.value();
114: idBorder = idCount + idRange;
115: }
116: return new ObjectID(++idCount);
117: }
118:
119: public ExternalDatabase delegate() {
120: return delegate;
121: }
122:
123: protected Object sendCommand(DbCommand command,
124: boolean waitForResult, DbClient connection)
125: throws Exception {
126: throw new RuntimeException(
127: "ClientCacheDatabase must access the actual database through its delegate only.");
128: }
129:
130: protected DbClient newConnection() throws Exception {
131: return delegate.newConnection();
132: }
133:
134: public boolean isOpen() throws Exception {
135: return delegate.isOpen();
136: }
137:
138: public void close() throws Exception {
139: delegate.close();
140: }
141:
142: protected void finalize() throws Throwable {
143: close();
144: }
145:
146: public ExternalTransaction newTransaction() {
147: return new ExternalTransaction(this );
148: }
149:
150: public void beginTX(AbstractTransaction tx) throws TransactionExc,
151: IOException {
152: if (debug) {
153: System.out.println("[debug] beginTX()");
154: }
155: delegate.beginTX(tx);
156: }
157:
158: public void joinTX(AbstractTransaction tx) throws TransactionExc {
159: if (debug) {
160: System.out.println("[debug] joinTX()");
161: }
162: delegate.joinTX(tx);
163: }
164:
165: public boolean leaveTX(AbstractTransaction tx) {
166: if (debug) {
167: System.out.println("[debug] leaveTX()");
168: }
169: return delegate.leaveTX(tx);
170: }
171:
172: public void checkpointTX(AbstractTransaction tx)
173: throws TransactionExc, IOException {
174: if (debug) {
175: System.out.println("[debug] checkpointTX()");
176: }
177: throw new RuntimeException("Not yet implemented.");
178: }
179:
180: public synchronized void prepareTX(AbstractTransaction tx)
181: throws TransactionExc, IOException {
182: if (debug) {
183: System.out.println("[debug] prepareTX()");
184: }
185: try {
186: syncCache();
187: delegate.prepareTX(tx);
188: updateModTimes();
189: } catch (TransactionExc e) {
190: abortCache();
191: throw e;
192: } catch (Exception e) {
193: throw new UnexpectedException(e.toString());
194: }
195: }
196:
197: public synchronized void commitTX(AbstractTransaction tx,
198: boolean onePhase) throws TransactionExc, IOException {
199: if (debug) {
200: System.out.println("[debug] commitTX(): onePhase:"
201: + onePhase);
202: }
203:
204: if (onePhase) {
205: prepareTX(tx);
206: }
207:
208: try {
209: delegate.commitTX(tx, false);
210: } catch (TransactionExc e) {
211: throw e;
212: } catch (Exception e) {
213: throw new UnexpectedException(e.toString());
214: }
215:
216: // reset container states; handle exceptions in a special way
217: try {
218: DxIterator it = idTable.iterator();
219: while (it.next() != null) {
220: // clearing the states of all containers without checking should
221: // be the fastest way here
222: CacheObjectContainer container = (CacheObjectContainer) it
223: .object();
224: container.clearState();
225: }
226: } catch (Exception e) {
227: // if someting failes after the server transaction is commited we
228: // have to signal a strong error
229: throw new UnexpectedException(e.toString());
230: }
231: }
232:
233: public synchronized void rollbackTX(AbstractTransaction tx)
234: throws TransactionExc, IOException {
235: if (debug) {
236: System.out.println("[debug] rollbackTX()");
237: }
238:
239: delegate.rollbackTX(tx);
240: abortCache();
241: }
242:
243: // OzoneInterface methods *****************************
244:
245: /**
246: * Force the database server to reload all classes which extend
247: * OzoneObject. This is useful while testing new classes.
248: */
249: public void reloadClasses() throws Exception {
250: delegate.reloadClasses();
251: }
252:
253: public synchronized OzoneProxy createObject(String className,
254: int access, String name, String sig, Object[] args)
255: throws RuntimeException {
256: try {
257: AbstractTransaction tx = delegate.txForThread(Thread
258: .currentThread());
259: if (tx == null) {
260: throw new TransactionExc(
261: "Thread has not yet joined a transaction.",
262: TransactionExc.STATE);
263: }
264:
265: OzoneCompatible target = (OzoneCompatible) Class.forName(
266: className).newInstance();
267: CacheObjectContainer container = new CacheObjectContainer(
268: target, nextID(), name, access);
269: container.setDatabase(this );
270: container.raiseState(ObjectContainer.STATE_CREATED);
271: container.setDirty(true);
272: container.tx = tx;
273:
274: idTable.addForKey(container, container.id());
275:
276: return container.ozoneProxy();
277: } catch (Exception e) {
278: // only supported from JDK1.4 on
279: // throw new RuntimeException("Caught during createObject()",e);
280: throw new RuntimeException("Caught during createObject(): "
281: + e);
282: }
283: }
284:
285: public synchronized void deleteObject(OzoneRemote obj)
286: throws RuntimeException {
287: try {
288: OzoneProxy proxy = (OzoneProxy) obj;
289: CacheObjectContainer container = fetch0(proxy.remoteID(),
290: Lock.LEVEL_WRITE);
291:
292: container.raiseState(ObjectContainer.STATE_DELETED);
293: } catch (Exception e) {
294: // only supported from JDK1.4 on
295: // throw new RuntimeException("Caught during createObject()",e);
296: throw new RuntimeException("Caught during createObject(): "
297: + e);
298: }
299: }
300:
301: public synchronized OzoneProxy copyObject(OzoneRemote obj)
302: throws Exception {
303: throw new RuntimeException(
304: "copyObject(): Method not implemented.");
305: }
306:
307: public synchronized void nameObject(OzoneRemote obj, String name)
308: throws Exception {
309:
310: ObjectID id = (ObjectID) nameTable.elementForKey(name);
311: if (id != null) {
312: throw new PermissionDeniedExc("Root object name '" + name
313: + "' already exists.");
314: }
315:
316: OzoneProxy proxy = (OzoneProxy) obj;
317: CacheObjectContainer container = fetch0(proxy.remoteID(),
318: Lock.LEVEL_WRITE);
319: container.setName(name);
320: nameTable.addForKey(container.id(), name);
321: }
322:
323: public synchronized OzoneProxy objectForName(String name)
324: throws Exception {
325: if (debug) {
326: System.out.println("[debug] objectForName(): name:" + name);
327: }
328:
329: ObjectID id = (ObjectID) nameTable.elementForKey(name);
330:
331: if (id != null) {
332: CacheObjectContainer container = (CacheObjectContainer) idTable
333: .elementForKey(id);
334: if (container != null) {
335: return container.ozoneProxy();
336: } else {
337: container = fetch0(id, Lock.LEVEL_READ);
338:
339: if (container != null) {
340: return container.ozoneProxy();
341: } else {
342: nameTable.removeForKey(name);
343: return null;
344: }
345: }
346: } else {
347: OzoneProxy proxy = delegate.objectForName(name);
348: if (proxy != null) {
349: id = proxy.remoteID();
350: nameTable.addForKey(id, name);
351:
352: CacheObjectContainer container = fetch0(id,
353: Lock.LEVEL_READ);
354: return container.ozoneProxy();
355: } else {
356: return null;
357: }
358: }
359: }
360:
361: public Object invoke(OzoneProxy proxy, String methodName,
362: String sig, Object[] args, int lockLevel) throws Exception {
363: throw new RuntimeException("invoke(): Method not implemented.");
364: }
365:
366: public Object invoke(OzoneProxy proxy, int methodIndex,
367: Object[] args, int lockLevel) throws Exception {
368: throw new RuntimeException("invoke(): Method not implemented.");
369: }
370:
371: public OzoneProxy[] objectsOfClass(String name) throws Exception {
372: throw new RuntimeException("invoke(): Method not implemented.");
373: }
374:
375: // fetch/sync cache ***********************************
376:
377: public OzoneCompatible fetch(OzoneProxy proxy, int lockLevel)
378: throws Exception {
379: if (debug) {
380: System.out.println("[debug] fetch(): id:"
381: + proxy.remoteID());
382: }
383:
384: CacheObjectContainer container = fetch0(proxy.remoteID(),
385: lockLevel);
386: OzoneCompatible target = container.target();
387: if (target == null) {
388: if (debug) {
389: System.out.println("[debug] fetch(): id:"
390: + proxy.remoteID());
391: }
392: throw new ObjectNotFoundExc("Target is null.");
393: }
394:
395: return target;
396: }
397:
398: /**
399: * Fetch the object with the specified ObjectID from the server.
400: *
401: *
402: * @param proxy The proxy that specifies the object to fetch.
403: * @return The transaction for the current thread.
404: */
405: protected CacheObjectContainer fetch0(ObjectID id, int lockLevel)
406: throws Exception {
407:
408: AbstractTransaction tx = delegate.txForThread(Thread
409: .currentThread());
410: if (tx == null) {
411: throw new TransactionExc(
412: "Thread has not yet joined a transaction.",
413: TransactionExc.STATE);
414: }
415:
416: CacheObjectContainer container = (CacheObjectContainer) idTable
417: .elementForKey(id);
418: if (container == null) {
419: container = fetchChunk(id, 100000);
420: }
421:
422: if (container == null) {
423: throw new ObjectNotFoundExc("No object for the given ID.");
424: }
425:
426: // we are going to change the proxy so we have to synchronize it;
427: // afterwards the value of container.tx, which in fact is a lock,
428: // prevents other threads from changing the container
429: synchronized (container) {
430:
431: // check if the target is member of our tx; because there is no
432: // detection on the client, we may end up in a real, hard deadlock!
433: while (container.tx != null && container.tx != tx) {
434: try {
435: // we have to call wait on the container to let the JVM
436: // release the lock that the synchronized statement put on it
437: container.wait();
438: } catch (InterruptedException e) {
439: }
440: }
441:
442: // make this proxy/object a member of the transaction; in fact,
443: // this puts a lock on it
444: if (container.tx == null) {
445: container.tx = tx;
446: }
447:
448: if (lockLevel == Lock.LEVEL_READ) {
449: container.raiseState(CacheObjectContainer.STATE_READ);
450: } else {
451: container
452: .raiseState(CacheObjectContainer.STATE_MODIFIED);
453: container.setDirty(true);
454: }
455: }
456:
457: return container;
458: }
459:
460: protected synchronized CacheObjectContainer fetchChunk(
461: ObjectID rootID, int size) throws Exception {
462: ensureSpace(size);
463:
464: DxArrayBag chunk = (DxArrayBag) delegate.sendCommand(
465: new DbCacheChunk(rootID, size), true);
466:
467: for (int i = 0; i < chunk.count(); i++) {
468: CacheObjectContainer container = (CacheObjectContainer) chunk
469: .elementAtIndex(i);
470: if (debug) {
471: System.out.println("[debug] fetchChunk(): container:"
472: + container.id());
473: }
474:
475: // set the container link of this client side copy of the target
476: container.setTarget(container.target());
477:
478: container.setDatabase(this );
479:
480: if (idTable.containsKey(container.id())) {
481: System.out
482: .print("[debug] fetchChunk(): container already registered... ");
483:
484: // use the newly fetched container instead of the old one if
485: // the old isn't currently locked
486: CacheObjectContainer c = (CacheObjectContainer) idTable
487: .elementForKey(container.id());
488: if (c.tx != null) {
489: System.out.println("and locked - using old one.");
490: } else {
491: System.out.println("not locked - using new one.");
492: idTable.removeForKey(container.id());
493: idTable.addForKey(container, container.id());
494: }
495: } else {
496: idTable.addForKey(container, container.id());
497: }
498: }
499:
500: return (CacheObjectContainer) idTable.elementForKey(rootID);
501: }
502:
503: /**
504: * Synchronize all changed objects with the server. This does not commits
505: * the transaction but only transfers all changed objects back to the
506: * server.
507: */
508: protected synchronized void syncCache() throws Exception {
509: if (debug) {
510: System.out.println("[debug] syncCache()");
511: }
512:
513: // although we send a byte[] the proxy links are handled correctly -
514: // see DbCacheChunk for details
515:
516: ByteArrayOutputStream bout = new ByteArrayOutputStream();
517: ObjectOutputStream out = new ObjectOutputStream(bout);
518:
519: int count = 0;
520: DxIterator it = idTable.iterator();
521: while (it.next() != null) {
522: CacheObjectContainer container = (CacheObjectContainer) it
523: .object();
524:
525: if (debug) {
526: System.out.println("[debug] id:" + container.id()
527: + ", state:" + container.state() + ", dirty:"
528: + container.dirty());
529: }
530:
531: if (container.dirty()) {
532: out.writeObject(container);
533:
534: // if the sendCommand() fails, which would make the container
535: // dirty again, then the entire transaction is aborted the
536: // container is thrown away
537: container.setDirty(false);
538: count++;
539: }
540:
541: if (bout.size() > 500000) {
542: if (debug) {
543: System.out.println("[debug] syncCache(): writing "
544: + count + " objects");
545: }
546:
547: out.close();
548: delegate.sendCommand(new DbCacheChunk(bout
549: .toByteArray()), true);
550: bout = new ByteArrayOutputStream();
551: out = new ObjectOutputStream(bout);
552: count = 0;
553: }
554: }
555:
556: if (debug) {
557: System.out.println("[debug] syncCache(): writing " + count
558: + " objects");
559: }
560: out.close();
561: delegate
562: .sendCommand(new DbCacheChunk(bout.toByteArray()), true);
563: }
564:
565: /**
566: * Throws away all modified containers. Clears idTable and nameTable.
567: */
568: protected synchronized void abortCache() {
569: try {
570: DxIterator it = idTable.iterator();
571: while (it.next() != null) {
572: CacheObjectContainer container = (CacheObjectContainer) it
573: .object();
574: if (container.state() >= ObjectContainer.STATE_MODIFIED) {
575: idTable.removeForKey(container.id());
576: if (container.name() != null) {
577: nameTable.removeForKey(container.name());
578: }
579: }
580: }
581: } catch (Exception e) {
582: // this should never happen; all we can do here is to signal a
583: // strong error
584: throw new RuntimeException(e.toString());
585: }
586: }
587:
588: /**
589: * Update the modification times of all currently modified containers. This
590: * method should be called after prepareTX only.
591: */
592: protected synchronized void updateModTimes() {
593: try {
594: if (debug) {
595: System.out.println("[debug] updateModTimes()");
596: }
597:
598: DbModTimes command = new DbModTimes();
599:
600: int count = 0;
601: DxIterator it = idTable.iterator();
602: while (it.next() != null) {
603: CacheObjectContainer container = (CacheObjectContainer) it
604: .object();
605: if (container.state() == ObjectContainer.STATE_MODIFIED
606: || container.state() == ObjectContainer.STATE_CREATED) {
607: if (debug) {
608: System.out.println("[debug] send: id:"
609: + container.id());
610: }
611:
612: command.addObjectID(container.id());
613: count++;
614: }
615: }
616:
617: // avoid sending the command if nothing has changed anyway
618: if (count > 0) {
619: DxMap map = (DxMap) delegate.sendCommand(command, true);
620:
621: it = map.iterator();
622: Long modTime = null;
623: while ((modTime = (Long) it.next()) != null) {
624: ObjectID id = (ObjectID) it.key();
625:
626: if (debug) {
627: System.out.println("[debug] receive: id:"
628: + id + ", modTime:" + modTime);
629: }
630:
631: CacheObjectContainer container = (CacheObjectContainer) idTable
632: .elementForKey(id);
633: container.setModTime(modTime.longValue());
634: }
635: }
636: } catch (Exception e) {
637: // this should never happen; all we can do here is to signal a
638: // strong error
639: throw new RuntimeException(e.toString());
640: }
641: }
642:
643: /**
644: * Try to free the specified amount of cache space. This will remove
645: * containers from the cache that are currently not used by a transaction
646: */
647: protected synchronized void ensureSpace(long neededSpace) {
648: if (freeMemory() < neededSpace) {
649:
650: // build priority queue for all currently cached objects
651: DxMap priorityQueue = new DxTreeMap();
652: DxIterator it = idTable.iterator();
653: while (it.next() != null) {
654: CacheObjectContainer container = (CacheObjectContainer) it
655: .object();
656: priorityQueue.addForKey(container, new Long(container
657: .lastTouched()));
658: }
659:
660: // free 100 objects at once until there is enough free memory or
661: // no unlocked container left, lowest priority first
662: it = priorityQueue.iterator();
663: CacheObjectContainer container = (CacheObjectContainer) it
664: .next();
665: while (freeMemory() < neededSpace && container != null) {
666:
667: for (int i = 0; i < 100 && container != null; i++) {
668: if (container.tx == null) {
669: idTable.removeForKey(container.id());
670: container = (CacheObjectContainer) it.next();
671: } else {
672: System.out
673: .println("[debug] ensureSpace(): trying to free locked container.");
674: }
675: }
676: System.gc();
677: }
678: }
679: }
680:
681: // free memory ****************************************
682:
683: /**
684: * Initialize the internal memory counter so that freeMemory() returns
685: * correct results.
686: */
687: protected void calcMemory() {
688: Runtime rt = Runtime.getRuntime();
689: try {
690: DxBag bag = new DxArrayBag();
691: for (;;) {
692: bag.add(new byte[100000]);
693: }
694: } catch (OutOfMemoryError e) {
695: totalMemory = rt.totalMemory();
696: rt.gc();
697: }
698: }
699:
700: /**
701: * Return the amount of *total* free memory in the system. The results
702: * returned by Runtime.freeMemory() may change overtime and so its
703: * useless for ozone.<p>
704: *
705: *
706: * Note: this will not work properly if some kind of weal references are
707: * used in this VM. In case of empty space we need to force teh GC to also
708: * free weak references.
709: */
710: public long freeMemory() {
711: Runtime rt = Runtime.getRuntime();
712: long hiddenMemory = totalMemory - rt.totalMemory();
713:
714: // keep 2MB free at least
715: return Math.max(rt.freeMemory() + hiddenMemory - 2000000L, 0);
716: }
717:
718: }
|