001: /*
002: @COPYRIGHT@
003: */
004: package demo.sharedqueue;
005:
006: import java.net.InetAddress;
007: import java.net.UnknownHostException;
008: import java.util.Collections;
009: import java.util.LinkedList;
010: import java.util.List;
011: import java.util.ListIterator;
012:
013: public class Queue {
014: private List queue = Collections.synchronizedList(new LinkedList());
015:
016: private List workers = Collections
017: .synchronizedList(new LinkedList());
018:
019: private List completedJobs = Collections
020: .synchronizedList(new LinkedList());
021:
022: private int nextJobId;
023:
024: private int port;
025:
026: private static final int MAX_HISTORY_LENGTH = 15;
027:
028: private static final int MAX_QUEUE_LENGTH = 150;
029:
030: public Queue(int port) {
031: this .port = port;
032: this .nextJobId = 1;
033: }
034:
035: public final Job getJob() {
036: synchronized (queue) {
037: while (queue.size() == 0) {
038: try {
039: queue.wait();
040: } catch (InterruptedException e) {
041: throw new RuntimeException(e);
042: }
043: }
044: return (Job) queue.remove(0);
045: }
046: }
047:
048: public final String getXmlData() {
049: // the list of jobs in the queue
050: String data = "<workqueue>";
051: synchronized (queue) {
052: ListIterator i = queue.listIterator();
053: while (i.hasNext()) {
054: Job job = (Job) i.next();
055: data += job.toXml();
056: }
057: }
058: data += "</workqueue>";
059:
060: // the list of completed jobs
061: data += "<completed>";
062: synchronized (completedJobs) {
063: ListIterator i = completedJobs.listIterator();
064: while (i.hasNext()) {
065: Job job = (Job) i.next();
066: data += job.toXml();
067: }
068: }
069: data += "</completed>";
070:
071: // the list of registered job consumers
072: data += "<consumers>";
073: synchronized (workers) {
074: ListIterator i = workers.listIterator();
075: while (i.hasNext()) {
076: Worker worker = (Worker) i.next();
077: data += worker.toXml();
078: }
079: }
080: data += "</consumers>";
081: return data;
082: }
083:
084: public final Worker createWorker(String nodeId) {
085: synchronized (workers) {
086: Worker worker = new Worker(this , port, nodeId);
087: workers.add(worker);
088: Thread t = new Thread(worker);
089: t.setDaemon(true);
090: t.start();
091: return worker;
092: }
093: }
094:
095: public final Worker getWorker(String nodeId) {
096: synchronized (workers) {
097: ListIterator i = workers.listIterator();
098: while (i.hasNext()) {
099: Worker worker = (Worker) i.next();
100: if (worker.getNodeId().equals(nodeId)) {
101: return worker;
102: }
103: }
104: }
105: return null;
106: }
107:
108: public final void log(Job job) {
109: synchronized (completedJobs) {
110: completedJobs.add(0, job);
111: if (completedJobs.size() > MAX_HISTORY_LENGTH) {
112: completedJobs.remove(completedJobs.size() - 1);
113: }
114: }
115: }
116:
117: public final void reap() {
118: synchronized (workers) {
119: ListIterator i = workers.listIterator();
120: while (i.hasNext()) {
121: Worker worker = (Worker) i.next();
122: if (worker.expire()) {
123: i.remove();
124: }
125: }
126: }
127: }
128:
129: public final void addJob() {
130: synchronized (queue) {
131: if (queue.size() < MAX_QUEUE_LENGTH) {
132: Job job = new Job(
133: Queue.getHostName() + " " + this .port,
134: this .nextJobId);
135: this .nextJobId = this .nextJobId < 999 ? this .nextJobId + 1
136: : 1;
137: queue.add(job);
138: queue.notifyAll();
139: }
140: }
141: }
142:
143: public final void addJob(Job job) {
144: synchronized (queue) {
145: queue.add(job);
146: queue.notifyAll();
147: }
148: }
149:
150: public final static String getHostName() {
151: try {
152: final InetAddress addr = InetAddress.getLocalHost();
153: return addr.getHostName();
154: } catch (UnknownHostException e) {
155: return "Unknown";
156: }
157: }
158: }
|