001: /*
002:
003: Derby - Class org.apache.derby.impl.store.raw.log.FlushedScan
004:
005: Licensed to the Apache Software Foundation (ASF) under one or more
006: contributor license agreements. See the NOTICE file distributed with
007: this work for additional information regarding copyright ownership.
008: The ASF licenses this file to you under the Apache License, Version 2.0
009: (the "License"); you may not use this file except in compliance with
010: the License. You may obtain a copy of the License at
011:
012: http://www.apache.org/licenses/LICENSE-2.0
013:
014: Unless required by applicable law or agreed to in writing, software
015: distributed under the License is distributed on an "AS IS" BASIS,
016: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: See the License for the specific language governing permissions and
018: limitations under the License.
019:
020: */
021:
022: package org.apache.derby.impl.store.raw.log;
023:
024: import org.apache.derby.iapi.reference.SQLState;
025: import org.apache.derby.iapi.reference.MessageId;
026:
027: import org.apache.derby.impl.store.raw.log.LogCounter;
028: import org.apache.derby.impl.store.raw.log.LogRecord;
029: import org.apache.derby.impl.store.raw.log.StreamLogScan;
030:
031: import org.apache.derby.iapi.services.sanity.SanityManager;
032: import org.apache.derby.iapi.error.StandardException;
033: import org.apache.derby.iapi.services.i18n.MessageService;
034: import org.apache.derby.iapi.store.raw.log.LogInstant;
035: import org.apache.derby.iapi.store.raw.log.LogFactory;
036: import org.apache.derby.iapi.store.raw.xact.TransactionId;
037: import org.apache.derby.iapi.services.io.ArrayInputStream;
038:
039: import org.apache.derby.io.StorageRandomAccessFile;
040:
041: import java.io.IOException;
042:
/**

    Scan the log, which is implemented by a series of log files.
    This log scan knows how to move across log files if it is positioned at
    the boundary of a log file and needs to getNextRecord.

    <PRE>
    4 bytes - length of user data, i.e. N
    8 bytes - long representing log instant
    N bytes of supplied data
    4 bytes - length of user data, i.e. N
    </PRE>

*/
057: public class FlushedScan implements StreamLogScan {
058:
059: private StorageRandomAccessFile scan; // an output stream to the log file
060: LogToFile logFactory; // log factory knows how to to skip
061: // from log file to log file
062:
063: boolean open; // true if the scan is open
064:
065: long currentLogFileNumber; // the log file the scan is currently on
066:
067: long currentLogFileFirstUnflushedPosition;
068: // The length of the unflushed portion
069: // of the current log file. This is the
070: // length of the file for all but the
071: // last log file.
072:
073: long currentInstant; // the log instant the scan is
074: // currently on - only valid after a
075: // successful getNextRecord
076:
077: long firstUnflushed = -1; // scan until we reach the first
078: // unflushed byte in the log.
079: long firstUnflushedFileNumber;
080: long firstUnflushedFilePosition;
081:
082: //RESOLVE: This belongs in a shared place.
083: static final int LOG_REC_LEN_BYTE_LENGTH = 4;
084:
/**
 * Create a scan over the flushed part of the log, starting at the given
 * log instant.
 *
 * <P>The log file is obtained from the log factory and the first
 * unflushed instant is cached. If any of that fails after the log file
 * has been obtained, the file is closed again before the exception
 * propagates, so a failed construction does not leak the file handle.
 *
 * @param logFactory the log factory that supplies the log files
 * @param startAt    the log instant to start the scan at; must not be
 *                   LogCounter.INVALID_LOG_INSTANT
 *
 * @exception StandardException the scan could not be set up; an
 *            IOException is reported through logFactory.markCorrupt as
 *            SQLState.LOG_IO_ERROR
 */
public FlushedScan(LogToFile logFactory, long startAt)
        throws StandardException {
    if (SanityManager.DEBUG) {
        SanityManager.ASSERT(
            startAt != LogCounter.INVALID_LOG_INSTANT,
            "cannot start scan on an invalid log instant");
    }

    try {
        currentLogFileNumber = LogCounter.getLogFileNumber(startAt);
        this.logFactory = logFactory;
        scan = logFactory.getLogFileAtPosition(startAt);
        setFirstUnflushed();
        open = true;
        currentInstant = LogCounter.INVALID_LOG_INSTANT; // set at getNextRecord
    } catch (IOException ioe) {
        throw logFactory.markCorrupt(
            StandardException.newException(SQLState.LOG_IO_ERROR, ioe));
    } finally {
        // open is only set once everything above succeeded. If it is
        // still false the constructor is exiting with an exception and
        // nobody else holds a reference to the file, so release it here.
        if (!open && scan != null) {
            try {
                scan.close();
            } catch (IOException ignored) {
                // best effort: the original failure is the one to report
            }
            scan = null;
        }
    }
}
107:
108: /*
109: ** Methods of LogScan
110: */
111:
112: /**
113: Read a log record into the byte array provided. Resize the input
114: stream byte array if necessary.
115:
116: @return the length of the data written into data, or -1 if the end of the
117: scan has been reached.
118:
119: @exception StandardException Standard Cloudscape error policy
120: */
121: public LogRecord getNextRecord(ArrayInputStream input,
122: TransactionId tranId, int groupmask)
123: throws StandardException {
124: try {
125: boolean candidate;
126: int peekAmount = LogRecord.formatOverhead()
127: + LogRecord.maxGroupStoredSize();
128: if (tranId != null)
129: peekAmount += LogRecord
130: .maxTransactionIdStoredSize(tranId);
131: int readAmount; // the number of bytes actually read
132:
133: LogRecord lr;
134:
135: do {
136: if (!open || !positionToNextRecord())
137: return null;
138:
139: int checkLength;
140:
141: // this log record is a candidate unless proven otherwise
142: lr = null;
143: candidate = true;
144: readAmount = -1;
145:
146: currentInstant = scan.readLong();
147: byte[] data = input.getData();
148: if (data.length < nextRecordLength) {
149: // make a new array of sufficient size and reset the arrary
150: // in the input stream
151: data = new byte[nextRecordLength];
152: input.setData(data);
153: }
154:
155: if (logFactory.databaseEncrypted()) {
156: scan.readFully(data, 0, nextRecordLength);
157: int len = logFactory.decrypt(data, 0,
158: nextRecordLength, data, 0);
159: if (SanityManager.DEBUG)
160: SanityManager.ASSERT(len == nextRecordLength);
161: input.setLimit(0, len);
162:
163: } else // no need to decrypt, only get the group and tid if we filter
164: {
165: if (groupmask == 0 && tranId == null) {
166: // no filter, get the whole thing
167: scan.readFully(data, 0, nextRecordLength);
168: input.setLimit(0, nextRecordLength);
169: } else {
170: // Read only enough so that group and the tran id is in
171: // the data buffer. Group is stored as compressed int
172: // and tran id is stored as who knows what. read min
173: // of peekAmount or nextRecordLength
174: readAmount = (nextRecordLength > peekAmount) ? peekAmount
175: : nextRecordLength;
176:
177: // in the data buffer, we now have enough to peek
178: scan.readFully(data, 0, readAmount);
179: input.setLimit(0, readAmount);
180:
181: }
182: }
183:
184: lr = (LogRecord) input.readObject();
185:
186: if (groupmask != 0 || tranId != null) {
187: if (groupmask != 0 && (groupmask & lr.group()) == 0)
188: candidate = false; // no match, throw this log record out
189:
190: if (candidate && tranId != null) {
191: TransactionId tid = lr.getTransactionId();
192: if (!tid.equals(tranId)) // nomatch
193: candidate = false; // throw this log record out
194: }
195:
196: // if this log record is not filtered out, we need to read
197: // in the rest of the log record to the input buffer.
198: // Except if it is an encrypted database, in which case the
199: // entire log record have already be read in for
200: // decryption.
201:
202: if (candidate && !logFactory.databaseEncrypted()) {
203: // read the rest of the log into the buffer
204: if (SanityManager.DEBUG)
205: SanityManager.ASSERT(readAmount > 0);
206:
207: if (readAmount < nextRecordLength) {
208: // Need to remember where we are because the log
209: // record may have read part of it off the input
210: // stream already and that position is lost when we
211: // set limit again.
212: int inputPosition = input.getPosition();
213:
214: scan.readFully(data, readAmount,
215: nextRecordLength - readAmount);
216:
217: input.setLimit(0, nextRecordLength);
218: input.setPosition(inputPosition);
219: }
220: }
221: }
222:
223: if (candidate || logFactory.databaseEncrypted()) {
224: checkLength = scan.readInt();
225:
226: if (SanityManager.DEBUG) {
227: SanityManager.ASSERT(
228: checkLength == nextRecordLength,
229: "log currupted");
230: }
231: } else // chances are, we haven't read all of the log record, skip it
232: {
233: // the starting record position is in the currentInstant,
234: // calculate the next record starting position using that
235: // and the nextRecordLength
236: long nextRecordStartPosition = LogCounter
237: .getLogFilePosition(currentInstant)
238: + nextRecordLength
239: + LogToFile.LOG_RECORD_OVERHEAD;
240:
241: scan.seek(nextRecordStartPosition);
242: }
243:
244: } while (candidate == false);
245:
246: return lr;
247: } catch (ClassNotFoundException cnfe) {
248: throw logFactory.markCorrupt(StandardException
249: .newException(SQLState.LOG_CORRUPTED, cnfe));
250: } catch (IOException ioe) {
251: throw logFactory.markCorrupt(StandardException
252: .newException(SQLState.LOG_IO_ERROR, ioe));
253: }
254: }
255:
256: /**
257: Reset the scan to the given LogInstant.
258:
259: @param instant the position to reset to
260: @exception IOException scan cannot access the log at the new position.
261: */
262: public void resetPosition(LogInstant instant) throws IOException {
263: if (SanityManager.DEBUG) {
264: SanityManager.THROWASSERT("Unsupported feature");
265: }
266: }
267:
268: /**
269: Get the log instant that is right after the record just retrived
270: @return INVALID_LOG_INSTANT if this is not a FORWARD scan or, no
271: record have been returned yet or the scan has completed.
272: */
273: public long getLogRecordEnd() {
274: if (SanityManager.DEBUG) {
275: SanityManager.THROWASSERT("Unsupported feature");
276: }
277: return LogCounter.INVALID_LOG_INSTANT;
278: }
279:
280: /**
281: returns true if there is partially writen log records before the crash
282: in the last log file. Partiall wrires are identified during forward
283: scans for log recovery.
284: */
285: public boolean isLogEndFuzzy() {
286: if (SanityManager.DEBUG) {
287: SanityManager.THROWASSERT("Unsupported feature");
288: }
289: return false;
290: }
291:
292: /**
293: Return the log instant (as an integer) the scan is currently on - this is the log
294: instant of the log record that was returned by getNextRecord.
295: */
296: public long getInstant() {
297: return currentInstant;
298: }
299:
300: /**
301: Return the log instant the scan is currently on - this is the log
302: instant of the log record that was returned by getNextRecord.
303: */
304: public LogInstant getLogInstant() {
305: if (currentInstant == LogCounter.INVALID_LOG_INSTANT)
306: return null;
307: else
308: return new LogCounter(currentInstant);
309: }
310:
311: /**
312: Close the scan.
313: */
314: public void close() {
315: if (scan != null) {
316: try {
317: scan.close();
318: } catch (IOException ioe) {
319: }
320:
321: scan = null;
322: }
323: currentInstant = LogCounter.INVALID_LOG_INSTANT;
324: open = false;
325: }
326:
327: /*
328: Private methods.
329: */
330: private void setFirstUnflushed() throws StandardException,
331: IOException {
332: LogInstant firstUnflushedInstant = logFactory
333: .getFirstUnflushedInstant();
334: firstUnflushed = ((LogCounter) firstUnflushedInstant)
335: .getValueAsLong();
336: firstUnflushedFileNumber = LogCounter
337: .getLogFileNumber(firstUnflushed);
338: firstUnflushedFilePosition = LogCounter
339: .getLogFilePosition(firstUnflushed);
340:
341: setCurrentLogFileFirstUnflushedPosition();
342: }
343:
344: private void setCurrentLogFileFirstUnflushedPosition()
345: throws IOException {
346: /*
347: Note we get the currentLogFileLength without synchronization.
348: This is safe because one of the following cases apply:
349:
350: <OL>
351: <LI> The end of the flushed section of the log is in another file.
352: In this case the end of the current file will not change.
353: <LI> The end of the log is in this file. In this case we
354: end our scan at the firstUnflushedInstant and do not use
355: currentLogFileLength.
356: </OL>
357: */
358: if (currentLogFileNumber == firstUnflushedFileNumber)
359: currentLogFileFirstUnflushedPosition = firstUnflushedFilePosition;
360: else if (currentLogFileNumber < firstUnflushedFileNumber)
361: currentLogFileFirstUnflushedPosition = scan.length();
362: else {
363: // RESOLVE
364: throw new IOException(MessageService
365: .getTextMessage(MessageId.LOG_BAD_START_INSTANT));
366: }
367: }
368:
369: private void switchLogFile() throws StandardException {
370: try {
371: readNextRecordLength = false;
372: scan.close();
373: scan = null;
374: scan = logFactory
375: .getLogFileAtBeginning(++currentLogFileNumber);
376: setCurrentLogFileFirstUnflushedPosition();
377: }
378:
379: catch (IOException ioe) {
380: throw logFactory.markCorrupt(StandardException
381: .newException(SQLState.LOG_IO_ERROR, ioe));
382: }
383: }
384:
385: /**
386: The length of the next record. Read from scan and set by
387: currentLogFileHasUnflushedRecord. This is used to retain the length of a
388: log record in the case currentLogFileHasUnflushedRecord reads the length
389: and determines that some bytes in the log record are not yet flushed.
390: */
391: int nextRecordLength;
392:
393: /**
394: Flag to indicate that the length of the next log record has been read by
395: currentLogFileHasUnflushedRecord.
396:
397: This flag gets reset in two ways:
398:
399: <OL>
400: <LI> currentLogFileHasUnflushedRecord determines that the entire log
401: record is flushed and returns true. In this case getNextRecord reads and
402: returns the log record.
403: <LI> we switch log files --due to a partial log record at the end of an
404: old log file.
405: </OL>
406: */
407: boolean readNextRecordLength;
408:
409: private boolean currentLogFileHasUnflushedRecord()
410: throws IOException {
411: if (SanityManager.DEBUG)
412: SanityManager.ASSERT(scan != null, "scan is null");
413: long curPos = scan.getFilePointer();
414:
415: if (!readNextRecordLength) {
416: if (curPos + LOG_REC_LEN_BYTE_LENGTH > currentLogFileFirstUnflushedPosition)
417: return false;
418:
419: nextRecordLength = scan.readInt();
420: curPos += 4;
421: readNextRecordLength = true;
422: }
423:
424: if (nextRecordLength == 0)
425: return false;
426:
427: int bytesNeeded = nextRecordLength + LOG_REC_LEN_BYTE_LENGTH;
428:
429: if (curPos + bytesNeeded > currentLogFileFirstUnflushedPosition) {
430: return false;
431: } else {
432: readNextRecordLength = false;
433: return true;
434: }
435: }
436:
437: private boolean positionToNextRecord() throws StandardException,
438: IOException {
439: //If the flushed section of the current log file contains our record we
440: //simply return.
441: if (currentLogFileHasUnflushedRecord())
442: return true;
443:
444: //Update our cached copy of the first unflushed instant.
445: setFirstUnflushed();
446:
447: //In the call to setFirstUnflushed, we may have noticed that the current
448: //log file really does contain our record. If so we simply return.
449: if (currentLogFileHasUnflushedRecord())
450: return true;
451:
452: //Our final chance of finding a record is if we are not scanning the log
453: //file with the last flushed instant we can switch logfiles. Note that
454: //we do this in a loop to cope with empty log files.
455: while (currentLogFileNumber < firstUnflushedFileNumber) {
456: switchLogFile();
457: if (currentLogFileHasUnflushedRecord())
458: return true;
459: }
460:
461: //The log contains no more flushed log records so we return false.
462: currentInstant = LogCounter.INVALID_LOG_INSTANT;
463: return false;
464: }
465: }
|