Source Code Cross Referenced for AtomFeeder.java in  » Groupware » hipergate » com » knowgate » scheduler » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Groupware » hipergate » com.knowgate.scheduler 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:          Copyright (C) 2003  Know Gate S.L. All rights reserved.
003:                              C/Oña, 107 1º2 28050 Madrid (Spain)
004:
005:          Redistribution and use in source and binary forms, with or without
006:          modification, are permitted provided that the following conditions
007:          are met:
008:
009:          1. Redistributions of source code must retain the above copyright
010:             notice, this list of conditions and the following disclaimer.
011:
012:          2. The end-user documentation included with the redistribution,
013:             if any, must include the following acknowledgment:
014:             "This product includes software parts from hipergate
015:             (http://www.hipergate.org/)."
016:             Alternately, this acknowledgment may appear in the software itself,
017:             if and wherever such third-party acknowledgments normally appear.
018:
019:          3. The name hipergate must not be used to endorse or promote products
020:             derived from this software without prior written permission.
021:             Products derived from this software may not be called hipergate,
022:             nor may hipergate appear in their name, without prior written
023:             permission.
024:
025:          This library is distributed in the hope that it will be useful,
026:          but WITHOUT ANY WARRANTY; without even the implied warranty of
027:          MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
028:
029:          You should have received a copy of hipergate License with this code;
030:          if not, visit http://www.hipergate.org or mail to info@hipergate.org
031:         */
032:
033:        package com.knowgate.scheduler;
034:
035:        import java.util.Date;
036:
037:        import java.sql.Connection;
038:        import java.sql.SQLException;
039:        import java.sql.Statement;
040:        import java.sql.PreparedStatement;
041:        import java.sql.ResultSet;
042:        import java.sql.ResultSetMetaData;
043:
044:        import com.knowgate.debug.DebugFile;
045:        import com.knowgate.jdc.JDCConnection;
046:        import com.knowgate.dataobjs.DB;
047:        import com.knowgate.dataobjs.DBBind;
048:        import com.knowgate.dataobjs.DBSubset;
049:        import com.knowgate.misc.Gadgets;
050:        import com.knowgate.hipergate.QueryByForm;
051:
052:        import com.knowgate.crm.DistributionList;
053:
054:        import java.util.Properties;
055:
056:        /**
057:         * <p>Feeds atoms to RAM based AtomQueue</p>
058:         * @author Sergio Montoro Ten
059:         * @version 2.0
060:         */
061:        public class AtomFeeder {
062:            private int iMaxBatchSize;
063:
064:            public AtomFeeder() {
065:                iMaxBatchSize = 10000;
066:            }
067:
068:            // ----------------------------------------------------------
069:
070:            public void setMaxBatchSize(int iMaxBatch) {
071:                iMaxBatchSize = iMaxBatch;
072:            }
073:
074:            // ----------------------------------------------------------
075:
076:            public int getMaxBatchSize() {
077:                return iMaxBatchSize;
078:            }
079:
080:            // ----------------------------------------------------------
081:
082:            /**
083:             * <p>Load a dynamic list of members from k_member_address to k_job_atoms</p>
084:             * <p>Registers will be filtered according to the query stored at k_queries table
085:             * witch corresponds to the list at k_lists used by Job being loaded.</p>
086:             * @param oConn Database Connection
087:             * @param sJobGUID Job to be loaded
088:             * @param dtExec Scheduled Execution DateTime
089:             * @param sListGUID Base List GUID
090:             * @param sQueryGUID GUID of Query to be used for member filtering upon retrieval
091:             * @param sWorkAreaGUID GUID of WorArea
092:             * @throws SQLException
093:             */
094:
095:            private int loadDynamicList(JDCConnection oConn, String sJobGUID,
096:                    Date dtExec, String sListGUID, String sQueryGUID,
097:                    String sWorkAreaGUID) throws SQLException {
098:                Statement oStmt;
099:                QueryByForm oQBF;
100:                String sSQL;
101:                int iInserted;
102:
103:                if (DebugFile.trace) {
104:                    DebugFile
105:                            .writeln("Begin AtomFeeder.loadDynamicList([Connection] , "
106:                                    + sJobGUID
107:                                    + ","
108:                                    + dtExec.toString()
109:                                    + ","
110:                                    + sQueryGUID + "," + sWorkAreaGUID + " )");
111:                    DebugFile.incIdent();
112:                }
113:
114:                // Lista de columnas de la table k_member_address
115:                String sColumns = "gu_company,gu_contact,tx_email,tx_name,tx_surname,tx_salutation,nm_commercial,tp_street,nm_street,nu_street,tx_addr1,tx_addr2,nm_country,nm_state,mn_city,zipcode,work_phone,direct_phone,home_phone,mov_phone,fax_phone,other_phone,po_box";
116:
117:                // Componer la sentencia SQL de filtrado de datos a partir de la definición de la consulta almacenada en la tabla k_queries
118:                oQBF = new QueryByForm(oConn, DB.k_member_address, "ma",
119:                        sQueryGUID);
120:
121:                // Insertar los registros a capón haciendo un snapshot de k_member_address a k_job_atoms
122:                oStmt = oConn.createStatement();
123:
124:                sSQL = "INSERT INTO " + DB.k_job_atoms + " (gu_job,id_status,"
125:                        + sColumns + ") " + " (SELECT '" + sJobGUID + "',"
126:                        + String.valueOf(Atom.STATUS_PENDING) + "," + sColumns
127:                        + " FROM " + DB.k_member_address
128:                        + " ma WHERE ma.gu_workarea='" + sWorkAreaGUID
129:                        + "' AND (" + oQBF.composeSQL()
130:                        + ") AND NOT EXISTS (SELECT x." + DB.tx_email
131:                        + " FROM " + DB.k_lists + " b, " + DB.k_x_list_members
132:                        + " x WHERE b." + DB.gu_list + "=x." + DB.gu_list
133:                        + " AND b." + DB.gu_query + "='" + sListGUID
134:                        + "' AND b." + DB.tp_list + "="
135:                        + String.valueOf(DistributionList.TYPE_BLACK)
136:                        + " AND x." + DB.tx_email + "=ma." + DB.tx_email + "))";
137:
138:                if (DebugFile.trace)
139:                    DebugFile.writeln("Connection.executeUpdate(" + sSQL + ")");
140:
141:                iInserted = oStmt.executeUpdate(sSQL);
142:
143:                oStmt.close();
144:
145:                if (DebugFile.trace) {
146:                    DebugFile.decIdent();
147:
148:                    DebugFile.writeln("End AtomFeeder.loadDynamicList() : "
149:                            + String.valueOf(iInserted));
150:                }
151:                return iInserted;
152:            } // loadDynamicList()
153:
154:            // ----------------------------------------------------------
155:
156:            /**
157:             * <p>Load a static member list from k_x_list_members to k_job_atoms</p>
158:             * @param oConn Database Connection
159:             * @param sJobGUID GUID of Job to be loaded
160:             * @param dtExec Execution date to be assigned to Atoms (inherited from job)
161:             * @param sListGUID GUID of list to be loaded
162:             * @throws SQLException
163:             */
164:            private int loadStaticList(JDCConnection oConn, String sJobGUID,
165:                    Date dtExec, String sListGUID) throws SQLException {
166:                Statement oStmt;
167:                String sSQL;
168:                int iInserted;
169:
170:                if (DebugFile.trace) {
171:                    DebugFile
172:                            .writeln("Begin AtomFeeder.loadStaticList([Connection] , "
173:                                    + sJobGUID
174:                                    + ","
175:                                    + dtExec.toString()
176:                                    + ","
177:                                    + sListGUID + ")");
178:                    DebugFile.incIdent();
179:                }
180:
181:                // Lista de columnas de la table k_x_list_members
182:                // * TO DO: Añadir el resto de columnas que faltan para reemplazar direcciones
183:                String sColumns = "id_format,gu_company,gu_contact,tx_email,tx_name,tx_surname,tx_salutation";
184:
185:                // Insertar los registros a capón haciendo un snapshot de k_member_address a k_job_atoms
186:                oStmt = oConn.createStatement();
187:
188:                sSQL = "INSERT INTO " + DB.k_job_atoms + " (gu_job,id_status,"
189:                        + sColumns + ") " + " (SELECT '" + sJobGUID + "',"
190:                        + String.valueOf(Atom.STATUS_PENDING) + "," + sColumns
191:                        + " FROM " + DB.k_x_list_members + " m WHERE "
192:                        + DB.gu_list + "='" + sListGUID + "' AND m."
193:                        + DB.bo_active + "<>0 AND " + "NOT EXISTS (SELECT x."
194:                        + DB.tx_email + " FROM " + DB.k_lists + " b, "
195:                        + DB.k_x_list_members + " x WHERE b." + DB.gu_list
196:                        + "=x." + DB.gu_list + " AND b." + DB.gu_query + "='"
197:                        + sListGUID + "' AND b." + DB.tp_list + "="
198:                        + String.valueOf(DistributionList.TYPE_BLACK)
199:                        + " AND x." + DB.tx_email + "=m." + DB.tx_email + "))";
200:
201:                if (DebugFile.trace)
202:                    DebugFile.writeln("Connection.executeUpdate(" + sSQL + ")");
203:
204:                iInserted = oStmt.executeUpdate(sSQL);
205:                oStmt.close();
206:
207:                if (DebugFile.trace) {
208:                    DebugFile.decIdent();
209:                    DebugFile.writeln("End AtomFeeder.loadStaticList() : "
210:                            + String.valueOf(iInserted));
211:                }
212:
213:                return iInserted;
214:            } // loadStaticList()
215:
216:            // ----------------------------------------------------------
217:
218:            /**
219:             * <p>Load direct list into k_job_atoms table</p>
220:             * @param oConn Database Connection
221:             * @param sJobGUID GUID of Job to be loaded
222:             * @param dtExec Execution date to be assigned to Atoms (inherited from job)
223:             * @param sListGUID GUID of list to be loaded
224:             * @throws SQLException
225:             */
226:
227:            private int loadDirectList(JDCConnection oConn, String sJobGUID,
228:                    Date dtExec, String sListGUID) throws SQLException {
229:
230:                // Alimentar una lista directa se hace igual que una estática
231:                return loadStaticList(oConn, sJobGUID, dtExec, sListGUID);
232:            } // loadDirectList()
233:
234:            // ----------------------------------------------------------
235:
236:            private Properties parseParameters(String sTxParams) {
237:                String aVariable[];
238:                String aParams[];
239:                Properties oParams = new Properties();
240:
241:                if (DebugFile.trace) {
242:                    DebugFile.writeln("Begin AtomFeeder.parseParameters("
243:                            + sTxParams + ")");
244:                    DebugFile.incIdent();
245:                }
246:
247:                if (sTxParams != null) {
248:                    if (sTxParams.length() > 0) {
249:                        aParams = Gadgets.split(sTxParams, ",");
250:
251:                        for (int p = 0; p < aParams.length; p++) {
252:                            aVariable = Gadgets.split(aParams[p], ":");
253:                            oParams.put(aVariable[0], aVariable[1]);
254:                        } // next (p)
255:                    } // fi (sTxParams!="")
256:                } // fi (sTxParams!=null)
257:
258:                if (DebugFile.trace) {
259:                    DebugFile.decIdent();
260:                    DebugFile.writeln("End AtomFeeder.parseParameters() : "
261:                            + String.valueOf(oParams.size()));
262:                }
263:
264:                return oParams;
265:            } // parseParameters
266:
267:            // ----------------------------------------------------------
268:
269:            /**
270:             * <p>Load an Atom batch into k_job_atoms table</p>
271:             * <p>Atoms will be taken by looking up pending Jobs by its execution date and extracting Atoms
272:             * for nearest Jobs in time.<br>
273:             * On each loadAtoms() no more than iWorkerThreads Jobs will be loaded at a time.
274:             * @param oConn Database Connection
275:             * @param iWorkerThreads Number of worker thread. This parameter will limit the number of loaded Jobs as the program will try to use a one to one ratio between Jobs and WorkerThreads.
276:             * @return DBSubset with loaded Jobs
277:             * @throws SQLException
278:             */
279:
280:            public DBSubset loadAtoms(JDCConnection oConn, int iWorkerThreads)
281:                    throws SQLException {
282:
283:                PreparedStatement oJobStmt;
284:                DBSubset oJobsSet;
285:                int iJobCount;
286:                Properties oParams;
287:                DistributionList oDistribList;
288:                Date dtNow = new Date();
289:                Date dtExec;
290:                String sSQL;
291:                int iLoaded = 0;
292:
293:                if (DebugFile.trace) {
294:                    DebugFile
295:                            .writeln("Begin AtomFeeder.loadAtoms([Connection], "
296:                                    + String.valueOf(iWorkerThreads) + ")");
297:                    DebugFile.incIdent();
298:                }
299:
300:                // Crea un DBSubset para recorrer los jobs pendientes de ejecución
301:
302:                oJobsSet = new DBSubset(
303:                        DB.k_jobs,
304:                        "gu_job,gu_job_group,gu_workarea,id_command,tx_parameters,id_status,dt_execution,dt_finished,dt_created,dt_modified",
305:                        DB.id_status + "=" + String.valueOf(Job.STATUS_PENDING)
306:                                + " ORDER BY " + DB.dt_execution + " DESC",
307:                        iWorkerThreads);
308:
309:                oJobsSet.setMaxRows(iWorkerThreads);
310:
311:                iJobCount = oJobsSet.load(oConn); // Devuelve la cuenta de jobs pendientes
312:
313:                // Prepara la sentencia para actualizar el estado de los jobs a Running
314:                sSQL = "UPDATE " + DB.k_jobs + " SET " + DB.id_status + "="
315:                        + String.valueOf(Job.STATUS_RUNNING) + ","
316:                        + DB.dt_execution + "=" + DBBind.Functions.GETDATE
317:                        + " WHERE " + DB.gu_job + "=?";
318:
319:                if (DebugFile.trace)
320:                    DebugFile.writeln("Connection.prepareStatement(" + sSQL
321:                            + ")");
322:
323:                oJobStmt = oConn.prepareStatement(sSQL);
324:
325:                // Para cada job, cargar su lista de miembros del tipo que corresponda y
326:                // cambiar el estado a Running
327:                for (int j = 0; j < iJobCount; j++) {
328:                    // leer los parámetros adicionales del job del campo tx_parameters
329:                    oParams = parseParameters(oJobsSet.getString(4, j));
330:
331:                    // Generar un objeto temporal de tipo lista de distribución
332:                    // para leer los valores de la lista de miembros
333:                    if (oParams.getProperty("gu_list") != null) {
334:                        oDistribList = new DistributionList(oConn, oParams
335:                                .getProperty("gu_list"));
336:
337:                        // Si la fecha de ejecución del job es null,
338:                        // tomar la fecha actual como fecha de ejecución inmediata
339:                        if (oDistribList.isNull(DB.dt_execution))
340:                            dtExec = dtNow;
341:                        else
342:                            dtExec = oDistribList.getDate(DB.dt_execution);
343:
344:                        // Para cada tipo de lista usar el método de carga de miembros que corresponda
345:                        switch (oDistribList.getShort(DB.tp_list)) {
346:                        case DistributionList.TYPE_DYNAMIC:
347:                            iLoaded += loadDynamicList(oConn, oJobsSet
348:                                    .getString(0, j), dtExec, oParams
349:                                    .getProperty("gu_list"), oDistribList
350:                                    .getString(DB.gu_query), oDistribList
351:                                    .getString(DB.gu_workarea));
352:                            break;
353:                        case DistributionList.TYPE_STATIC:
354:                            iLoaded += loadStaticList(oConn, oJobsSet
355:                                    .getString(0, j), dtExec, oParams
356:                                    .getProperty("gu_list"));
357:                            break;
358:                        case DistributionList.TYPE_DIRECT:
359:                            iLoaded += loadDirectList(oConn, oJobsSet
360:                                    .getString(0, j), dtExec, oParams
361:                                    .getProperty("gu_list"));
362:                            break;
363:                        } // end switch()
364:                    } else
365:                        iLoaded = 0;
366:
367:                    // Cambiar el estado del job cargado de Pending a Running
368:
369:                    if (DebugFile.trace)
370:                        DebugFile.writeln("PrepareStatement.setString(1, '"
371:                                + oJobsSet.getStringNull(0, j, "") + "')");
372:
373:                    oJobStmt.setString(1, oJobsSet.getString(0, j));
374:
375:                    if (DebugFile.trace)
376:                        DebugFile.writeln("PrepareStatement.executeUpdate()");
377:
378:                    oJobStmt.executeUpdate();
379:                } // next (j)
380:
381:                if (DebugFile.trace)
382:                    DebugFile.writeln("PrepareStatement.close()");
383:
384:                oJobStmt.close();
385:
386:                if (DebugFile.trace) {
387:                    DebugFile.decIdent();
388:                    DebugFile.writeln("End AtomFeeder.loadAtoms() : "
389:                            + String.valueOf(oJobsSet.getRowCount()));
390:                }
391:
392:                return oJobsSet;
393:            } // loadAtoms()
394:
395:            // ----------------------------------------------------------
396:
397:            /**
398:             * <p>Load Atoms for a given Job into k_job_atoms table</p>
399:             * On each loadAtoms() no more than iWorkerThreads Jobs will be loaded at a time.
400:             * @param oConn Database Connection
401:             * @param sJodId GUID of Job for witch atoms are to be loaded.
402:             * @return DBSubset with loaded Job
403:             * @throws SQLException
404:             */
405:
406:            public DBSubset loadAtoms(JDCConnection oConn, String sJobId)
407:                    throws SQLException {
408:                PreparedStatement oCmdsStmt;
409:                PreparedStatement oJobStmt;
410:                ResultSet oCmdsSet;
411:                DBSubset oJobsSet;
412:                int iJobCount;
413:                String aParams[];
414:                String aVariable[];
415:                Properties oParams;
416:                DistributionList oDistribList;
417:                Date dtNow = new Date();
418:                Date dtExec;
419:                String sSQL;
420:                int iLoaded = 0;
421:
422:                if (DebugFile.trace) {
423:                    DebugFile
424:                            .writeln("Begin AtomFeeder.loadAtoms([Connection], "
425:                                    + sJobId + ")");
426:                    DebugFile.incIdent();
427:                }
428:
429:                // Crea un DBSubset para recorrer los jobs pendientes de ejecución
430:
431:                oJobsSet = new DBSubset(
432:                        DB.k_jobs,
433:                        "gu_job,gu_job_group,gu_workarea,id_command,tx_parameters,id_status,dt_execution,dt_finished,dt_created,dt_modified",
434:                        DB.gu_job + "='" + sJobId + "'", 1);
435:
436:                iJobCount = oJobsSet.load(oConn); // Devuelve la cuenta de jobs pendientes
437:
438:                // Prepara la sentencia para actualizar el estado de los jobs a Running
439:                sSQL = "UPDATE " + DB.k_jobs + " SET " + DB.id_status + "="
440:                        + String.valueOf(Job.STATUS_RUNNING) + ","
441:                        + DB.dt_execution + "=" + DBBind.Functions.GETDATE
442:                        + " WHERE " + DB.gu_job + "=?";
443:
444:                if (DebugFile.trace)
445:                    DebugFile.writeln("Connection.prepareStatement(" + sSQL
446:                            + ")");
447:
448:                oJobStmt = oConn.prepareStatement(sSQL);
449:
450:                // Para cada job, cargar su lista de miembros del tipo que corresponda y
451:                // cambiar el estado a Running
452:                if (1 == iJobCount) {
453:                    // leer los parámetros adicionales del job del campo tx_parameters
454:                    oParams = parseParameters(oJobsSet.getString(4, 0));
455:
456:                    // Generar un objeto temporal de tipo lista de distribución
457:                    // para leer los valores de la lista de miembros
458:                    if (oParams.getProperty("gu_list") != null) {
459:                        oDistribList = new DistributionList(oConn, oParams
460:                                .getProperty("gu_list"));
461:
462:                        // Si la fecha de ejecución del job es null,
463:                        // tomar la fecha actual como fecha de ejecución inmediata
464:                        if (oDistribList.isNull(DB.dt_execution))
465:                            dtExec = dtNow;
466:                        else
467:                            dtExec = oDistribList.getDate(DB.dt_execution);
468:
469:                        // Para cada tipo de lista usar el método de carga de miembros que corresponda
470:                        switch (oDistribList.getShort(DB.tp_list)) {
471:                        case DistributionList.TYPE_DYNAMIC:
472:                            iLoaded += loadDynamicList(oConn, oJobsSet
473:                                    .getString(0, 0), dtExec, oParams
474:                                    .getProperty("gu_list"), oDistribList
475:                                    .getString(DB.gu_query), oDistribList
476:                                    .getString(DB.gu_workarea));
477:                            break;
478:                        case DistributionList.TYPE_STATIC:
479:                            iLoaded += loadStaticList(oConn, oJobsSet
480:                                    .getString(0, 0), dtExec, oParams
481:                                    .getProperty("gu_list"));
482:                            break;
483:                        case DistributionList.TYPE_DIRECT:
484:                            iLoaded += loadDirectList(oConn, oJobsSet
485:                                    .getString(0, 0), dtExec, oParams
486:                                    .getProperty("gu_list"));
487:                            break;
488:                        } // end switch()
489:                    } else
490:                        iLoaded = 0;
491:
492:                    // Cambiar el estado del job cargado de Pending a Running
493:
494:                    if (DebugFile.trace)
495:                        DebugFile.writeln("PrepareStatement.setString(1, '"
496:                                + oJobsSet.getStringNull(0, 0, "") + "')");
497:
498:                    oJobStmt.setString(1, oJobsSet.getString(0, 0));
499:
500:                    if (DebugFile.trace)
501:                        DebugFile.writeln("PrepareStatement.executeUpdate()");
502:
503:                    oJobStmt.executeUpdate();
504:                } // fi
505:
506:                if (DebugFile.trace)
507:                    DebugFile.writeln("PrepareStatement.close()");
508:
509:                oJobStmt.close();
510:
511:                if (DebugFile.trace) {
512:                    DebugFile.decIdent();
513:                    DebugFile.writeln("End AtomFeeder.loadAtoms() : " + sJobId);
514:                }
515:
516:                return oJobsSet;
517:            } // loadAtoms()
518:
519:            // ----------------------------------------------------------
520:
521:            /**
522:             * <p>Feed RAM queue with pending Atoms from k_job_atoms table</p>
523:             * @param oConn Database Connection
524:             * @param oQueue AtomQueue
525:             * @throws SQLException
526:             */
527:
528:            public void feedQueue(JDCConnection oConn, AtomQueue oQueue)
529:                    throws SQLException {
530:                Statement oStmt;
531:                PreparedStatement oUpdt;
532:                PreparedStatement oPgSt;
533:                ResultSet oRSet;
534:                ResultSetMetaData oMDat;
535:                String sJobId;
536:                int iAtomId;
537:                int iJobCol;
538:                int iAtmCol;
539:                int iProcessed;
540:                String sSQL;
541:                Atom oAtm;
542:                boolean bNext;
543:
544:                if (DebugFile.trace) {
545:                    DebugFile
546:                            .writeln("Begin AtomFeeder.feedQueue([Connection], [AtomQueue])");
547:                    DebugFile.incIdent();
548:                }
549:
550:                // Crear un cursor actualizable para recorrer los átomos y cargarlos en la cola
551:                // al mismo tiempo que se cambia en la base de datos su estado de Pending a Running
552:                oStmt = oConn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
553:                        ResultSet.CONCUR_READ_ONLY);
554:
555:                sSQL = "SELECT a.*, j." + DB.tx_parameters + " FROM "
556:                        + DB.k_job_atoms + " a, " + DB.k_jobs + " j WHERE a."
557:                        + DB.id_status + "="
558:                        + String.valueOf(Atom.STATUS_PENDING) + " AND j."
559:                        + DB.gu_job + "=a." + DB.gu_job + " ORDER BY j."
560:                        + DB.dt_execution;
561:
562:                if (DebugFile.trace)
563:                    DebugFile.writeln("Statement.executeQuery(" + sSQL + ")");
564:
565:                oRSet = oStmt.executeQuery(sSQL);
566:
567:                try {
568:                    oRSet.setFetchSize(getMaxBatchSize());
569:                } catch (SQLException e) { /* Si el driver no soporta setFetchSize da igual */
570:                }
571:
572:                oMDat = oRSet.getMetaData();
573:                iJobCol = oRSet.findColumn(DB.gu_job);
574:                iAtmCol = oRSet.findColumn(DB.pg_atom);
575:
576:                // Bucle de carga y actualización de estado de job_atoms
577:
578:                sSQL = "UPDATE " + DB.k_job_atoms + " SET " + DB.id_status
579:                        + "=" + Atom.STATUS_RUNNING + " WHERE " + DB.gu_job
580:                        + "=? AND " + DB.pg_atom + "=?";
581:                if (DebugFile.trace)
582:                    DebugFile.writeln("Connection.prepareStatement(" + sSQL
583:                            + ")");
584:                oUpdt = oConn.prepareStatement(sSQL);
585:
586:                iProcessed = 0;
587:
588:                bNext = oRSet.next();
589:
590:                while (bNext && iProcessed < iMaxBatchSize) {
591:                    oAtm = new Atom(oRSet, oMDat);
592:
593:                    oQueue.push(oAtm);
594:
595:                    sJobId = oRSet.getString(iJobCol);
596:                    iAtomId = oRSet.getInt(iAtmCol);
597:
598:                    bNext = oRSet.next();
599:
600:                    oUpdt.setString(1, sJobId);
601:                    oUpdt.setInt(2, iAtomId);
602:
603:                    if (DebugFile.trace)
604:                        DebugFile
605:                                .writeln("PreparedStatement.executeUpdate(UPDATE "
606:                                        + DB.k_job_atoms
607:                                        + " SET "
608:                                        + DB.id_status
609:                                        + "="
610:                                        + Atom.STATUS_RUNNING
611:                                        + " WHERE "
612:                                        + DB.gu_job
613:                                        + "='"
614:                                        + sJobId
615:                                        + "' AND "
616:                                        + DB.pg_atom
617:                                        + "="
618:                                        + String.valueOf(iAtomId) + ")");
619:                    oUpdt.executeUpdate();
620:
621:                    iProcessed++;
622:                } // wend
623:
624:                oUpdt.close();
625:                oRSet.close();
626:                oStmt.close();
627:
628:                if (DebugFile.trace) {
629:                    DebugFile.decIdent();
630:                    DebugFile.writeln("End AtomFeeder.feedQueue() : "
631:                            + String.valueOf(iProcessed));
632:                }
633:            } // feedQueue
634:
635:            /**
636:             * Formatea una fecha en formato escape ODBC
637:             * @param dt Fecha a formatear
638:             * @param sFormat tipo de formato {d=yyyy-mm-dd, ts=yyyy-mm-dd hh:nn:ss}
639:             * @return Fecha formateada como una cadena
640:             */
641:
642:            private static String escape(java.util.Date dt) {
643:                String str;
644:                String sMonth, sDay, sHour, sMin, sSec;
645:
646:                str = "{ ts '";
647:
648:                sMonth = (dt.getMonth() + 1 < 10 ? "0"
649:                        + String.valueOf((dt.getMonth() + 1)) : String
650:                        .valueOf(dt.getMonth() + 1));
651:                sDay = (dt.getDate() < 10 ? "0" + String.valueOf(dt.getDate())
652:                        : String.valueOf(dt.getDate()));
653:
654:                str += String.valueOf(dt.getYear() + 1900) + "-" + sMonth + "-"
655:                        + sDay + " ";
656:
657:                sHour = (dt.getHours() < 10 ? "0"
658:                        + String.valueOf(dt.getHours()) : String.valueOf(dt
659:                        .getHours()));
660:                sMin = (dt.getMinutes() < 10 ? "0"
661:                        + String.valueOf(dt.getMinutes()) : String.valueOf(dt
662:                        .getMinutes()));
663:                sSec = (dt.getSeconds() < 10 ? "0"
664:                        + String.valueOf(dt.getSeconds()) : String.valueOf(dt
665:                        .getSeconds()));
666:
667:                str += " " + sHour + ":" + sMin + ":" + sSec;
668:
669:                str = str.trim() + "'}";
670:
671:                return str;
672:            } // escape()
673:
674:            // ----------------------------------------------------------
675:
676:        } // AtomFeeder
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.