001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.compass.needle.coherence;
018:
019: import java.io.IOException;
020:
021: import org.apache.lucene.store.IndexOutput;
022:
023: /**
024: * Only use buckets to write the file to the index. Writes a header file and one
025: * or more buckets that hold the file content.
026: *
027: * <p>Allows only seeking to the first bucket, which is what Lucene does with typical
028: * bucket size.
029: *
030: * @author kimchy
031: */
032: class CoherenceMemIndexOutput extends IndexOutput {
033:
034: private CoherenceDirectory dir;
035:
036: private String fileName;
037:
038: private FileBucketKey firstBucketKey;
039:
040: private FileBucketValue firstBucketValue;
041:
042: private byte[] buffer;
043:
044: private int bufferPosition;
045:
046: private int currentBucketIndex;
047:
048: private long length;
049:
050: private long position;
051:
052: private boolean open;
053:
054: // seek occured, we only allow to work on the first bucket
055: private boolean seekOccured;
056:
057: public CoherenceMemIndexOutput(CoherenceDirectory dir,
058: String fileName) throws IOException {
059: this .dir = dir;
060: this .fileName = fileName;
061: open = true;
062: buffer = new byte[dir.getBucketSize()];
063: // this file is overridden by Lucene, so delete it first
064: if (fileName.equals("segments.gen")) {
065: dir.deleteFile(fileName);
066: }
067: }
068:
069: public void writeByte(byte b) throws IOException {
070: if (bufferPosition == dir.getBucketSize()) {
071: if (seekOccured) {
072: throw new CoherenceDirectoryException(dir
073: .getIndexName(), fileName,
074: "Seek occured and overflowed first bucket");
075: }
076: flushBucket();
077: }
078: buffer[bufferPosition++] = b;
079: if (!seekOccured) {
080: length++;
081: position++;
082: }
083: }
084:
085: public void writeBytes(byte[] b, int offset, int len)
086: throws IOException {
087: if (!seekOccured) {
088: position += len;
089: length += len;
090: }
091: while (len > 0) {
092: if (bufferPosition == dir.getBucketSize()) {
093: if (seekOccured) {
094: throw new CoherenceDirectoryException(dir
095: .getIndexName(), fileName,
096: "Seek occured and overflowed first bucket");
097: }
098: flushBucket();
099: }
100:
101: int remainInBuffer = dir.getBucketSize() - bufferPosition;
102: int bytesToCopy = len < remainInBuffer ? len
103: : remainInBuffer;
104: System.arraycopy(b, offset, buffer, bufferPosition,
105: bytesToCopy);
106: offset += bytesToCopy;
107: len -= bytesToCopy;
108: bufferPosition += bytesToCopy;
109: }
110: }
111:
112: public void flush() throws IOException {
113: // do nothing here
114: }
115:
116: public void close() throws IOException {
117: if (!open) {
118: return;
119: }
120: open = false;
121: // flush any bucket we might have
122: flushBucket();
123: try {
124: dir.getCache().put(firstBucketKey, firstBucketValue);
125: } catch (Exception e) {
126: throw new CoherenceDirectoryException(dir.getIndexName(),
127: fileName, "Failed to write bucket [0]", e);
128: }
129: try {
130: dir.getCache().put(
131: new FileHeaderKey(dir.getIndexName(), fileName),
132: new FileHeaderValue(System.currentTimeMillis(),
133: length));
134: } catch (Exception e) {
135: throw new CoherenceDirectoryException(dir.getIndexName(),
136: fileName, "Failed to write file entry", e);
137: }
138: buffer = null;
139: firstBucketKey = null;
140: firstBucketValue = null;
141: }
142:
143: public long getFilePointer() {
144: return this .position;
145: }
146:
147: public void seek(long pos) throws IOException {
148: if (pos >= dir.getBucketSize()) {
149: throw new CoherenceDirectoryException(dir.getIndexName(),
150: fileName,
151: "seek called outside of first bucket boundries");
152: }
153: // create the first bucket if still not created
154: if (firstBucketKey == null) {
155: firstBucketKey = new FileBucketKey(dir.getIndexName(),
156: fileName, 0);
157: firstBucketValue = new FileBucketValue(
158: new byte[bufferPosition]);
159: System.arraycopy(buffer, 0, firstBucketValue.getData(), 0,
160: bufferPosition);
161: } else {
162: // flush the current buffer. We only seek into the first bucket
163: // so no need to keep it around
164: if (!seekOccured) {
165: flushBucket(currentBucketIndex, buffer, bufferPosition);
166: }
167: }
168: position = pos;
169: currentBucketIndex = 0;
170: bufferPosition = (int) pos;
171: buffer = firstBucketValue.getData();
172: seekOccured = true;
173: }
174:
175: public long length() throws IOException {
176: return length;
177: }
178:
179: private void flushBucket() throws IOException {
180: if (currentBucketIndex == 0) {
181: if (firstBucketKey == null) {
182: firstBucketKey = new FileBucketKey(dir.getIndexName(),
183: fileName, 0);
184: firstBucketValue = new FileBucketValue(
185: new byte[bufferPosition]);
186: System.arraycopy(buffer, 0, firstBucketValue.getData(),
187: 0, bufferPosition);
188: } else {
189: // do nothing, we are writing directly into the first buffer
190: }
191: } else {
192: if (bufferPosition > 0) {
193: flushBucket(currentBucketIndex, buffer, bufferPosition);
194: }
195: }
196: currentBucketIndex++;
197: bufferPosition = 0;
198: }
199:
200: private void flushBucket(long bucketIndex, byte[] buffer, int length)
201: throws IOException {
202: FileBucketKey fileBucketKey = new FileBucketKey(dir
203: .getIndexName(), fileName, bucketIndex);
204: FileBucketValue fileBucketValue = new FileBucketValue(
205: new byte[length]);
206: System.arraycopy(buffer, 0, fileBucketValue.getData(), 0,
207: length);
208: try {
209: dir.getCache().put(fileBucketKey, fileBucketValue);
210: } catch (Exception e) {
211: throw new CoherenceDirectoryException(dir.getIndexName(),
212: fileName, "Failed to write bucket [" + bucketIndex
213: + "]", e);
214: }
215: }
216: }
|