001: /*
002: @COPYRIGHT@
003: */
004: package demo.sharedqueue;
005:
006: import java.util.Collections;
007: import java.util.LinkedList;
008: import java.util.List;
009: import java.util.ListIterator;
010:
011: class Worker implements Runnable {
012:
013: private final String name;
014:
015: private final int port;
016:
017: private final Queue queue;
018:
019: private final List jobs;
020:
021: private final String nodeId;
022:
023: private final static int HEALTH_ALIVE = 0;
024:
025: private final static int HEALTH_DYING = 1;
026:
027: private final static int HEALTH_DEAD = 2;
028:
029: private final static int MAX_LOAD = 10;
030:
031: private int health = HEALTH_ALIVE;
032:
033: public Worker(Queue queue, int port, String nodeId) {
034: this .name = Queue.getHostName();
035: this .port = port;
036: this .queue = queue;
037: this .nodeId = nodeId;
038: jobs = Collections.synchronizedList(new LinkedList());
039: }
040:
041: public final String getNodeId() {
042: return this .nodeId;
043: }
044:
045: public final String getName() {
046: return "node: " + nodeId + " (" + name + ":" + port + ")";
047: }
048:
049: public final String toXml() {
050: synchronized (jobs) {
051: String data = "<worker><name>" + getName()
052: + "</name><jobs>";
053: ListIterator i = jobs.listIterator();
054: while (i.hasNext()) {
055: data += ((Job) i.next()).toXml();
056: }
057: data += "</jobs></worker>";
058: return data;
059: }
060: }
061:
062: /**
063: * Attempt to mark the Worker as dead (if it's already dying); Note that we
064: * synchronize this method since it's mutating a shared object (this class)
065: *
066: * @return True if the Worker is dead.
067: */
068: public final synchronized boolean expire() {
069: if (HEALTH_DYING == health) {
070: // a dying Worker wont die until it has
071: // consumed all of it's jobs
072: if (jobs.size() > 0) {
073: queue.addJob((Job) jobs.remove(0));
074: } else {
075: setHealth(HEALTH_DEAD);
076: }
077: }
078: return (HEALTH_DEAD == health);
079: }
080:
081: /**
082: * Set the state of the Worker's health; Note that we synchronize this
083: * method since it's mutating a shared object (this class)
084: *
085: * @param health
086: */
087: private final synchronized void setHealth(int health) {
088: this .health = health;
089: }
090:
091: /**
092: * Set the state of the Worker's health to dying; Note that we synchronize
093: * this method since it's mutating a shared object (this class)
094: *
095: * @param health
096: */
097: public final synchronized void markForExpiration() {
098: setHealth(HEALTH_DYING);
099: }
100:
101: public final void run() {
102: while (HEALTH_DEAD != health) {
103: if ((HEALTH_ALIVE == health) && (jobs.size() < MAX_LOAD)) {
104: final Job job = queue.getJob();
105:
106: try {
107: Thread.sleep(500);
108: } catch (InterruptedException ie) {
109: System.err.println(ie.getMessage());
110: }
111:
112: synchronized (jobs) {
113: jobs.add(job);
114: }
115:
116: Thread processor = new Thread(new Runnable() {
117: public void run() {
118: job.run(Worker.this);
119: synchronized (jobs) {
120: jobs.remove(job);
121: }
122: queue.log(job);
123: }
124: });
125: processor.start();
126: }
127: }
128: }
129: }
|