001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ejb.plugins;
023:
024: import java.lang.reflect.Method;
025: import java.security.Principal;
026: import java.util.ArrayList;
027: import java.util.List;
028:
029: import javax.jms.DeliveryMode;
030: import javax.jms.JMSException;
031: import javax.jms.Message;
032: import javax.jms.Session;
033: import javax.jms.Topic;
034: import javax.jms.TopicConnection;
035: import javax.jms.TopicConnectionFactory;
036: import javax.jms.TopicPublisher;
037: import javax.jms.TopicSession;
038: import javax.naming.Context;
039: import javax.naming.InitialContext;
040: import javax.naming.NamingException;
041: import javax.transaction.Transaction;
042:
043: import org.jboss.ejb.Container;
044: import org.jboss.invocation.Invocation;
045: import org.jboss.monitor.MetricsConstants;
046:
047: /**
048: * MetricsInterceptor collects data from the bean invocation call and publishes
049: * them on a JMS topic (bound to <tt>topic/metrics</tt> in the name service).
050: *
051: * @author <a href="mailto:jplindfo@helsinki.fi">Juha Lindfors</a>
052: * @author <a href="mailto:dimitris@jboss.org">Dimitris Anreadis</a>
053: * @version $Revision: 57209 $
054: *
055: * @since v2.0
056: */
057: public class MetricsInterceptor extends AbstractInterceptor implements
058: MetricsConstants {
059: // Constants -----------------------------------------------------
060:
061: // Attributes ----------------------------------------------------
062:
063: /** Application name this bean belongs to */
064: private String applicationName = "<undefined>";
065:
066: /** Bean name in the container */
067: private String beanName = "<undefined>";
068:
069: /** Publisher thread */
070: private Thread publisher = null;
071:
072: /**
073: * Message queue for the outgoing JMS messages. This list is accessed
074: * by the interceptor when adding new messages, and by the publisher
075: * thread when copying and clearing the contents of the queue. The list
076: * must be locked for access and locks should be kept down to minimum
077: * as they degrade the interceptor stack performance.
078: */
079: private List msgQueue = new ArrayList(2000);
080:
081: // Public --------------------------------------------------------
082: /**
083: * Stores the container reference and the application and bean JNDI
084: * names.
085: *
086: * @param container set by the container initialization code
087: */
088: public void setContainer(Container container) {
089: super .setContainer(container);
090: if (container != null) {
091: applicationName = container.getEjbModule().getName();
092: beanName = container.getBeanMetaData().getJndiName();
093: }
094: }
095:
096: // Interceptor implementation ------------------------------------
097:
098: public Object invokeHome(Invocation mi) throws Exception {
099:
100: long begin = System.currentTimeMillis();
101:
102: try {
103: return super .invokeHome(mi);
104: } finally {
105: if (mi.getMethod() != null && publisher.isAlive()) {
106: addEntry(mi, begin, System.currentTimeMillis());
107: }
108: }
109: }
110:
111: public Object invoke(Invocation mi) throws Exception {
112:
113: long begin = System.currentTimeMillis();
114:
115: try {
116: return super .invoke(mi);
117: } finally {
118: if (mi.getMethod() != null && publisher.isAlive()) {
119: addEntry(mi, begin, System.currentTimeMillis());
120: }
121: }
122: }
123:
124: /**
125: * Starts the JMS publisher thread.
126: */
127: public void create() {
128: log
129: .warn("\n"
130: + "----------------------------------------------------------------------\n"
131: + "Deprecated MetricsInterceptor activated for bean: '"
132: + beanName
133: + "'\n"
134: + "Invocation metrics will be published in JMS Topic: 'topic/metrics'\n"
135: + "----------------------------------------------------------------------");
136:
137: // looks like create() is called after setContainer().
138: // wonder if container method callback order is documented somewhere, it should be..
139: publisher = new Thread(new Publisher());
140: publisher.setName("Metrics Publisher Thread for " + beanName);
141: publisher.setDaemon(true);
142: publisher.start();
143: }
144:
145: /**
146: * Kills the publisher thread.
147: */
148: public void destroy() {
149: publisher.interrupt();
150: }
151:
152: // Private --------------------------------------------------------
153:
154: /**
155: * Store the required information from this invocation: principal,
156: * transaction, method, time.
157: *
158: * @param begin invocation begin time in ms
159: * @param end invocation end time in ms
160: */
161: private final void addEntry(Invocation mi, long begin, long end) {
162:
163: /* this gets called by the interceptor */
164:
165: Transaction tx = mi.getTransaction();
166: Principal princ = mi.getPrincipal();
167: Method method = mi.getMethod();
168: Entry start = new Entry(princ, method, tx, begin, "START");
169: Entry stop = new Entry(princ, method, tx, end, "STOP");
170:
171: // add both entries, order is guaranteed, synchronized to prevent
172: // publisher from touching the queue while working on it
173: synchronized (msgQueue) {
174:
175: // Two entries for now, one should suffice but requires changes in
176: // the client.
177: msgQueue.add(start);
178: msgQueue.add(stop);
179: }
180: }
181:
182: private Message createMessage(Session session, String principal,
183: int txID, String method, String checkpoint, long time) {
184:
185: try {
186: Message msg = session.createMessage();
187:
188: msg.setJMSType(INVOCATION_METRICS);
189: msg.setStringProperty(CHECKPOINT, checkpoint);
190: msg.setStringProperty(BEAN, beanName);
191: msg.setObjectProperty(METHOD, method);
192: msg.setLongProperty(TIME, time);
193:
194: if (txID != -1)
195: msg.setStringProperty("ID", String.valueOf(txID));
196:
197: if (principal != null)
198: msg.setStringProperty("PRINCIPAL", principal);
199:
200: return msg;
201: } catch (Exception e) {
202: // catch JMSExceptions, tx.SystemExceptions, and NPE's
203: // don't want to bother the container even if the metrics fail.
204: return null;
205: }
206: }
207:
208: /**
209: * JMS Publisher thread implementation.
210: */
211: private class Publisher implements Runnable {
212:
213: /** Thread keep-alive field. */
214: private boolean running = true;
215: /** Thread sleep delay. */
216: private int delay = 2000;
217: /** JMS Connection */
218: private TopicConnection connection = null;
219:
220: /**
221: * Thread implementation. <p>
222: *
223: * When started, looks up a topic connection factory from the name
224: * service, and attempts to create a publisher to <tt>topic/metrics</tt>
225: * topic. <p>
226: *
227: * While alive, locks the <tt>msgQueue</tt> every two seconds to make a
228: * copy of the contents and then clear it. <p>
229: *
230: * Interrupting this thread will kill it.
231: *
232: * @see #msgQueue
233: * @see java.lang.Thread#interrupt()
234: */
235: public void run() {
236:
237: try {
238: final boolean IS_TRANSACTED = true;
239: final int ACKNOWLEDGE_MODE = Session.DUPS_OK_ACKNOWLEDGE;
240:
241: // lookup the connection factory and topic and create a JMS session
242: Context namingContext = new InitialContext();
243: TopicConnectionFactory fact = (TopicConnectionFactory) namingContext
244: .lookup("java:/ConnectionFactory");
245:
246: connection = fact.createTopicConnection();
247:
248: Topic topic = (Topic) namingContext
249: .lookup("topic/metrics");
250: TopicSession session = connection.createTopicSession(
251: IS_TRANSACTED, ACKNOWLEDGE_MODE);
252: TopicPublisher pub = session.createPublisher(topic);
253:
254: pub.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
255: pub.setPriority(Message.DEFAULT_PRIORITY);
256: pub.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
257:
258: // start the JMS connection
259: connection.start();
260:
261: // copy the message queue every x seconds, and publish the messages
262: while (running) {
263:
264: Object[] array;
265: long sleepTime = delay;
266:
267: try {
268: Thread.sleep(sleepTime);
269:
270: // measure message processing cost and try to deal
271: // with congestion
272: long begin = System.currentTimeMillis();
273:
274: // synchronized during the copy... the interceptor will
275: // have to wait til done
276: synchronized (msgQueue) {
277: array = msgQueue.toArray();
278: msgQueue.clear();
279: }
280:
281: // publish the messages
282: for (int i = 0; i < array.length; ++i) {
283: Message msg = createMessage(session,
284: ((Entry) array[i]).principal,
285: ((Entry) array[i]).id,
286: ((Entry) array[i]).method,
287: ((Entry) array[i]).checkpoint,
288: ((Entry) array[i]).time);
289:
290: pub.publish(msg);
291: }
292:
293: // try to deal with congestion a little better, alot of
294: // small messages fast will kill JBossMQ performance, this is
295: // a temp fix to group many messages into one operation
296: try {
297: session.commit();
298: } catch (Exception e) {
299: }
300:
301: // stop the clock and reduce the work time from our
302: // resting time
303: long end = System.currentTimeMillis();
304:
305: sleepTime = delay - (end - begin);
306: } catch (InterruptedException e) {
307: // kill this thread
308: running = false;
309: }
310: }
311: } catch (NamingException e) {
312: log.warn(Thread.currentThread().getName() + " exiting",
313: e);
314: } catch (JMSException e) {
315: log.warn(Thread.currentThread().getName() + " exiting",
316: e);
317: } finally {
318: // thread cleanup
319: synchronized (msgQueue) {
320: msgQueue.clear();
321: }
322:
323: try {
324: if (connection != null)
325: connection.close();
326: } catch (JMSException e) {
327: log.warn(e);
328: }
329: }
330: }
331: }
332:
333: /**
334: * Wrapper class for message queue entries.
335: *
336: * @see #msgQueue
337: */
338: private final class Entry {
339: int id = -1;
340: long time;
341: String principal = null;
342: String checkpoint;
343: String method;
344:
345: Entry(Principal principal, Method method, Transaction tx,
346: long time, String checkpoint) {
347: this.time = time;
348: this.checkpoint = checkpoint;
349: this.method = method.getName();
350:
351: if (tx != null)
352: this.id = tx.hashCode();
353: if (principal != null)
354: this.principal = principal.getName();
355: }
356: }
357: }
|