Source Code Cross Referenced for BlobStore.java in  » Database-DBMS » mckoi » com » mckoi » database » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database DBMS » mckoi » com.mckoi.database 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         * com.mckoi.database.BlobStore  18 Jan 2003
0003:         *
0004:         * Mckoi SQL Database ( http://www.mckoi.com/database )
0005:         * Copyright (C) 2000, 2001, 2002  Diehl and Associates, Inc.
0006:         *
0007:         * This program is free software; you can redistribute it and/or
0008:         * modify it under the terms of the GNU General Public License
0009:         * Version 2 as published by the Free Software Foundation.
0010:         *
0011:         * This program is distributed in the hope that it will be useful,
0012:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0013:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0014:         * GNU General Public License Version 2 for more details.
0015:         *
0016:         * You should have received a copy of the GNU General Public License
0017:         * Version 2 along with this program; if not, write to the Free Software
0018:         * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
0019:         *
0020:         * Change Log:
0021:         * 
0022:         * 
0023:         */package com.mckoi.database;
0024:
0025:        import java.util.ArrayList;
0026:        import java.util.zip.Deflater;
0027:        import java.util.zip.Inflater;
0028:        import java.util.zip.DataFormatException;
0029:        import java.io.IOException;
0030:        import java.io.InputStream;
0031:        import java.io.Reader;
0032:        import com.mckoi.util.PagedInputStream;
0033:        import com.mckoi.store.Store;
0034:        import com.mckoi.store.Area;
0035:        import com.mckoi.store.MutableArea;
0036:        import com.mckoi.store.AreaWriter;
0037:        import com.mckoi.database.jdbc.AsciiReader;
0038:        import com.mckoi.database.jdbc.BinaryToUnicodeReader;
0039:        import com.mckoi.database.global.Ref;
0040:        import com.mckoi.database.global.BlobRef;
0041:        import com.mckoi.database.global.ClobRef;
0042:        import com.mckoi.database.global.ByteLongObject;
0043:
0044:        /**
0045:         * A structure inside an Area that maintains the storage of any number of large
0046:         * binary objects.  A blob store allows for the easy allocation of areas for
0047:         * storing blob data and for reading and writing blob information via BlobRef
0048:         * objects.
0049:         * <p>
0050:         * A BlobStore can be broken down to the following simplistic functions;
0051:         * <p>
0052:         * 1) Allocation of an area to store a new blob.<br>
0053:         * 2) Reading the information in a Blob given a Blob reference identifier.<br>
0054:         * 3) Reference counting to a particular Blob.<br>
0055:         * 4) Cleaning up a Blob when no static references are left.<br>
0056:         *
0057:         * @author Tobias Downer
0058:         */
0059:
final class BlobStore implements  BlobStoreInterface {

    /**
     * The magic value for fixed record list structures.  Written into the
     * blob store header by 'create' and verified by 'init'.
     */
    private final static int MAGIC = 0x012BC53A9;

    /**
     * The outer Store object that is to contain the blob store.  All header,
     * fixed-list and blob page areas are allocated from this store.
     */
    private Store store;

    /**
     * The FixedRecordList structure that maintains a list of fixed size records
     * for blob reference counting.  Each record is 24 bytes:
     * ( status (int), reference_count (int), blob_size (long),
     *   blob_pointer (long) ).
     */
    private FixedRecordList fixed_list;

    /**
     * The first delete chain element (-1 when the chain is empty).  Mirrors
     * the reserved long stored in 'fixed_list'.
     */
    private long first_delete_chain_record;
0083:            /**
0084:             * Constructs the BlobStore on the given Area object. 
0085:             */
0086:            BlobStore(Store store) {
0087:                this .store = store;
0088:                fixed_list = new FixedRecordList(store, 24);
0089:            }
0090:
0091:            /**
0092:             * Creates the blob store and returns a pointer in the store to the header
0093:             * information.  This value is later used to initialize the store.
0094:             */
0095:            long create() throws IOException {
0096:                // Init the fixed record list area.
0097:                // The fixed list entries are formatted as follows;
0098:                //  ( status (int), reference_count (int),
0099:                //    blob_size (long), blob_pointer (long) )
0100:                long fixed_list_p = fixed_list.create();
0101:
0102:                // Delete chain is empty when we start
0103:                first_delete_chain_record = -1;
0104:                fixed_list.setReservedLong(-1);
0105:
0106:                // Allocate a small header that contains the MAGIC, and the pointer to the
0107:                // fixed list structure.
0108:                AreaWriter blob_store_header = store.createArea(32);
0109:                long blob_store_p = blob_store_header.getID();
0110:                // Write the blob store header information
0111:                // The magic
0112:                blob_store_header.putInt(MAGIC);
0113:                // The version
0114:                blob_store_header.putInt(1);
0115:                // The pointer to the fixed list area
0116:                blob_store_header.putLong(fixed_list_p);
0117:                // And finish
0118:                blob_store_header.finish();
0119:
0120:                // Return the pointer to the blob store header
0121:                return blob_store_p;
0122:            }
0123:
0124:            /**
0125:             * Initializes the blob store given a pointer to the blob store pointer
0126:             * header (the value previously returned by the 'create' method).
0127:             */
0128:            void init(long blob_store_p) throws IOException {
0129:                // Get the header area
0130:                Area blob_store_header = store.getArea(blob_store_p);
0131:                blob_store_header.position(0);
0132:                // Read the magic
0133:                int magic = blob_store_header.getInt();
0134:                int version = blob_store_header.getInt();
0135:                if (magic != MAGIC) {
0136:                    throw new IOException(
0137:                            "MAGIC value for BlobStore is not correct.");
0138:                }
0139:                if (version != 1) {
0140:                    throw new IOException(
0141:                            "version number for BlobStore is not correct.");
0142:                }
0143:
0144:                // Read the pointer to the fixed area
0145:                long fixed_list_p = blob_store_header.getLong();
0146:                // Init the FixedRecordList area
0147:                fixed_list.init(fixed_list_p);
0148:
0149:                // Set the delete chain
0150:                first_delete_chain_record = fixed_list.getReservedLong();
0151:            }
0152:
0153:            /**
0154:             * Simple structure used when copying blob information.
0155:             */
0156:            private static class CopyBlobInfo {
0157:                int ref_count;
0158:                long size;
0159:                long ob_p;
0160:            };
0161:
    /**
     * Copies all the blob data from the given BlobStore into this blob store.
     * Any blob information that already exists within this BlobStore is deleted.
     * We assume this method is called after the blob store is created or
     * initialized.
     * <p>
     * Blobs are copied in batches of at most 1024 records; a checkpoint is set
     * on the destination store system after each batch so pending log changes
     * are flushed.  All copied blobs end up with a reference count of 0.
     */
    void copyFrom(StoreSystem store_system, BlobStore src_blob_store)
            throws IOException {
        FixedRecordList src_fixed_list = src_blob_store.fixed_list;
        long node_count;
        synchronized (src_fixed_list) {
            node_count = src_fixed_list.addressableNodeCount();
        }

        synchronized (fixed_list) {

            // Make sure our fixed_list is big enough to accomodate the copied list,
            while (fixed_list.addressableNodeCount() < node_count) {
                fixed_list.increaseSize();
            }

            // We rearrange the delete chain; 'last_deleted' tracks the head of
            // the rebuilt chain as deleted entries are encountered.
            long last_deleted = -1;

            // We copy blobs in groups no larger than 1024 Blobs
            final int BLOCK_WRITE_COUNT = 1024;

            int max_to_read = (int) Math.min(BLOCK_WRITE_COUNT,
                    node_count);
            long p = 0;

            while (max_to_read > 0) {
                // Snapshot of this batch of source records (CopyBlobInfo
                // entries; null marks a deleted record).
                ArrayList src_copy_list = new ArrayList();

                // Read the batch under the source list lock only; the actual
                // copy below happens outside it.
                synchronized (src_fixed_list) {
                    for (int i = 0; i < max_to_read; ++i) {
                        Area a = src_fixed_list.positionOnNode(p + i);
                        int status = a.getInt();
                        // If record is not deleted
                        if (status != 0x020000) {
                            CopyBlobInfo info = new CopyBlobInfo();
                            info.ref_count = a.getInt();
                            info.size = a.getLong();
                            info.ob_p = a.getLong();
                            src_copy_list.add(info);
                        } else {
                            src_copy_list.add(null);
                        }
                    }
                }

                try {
                    store.lockForWrite();

                    // We now should have a list of all records from the src to copy,
                    int sz = src_copy_list.size();
                    for (int i = 0; i < sz; ++i) {
                        CopyBlobInfo info = (CopyBlobInfo) src_copy_list
                                .get(i);
                        MutableArea a = fixed_list
                                .positionOnNode(p + i);
                        // Either set a deleted entry or set the entry with a copied blob.
                        if (info == null) {
                            // Deleted record: status, ref count 0, size -1,
                            // and link to the previous deleted record.
                            a.putInt(0x020000);
                            a.putInt(0);
                            a.putLong(-1);
                            a.putLong(last_deleted);
                            a.checkOut();
                            last_deleted = p + i;
                        } else {
                            // Get the Area containing the blob header data in the source
                            // store
                            Area src_blob_header = src_blob_store.store
                                    .getArea(info.ob_p);
                            // Read the information from the header,
                            int res = src_blob_header.getInt();
                            int type = src_blob_header.getInt();
                            long total_block_size = src_blob_header
                                    .getLong();
                            long total_block_pages = src_blob_header
                                    .getLong();

                            // Allocate a new header (fixed 24-byte header plus
                            // one 8-byte page pointer per page).
                            AreaWriter dst_blob_header = store
                                    .createArea(4 + 4 + 8 + 8
                                            + (total_block_pages * 8));
                            long new_ob_header_p = dst_blob_header
                                    .getID();
                            // Copy information into the header
                            dst_blob_header.putInt(res);
                            dst_blob_header.putInt(type);
                            dst_blob_header.putLong(total_block_size);
                            dst_blob_header.putLong(total_block_pages);

                            // Allocate and copy each page,
                            for (int n = 0; n < total_block_pages; ++n) {
                                // Get the block information
                                long block_p = src_blob_header
                                        .getLong();
                                Area src_block = src_blob_store.store
                                        .getArea(block_p);
                                int block_type = src_block.getInt();
                                int block_size = src_block.getInt();
                                // Copy a new block (page payload plus the
                                // 8-byte type/size prefix),
                                int new_block_size = block_size + 4 + 4;
                                AreaWriter dst_block_p = store
                                        .createArea(new_block_size);
                                long new_block_p = dst_block_p.getID();
                                src_block.position(0);
                                src_block.copyTo(dst_block_p,
                                        new_block_size);
                                // And finish
                                dst_block_p.finish();
                                // Write the new header
                                dst_blob_header.putLong(new_block_p);
                            }

                            // And finish 'dst_blob_header'
                            dst_blob_header.finish();

                            // Set up the data in the fixed list
                            a.putInt(1);
                            // Note all the blobs are written with 0 reference count.
                            a.putInt(0);
                            a.putLong(info.size);
                            a.putLong(new_ob_header_p);
                            // Check out the changes
                            a.checkOut();
                        }
                    }

                } finally {
                    store.unlockForWrite();
                }

                node_count -= max_to_read;
                p += max_to_read;
                max_to_read = (int) Math.min(BLOCK_WRITE_COUNT,
                        node_count);

                // Set a checkpoint in the destination store system so we write out
                // all pending changes from the log
                store_system.setCheckPoint();

            }

            // Set the delete chain
            first_delete_chain_record = last_deleted;
            fixed_list.setReservedLong(last_deleted);

        } // synchronized (fixed_list)

    }
0316:
0317:            /**
0318:             * Convenience method that converts the given String into a ClobRef
0319:             * object and pushes it into the given BlobStore object.
0320:             */
0321:            ClobRef putStringInBlobStore(String str) throws IOException {
0322:                final int BUF_SIZE = 64 * 1024;
0323:
0324:                int size = str.length();
0325:
0326:                byte type = 4;
0327:                // Enable compression (ISSUE: Should this be enabled by default?)
0328:                type = (byte) (type | 0x010);
0329:
0330:                ClobRef ref = (ClobRef) allocateLargeObject(type, size * 2);
0331:                byte[] buf = new byte[BUF_SIZE];
0332:                long p = 0;
0333:                int str_i = 0;
0334:                while (size > 0) {
0335:                    int to_write = Math.min(BUF_SIZE / 2, size);
0336:                    int buf_i = 0;
0337:                    for (int i = 0; i < to_write; ++i) {
0338:                        char c = str.charAt(str_i);
0339:                        buf[buf_i] = (byte) (c >> 8);
0340:                        ++buf_i;
0341:                        buf[buf_i] = (byte) c;
0342:                        ++buf_i;
0343:                        ++str_i;
0344:                    }
0345:                    ref.write(p, buf, buf_i);
0346:                    size -= to_write;
0347:                    p += to_write * 2;
0348:                }
0349:
0350:                ref.complete();
0351:
0352:                return ref;
0353:            }
0354:
0355:            /**
0356:             * Convenience method that converts the given ByteLongObject into a
0357:             * BlobRef object and pushes it into the given BlobStore object.
0358:             */
0359:            BlobRef putByteLongObjectInBlobStore(ByteLongObject blob)
0360:                    throws IOException {
0361:
0362:                final int BUF_SIZE = 64 * 1024;
0363:
0364:                byte[] src_buf = blob.getByteArray();
0365:                final int size = src_buf.length;
0366:                BlobRef ref = (BlobRef) allocateLargeObject((byte) 2, size);
0367:
0368:                byte[] copy_buf = new byte[BUF_SIZE];
0369:                int offset = 0;
0370:                int to_write = Math.min(BUF_SIZE, size);
0371:
0372:                while (to_write > 0) {
0373:                    System.arraycopy(src_buf, offset, copy_buf, 0, to_write);
0374:                    ref.write(offset, copy_buf, to_write);
0375:
0376:                    offset += to_write;
0377:                    to_write = Math.min(BUF_SIZE, (size - offset));
0378:                }
0379:
0380:                ref.complete();
0381:
0382:                return ref;
0383:            }
0384:
    /**
     * Finds a free place to add a record and returns an index to the record here.
     * This may expand the record space as necessary if there are no free record
     * slots to use.
     * <p>
     * The record is written as ( status=0, ref_count=0, size=-1, record_p ),
     * i.e. "open" with an initially unknown size.
     * <p>
     * NOTE: Unfortunately this is cut-and-paste from the way
     *   V2MasterTableDataSource manages recycled elements.
     */
    private long addToRecordList(long record_p) throws IOException {

        synchronized (fixed_list) {
            // If there is no free deleted records in the delete chain,
            if (first_delete_chain_record == -1) {

                // Increase the size of the list structure.
                fixed_list.increaseSize();
                // The start record of the new size
                int new_block_number = fixed_list.listBlockCount() - 1;
                long start_index = fixed_list
                        .listBlockFirstPosition(new_block_number);
                long size_of_block = fixed_list
                        .listBlockNodeCount(new_block_number);
                // The Area object for the new position
                MutableArea a = fixed_list.positionOnNode(start_index);

                // The first record of the new block is the one we hand out:
                // status 0 (open), ref count 0, unknown size, record pointer.
                a.putInt(0);
                a.putInt(0);
                a.putLong(-1); // Initially unknown size
                a.putLong(record_p);
                // Set the rest of the block as deleted records, each linking
                // to the next node in the block (forming the delete chain).
                for (long n = 1; n < size_of_block - 1; ++n) {
                    a.putInt(0x020000);
                    a.putInt(0);
                    a.putLong(-1);
                    a.putLong(start_index + n + 1);
                }
                // The last block is end of delete chain.
                a.putInt(0x020000);
                a.putInt(0);
                a.putLong(-1);
                a.putLong(-1);
                // Check out the changes.
                a.checkOut();
                // And set the new delete chain to the second node of the
                // new block (the first deleted record).
                first_delete_chain_record = start_index + 1;
                // Set the reserved area
                fixed_list.setReservedLong(first_delete_chain_record);
                //        // Flush the changes to the store
                //        store.flush();

                // Return pointer to the record we just added.
                return start_index;

            } else {

                // Pull free block from the delete chain and recycle it.
                long recycled_record = first_delete_chain_record;
                MutableArea block = fixed_list
                        .positionOnNode(recycled_record);
                // Remember the record's start position so it can be rewritten.
                int rec_pos = block.position();
                // Status of the recycled block
                int status = block.getInt();
                if ((status & 0x020000) == 0) {
                    throw new Error(
                            "Assertion failed: record is not deleted!");
                }
                // Reference count (currently unused in delete chains).
                block.getInt();
                // The size (should be -1);
                block.getLong();
                // The pointer to the next in the chain.
                long next_chain = block.getLong();
                first_delete_chain_record = next_chain;
                // Update the first_delete_chain_record field in the header
                fixed_list.setReservedLong(first_delete_chain_record);
                // Update the block: status 0 (open), ref count 0, unknown
                // size, and the new record pointer.
                block.position(rec_pos);
                block.putInt(0);
                block.putInt(0);
                block.putLong(-1); // Initially unknown size
                block.putLong(record_p);
                // Check out the changes
                block.checkOut();

                return recycled_record;
            }
        }

    }
0474:
0475:            /**
0476:             * Allocates an area in the store for a large binary object to be stored.
0477:             * After the blob area is allocated the blob may be written.  This returns
0478:             * a BlobRef object for future access to the blob.
0479:             * <p>
0480:             * A newly allocated blob is read and write enabled.  A call to the
0481:             * 'completeBlob' method must be called to finalize the blob at which point
0482:             * the blob becomes a static read-only object.
0483:             */
0484:            Ref allocateLargeObject(byte type, long size) throws IOException {
0485:                if (size < 0) {
0486:                    throw new IOException("Negative blob size not allowed.");
0487:                }
0488:
0489:                try {
0490:                    store.lockForWrite();
0491:
0492:                    // Allocate the area (plus header area) for storing the blob pages
0493:                    long page_count = ((size - 1) / (64 * 1024)) + 1;
0494:                    AreaWriter blob_area = store
0495:                            .createArea((page_count * 8) + 24);
0496:                    long blob_p = blob_area.getID();
0497:                    // Set up the area header
0498:                    blob_area.putInt(0); // Reserved for future
0499:                    blob_area.putInt(type);
0500:                    blob_area.putLong(size);
0501:                    blob_area.putLong(page_count);
0502:                    // Initialize the empty blob area
0503:                    for (long i = 0; i < page_count; ++i) {
0504:                        blob_area.putLong(-1);
0505:                    }
0506:                    // And finish
0507:                    blob_area.finish();
0508:
0509:                    // Update the fixed_list and return the record number for this blob
0510:                    long reference_id = addToRecordList(blob_p);
0511:                    byte st_type = (byte) (type & 0x0F);
0512:                    if (st_type == 2) {
0513:                        // Create a BlobRef implementation that can access this blob
0514:                        return new BlobRefImpl(reference_id, type, size, true);
0515:                    } else if (st_type == 3) {
0516:                        return new ClobRefImpl(reference_id, type, size, true);
0517:                    } else if (st_type == 4) {
0518:                        return new ClobRefImpl(reference_id, type, size, true);
0519:                    } else {
0520:                        throw new IOException("Unknown large object type");
0521:                    }
0522:
0523:                } finally {
0524:                    store.unlockForWrite();
0525:                }
0526:
0527:            }
0528:
0529:            /**
0530:             * Returns a Ref object that allows read-only access to a large object in this
0531:             * blob store.
0532:             */
0533:            public Ref getLargeObject(long reference_id) throws IOException {
0534:
0535:                long blob_p;
0536:                long size;
0537:                synchronized (fixed_list) {
0538:
0539:                    // Assert that the blob reference id given is a valid range
0540:                    if (reference_id < 0
0541:                            || reference_id >= fixed_list
0542:                                    .addressableNodeCount()) {
0543:                        throw new IOException("reference_id is out of range.");
0544:                    }
0545:
0546:                    // Position on this record
0547:                    Area block = fixed_list.positionOnNode(reference_id);
0548:                    // Read the information in the fixed record
0549:                    int status = block.getInt();
0550:                    // Assert that the status is not deleted
0551:                    if ((status & 0x020000) != 0) {
0552:                        throw new Error("Assertion failed: record is deleted!");
0553:                    }
0554:                    // Get the reference count
0555:                    int reference_count = block.getInt();
0556:                    // Get the total size of the blob
0557:                    size = block.getLong();
0558:                    // Get the blob pointer
0559:                    blob_p = block.getLong();
0560:
0561:                }
0562:
0563:                Area blob_area = store.getArea(blob_p);
0564:                blob_area.position(0);
0565:                blob_area.getInt(); // (reserved)
0566:                // Read the type
0567:                byte type = (byte) blob_area.getInt();
0568:                // The size of the block
0569:                long block_size = blob_area.getLong();
0570:                // The number of pages in the blob
0571:                long page_count = blob_area.getLong();
0572:
0573:                if (type == (byte) 2) {
0574:                    // Create a new BlobRef object.
0575:                    return new BlobRefImpl(reference_id, type, size, false);
0576:                } else {
0577:                    // Create a new ClobRef object.
0578:                    return new ClobRefImpl(reference_id, type, size, false);
0579:                }
0580:            }
0581:
    /**
     * Call this to complete a blob in the store after a blob has been completely
     * written.  Only BlobRef implementations returned by the 'allocateBlob'
     * method are accepted.  Sets the record's status to 1 (complete), resets
     * the reference count, and records the final raw size.
     */
    void completeBlob(AbstractRef ref) throws IOException {
        // Assert that the BlobRef is open and allocated
        ref.assertIsOpen();
        // Get the blob reference id (reference to the fixed record list).
        long blob_reference_id = ref.getID();

        synchronized (fixed_list) {

            // Update the record in the fixed list.
            MutableArea block = fixed_list
                    .positionOnNode(blob_reference_id);
            // Record the position
            int rec_pos = block.position();
            // Read the information in the fixed record
            int status = block.getInt();
            // Assert that the status is open
            if (status != 0) {
                throw new IOException(
                        "Assertion failed: record is not open.");
            }
            int reference_count = block.getInt();
            long size = block.getLong();
            // NOTE(review): despite its name this field holds the record's
            // blob pointer (the fixed record layout is status, ref_count,
            // size, blob_pointer) and is rewritten unchanged below.
            long page_count = block.getLong();

            try {
                store.lockForWrite();

                // Set the fixed blob record as complete.
                block.position(rec_pos);
                // Write the new status
                block.putInt(1);
                // Write the reference count
                block.putInt(0);
                // Write the completed size
                block.putLong(ref.getRawSize());
                // Write the pointer
                block.putLong(page_count);
                // Check out the change
                block.checkOut();

            } finally {
                store.unlockForWrite();
            }

        }
        // Now the blob has been finalized so change the state of the BlobRef
0633:                // object.
0634:                ref.close();
0635:
0636:            }
0637:
0638:            /**
0639:             * Tells the BlobStore that a static reference has been established in a
0640:             * table to the blob referenced by the given id.  This is used to count
0641:             * references to a blob, and possibly clean up a blob if there are no
0642:             * references remaining to it.
0643:             * <p>
0644:             * NOTE: It is the responsibility of the callee to establish a 'lockForWrite'
0645:             *   lock on the store before this is used.
0646:             */
0647:            public void establishReference(long blob_reference_id) {
0648:                try {
0649:                    synchronized (fixed_list) {
0650:                        // Update the record in the fixed list.
0651:                        MutableArea block = fixed_list
0652:                                .positionOnNode(blob_reference_id);
0653:                        // Record the position
0654:                        int rec_pos = block.position();
0655:                        // Read the information in the fixed record
0656:                        int status = block.getInt();
0657:                        // Assert that the status is static
0658:                        if (status != 1) {
0659:                            throw new RuntimeException(
0660:                                    "Assertion failed: record is not static.");
0661:                        }
0662:                        int reference_count = block.getInt();
0663:
0664:                        // Set the fixed blob record as complete.
0665:                        block.position(rec_pos + 4);
0666:                        // Write the reference count + 1
0667:                        block.putInt(reference_count + 1);
0668:                        // Check out the change
0669:                        block.checkOut();
0670:                    }
0671:                    //      // Flush all changes to the store.
0672:                    //      store.flush();
0673:                } catch (IOException e) {
0674:                    throw new RuntimeException("IO Error: " + e.getMessage());
0675:                }
0676:            }
0677:
0678:            /**
0679:             * Tells the BlobStore that a static reference has been released to the
0680:             * given blob.  This would typically be called when the row in the database
0681:             * is removed.
0682:             * <p>
0683:             * NOTE: It is the responsibility of the callee to establish a 'lockForWrite'
0684:             *   lock on the store before this is used.
0685:             */
0686:            public void releaseReference(long blob_reference_id) {
0687:                try {
0688:                    synchronized (fixed_list) {
0689:                        // Update the record in the fixed list.
0690:                        MutableArea block = fixed_list
0691:                                .positionOnNode(blob_reference_id);
0692:                        // Record the position
0693:                        int rec_pos = block.position();
0694:                        // Read the information in the fixed record
0695:                        int status = block.getInt();
0696:                        // Assert that the status is static
0697:                        if (status != 1) {
0698:                            throw new RuntimeException("Assertion failed: "
0699:                                    + "Record is not static (status = "
0700:                                    + status + ")");
0701:                        }
0702:                        int reference_count = block.getInt();
0703:                        if (reference_count == 0) {
0704:                            throw new RuntimeException(
0705:                                    "Releasing when Blob reference counter is at 0.");
0706:                        }
0707:
0708:                        long object_size = block.getLong();
0709:                        long object_p = block.getLong();
0710:
0711:                        // If reference count == 0 then we need to free all the resources
0712:                        // associated with this Blob in the blob store.
0713:                        if ((reference_count - 1) == 0) {
0714:                            // Free the resources associated with this object.
0715:                            Area blob_area = store.getArea(object_p);
0716:                            blob_area.getInt();
0717:                            byte type = (byte) blob_area.getInt();
0718:                            long total_size = blob_area.getLong();
0719:                            long page_count = blob_area.getLong();
0720:                            // Free all of the pages in this blob.
0721:                            for (long i = 0; i < page_count; ++i) {
0722:                                long page_p = blob_area.getLong();
0723:                                if (page_p > 0) {
0724:                                    store.deleteArea(page_p);
0725:                                }
0726:                            }
0727:                            // Free the blob area object itself.
0728:                            store.deleteArea(object_p);
0729:                            // Write out the blank record.
0730:                            block.position(rec_pos);
0731:                            block.putInt(0x020000);
0732:                            block.putInt(0);
0733:                            block.putLong(-1);
0734:                            block.putLong(first_delete_chain_record);
0735:                            // CHeck out these changes
0736:                            block.checkOut();
0737:                            first_delete_chain_record = blob_reference_id;
0738:                            // Update the first_delete_chain_record field in the header
0739:                            fixed_list
0740:                                    .setReservedLong(first_delete_chain_record);
0741:                        } else {
0742:                            // Simply decrement the reference counter for this record.
0743:                            block.position(rec_pos + 4);
0744:                            // Write the reference count - 1
0745:                            block.putInt(reference_count - 1);
0746:                            // Check out this change
0747:                            block.checkOut();
0748:                        }
0749:
0750:                    }
0751:                    //      // Flush all changes to the store.
0752:                    //      store.flush();
0753:                } catch (IOException e) {
0754:                    throw new RuntimeException("IO Error: " + e.getMessage());
0755:                }
0756:            }
0757:
0758:            /**
0759:             * Reads a section of the blob referenced by the given id, offset and length
0760:             * into the byte array.
0761:             */
0762:            private void readBlobByteArray(long reference_id, long offset,
0763:                    byte[] buf, int off, int length) throws IOException {
0764:
0765:                // ASSERT: Read and write position must be 64K aligned.
0766:                if (offset % (64 * 1024) != 0) {
0767:                    throw new RuntimeException(
0768:                            "Assert failed: offset is not 64k aligned.");
0769:                }
0770:                // ASSERT: Length is less than or equal to 64K
0771:                if (length > (64 * 1024)) {
0772:                    throw new RuntimeException(
0773:                            "Assert failed: length is greater than 64K.");
0774:                }
0775:
0776:                int status;
0777:                int reference_count;
0778:                long size;
0779:                long blob_p;
0780:
0781:                synchronized (fixed_list) {
0782:
0783:                    // Assert that the blob reference id given is a valid range
0784:                    if (reference_id < 0
0785:                            || reference_id >= fixed_list
0786:                                    .addressableNodeCount()) {
0787:                        throw new IOException(
0788:                                "blob_reference_id is out of range.");
0789:                    }
0790:
0791:                    // Position on this record
0792:                    Area block = fixed_list.positionOnNode(reference_id);
0793:                    // Read the information in the fixed record
0794:                    status = block.getInt();
0795:                    // Assert that the status is not deleted
0796:                    if ((status & 0x020000) != 0) {
0797:                        throw new Error("Assertion failed: record is deleted!");
0798:                    }
0799:                    // Get the reference count
0800:                    reference_count = block.getInt();
0801:                    // Get the total size of the blob
0802:                    size = block.getLong();
0803:                    // Get the blob pointer
0804:                    blob_p = block.getLong();
0805:
0806:                }
0807:
0808:                // Assert that the area being read is within the bounds of the blob
0809:                if (offset < 0 || offset + length > size) {
0810:                    throw new IOException("Blob invalid read.  offset = "
0811:                            + offset + ", length = " + length);
0812:                }
0813:
0814:                // Open an Area into the blob
0815:                Area blob_area = store.getArea(blob_p);
0816:                blob_area.getInt();
0817:                byte type = (byte) blob_area.getInt();
0818:
0819:                // Convert to the page number
0820:                long page_number = (offset / (64 * 1024));
0821:                blob_area.position((int) ((page_number * 8) + 24));
0822:                long page_p = blob_area.getLong();
0823:
0824:                // Read the page
0825:                Area page_area = store.getArea(page_p);
0826:                page_area.position(0);
0827:                int page_type = page_area.getInt();
0828:                int page_size = page_area.getInt();
0829:                if ((type & 0x010) != 0) {
0830:                    // The page is compressed
0831:                    byte[] page_buf = new byte[page_size];
0832:                    page_area.get(page_buf, 0, page_size);
0833:                    Inflater inflater = new Inflater();
0834:                    inflater.setInput(page_buf, 0, page_size);
0835:                    try {
0836:                        int result_length = inflater.inflate(buf, off, length);
0837:                        if (result_length != length) {
0838:                            throw new RuntimeException(
0839:                                    "Assert failed: decompressed length is incorrect.");
0840:                        }
0841:                    } catch (DataFormatException e) {
0842:                        throw new IOException("ZIP Data Format Error: "
0843:                                + e.getMessage());
0844:                    }
0845:                    inflater.end();
0846:                } else {
0847:                    // The page is not compressed
0848:                    page_area.get(buf, off, length);
0849:                }
0850:
0851:            }
0852:
            /**
             * Writes a section of the blob referenced by the given id, offset and
             * length to the byte array.  Note that this does not perform any checks on
             * whether we are allowed to write to this blob.
             * <p>
             * The offset must be 64K page aligned and the length at most 64K.
             * Each 64K section becomes one page area in the store; a page may
             * only be written once (its slot in the header page table must
             * still be -1).  If the blob's compression bit (0x010) is set the
             * data is deflated before being stored.
             */
            private void writeBlobByteArray(long reference_id, long offset,
                    byte[] buf, int length) throws IOException {

                // ASSERT: Read and write position must be 64K aligned.
                if (offset % (64 * 1024) != 0) {
                    throw new RuntimeException(
                            "Assert failed: offset is not 64k aligned.");
                }
                // ASSERT: Length is less than or equal to 64K
                if (length > (64 * 1024)) {
                    throw new RuntimeException(
                            "Assert failed: length is greater than 64K.");
                }

                int status;
                int reference_count;
                long size;
                long blob_p;

                synchronized (fixed_list) {

                    // Assert that the blob reference id given is a valid range
                    if (reference_id < 0
                            || reference_id >= fixed_list
                                    .addressableNodeCount()) {
                        throw new IOException(
                                "blob_reference_id is out of range.");
                    }

                    // Position on this record
                    Area block = fixed_list.positionOnNode(reference_id);
                    // Read the information in the fixed record
                    status = block.getInt();
                    // Assert that the status is not deleted
                    if ((status & 0x020000) != 0) {
                        throw new Error("Assertion failed: record is deleted!");
                    }
                    // Get the reference count
                    reference_count = block.getInt();
                    // Get the total size of the blob
                    size = block.getLong();
                    // Get the blob pointer
                    blob_p = block.getLong();

                }

                // Open an Area into the blob header: skip (reserved), read the
                // type byte, then the authoritative size field.
                MutableArea blob_area = store.getMutableArea(blob_p);
                blob_area.getInt();
                byte type = (byte) blob_area.getInt();
                size = blob_area.getLong();

                // Assert that the area being read is within the bounds of the blob
                if (offset < 0 || offset + length > size) {
                    throw new IOException("Blob invalid write.  offset = "
                            + offset + ", length = " + length + ", size = "
                            + size);
                }

                // Convert to the page number and position on that page's slot
                // in the header page table (8 bytes per entry, starting at
                // byte 24).
                long page_number = (offset / (64 * 1024));
                blob_area.position((int) ((page_number * 8) + 24));
                long page_p = blob_area.getLong();

                // Assert that 'page_p' is -1
                if (page_p != -1) {
                    // This means we are trying to rewrite a page we've already written
                    // before.
                    throw new RuntimeException(
                            "Assert failed: page_p is not -1");
                }

                // Is the compression bit set?
                byte[] to_write;
                int write_length;
                if ((type & 0x010) != 0) {
                    // Yes, compression.  The output buffer is sized slightly
                    // larger than the 64K input, presumably to allow for
                    // deflate overhead on incompressible data - TODO confirm.
                    Deflater deflater = new Deflater();
                    deflater.setInput(buf, 0, length);
                    deflater.finish();
                    to_write = new byte[65 * 1024];
                    write_length = deflater.deflate(to_write);
                } else {
                    // No compression
                    to_write = buf;
                    write_length = length;
                }

                try {
                    store.lockForWrite();

                    // Allocate and write the page: a type int (1), the stored
                    // byte length, then the page data.
                    AreaWriter page_area = store.createArea(write_length + 8);
                    page_p = page_area.getID();
                    page_area.putInt(1);
                    page_area.putInt(write_length);
                    page_area.put(to_write, 0, write_length);
                    // Finish this page
                    page_area.finish();

                    // Update the page pointer in the header page table.
                    blob_area.position((int) ((page_number * 8) + 24));
                    blob_area.putLong(page_p);
                    // Check out this change.
                    blob_area.checkOut();

                } finally {
                    store.unlockForWrite();
                }

            }
0969:
            /**
             * An InputStream implementation that reads from the underlying blob data as
             * fixed size pages.  Each page request is delegated to
             * 'readBlobByteArray' on the enclosing store, which enforces the
             * 64K alignment this page size guarantees.
             */
            private class BLOBInputStream extends PagedInputStream {

                // Page size: must match the 64K page size used by
                // readBlobByteArray / writeBlobByteArray.
                final static int B_SIZE = 64 * 1024;

                // Index of the blob in the fixed record list.
                private long reference_id;

                public BLOBInputStream(final long reference_id, final long size) {
                    super (B_SIZE, size);
                    this .reference_id = reference_id;
                }

                public void readPageContent(byte[] buf, long pos, int length)
                        throws IOException {
                    // 'pos' is page aligned by the PagedInputStream contract -
                    // TODO confirm against PagedInputStream.
                    readBlobByteArray(reference_id, pos, buf, 0, length);
                }

            }
0991:
0992:            /**
0993:             * An abstract implementation of a Ref object for referencing large objects
0994:             * in this blob store.
0995:             */
0996:            private class AbstractRef {
0997:
0998:                /**
0999:                 * The reference identifier.  This is a pointer into the fixed list
1000:                 * structure.
1001:                 */
1002:                protected final long reference_id;
1003:
1004:                /**
1005:                 * The total size of the large object in bytes.
1006:                 */
1007:                protected final long size;
1008:
1009:                /**
1010:                 * The type of large object.
1011:                 */
1012:                protected final byte type;
1013:
1014:                /**
1015:                 * Set to true if this large object is open for writing, otherwise the
1016:                 * object is an immutable static object.
1017:                 */
1018:                private boolean open_for_write;
1019:
1020:                /**
1021:                 * Constructs the Ref implementation.
1022:                 */
1023:                AbstractRef(long reference_id, byte type, long size,
1024:                        boolean open_for_write) {
1025:                    this .reference_id = reference_id;
1026:                    this .size = size;
1027:                    this .type = type;
1028:                    this .open_for_write = open_for_write;
1029:                }
1030:
1031:                /**
1032:                 * Asserts that this blob is open for writing.
1033:                 */
1034:                void assertIsOpen() {
1035:                    if (!open_for_write) {
1036:                        throw new Error("Large object ref is newly allocated.");
1037:                    }
1038:                }
1039:
1040:                public long getRawSize() {
1041:                    return size;
1042:                }
1043:
1044:                /**
1045:                 * Marks this large object as closed to write operations.
1046:                 */
1047:                void close() {
1048:                    open_for_write = false;
1049:                }
1050:
1051:                public int length() {
1052:                    return (int) size;
1053:                }
1054:
1055:                public long getID() {
1056:                    return reference_id;
1057:                }
1058:
1059:                public byte getType() {
1060:                    return type;
1061:                }
1062:
1063:                public void read(long offset, byte[] buf, int length)
1064:                        throws IOException {
1065:                    // Reads the section of the blob into the given buffer byte array at the
1066:                    // given offset of the blob.
1067:                    readBlobByteArray(reference_id, offset, buf, 0, length);
1068:                }
1069:
1070:                public void write(long offset, byte[] buf, int length)
1071:                        throws IOException {
1072:                    if (open_for_write) {
1073:                        writeBlobByteArray(reference_id, offset, buf, length);
1074:                    } else {
1075:                        throw new IOException("Blob is read-only.");
1076:                    }
1077:                }
1078:
1079:                public void complete() throws IOException {
1080:                    completeBlob(this );
1081:                }
1082:
1083:            }
1084:
1085:            /**
1086:             * An implementation of ClobRef used to represent a reference to a large
1087:             * character object inside this blob store.
1088:             */
1089:            private class ClobRefImpl extends AbstractRef implements  ClobRef {
1090:
1091:                /**
1092:                 * Constructs the ClobRef implementation.
1093:                 */
1094:                ClobRefImpl(long reference_id, byte type, long size,
1095:                        boolean open_for_write) {
1096:                    super (reference_id, type, size, open_for_write);
1097:                }
1098:
1099:                // ---------- Implemented from ClobRef ----------
1100:
1101:                public int length() {
1102:                    byte st_type = (byte) (type & 0x0F);
1103:                    if (st_type == 3) {
1104:                        return (int) size;
1105:                    } else if (st_type == 4) {
1106:                        return (int) (size / 2);
1107:                    } else {
1108:                        throw new RuntimeException("Unknown type.");
1109:                    }
1110:                }
1111:
1112:                public Reader getReader() {
1113:                    byte st_type = (byte) (type & 0x0F);
1114:                    if (st_type == 3) {
1115:                        return new AsciiReader(new BLOBInputStream(
1116:                                reference_id, size));
1117:                    } else if (st_type == 4) {
1118:                        return new BinaryToUnicodeReader(new BLOBInputStream(
1119:                                reference_id, size));
1120:                    } else {
1121:                        throw new RuntimeException("Unknown type.");
1122:                    }
1123:                }
1124:
1125:                public String toString() {
1126:                    final int BUF_SIZE = 8192;
1127:                    Reader r = getReader();
1128:                    StringBuffer buf = new StringBuffer(length());
1129:                    char[] c = new char[BUF_SIZE];
1130:                    try {
1131:                        while (true) {
1132:                            int has_read = r.read(c, 0, BUF_SIZE);
1133:                            if (has_read == 0 || has_read == -1) {
1134:                                return new String(buf);
1135:                            }
1136:                            buf.append(c);
1137:                        }
1138:                    } catch (IOException e) {
1139:                        throw new RuntimeException("IO Error: "
1140:                                + e.getMessage());
1141:                    }
1142:                }
1143:
1144:            }
1145:
            /**
             * An implementation of BlobRef used to represent a blob reference inside this
             * blob store.  Binary content is exposed as a paged InputStream
             * over the stored pages.
             */
            private class BlobRefImpl extends AbstractRef implements  BlobRef {

                /**
                 * Constructs the BlobRef implementation.
                 */
                BlobRefImpl(long reference_id, byte type, long size,
                        boolean open_for_write) {
                    super (reference_id, type, size, open_for_write);
                }

                // ---------- Implemented from BlobRef ----------

                /**
                 * Returns an InputStream over the raw blob content.
                 */
                public InputStream getInputStream() {
                    return new BLOBInputStream(reference_id, size);
                }

            }
1167:
1168:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.