001: /**********************************************************************************
002: * $URL: https://source.sakaiproject.org/svn/cluster/tags/sakai_2-4-1/cluster-impl/impl/src/java/org/sakaiproject/cluster/impl/SakaiClusterService.java $
003: * $Id: SakaiClusterService.java 10385 2006-06-08 14:55:07Z ggolden@umich.edu $
004: ***********************************************************************************
005: *
006: * Copyright (c) 2004, 2005, 2006 The Sakai Foundation.
007: *
008: * Licensed under the Educational Community License, Version 1.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.opensource.org/licenses/ecl1.php
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: **********************************************************************************/package org.sakaiproject.cluster.impl;
021:
022: import java.util.Iterator;
023: import java.util.List;
024:
025: import org.apache.commons.logging.Log;
026: import org.apache.commons.logging.LogFactory;
027: import org.sakaiproject.cluster.api.ClusterService;
028: import org.sakaiproject.component.api.ServerConfigurationService;
029: import org.sakaiproject.component.cover.ComponentManager;
030: import org.sakaiproject.db.api.SqlService;
031: import org.sakaiproject.event.api.Event;
032: import org.sakaiproject.event.api.EventTrackingService;
033: import org.sakaiproject.event.api.UsageSession;
034: import org.sakaiproject.event.api.UsageSessionService;
035: import org.sakaiproject.presence.api.PresenceService;
036: import org.sakaiproject.thread_local.api.ThreadLocalManager;
037:
038: /**
039: * <p>
040: * SakaiClusterService is a Sakai cluster service implementation.
041: * </p>
042: */
043: public class SakaiClusterService implements ClusterService {
044: /** Our log (commons). */
045: private static Log M_log = LogFactory
046: .getLog(SakaiClusterService.class);
047:
048: /** The maintenance. */
049: protected Maintenance m_maintenance = null;
050:
051: /**********************************************************************************************************************************************************************************************************************************************************
052: * Dependencies and their setter methods
053: *********************************************************************************************************************************************************************************************************************************************************/
054:
055: /** Dependency: ServerConfigurationService. */
056: protected ServerConfigurationService m_serverConfigurationService = null;
057:
058: /**
059: * Dependency: ServerConfigurationService.
060: *
061: * @param service
062: * The ServerConfigurationService.
063: */
064: public void setServerConfigurationService(
065: ServerConfigurationService service) {
066: m_serverConfigurationService = service;
067: }
068:
069: /** Dependency: EventTrackingService. */
070: protected EventTrackingService m_eventTrackingService = null;
071:
072: /**
073: * Dependency: EventTrackingService.
074: *
075: * @param service
076: * The EventTrackingService.
077: */
078: public void setEventTrackingService(EventTrackingService service) {
079: m_eventTrackingService = service;
080: }
081:
082: /** Dependency: SqlService. */
083: protected SqlService m_sqlService = null;
084:
085: /**
086: * Dependency: SqlService.
087: *
088: * @param service
089: * The SqlService.
090: */
091: public void setSqlService(SqlService service) {
092: m_sqlService = service;
093: }
094:
095: /** Dependency: UsageSessionService. */
096: protected UsageSessionService m_usageSessionService = null;
097:
098: /**
099: * Dependency: UsageSessionService.
100: *
101: * @param service
102: * The UsageSessionService.
103: */
104: public void setUsageSessionService(UsageSessionService service) {
105: m_usageSessionService = service;
106: }
107:
108: /** Dependency: PresenceService. */
109: protected PresenceService m_presenceService = null;
110:
111: /**
112: * Dependency: PresenceService.
113: *
114: * @param service
115: * The PresenceService.
116: */
117: public void setPresenceService(PresenceService service) {
118: m_presenceService = service;
119: }
120:
121: /** Configuration: how often to register that we are alive with the cluster table (seconds). */
122: protected long m_refresh = 60;
123:
124: /**
125: * Configuration: set the refresh value
126: *
127: * @param value
128: * The refresh value.
129: */
130: public void setRefresh(String value) {
131: try {
132: m_refresh = Long.parseLong(value);
133: } catch (Exception ignore) {
134: }
135: }
136:
137: /** Configuration: how long we give an app server to respond before it is considered lost (seconds). */
138: protected long m_expired = 600;
139:
140: /**
141: * Configuration: set the expired value
142: *
143: * @param value
144: * The expired value.
145: */
146: public void setExpired(String value) {
147: try {
148: m_expired = Long.parseLong(value);
149: } catch (Exception ignore) {
150: }
151: }
152:
153: /** Configuration: to run the ddl on init or not. */
154: protected boolean m_autoDdl = false;
155:
156: /**
157: * Configuration: to run the ddl on init or not.
158: *
159: * @param value
160: * the auto ddl value.
161: */
162: public void setAutoDdl(String value) {
163: m_autoDdl = new Boolean(value).booleanValue();
164: }
165:
166: /** Dependency: the current manager. */
167: protected ThreadLocalManager m_threadLocalManager = null;
168:
169: /**
170: * Dependency - set the current manager.
171: *
172: * @param value
173: * The current manager.
174: */
175: public void setThreadLocalManager(ThreadLocalManager manager) {
176: m_threadLocalManager = manager;
177: }
178:
179: /** Configuration: percent of maintenance passes to run the full de-ghosting / cleanup activities. */
180: protected int m_ghostingPercent = 100;
181:
182: /**
183: * Configuration: set the percent of maintenance passes to run the full de-ghosting / cleanup activities
184: *
185: * @param value
186: * The percent of maintenance passes to run the full de-ghosting / cleanup activities.
187: */
188: public void setGhostingPercent(String value) {
189: try {
190: m_ghostingPercent = Integer.parseInt(value);
191: } catch (Exception ignore) {
192: }
193: }
194:
195: /**********************************************************************************************************************************************************************************************************************************************************
196: * Init and Destroy
197: *********************************************************************************************************************************************************************************************************************************************************/
198:
199: /**
200: * Final initialization, once all dependencies are set.
201: */
202: public void init() {
203: try {
204: // if we are auto-creating our schema, check and create
205: if (m_autoDdl) {
206: m_sqlService.ddl(this .getClass().getClassLoader(),
207: "sakai_cluster");
208: }
209:
210: // start the maintenance thread
211: m_maintenance = new Maintenance();
212: m_maintenance.start();
213:
214: M_log.info("init: refresh: " + m_refresh + " expired: "
215: + m_expired + " ghostingPercent: "
216: + m_ghostingPercent);
217: } catch (Throwable t) {
218: M_log.warn("init(): ", t);
219: }
220: }
221:
222: /**
223: * Returns to uninitialized state.
224: */
225: public void destroy() {
226: m_maintenance.stop();
227: m_maintenance = null;
228:
229: M_log.info("destroy()");
230: }
231:
232: /**********************************************************************************************************************************************************************************************************************************************************
233: * ClusterService implementation
234: *********************************************************************************************************************************************************************************************************************************************************/
235:
236: public List getServers() {
237: // get all expired open app servers not me
238: String statement = "select SERVER_ID from SAKAI_CLUSTER order by SERVER_ID asc";
239:
240: List servers = m_sqlService.dbRead(statement);
241:
242: return servers;
243: }
244:
245: /**********************************************************************************************************************************************************************************************************************************************************
246: * Maintenance
247: *********************************************************************************************************************************************************************************************************************************************************/
248:
249: protected class Maintenance implements Runnable {
250: /** My thread running my timeout checker. */
251: protected Thread m_maintenanceChecker = null;
252:
253: /** Signal to the timeout checker to stop. */
254: protected boolean m_maintenanceCheckerStop = false;
255:
256: /**
257: * Construct.
258: */
259: public Maintenance() {
260: }
261:
262: /**
263: * Start the maintenance thread, registering this app server in the cluster table.
264: */
265: public void start() {
266: if (m_maintenanceChecker != null)
267: return;
268:
269: // register in the cluster table
270: String statement = "insert into SAKAI_CLUSTER (SERVER_ID,UPDATE_TIME) values (?, "
271: + sqlTimestamp() + ")";
272: Object fields[] = new Object[1];
273: fields[0] = m_serverConfigurationService
274: .getServerIdInstance();
275: boolean ok = m_sqlService.dbWrite(statement, fields);
276: if (!ok) {
277: M_log.warn("start(): dbWrite failed");
278: }
279:
280: m_maintenanceChecker = new Thread(this ,
281: "SakaiClusterService.Maintenance");
282: m_maintenanceCheckerStop = false;
283: m_maintenanceChecker.start();
284: }
285:
286: /**
287: * Stop the maintenance thread, removing this app server's registration from the cluster table.
288: */
289: public void stop() {
290: if (m_maintenanceChecker != null) {
291: m_maintenanceCheckerStop = true;
292: m_maintenanceChecker.interrupt();
293: try {
294: // wait for it to die
295: m_maintenanceChecker.join();
296: } catch (InterruptedException ignore) {
297: }
298: m_maintenanceChecker = null;
299: }
300:
301: // close our entry from the database - delete the record
302: String statement = "delete from SAKAI_CLUSTER where SERVER_ID = ?";
303: Object fields[] = new Object[1];
304: fields[0] = m_serverConfigurationService
305: .getServerIdInstance();
306: boolean ok = m_sqlService.dbWrite(statement, fields);
307: if (!ok) {
308: M_log.warn("stop(): dbWrite failed: " + statement);
309: }
310: }
311:
312: /**
313: * Run the maintenance thread. Every REFRESH seconds, re-register this app server as alive in the cluster. Then check for any cluster entries that are more than EXPIRED seconds old, indicating a failed app server, and remove that record, that
314: * server's sessions, and presence, generating appropriate session and presence events so the other app servers know what's going on. The "then" checks need not be done each iteration - run them on 1 of n randomly choosen iterations. In a
315: * clustered environment, this also distributes the work over the cluster better.
316: */
317: public void run() {
318: // wait till things are rolling
319: ComponentManager.waitTillConfigured();
320:
321: if (M_log.isDebugEnabled())
322: M_log.debug("run()");
323:
324: while (!m_maintenanceCheckerStop) {
325: try {
326: final String serverIdInstance = m_serverConfigurationService
327: .getServerIdInstance();
328:
329: if (M_log.isDebugEnabled())
330: M_log.debug("checking...");
331:
332: // if we have been closed, reopen!
333: String statement = "select SERVER_ID from SAKAI_CLUSTER where SERVER_ID = ?";
334: Object[] fields = new Object[1];
335: fields[0] = serverIdInstance;
336: List results = m_sqlService.dbRead(statement,
337: fields, null);
338: if (results.isEmpty()) {
339: M_log
340: .warn("run(): server has been closed in cluster table, reopened: "
341: + serverIdInstance);
342:
343: statement = "insert into SAKAI_CLUSTER (SERVER_ID,UPDATE_TIME) values (?, "
344: + sqlTimestamp() + ")";
345: fields[0] = serverIdInstance;
346: boolean ok = m_sqlService.dbWrite(statement,
347: fields);
348: if (!ok) {
349: M_log.warn("start(): dbWrite failed");
350: }
351: }
352:
353: // update our alive and well status
354: else {
355: // register that this app server is alive and well
356: statement = "update SAKAI_CLUSTER set UPDATE_TIME = "
357: + sqlTimestamp()
358: + " where SERVER_ID = ?";
359: fields[0] = serverIdInstance;
360: boolean ok = m_sqlService.dbWrite(statement,
361: fields);
362: if (!ok) {
363: M_log.warn("run(): dbWrite failed: "
364: + statement);
365: }
366: }
367:
368: // pick a random number, 0..99, to see if we want to do the full ghosting / cleanup activities now
369: int rand = (int) (Math.random() * 100.0);
370: if (rand < m_ghostingPercent) {
371: // get all expired open app servers not me
372: if ("oracle".equals(m_sqlService.getVendor())) {
373: statement = "select SERVER_ID from SAKAI_CLUSTER where SERVER_ID != ? and UPDATE_TIME < (CURRENT_TIMESTAMP - "
374: + ((float) m_expired / (float) (60 * 60 * 24))
375: + " )";
376: } else if ("mysql".equals(m_sqlService
377: .getVendor())) {
378: statement = "select SERVER_ID from SAKAI_CLUSTER where SERVER_ID != ? and UPDATE_TIME < CURRENT_TIMESTAMP() - INTERVAL "
379: + m_expired + " SECOND";
380: } else
381: // if ("hsqldb".equals(m_sqlService.getVendor()))
382: {
383: statement = "select SERVER_ID from SAKAI_CLUSTER where SERVER_ID != ? and DATEDIFF('ss', UPDATE_TIME, CURRENT_TIMESTAMP) >= "
384: + m_expired;
385: }
386: // setup the fields to skip reading me!
387: fields[0] = serverIdInstance;
388:
389: List instances = m_sqlService.dbRead(statement,
390: fields, null);
391:
392: // close any severs found to be expired
393: for (Iterator iInstances = instances.iterator(); iInstances
394: .hasNext();) {
395: String serverId = (String) iInstances
396: .next();
397:
398: // close the server - delete the record
399: statement = "delete from SAKAI_CLUSTER where SERVER_ID = ?";
400: fields[0] = serverId;
401: boolean ok = m_sqlService.dbWrite(
402: statement, fields);
403: if (!ok) {
404: M_log.warn("run(): dbWrite failed: "
405: + statement);
406: }
407:
408: M_log.warn("run(): ghost-busting server: "
409: + serverId + " from : "
410: + serverIdInstance);
411: }
412:
413: // find all the session ids of sessions that are open but are from closed servers
414: statement = "select SS.SESSION_ID "
415: + "from SAKAI_SESSION SS "
416: + "left join SAKAI_CLUSTER SC on SS.SESSION_SERVER = SC.SERVER_ID "
417: + "where SS.SESSION_START = SS.SESSION_END "
418: + "and SC.SERVER_ID is null";
419:
420: List sessions = m_sqlService.dbRead(statement);
421:
422: // process each session to close it and lose it's presence
423: for (Iterator iSessions = sessions.iterator(); iSessions
424: .hasNext();) {
425: String sessionId = (String) iSessions
426: .next();
427:
428: // get all the presence for this session
429: statement = "select LOCATION_ID from SAKAI_PRESENCE where SESSION_ID = ?";
430: fields[0] = sessionId;
431: List presence = m_sqlService.dbRead(
432: statement, fields, null);
433:
434: // remove all the presence for this session
435: statement = "delete from SAKAI_PRESENCE where SESSION_ID = ?";
436: boolean ok = m_sqlService.dbWrite(
437: statement, fields);
438: if (!ok) {
439: M_log.warn("run(): dbWrite failed: "
440: + statement);
441: }
442:
443: // get the session
444: UsageSession session = m_usageSessionService
445: .getSession(sessionId);
446:
447: // send presence end events for these
448: for (Iterator iPresence = presence
449: .iterator(); iPresence.hasNext();) {
450: String locationId = (String) iPresence
451: .next();
452:
453: Event event = m_eventTrackingService
454: .newEvent(
455: PresenceService.EVENT_ABSENCE,
456: m_presenceService
457: .presenceReference(locationId),
458: true);
459: m_eventTrackingService.post(event,
460: session);
461: }
462:
463: // a session closed event (logout)
464: Event event = m_eventTrackingService
465: .newEvent(
466: UsageSessionService.EVENT_LOGOUT,
467: null, true);
468: m_eventTrackingService.post(event, session);
469:
470: // close this session on the db
471: statement = "update SAKAI_SESSION set SESSION_END = "
472: + sqlTimestamp()
473: + " where SESSION_ID = ?";
474: fields[0] = sessionId;
475: ok = m_sqlService
476: .dbWrite(statement, fields);
477: if (!ok) {
478: M_log.warn("run(): dbWrite failed: "
479: + statement);
480: }
481:
482: // remove any locks from the session
483: statement = "delete from SAKAI_LOCKS where USAGE_SESSION_ID = ?";
484: fields[0] = sessionId;
485: ok = m_sqlService
486: .dbWrite(statement, fields);
487: if (!ok) {
488: M_log.warn("run(): dbWrite failed: "
489: + statement);
490: }
491: }
492: }
493: } catch (Throwable e) {
494: M_log.warn("exception: ", e);
495: } finally {
496: // clear out any current access bindings
497: m_threadLocalManager.clear();
498: }
499:
500: // cycle every REFRESH seconds
501: if (!m_maintenanceCheckerStop) {
502: try {
503: Thread.sleep(m_refresh * 1000L);
504: } catch (Exception ignore) {
505: }
506: }
507: }
508:
509: if (M_log.isDebugEnabled())
510: M_log.debug("done");
511: }
512: }
513:
514: /** Return the vendor-specific SQL for the current timestamp */
515: private String sqlTimestamp() {
516: if ("mysql".equals(m_sqlService.getVendor())) {
517: return "CURRENT_TIMESTAMP()";
518: } else
519: // oracle, hsqldb
520: {
521: return "CURRENT_TIMESTAMP";
522: }
523: }
524: }
|