001: /*******************************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *******************************************************************************/package org.ofbiz.entity.util;
019:
020: import java.sql.Connection;
021: import java.sql.ResultSet;
022: import java.sql.SQLException;
023: import java.sql.Statement;
024: import java.util.Hashtable;
025: import java.util.Map;
026:
027: import javax.transaction.Transaction;
028:
029: import org.ofbiz.base.util.Debug;
030: import org.ofbiz.entity.GenericEntityException;
031: import org.ofbiz.entity.jdbc.ConnectionFactory;
032: import org.ofbiz.entity.model.ModelEntity;
033: import org.ofbiz.entity.model.ModelField;
034: import org.ofbiz.entity.transaction.GenericTransactionException;
035: import org.ofbiz.entity.transaction.TransactionUtil;
036:
037: /**
038: * Sequence Utility to get unique sequences from named sequence banks
039: * Uses a collision detection approach to safely get unique sequenced ids in banks from the database
040: */
041: public class SequenceUtil {
042:
043: public static final String module = SequenceUtil.class.getName();
044:
045: Map sequences = new Hashtable();
046: String helperName;
047: ModelEntity seqEntity;
048: String tableName;
049: String nameColName;
050: String idColName;
051:
052: private SequenceUtil() {
053: }
054:
055: public SequenceUtil(String helperName, ModelEntity seqEntity,
056: String nameFieldName, String idFieldName) {
057: this .helperName = helperName;
058: this .seqEntity = seqEntity;
059: if (seqEntity == null) {
060: throw new IllegalArgumentException(
061: "The sequence model entity was null but is required.");
062: }
063: this .tableName = seqEntity.getTableName(helperName);
064:
065: ModelField nameField = seqEntity.getField(nameFieldName);
066:
067: if (nameField == null) {
068: throw new IllegalArgumentException(
069: "Could not find the field definition for the sequence name field "
070: + nameFieldName);
071: }
072: this .nameColName = nameField.getColName();
073:
074: ModelField idField = seqEntity.getField(idFieldName);
075:
076: if (idField == null) {
077: throw new IllegalArgumentException(
078: "Could not find the field definition for the sequence id field "
079: + idFieldName);
080: }
081: this .idColName = idField.getColName();
082: }
083:
084: public Long getNextSeqId(String seqName, long staggerMax) {
085: SequenceBank bank = (SequenceBank) sequences.get(seqName);
086:
087: if (bank == null) {
088: synchronized (this ) {
089: bank = (SequenceBank) sequences.get(seqName);
090: if (bank == null) {
091: bank = new SequenceBank(seqName, this );
092: sequences.put(seqName, bank);
093: }
094: }
095: }
096: return bank.getNextSeqId(staggerMax);
097: }
098:
099: class SequenceBank {
100:
101: public static final long defaultBankSize = 10;
102: public static final long startSeqId = 10000;
103: public static final int minWaitMillis = 5;
104: public static final int maxWaitMillis = 50;
105: public static final int maxTries = 5;
106:
107: long curSeqId;
108: long maxSeqId;
109: String seqName;
110: SequenceUtil parentUtil;
111:
112: public SequenceBank(String seqName, SequenceUtil parentUtil) {
113: this .seqName = seqName;
114: this .parentUtil = parentUtil;
115: curSeqId = 0;
116: maxSeqId = 0;
117: fillBank(1);
118: }
119:
120: public synchronized Long getNextSeqId(long staggerMax) {
121: long stagger = 1;
122: if (staggerMax > 1) {
123: stagger = Math.round(Math.random() * staggerMax);
124: if (stagger == 0)
125: stagger = 1;
126: }
127:
128: if ((curSeqId + stagger) <= maxSeqId) {
129: Long retSeqId = new Long(curSeqId);
130: curSeqId += stagger;
131: return retSeqId;
132: } else {
133: fillBank(stagger);
134: if ((curSeqId + stagger) <= maxSeqId) {
135: Long retSeqId = new Long(curSeqId);
136: curSeqId += stagger;
137: return retSeqId;
138: } else {
139: Debug
140: .logError(
141: "[SequenceUtil.SequenceBank.getNextSeqId] Fill bank failed, returning null",
142: module);
143: return null;
144: }
145: }
146: }
147:
148: protected synchronized void fillBank(long stagger) {
149: //Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] Starting fillBank Thread Name is: " + Thread.currentThread().getName() + ":" + Thread.currentThread().toString(), module);
150:
151: long bankSize = defaultBankSize;
152: if (stagger > 1) {
153: // NOTE: could use staggerMax for this, but if that is done it would be easier to guess a valid next id without a brute force attack
154: bankSize = stagger * defaultBankSize;
155: }
156:
157: // no need to get a new bank, SeqIds available
158: if ((curSeqId + stagger) <= maxSeqId)
159: return;
160:
161: long val1 = 0;
162: long val2 = 0;
163:
164: // NOTE: the fancy ethernet type stuff is for the case where transactions not available
165: Transaction suspendedTransaction = null;
166: try {
167: //if we can suspend the transaction, we'll try to do this in a local manual transaction
168: suspendedTransaction = TransactionUtil.suspend();
169:
170: boolean beganTransaction = false;
171: try {
172: beganTransaction = TransactionUtil.begin();
173:
174: Connection connection = null;
175: Statement stmt = null;
176: ResultSet rs = null;
177:
178: try {
179: connection = ConnectionFactory
180: .getConnection(parentUtil.helperName);
181: } catch (SQLException sqle) {
182: Debug
183: .logWarning(
184: "[SequenceUtil.SequenceBank.fillBank]: Unable to esablish a connection with the database... Error was:"
185: + sqle.toString(),
186: module);
187: throw sqle;
188: } catch (GenericEntityException e) {
189: Debug
190: .logWarning(
191: "[SequenceUtil.SequenceBank.fillBank]: Unable to esablish a connection with the database... Error was: "
192: + e.toString(), module);
193: throw e;
194: }
195:
196: if (connection == null) {
197: throw new GenericEntityException(
198: "[SequenceUtil.SequenceBank.fillBank]: Unable to esablish a connection with the database, connection was null...");
199: }
200:
201: String sql = null;
202:
203: try {
204: // we shouldn't need this, and some TX managers complain about it, so not including it: connection.setAutoCommit(false);
205:
206: stmt = connection.createStatement();
207: int numTries = 0;
208:
209: while (val1 + bankSize != val2) {
210: if (Debug.verboseOn())
211: Debug
212: .logVerbose(
213: "[SequenceUtil.SequenceBank.fillBank] Trying to get a bank of sequenced ids for "
214: + this .seqName
215: + "; start of loop val1="
216: + val1
217: + ", val2="
218: + val2
219: + ", bankSize="
220: + bankSize,
221: module);
222:
223: sql = "SELECT " + parentUtil.idColName
224: + " FROM " + parentUtil.tableName
225: + " WHERE "
226: + parentUtil.nameColName + "='"
227: + this .seqName + "'";
228: rs = stmt.executeQuery(sql);
229: boolean gotVal1 = false;
230: if (rs.next()) {
231: val1 = rs.getLong(parentUtil.idColName);
232: gotVal1 = true;
233: }
234: rs.close();
235:
236: if (!gotVal1) {
237: Debug
238: .logWarning(
239: "[SequenceUtil.SequenceBank.fillBank] first select failed: will try to add new row, result set was empty for sequence ["
240: + seqName
241: + "] \nUsed SQL: "
242: + sql
243: + " \n Thread Name is: "
244: + Thread
245: .currentThread()
246: .getName()
247: + ":"
248: + Thread
249: .currentThread()
250: .toString(),
251: module);
252: sql = "INSERT INTO "
253: + parentUtil.tableName + " ("
254: + parentUtil.nameColName + ", "
255: + parentUtil.idColName
256: + ") VALUES ('" + this .seqName
257: + "', " + startSeqId + ")";
258: if (stmt.executeUpdate(sql) <= 0) {
259: throw new GenericEntityException(
260: "No rows changed when trying insert new sequence row with this SQL: "
261: + sql);
262: }
263: continue;
264: }
265:
266: sql = "UPDATE " + parentUtil.tableName
267: + " SET " + parentUtil.idColName
268: + "=" + parentUtil.idColName + "+"
269: + bankSize + " WHERE "
270: + parentUtil.nameColName + "='"
271: + this .seqName + "'";
272: if (stmt.executeUpdate(sql) <= 0) {
273: throw new GenericEntityException(
274: "[SequenceUtil.SequenceBank.fillBank] update failed, no rows changes for seqName: "
275: + seqName);
276: }
277:
278: sql = "SELECT " + parentUtil.idColName
279: + " FROM " + parentUtil.tableName
280: + " WHERE "
281: + parentUtil.nameColName + "='"
282: + this .seqName + "'";
283: rs = stmt.executeQuery(sql);
284: boolean gotVal2 = false;
285: if (rs.next()) {
286: val2 = rs.getLong(parentUtil.idColName);
287: gotVal2 = true;
288: }
289:
290: rs.close();
291:
292: if (!gotVal2) {
293: throw new GenericEntityException(
294: "[SequenceUtil.SequenceBank.fillBank] second select failed: aborting, result "
295: + "set was empty for sequence: "
296: + seqName);
297:
298: }
299:
300: if (val1 + bankSize != val2) {
301: if (numTries >= maxTries) {
302: throw new GenericEntityException(
303: "[SequenceUtil.SequenceBank.fillBank] maxTries ("
304: + maxTries
305: + ") reached, giving up.");
306: }
307:
308: // collision happened, wait a bounded random amount of time then continue
309: int waitTime = (new Double(
310: Math.random()
311: * (maxWaitMillis - minWaitMillis)))
312: .intValue()
313: + minWaitMillis;
314:
315: Debug.logWarning(
316: "[SequenceUtil.SequenceBank.fillBank] Collision found for seqName ["
317: + seqName + "], val1="
318: + val1 + ", val2="
319: + val2
320: + ", val1+bankSize="
321: + (val1 + bankSize)
322: + ", bankSize="
323: + bankSize
324: + ", waitTime="
325: + waitTime, module);
326:
327: try {
328: this .wait(waitTime);
329: } catch (Exception e) {
330: Debug
331: .logWarning(
332: e,
333: "Error waiting in sequence util",
334: module);
335: throw e;
336: }
337: }
338:
339: numTries++;
340: }
341:
342: curSeqId = val1;
343: maxSeqId = val2;
344: if (Debug.infoOn())
345: Debug.logInfo(
346: "Got bank of sequenced IDs for ["
347: + this .seqName
348: + "]; curSeqId=" + curSeqId
349: + ", maxSeqId=" + maxSeqId
350: + ", bankSize=" + bankSize,
351: module);
352: } catch (SQLException sqle) {
353: Debug
354: .logWarning(
355: sqle,
356: "[SequenceUtil.SequenceBank.fillBank] SQL Exception while executing the following:\n"
357: + sql
358: + "\nError was:"
359: + sqle.getMessage(),
360: module);
361: throw sqle;
362: } finally {
363: try {
364: if (stmt != null)
365: stmt.close();
366: } catch (SQLException sqle) {
367: Debug
368: .logWarning(
369: sqle,
370: "Error closing statement in sequence util",
371: module);
372: }
373: try {
374: if (connection != null)
375: connection.close();
376: } catch (SQLException sqle) {
377: Debug
378: .logWarning(
379: sqle,
380: "Error closing connection in sequence util",
381: module);
382: }
383: }
384: } catch (Exception e) {
385: String errMsg = "General error in getting a sequenced ID";
386: Debug.logError(e, errMsg, module);
387: try {
388: TransactionUtil.rollback(beganTransaction,
389: errMsg, e);
390: } catch (GenericTransactionException gte2) {
391: Debug.logError(gte2,
392: "Unable to rollback transaction",
393: module);
394: }
395: } finally {
396: try {
397: TransactionUtil.commit(beganTransaction);
398: } catch (GenericTransactionException gte) {
399: Debug.logError(gte,
400: "Unable to commit transaction", module);
401: }
402: }
403: } catch (GenericTransactionException e) {
404: Debug
405: .logError(
406: e,
407: "System Error suspending transaction in sequence util",
408: module);
409: } finally {
410: if (suspendedTransaction != null) {
411: try {
412: TransactionUtil.resume(suspendedTransaction);
413: } catch (GenericTransactionException e) {
414: Debug
415: .logError(
416: e,
417: "Error resuming suspended transaction in sequence util",
418: module);
419: }
420: }
421: }
422: //Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] Ending fillBank Thread Name is: " + Thread.currentThread().getName() + ":" + Thread.currentThread().toString(), module);
423: }
424: }
425: }
|