Source Code Cross Referenced for ClusterEventTracking.java in  » ERP-CRM-Financial » sakai » org » sakaiproject » event » impl » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ERP CRM Financial » sakai » org.sakaiproject.event.impl 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /**********************************************************************************
002:         * $URL: https://source.sakaiproject.org/svn/event/tags/sakai_2-4-1/event-impl/impl/src/java/org/sakaiproject/event/impl/ClusterEventTracking.java $
003:         * $Id: ClusterEventTracking.java 9900 2006-05-24 19:41:06Z ggolden@umich.edu $
004:         ***********************************************************************************
005:         *
006:         * Copyright (c) 2003, 2004, 2005, 2006 The Sakai Foundation.
007:         * 
008:         * Licensed under the Educational Community License, Version 1.0 (the "License"); 
009:         * you may not use this file except in compliance with the License. 
010:         * You may obtain a copy of the License at
011:         * 
012:         *      http://www.opensource.org/licenses/ecl1.php
013:         * 
014:         * Unless required by applicable law or agreed to in writing, software 
015:         * distributed under the License is distributed on an "AS IS" BASIS, 
016:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
017:         * See the License for the specific language governing permissions and 
018:         * limitations under the License.
019:         *
020:         **********************************************************************************/package org.sakaiproject.event.impl;
021:
022:        import java.sql.Connection;
023:        import java.sql.ResultSet;
024:        import java.sql.SQLException;
025:        import java.util.Collection;
026:        import java.util.Iterator;
027:        import java.util.List;
028:        import java.util.Vector;
029:
030:        import org.apache.commons.logging.Log;
031:        import org.apache.commons.logging.LogFactory;
032:        import org.sakaiproject.component.api.ServerConfigurationService;
033:        import org.sakaiproject.component.cover.ComponentManager;
034:        import org.sakaiproject.db.api.SqlReader;
035:        import org.sakaiproject.db.api.SqlService;
036:        import org.sakaiproject.event.api.Event;
037:        import org.sakaiproject.event.api.NotificationService;
038:        import org.sakaiproject.time.api.Time;
039:        import org.sakaiproject.time.api.TimeService;
040:        import org.sakaiproject.util.StringUtil;
041:
042:        /**
043:         * <p>
044:         * ClusterEventTracking is the implmentation for the EventTracking service for use in a clustered multi-app server configuration.<br />
045:         * Events are backed in the cluster database, and this database is polled to read and process locally events posted by the other cluster members.
046:         * </p>
047:         */
048:        public abstract class ClusterEventTracking extends
049:                BaseEventTrackingService implements  Runnable {
050:            /** Our logger. */
051:            private static Log M_log = LogFactory
052:                    .getLog(ClusterEventTracking.class);
053:
054:            /** String used to identify this service in the logs */
055:            protected static String m_logId = "EventTracking: ";
056:
057:            /** The db event checker thread. */
058:            protected Thread m_thread = null;
059:
060:            /** The thread quit flag. */
061:            protected boolean m_threadStop = false;
062:
063:            /** Last event code read from the db */
064:            protected long m_lastEventSeq = 0;
065:
066:            /** Queue of events to write if we are batching. */
067:            protected Collection m_eventQueue = null;
068:
069:            /**********************************************************************************************************************************************************************************************************************************************************
070:             * Dependencies
071:             *********************************************************************************************************************************************************************************************************************************************************/
072:
073:            /**
074:             * @return the MemoryService collaborator.
075:             */
076:            protected abstract SqlService sqlService();
077:
078:            /**
079:             * @return the ServerConfigurationService collaborator.
080:             */
081:            protected abstract ServerConfigurationService serverConfigurationService();
082:
083:            /**
084:             * @return the TimeService collaborator.
085:             */
086:            protected abstract TimeService timeService();
087:
088:            /**********************************************************************************************************************************************************************************************************************************************************
089:             * Configuration
090:             *********************************************************************************************************************************************************************************************************************************************************/
091:
092:            /** Unless false, check the db for events from the other cluster servers. */
093:            protected boolean m_checkDb = true;
094:
095:            /**
096:             * Configuration: set the check-db.
097:             * 
098:             * @param value
099:             *        The check-db value.
100:             */
101:            public void setCheckDb(String value) {
102:                try {
103:                    m_checkDb = new Boolean(value).booleanValue();
104:                } catch (Exception any) {
105:                }
106:            }
107:
108:            /** If true, batch events for bulk write. */
109:            protected boolean m_batchWrite = true;
110:
111:            /**
112:             * Configuration: set the batch writing flag.
113:             * 
114:             * @param value
115:             *        The batch writing value.
116:             */
117:            public void setBatchWrite(String value) {
118:                try {
119:                    m_batchWrite = new Boolean(value).booleanValue();
120:                } catch (Exception any) {
121:                }
122:            }
123:
124:            /** Configuration: to run the ddl on init or not. */
125:            protected boolean m_autoDdl = false;
126:
127:            /**
128:             * Configuration: to run the ddl on init or not.
129:             * 
130:             * @param value
131:             *        the auto ddl value.
132:             */
133:            public void setAutoDdl(String value) {
134:                m_autoDdl = new Boolean(value).booleanValue();
135:            }
136:
137:            /** How long to wait between checks for new events from the db. */
138:            protected long m_period = 1000L * 5L;
139:
140:            /**
141:             * Set the # seconds to wait between db checks for new events.
142:             * 
143:             * @param time
144:             *        The # seconds to wait between db checks for new events.
145:             */
146:            public void setPeriod(String time) {
147:                m_period = Integer.parseInt(time) * 1000L;
148:            }
149:
150:            /**********************************************************************************************************************************************************************************************************************************************************
151:             * Init and Destroy
152:             *********************************************************************************************************************************************************************************************************************************************************/
153:
154:            /**
155:             * Final initialization, once all dependencies are set.
156:             */
157:            public void init() {
158:                try {
159:                    // if we are auto-creating our schema, check and create
160:                    if (m_autoDdl) {
161:                        sqlService().ddl(this .getClass().getClassLoader(),
162:                                "sakai_event");
163:                    }
164:
165:                    super .init();
166:
167:                    if (m_batchWrite) {
168:                        m_eventQueue = new Vector();
169:                    }
170:
171:                    // startup the event checking
172:                    if (m_checkDb) {
173:                        start();
174:                    }
175:
176:                    M_log.info(this  + ".init() - period: " + m_period / 1000
177:                            + " batch: " + m_batchWrite + " checkDb: "
178:                            + m_checkDb);
179:                } catch (Throwable t) {
180:                    M_log.warn(this  + ".init(): ", t);
181:                }
182:            }
183:
184:            /**
185:             * Final cleanup.
186:             */
187:            public void destroy() {
188:                // stop our thread
189:                stop();
190:
191:                super .destroy();
192:            }
193:
194:            /**********************************************************************************************************************************************************************************************************************************************************
195:             * Event post / flow
196:             *********************************************************************************************************************************************************************************************************************************************************/
197:
198:            /**
199:             * Cause this new event to get to wherever it has to go for persistence, etc.
200:             * 
201:             * @param event
202:             *        The new event to post.
203:             */
204:            protected void postEvent(Event event) {
205:                // mark the event time
206:                ((BaseEvent) event).m_time = timeService().newTime();
207:
208:                // notify locally generated events immediately -
209:                // they will not be process again when read back from the database
210:                try {
211:                    notifyObservers(event, true);
212:                } catch (Throwable t) {
213:                    M_log.warn("postEvent, notifyObservers(), event: "
214:                            + event.toString(), t);
215:                }
216:
217:                // batch the event if we are batching
218:                if (m_batchWrite) {
219:                    synchronized (m_eventQueue) {
220:                        m_eventQueue.add(event);
221:                    }
222:                }
223:
224:                // if not batching, write out the individual event
225:                else {
226:                    writeEvent(event, null);
227:                }
228:
229:                if (M_log.isDebugEnabled())
230:                    M_log.debug(m_logId + event);
231:            }
232:
233:            /**
234:             * Write a single event to the db
235:             * 
236:             * @param event
237:             *        The event to write.
238:             */
239:            protected void writeEvent(Event event, Connection conn) {
240:                // get the SQL statement
241:                String statement = insertStatement();
242:
243:                // collect the fields
244:                Object fields[] = new Object[5];
245:                bindValues(event, fields);
246:
247:                // process the insert
248:                boolean ok = sqlService().dbWrite(conn, statement, fields);
249:                if (!ok) {
250:                    M_log.warn(this 
251:                            + ".writeEvent(): dbWrite failed: session: "
252:                            + fields[3] + " event: " + event.toString());
253:                }
254:            }
255:
256:            /**
257:             * Write a batch of events to the db
258:             * 
259:             * @param events
260:             *        The collection of event to write.
261:             */
262:            protected void writeBatchEvents(Collection events) {
263:                // get a connection
264:                Connection conn = null;
265:                boolean wasCommit = true;
266:                try {
267:                    conn = sqlService().borrowConnection();
268:                    wasCommit = conn.getAutoCommit();
269:                    if (wasCommit) {
270:                        conn.setAutoCommit(false);
271:                    }
272:
273:                    // Note: investigate batch writing via the jdbc driver: make sure we can still use prepared statements (check out host arrays, too) -ggolden
274:
275:                    // common preparation for each insert
276:                    String statement = insertStatement();
277:                    Object fields[] = new Object[5];
278:
279:                    // write all events
280:                    for (Iterator i = events.iterator(); i.hasNext();) {
281:                        Event event = (Event) i.next();
282:                        bindValues(event, fields);
283:
284:                        // process the insert
285:                        boolean ok = sqlService().dbWrite(conn, statement,
286:                                fields);
287:                        if (!ok) {
288:                            M_log
289:                                    .warn(this 
290:                                            + ".writeBatchEvents(): dbWrite failed: session: "
291:                                            + fields[3] + " event: "
292:                                            + event.toString());
293:                        }
294:                    }
295:
296:                    // commit
297:                    conn.commit();
298:                } catch (Throwable e) {
299:                    if (conn != null) {
300:                        try {
301:                            conn.rollback();
302:                        } catch (Exception ee) {
303:                            M_log.warn(this 
304:                                    + ".writeBatchEvents, while rolling back: "
305:                                    + ee);
306:                        }
307:                    }
308:                    M_log.warn(this  + ".writeBatchEvents: " + e);
309:                } finally {
310:                    if (conn != null) {
311:                        try {
312:                            if (conn.getAutoCommit() != wasCommit) {
313:                                conn.setAutoCommit(wasCommit);
314:                            }
315:                        } catch (Exception e) {
316:                            M_log
317:                                    .warn(this 
318:                                            + ".writeBatchEvents, while setting auto commit: "
319:                                            + e);
320:                        }
321:                        sqlService().returnConnection(conn);
322:                    }
323:                }
324:            }
325:
326:            /**
327:             * Form the proper event insert statement for the database technology.
328:             * 
329:             * @return The SQL insert statement for writing an event.
330:             */
331:            protected String insertStatement() {
332:                String statement;
333:                if ("oracle".equals(sqlService().getVendor())) {
334:                    statement = "insert into SAKAI_EVENT"
335:                            + " (EVENT_ID,EVENT_DATE,EVENT,REF,SESSION_ID,EVENT_CODE)"
336:                            + " values ("
337:                            // form the id based on the sequence
338:                            + " SAKAI_EVENT_SEQ.NEXTVAL,"
339:                            // date
340:                            + " ?,"
341:                            // event
342:                            + " ?,"
343:                            // reference
344:                            + " ?,"
345:                            // session id
346:                            + " ?,"
347:                            // code
348:                            + " ?"
349:
350:                            + " )";
351:                } else if ("mysql".equals(sqlService().getVendor())) {
352:                    // leave out the EVENT_ID as it will be automatically generated on the server
353:                    statement = "insert into SAKAI_EVENT"
354:                            + " (EVENT_DATE,EVENT,REF,SESSION_ID,EVENT_CODE)"
355:                            + " values ("
356:                            // date
357:                            + " ?,"
358:                            // event
359:                            + " ?,"
360:                            // reference
361:                            + " ?,"
362:                            // session id
363:                            + " ?,"
364:                            // code
365:                            + " ?"
366:
367:                            + " )";
368:                } else
369:                // if ("hsqldb".equals(sqlService().getVendor()))
370:                {
371:                    statement = "insert into SAKAI_EVENT"
372:                            + " (EVENT_ID,EVENT_DATE,EVENT,REF,SESSION_ID,EVENT_CODE)"
373:                            + " values ("
374:                            // form the id based on the sequence
375:                            + " NEXT VALUE FOR SAKAI_EVENT_SEQ,"
376:                            // date
377:                            + " ?,"
378:                            // event
379:                            + " ?,"
380:                            // reference
381:                            + " ?,"
382:                            // session id
383:                            + " ?,"
384:                            // code
385:                            + " ?"
386:
387:                            + " )";
388:                }
389:
390:                return statement;
391:            }
392:
393:            /**
394:             * Bind the event values into an array of fields for inserting.
395:             * 
396:             * @param event
397:             *        The event to write.
398:             * @param fields
399:             *        The object[] to hold bind variables.
400:             */
401:            protected void bindValues(Event event, Object[] fields) {
402:                // session or user?
403:                String reportId = null;
404:                if (event.getSessionId() != null) {
405:                    reportId = event.getSessionId();
406:                } else {
407:                    // form an id based on the cluster server's id and the event user id
408:                    reportId = "~" + serverConfigurationService().getServerId()
409:                            + "~" + event.getUserId();
410:                }
411:
412:                fields[0] = ((BaseEvent) event).m_time;
413:                fields[1] = event.getEvent();
414:                fields[2] = event.getResource();
415:                fields[3] = reportId;
416:                fields[4] = (event.getModify() ? "m" : "a");
417:            }
418:
419:            /**********************************************************************************************************************************************************************************************************************************************************
420:             * Runnable
421:             *********************************************************************************************************************************************************************************************************************************************************/
422:
423:            /**
424:             * Start the clean and report thread.
425:             */
426:            protected void start() {
427:                m_threadStop = false;
428:
429:                m_thread = new Thread(this , getClass().getName());
430:                m_thread.start();
431:            }
432:
433:            /**
434:             * Stop the clean and report thread.
435:             */
436:            protected void stop() {
437:                if (m_thread == null)
438:                    return;
439:
440:                // signal the thread to stop
441:                m_threadStop = true;
442:
443:                // wake up the thread
444:                m_thread.interrupt();
445:
446:                m_thread = null;
447:            }
448:
449:            /**
450:             * Run the event checking thread.
451:             */
452:            public void run() {
453:                // since we might be running while the component manager is still being created and populated, such as at server startup, wait here for a complete component manager
454:                ComponentManager.waitTillConfigured();
455:
456:                // find the latest event in the db
457:                initLastEvent();
458:
459:                // loop till told to stop
460:                while ((!m_threadStop)
461:                        && (!Thread.currentThread().isInterrupted())) {
462:                    final String serverInstance = serverConfigurationService()
463:                            .getServerIdInstance();
464:                    final String serverId = serverConfigurationService()
465:                            .getServerId();
466:
467:                    try {
468:                        // write any batched events
469:                        Collection myEvents = new Vector();
470:                        if (m_batchWrite) {
471:                            synchronized (m_eventQueue) {
472:                                if (m_eventQueue.size() > 0) {
473:                                    myEvents.addAll(m_eventQueue);
474:                                    m_eventQueue.clear();
475:                                }
476:                            }
477:
478:                            if (myEvents.size() > 0) {
479:                                if (M_log.isDebugEnabled())
480:                                    M_log.debug("writing " + myEvents.size()
481:                                            + " batched events");
482:                                writeBatchEvents(myEvents);
483:                            }
484:                        }
485:
486:                        if (M_log.isDebugEnabled())
487:                            M_log.debug("checking for events > "
488:                                    + m_lastEventSeq);
489:                        // check the db for new events
490:                        // Note: the events may not all have sessions, so to get them we need an outer join.
491:                        // TODO: switch to a "view" read once that's established, for now, a join -ggolden
492:                        String statement = null;
493:                        if ("oracle".equals(sqlService().getVendor())) {
494:                            // this now has Oracle specific hint to improve performance with large tables -ggolden
495:                            statement = "select /*+ FIRST_ROWS */ EVENT_ID,EVENT_DATE,EVENT,REF,SAKAI_EVENT.SESSION_ID,EVENT_CODE,SESSION_SERVER"
496:                                    + " from SAKAI_EVENT,SAKAI_SESSION"
497:                                    + " where (SAKAI_EVENT.SESSION_ID = SAKAI_SESSION.SESSION_ID(+)) and (EVENT_ID > ?)";
498:                        } else
499:                        // non-Oracle, without Oracle hint
500:                        {
501:                            statement = "select EVENT_ID,EVENT_DATE,EVENT,REF,SAKAI_EVENT.SESSION_ID,EVENT_CODE,SESSION_SERVER"
502:                                    + " from SAKAI_EVENT,SAKAI_SESSION"
503:                                    + " where (SAKAI_EVENT.SESSION_ID = SAKAI_SESSION.SESSION_ID) and (EVENT_ID > ?)";
504:                        }
505:
506:                        // we might want a left join, which would get us records from non-sessions, which the above mysql code does NOT give -ggolden
507:                        //				select e.EVENT_ID,e.EVENT_DATE,e.EVENT,e.REF,e.SESSION_ID,e.EVENT_CODE,s.SESSION_SERVER
508:                        //				from SAKAI_EVENT e
509:                        //				left join SAKAI_SESSION s on (e.SESSION_ID = s.SESSION_ID)
510:                        //				where EVENT_ID > 0
511:
512:                        // send in the last seq number parameter
513:                        Object[] fields = new Object[1];
514:                        fields[0] = new Long(m_lastEventSeq);
515:
516:                        List events = sqlService().dbRead(statement, fields,
517:                                new SqlReader() {
518:                                    public Object readSqlResultRecord(
519:                                            ResultSet result) {
520:                                        try {
521:                                            // read the Event
522:                                            long id = result.getLong(1);
523:                                            Time date = timeService().newTime(
524:                                                    result.getTimestamp(
525:                                                            2,
526:                                                            sqlService()
527:                                                                    .getCal())
528:                                                            .getTime());
529:                                            String function = result
530:                                                    .getString(3);
531:                                            String ref = result.getString(4);
532:                                            String session = result
533:                                                    .getString(5);
534:                                            String code = result.getString(6);
535:                                            String eventSessionServerId = result
536:                                                    .getString(7);
537:
538:                                            // for each one (really, for the last one), update the last event seen seq number
539:                                            if (id > m_lastEventSeq) {
540:                                                m_lastEventSeq = id;
541:                                            }
542:
543:                                            boolean nonSessionEvent = session
544:                                                    .startsWith("~");
545:                                            String userId = null;
546:                                            boolean skipIt = false;
547:
548:                                            if (nonSessionEvent) {
549:                                                String[] parts = StringUtil
550:                                                        .split(session, "~");
551:                                                userId = parts[2];
552:
553:                                                // we skip this event if it came from our server
554:                                                skipIt = serverId
555:                                                        .equals(parts[1]);
556:                                            }
557:
558:                                            // for session events, if the event is from this server instance,
559:                                            // we have already processed it and can skip it here.
560:                                            else {
561:                                                skipIt = serverInstance
562:                                                        .equals(eventSessionServerId);
563:                                            }
564:
565:                                            if (skipIt) {
566:                                                return null;
567:                                            }
568:
569:                                            // Note: events from outside the server don't need notification info, since notification is processed only on internal events -ggolden
570:                                            BaseEvent event = new BaseEvent(
571:                                                    id,
572:                                                    function,
573:                                                    ref,
574:                                                    code.equals("m"),
575:                                                    NotificationService.NOTI_NONE);
576:                                            if (nonSessionEvent) {
577:                                                event.setUserId(userId);
578:                                            } else {
579:                                                event.setSessionId(session);
580:                                            }
581:
582:                                            return event;
583:                                        } catch (SQLException ignore) {
584:                                            return null;
585:                                        }
586:                                    }
587:                                });
588:
589:                        // for each new event found, notify observers
590:                        for (int i = 0; i < events.size(); i++) {
591:                            Event event = (Event) events.get(i);
592:                            notifyObservers(event, false);
593:                        }
594:                    } catch (Throwable e) {
595:                        M_log.warn("run: will continue: ", e);
596:                    }
597:
598:                    // take a small nap
599:                    try {
600:                        Thread.sleep(m_period);
601:                    } catch (Exception ignore) {
602:                    }
603:                }
604:            }
605:
606:            /**
607:             * Check the db for the largest event seq number, and set this as the one after which we will next get event.
608:             */
609:            protected void initLastEvent() {
610:                String statement = "select MAX(EVENT_ID) from SAKAI_EVENT";
611:
612:                sqlService().dbRead(statement, null, new SqlReader() {
613:                    public Object readSqlResultRecord(ResultSet result) {
614:                        try {
615:                            // read the one long value into our last event seq number
616:                            m_lastEventSeq = result.getLong(1);
617:                        } catch (SQLException ignore) {
618:                        }
619:                        return null;
620:                    }
621:                });
622:
623:                if (M_log.isDebugEnabled())
624:                    M_log.debug(this  + " Starting (after) Event #: "
625:                            + m_lastEventSeq);
626:            }
627:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.