0001: package de.anomic.kelondro;
0002:
0003: // a collectionIndex is an index to kelondroRowCollection objects
// such a collection is defined by the following parameters
0005: // - chunksize
0006: // - chunkcount
0007: // each of such a collection is stored in a byte[] which may or may not have space for more chunks
0008: // than already exists in such an array. To store these arrays, we reserve entries in kelondroArray
0009: // database files. There will be a set of array files for different sizes of the collection arrays.
0010: // the 1st file has space for <loadfactor> chunks, the 2nd file for <loadfactor> * <loadfactor> chunks,
0011: // the 3rd file for <loadfactor>^^3 chunks, and the n-th file for <loadfactor>^^n chunks.
0012: // if the loadfactor is 4, then we have the following capacities:
0013: // file 0: 4
0014: // file 1: 16
0015: // file 2: 64
0016: // file 3: 256
0017: // file 4: 1024
0018: // file 5: 4096
0019: // file 6:16384
0020: // file 7:65536
0021: // the maximum number of such files is called the partitions number.
// we don't want these files to grow too big; a kelondroOutOfLimitsException is thrown if they
// are oversized.
// the collection arrays may be migrated to another size during run-time, which means that not only the
0025: // partitions as mentioned above are maintained, but also a set of "shadow-partitions", that represent old
0026: // partitions and where data is read only and slowly migrated to the default partitions.
0027: //
0028: // $LastChangedDate: 2008-02-04 15:51:51 +0000 (Mo, 04 Feb 2008) $
0029: // $LastChangedRevision: 4442 $
0030: // $LastChangedBy: orbiter $
0031:
0032: import java.io.File;
0033: import java.io.IOException;
0034: import java.text.SimpleDateFormat;
0035: import java.util.ArrayList;
0036: import java.util.Date;
0037: import java.util.HashMap;
0038: import java.util.Iterator;
0039: import java.util.List;
0040: import java.util.Map;
0041: import java.util.Random;
0042: import java.util.Set;
0043: import java.util.TimeZone;
0044: import java.util.TreeMap;
0045:
0046: import de.anomic.index.indexContainer;
0047: import de.anomic.kelondro.kelondroRow.EntryIndex;
0048: import de.anomic.server.serverCodings;
0049: import de.anomic.server.serverFileUtils;
0050: import de.anomic.server.serverMemory;
0051: import de.anomic.server.logging.serverLog;
0052: import de.anomic.yacy.yacyURL;
0053:
0054: public class kelondroCollectionIndex {
0055:
0056: private static final int serialNumber = 0;
0057: private static final long minimumRAM4Eco = 20 * 1024 * 1024;
0058: private static final int EcoFSBufferSize = 1000;
0059:
0060: private kelondroIndex index;
0061: private int keylength;
0062: private File path;
0063: private String filenameStub;
0064: private File commonsPath;
0065: private int loadfactor;
0066: private Map<String, kelondroFixedWidthArray> arrays; // Map of (partitionNumber"-"chunksize)/kelondroFixedWidthArray - Objects
0067: private kelondroRow payloadrow; // definition of the payload (chunks inside the collections)
0068: private int maxPartitions; // this is the maxmimum number of array files
0069:
0070: private static final int idx_col_key = 0; // the index
0071: private static final int idx_col_chunksize = 1; // chunksize (number of bytes in a single chunk, needed for migration option)
0072: private static final int idx_col_chunkcount = 2; // chunkcount (number of chunks in this collection)
0073: private static final int idx_col_clusteridx = 3; // selector for right cluster file, must be >= arrayIndex(chunkcount)
0074: private static final int idx_col_flags = 4; // flags (for future use)
0075: private static final int idx_col_indexpos = 5; // indexpos (position in array file)
0076: private static final int idx_col_lastread = 6; // a time stamp, update time in days since 1.1.2000
0077: private static final int idx_col_lastwrote = 7; // a time stamp, update time in days since 1.1.2000
0078:
0079: private static kelondroRow indexRow(int keylength,
0080: kelondroByteOrder payloadOrder) {
0081: return new kelondroRow("byte[] key-" + keylength + ","
0082: + "int chunksize-4 {b256},"
0083: + "int chunkcount-4 {b256},"
0084: + "byte clusteridx-1 {b256}," + "byte flags-1 {b256},"
0085: + "int indexpos-4 {b256},"
0086: + "short lastread-2 {b256}, "
0087: + "short lastwrote-2 {b256}", payloadOrder, 0);
0088: }
0089:
0090: public kelondroRow payloadRow() {
0091: return this .payloadrow;
0092: }
0093:
0094: private static String fillZ(String s, int len) {
0095: while (s.length() < len)
0096: s = "0" + s;
0097: return s;
0098: }
0099:
0100: private static File arrayFile(File path, String filenameStub,
0101: int loadfactor, int chunksize, int partitionNumber,
0102: int serialNumber) {
0103: String lf = fillZ(
0104: Integer.toHexString(loadfactor).toUpperCase(), 2);
0105: String cs = fillZ(Integer.toHexString(chunksize).toUpperCase(),
0106: 4);
0107: String pn = fillZ(Integer.toHexString(partitionNumber)
0108: .toUpperCase(), 2);
0109: String sn = fillZ(Integer.toHexString(serialNumber)
0110: .toUpperCase(), 2);
0111: return new File(path, filenameStub + "." + lf + "." + cs + "."
0112: + pn + "." + sn + ".kca"); // kelondro collection array
0113: }
0114:
0115: private static File propertyFile(File path, String filenameStub,
0116: int loadfactor, int chunksize) {
0117: String lf = fillZ(
0118: Integer.toHexString(loadfactor).toUpperCase(), 2);
0119: String cs = fillZ(Integer.toHexString(chunksize).toUpperCase(),
0120: 4);
0121: return new File(path, filenameStub + "." + lf + "." + cs
0122: + ".properties");
0123: }
0124:
0125: public kelondroCollectionIndex(File path, String filenameStub,
0126: int keyLength, kelondroByteOrder indexOrder,
0127: long preloadTime, int loadfactor, int maxpartitions,
0128: kelondroRow rowdef) throws IOException {
0129: // the buffersize is number of bytes that are only used if the kelondroFlexTable is backed up with a kelondroTree
0130: this .path = path;
0131: this .filenameStub = filenameStub;
0132: this .keylength = keyLength;
0133: this .payloadrow = rowdef;
0134: this .loadfactor = loadfactor;
0135: this .maxPartitions = maxpartitions;
0136: this .commonsPath = new File(path, filenameStub
0137: + "."
0138: + fillZ(Integer.toHexString(rowdef.objectsize)
0139: .toUpperCase(), 4) + ".commons");
0140: this .commonsPath.mkdirs();
0141: File f = new File(path, filenameStub + ".index");
0142:
0143: if (f.exists()) {
0144: serverLog.logFine("STARTUP", "OPENING COLLECTION INDEX");
0145:
0146: // open index and array files
0147: this .arrays = new HashMap<String, kelondroFixedWidthArray>(); // all entries will be dynamically created with getArray()
0148: index = openIndexFile(path, filenameStub, indexOrder,
0149: preloadTime, loadfactor, rowdef, 0);
0150: openAllArrayFiles(false, indexOrder);
0151: } else {
0152: // calculate initialSpace
0153: String[] list = this .path.list();
0154: kelondroFixedWidthArray array;
0155: int initialSpace = 0;
0156: for (int i = 0; i < list.length; i++)
0157: if (list[i].endsWith(".kca")) {
0158: // open array
0159: int pos = list[i].indexOf('.');
0160: if (pos < 0)
0161: continue;
0162: int partitionNumber = Integer.parseInt(list[i]
0163: .substring(pos + 9, pos + 11), 16);
0164: int serialNumber = Integer.parseInt(list[i]
0165: .substring(pos + 12, pos + 14), 16);
0166: try {
0167: array = openArrayFile(partitionNumber,
0168: serialNumber, indexOrder, true);
0169: initialSpace += array.size();
0170: array.close();
0171: } catch (IOException e) {
0172: e.printStackTrace();
0173: continue;
0174: }
0175: }
0176: serverLog.logFine("STARTUP",
0177: "STARTED INITIALIZATION OF NEW COLLECTION INDEX WITH "
0178: + initialSpace
0179: + " ENTRIES. THIS WILL TAKE SOME TIME. "
0180: + (serverMemory.available() / 1024 / 1024)
0181: + "MB AVAILABLE.");
0182: kelondroRow indexRowdef = indexRow(keyLength, indexOrder);
0183: long necessaryRAM4fullTable = minimumRAM4Eco
0184: + (indexRowdef.objectsize + 4) * initialSpace * 3
0185: / 2;
0186: long necessaryRAM4fullIndex = minimumRAM4Eco
0187: + (indexRowdef.primaryKeyLength + 4) * initialSpace
0188: * 3 / 2;
0189:
0190: // initialize (new generation) index table from file
0191: if (serverMemory.request(necessaryRAM4fullTable, false)) {
0192: index = new kelondroEcoTable(f, indexRowdef,
0193: kelondroEcoTable.tailCacheUsageAuto,
0194: EcoFSBufferSize, initialSpace);
0195: } else if (serverMemory.request(necessaryRAM4fullIndex,
0196: false)) {
0197: index = new kelondroEcoTable(f, indexRowdef,
0198: kelondroEcoTable.tailCacheDenyUsage,
0199: EcoFSBufferSize, initialSpace);
0200: } else {
0201: index = new kelondroFlexTable(path, filenameStub
0202: + ".index", preloadTime, indexRowdef,
0203: initialSpace, true);
0204: }
0205:
0206: // open array files
0207: this .arrays = new HashMap<String, kelondroFixedWidthArray>(); // all entries will be dynamically created with getArray()
0208: openAllArrayFiles(true, indexOrder);
0209: }
0210: }
0211:
0212: private void openAllArrayFiles(boolean indexGeneration,
0213: kelondroByteOrder indexOrder) throws IOException {
0214:
0215: String[] list = this .path.list();
0216: kelondroFixedWidthArray array;
0217:
0218: kelondroRow irow = indexRow(keylength, indexOrder);
0219: int t = kelondroRowCollection.daysSince2000(System
0220: .currentTimeMillis());
0221: for (int i = 0; i < list.length; i++)
0222: if (list[i].endsWith(".kca")) {
0223:
0224: // open array
0225: int pos = list[i].indexOf('.');
0226: if (pos < 0)
0227: continue;
0228: int chunksize = Integer.parseInt(list[i].substring(
0229: pos + 4, pos + 8), 16);
0230: int partitionNumber = Integer.parseInt(list[i]
0231: .substring(pos + 9, pos + 11), 16);
0232: int serialNumber = Integer.parseInt(list[i].substring(
0233: pos + 12, pos + 14), 16);
0234: try {
0235: array = openArrayFile(partitionNumber,
0236: serialNumber, indexOrder, true);
0237: } catch (IOException e) {
0238: e.printStackTrace();
0239: continue;
0240: }
0241:
0242: // remember that we opened the array
0243: arrays.put(partitionNumber + "-" + chunksize, array);
0244:
0245: if ((index != null) && (indexGeneration)) {
0246: // loop over all elements in array and create index entry for each row
0247: kelondroRow.EntryIndex aentry;
0248: kelondroRow.Entry ientry;
0249: Iterator<EntryIndex> ei = array.contentRows(-1);
0250: byte[] key;
0251: long start = System.currentTimeMillis();
0252: long lastlog = start;
0253: int count = 0;
0254: while (ei.hasNext()) {
0255: aentry = (kelondroRow.EntryIndex) ei.next();
0256: key = aentry.getColBytes(0);
0257: assert (key != null);
0258: if (key == null)
0259: continue; // skip deleted entries
0260: ientry = irow.newEntry();
0261: ientry.setCol(idx_col_key, key);
0262: ientry.setCol(idx_col_chunksize, chunksize);
0263: ientry.setCol(idx_col_chunkcount,
0264: kelondroRowCollection
0265: .sizeOfExportedCollectionRows(
0266: aentry, 1));
0267: ientry.setCol(idx_col_clusteridx,
0268: (byte) partitionNumber);
0269: ientry.setCol(idx_col_flags, (byte) 0);
0270: ientry.setCol(idx_col_indexpos, aentry.index());
0271: ientry.setCol(idx_col_lastread, t);
0272: ientry.setCol(idx_col_lastwrote, t);
0273: index.addUnique(ientry); // FIXME: this should avoid doubles
0274: count++;
0275:
0276: // write a log
0277: if (System.currentTimeMillis() - lastlog > 30000) {
0278: serverLog
0279: .logFine(
0280: "STARTUP",
0281: "created "
0282: + count
0283: + " RWI index entries. "
0284: + (((System
0285: .currentTimeMillis() - start)
0286: * (array
0287: .size()
0288: + array
0289: .free() - count) / count) / 60000)
0290: + " minutes remaining for this array");
0291: lastlog = System.currentTimeMillis();
0292: }
0293: }
0294: }
0295: }
0296: // care for double entries
0297: ArrayList<kelondroRowSet> del = index.removeDoubles();
0298: Iterator<kelondroRowSet> j = del.iterator();
0299: kelondroRowSet rowset;
0300: Iterator<kelondroRow.Entry> rowiter;
0301: int partition, maxpartition;
0302: kelondroRow.Entry entry, maxentry;
0303: int doublecount = 0;
0304: while (j.hasNext()) {
0305: rowset = j.next();
0306: // for each entry in row set choose one which we want to keep
0307: rowiter = rowset.rows();
0308: maxentry = null;
0309: maxpartition = -1;
0310: while (rowiter.hasNext()) {
0311: entry = rowiter.next();
0312: partition = (int) entry.getColLong(idx_col_clusteridx);
0313: if (partition > maxpartition) {
0314: maxpartition = partition;
0315: maxentry = entry;
0316: }
0317: }
0318: if (maxentry != null) {
0319: // put back a single entry to the index, which is then not double to any other entry
0320: index.put(maxentry);
0321: doublecount++;
0322: }
0323: }
0324: if (doublecount > 0)
0325: serverLog
0326: .logWarning(
0327: "STARTUP",
0328: "found "
0329: + doublecount
0330: + " RWI entries with references to several collections. All have been fixed (zombies still exists).");
0331: }
0332:
0333: private kelondroIndex openIndexFile(File path, String filenameStub,
0334: kelondroByteOrder indexOrder, long preloadTime,
0335: int loadfactor, kelondroRow rowdef, int initialSpace)
0336: throws IOException {
0337: // open/create index table
0338: File f = new File(path, filenameStub + ".index");
0339: kelondroRow indexRowdef = indexRow(keylength, indexOrder);
0340:
0341: if (f.isDirectory()) {
0342: // use a flextable
0343: kelondroIndex theindex = new kelondroCache(
0344: new kelondroFlexTable(path,
0345: filenameStub + ".index", preloadTime,
0346: indexRowdef, initialSpace, true));
0347:
0348: // save/check property file for this array
0349: File propfile = propertyFile(path, filenameStub,
0350: loadfactor, rowdef.objectsize);
0351: Map<String, String> props = new HashMap<String, String>();
0352: if (propfile.exists()) {
0353: props = serverFileUtils.loadHashMap(propfile);
0354: String stored_rowdef = (String) props.get("rowdef");
0355: if ((stored_rowdef == null)
0356: || (!(rowdef.subsumes(new kelondroRow(
0357: stored_rowdef, rowdef.objectOrder, 0))))) {
0358: System.out.println("FATAL ERROR: stored rowdef '"
0359: + stored_rowdef
0360: + "' does not match with new rowdef '"
0361: + rowdef + "' for array cluster '" + path
0362: + "/" + filenameStub + "'");
0363: System.exit(-1);
0364: }
0365: }
0366: props.put("rowdef", rowdef.toString());
0367: serverFileUtils.saveMap(propfile, props,
0368: "CollectionIndex properties");
0369:
0370: return theindex;
0371: } else {
0372: // open a ecotable
0373: long records = f.length() / indexRowdef.objectsize;
0374: long necessaryRAM4fullTable = minimumRAM4Eco
0375: + (indexRowdef.objectsize + 4) * records * 3 / 2;
0376: return new kelondroEcoTable(
0377: f,
0378: indexRowdef,
0379: (serverMemory
0380: .request(necessaryRAM4fullTable, false)) ? kelondroEcoTable.tailCacheUsageAuto
0381: : kelondroEcoTable.tailCacheDenyUsage,
0382: EcoFSBufferSize, initialSpace);
0383: }
0384: }
0385:
0386: private kelondroFixedWidthArray openArrayFile(int partitionNumber,
0387: int serialNumber, kelondroByteOrder indexOrder,
0388: boolean create) throws IOException {
0389: File f = arrayFile(path, filenameStub, loadfactor,
0390: payloadrow.objectsize, partitionNumber, serialNumber);
0391: int load = arrayCapacity(partitionNumber);
0392: kelondroRow rowdef = new kelondroRow("byte[] key-"
0393: + keylength
0394: + ","
0395: + "byte[] collection-"
0396: + (kelondroRowCollection.exportOverheadSize + load
0397: * this .payloadrow.objectsize), indexOrder, 0);
0398: if ((!(f.exists())) && (!create))
0399: return null;
0400: kelondroFixedWidthArray a = new kelondroFixedWidthArray(f,
0401: rowdef, 0);
0402: serverLog.logFine("STARTUP", "opened array file " + f
0403: + " with " + a.size() + " RWIs");
0404: return a;
0405: }
0406:
0407: private kelondroFixedWidthArray getArray(int partitionNumber,
0408: int serialNumber, kelondroByteOrder indexOrder,
0409: int chunksize) {
0410: String accessKey = partitionNumber + "-" + chunksize;
0411: kelondroFixedWidthArray array = (kelondroFixedWidthArray) arrays
0412: .get(accessKey);
0413: if (array != null)
0414: return array;
0415: try {
0416: array = openArrayFile(partitionNumber, serialNumber,
0417: indexOrder, true);
0418: } catch (IOException e) {
0419: return null;
0420: }
0421: arrays.put(accessKey, array);
0422: return array;
0423: }
0424:
0425: private int arrayCapacity(int arrayCounter) {
0426: if (arrayCounter < 0)
0427: return 0;
0428: int load = this .loadfactor;
0429: for (int i = 0; i < arrayCounter; i++)
0430: load = load * this .loadfactor;
0431: return load;
0432: }
0433:
0434: private int arrayIndex(int requestedCapacity)
0435: throws kelondroOutOfLimitsException {
0436: // the requestedCapacity is the number of wanted chunks
0437: int load = 1, i = 0;
0438: while (true) {
0439: load = load * this .loadfactor;
0440: if (load >= requestedCapacity)
0441: return i;
0442: i++;
0443: }
0444: }
0445:
0446: public int size() {
0447: return index.size();
0448: }
0449:
0450: public int minMem() {
0451: // calculate a minimum amount of memory that is necessary to use the collection
0452: // during runtime (after the index was initialized)
0453:
0454: // caclculate an upper limit (not the correct size) of the maximum number of indexes for a wordHash
0455: // this is computed by the size of the biggest used collection
0456: // this must be multiplied with the payload size
0457: // and doubled for necessary memory transformation during sort operation
0458: return (int) (arrayCapacity(arrays.size() - 1)
0459: * this .payloadrow.objectsize * kelondroRowSet.growfactor);
0460: }
0461:
0462: private void array_remove(int oldPartitionNumber, int serialNumber,
0463: int chunkSize, int oldRownumber) throws IOException {
0464: // we need a new slot, that means we must first delete the old entry
0465: // find array file
0466: kelondroFixedWidthArray array = getArray(oldPartitionNumber,
0467: serialNumber, index.row().objectOrder, chunkSize);
0468:
0469: // delete old entry
0470: array.remove(oldRownumber);
0471: }
0472:
0473: private kelondroRow.Entry array_new(byte[] key,
0474: kelondroRowCollection collection) throws IOException {
0475: // the collection is new
0476: int partitionNumber = arrayIndex(collection.size());
0477: kelondroRow.Entry indexrow = index.row().newEntry();
0478: kelondroFixedWidthArray array = getArray(partitionNumber,
0479: serialNumber, index.row().objectOrder,
0480: this .payloadrow.objectsize);
0481:
0482: // define row
0483: kelondroRow.Entry arrayEntry = array.row().newEntry();
0484: arrayEntry.setCol(0, key);
0485: arrayEntry.setCol(1, collection.exportCollection());
0486:
0487: // write a new entry in this array
0488: int newRowNumber = array.add(arrayEntry);
0489:
0490: // store the new row number in the index
0491: indexrow.setCol(idx_col_key, key);
0492: indexrow.setCol(idx_col_chunksize, this .payloadrow.objectsize);
0493: indexrow.setCol(idx_col_chunkcount, collection.size());
0494: indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber);
0495: indexrow.setCol(idx_col_flags, (byte) 0);
0496: indexrow.setCol(idx_col_indexpos, (long) newRowNumber);
0497: indexrow.setCol(idx_col_lastread, kelondroRowCollection
0498: .daysSince2000(System.currentTimeMillis()));
0499: indexrow.setCol(idx_col_lastwrote, kelondroRowCollection
0500: .daysSince2000(System.currentTimeMillis()));
0501:
0502: // after calling this method there must be an index.addUnique(indexrow);
0503: return indexrow;
0504: }
0505:
0506: private void array_add(byte[] key,
0507: kelondroRowCollection collection,
0508: kelondroRow.Entry indexrow, int partitionNumber,
0509: int serialNumber, int chunkSize) throws IOException {
0510:
0511: // write a new entry in the other array
0512: kelondroFixedWidthArray array = getArray(partitionNumber,
0513: serialNumber, index.row().objectOrder, chunkSize);
0514:
0515: // define new row
0516: kelondroRow.Entry arrayEntry = array.row().newEntry();
0517: arrayEntry.setCol(0, key);
0518: arrayEntry.setCol(1, collection.exportCollection());
0519:
0520: // write a new entry in this array
0521: int rowNumber = array.add(arrayEntry);
0522:
0523: // store the new row number in the index
0524: indexrow.setCol(idx_col_chunkcount, collection.size());
0525: indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber);
0526: indexrow.setCol(idx_col_indexpos, (long) rowNumber);
0527: indexrow.setCol(idx_col_lastwrote, kelondroRowCollection
0528: .daysSince2000(System.currentTimeMillis()));
0529:
0530: // after calling this method there must be a index.put(indexrow);
0531: }
0532:
0533: private ArrayList<kelondroRow.Entry> array_add_multiple(
0534: TreeMap<Integer, ArrayList<Object[]>> array_add_map,
0535: int serialNumber, int chunkSize) throws IOException {
0536: // returns a List of kelondroRow.Entry entries for indexrow storage
0537: Map.Entry<Integer, ArrayList<Object[]>> entry;
0538: Iterator<Map.Entry<Integer, ArrayList<Object[]>>> i = array_add_map
0539: .entrySet().iterator();
0540: Iterator<Object[]> j;
0541: ArrayList<Object[]> actionList;
0542: int partitionNumber;
0543: kelondroFixedWidthArray array;
0544: Object[] objs;
0545: byte[] key;
0546: kelondroRowCollection collection;
0547: kelondroRow.Entry indexrow;
0548: ArrayList<kelondroRow.Entry> indexrows = new ArrayList<kelondroRow.Entry>();
0549: while (i.hasNext()) {
0550: entry = i.next();
0551: actionList = entry.getValue();
0552: partitionNumber = entry.getKey().intValue();
0553: array = getArray(partitionNumber, serialNumber,
0554: index.row().objectOrder, chunkSize);
0555: j = actionList.iterator();
0556: while (j.hasNext()) {
0557: objs = (Object[]) j.next();
0558: key = (byte[]) objs[0];
0559: collection = (kelondroRowCollection) objs[1];
0560: indexrow = (kelondroRow.Entry) objs[2];
0561:
0562: // define new row
0563: kelondroRow.Entry arrayEntry = array.row().newEntry();
0564: arrayEntry.setCol(0, key);
0565: arrayEntry.setCol(1, collection.exportCollection());
0566:
0567: // write a new entry in this array
0568: int rowNumber = array.add(arrayEntry);
0569:
0570: // store the new row number in the index
0571: indexrow.setCol(idx_col_chunkcount, collection.size());
0572: indexrow.setCol(idx_col_clusteridx,
0573: (byte) partitionNumber);
0574: indexrow.setCol(idx_col_indexpos, (long) rowNumber);
0575: indexrow.setCol(idx_col_lastwrote,
0576: kelondroRowCollection.daysSince2000(System
0577: .currentTimeMillis()));
0578: indexrows.add(indexrow);
0579: }
0580: }
0581: // after calling this method there must be a index.put(indexrow);
0582: return indexrows;
0583: }
0584:
0585: private void array_replace(byte[] key,
0586: kelondroRowCollection collection,
0587: kelondroRow.Entry indexrow, int partitionNumber,
0588: int serialNumber, int chunkSize, int rowNumber)
0589: throws IOException {
0590: // we don't need a new slot, just write collection into the old one
0591:
0592: // find array file
0593: kelondroFixedWidthArray array = getArray(partitionNumber,
0594: serialNumber, index.row().objectOrder, chunkSize);
0595:
0596: // define new row
0597: kelondroRow.Entry arrayEntry = array.row().newEntry();
0598: arrayEntry.setCol(0, key);
0599: arrayEntry.setCol(1, collection.exportCollection());
0600:
0601: // overwrite entry in this array
0602: array.set(rowNumber, arrayEntry);
0603:
0604: // update the index entry
0605: final int collectionsize = collection.size(); // extra variable for easier debugging
0606: indexrow.setCol(idx_col_chunkcount, collectionsize);
0607: indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber);
0608: indexrow.setCol(idx_col_lastwrote, kelondroRowCollection
0609: .daysSince2000(System.currentTimeMillis()));
0610:
0611: // after calling this method there must be a index.put(indexrow);
0612: }
0613:
0614: private ArrayList<kelondroRow.Entry> array_replace_multiple(
0615: TreeMap<Integer, TreeMap<Integer, Object[]>> array_replace_map,
0616: int serialNumber, int chunkSize) throws IOException {
0617: Map.Entry<Integer, TreeMap<Integer, Object[]>> entry;
0618: Map.Entry<Integer, Object[]> e;
0619: Iterator<Map.Entry<Integer, TreeMap<Integer, Object[]>>> i = array_replace_map
0620: .entrySet().iterator();
0621: Iterator<Map.Entry<Integer, Object[]>> j;
0622: TreeMap<Integer, Object[]> actionMap;
0623: int partitionNumber;
0624: kelondroFixedWidthArray array;
0625: ArrayList<kelondroRow.Entry> indexrows = new ArrayList<kelondroRow.Entry>();
0626: Object[] objs;
0627: int rowNumber;
0628: byte[] key;
0629: kelondroRowCollection collection;
0630: kelondroRow.Entry indexrow;
0631: while (i.hasNext()) {
0632: entry = i.next();
0633: actionMap = entry.getValue();
0634: partitionNumber = ((Integer) entry.getKey()).intValue();
0635: array = getArray(partitionNumber, serialNumber,
0636: index.row().objectOrder, chunkSize);
0637:
0638: j = actionMap.entrySet().iterator();
0639: while (j.hasNext()) {
0640: e = j.next();
0641: rowNumber = ((Integer) e.getKey()).intValue();
0642: objs = (Object[]) e.getValue();
0643: key = (byte[]) objs[0];
0644: collection = (kelondroRowCollection) objs[1];
0645: indexrow = (kelondroRow.Entry) objs[2];
0646:
0647: // define new row
0648: kelondroRow.Entry arrayEntry = array.row().newEntry();
0649: arrayEntry.setCol(0, key);
0650: arrayEntry.setCol(1, collection.exportCollection());
0651:
0652: // overwrite entry in this array
0653: array.set(rowNumber, arrayEntry);
0654:
0655: // update the index entry
0656: indexrow.setCol(idx_col_chunkcount, collection.size());
0657: indexrow.setCol(idx_col_clusteridx,
0658: (byte) partitionNumber);
0659: indexrow.setCol(idx_col_lastwrote,
0660: kelondroRowCollection.daysSince2000(System
0661: .currentTimeMillis()));
0662: indexrows.add(indexrow);
0663: }
0664: }
0665: // after calling this method there mus be a index.put(indexrow);
0666: return indexrows;
0667: }
0668:
0669: public synchronized void put(byte[] key,
0670: kelondroRowCollection collection) throws IOException,
0671: kelondroOutOfLimitsException {
0672: assert (key != null);
0673: assert (collection != null);
0674: assert (collection.size() != 0);
0675:
0676: // first find an old entry, if one exists
0677: kelondroRow.Entry indexrow = index.get(key);
0678:
0679: if (indexrow == null) {
0680: // create new row and index entry
0681: if ((collection != null) && (collection.size() > 0)) {
0682: indexrow = array_new(key, collection); // modifies indexrow
0683: index.addUnique(indexrow);
0684: }
0685: return;
0686: }
0687:
0688: // overwrite the old collection
0689: // read old information
0690: //int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
0691: int oldchunkcount = (int) indexrow
0692: .getColLong(idx_col_chunkcount); // the number if rows in the collection
0693: int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
0694: int oldPartitionNumber = (int) indexrow
0695: .getColByte(idx_col_clusteridx); // points to array file
0696: assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
0697:
0698: int newPartitionNumber = arrayIndex(collection.size());
0699:
0700: // see if we need new space or if we can overwrite the old space
0701: if (oldPartitionNumber == newPartitionNumber) {
0702: array_replace(key, collection, indexrow,
0703: oldPartitionNumber, serialNumber,
0704: this .payloadrow.objectsize, oldrownumber); // modifies indexrow
0705: } else {
0706: array_remove(oldPartitionNumber, serialNumber,
0707: this .payloadrow.objectsize, oldrownumber);
0708: array_add(key, collection, indexrow, newPartitionNumber,
0709: serialNumber, this .payloadrow.objectsize); // modifies indexrow
0710: }
0711:
0712: if ((int) indexrow.getColLong(idx_col_chunkcount) != collection
0713: .size())
0714: serverLog
0715: .logSevere(
0716: "kelondroCollectionIndex",
0717: "UPDATE (put) ERROR: array has different chunkcount than index after merge: index = "
0718: + (int) indexrow
0719: .getColLong(idx_col_chunkcount)
0720: + ", collection.size() = "
0721: + collection.size());
0722:
0723: index.put(indexrow); // write modified indexrow
0724: }
0725:
0726: public synchronized void mergeMultiple(
0727: List<indexContainer> containerList) throws IOException,
0728: kelondroOutOfLimitsException {
0729: // merge a bulk of index containers
0730: // this method should be used to optimize the R/W head path length
0731:
0732: // separate the list in two halves:
0733: // - containers that do not exist yet in the collection
0734: // - containers that do exist in the collection and must be merged
0735: Iterator<indexContainer> i = containerList.iterator();
0736: indexContainer container;
0737: byte[] key;
0738: ArrayList<Object[]> newContainer = new ArrayList<Object[]>();
0739: TreeMap<Integer, TreeMap<Integer, Object[]>> existingContainer = new TreeMap<Integer, TreeMap<Integer, Object[]>>(); // a mapping from Integer (partition) to a TreeMap (mapping from index to object triple)
0740: TreeMap<Integer, Object[]> containerMap; // temporary map; mapping from index position to object triple with {key, container, indexrow}
0741: kelondroRow.Entry indexrow;
0742: int oldrownumber1; // index of the entry in array
0743: int oldPartitionNumber1; // points to array file
0744: while (i.hasNext()) {
0745: container = (indexContainer) i.next();
0746:
0747: if ((container == null) || (container.size() == 0))
0748: continue;
0749: key = container.getWordHash().getBytes();
0750:
0751: // first find an old entry, if one exists
0752: indexrow = index.get(key);
0753: if (indexrow == null) {
0754: newContainer.add(new Object[] { key, container });
0755: } else {
0756: oldrownumber1 = (int) indexrow
0757: .getColLong(idx_col_indexpos);
0758: oldPartitionNumber1 = (int) indexrow
0759: .getColByte(idx_col_clusteridx);
0760: containerMap = existingContainer.get(new Integer(
0761: oldPartitionNumber1));
0762: if (containerMap == null)
0763: containerMap = new TreeMap<Integer, Object[]>();
0764: containerMap.put(new Integer(oldrownumber1),
0765: new Object[] { key, container, indexrow });
0766: existingContainer.put(new Integer(oldPartitionNumber1),
0767: containerMap);
0768: }
0769: }
0770:
0771: // now iterate through the container lists and execute merges
0772: // this is done in such a way, that there is a optimized path for the R/W head
0773:
0774: // merge existing containers
0775: Map.Entry<Integer, Object[]> tripleEntry;
0776: Object[] record;
0777: ArrayList<kelondroRow.Entry> indexrows_existing = new ArrayList<kelondroRow.Entry>();
0778: kelondroRowCollection collection;
0779: TreeMap<Integer, TreeMap<Integer, Object[]>> array_replace_map = new TreeMap<Integer, TreeMap<Integer, Object[]>>();
0780: TreeMap<Integer, ArrayList<Object[]>> array_add_map = new TreeMap<Integer, ArrayList<Object[]>>();
0781: ArrayList<Object[]> actionList;
0782: TreeMap<Integer, Object[]> actionMap;
0783: //boolean madegc = false;
0784: //System.out.println("DEBUG existingContainer: " + existingContainer.toString());
0785: while (existingContainer.size() > 0) {
0786: oldPartitionNumber1 = ((Integer) existingContainer
0787: .lastKey()).intValue();
0788: containerMap = existingContainer.remove(new Integer(
0789: oldPartitionNumber1));
0790: Iterator<Map.Entry<Integer, Object[]>> j = containerMap
0791: .entrySet().iterator();
0792: while (j.hasNext()) {
0793: tripleEntry = j.next();
0794: oldrownumber1 = ((Integer) tripleEntry.getKey())
0795: .intValue();
0796: record = (Object[]) tripleEntry.getValue(); // {byte[], indexContainer, kelondroRow.Entry}
0797:
0798: // merge with the old collection
0799: key = (byte[]) record[0];
0800: collection = (kelondroRowCollection) record[1];
0801: indexrow = (kelondroRow.Entry) record[2];
0802:
0803: // read old information
0804: int oldchunksize = (int) indexrow
0805: .getColLong(idx_col_chunksize); // needed only for migration
0806: int oldchunkcount = (int) indexrow
0807: .getColLong(idx_col_chunkcount); // the number if rows in the collection
0808: int oldrownumber = (int) indexrow
0809: .getColLong(idx_col_indexpos); // index of the entry in array
0810: int oldPartitionNumber = (int) indexrow
0811: .getColByte(idx_col_clusteridx); // points to array file
0812: assert oldPartitionNumber1 == oldPartitionNumber : "oldPartitionNumber1 = "
0813: + oldPartitionNumber1
0814: + ", oldPartitionNumber = "
0815: + oldPartitionNumber
0816: + ", containerMap = "
0817: + containerMap
0818: + ", existingContainer: "
0819: + existingContainer.toString();
0820: assert oldrownumber1 == oldrownumber : "oldrownumber1 = "
0821: + oldrownumber1
0822: + ", oldrownumber = "
0823: + oldrownumber
0824: + ", containerMap = "
0825: + containerMap
0826: + ", existingContainer: "
0827: + existingContainer.toString();
0828: assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
0829: int oldSerialNumber = 0;
0830:
0831: // load the old collection and join it
0832: collection.addAllUnique(getwithparams(indexrow,
0833: oldchunksize, oldchunkcount,
0834: oldPartitionNumber, oldrownumber,
0835: oldSerialNumber, false));
0836: collection.sort();
0837: collection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries
0838: collection.trim(false);
0839:
0840: // check for size of collection:
0841: // if necessary shrink the collection and dump a part of that collection
0842: // to avoid that this grows too big
0843: if (arrayIndex(collection.size()) > maxPartitions) {
0844: shrinkCollection(key, collection,
0845: arrayCapacity(maxPartitions));
0846: }
0847:
0848: // determine new partition position
0849: int newPartitionNumber = arrayIndex(collection.size());
0850:
0851: // see if we need new space or if we can overwrite the old space
0852: if (oldPartitionNumber == newPartitionNumber) {
0853: actionMap = array_replace_map.get(new Integer(
0854: oldPartitionNumber));
0855: if (actionMap == null)
0856: actionMap = new TreeMap<Integer, Object[]>();
0857: actionMap.put(new Integer(oldrownumber),
0858: new Object[] { key, collection, indexrow });
0859: array_replace_map.put(new Integer(
0860: oldPartitionNumber), actionMap);
0861: /*
0862: array_replace(
0863: key, collection, indexrow,
0864: oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
0865: oldrownumber); // modifies indexrow
0866: indexrows_existing.add(indexrow); // indexrows are collected and written later as block
0867: */
0868: } else {
0869: array_remove(oldPartitionNumber, oldSerialNumber,
0870: this .payloadrow.objectsize, oldrownumber);
0871:
0872: actionList = array_add_map.get(new Integer(
0873: newPartitionNumber));
0874: if (actionList == null)
0875: actionList = new ArrayList<Object[]>();
0876: actionList.add(new Object[] { key, collection,
0877: indexrow });
0878: array_add_map.put(new Integer(newPartitionNumber),
0879: actionList);
0880: /*
0881: array_add(
0882: key, collection, indexrow,
0883: newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow
0884: indexrows_existing.add(indexrow); // indexrows are collected and written later as block
0885: */
0886: }
0887:
0888: // memory protection: flush collected collections
0889: if (serverMemory.available() < minMem()) {
0890: // emergency flush
0891: indexrows_existing.addAll(array_replace_multiple(
0892: array_replace_map, 0,
0893: this .payloadrow.objectsize));
0894: array_replace_map = new TreeMap<Integer, TreeMap<Integer, Object[]>>(); // delete references
0895: indexrows_existing.addAll(array_add_multiple(
0896: array_add_map, 0,
0897: this .payloadrow.objectsize));
0898: array_add_map = new TreeMap<Integer, ArrayList<Object[]>>(); // delete references
0899: //if (!madegc) {
0900: // prevent that this flush is made again even when there is enough memory
0901: serverMemory
0902: .gc(10000,
0903: "kelendroCollectionIndex.mergeMultiple(...)"); // thq
0904: // prevent that this gc happens more than one time
0905: // madegc = true;
0906: //}
0907: }
0908: }
0909: }
0910:
        // finally flush the collected collections
0912: indexrows_existing.addAll(array_replace_multiple(
0913: array_replace_map, 0, this .payloadrow.objectsize));
0914: array_replace_map = new TreeMap<Integer, TreeMap<Integer, Object[]>>(); // delete references
0915: indexrows_existing.addAll(array_add_multiple(array_add_map, 0,
0916: this .payloadrow.objectsize));
0917: array_add_map = new TreeMap<Integer, ArrayList<Object[]>>(); // delete references
0918:
0919: // write new containers
0920: Iterator<Object[]> k = newContainer.iterator();
0921: ArrayList<kelondroRow.Entry> indexrows_new = new ArrayList<kelondroRow.Entry>();
0922: while (k.hasNext()) {
0923: record = k.next(); // {byte[], indexContainer}
0924: key = (byte[]) record[0];
0925: collection = (indexContainer) record[1];
0926: indexrow = array_new(key, collection); // modifies indexrow
0927: indexrows_new.add(indexrow); // collect new index rows
0928: }
0929:
0930: // write index entries
0931: index.putMultiple(indexrows_existing); // write modified indexrows in optimized manner
0932: index.addUniqueMultiple(indexrows_new); // write new indexrows in optimized manner
0933: }
0934:
/**
 * Merges the rows of the given container into the collection stored under
 * the container's word hash. If no collection exists yet for that key, a
 * new array entry is created; otherwise the old collection is loaded,
 * joined with the new rows, sorted, de-duplicated and written back —
 * possibly into a different partition file if the merged collection
 * outgrew its old partition.
 *
 * @param container rows to merge; ignored when null or empty
 * @throws IOException if reading or writing the backing array files fails
 * @throws kelondroOutOfLimitsException if a partition size limit is exceeded
 */
public synchronized void merge(indexContainer container)
        throws IOException, kelondroOutOfLimitsException {
    if ((container == null) || (container.size() == 0))
        return;
    byte[] key = container.getWordHash().getBytes();

    // first find an old entry, if one exists
    kelondroRow.Entry indexrow = index.get(key);
    if (indexrow == null) {
        // no old collection: store the container as a brand-new collection
        indexrow = array_new(key, container); // modifies indexrow
        index.addUnique(indexrow); // write modified indexrow
    } else {
        // merge with the old collection
        // attention! this modifies the indexrow entry which must be written with index.put(indexrow) afterwards!
        kelondroRowCollection collection = (kelondroRowCollection) container;

        // read old information from the index row
        int oldchunksize = (int) indexrow
                .getColLong(idx_col_chunksize); // needed only for migration
        int oldchunkcount = (int) indexrow
                .getColLong(idx_col_chunkcount); // the number of rows in the collection
        int oldrownumber = (int) indexrow
                .getColLong(idx_col_indexpos); // index of the entry in array
        int oldPartitionNumber = (int) indexrow
                .getColByte(idx_col_clusteridx); // points to array file
        assert (oldPartitionNumber >= arrayIndex(oldchunkcount)) : "oldPartitionNumber = "
                + oldPartitionNumber
                + ", arrayIndex(oldchunkcount) = "
                + arrayIndex(oldchunkcount);
        // serial numbers are not used yet; always 0 here
        int oldSerialNumber = 0;

        // load the old collection and join it with the new rows
        collection.addAllUnique(getwithparams(indexrow,
                oldchunksize, oldchunkcount, oldPartitionNumber,
                oldrownumber, oldSerialNumber, false));
        collection.sort();
        collection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries
        collection.trim(false);

        // check for size of collection:
        // if necessary shrink the collection and dump a part of that collection
        // to avoid that this grows too big
        if (arrayIndex(collection.size()) > maxPartitions) {
            shrinkCollection(key, collection,
                    arrayCapacity(maxPartitions));
        }

        // determine new partition location
        int newPartitionNumber = arrayIndex(collection.size());

        // see if we need new space or if we can overwrite the old space
        if (oldPartitionNumber == newPartitionNumber) {
            // same partition: overwrite the old array slot in place
            array_replace(key, collection, indexrow,
                    oldPartitionNumber, oldSerialNumber,
                    this.payloadrow.objectsize, oldrownumber); // modifies indexrow
        } else {
            // collection moved to another partition: delete old slot, add new one
            array_remove(oldPartitionNumber, oldSerialNumber,
                    this.payloadrow.objectsize, oldrownumber);
            array_add(key, collection, indexrow,
                    newPartitionNumber, oldSerialNumber,
                    this.payloadrow.objectsize); // modifies indexrow
        }

        // consistency check: index chunkcount must match the merged collection size
        final int collectionsize = collection.size(); // extra variable for easier debugging
        final int indexrowcount = (int) indexrow
                .getColLong(idx_col_chunkcount);
        if (indexrowcount != collectionsize)
            serverLog
                    .logSevere(
                            "kelondroCollectionIndex",
                            "UPDATE (merge) ERROR: array has different chunkcount than index after merge: index = "
                                    + indexrowcount
                                    + ", collection.size() = "
                                    + collectionsize);

        index.put(indexrow); // write modified indexrow
    }
}
1013:
1014: private void shrinkCollection(byte[] key,
1015: kelondroRowCollection collection, int targetSize) {
1016: //TODO Remove timing before release
1017: // removes entries from collection
1018: // the removed entries are stored in a 'commons' dump file
1019:
1020: if (key.length != 12)
1021: return;
1022: // check if the collection is already small enough
1023: int oldsize = collection.size();
1024: if (oldsize <= targetSize)
1025: return;
1026: kelondroRowSet newcommon = new kelondroRowSet(
1027: collection.rowdef, 0);
1028: long sadd1 = 0, srem1 = 0, sadd2 = 0, srem2 = 0, tot1 = 0, tot2 = 0;
1029: long t1 = 0, t2 = 0;
1030:
1031: // delete some entries, which are bad rated
1032: Iterator<kelondroRow.Entry> i = collection.rows();
1033: kelondroRow.Entry entry;
1034: byte[] ref;
1035: t1 = System.currentTimeMillis();
1036: while (i.hasNext()) {
1037: entry = i.next();
1038: ref = entry.getColBytes(0);
1039: if ((ref.length != 12)
1040: || (!yacyURL.probablyRootURL(new String(ref)))) {
1041: t2 = System.currentTimeMillis();
1042: newcommon.addUnique(entry);
1043: sadd1 += System.currentTimeMillis() - t2;
1044: t2 = System.currentTimeMillis();
1045: i.remove();
1046: srem1 += System.currentTimeMillis() - t2;
1047: }
1048: }
1049: int firstnewcommon = newcommon.size();
1050: tot1 = System.currentTimeMillis() - t1;
1051:
1052: // check if we shrinked enough
1053: Random rand = new Random(System.currentTimeMillis());
1054: t1 = System.currentTimeMillis();
1055: while (collection.size() > targetSize) {
1056: // now delete randomly more entries from the survival collection
1057: i = collection.rows();
1058: while (i.hasNext()) {
1059: entry = (kelondroRow.Entry) i.next();
1060: ref = entry.getColBytes(0);
1061: if (rand.nextInt() % 4 != 0) {
1062: t2 = System.currentTimeMillis();
1063: newcommon.addUnique(entry);
1064: sadd2 += System.currentTimeMillis() - t2;
1065: t2 = System.currentTimeMillis();
1066: i.remove();
1067: srem2 += System.currentTimeMillis() - t2;
1068: }
1069: }
1070: }
1071: tot2 = System.currentTimeMillis() - t1;
1072: collection.trim(false);
1073:
1074: serverLog.logFine("kelondroCollectionIndex", "tot= " + tot1
1075: + '/' + tot2 + " # add/rem(1)= " + sadd1 + '/' + srem1
1076: + " # add/rem(2)= " + sadd2 + '/' + srem2);
1077: serverLog.logInfo("kelondroCollectionIndex",
1078: "shrinked common word " + new String(key)
1079: + "; old size = " + oldsize + ", new size = "
1080: + collection.size() + ", maximum size = "
1081: + targetSize + ", newcommon size = "
1082: + newcommon.size() + ", first newcommon = "
1083: + firstnewcommon);
1084:
1085: // finally dump the removed entries to a file
1086: newcommon.sort();
1087: SimpleDateFormat formatter = new SimpleDateFormat(
1088: "yyyyMMddHHmmss");
1089: formatter.setTimeZone(TimeZone.getTimeZone("GMT"));
1090: String filename = serverCodings
1091: .encodeHex(kelondroBase64Order.enhancedCoder
1092: .decode(new String(key),
1093: "de.anomic.kelondro.kelondroCollectionIndex.shrinkCollection(...)"))
1094: + "_" + formatter.format(new Date()) + ".collection";
1095: File storagePath = new File(commonsPath, filename.substring(0,
1096: 2)); // make a subpath
1097: storagePath.mkdirs();
1098: File file = new File(storagePath, filename);
1099: try {
1100: newcommon.saveCollection(file);
1101: serverLog.logInfo("kelondroCollectionIndex",
1102: "dumped common word " + new String(key) + " to "
1103: + file.toString() + "; size = "
1104: + newcommon.size());
1105: } catch (IOException e) {
1106: e.printStackTrace();
1107: serverLog.logWarning("kelondroCollectionIndex",
1108: "failed to dump common word " + new String(key)
1109: + " to " + file.toString() + "; size = "
1110: + newcommon.size());
1111: }
1112:
1113: }
1114:
/**
 * Removes the rows identified by removekeys from the collection stored
 * under key. The collection is loaded, the rows are deleted, and the
 * shrunken collection is written back — possibly into a smaller
 * partition file if it no longer fills its old one.
 *
 * @param key        the key of the collection
 * @param removekeys the primary keys of the rows to remove
 * @return the number of rows that were actually removed
 * @throws IOException if reading or writing the backing files fails
 * @throws kelondroOutOfLimitsException if a partition size limit is exceeded
 */
public synchronized int remove(byte[] key, Set<String> removekeys)
        throws IOException, kelondroOutOfLimitsException {

    if ((removekeys == null) || (removekeys.size() == 0))
        return 0;

    // first find an old entry, if one exists
    kelondroRow.Entry indexrow = index.get(key);

    if (indexrow == null)
        return 0;

    // overwrite the old collection
    // read old information
    int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
    int oldchunkcount = (int) indexrow
            .getColLong(idx_col_chunkcount); // the number of rows in the collection
    int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
    int oldPartitionNumber = (int) indexrow
            .getColByte(idx_col_clusteridx); // points to array file
    assert (oldPartitionNumber >= arrayIndex(oldchunkcount));

    int removed = 0;
    assert (removekeys != null);
    // load the old collection and remove keys
    // NOTE(review): this uses the instance field serialNumber, whereas
    // merge()/getdelete() use a local serial number of 0 — confirm intentional
    kelondroRowSet oldcollection = getwithparams(indexrow,
            oldchunksize, oldchunkcount, oldPartitionNumber,
            oldrownumber, serialNumber, false);

    // remove the keys from the set
    Iterator<String> i = removekeys.iterator();
    while (i.hasNext()) {
        if (oldcollection.remove(i.next().getBytes(), false) != null)
            removed++;
    }
    oldcollection.sort();
    oldcollection.trim(false);

    /* in case that the new array size is zero we dont delete the array, just allocate a minimal chunk
     *

    if (oldcollection.size() == 0) {
        // delete the index entry and the array
        kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize);
        array.remove(oldrownumber, false);
        index.remove(key);
        return removed;
    }
    */
    int newPartitionNumber = arrayIndex(oldcollection.size());

    // see if we need new space or if we can overwrite the old space
    if (oldPartitionNumber == newPartitionNumber) {
        array_replace(key, oldcollection, indexrow,
                oldPartitionNumber, serialNumber,
                this.payloadrow.objectsize, oldrownumber); // modifies indexrow
    } else {
        array_remove(oldPartitionNumber, serialNumber,
                this.payloadrow.objectsize, oldrownumber);
        array_add(key, oldcollection, indexrow, newPartitionNumber,
                serialNumber, this.payloadrow.objectsize); // modifies indexrow
    }
    index.put(indexrow); // write modified indexrow
    return removed;
}
1180:
1181: public synchronized int indexSize(byte[] key) throws IOException {
1182: kelondroRow.Entry indexrow = index.get(key);
1183: if (indexrow == null)
1184: return 0;
1185: return (int) indexrow.getColLong(idx_col_chunkcount);
1186: }
1187:
/**
 * Checks whether a collection is stored under the given key.
 *
 * @param key the collection key
 * @return true if the index contains an entry for the key
 * @throws IOException if the index cannot be read
 */
public synchronized boolean has(byte[] key) throws IOException {
    return index.has(key);
}
1191:
1192: public synchronized kelondroRowSet get(byte[] key)
1193: throws IOException {
1194: // find an entry, if one exists
1195: kelondroRow.Entry indexrow = index.get(key);
1196: if (indexrow == null)
1197: return null;
1198: kelondroRowSet col = getdelete(indexrow, false);
1199: assert (col != null);
1200: return col;
1201: }
1202:
1203: public synchronized kelondroRowSet delete(byte[] key)
1204: throws IOException {
1205: // find an entry, if one exists
1206: kelondroRow.Entry indexrow = index.remove(key, false);
1207: if (indexrow == null)
1208: return null;
1209: kelondroRowSet removedCollection = getdelete(indexrow, true);
1210: assert (removedCollection != null);
1211: return removedCollection;
1212: }
1213:
1214: protected kelondroRowSet getdelete(kelondroRow.Entry indexrow,
1215: boolean remove) throws IOException {
1216: // call this only within a synchronized(index) environment
1217:
1218: // read values
1219: int chunksize = (int) indexrow.getColLong(idx_col_chunksize);
1220: int chunkcount = (int) indexrow.getColLong(idx_col_chunkcount);
1221: int rownumber = (int) indexrow.getColLong(idx_col_indexpos);
1222: int partitionnumber = (int) indexrow
1223: .getColByte(idx_col_clusteridx);
1224: assert (partitionnumber >= arrayIndex(chunkcount)) : "partitionnumber = "
1225: + partitionnumber
1226: + ", arrayIndex(chunkcount) = "
1227: + arrayIndex(chunkcount);
1228: int serialnumber = 0;
1229:
1230: return getwithparams(indexrow, chunksize, chunkcount,
1231: partitionnumber, rownumber, serialnumber, remove);
1232: }
1233:
/**
 * Loads the collection at the given array position and cross-checks it
 * against the index row. Inconsistencies between the index and the array
 * are repaired on the fly: a malformed array key drops the collection, a
 * mismatched key rewrites the index entry, and a wrong chunk count is
 * auto-fixed in the index.
 *
 * @param indexrow     the index entry that points to the collection
 * @param chunksize    row size of the collection's payload
 * @param chunkcount   number of rows the index claims the collection has
 * @param clusteridx   partition number; selects the array file
 * @param rownumber    position of the collection inside the array file
 * @param serialnumber serial number of the array file
 * @param remove       if true, the array entry is deleted after reading;
 *                     the index entry must then be removed by the caller
 * @return the collection (possibly empty if the stored data was corrupt)
 * @throws IOException if the array file cannot be read
 */
private synchronized kelondroRowSet getwithparams(
        kelondroRow.Entry indexrow, int chunksize, int chunkcount,
        int clusteridx, int rownumber, int serialnumber,
        boolean remove) throws IOException {
    // open array entry
    kelondroFixedWidthArray array = getArray(clusteridx,
            serialnumber, index.row().objectOrder, chunksize);
    kelondroRow.Entry arrayrow = array.get(rownumber);
    if (arrayrow == null)
        throw new kelondroException(arrayFile(this.path,
                this.filenameStub, this.loadfactor, chunksize,
                clusteridx, serialnumber).toString(),
                "array does not contain expected row");

    // read the row and define a collection
    byte[] indexkey = indexrow.getColBytes(idx_col_key);
    byte[] arraykey = arrayrow.getColBytes(0);
    if (!(index.row().objectOrder.wellformed(arraykey))) {
        // cleanup for a bad bug that corrupted the database
        index.remove(indexkey, false); // the RowCollection must be considered lost
        array.remove(rownumber); // loose the RowCollection (we don't know how much is lost)
        serverLog.logSevere("kelondroCollectionIndex."
                + array.filename,
                "lost a RowCollection because of a bad arraykey");
        return new kelondroRowSet(this.payloadrow, 0);
    }
    kelondroRowSet collection = new kelondroRowSet(this.payloadrow,
            arrayrow, 1); // FIXME: this does not yet work with different rowdef in case of several rowdef.objectsize()
    if ((!(index.row().objectOrder.wellformed(indexkey)))
            || (index.row().objectOrder.compare(arraykey, indexkey) != 0)) {
        // check if we got the right row; this row is wrong. Fix it:
        index.remove(indexkey, false); // the wrong row cannot be fixed
        // store the row number in the index; this may be a double-entry, but better than nothing
        kelondroRow.Entry indexEntry = index.row().newEntry();
        indexEntry.setCol(idx_col_key, arrayrow.getColBytes(0));
        indexEntry.setCol(idx_col_chunksize,
                this.payloadrow.objectsize);
        indexEntry.setCol(idx_col_chunkcount, collection.size());
        indexEntry.setCol(idx_col_clusteridx, (byte) clusteridx);
        indexEntry.setCol(idx_col_flags, (byte) 0);
        indexEntry.setCol(idx_col_indexpos, (long) rownumber);
        indexEntry.setCol(idx_col_lastread, kelondroRowCollection
                .daysSince2000(System.currentTimeMillis()));
        indexEntry.setCol(idx_col_lastwrote, kelondroRowCollection
                .daysSince2000(System.currentTimeMillis()));
        index.put(indexEntry);
        serverLog.logSevere("kelondroCollectionIndex."
                + array.filename, "array contains wrong row '"
                + new String(arrayrow.getColBytes(0))
                + "', expected is '"
                + new String(indexrow.getColBytes(idx_col_key))
                + "', the row has been fixed");
    }
    // the array is authoritative for the row count; repair the index if it disagrees
    int chunkcountInArray = collection.size();
    if (chunkcountInArray != chunkcount) {
        // fix the entry in index
        indexrow.setCol(idx_col_chunkcount, chunkcountInArray);
        index.put(indexrow);
        array
                .logFailure("INCONSISTENCY (get) in "
                        + arrayFile(this.path, this.filenameStub,
                                this.loadfactor, chunksize,
                                clusteridx, serialnumber)
                                .toString()
                        + ": array has different chunkcount than index: index = "
                        + chunkcount + ", array = "
                        + chunkcountInArray
                        + "; the index has been auto-fixed");
    }
    if (remove)
        array.remove(rownumber); // index is removed in calling method
    return collection;
}
1307:
1308: public synchronized Iterator<Object[]> keycollections(
1309: byte[] startKey, byte[] secondKey, boolean rot) {
1310: // returns an iteration of {byte[], kelondroRowSet} Objects
1311: try {
1312: return new keycollectionIterator(startKey, secondKey, rot);
1313: } catch (IOException e) {
1314: e.printStackTrace();
1315: return null;
1316: }
1317: }
1318:
1319: public class keycollectionIterator implements Iterator<Object[]> {
1320:
1321: Iterator<kelondroRow.Entry> indexRowIterator;
1322:
1323: public keycollectionIterator(byte[] startKey, byte[] secondKey,
1324: boolean rot) throws IOException {
1325: // iterator of {byte[], kelondroRowSet} Objects
1326: kelondroCloneableIterator<kelondroRow.Entry> i = index
1327: .rows(true, startKey);
1328: indexRowIterator = (rot) ? new kelondroRotateIterator<kelondroRow.Entry>(
1329: i, secondKey, index.size())
1330: : i;
1331: }
1332:
1333: public boolean hasNext() {
1334: return indexRowIterator.hasNext();
1335: }
1336:
1337: public Object[] next() {
1338: kelondroRow.Entry indexrow = (kelondroRow.Entry) indexRowIterator
1339: .next();
1340: assert (indexrow != null);
1341: if (indexrow == null)
1342: return null;
1343: try {
1344: return new Object[] { indexrow.getColBytes(0),
1345: getdelete(indexrow, false) };
1346: } catch (IOException e) {
1347: e.printStackTrace();
1348: return null;
1349: }
1350: }
1351:
1352: public void remove() {
1353: indexRowIterator.remove();
1354: }
1355:
1356: }
1357:
1358: public synchronized void close() {
1359: this .index.close();
1360: Iterator<kelondroFixedWidthArray> i = arrays.values()
1361: .iterator();
1362: while (i.hasNext())
1363: i.next().close();
1364: }
1365:
1366: public static void main(String[] args) {
1367:
1368: // define payload structure
1369: kelondroRow rowdef = new kelondroRow(
1370: "byte[] a-10, byte[] b-80",
1371: kelondroNaturalOrder.naturalOrder, 0);
1372:
1373: File path = new File(args[0]);
1374: String filenameStub = args[1];
1375: long preloadTime = 10000;
1376: try {
1377: // initialize collection index
1378: kelondroCollectionIndex collectionIndex = new kelondroCollectionIndex(
1379: path, filenameStub, 9 /*keyLength*/,
1380: kelondroNaturalOrder.naturalOrder, preloadTime,
1381: 4 /*loadfactor*/, 7, rowdef);
1382:
1383: // fill index with values
1384: kelondroRowSet collection = new kelondroRowSet(rowdef, 0);
1385: collection.addUnique(rowdef.newEntry(new byte[][] {
1386: "abc".getBytes(), "efg".getBytes() }));
1387: collectionIndex.put("erstes".getBytes(), collection);
1388:
1389: for (int i = 1; i <= 170; i++) {
1390: collection = new kelondroRowSet(rowdef, 0);
1391: for (int j = 0; j < i; j++) {
1392: collection
1393: .addUnique(rowdef.newEntry(new byte[][] {
1394: ("abc" + j).getBytes(),
1395: "xxx".getBytes() }));
1396: }
1397: System.out.println("put key-" + i + ": "
1398: + collection.toString());
1399: collectionIndex
1400: .put(("key-" + i).getBytes(), collection);
1401: }
1402:
1403: // extend collections with more values
1404: for (int i = 0; i <= 170; i++) {
1405: collection = new kelondroRowSet(rowdef, 0);
1406: for (int j = 0; j < i; j++) {
1407: collection
1408: .addUnique(rowdef.newEntry(new byte[][] {
1409: ("def" + j).getBytes(),
1410: "xxx".getBytes() }));
1411: }
1412: collectionIndex.merge(new indexContainer("key-" + i,
1413: collection));
1414: }
1415:
1416: // printout of index
1417: collectionIndex.close();
1418: kelondroFlexTable index = new kelondroFlexTable(path,
1419: filenameStub + ".index", preloadTime,
1420: kelondroCollectionIndex.indexRow(9,
1421: kelondroNaturalOrder.naturalOrder), 0, true);
1422: index.print();
1423: index.close();
1424: } catch (IOException e) {
1425: e.printStackTrace();
1426: }
1427:
1428: }
1429: }
|