001: /*
002:
003: This software is OSI Certified Open Source Software.
004: OSI Certified is a certification mark of the Open Source Initiative.
005:
006: The license (Mozilla version 1.0) can be read at the MMBase site.
007: See http://www.MMBase.org/license
008:
009: */
010:
011: package org.mmbase.module.builders;
012:
013: import java.util.*;
014: import java.util.concurrent.*;
015:
016: import org.mmbase.module.core.*;
017: import org.mmbase.util.functions.*;
018: import org.mmbase.util.logging.*;
019: import org.mmbase.storage.search.implementation.*;
020: import org.mmbase.storage.search.*;
021:
022: /**
023: * @javadoc
024: * mmservers stands for MMBase servers. It is possible to run multiple mmbase servers on one database instance.
025: * Every mmserver node represent a real MMBase server(think of it as a machine where one instance of MMBase is running).
026: * On startup MMBase looks in the mmservers table and looks if he is listed in the list of mmservers,
027: * if not MMBase create a new node containing imfornation about itselve(name/host/os/jdk). the mmservers builder has extra behaviour,
028: * it can communicate with other servers(using multicast). The basic funtionality it provides however is sending information
029: * about changes of node to other mmservers (Listen !! I just have changed node 123). This mechanisme makes it possible to keep
030: * nodes caches in sync but also makes it possible to split tasks between machines. You could for example have a server that encodes video.
031: * when a change to a certain node is made one of the servers (if wel configured) can start encoding the videos.
032: * @author vpro
033: * @version $Id: MMServers.java,v 1.50 2007/06/21 15:50:23 nklasens Exp $
034: */
035: public class MMServers extends MMObjectBuilder implements
036: MMBaseObserver, Runnable,
037: org.mmbase.datatypes.resources.StateConstants {
038:
039: private static final Logger log = Logging
040: .getLoggerInstance(MMServers.class);
041: private int serviceTimeout = 60 * 15; // 15 minutes
042: private long intervalTime = 60 * 1000; // 1 minute
043:
044: private boolean checkedSystem = false;
045: private final String javastr;
046: private final String osstr;
047: private final List<String> possibleServices = new CopyOnWriteArrayList<String>();
048:
049: /**
050: * Function uptime
051: * @since MMBase-1.8
052: */
053: protected Function<Long> getUpTime = new AbstractFunction<Long>(
054: "uptime", Parameter.emptyArray(), ReturnType.LONG) {
055: {
056: setDescription("The function 'uptime' returns the uptime of the current server.");
057: }
058:
059: public Long getFunctionValue(Parameters parameters) {
060: int now = (int) (System.currentTimeMillis() / 1000);
061: return Long.valueOf(now - MMBase.startTime);
062: }
063: };
064: {
065: addFunction(getUpTime);
066: }
067:
068: /**
069: * @javadoc
070: */
071: public MMServers() {
072: javastr = System.getProperty("java.version") + "/"
073: + System.getProperty("java.vm.name");
074: osstr = System.getProperty("os.name") + "/"
075: + System.getProperty("os.version");
076: }
077:
078: public boolean init() {
079: if (oType != -1)
080: return true; // inited already
081: if (!super .init())
082: return false;
083: String tmp = getInitParameter("ProbeInterval");
084: if (tmp != null) {
085: intervalTime = (long) Integer.parseInt(tmp) * 1000;
086: log.service("ProbeInterval was configured to be "
087: + intervalTime / 1000 + " seconds");
088: } else {
089: log.service("ProbeInterval defaults to " + intervalTime
090: / 1000 + " seconds");
091: }
092: tmp = getInitParameter("ServiceTimeout");
093: if (tmp != null) {
094: serviceTimeout = Integer.parseInt(tmp);
095: log.service("ServiceTimeout was configured to be "
096: + serviceTimeout + " seconds");
097: } else {
098: log.service("ServiceTimeout defaults to " + serviceTimeout
099: + " seconds");
100: }
101: start();
102: return true;
103: }
104:
105: /**
106: * Starts the thread for the task scheduler
107: * @since MMBase-1.7
108: */
109: protected void start() {
110: MMBaseContext.startThread(this , "MMServers");
111: }
112:
113: /**
114: * @javadoc
115: */
116: public Object getValue(MMObjectNode node, String field) {
117: if (field.equals("showstate")) {
118: return getGUIIndicator("state", node);
119: } else if (field.equals("showatime")) {
120: return getGUIIndicator("atime", node);
121: } else if (field.equals("uptime")) {
122: // The 'node' object is not used, so this info makes only sense for _this_ server.
123: int now = (int) (System.currentTimeMillis() / 1000);
124: int uptime = now - MMBase.startTime;
125: return getUptimeString(uptime);
126: }
127: return super .getValue(node, field);
128: }
129:
130: /**
131: * @javadoc
132: */
133: private String getUptimeString(int uptime) {
134: StringBuffer result = new StringBuffer();
135: if (uptime >= (24 * 3600)) {
136: int d = uptime / (24 * 3600);
137: result.append(d).append(" d ");
138: uptime -= d * 24 * 3600;
139: }
140: if (uptime >= 3600) {
141: int h = uptime / 3600;
142: result.append(h).append(" h ");
143: uptime -= h * 3600;
144: }
145: if (uptime >= 60) {
146: int m = uptime / (60);
147: result.append(m).append(" m ");
148: uptime -= m * 60;
149: }
150: result.append(uptime).append(" s");
151: return result.toString();
152: }
153:
154: /**
155: * run, checkup probe runs every intervaltime to
156: * set the state of the server (used in clusters)
157: * @since MMBase-1.7
158: */
159: public void run() {
160: while (!mmb.isShutdown()) {
161: long this Time = intervalTime;
162: if (mmb != null && mmb.getState()) {
163: doCheckUp();
164: } else {
165: // shorter wait, the server is starting
166: this Time = 2 * 1000; // wait 2 second
167: }
168:
169: // wait the defined time
170: try {
171: Thread.sleep(this Time);
172: } catch (InterruptedException e) {
173: log.debug(Thread.currentThread().getName()
174: + " was interrupted.");
175: continue;
176: }
177: }
178: }
179:
180: /**
181: * @javadoc
182: */
183: private void doCheckUp() {
184: try {
185: boolean imoke = false;
186: String machineName = mmb.getMachineName();
187: String host = mmb.getHost();
188: log.debug("doCheckUp(): machine=" + machineName);
189: for (MMObjectNode node : getNodes()) {
190: String name = node.getStringValue("name");
191: String h = node.getStringValue("host");
192: log.debug("Checking " + name + "@" + h);
193: if (name.equals(machineName) && h.equals(host)) {
194: imoke = checkMySelf(node);
195: } else {
196: checkOther(node);
197: }
198: }
199: if (!imoke) {
200: log.info("No mmservers found for machineName "
201: + machineName + " host " + host
202: + " creating one now");
203: createMySelf(machineName, host);
204: }
205: } catch (Exception e) {
206: log.error(
207: "Something went wrong in MMServers Checkup Thread "
208: + e.getMessage(), e);
209: }
210: }
211:
212: /**
213: * Returns all the nodes from the builder.
214: * @return The nodes.
215: */
216: public List<MMObjectNode> getNodes() {
217: try {
218: List<MMObjectNode> nodes = storageConnector.getNodes(
219: new NodeSearchQuery(this ), false);
220: if (nodes != null) {
221: return nodes;
222: }
223: } catch (SearchQueryException e) {
224: log.error(e);
225: }
226: return new ArrayList<MMObjectNode>();
227: }
228:
229: /**
230: * @javadoc
231: */
232: private boolean checkMySelf(MMObjectNode node) {
233: boolean state = true;
234: log.debug("checkMySelf() updating timestamp");
235: node.setValue("state", ACTIVE);
236: node.setValue("atime",
237: (int) (System.currentTimeMillis() / 1000));
238: if (!checkedSystem) {
239: node.setValue("os", osstr);
240: node.setValue("host", mmb.getHost());
241: node.setValue("jdk", javastr);
242: checkedSystem = true;
243: }
244: node.commit();
245: log.debug("checkMySelf() updating timestamp done");
246: return state;
247: }
248:
249: /**
250: * @javadoc
251: */
252: private void checkOther(MMObjectNode node) {
253: int now = (int) (System.currentTimeMillis() / 1000);
254: int then = node.getIntValue("atime");
255: if (log.isDebugEnabled()) {
256: log.debug("" + now + ": Checking " + node.getValue("name")
257: + " (updated at " + then + ", " + (now - then)
258: + " s ago, interval: " + serviceTimeout + " s )");
259: }
260: if ((now - then) > (serviceTimeout)) {
261: if (node.getIntValue("state") != INACTIVE) {
262: log.debug("checkOther() updating state for "
263: + node.getStringValue("host"));
264: node.setValue("state", INACTIVE);
265: node.commit();
266:
267: // now also signal all its services are down !
268: setServicesDown(node);
269: }
270: }
271: }
272:
273: /**
274: * @javadoc
275: */
276: private void createMySelf(String machineName, String host) {
277:
278: MMObjectNode node = getNewNode("system");
279: node.setValue("name", machineName);
280: node.setValue("state", ACTIVE);
281: node.setValue("atime",
282: (int) (System.currentTimeMillis() / 1000));
283: node.setValue("os", osstr);
284: node.setValue("host", host);
285: node.setValue("jdk", javastr);
286: insert("system", node);
287: }
288:
289: /**
290: * @javadoc
291: */
292: private void setServicesDown(MMObjectNode node) {
293: log.debug("setServicesDown() for " + node);
294: for (String type : possibleServices) {
295: Enumeration<MMObjectNode> e = mmb.getInsRel().getRelated(
296: node.getIntValue("number"), type);
297: while (e.hasMoreElements()) {
298: MMObjectNode node2 = e.nextElement();
299: MMObjectBuilder parent = node2.getBuilder();
300: log.info("setServicesDown(): downnode(" + node2
301: + ") REMOVING node");
302: parent.removeRelations(node2);
303: parent.removeNode(node2);
304:
305: //node2.setValue("state","down");
306: //node2.commit();
307: }
308: }
309: if (log.isDebugEnabled()) {
310: log.debug("setServicesDown() for " + node + " done");
311: }
312: }
313:
314: /**
315: * @javadoc
316: */
317: public void setCheckService(String name) {
318: if (!possibleServices.contains(name)) {
319: possibleServices.add(name);
320: }
321: }
322:
323: /**
324: * @javadoc
325: */
326: public String getMMServerProperty(String mmserver, String key) {
327: String value = getInitParameter(mmserver + ":" + key);
328: return value;
329: }
330:
331: /**
332: * @javadoc
333: */
334: public MMObjectNode getMMServerNode(String name) {
335: return getMMServerNode(name, null);
336: }
337:
338: /**
339: * @since MMBase-1.8.3
340: */
341: public MMObjectNode getMMServerNode(String name, String host) {
342: NodeSearchQuery query = new NodeSearchQuery(this );
343: Constraint constraint = new BasicFieldValueConstraint(query
344: .getField(getField("name")), name);
345: if (host != null) {
346: BasicCompositeConstraint comp = new BasicCompositeConstraint(
347: CompositeConstraint.LOGICAL_AND);
348: comp.addChild(constraint);
349: BasicFieldValueConstraint constraint2 = new BasicFieldValueConstraint(
350: query.getField(getField("host")), host);
351: comp.addChild(constraint2);
352: constraint = comp;
353: }
354: query.setConstraint(constraint);
355: try {
356: List<MMObjectNode> nodeList = getNodes(query);
357: if (nodeList.size() > 0) {
358: return nodeList.get(0);
359: } else {
360: log.info("Can't find any mmserver node with name="
361: + name);
362: return null;
363: }
364: } catch (SearchQueryException sqe) {
365: log.warn(sqe);
366: return null;
367: }
368: }
369:
370: /**
371: * @return Returns the intervalTime.
372: */
373: public long getIntervalTime() {
374: return intervalTime;
375: }
376:
377: /**
378: * MMServer object are field by field equals.
379: */
380:
381: public boolean equals(MMObjectNode o1, MMObjectNode o2) {
382: return o1 == null ? o2 == null : o2 != null
383: && (o1.getNumber() == o2.getNumber()
384: && o1.getValue("name").equals(
385: o2.getValue("name")) && o1.getValue(
386: "host").equals(o2.getValue("host")));
387: }
388:
389: public String toString(MMObjectNode n) {
390: return "" + n.getValue("name") + "@" + n.getValue("host");
391: }
392:
393: protected NodeSearchQuery query = null;
394:
395: /**
396: * @return List of MMObjectNodes representing active servers, which are not this server.
397: */
398: public List<MMObjectNode> getActiveServers() {
399: String machineName = mmb.getMachineName();
400: String host = mmb.getHost();
401: if (log.isDebugEnabled()) {
402: log.debug("machine=" + machineName + " host=" + host);
403: }
404: if (query == null) {
405: query = new NodeSearchQuery(this );
406: BasicFieldValueConstraint constraint1a = new BasicFieldValueConstraint(
407: query.getField(getField("name")), machineName);
408: BasicFieldValueConstraint constraint1b = new BasicFieldValueConstraint(
409: query.getField(getField("host")), host);
410: BasicCompositeConstraint constraint1 = new BasicCompositeConstraint(
411: CompositeConstraint.LOGICAL_AND);
412: constraint1.addChild(constraint1a);
413: constraint1.addChild(constraint1b);
414: constraint1.setInverse(true);
415: BasicFieldValueConstraint constraint2 = new BasicFieldValueConstraint(
416: query.getField(getField("state")), ACTIVE);
417: BasicCompositeConstraint constraint = new BasicCompositeConstraint(
418: CompositeConstraint.LOGICAL_AND);
419: constraint.addChild(constraint1);
420: constraint.addChild(constraint2);
421: query.setConstraint(constraint);
422: StepField field = query.getField(getField(FIELD_NUMBER));
423: BasicSortOrder so = query.addSortOrder(field);
424: so.setDirection(SortOrder.ORDER_DESCENDING);
425: }
426:
427: try {
428: return storageConnector.getNodes(query, false);
429: } catch (org.mmbase.storage.search.SearchQueryException sqe) {
430: log.error(sqe);
431: return new ArrayList<MMObjectNode>();
432: }
433: }
434: }
|