0001: /**
0002: * com.mckoi.database.BlobStore 18 Jan 2003
0003: *
0004: * Mckoi SQL Database ( http://www.mckoi.com/database )
0005: * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
0006: *
0007: * This program is free software; you can redistribute it and/or
0008: * modify it under the terms of the GNU General Public License
0009: * Version 2 as published by the Free Software Foundation.
0010: *
0011: * This program is distributed in the hope that it will be useful,
0012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
0014: * GNU General Public License Version 2 for more details.
0015: *
0016: * You should have received a copy of the GNU General Public License
0017: * Version 2 along with this program; if not, write to the Free Software
0018: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
0019: *
0020: * Change Log:
0021: *
0022: *
0023: */package com.mckoi.database;
0024:
0025: import java.util.ArrayList;
0026: import java.util.zip.Deflater;
0027: import java.util.zip.Inflater;
0028: import java.util.zip.DataFormatException;
0029: import java.io.IOException;
0030: import java.io.InputStream;
0031: import java.io.Reader;
0032: import com.mckoi.util.PagedInputStream;
0033: import com.mckoi.store.Store;
0034: import com.mckoi.store.Area;
0035: import com.mckoi.store.MutableArea;
0036: import com.mckoi.store.AreaWriter;
0037: import com.mckoi.database.jdbc.AsciiReader;
0038: import com.mckoi.database.jdbc.BinaryToUnicodeReader;
0039: import com.mckoi.database.global.Ref;
0040: import com.mckoi.database.global.BlobRef;
0041: import com.mckoi.database.global.ClobRef;
0042: import com.mckoi.database.global.ByteLongObject;
0043:
0044: /**
0045: * A structure inside an Area that maintains the storage of any number of large
0046: * binary objects. A blob store allows for the easy allocation of areas for
0047: * storing blob data and for reading and writing blob information via BlobRef
0048: * objects.
0049: * <p>
0050: * A BlobStore can be broken down to the following simplistic functions;
0051: * <p>
0052: * 1) Allocation of an area to store a new blob.<br>
0053: * 2) Reading the information in a Blob given a Blob reference identifier.<br>
0054: * 3) Reference counting to a particular Blob.<br>
0055: * 4) Cleaning up a Blob when no static references are left.<br>
0056: *
0057: * @author Tobias Downer
0058: */
0059:
0060: final class BlobStore implements BlobStoreInterface {
0061:
0062: /**
0063: * The magic value for fixed record list structures.
0064: */
0065: private final static int MAGIC = 0x012BC53A9;
0066:
0067: /**
0068: * The outer Store object that is to contain the blob store.
0069: */
0070: private Store store;
0071:
0072: /**
0073: * The FixedRecordList structure that maintains a list of fixed size records
0074: * for blob reference counting.
0075: */
0076: private FixedRecordList fixed_list;
0077:
0078: /**
0079: * The first delete chain element.
0080: */
0081: private long first_delete_chain_record;
0082:
/**
 * Constructs a BlobStore that keeps all of its data inside the given Store.
 * The fixed record list is set up for 24 byte records but nothing is read
 * from or written to the store until 'create' or 'init' is called.
 */
BlobStore(Store store) {
  fixed_list = new FixedRecordList(store, 24);
  this.store = store;
}
0090:
0091: /**
0092: * Creates the blob store and returns a pointer in the store to the header
0093: * information. This value is later used to initialize the store.
0094: */
0095: long create() throws IOException {
0096: // Init the fixed record list area.
0097: // The fixed list entries are formatted as follows;
0098: // ( status (int), reference_count (int),
0099: // blob_size (long), blob_pointer (long) )
0100: long fixed_list_p = fixed_list.create();
0101:
0102: // Delete chain is empty when we start
0103: first_delete_chain_record = -1;
0104: fixed_list.setReservedLong(-1);
0105:
0106: // Allocate a small header that contains the MAGIC, and the pointer to the
0107: // fixed list structure.
0108: AreaWriter blob_store_header = store.createArea(32);
0109: long blob_store_p = blob_store_header.getID();
0110: // Write the blob store header information
0111: // The magic
0112: blob_store_header.putInt(MAGIC);
0113: // The version
0114: blob_store_header.putInt(1);
0115: // The pointer to the fixed list area
0116: blob_store_header.putLong(fixed_list_p);
0117: // And finish
0118: blob_store_header.finish();
0119:
0120: // Return the pointer to the blob store header
0121: return blob_store_p;
0122: }
0123:
0124: /**
0125: * Initializes the blob store given a pointer to the blob store pointer
0126: * header (the value previously returned by the 'create' method).
0127: */
0128: void init(long blob_store_p) throws IOException {
0129: // Get the header area
0130: Area blob_store_header = store.getArea(blob_store_p);
0131: blob_store_header.position(0);
0132: // Read the magic
0133: int magic = blob_store_header.getInt();
0134: int version = blob_store_header.getInt();
0135: if (magic != MAGIC) {
0136: throw new IOException(
0137: "MAGIC value for BlobStore is not correct.");
0138: }
0139: if (version != 1) {
0140: throw new IOException(
0141: "version number for BlobStore is not correct.");
0142: }
0143:
0144: // Read the pointer to the fixed area
0145: long fixed_list_p = blob_store_header.getLong();
0146: // Init the FixedRecordList area
0147: fixed_list.init(fixed_list_p);
0148:
0149: // Set the delete chain
0150: first_delete_chain_record = fixed_list.getReservedLong();
0151: }
0152:
0153: /**
0154: * Simple structure used when copying blob information.
0155: */
0156: private static class CopyBlobInfo {
0157: int ref_count;
0158: long size;
0159: long ob_p;
0160: };
0161:
0162: /**
0163: * Copies all the blob data from the given BlobStore into this blob store.
0164: * Any blob information that already exists within this BlobStore is deleted.
0165: * We assume this method is called after the blob store is created or
0166: * initialized.
0167: */
0168: void copyFrom(StoreSystem store_system, BlobStore src_blob_store)
0169: throws IOException {
0170: FixedRecordList src_fixed_list = src_blob_store.fixed_list;
0171: long node_count;
0172: synchronized (src_fixed_list) {
0173: node_count = src_fixed_list.addressableNodeCount();
0174: }
0175:
0176: synchronized (fixed_list) {
0177:
0178: // Make sure our fixed_list is big enough to accomodate the copied list,
0179: while (fixed_list.addressableNodeCount() < node_count) {
0180: fixed_list.increaseSize();
0181: }
0182:
0183: // We rearrange the delete chain
0184: long last_deleted = -1;
0185:
0186: // We copy blobs in groups no larger than 1024 Blobs
0187: final int BLOCK_WRITE_COUNT = 1024;
0188:
0189: int max_to_read = (int) Math.min(BLOCK_WRITE_COUNT,
0190: node_count);
0191: long p = 0;
0192:
0193: while (max_to_read > 0) {
0194: // (CopyBlboInfo)
0195: ArrayList src_copy_list = new ArrayList();
0196:
0197: synchronized (src_fixed_list) {
0198: for (int i = 0; i < max_to_read; ++i) {
0199: Area a = src_fixed_list.positionOnNode(p + i);
0200: int status = a.getInt();
0201: // If record is not deleted
0202: if (status != 0x020000) {
0203: CopyBlobInfo info = new CopyBlobInfo();
0204: info.ref_count = a.getInt();
0205: info.size = a.getLong();
0206: info.ob_p = a.getLong();
0207: src_copy_list.add(info);
0208: } else {
0209: src_copy_list.add(null);
0210: }
0211: }
0212: }
0213:
0214: try {
0215: store.lockForWrite();
0216:
0217: // We now should have a list of all records from the src to copy,
0218: int sz = src_copy_list.size();
0219: for (int i = 0; i < sz; ++i) {
0220: CopyBlobInfo info = (CopyBlobInfo) src_copy_list
0221: .get(i);
0222: MutableArea a = fixed_list
0223: .positionOnNode(p + i);
0224: // Either set a deleted entry or set the entry with a copied blob.
0225: if (info == null) {
0226: a.putInt(0x020000);
0227: a.putInt(0);
0228: a.putLong(-1);
0229: a.putLong(last_deleted);
0230: a.checkOut();
0231: last_deleted = p + i;
0232: } else {
0233: // Get the Area containing the blob header data in the source
0234: // store
0235: Area src_blob_header = src_blob_store.store
0236: .getArea(info.ob_p);
0237: // Read the information from the header,
0238: int res = src_blob_header.getInt();
0239: int type = src_blob_header.getInt();
0240: long total_block_size = src_blob_header
0241: .getLong();
0242: long total_block_pages = src_blob_header
0243: .getLong();
0244:
0245: // Allocate a new header
0246: AreaWriter dst_blob_header = store
0247: .createArea(4 + 4 + 8 + 8
0248: + (total_block_pages * 8));
0249: long new_ob_header_p = dst_blob_header
0250: .getID();
0251: // Copy information into the header
0252: dst_blob_header.putInt(res);
0253: dst_blob_header.putInt(type);
0254: dst_blob_header.putLong(total_block_size);
0255: dst_blob_header.putLong(total_block_pages);
0256:
0257: // Allocate and copy each page,
0258: for (int n = 0; n < total_block_pages; ++n) {
0259: // Get the block information
0260: long block_p = src_blob_header
0261: .getLong();
0262: Area src_block = src_blob_store.store
0263: .getArea(block_p);
0264: int block_type = src_block.getInt();
0265: int block_size = src_block.getInt();
0266: // Copy a new block,
0267: int new_block_size = block_size + 4 + 4;
0268: AreaWriter dst_block_p = store
0269: .createArea(new_block_size);
0270: long new_block_p = dst_block_p.getID();
0271: src_block.position(0);
0272: src_block.copyTo(dst_block_p,
0273: new_block_size);
0274: // And finish
0275: dst_block_p.finish();
0276: // Write the new header
0277: dst_blob_header.putLong(new_block_p);
0278: }
0279:
0280: // And finish 'dst_blob_header'
0281: dst_blob_header.finish();
0282:
0283: // Set up the data in the fixed list
0284: a.putInt(1);
0285: // Note all the blobs are written with 0 reference count.
0286: a.putInt(0);
0287: a.putLong(info.size);
0288: a.putLong(new_ob_header_p);
0289: // Check out the changes
0290: a.checkOut();
0291: }
0292: }
0293:
0294: } finally {
0295: store.unlockForWrite();
0296: }
0297:
0298: node_count -= max_to_read;
0299: p += max_to_read;
0300: max_to_read = (int) Math.min(BLOCK_WRITE_COUNT,
0301: node_count);
0302:
0303: // Set a checkpoint in the destination store system so we write out
0304: // all pending changes from the log
0305: store_system.setCheckPoint();
0306:
0307: }
0308:
0309: // Set the delete chain
0310: first_delete_chain_record = last_deleted;
0311: fixed_list.setReservedLong(last_deleted);
0312:
0313: } // synchronized (fixed_list)
0314:
0315: }
0316:
0317: /**
0318: * Convenience method that converts the given String into a ClobRef
0319: * object and pushes it into the given BlobStore object.
0320: */
0321: ClobRef putStringInBlobStore(String str) throws IOException {
0322: final int BUF_SIZE = 64 * 1024;
0323:
0324: int size = str.length();
0325:
0326: byte type = 4;
0327: // Enable compression (ISSUE: Should this be enabled by default?)
0328: type = (byte) (type | 0x010);
0329:
0330: ClobRef ref = (ClobRef) allocateLargeObject(type, size * 2);
0331: byte[] buf = new byte[BUF_SIZE];
0332: long p = 0;
0333: int str_i = 0;
0334: while (size > 0) {
0335: int to_write = Math.min(BUF_SIZE / 2, size);
0336: int buf_i = 0;
0337: for (int i = 0; i < to_write; ++i) {
0338: char c = str.charAt(str_i);
0339: buf[buf_i] = (byte) (c >> 8);
0340: ++buf_i;
0341: buf[buf_i] = (byte) c;
0342: ++buf_i;
0343: ++str_i;
0344: }
0345: ref.write(p, buf, buf_i);
0346: size -= to_write;
0347: p += to_write * 2;
0348: }
0349:
0350: ref.complete();
0351:
0352: return ref;
0353: }
0354:
0355: /**
0356: * Convenience method that converts the given ByteLongObject into a
0357: * BlobRef object and pushes it into the given BlobStore object.
0358: */
0359: BlobRef putByteLongObjectInBlobStore(ByteLongObject blob)
0360: throws IOException {
0361:
0362: final int BUF_SIZE = 64 * 1024;
0363:
0364: byte[] src_buf = blob.getByteArray();
0365: final int size = src_buf.length;
0366: BlobRef ref = (BlobRef) allocateLargeObject((byte) 2, size);
0367:
0368: byte[] copy_buf = new byte[BUF_SIZE];
0369: int offset = 0;
0370: int to_write = Math.min(BUF_SIZE, size);
0371:
0372: while (to_write > 0) {
0373: System.arraycopy(src_buf, offset, copy_buf, 0, to_write);
0374: ref.write(offset, copy_buf, to_write);
0375:
0376: offset += to_write;
0377: to_write = Math.min(BUF_SIZE, (size - offset));
0378: }
0379:
0380: ref.complete();
0381:
0382: return ref;
0383: }
0384:
0385: /**
0386: * Finds a free place to add a record and returns an index to the record here.
0387: * This may expand the record space as necessary if there are no free record
0388: * slots to use.
0389: * <p>
0390: * NOTE: Unfortunately this is cut-and-paste from the way
0391: * V2MasterTableDataSource manages recycled elements.
0392: */
0393: private long addToRecordList(long record_p) throws IOException {
0394:
0395: synchronized (fixed_list) {
0396: // If there is no free deleted records in the delete chain,
0397: if (first_delete_chain_record == -1) {
0398:
0399: // Increase the size of the list structure.
0400: fixed_list.increaseSize();
0401: // The start record of the new size
0402: int new_block_number = fixed_list.listBlockCount() - 1;
0403: long start_index = fixed_list
0404: .listBlockFirstPosition(new_block_number);
0405: long size_of_block = fixed_list
0406: .listBlockNodeCount(new_block_number);
0407: // The Area object for the new position
0408: MutableArea a = fixed_list.positionOnNode(start_index);
0409:
0410: a.putInt(0);
0411: a.putInt(0);
0412: a.putLong(-1); // Initially unknown size
0413: a.putLong(record_p);
0414: // Set the rest of the block as deleted records
0415: for (long n = 1; n < size_of_block - 1; ++n) {
0416: a.putInt(0x020000);
0417: a.putInt(0);
0418: a.putLong(-1);
0419: a.putLong(start_index + n + 1);
0420: }
0421: // The last block is end of delete chain.
0422: a.putInt(0x020000);
0423: a.putInt(0);
0424: a.putLong(-1);
0425: a.putLong(-1);
0426: // Check out the changes.
0427: a.checkOut();
0428: // And set the new delete chain
0429: first_delete_chain_record = start_index + 1;
0430: // Set the reserved area
0431: fixed_list.setReservedLong(first_delete_chain_record);
0432: // // Flush the changes to the store
0433: // store.flush();
0434:
0435: // Return pointer to the record we just added.
0436: return start_index;
0437:
0438: } else {
0439:
0440: // Pull free block from the delete chain and recycle it.
0441: long recycled_record = first_delete_chain_record;
0442: MutableArea block = fixed_list
0443: .positionOnNode(recycled_record);
0444: int rec_pos = block.position();
0445: // Status of the recycled block
0446: int status = block.getInt();
0447: if ((status & 0x020000) == 0) {
0448: throw new Error(
0449: "Assertion failed: record is not deleted!");
0450: }
0451: // Reference count (currently unused in delete chains).
0452: block.getInt();
0453: // The size (should be -1);
0454: block.getLong();
0455: // The pointer to the next in the chain.
0456: long next_chain = block.getLong();
0457: first_delete_chain_record = next_chain;
0458: // Update the first_delete_chain_record field in the header
0459: fixed_list.setReservedLong(first_delete_chain_record);
0460: // Update the block
0461: block.position(rec_pos);
0462: block.putInt(0);
0463: block.putInt(0);
0464: block.putLong(-1); // Initially unknown size
0465: block.putLong(record_p);
0466: // Check out the changes
0467: block.checkOut();
0468:
0469: return recycled_record;
0470: }
0471: }
0472:
0473: }
0474:
0475: /**
0476: * Allocates an area in the store for a large binary object to be stored.
0477: * After the blob area is allocated the blob may be written. This returns
0478: * a BlobRef object for future access to the blob.
0479: * <p>
0480: * A newly allocated blob is read and write enabled. A call to the
0481: * 'completeBlob' method must be called to finalize the blob at which point
0482: * the blob becomes a static read-only object.
0483: */
0484: Ref allocateLargeObject(byte type, long size) throws IOException {
0485: if (size < 0) {
0486: throw new IOException("Negative blob size not allowed.");
0487: }
0488:
0489: try {
0490: store.lockForWrite();
0491:
0492: // Allocate the area (plus header area) for storing the blob pages
0493: long page_count = ((size - 1) / (64 * 1024)) + 1;
0494: AreaWriter blob_area = store
0495: .createArea((page_count * 8) + 24);
0496: long blob_p = blob_area.getID();
0497: // Set up the area header
0498: blob_area.putInt(0); // Reserved for future
0499: blob_area.putInt(type);
0500: blob_area.putLong(size);
0501: blob_area.putLong(page_count);
0502: // Initialize the empty blob area
0503: for (long i = 0; i < page_count; ++i) {
0504: blob_area.putLong(-1);
0505: }
0506: // And finish
0507: blob_area.finish();
0508:
0509: // Update the fixed_list and return the record number for this blob
0510: long reference_id = addToRecordList(blob_p);
0511: byte st_type = (byte) (type & 0x0F);
0512: if (st_type == 2) {
0513: // Create a BlobRef implementation that can access this blob
0514: return new BlobRefImpl(reference_id, type, size, true);
0515: } else if (st_type == 3) {
0516: return new ClobRefImpl(reference_id, type, size, true);
0517: } else if (st_type == 4) {
0518: return new ClobRefImpl(reference_id, type, size, true);
0519: } else {
0520: throw new IOException("Unknown large object type");
0521: }
0522:
0523: } finally {
0524: store.unlockForWrite();
0525: }
0526:
0527: }
0528:
0529: /**
0530: * Returns a Ref object that allows read-only access to a large object in this
0531: * blob store.
0532: */
0533: public Ref getLargeObject(long reference_id) throws IOException {
0534:
0535: long blob_p;
0536: long size;
0537: synchronized (fixed_list) {
0538:
0539: // Assert that the blob reference id given is a valid range
0540: if (reference_id < 0
0541: || reference_id >= fixed_list
0542: .addressableNodeCount()) {
0543: throw new IOException("reference_id is out of range.");
0544: }
0545:
0546: // Position on this record
0547: Area block = fixed_list.positionOnNode(reference_id);
0548: // Read the information in the fixed record
0549: int status = block.getInt();
0550: // Assert that the status is not deleted
0551: if ((status & 0x020000) != 0) {
0552: throw new Error("Assertion failed: record is deleted!");
0553: }
0554: // Get the reference count
0555: int reference_count = block.getInt();
0556: // Get the total size of the blob
0557: size = block.getLong();
0558: // Get the blob pointer
0559: blob_p = block.getLong();
0560:
0561: }
0562:
0563: Area blob_area = store.getArea(blob_p);
0564: blob_area.position(0);
0565: blob_area.getInt(); // (reserved)
0566: // Read the type
0567: byte type = (byte) blob_area.getInt();
0568: // The size of the block
0569: long block_size = blob_area.getLong();
0570: // The number of pages in the blob
0571: long page_count = blob_area.getLong();
0572:
0573: if (type == (byte) 2) {
0574: // Create a new BlobRef object.
0575: return new BlobRefImpl(reference_id, type, size, false);
0576: } else {
0577: // Create a new ClobRef object.
0578: return new ClobRefImpl(reference_id, type, size, false);
0579: }
0580: }
0581:
0582: /**
0583: * Call this to complete a blob in the store after a blob has been completely
0584: * written. Only BlobRef implementations returned by the 'allocateBlob'
0585: * method are accepted.
0586: */
0587: void completeBlob(AbstractRef ref) throws IOException {
0588: // Assert that the BlobRef is open and allocated
0589: ref.assertIsOpen();
0590: // Get the blob reference id (reference to the fixed record list).
0591: long blob_reference_id = ref.getID();
0592:
0593: synchronized (fixed_list) {
0594:
0595: // Update the record in the fixed list.
0596: MutableArea block = fixed_list
0597: .positionOnNode(blob_reference_id);
0598: // Record the position
0599: int rec_pos = block.position();
0600: // Read the information in the fixed record
0601: int status = block.getInt();
0602: // Assert that the status is open
0603: if (status != 0) {
0604: throw new IOException(
0605: "Assertion failed: record is not open.");
0606: }
0607: int reference_count = block.getInt();
0608: long size = block.getLong();
0609: long page_count = block.getLong();
0610:
0611: try {
0612: store.lockForWrite();
0613:
0614: // Set the fixed blob record as complete.
0615: block.position(rec_pos);
0616: // Write the new status
0617: block.putInt(1);
0618: // Write the reference count
0619: block.putInt(0);
0620: // Write the completed size
0621: block.putLong(ref.getRawSize());
0622: // Write the pointer
0623: block.putLong(page_count);
0624: // Check out the change
0625: block.checkOut();
0626:
0627: } finally {
0628: store.unlockForWrite();
0629: }
0630:
0631: }
0632: // Now the blob has been finalized so change the state of the BlobRef
0633: // object.
0634: ref.close();
0635:
0636: }
0637:
0638: /**
0639: * Tells the BlobStore that a static reference has been established in a
0640: * table to the blob referenced by the given id. This is used to count
0641: * references to a blob, and possibly clean up a blob if there are no
0642: * references remaining to it.
0643: * <p>
0644: * NOTE: It is the responsibility of the callee to establish a 'lockForWrite'
0645: * lock on the store before this is used.
0646: */
0647: public void establishReference(long blob_reference_id) {
0648: try {
0649: synchronized (fixed_list) {
0650: // Update the record in the fixed list.
0651: MutableArea block = fixed_list
0652: .positionOnNode(blob_reference_id);
0653: // Record the position
0654: int rec_pos = block.position();
0655: // Read the information in the fixed record
0656: int status = block.getInt();
0657: // Assert that the status is static
0658: if (status != 1) {
0659: throw new RuntimeException(
0660: "Assertion failed: record is not static.");
0661: }
0662: int reference_count = block.getInt();
0663:
0664: // Set the fixed blob record as complete.
0665: block.position(rec_pos + 4);
0666: // Write the reference count + 1
0667: block.putInt(reference_count + 1);
0668: // Check out the change
0669: block.checkOut();
0670: }
0671: // // Flush all changes to the store.
0672: // store.flush();
0673: } catch (IOException e) {
0674: throw new RuntimeException("IO Error: " + e.getMessage());
0675: }
0676: }
0677:
0678: /**
0679: * Tells the BlobStore that a static reference has been released to the
0680: * given blob. This would typically be called when the row in the database
0681: * is removed.
0682: * <p>
0683: * NOTE: It is the responsibility of the callee to establish a 'lockForWrite'
0684: * lock on the store before this is used.
0685: */
0686: public void releaseReference(long blob_reference_id) {
0687: try {
0688: synchronized (fixed_list) {
0689: // Update the record in the fixed list.
0690: MutableArea block = fixed_list
0691: .positionOnNode(blob_reference_id);
0692: // Record the position
0693: int rec_pos = block.position();
0694: // Read the information in the fixed record
0695: int status = block.getInt();
0696: // Assert that the status is static
0697: if (status != 1) {
0698: throw new RuntimeException("Assertion failed: "
0699: + "Record is not static (status = "
0700: + status + ")");
0701: }
0702: int reference_count = block.getInt();
0703: if (reference_count == 0) {
0704: throw new RuntimeException(
0705: "Releasing when Blob reference counter is at 0.");
0706: }
0707:
0708: long object_size = block.getLong();
0709: long object_p = block.getLong();
0710:
0711: // If reference count == 0 then we need to free all the resources
0712: // associated with this Blob in the blob store.
0713: if ((reference_count - 1) == 0) {
0714: // Free the resources associated with this object.
0715: Area blob_area = store.getArea(object_p);
0716: blob_area.getInt();
0717: byte type = (byte) blob_area.getInt();
0718: long total_size = blob_area.getLong();
0719: long page_count = blob_area.getLong();
0720: // Free all of the pages in this blob.
0721: for (long i = 0; i < page_count; ++i) {
0722: long page_p = blob_area.getLong();
0723: if (page_p > 0) {
0724: store.deleteArea(page_p);
0725: }
0726: }
0727: // Free the blob area object itself.
0728: store.deleteArea(object_p);
0729: // Write out the blank record.
0730: block.position(rec_pos);
0731: block.putInt(0x020000);
0732: block.putInt(0);
0733: block.putLong(-1);
0734: block.putLong(first_delete_chain_record);
0735: // CHeck out these changes
0736: block.checkOut();
0737: first_delete_chain_record = blob_reference_id;
0738: // Update the first_delete_chain_record field in the header
0739: fixed_list
0740: .setReservedLong(first_delete_chain_record);
0741: } else {
0742: // Simply decrement the reference counter for this record.
0743: block.position(rec_pos + 4);
0744: // Write the reference count - 1
0745: block.putInt(reference_count - 1);
0746: // Check out this change
0747: block.checkOut();
0748: }
0749:
0750: }
0751: // // Flush all changes to the store.
0752: // store.flush();
0753: } catch (IOException e) {
0754: throw new RuntimeException("IO Error: " + e.getMessage());
0755: }
0756: }
0757:
0758: /**
0759: * Reads a section of the blob referenced by the given id, offset and length
0760: * into the byte array.
0761: */
0762: private void readBlobByteArray(long reference_id, long offset,
0763: byte[] buf, int off, int length) throws IOException {
0764:
0765: // ASSERT: Read and write position must be 64K aligned.
0766: if (offset % (64 * 1024) != 0) {
0767: throw new RuntimeException(
0768: "Assert failed: offset is not 64k aligned.");
0769: }
0770: // ASSERT: Length is less than or equal to 64K
0771: if (length > (64 * 1024)) {
0772: throw new RuntimeException(
0773: "Assert failed: length is greater than 64K.");
0774: }
0775:
0776: int status;
0777: int reference_count;
0778: long size;
0779: long blob_p;
0780:
0781: synchronized (fixed_list) {
0782:
0783: // Assert that the blob reference id given is a valid range
0784: if (reference_id < 0
0785: || reference_id >= fixed_list
0786: .addressableNodeCount()) {
0787: throw new IOException(
0788: "blob_reference_id is out of range.");
0789: }
0790:
0791: // Position on this record
0792: Area block = fixed_list.positionOnNode(reference_id);
0793: // Read the information in the fixed record
0794: status = block.getInt();
0795: // Assert that the status is not deleted
0796: if ((status & 0x020000) != 0) {
0797: throw new Error("Assertion failed: record is deleted!");
0798: }
0799: // Get the reference count
0800: reference_count = block.getInt();
0801: // Get the total size of the blob
0802: size = block.getLong();
0803: // Get the blob pointer
0804: blob_p = block.getLong();
0805:
0806: }
0807:
0808: // Assert that the area being read is within the bounds of the blob
0809: if (offset < 0 || offset + length > size) {
0810: throw new IOException("Blob invalid read. offset = "
0811: + offset + ", length = " + length);
0812: }
0813:
0814: // Open an Area into the blob
0815: Area blob_area = store.getArea(blob_p);
0816: blob_area.getInt();
0817: byte type = (byte) blob_area.getInt();
0818:
0819: // Convert to the page number
0820: long page_number = (offset / (64 * 1024));
0821: blob_area.position((int) ((page_number * 8) + 24));
0822: long page_p = blob_area.getLong();
0823:
0824: // Read the page
0825: Area page_area = store.getArea(page_p);
0826: page_area.position(0);
0827: int page_type = page_area.getInt();
0828: int page_size = page_area.getInt();
0829: if ((type & 0x010) != 0) {
0830: // The page is compressed
0831: byte[] page_buf = new byte[page_size];
0832: page_area.get(page_buf, 0, page_size);
0833: Inflater inflater = new Inflater();
0834: inflater.setInput(page_buf, 0, page_size);
0835: try {
0836: int result_length = inflater.inflate(buf, off, length);
0837: if (result_length != length) {
0838: throw new RuntimeException(
0839: "Assert failed: decompressed length is incorrect.");
0840: }
0841: } catch (DataFormatException e) {
0842: throw new IOException("ZIP Data Format Error: "
0843: + e.getMessage());
0844: }
0845: inflater.end();
0846: } else {
0847: // The page is not compressed
0848: page_area.get(buf, off, length);
0849: }
0850:
0851: }
0852:
0853: /**
0854: * Writes a section of the blob referenced by the given id, offset and
0855: * length to the byte array. Note that this does not perform any checks on
0856: * whether we are allowed to write to this blob.
0857: */
0858: private void writeBlobByteArray(long reference_id, long offset,
0859: byte[] buf, int length) throws IOException {
0860:
0861: // ASSERT: Read and write position must be 64K aligned.
0862: if (offset % (64 * 1024) != 0) {
0863: throw new RuntimeException(
0864: "Assert failed: offset is not 64k aligned.");
0865: }
0866: // ASSERT: Length is less than or equal to 64K
0867: if (length > (64 * 1024)) {
0868: throw new RuntimeException(
0869: "Assert failed: length is greater than 64K.");
0870: }
0871:
0872: int status;
0873: int reference_count;
0874: long size;
0875: long blob_p;
0876:
0877: synchronized (fixed_list) {
0878:
0879: // Assert that the blob reference id given is a valid range
0880: if (reference_id < 0
0881: || reference_id >= fixed_list
0882: .addressableNodeCount()) {
0883: throw new IOException(
0884: "blob_reference_id is out of range.");
0885: }
0886:
0887: // Position on this record
0888: Area block = fixed_list.positionOnNode(reference_id);
0889: // Read the information in the fixed record
0890: status = block.getInt();
0891: // Assert that the status is not deleted
0892: if ((status & 0x020000) != 0) {
0893: throw new Error("Assertion failed: record is deleted!");
0894: }
0895: // Get the reference count
0896: reference_count = block.getInt();
0897: // Get the total size of the blob
0898: size = block.getLong();
0899: // Get the blob pointer
0900: blob_p = block.getLong();
0901:
0902: }
0903:
0904: // Open an Area into the blob
0905: MutableArea blob_area = store.getMutableArea(blob_p);
0906: blob_area.getInt();
0907: byte type = (byte) blob_area.getInt();
0908: size = blob_area.getLong();
0909:
0910: // Assert that the area being read is within the bounds of the blob
0911: if (offset < 0 || offset + length > size) {
0912: throw new IOException("Blob invalid write. offset = "
0913: + offset + ", length = " + length + ", size = "
0914: + size);
0915: }
0916:
0917: // Convert to the page number
0918: long page_number = (offset / (64 * 1024));
0919: blob_area.position((int) ((page_number * 8) + 24));
0920: long page_p = blob_area.getLong();
0921:
0922: // Assert that 'page_p' is -1
0923: if (page_p != -1) {
0924: // This means we are trying to rewrite a page we've already written
0925: // before.
0926: throw new RuntimeException(
0927: "Assert failed: page_p is not -1");
0928: }
0929:
0930: // Is the compression bit set?
0931: byte[] to_write;
0932: int write_length;
0933: if ((type & 0x010) != 0) {
0934: // Yes, compression
0935: Deflater deflater = new Deflater();
0936: deflater.setInput(buf, 0, length);
0937: deflater.finish();
0938: to_write = new byte[65 * 1024];
0939: write_length = deflater.deflate(to_write);
0940: } else {
0941: // No compression
0942: to_write = buf;
0943: write_length = length;
0944: }
0945:
0946: try {
0947: store.lockForWrite();
0948:
0949: // Allocate and write the page.
0950: AreaWriter page_area = store.createArea(write_length + 8);
0951: page_p = page_area.getID();
0952: page_area.putInt(1);
0953: page_area.putInt(write_length);
0954: page_area.put(to_write, 0, write_length);
0955: // Finish this page
0956: page_area.finish();
0957:
0958: // Update the page in the header.
0959: blob_area.position((int) ((page_number * 8) + 24));
0960: blob_area.putLong(page_p);
0961: // Check out this change.
0962: blob_area.checkOut();
0963:
0964: } finally {
0965: store.unlockForWrite();
0966: }
0967:
0968: }
0969:
0970: /**
0971: * An InputStream implementation that reads from the underlying blob data as
0972: * fixed size pages.
0973: */
0974: private class BLOBInputStream extends PagedInputStream {
0975:
0976: final static int B_SIZE = 64 * 1024;
0977:
0978: private long reference_id;
0979:
0980: public BLOBInputStream(final long reference_id, final long size) {
0981: super (B_SIZE, size);
0982: this .reference_id = reference_id;
0983: }
0984:
0985: public void readPageContent(byte[] buf, long pos, int length)
0986: throws IOException {
0987: readBlobByteArray(reference_id, pos, buf, 0, length);
0988: }
0989:
0990: }
0991:
0992: /**
0993: * An abstract implementation of a Ref object for referencing large objects
0994: * in this blob store.
0995: */
0996: private class AbstractRef {
0997:
0998: /**
0999: * The reference identifier. This is a pointer into the fixed list
1000: * structure.
1001: */
1002: protected final long reference_id;
1003:
1004: /**
1005: * The total size of the large object in bytes.
1006: */
1007: protected final long size;
1008:
1009: /**
1010: * The type of large object.
1011: */
1012: protected final byte type;
1013:
1014: /**
1015: * Set to true if this large object is open for writing, otherwise the
1016: * object is an immutable static object.
1017: */
1018: private boolean open_for_write;
1019:
1020: /**
1021: * Constructs the Ref implementation.
1022: */
1023: AbstractRef(long reference_id, byte type, long size,
1024: boolean open_for_write) {
1025: this .reference_id = reference_id;
1026: this .size = size;
1027: this .type = type;
1028: this .open_for_write = open_for_write;
1029: }
1030:
1031: /**
1032: * Asserts that this blob is open for writing.
1033: */
1034: void assertIsOpen() {
1035: if (!open_for_write) {
1036: throw new Error("Large object ref is newly allocated.");
1037: }
1038: }
1039:
1040: public long getRawSize() {
1041: return size;
1042: }
1043:
1044: /**
1045: * Marks this large object as closed to write operations.
1046: */
1047: void close() {
1048: open_for_write = false;
1049: }
1050:
1051: public int length() {
1052: return (int) size;
1053: }
1054:
1055: public long getID() {
1056: return reference_id;
1057: }
1058:
1059: public byte getType() {
1060: return type;
1061: }
1062:
1063: public void read(long offset, byte[] buf, int length)
1064: throws IOException {
1065: // Reads the section of the blob into the given buffer byte array at the
1066: // given offset of the blob.
1067: readBlobByteArray(reference_id, offset, buf, 0, length);
1068: }
1069:
1070: public void write(long offset, byte[] buf, int length)
1071: throws IOException {
1072: if (open_for_write) {
1073: writeBlobByteArray(reference_id, offset, buf, length);
1074: } else {
1075: throw new IOException("Blob is read-only.");
1076: }
1077: }
1078:
1079: public void complete() throws IOException {
1080: completeBlob(this );
1081: }
1082:
1083: }
1084:
1085: /**
1086: * An implementation of ClobRef used to represent a reference to a large
1087: * character object inside this blob store.
1088: */
1089: private class ClobRefImpl extends AbstractRef implements ClobRef {
1090:
1091: /**
1092: * Constructs the ClobRef implementation.
1093: */
1094: ClobRefImpl(long reference_id, byte type, long size,
1095: boolean open_for_write) {
1096: super (reference_id, type, size, open_for_write);
1097: }
1098:
1099: // ---------- Implemented from ClobRef ----------
1100:
1101: public int length() {
1102: byte st_type = (byte) (type & 0x0F);
1103: if (st_type == 3) {
1104: return (int) size;
1105: } else if (st_type == 4) {
1106: return (int) (size / 2);
1107: } else {
1108: throw new RuntimeException("Unknown type.");
1109: }
1110: }
1111:
1112: public Reader getReader() {
1113: byte st_type = (byte) (type & 0x0F);
1114: if (st_type == 3) {
1115: return new AsciiReader(new BLOBInputStream(
1116: reference_id, size));
1117: } else if (st_type == 4) {
1118: return new BinaryToUnicodeReader(new BLOBInputStream(
1119: reference_id, size));
1120: } else {
1121: throw new RuntimeException("Unknown type.");
1122: }
1123: }
1124:
1125: public String toString() {
1126: final int BUF_SIZE = 8192;
1127: Reader r = getReader();
1128: StringBuffer buf = new StringBuffer(length());
1129: char[] c = new char[BUF_SIZE];
1130: try {
1131: while (true) {
1132: int has_read = r.read(c, 0, BUF_SIZE);
1133: if (has_read == 0 || has_read == -1) {
1134: return new String(buf);
1135: }
1136: buf.append(c);
1137: }
1138: } catch (IOException e) {
1139: throw new RuntimeException("IO Error: "
1140: + e.getMessage());
1141: }
1142: }
1143:
1144: }
1145:
1146: /**
1147: * An implementation of BlobRef used to represent a blob reference inside this
1148: * blob store.
1149: */
1150: private class BlobRefImpl extends AbstractRef implements BlobRef {
1151:
1152: /**
1153: * Constructs the BlobRef implementation.
1154: */
1155: BlobRefImpl(long reference_id, byte type, long size,
1156: boolean open_for_write) {
1157: super (reference_id, type, size, open_for_write);
1158: }
1159:
1160: // ---------- Implemented from BlobRef ----------
1161:
1162: public InputStream getInputStream() {
1163: return new BLOBInputStream(reference_id, size);
1164: }
1165:
1166: }
1167:
1168: }
|