001: package com.quadcap.sql.file;
002:
003: /* Copyright 1997 - 2003 Quadcap Software. All rights reserved.
004: *
005: * This software is distributed under the Quadcap Free Software License.
006: * This software may be used or modified for any purpose, personal or
007: * commercial. Open Source redistributions are permitted. Commercial
008: * redistribution of larger works derived from, or works which bundle
009: * this software requires a "Commercial Redistribution License"; see
010: * http://www.quadcap.com/purchase.
011: *
012: * Redistributions qualify as "Open Source" under one of the following terms:
013: *
014: * Redistributions are made at no charge beyond the reasonable cost of
015: * materials and delivery.
016: *
017: * Redistributions are accompanied by a copy of the Source Code or by an
018: * irrevocable offer to provide a copy of the Source Code for up to three
019: * years at the cost of materials and delivery. Such redistributions
020: * must allow further use, modification, and redistribution of the Source
021: * Code under substantially the same terms as this license.
022: *
023: * Redistributions of source code must retain the copyright notices as they
024: * appear in each source code file, these license terms, and the
025: * disclaimer/limitation of liability set forth as paragraph 6 below.
026: *
027: * Redistributions in binary form must reproduce this Copyright Notice,
028: * these license terms, and the disclaimer/limitation of liability set
029: * forth as paragraph 6 below, in the documentation and/or other materials
030: * provided with the distribution.
031: *
032: * The Software is provided on an "AS IS" basis. No warranty is
033: * provided that the Software is free of defects, or fit for a
034: * particular purpose.
035: *
036: * Limitation of Liability. Quadcap Software shall not be liable
037: * for any damages suffered by the Licensee or any third party resulting
038: * from use of the Software.
039: */
040:
041: import java.io.InputStream;
042: import java.io.IOException;
043: import java.io.OutputStream;
044:
045: import java.util.Properties;
046:
047: import com.quadcap.util.Debug;
048: import com.quadcap.util.Util;
049:
050: /**
051: * A bounded buffer based on an underlying RandomAccess
052: *
053: * @author Stan Bailes
054: */
055: public class LogBuffer {
056: RandomAccess ra;
057:
058: /**
059: * "Begin" pointer.
060: */
061: int bX;
062:
063: /**
064: * "End" pointer
065: */
066: int eX;
067:
068: /**
069: * "Checkpoint" pointer
070: */
071: int cX;
072:
073: /**
074: * Size Limit, in octets.
075: */
076: int maxSize = 128 * 1024 * 1024;
077:
078: /**
079: * Total header bytes
080: */
081: static final int headerSize = 16;
082:
083: /**
084: * Default constructor (for factory)
085: */
086: public LogBuffer() {
087: }
088:
089: /**
090: * Initialize from existing randomaccess
091: */
092: public void init(RandomAccess ra, Properties props)
093: throws IOException {
094: this .ra = ra;
095: this .bX = ra.readInt(0);
096: this .eX = ra.readInt(4);
097: this .cX = ra.readInt(8);
098: maxSize = Integer.parseInt(props.getProperty("maxLogSize", ""
099: + maxSize));
100: }
101:
102: /**
103: * Create a new one
104: *
105: * @param ra the underlying random access interface to the store
106: * @param m the maximum size of this buffer.
107: */
108: public void init(RandomAccess ra, int m) throws IOException {
109: this .ra = ra;
110: this .bX = 0;
111: this .eX = 0;
112: this .cX = 0;
113: this .maxSize = m;
114: sync();
115: }
116:
117: /**
118: * Return an InputStream which will supply a subset of the buffer starting
119: * at 'pos' and ending at the current buffer end
120: */
121: public InputStream getInputStream(int pos) {
122: return new LogInputStream(this , pos, this .eX);
123: }
124:
125: /**
126: * Return an OutputStream which can be used to write to the end of
127: * the buffer
128: */
129: public OutputStream getOutputStream() {
130: return new LogOutputStream(this );
131: }
132:
133: /**
134: * Return the current end position (the position of the next byte to
135: * be written)
136: */
137: public int getEnd() {
138: return eX;
139: }
140:
141: /**
142: * Return the current begin position (the position of the first byte
143: * written
144: */
145: public int getBegin() {
146: return bX;
147: }
148:
149: /**
150: * Return the value marking the end of the last checkpoint
151: */
152: public int getCheckpoint() {
153: return cX;
154: }
155:
156: /**
157: * Set the checkpoint value
158: */
159: public void checkpoint() {
160: this .cX = this .eX;
161: }
162:
163: /**
164: * Move the begin pointer past some bytes.
165: */
166: public void setBegin(int b) {
167: //#ifdef DEBUG
168: if (Trace.bit(13)) {
169: Debug.println(toString() + ".setBegin(" + b + ")");
170: }
171: //#endif
172: bX = b;
173: }
174:
175: /**
176: * Reset the buffer to its empty state
177: */
178: public void reset() {
179: //#ifdef DEBUG
180: if (Trace.bit(13)) {
181: Debug.println(toString()
182: + ".reset() -----------------------");
183: }
184: //#endif
185: bX = 0;
186: eX = 0;
187: cX = 0;
188: }
189:
190: /**
191: * Truncate the underlying region to the current actual size of the
192: * buffer
193: */
194: public void truncate() throws IOException {
195: ra.resize(size() + headerSize);
196: }
197:
198: /**
199: * Return the size of the active area
200: */
201: public int size() {
202: return eX - bX;
203: }
204:
205: /**
206: * Read a range of bytes from the buffer
207: */
208: public int read(int pos, byte[] buf, int off, int cnt)
209: throws IOException {
210: int ret = 0;
211: if (cnt > 0) {
212: ra.read(pos + headerSize, buf, off, cnt);
213: ret = cnt;
214: }
215: //#ifdef DEBUG
216: if (Trace.bit(13)) {
217: Debug.println(toString() + ".read(" + pos + ", " + cnt
218: + ") = " + ret + ", "
219: + Util.hexBytes(buf, off, ret));
220: }
221: //#endif
222: return ret;
223: }
224:
225: /**
226: * Read a single byte
227: */
228: public int readByte(int pos) throws IOException {
229: int ret = ra.readByte(pos + headerSize);
230: //#ifdef DEBUG
231: if (Trace.bit(13)) {
232: Debug.println(toString() + ".read(" + pos + ") = " + ret);
233: }
234: //#endif
235: return ret;
236: }
237:
238: /**
239: * Write a range of bytes to the end of the buffer, updating 'eX' to
240: * point to the new end of the buffer.
241: */
242: public void write(byte[] buf, int off, int cnt) throws IOException {
243: //#ifdef DEBUG
244: if (Trace.bit(13)) {
245: Debug.println(toString() + ".write("
246: + Util.hexBytes(buf, off, cnt) + ")");
247: }
248: //#endif
249: if (eX + cnt > maxSize) {
250: //#ifdef DEBUG
251: Debug.println("eX: " + eX + ", write(" + off + ": "
252: + Util.strBytes(buf, off, cnt));
253: //#endif
254: throw new IOException("LogBuffer full: " + eX + " + " + cnt
255: + " bytes");
256: }
257: if (cnt > 0) {
258: ra.write(eX + headerSize, buf, off, cnt);
259: eX += cnt;
260: }
261: }
262:
263: /**
264: * Write a single byte to the buffer. Do not modify eX.
265: */
266: public void writeByte(int pos, int b) throws IOException {
267: //#ifdef DEBUG
268: if (Trace.bit(13)) {
269: Debug.println(toString() + ".writeByte(" + pos + ", " + b
270: + ")");
271: }
272: //#endif
273: ra.writeByte(pos + headerSize, b);
274: }
275:
276: /**
277: * Flush any changes to disk.
278: */
279: public void sync() throws IOException {
280: //#ifdef DEBUG
281: if (Trace.bit(13) || Trace.bit(24)) {
282: Debug.println(toString() + ".sync()");
283: }
284: //#endif
285: ra.writeInt(0, bX);
286: ra.writeInt(4, eX);
287: ra.writeInt(8, cX);
288: ra.writeInt(12, maxSize);
289: ra.flush();
290: }
291:
292: /**
293: * Close the file
294: */
295: public void close() throws IOException {
296: //#ifdef DEBUG
297: if (Trace.bit(13)) {
298: Debug.println(toString() + ".close()");
299: }
300: //#endif
301: sync();
302: ra.close();
303: }
304:
305: /**
306: * Perform buffer-wrapping address incrementing math
307: */
308: public int addPos(int pos, int amt) {
309: int ret = pos + amt;
310: return ret;
311: }
312:
313: /**
314: * Inner input stream
315: */
316: class LogInputStream extends InputStream {
317: LogBuffer b;
318: int pos;
319: int lim;
320:
321: LogInputStream(LogBuffer b, int pos, int lim) {
322: //#ifdef DEBUG
323: if (Trace.bit(13)) {
324: Debug.println("CIS(" + pos + "-" + lim + ")");
325: }
326: //#endif
327: this .b = b;
328: this .pos = pos;
329: this .lim = lim;
330: }
331:
332: public int read(byte[] buf, int off, int cnt)
333: throws IOException {
334: //#ifdef DEBUG
335: int xpos = pos;
336: //#endif
337: int xpos2 = pos + cnt;
338: if (xpos2 > lim) {
339: cnt -= (xpos2 - lim);
340: }
341: int ret = -1;
342: if (cnt > 0) {
343: ret = b.read(pos, buf, off, cnt);
344: if (ret > 0) {
345: pos = b.addPos(pos, ret);
346: } else {
347: ret = -1;
348: }
349: }
350: //#ifdef DEBUG
351: if (Trace.bit(14)) {
352: Debug.println("CIS[" + xpos + "-" + lim + "].read() = "
353: + ret + ": " + Util.hexBytes(buf, off, ret));
354: }
355: //#endif
356: return ret;
357: }
358:
359: public int read() throws IOException {
360: //#ifdef DEBUG
361: int xpos = pos;
362: //#endif
363: int ret = b.readByte(pos);
364: if (ret >= 0) {
365: pos = b.addPos(pos, 1);
366: }
367: //#ifdef DEBUG
368: if (Trace.bit(14)) {
369: Debug.println("CIS[" + xpos + "].read() = " + ret);
370: }
371: //#endif
372: return ret;
373: }
374: }
375:
376: /**
377: * Inner output stream
378: */
379: class LogOutputStream extends OutputStream {
380: LogBuffer b;
381:
382: LogOutputStream(LogBuffer b) {
383: this .b = b;
384: }
385:
386: public void write(byte[] buf, int off, int cnt)
387: throws IOException {
388: //#ifdef DEBUG
389: if (Trace.bit(14)) {
390: Debug.println("COS[" + b.eX + "].write("
391: + Util.hexBytes(buf, off, cnt) + ")");
392: }
393: //#endif
394: b.write(buf, off, cnt);
395: }
396:
397: public void write(int x) throws IOException {
398: //#ifdef DEBUG
399: if (Trace.bit(14)) {
400: Debug.println("COS[" + b.eX + "].write(" + x + ")");
401: }
402: //#endif
403: if (false) {
404: b.writeByte(b.eX, x);
405: b.eX = addPos(b.eX, 1);
406: } else {
407: byte[] fu = new byte[1];
408: fu[0] = (byte) x;
409: write(fu, 0, 1);
410: }
411: }
412: }
413:
414: //#ifdef DEBUG
415:
416: public String toString() {
417: return "LogBuffer(" + maxSize + "," + bX + "," + eX + "," + cX
418: + ")";
419: }
420:
421: public static void main(String[] args) {
422: try {
423: test1();
424: } catch (Throwable t) {
425: Debug.print(t);
426: }
427: }
428:
429: static void test1() throws Exception {
430: int size = 100000;
431: ByteArrayRandomAccess bra = new ByteArrayRandomAccess(size + 12);
432: BufferedRandomAccess bbra = new BufferedRandomAccess(bra);
433: LogBuffer cb = new LogBuffer();
434: cb.init(bbra, size);
435:
436: // for (int i = 0; i < size; i++) {
437: // cb.writeByte(i, (byte)(i & 0xff));
438: // }
439: // for (int i = 0; i < size; i++) {
440: // int ret = cb.readByte(i);
441: // if (ret != (i & 0xff)) {
442: // throw new Exception("LogBuffer.test1 " + i +
443: // " (" + (i&0xff) + ")" +
444: // ", got " + ret);
445: // }
446: // }
447:
448: int xs = 12;
449: int[] poss = new int[xs];
450: int[] sizz = new int[xs];
451: int beg = cb.getBegin();
452: byte[] buf = new byte[4096];
453: byte[] rbuf = new byte[4096];
454: OutputStream os = cb.getOutputStream();
455: int wsize = 1;
456: for (int i = 0; i < xs; i++) {
457: poss[i] = cb.getEnd();
458: sizz[i] = wsize + 1;
459: wsize += wsize;
460: makeBuf(buf, i, sizz[i]);
461: os.write(buf, 0, sizz[i]);
462: for (int j = 0; j < i; j++) {
463: makeBuf(buf, j, sizz[j]);
464: InputStream is = cb.getInputStream(poss[j]);
465: is.read(rbuf, 0, sizz[i]);
466: if (Util
467: .compareBytes(buf, 0, sizz[j], rbuf, 0, sizz[j]) != 0) {
468: throw new Exception("R " + i + ": "
469: + Util.hexBytes(buf, 0, sizz[i]) + " vs "
470: + j + ": "
471: + Util.hexBytes(rbuf, 0, sizz[i]));
472: }
473: }
474: }
475:
476: }
477:
478: static void makeBuf(byte[] buf, int x, int siz) {
479: x *= 13;
480: for (int i = 0; i < siz; i++) {
481: buf[i] = (byte) (x++);
482: }
483: }
484:
485: //#endif
486:
487: }
|