001: /*
002: Copyright (C) 2003 Know Gate S.L. All rights reserved.
003: C/Oña, 107 1º2 28050 Madrid (Spain)
004:
005: Redistribution and use in source and binary forms, with or without
006: modification, are permitted provided that the following conditions
007: are met:
008:
009: 1. Redistributions of source code must retain the above copyright
010: notice, this list of conditions and the following disclaimer.
011:
012: 2. The end-user documentation included with the redistribution,
013: if any, must include the following acknowledgment:
014: "This product includes software parts from hipergate
015: (http://www.hipergate.org/)."
016: Alternately, this acknowledgment may appear in the software itself,
017: if and wherever such third-party acknowledgments normally appear.
018:
019: 3. The name hipergate must not be used to endorse or promote products
020: derived from this software without prior written permission.
021: Products derived from this software may not be called hipergate,
022: nor may hipergate appear in their name, without prior written
023: permission.
024:
025: This library is distributed in the hope that it will be useful,
026: but WITHOUT ANY WARRANTY; without even the implied warranty of
027: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
028:
029: You should have received a copy of hipergate License with this code;
030: if not, visit http://www.hipergate.org or mail to info@hipergate.org
031: */
032:
033: package com.knowgate.scheduler;
034:
035: import java.util.Date;
036:
037: import java.sql.SQLException;
038: import java.sql.Connection;
039: import java.sql.PreparedStatement;
040: import java.sql.Timestamp;
041:
042: import com.knowgate.jdc.JDCConnection;
043: import com.knowgate.dataobjs.DB;
044: import com.knowgate.debug.DebugFile;
045:
046: /**
047: * Atom queue consumer
048: * @author Sergio Montoro Ten
049: * @version 1.0
050: */
051: public class AtomConsumer {
052:
053: private AtomQueue oQueue;
054: private JDCConnection oConn;
055: private PreparedStatement oStmt;
056:
057: // ----------------------------------------------------------
058:
059: /**
060: * <p>Create Atom Queue Consumer</p>
061: * @param oConnection
062: * @param oAtomQueue
063: * @throws SQLException
064: */
065: public AtomConsumer(JDCConnection oConnection, AtomQueue oAtomQueue)
066: throws SQLException {
067: oQueue = oAtomQueue;
068: oConn = oConnection;
069:
070: if (DebugFile.trace) {
071: DebugFile.writeln("Connection.prepareStatement (UPDATE "
072: + DB.k_job_atoms + " SET " + DB.id_status + "="
073: + String.valueOf(Atom.STATUS_RUNNING) + ", "
074: + DB.dt_execution + "=? WHERE " + DB.gu_job
075: + "=? AND " + DB.pg_atom + "=?)");
076: }
077:
078: // deja preparada la sentencia de actulización de estado del átomo para mejor velocidad
079: oStmt = oConn.prepareStatement("UPDATE " + DB.k_job_atoms
080: + " SET " + DB.id_status + "="
081: + String.valueOf(Atom.STATUS_RUNNING) + ", "
082: + DB.dt_execution + "=? WHERE " + DB.gu_job + "=? AND "
083: + DB.pg_atom + "=?");
084:
085: try {
086: oStmt.setQueryTimeout(20);
087: } catch (SQLException sqle) {
088: }
089: }
090:
091: // ----------------------------------------------------------
092:
093: public void close() {
094: if (null != oStmt)
095: try {
096: oStmt.close();
097: } catch (SQLException sqle) {
098: }
099: oStmt = null;
100: }
101:
102: // ----------------------------------------------------------
103:
104: /**
105: * <p>Get next Atom and remove it from queue</p>
106: * @return Atom object instance
107: * @throws SQLException
108: */
109:
110: public synchronized Atom next() throws SQLException {
111:
112: if (DebugFile.trace) {
113: DebugFile.writeln("Begin AtomConsumer.next()");
114: DebugFile.incIdent();
115: }
116:
117: Atom oAtm = oQueue.pop();
118:
119: if (oAtm != null) {
120:
121: if (DebugFile.trace)
122: DebugFile
123: .writeln("PreparedStatement.setTimestamp(1, new Timestamp(new Date().getTime()))");
124:
125: oStmt.setTimestamp(1, new Timestamp(new Date().getTime()));
126:
127: if (DebugFile.trace)
128: DebugFile.writeln("PreparedStatement.setString(2, "
129: + oAtm.getStringNull(DB.gu_job, "null") + ")");
130:
131: // Actualizar el estado en la base de datos a Finished
132: oStmt.setString(2, oAtm.getString(DB.gu_job));
133:
134: if (DebugFile.trace)
135: DebugFile
136: .writeln("PreparedStatement.setInt(3, "
137: + String.valueOf(oAtm
138: .getInt(DB.pg_atom)) + ")");
139:
140: oStmt.setInt(3, oAtm.getInt(DB.pg_atom));
141:
142: if (DebugFile.trace)
143: DebugFile.writeln("PreparedStatement.executeUpdate()");
144:
145: oStmt.executeUpdate();
146: }
147:
148: if (DebugFile.trace) {
149: DebugFile.decIdent();
150: DebugFile.writeln("End AtomConsumer.next()");
151: }
152:
153: return oAtm;
154: } // next()
155:
156: // ----------------------------------------------------------
157:
158: public JDCConnection getConnection() {
159: return oConn;
160: }
161:
162: } // AtomConsumer
|