001: /*
002: @COPYRIGHT@
003: */
004: package demo.sharedqueue;
005:
006: import java.io.File;
007: import java.net.InetAddress;
008: import java.net.UnknownHostException;
009:
010: import javax.management.MBeanServer;
011: import javax.management.MBeanServerFactory;
012: import javax.management.MBeanServerNotification;
013: import javax.management.Notification;
014: import javax.management.NotificationFilter;
015: import javax.management.NotificationListener;
016: import javax.management.ObjectName;
017:
018: import org.mortbay.jetty.Connector;
019: import org.mortbay.jetty.Handler;
020: import org.mortbay.jetty.Server;
021: import org.mortbay.jetty.bio.SocketConnector;
022: import org.mortbay.jetty.handler.ContextHandler;
023: import org.mortbay.jetty.handler.HandlerCollection;
024: import org.mortbay.jetty.handler.ResourceHandler;
025:
026: public class Main {
027: private final File cwd = new File(System.getProperty("user.dir"));
028:
029: private int lastPortUsed;
030:
031: private demo.sharedqueue.Queue queue;
032:
033: private Worker worker;
034:
035: public final void start(int port) throws Exception {
036: String nodeId = registerForNotifications();
037: port = setPort(port);
038:
039: System.out.println("DSO SharedQueue (node " + nodeId + ")");
040: System.out.println("Open your browser and go to - http://"
041: + getHostName() + ":" + port + "/webapp\n");
042:
043: Server server = new Server();
044: Connector connector = new SocketConnector();
045: connector.setPort(port);
046: server.setConnectors(new Connector[] { connector });
047:
048: queue = new Queue(port);
049: worker = queue.createWorker(nodeId);
050:
051: ResourceHandler resourceHandler = new ResourceHandler();
052: resourceHandler.setResourceBase(".");
053:
054: ContextHandler ajaxContext = new ContextHandler();
055: ajaxContext.setContextPath(SimpleHttpHandler.ACTION);
056: ajaxContext.setResourceBase(cwd.getPath());
057: ajaxContext.setClassLoader(Thread.currentThread()
058: .getContextClassLoader());
059: ajaxContext.addHandler(new SimpleHttpHandler(queue));
060:
061: HandlerCollection handlers = new HandlerCollection();
062: handlers.setHandlers(new Handler[] { ajaxContext,
063: resourceHandler });
064: server.setHandler(handlers);
065:
066: startReaper();
067: server.start();
068: server.join();
069: }
070:
071: private final int setPort(int port) {
072: if (port == -1) {
073: if (lastPortUsed == 0) {
074: port = lastPortUsed = 1990;
075: } else {
076: port = ++lastPortUsed;
077: }
078: } else {
079: lastPortUsed = port;
080: }
081: return port;
082: }
083:
084: /**
085: * Starts a thread to identify dead workers (From nodes that have been
086: * brought down) and removes them from the (shared) list of workers.
087: */
088: private final void startReaper() {
089: Thread reaper = new Thread(new Runnable() {
090: public void run() {
091: while (true) {
092: Main.this .queue.reap();
093: try {
094: Thread.sleep(1000);
095: } catch (InterruptedException ie) {
096: System.err.println(ie.getMessage());
097: }
098: }
099: }
100: });
101: reaper.start();
102: }
103:
104: public final static void main(String[] args) throws Exception {
105: int port = -1;
106: try {
107: port = Integer.parseInt(args[0]);
108: } catch (Exception e) {
109: }
110: (new Main()).start(port);
111: }
112:
113: static final String getHostName() {
114: try {
115: InetAddress addr = InetAddress.getLocalHost();
116: return addr.getHostName();
117: } catch (UnknownHostException e) {
118: return "Unknown";
119: }
120: }
121:
122: /**
123: * Registers this client for JMX notifications.
124: *
125: * @returns This clients Node ID
126: */
127: private final String registerForNotifications() throws Exception {
128: java.util.List servers = MBeanServerFactory
129: .findMBeanServer(null);
130: if (servers.size() == 0) {
131: System.err
132: .println("WARNING: No JMX servers found, unable to register for notifications.");
133: return "0";
134: }
135:
136: MBeanServer server = (MBeanServer) servers.get(0);
137: final ObjectName clusterBean = new ObjectName(
138: "org.terracotta:type=Terracotta Cluster,name=Terracotta Cluster Bean");
139: ObjectName delegateName = ObjectName
140: .getInstance("JMImplementation:type=MBeanServerDelegate");
141: final java.util.List clusterBeanBag = new java.util.ArrayList();
142:
143: // listener for newly registered MBeans
144: NotificationListener listener0 = new NotificationListener() {
145: public void handleNotification(Notification notification,
146: Object handback) {
147: synchronized (clusterBeanBag) {
148: clusterBeanBag.add(handback);
149: clusterBeanBag.notifyAll();
150: }
151: }
152: };
153:
154: // filter to let only clusterBean passed through
155: NotificationFilter filter0 = new NotificationFilter() {
156: public boolean isNotificationEnabled(
157: Notification notification) {
158: if (notification.getType().equals(
159: "JMX.mbean.registered")
160: && ((MBeanServerNotification) notification)
161: .getMBeanName().equals(clusterBean))
162: return true;
163: return false;
164: }
165: };
166:
167: // add our listener for clusterBean's registration
168: server.addNotificationListener(delegateName, listener0,
169: filter0, clusterBean);
170:
171: // because of race condition, clusterBean might already have registered
172: // before we registered the listener
173: java.util.Set allObjectNames = server.queryNames(null, null);
174:
175: if (!allObjectNames.contains(clusterBean)) {
176: synchronized (clusterBeanBag) {
177: while (clusterBeanBag.isEmpty()) {
178: clusterBeanBag.wait();
179: }
180: }
181: }
182:
183: // clusterBean is now registered, no need to listen for it
184: server.removeNotificationListener(delegateName, listener0);
185:
186: // listener for clustered bean events
187: NotificationListener listener1 = new NotificationListener() {
188: public void handleNotification(Notification notification,
189: Object handback) {
190: String nodeId = notification.getMessage();
191: Worker worker = Main.this .queue.getWorker(nodeId);
192: if (worker != null) {
193: worker.markForExpiration();
194: } else {
195: System.err.println("Worker for nodeId: " + nodeId
196: + " not found.");
197: }
198: }
199: };
200:
201: // filter for nodeDisconnected notifications only
202: NotificationFilter filter1 = new NotificationFilter() {
203: public boolean isNotificationEnabled(
204: Notification notification) {
205: return notification.getType().equals(
206: "com.tc.cluster.event.nodeDisconnected");
207: }
208: };
209:
210: // now that we have the clusterBean, add listener for membership events
211: server.addNotificationListener(clusterBean, listener1, filter1,
212: clusterBean);
213: return (server.getAttribute(clusterBean, "NodeId")).toString();
214: }
215: }
|