001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.lucene.store.jdbc.index;
018:
019: import java.io.IOException;
020: import java.sql.Blob;
021: import java.sql.Connection;
022: import java.sql.PreparedStatement;
023: import java.sql.ResultSet;
024: import java.util.HashMap;
025:
026: import org.apache.lucene.store.jdbc.JdbcDirectory;
027: import org.apache.lucene.store.jdbc.JdbcFileEntrySettings;
028: import org.apache.lucene.store.jdbc.JdbcStoreException;
029: import org.apache.lucene.store.jdbc.datasource.DataSourceUtils;
030: import org.apache.lucene.store.jdbc.support.JdbcTable;
031:
032: /**
033: * Caches blobs per transaction. Only supported for dialects that supports blobs per transaction (see
034: * {@link org.apache.lucene.store.jdbc.dialect.Dialect#supportTransactionalScopedBlobs()}.
035: * <p/>
036: * Note, using this index input requires calling the {@link #releaseBlobs(java.sql.Connection)} when the transaction
037: * ends. It is automatically taken care of if using {@link org.apache.lucene.store.jdbc.datasource.TransactionAwareDataSourceProxy}.
038: * If using JTA for example, a transcation synchronization should be registered with JTA to clear the blobs.
039: *
040: * @author kimchy
041: */
042: public class FetchPerTransactionJdbcIndexInput extends
043: JdbcBufferedIndexInput {
044:
045: private static final Object blobHolderLock = new Object();
046:
047: private static final ThreadLocal blobHolder = new ThreadLocal();
048:
049: public static void releaseBlobs(Connection connection) {
050: synchronized (blobHolderLock) {
051: Connection targetConnection = DataSourceUtils
052: .getTargetConnection(connection);
053: HashMap holdersPerConn = (HashMap) blobHolder.get();
054: if (holdersPerConn == null) {
055: return;
056: }
057: holdersPerConn.remove(targetConnection);
058: holdersPerConn.remove(new Integer(System
059: .identityHashCode(targetConnection)));
060: if (holdersPerConn.isEmpty()) {
061: blobHolder.set(null);
062: }
063: }
064: }
065:
066: public static void releaseBlobs(Connection connection,
067: JdbcTable table, String name) {
068: synchronized (blobHolderLock) {
069: Connection targetConnection = DataSourceUtils
070: .getTargetConnection(connection);
071: HashMap holdersPerConn = (HashMap) blobHolder.get();
072: if (holdersPerConn == null) {
073: return;
074: }
075: HashMap holdersPerName = (HashMap) holdersPerConn
076: .get(targetConnection);
077: if (holdersPerName != null) {
078: holdersPerName.remove(name);
079: }
080: holdersPerName = (HashMap) holdersPerConn.get(new Integer(
081: System.identityHashCode(targetConnection)));
082: if (holdersPerName != null) {
083: holdersPerName.remove(table.getName() + name);
084: }
085: }
086: }
087:
088: private static Blob getBoundBlob(Connection connection,
089: JdbcTable table, String name) {
090: synchronized (blobHolderLock) {
091: Connection targetConnection = DataSourceUtils
092: .getTargetConnection(connection);
093: HashMap holdersPerConn = (HashMap) blobHolder.get();
094: if (holdersPerConn == null) {
095: return null;
096: }
097: HashMap holdersPerName = (HashMap) holdersPerConn
098: .get(targetConnection);
099: if (holdersPerName == null) {
100: holdersPerName = (HashMap) holdersPerConn
101: .get(new Integer(System
102: .identityHashCode(targetConnection)));
103: if (holdersPerName == null) {
104: return null;
105: }
106: }
107: Blob blob = (Blob) holdersPerName.get(table.getName()
108: + name);
109: if (blob != null) {
110: return blob;
111: }
112: return null;
113: }
114: }
115:
116: private static void bindBlob(Connection connection,
117: JdbcTable table, String name, Blob blob) {
118: synchronized (blobHolderLock) {
119: Connection targetConnection = DataSourceUtils
120: .getTargetConnection(connection);
121: HashMap holdersPerCon = (HashMap) blobHolder.get();
122: if (holdersPerCon == null) {
123: holdersPerCon = new HashMap();
124: blobHolder.set(holdersPerCon);
125: }
126: HashMap holdersPerName = (HashMap) holdersPerCon
127: .get(targetConnection);
128: if (holdersPerName == null) {
129: holdersPerName = (HashMap) holdersPerCon
130: .get(new Integer(System
131: .identityHashCode(targetConnection)));
132: if (holdersPerName == null) {
133: holdersPerName = new HashMap();
134: holdersPerCon.put(targetConnection, holdersPerName);
135: holdersPerCon.put(new Integer(System
136: .identityHashCode(targetConnection)),
137: holdersPerName);
138: }
139: }
140:
141: holdersPerName.put(table.getName() + name, blob);
142: }
143: }
144:
145: private String name;
146:
147: // lazy intialize the length
148: private long totalLength = -1;
149:
150: private long position = 1;
151:
152: private JdbcDirectory jdbcDirectory;
153:
154: public void configure(String name, JdbcDirectory jdbcDirectory,
155: JdbcFileEntrySettings settings) throws IOException {
156: super .configure(name, jdbcDirectory, settings);
157: this .jdbcDirectory = jdbcDirectory;
158: this .name = name;
159: }
160:
161: // Overriding refill here since we can execute a single query to get both the length and the buffer data
162: // resulted in not the nicest OO design, where the buffer information is protected in the JdbcBufferedIndexInput class
163: // and code duplication between this method and JdbcBufferedIndexInput.
164: // Performance is much better this way!
165: protected void refill() throws IOException {
166: Connection conn = DataSourceUtils.getConnection(jdbcDirectory
167: .getDataSource());
168: PreparedStatement ps = null;
169: ResultSet rs = null;
170: try {
171: Blob blob = getBoundBlob(conn, jdbcDirectory.getTable(),
172: name);
173: if (blob == null) {
174: ps = conn.prepareStatement(jdbcDirectory.getTable()
175: .sqlSelectSizeValueByName());
176: ps.setFetchSize(1);
177: ps.setString(1, name);
178:
179: rs = ps.executeQuery();
180:
181: // START read blob and update length if required
182: if (!rs.next()) {
183: throw new JdbcStoreException("No entry for ["
184: + name + "] table "
185: + jdbcDirectory.getTable());
186: }
187: synchronized (this ) {
188: if (totalLength == -1) {
189: totalLength = rs.getLong(3);
190: }
191: }
192: // END read blob and update length if required
193:
194: blob = rs.getBlob(2);
195: bindBlob(conn, jdbcDirectory.getTable(), name, blob);
196: } else {
197: }
198:
199: long start = bufferStart + bufferPosition;
200: long end = start + bufferSize;
201: if (end > length()) // don't read past EOF
202: end = length();
203: bufferLength = (int) (end - start);
204: if (bufferLength <= 0)
205: throw new IOException("read past EOF");
206:
207: if (buffer == null) {
208: buffer = new byte[bufferSize]; // allocate buffer lazily
209: seekInternal(bufferStart);
210: }
211: // readInternal(buffer, 0, bufferLength);
212: readInternal(blob, buffer, 0, bufferLength);
213:
214: bufferStart = start;
215: bufferPosition = 0;
216: } catch (Exception e) {
217: throw new JdbcStoreException(
218: "Failed to read transactional blob [" + name + "]",
219: e);
220: } finally {
221: DataSourceUtils.closeResultSet(rs);
222: DataSourceUtils.closeStatement(ps);
223: DataSourceUtils.releaseConnection(conn);
224: }
225: }
226:
227: protected synchronized void readInternal(final byte[] b,
228: final int offset, final int length) throws IOException {
229: Connection conn = DataSourceUtils.getConnection(jdbcDirectory
230: .getDataSource());
231: PreparedStatement ps = null;
232: ResultSet rs = null;
233: try {
234: Blob blob = getBoundBlob(conn, jdbcDirectory.getTable(),
235: name);
236: if (blob == null) {
237: ps = conn.prepareStatement(jdbcDirectory.getTable()
238: .sqlSelectSizeValueByName());
239: ps.setFetchSize(1);
240: ps.setString(1, name);
241:
242: rs = ps.executeQuery();
243:
244: if (!rs.next()) {
245: throw new JdbcStoreException("No entry for ["
246: + name + "] table "
247: + jdbcDirectory.getTable());
248: }
249:
250: blob = rs.getBlob(2);
251: bindBlob(conn, jdbcDirectory.getTable(), name, blob);
252:
253: synchronized (this ) {
254: if (this .totalLength == -1) {
255: this .totalLength = rs.getLong(3);
256: }
257: }
258: }
259: readInternal(blob, b, offset, length);
260: } catch (Exception e) {
261: throw new JdbcStoreException(
262: "Failed to read transactional blob [" + name + "]",
263: e);
264: } finally {
265: DataSourceUtils.closeResultSet(rs);
266: DataSourceUtils.closeStatement(ps);
267: DataSourceUtils.releaseConnection(conn);
268: }
269: }
270:
271: /**
272: * A helper methods that already reads an open blob
273: */
274: private synchronized void readInternal(Blob blob, final byte[] b,
275: final int offset, final int length) throws Exception {
276: long curPos = getFilePointer();
277: if (curPos + 1 != position) {
278: position = curPos + 1;
279: }
280: if (position + length > length() + 1) {
281: System.err.println("BAD");
282: }
283: byte[] bytesRead = blob.getBytes(position, length);
284: if (bytesRead.length != length) {
285: throw new IOException("read past EOF");
286: }
287: System.arraycopy(bytesRead, 0, b, offset, length);
288: position += bytesRead.length;
289: }
290:
291: protected void seekInternal(long pos) throws IOException {
292: this .position = pos + 1;
293: }
294:
295: public void close() throws IOException {
296: Connection conn = DataSourceUtils.getConnection(jdbcDirectory
297: .getDataSource());
298: try {
299: releaseBlobs(conn, jdbcDirectory.getTable(), name);
300: } finally {
301: DataSourceUtils.releaseConnection(conn);
302: }
303: }
304:
305: public synchronized long length() {
306: if (totalLength == -1) {
307: try {
308: this .totalLength = jdbcDirectory.fileLength(name);
309: } catch (IOException e) {
310: // do nothing here for now, much better for performance
311: }
312: }
313: return totalLength;
314: }
315:
316: }
|