001: /**********************************************************************************
002: * $URL: https://source.sakaiproject.org/svn/event/tags/sakai_2-4-1/event-impl/impl/src/java/org/sakaiproject/event/impl/ClusterEventTracking.java $
003: * $Id: ClusterEventTracking.java 9900 2006-05-24 19:41:06Z ggolden@umich.edu $
004: ***********************************************************************************
005: *
006: * Copyright (c) 2003, 2004, 2005, 2006 The Sakai Foundation.
007: *
008: * Licensed under the Educational Community License, Version 1.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.opensource.org/licenses/ecl1.php
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: **********************************************************************************/package org.sakaiproject.event.impl;
021:
022: import java.sql.Connection;
023: import java.sql.ResultSet;
024: import java.sql.SQLException;
025: import java.util.Collection;
026: import java.util.Iterator;
027: import java.util.List;
028: import java.util.Vector;
029:
030: import org.apache.commons.logging.Log;
031: import org.apache.commons.logging.LogFactory;
032: import org.sakaiproject.component.api.ServerConfigurationService;
033: import org.sakaiproject.component.cover.ComponentManager;
034: import org.sakaiproject.db.api.SqlReader;
035: import org.sakaiproject.db.api.SqlService;
036: import org.sakaiproject.event.api.Event;
037: import org.sakaiproject.event.api.NotificationService;
038: import org.sakaiproject.time.api.Time;
039: import org.sakaiproject.time.api.TimeService;
040: import org.sakaiproject.util.StringUtil;
041:
042: /**
043: * <p>
044: * ClusterEventTracking is the implmentation for the EventTracking service for use in a clustered multi-app server configuration.<br />
045: * Events are backed in the cluster database, and this database is polled to read and process locally events posted by the other cluster members.
046: * </p>
047: */
048: public abstract class ClusterEventTracking extends
049: BaseEventTrackingService implements Runnable {
050: /** Our logger. */
051: private static Log M_log = LogFactory
052: .getLog(ClusterEventTracking.class);
053:
054: /** String used to identify this service in the logs */
055: protected static String m_logId = "EventTracking: ";
056:
057: /** The db event checker thread. */
058: protected Thread m_thread = null;
059:
060: /** The thread quit flag. */
061: protected boolean m_threadStop = false;
062:
063: /** Last event code read from the db */
064: protected long m_lastEventSeq = 0;
065:
066: /** Queue of events to write if we are batching. */
067: protected Collection m_eventQueue = null;
068:
069: /**********************************************************************************************************************************************************************************************************************************************************
070: * Dependencies
071: *********************************************************************************************************************************************************************************************************************************************************/
072:
073: /**
074: * @return the MemoryService collaborator.
075: */
076: protected abstract SqlService sqlService();
077:
078: /**
079: * @return the ServerConfigurationService collaborator.
080: */
081: protected abstract ServerConfigurationService serverConfigurationService();
082:
083: /**
084: * @return the TimeService collaborator.
085: */
086: protected abstract TimeService timeService();
087:
088: /**********************************************************************************************************************************************************************************************************************************************************
089: * Configuration
090: *********************************************************************************************************************************************************************************************************************************************************/
091:
092: /** Unless false, check the db for events from the other cluster servers. */
093: protected boolean m_checkDb = true;
094:
095: /**
096: * Configuration: set the check-db.
097: *
098: * @param value
099: * The check-db value.
100: */
101: public void setCheckDb(String value) {
102: try {
103: m_checkDb = new Boolean(value).booleanValue();
104: } catch (Exception any) {
105: }
106: }
107:
108: /** If true, batch events for bulk write. */
109: protected boolean m_batchWrite = true;
110:
111: /**
112: * Configuration: set the batch writing flag.
113: *
114: * @param value
115: * The batch writing value.
116: */
117: public void setBatchWrite(String value) {
118: try {
119: m_batchWrite = new Boolean(value).booleanValue();
120: } catch (Exception any) {
121: }
122: }
123:
124: /** Configuration: to run the ddl on init or not. */
125: protected boolean m_autoDdl = false;
126:
127: /**
128: * Configuration: to run the ddl on init or not.
129: *
130: * @param value
131: * the auto ddl value.
132: */
133: public void setAutoDdl(String value) {
134: m_autoDdl = new Boolean(value).booleanValue();
135: }
136:
137: /** How long to wait between checks for new events from the db. */
138: protected long m_period = 1000L * 5L;
139:
140: /**
141: * Set the # seconds to wait between db checks for new events.
142: *
143: * @param time
144: * The # seconds to wait between db checks for new events.
145: */
146: public void setPeriod(String time) {
147: m_period = Integer.parseInt(time) * 1000L;
148: }
149:
150: /**********************************************************************************************************************************************************************************************************************************************************
151: * Init and Destroy
152: *********************************************************************************************************************************************************************************************************************************************************/
153:
154: /**
155: * Final initialization, once all dependencies are set.
156: */
157: public void init() {
158: try {
159: // if we are auto-creating our schema, check and create
160: if (m_autoDdl) {
161: sqlService().ddl(this .getClass().getClassLoader(),
162: "sakai_event");
163: }
164:
165: super .init();
166:
167: if (m_batchWrite) {
168: m_eventQueue = new Vector();
169: }
170:
171: // startup the event checking
172: if (m_checkDb) {
173: start();
174: }
175:
176: M_log.info(this + ".init() - period: " + m_period / 1000
177: + " batch: " + m_batchWrite + " checkDb: "
178: + m_checkDb);
179: } catch (Throwable t) {
180: M_log.warn(this + ".init(): ", t);
181: }
182: }
183:
184: /**
185: * Final cleanup.
186: */
187: public void destroy() {
188: // stop our thread
189: stop();
190:
191: super .destroy();
192: }
193:
194: /**********************************************************************************************************************************************************************************************************************************************************
195: * Event post / flow
196: *********************************************************************************************************************************************************************************************************************************************************/
197:
198: /**
199: * Cause this new event to get to wherever it has to go for persistence, etc.
200: *
201: * @param event
202: * The new event to post.
203: */
204: protected void postEvent(Event event) {
205: // mark the event time
206: ((BaseEvent) event).m_time = timeService().newTime();
207:
208: // notify locally generated events immediately -
209: // they will not be process again when read back from the database
210: try {
211: notifyObservers(event, true);
212: } catch (Throwable t) {
213: M_log.warn("postEvent, notifyObservers(), event: "
214: + event.toString(), t);
215: }
216:
217: // batch the event if we are batching
218: if (m_batchWrite) {
219: synchronized (m_eventQueue) {
220: m_eventQueue.add(event);
221: }
222: }
223:
224: // if not batching, write out the individual event
225: else {
226: writeEvent(event, null);
227: }
228:
229: if (M_log.isDebugEnabled())
230: M_log.debug(m_logId + event);
231: }
232:
233: /**
234: * Write a single event to the db
235: *
236: * @param event
237: * The event to write.
238: */
239: protected void writeEvent(Event event, Connection conn) {
240: // get the SQL statement
241: String statement = insertStatement();
242:
243: // collect the fields
244: Object fields[] = new Object[5];
245: bindValues(event, fields);
246:
247: // process the insert
248: boolean ok = sqlService().dbWrite(conn, statement, fields);
249: if (!ok) {
250: M_log.warn(this
251: + ".writeEvent(): dbWrite failed: session: "
252: + fields[3] + " event: " + event.toString());
253: }
254: }
255:
256: /**
257: * Write a batch of events to the db
258: *
259: * @param events
260: * The collection of event to write.
261: */
262: protected void writeBatchEvents(Collection events) {
263: // get a connection
264: Connection conn = null;
265: boolean wasCommit = true;
266: try {
267: conn = sqlService().borrowConnection();
268: wasCommit = conn.getAutoCommit();
269: if (wasCommit) {
270: conn.setAutoCommit(false);
271: }
272:
273: // Note: investigate batch writing via the jdbc driver: make sure we can still use prepared statements (check out host arrays, too) -ggolden
274:
275: // common preparation for each insert
276: String statement = insertStatement();
277: Object fields[] = new Object[5];
278:
279: // write all events
280: for (Iterator i = events.iterator(); i.hasNext();) {
281: Event event = (Event) i.next();
282: bindValues(event, fields);
283:
284: // process the insert
285: boolean ok = sqlService().dbWrite(conn, statement,
286: fields);
287: if (!ok) {
288: M_log
289: .warn(this
290: + ".writeBatchEvents(): dbWrite failed: session: "
291: + fields[3] + " event: "
292: + event.toString());
293: }
294: }
295:
296: // commit
297: conn.commit();
298: } catch (Throwable e) {
299: if (conn != null) {
300: try {
301: conn.rollback();
302: } catch (Exception ee) {
303: M_log.warn(this
304: + ".writeBatchEvents, while rolling back: "
305: + ee);
306: }
307: }
308: M_log.warn(this + ".writeBatchEvents: " + e);
309: } finally {
310: if (conn != null) {
311: try {
312: if (conn.getAutoCommit() != wasCommit) {
313: conn.setAutoCommit(wasCommit);
314: }
315: } catch (Exception e) {
316: M_log
317: .warn(this
318: + ".writeBatchEvents, while setting auto commit: "
319: + e);
320: }
321: sqlService().returnConnection(conn);
322: }
323: }
324: }
325:
326: /**
327: * Form the proper event insert statement for the database technology.
328: *
329: * @return The SQL insert statement for writing an event.
330: */
331: protected String insertStatement() {
332: String statement;
333: if ("oracle".equals(sqlService().getVendor())) {
334: statement = "insert into SAKAI_EVENT"
335: + " (EVENT_ID,EVENT_DATE,EVENT,REF,SESSION_ID,EVENT_CODE)"
336: + " values ("
337: // form the id based on the sequence
338: + " SAKAI_EVENT_SEQ.NEXTVAL,"
339: // date
340: + " ?,"
341: // event
342: + " ?,"
343: // reference
344: + " ?,"
345: // session id
346: + " ?,"
347: // code
348: + " ?"
349:
350: + " )";
351: } else if ("mysql".equals(sqlService().getVendor())) {
352: // leave out the EVENT_ID as it will be automatically generated on the server
353: statement = "insert into SAKAI_EVENT"
354: + " (EVENT_DATE,EVENT,REF,SESSION_ID,EVENT_CODE)"
355: + " values ("
356: // date
357: + " ?,"
358: // event
359: + " ?,"
360: // reference
361: + " ?,"
362: // session id
363: + " ?,"
364: // code
365: + " ?"
366:
367: + " )";
368: } else
369: // if ("hsqldb".equals(sqlService().getVendor()))
370: {
371: statement = "insert into SAKAI_EVENT"
372: + " (EVENT_ID,EVENT_DATE,EVENT,REF,SESSION_ID,EVENT_CODE)"
373: + " values ("
374: // form the id based on the sequence
375: + " NEXT VALUE FOR SAKAI_EVENT_SEQ,"
376: // date
377: + " ?,"
378: // event
379: + " ?,"
380: // reference
381: + " ?,"
382: // session id
383: + " ?,"
384: // code
385: + " ?"
386:
387: + " )";
388: }
389:
390: return statement;
391: }
392:
393: /**
394: * Bind the event values into an array of fields for inserting.
395: *
396: * @param event
397: * The event to write.
398: * @param fields
399: * The object[] to hold bind variables.
400: */
401: protected void bindValues(Event event, Object[] fields) {
402: // session or user?
403: String reportId = null;
404: if (event.getSessionId() != null) {
405: reportId = event.getSessionId();
406: } else {
407: // form an id based on the cluster server's id and the event user id
408: reportId = "~" + serverConfigurationService().getServerId()
409: + "~" + event.getUserId();
410: }
411:
412: fields[0] = ((BaseEvent) event).m_time;
413: fields[1] = event.getEvent();
414: fields[2] = event.getResource();
415: fields[3] = reportId;
416: fields[4] = (event.getModify() ? "m" : "a");
417: }
418:
419: /**********************************************************************************************************************************************************************************************************************************************************
420: * Runnable
421: *********************************************************************************************************************************************************************************************************************************************************/
422:
423: /**
424: * Start the clean and report thread.
425: */
426: protected void start() {
427: m_threadStop = false;
428:
429: m_thread = new Thread(this , getClass().getName());
430: m_thread.start();
431: }
432:
433: /**
434: * Stop the clean and report thread.
435: */
436: protected void stop() {
437: if (m_thread == null)
438: return;
439:
440: // signal the thread to stop
441: m_threadStop = true;
442:
443: // wake up the thread
444: m_thread.interrupt();
445:
446: m_thread = null;
447: }
448:
449: /**
450: * Run the event checking thread.
451: */
452: public void run() {
453: // since we might be running while the component manager is still being created and populated, such as at server startup, wait here for a complete component manager
454: ComponentManager.waitTillConfigured();
455:
456: // find the latest event in the db
457: initLastEvent();
458:
459: // loop till told to stop
460: while ((!m_threadStop)
461: && (!Thread.currentThread().isInterrupted())) {
462: final String serverInstance = serverConfigurationService()
463: .getServerIdInstance();
464: final String serverId = serverConfigurationService()
465: .getServerId();
466:
467: try {
468: // write any batched events
469: Collection myEvents = new Vector();
470: if (m_batchWrite) {
471: synchronized (m_eventQueue) {
472: if (m_eventQueue.size() > 0) {
473: myEvents.addAll(m_eventQueue);
474: m_eventQueue.clear();
475: }
476: }
477:
478: if (myEvents.size() > 0) {
479: if (M_log.isDebugEnabled())
480: M_log.debug("writing " + myEvents.size()
481: + " batched events");
482: writeBatchEvents(myEvents);
483: }
484: }
485:
486: if (M_log.isDebugEnabled())
487: M_log.debug("checking for events > "
488: + m_lastEventSeq);
489: // check the db for new events
490: // Note: the events may not all have sessions, so to get them we need an outer join.
491: // TODO: switch to a "view" read once that's established, for now, a join -ggolden
492: String statement = null;
493: if ("oracle".equals(sqlService().getVendor())) {
494: // this now has Oracle specific hint to improve performance with large tables -ggolden
495: statement = "select /*+ FIRST_ROWS */ EVENT_ID,EVENT_DATE,EVENT,REF,SAKAI_EVENT.SESSION_ID,EVENT_CODE,SESSION_SERVER"
496: + " from SAKAI_EVENT,SAKAI_SESSION"
497: + " where (SAKAI_EVENT.SESSION_ID = SAKAI_SESSION.SESSION_ID(+)) and (EVENT_ID > ?)";
498: } else
499: // non-Oracle, without Oracle hint
500: {
501: statement = "select EVENT_ID,EVENT_DATE,EVENT,REF,SAKAI_EVENT.SESSION_ID,EVENT_CODE,SESSION_SERVER"
502: + " from SAKAI_EVENT,SAKAI_SESSION"
503: + " where (SAKAI_EVENT.SESSION_ID = SAKAI_SESSION.SESSION_ID) and (EVENT_ID > ?)";
504: }
505:
506: // we might want a left join, which would get us records from non-sessions, which the above mysql code does NOT give -ggolden
507: // select e.EVENT_ID,e.EVENT_DATE,e.EVENT,e.REF,e.SESSION_ID,e.EVENT_CODE,s.SESSION_SERVER
508: // from SAKAI_EVENT e
509: // left join SAKAI_SESSION s on (e.SESSION_ID = s.SESSION_ID)
510: // where EVENT_ID > 0
511:
512: // send in the last seq number parameter
513: Object[] fields = new Object[1];
514: fields[0] = new Long(m_lastEventSeq);
515:
516: List events = sqlService().dbRead(statement, fields,
517: new SqlReader() {
518: public Object readSqlResultRecord(
519: ResultSet result) {
520: try {
521: // read the Event
522: long id = result.getLong(1);
523: Time date = timeService().newTime(
524: result.getTimestamp(
525: 2,
526: sqlService()
527: .getCal())
528: .getTime());
529: String function = result
530: .getString(3);
531: String ref = result.getString(4);
532: String session = result
533: .getString(5);
534: String code = result.getString(6);
535: String eventSessionServerId = result
536: .getString(7);
537:
538: // for each one (really, for the last one), update the last event seen seq number
539: if (id > m_lastEventSeq) {
540: m_lastEventSeq = id;
541: }
542:
543: boolean nonSessionEvent = session
544: .startsWith("~");
545: String userId = null;
546: boolean skipIt = false;
547:
548: if (nonSessionEvent) {
549: String[] parts = StringUtil
550: .split(session, "~");
551: userId = parts[2];
552:
553: // we skip this event if it came from our server
554: skipIt = serverId
555: .equals(parts[1]);
556: }
557:
558: // for session events, if the event is from this server instance,
559: // we have already processed it and can skip it here.
560: else {
561: skipIt = serverInstance
562: .equals(eventSessionServerId);
563: }
564:
565: if (skipIt) {
566: return null;
567: }
568:
569: // Note: events from outside the server don't need notification info, since notification is processed only on internal events -ggolden
570: BaseEvent event = new BaseEvent(
571: id,
572: function,
573: ref,
574: code.equals("m"),
575: NotificationService.NOTI_NONE);
576: if (nonSessionEvent) {
577: event.setUserId(userId);
578: } else {
579: event.setSessionId(session);
580: }
581:
582: return event;
583: } catch (SQLException ignore) {
584: return null;
585: }
586: }
587: });
588:
589: // for each new event found, notify observers
590: for (int i = 0; i < events.size(); i++) {
591: Event event = (Event) events.get(i);
592: notifyObservers(event, false);
593: }
594: } catch (Throwable e) {
595: M_log.warn("run: will continue: ", e);
596: }
597:
598: // take a small nap
599: try {
600: Thread.sleep(m_period);
601: } catch (Exception ignore) {
602: }
603: }
604: }
605:
606: /**
607: * Check the db for the largest event seq number, and set this as the one after which we will next get event.
608: */
609: protected void initLastEvent() {
610: String statement = "select MAX(EVENT_ID) from SAKAI_EVENT";
611:
612: sqlService().dbRead(statement, null, new SqlReader() {
613: public Object readSqlResultRecord(ResultSet result) {
614: try {
615: // read the one long value into our last event seq number
616: m_lastEventSeq = result.getLong(1);
617: } catch (SQLException ignore) {
618: }
619: return null;
620: }
621: });
622:
623: if (M_log.isDebugEnabled())
624: M_log.debug(this + " Starting (after) Event #: "
625: + m_lastEventSeq);
626: }
627: }
|