001: /*
002: * JBoss, a division of Red Hat
003: * Copyright 2006, Red Hat Middleware, LLC, and individual contributors as indicated
004: * by the @authors tag. See the copyright.txt in the distribution for a
005: * full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022:
023: package org.jboss.portal.core.identity;
024:
025: import edu.emory.mathcs.backport.java.util.Queue;
026: import edu.emory.mathcs.backport.java.util.concurrent.Callable;
027: import edu.emory.mathcs.backport.java.util.concurrent.Executor;
028: import edu.emory.mathcs.backport.java.util.concurrent.Executors;
029: import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
030: import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
031: import edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService;
032: import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
033: import org.jboss.logging.Logger;
034: import org.jboss.portal.jems.as.system.AbstractJBossService;
035:
036: import javax.management.Notification;
037: import javax.management.NotificationListener;
038: import javax.management.ObjectName;
039: import java.util.Collections;
040: import java.util.HashSet;
041: import java.util.Iterator;
042: import java.util.Set;
043:
044: /**
045: * @author <a href="mailto:boleslaw dot dawidowicz at redhat anotherdot com">Boleslaw Dawidowicz</a>
046: * @author <a href="mailto:jedim@vige.it">Luca Stancapiano</a>
047: * @version $Revision: 8786 $
048: */
049: public class UsersActivityStatsServiceImpl extends AbstractJBossService
050: implements UsersActivityStatsService, NotificationListener {
051: /** Our logger. */
052: private static final Logger log = Logger
053: .getLogger(UsersActivityStatsServiceImpl.class);
054:
055: //TODO: some value just to begin - find some good default
056: private int userTrackerThreadsNumber = 10;
057:
058: private int updaterThreadsNumber = 1;
059:
060: private int updaterInterval = 1000;
061:
062: private int activityQueueLimit = 1000;
063:
064: private long activityTimeout = 1800000;
065:
066: private Executor userTrackerExecutor;
067:
068: private ScheduledExecutorService updaterExecutor;
069:
070: private Queue activityQueue;
071:
072: private volatile Set activityResults = new HashSet();
073:
074: private String activityBroadcasterName;
075:
076: public UsersActivityStatsServiceImpl() {
077: }
078:
079: protected void startService() throws Exception {
080: super .startService();
081:
082: activityQueue = new LinkedBlockingQueue(getActivityQueueLimit());
083:
084: userTrackerExecutor = Executors
085: .newFixedThreadPool(getUserTrackerThreadsNumber());
086:
087: updaterExecutor = Executors
088: .newScheduledThreadPool(getUpdaterThreadsNumber());
089:
090: updaterExecutor.scheduleWithFixedDelay(new Updater(
091: activityQueue), getUpdaterInterval(),
092: getUpdaterInterval(), TimeUnit.MILLISECONDS);
093:
094: if (activityBroadcasterName != null) {
095: server.addNotificationListener(new ObjectName(
096: activityBroadcasterName), this , null, null);
097: } else {
098: addNotificationListener(this , null, null);
099: }
100:
101: }
102:
103: protected void stopService() throws Exception {
104: super .stopService();
105:
106: // /TODO: stop all the threads
107: }
108:
109: public Set getActiveUsersIds(long period) {
110: long currentTime = System.currentTimeMillis();
111:
112: Set results = new HashSet();
113: for (Iterator iterator = activityResults.iterator(); iterator
114: .hasNext();) {
115: UserActivity ua = (UserActivity) iterator.next();
116: if (currentTime - ua.getTimestamp() < period) {
117: results.add(ua.getId());
118: }
119: }
120: return results;
121: }
122:
123: public Set getActiveUsersNames(long period) {
124: long currentTime = System.currentTimeMillis();
125: Set results = new HashSet();
126: for (Iterator iterator = activityResults.iterator(); iterator
127: .hasNext();) {
128: UserActivity ua = (UserActivity) iterator.next();
129: if (currentTime - ua.getTimestamp() < period) {
130: results.add(ua.getId());
131: }
132: }
133: return results;
134: }
135:
136: public Set getUsersActivities(long period) {
137: long currentTime = System.currentTimeMillis();
138: Set results = new HashSet();
139: for (Iterator iterator = activityResults.iterator(); iterator
140: .hasNext();) {
141: UserActivity ua = (UserActivity) iterator.next();
142: if (currentTime - ua.getTimestamp() < period) {
143: results.add(ua);
144: }
145: }
146: return results;
147: }
148:
149: public void registerActivity(final UserActivity userActivity) {
150: try {
151: Notification notification = new Notification(Integer
152: .toString(userActivity.getType()), this
153: .getServiceName(), userActivity.getTimestamp(),
154: userActivity.getTimestamp(), userActivity.getId());
155:
156: if (activityBroadcasterName != null) {
157: log.debug("Broadcasting user activity notification ");
158:
159: server.invoke(new ObjectName(activityBroadcasterName),
160: "sendNotification",
161: new Object[] { notification },
162: new String[] { Notification.class.getName() });
163: } else {
164: log.debug("Sending local user activity notification ");
165: sendNotification(notification);
166: }
167:
168: } catch (Exception e) {
169: log.error("Failed to send user activity notification: ", e);
170: }
171:
172: }
173:
174: public void handleNotification(Notification notification,
175: Object object) {
176: log.debug("Handling user activity notification ");
177: final UserActivity ac = new UserActivity(notification);
178:
179: FutureTask task = new FutureTask(new Callable() {
180: public Object call() throws Exception {
181:
182: boolean success = activityQueue.offer(ac);
183: if (log.isTraceEnabled()) {
184: if (!success) {
185: log
186: .trace("Failed track user activity - activityQueue is full ");
187: }
188: }
189: return null;
190: }
191: });
192:
193: userTrackerExecutor.execute(task);
194: }
195:
196: public int getUserTrackerThreadsNumber() {
197: return userTrackerThreadsNumber;
198: }
199:
200: public void setUserTrackerThreadsNumber(int userTrackerThreadsNumber) {
201: this .userTrackerThreadsNumber = userTrackerThreadsNumber;
202: }
203:
204: public int getUpdaterThreadsNumber() {
205: return updaterThreadsNumber;
206: }
207:
208: public void setUpdaterThreadsNumber(int updaterThreadsNumber) {
209: this .updaterThreadsNumber = updaterThreadsNumber;
210: }
211:
212: public int getUpdaterInterval() {
213: return updaterInterval;
214: }
215:
216: public void setUpdaterInterval(int updaterInterval) {
217: this .updaterInterval = updaterInterval;
218: }
219:
220: public int getActivityQueueLimit() {
221: return activityQueueLimit;
222: }
223:
224: public void setActivityQueueLimit(int activityQueueLimit) {
225: this .activityQueueLimit = activityQueueLimit;
226: }
227:
228: public long getActivityTimeout() {
229: return activityTimeout;
230: }
231:
232: public void setActivityTimeout(long activityTimeout) {
233: this .activityTimeout = activityTimeout;
234: }
235:
236: public String getActivityBroadcasterName() {
237: return activityBroadcasterName;
238: }
239:
240: public void setActivityBroadcasterName(
241: String activityBroadcasterName) {
242: this .activityBroadcasterName = activityBroadcasterName;
243: }
244:
245: private class Updater implements Runnable {
246: private final Queue activityQueue;
247:
248: public Updater(Queue activityQueue) {
249: this .activityQueue = activityQueue;
250: }
251:
252: //never run
253: private Updater() {
254: this .activityQueue = null;
255: }
256:
257: public void run() {
258: long currentTime = System.currentTimeMillis();
259:
260: Set stillActive = getUsersActivities(activityTimeout);
261:
262: while (!activityQueue.isEmpty()) {
263: UserActivity activity = (UserActivity) activityQueue
264: .poll();
265: if (activity != null
266: && ((currentTime - activity.getTimestamp()) < activityTimeout)) {
267: if (activity.getType() != UserActivity.SIGN_OUT) {
268: stillActive.add(activity);
269: } else {
270: stillActive.remove(activity);
271: }
272: }
273: }
274:
275: activityResults = Collections.unmodifiableSet(stillActive);
276:
277: }
278: }
279:
280: }
|