001: // $Id: DistributedQueue.java,v 1.19 2006/07/31 09:21:58 belaban Exp $
002: package org.jgroups.blocks;
003:
004: import org.apache.commons.logging.Log;
005: import org.apache.commons.logging.LogFactory;
006: import org.jgroups.*;
007: import org.jgroups.util.RspList;
008: import org.jgroups.util.Util;
009:
010: import java.io.Serializable;
011: import java.util.*;
012:
013: /**
014: * Provides the abstraction of a java.util.LinkedList that is replicated at several
015: * locations. Any change to the list (reset, add, remove, etc.) will transparently be
016: * propagated to all replicas in the group. All read-only methods will always access the
017: * local replica.<p>
018: * Both keys and values added to the list <em>must be serializable</em>, the reason
019: * being that they will be sent across the network to all replicas of the group.
020: * An instance of this class will contact an existing member of the group to fetch its
021: * initial state.
022: * Beware to use a <em>total protocol</em> on initialization or elements would not be in same
023: * order on all replicas.
024: * @author Romuald du Song
025: */
026: public class DistributedQueue implements MessageListener,
027: MembershipListener, Cloneable {
028: public interface Notification {
029: void entryAdd(Object value);
030:
031: void entryRemoved(Object key);
032:
033: void viewChange(Vector new_mbrs, Vector old_mbrs);
034:
035: void contentsCleared();
036:
037: void contentsSet(Collection new_entries);
038: }
039:
040: protected Log logger = LogFactory.getLog(getClass());
041: private long internal_timeout = 10000; // 10 seconds to wait for a response
042:
043: /*lock object for synchronization*/
044: protected final Object mutex = new Object();
045: protected boolean stopped = false; // whether to we are stopped !
046: protected LinkedList internalQueue;
047: protected Channel channel;
048: protected RpcDispatcher disp = null;
049: protected String groupname = null;
050: protected Vector notifs = new Vector(); // to be notified when mbrship changes
051: protected Vector members = new Vector(); // keeps track of all DHTs
052: private Class[] add_signature = null;
053: private Class[] addAtHead_signature = null;
054: private Class[] addAll_signature = null;
055: private Class[] reset_signature = null;
056: private Class[] remove_signature = null;
057:
058: /**
059: * Creates a DistributedQueue
060: * @param groupname The name of the group to join
061: * @param factory The ChannelFactory which will be used to create a channel
062: * @param properties The property string to be used to define the channel
063: * @param state_timeout The time to wait until state is retrieved in milliseconds. A value of 0 means wait forever.
064: */
065: public DistributedQueue(String groupname, ChannelFactory factory,
066: String properties, long state_timeout)
067: throws ChannelException {
068: if (logger.isDebugEnabled()) {
069: logger.debug("DistributedQueue(" + groupname + ','
070: + properties + ',' + state_timeout);
071: }
072:
073: this .groupname = groupname;
074: initSignatures();
075: internalQueue = new LinkedList();
076: channel = (factory != null) ? factory.createChannel(properties)
077: : new JChannel(properties);
078: disp = new RpcDispatcher(channel, this , this , this );
079: disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
080: channel.connect(groupname);
081: start(state_timeout);
082: }
083:
084: public DistributedQueue(JChannel channel) {
085: this .groupname = channel.getClusterName();
086: this .channel = channel;
087: init();
088: }
089:
090: /**
091: * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be
092: * used to register under that id. This is typically used when another building block is already using
093: * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
094: * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
095: * first block created on PullPushAdapter.
096: * The caller needs to call start(), before using the this block. It gives the opportunity for the caller
097: * to register as a lessoner for Notifications events.
098: * @param adapter The PullPushAdapter which to use as underlying transport
099: * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
100: * requests/responses for different building blocks on top of PullPushAdapter.
101: */
102: public DistributedQueue(PullPushAdapter adapter, Serializable id) {
103: this .channel = (Channel) adapter.getTransport();
104: this .groupname = this .channel.getClusterName();
105:
106: initSignatures();
107: internalQueue = new LinkedList();
108:
109: disp = new RpcDispatcher(adapter, id, this , this , this );
110: disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
111: }
112:
113: protected final void init() {
114: initSignatures();
115: internalQueue = new LinkedList();
116: disp = new RpcDispatcher(channel, this , this , this );
117: disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
118: }
119:
120: public final void start(long state_timeout)
121: throws ChannelClosedException, ChannelNotConnectedException {
122: boolean rc;
123: logger.debug("DistributedQueue.initState(" + groupname
124: + "): starting state retrieval");
125:
126: rc = channel.getState(null, state_timeout);
127:
128: if (rc) {
129: logger.info("DistributedQueue.initState(" + groupname
130: + "): state was retrieved successfully");
131: } else {
132: logger.info("DistributedQueue.initState(" + groupname
133: + "): state could not be retrieved (first member)");
134: }
135: }
136:
137: public Address getLocalAddress() {
138: return (channel != null) ? channel.getLocalAddress() : null;
139: }
140:
141: public Channel getChannel() {
142: return channel;
143: }
144:
145: public void addNotifier(Notification n) {
146: if (n != null && !notifs.contains(n)) {
147: notifs.addElement(n);
148: }
149: }
150:
151: public void removeNotifier(Notification n) {
152: notifs.removeElement(n);
153: }
154:
155: public void stop() {
156: /*lock the queue from other threads*/
157: synchronized (mutex) {
158: internalQueue.clear();
159:
160: if (disp != null) {
161: disp.stop();
162: disp = null;
163: }
164:
165: if (channel != null) {
166: channel.close();
167: channel = null;
168: }
169:
170: stopped = true;
171: }
172: }
173:
174: /**
175: * Add the speficied element at the bottom of the queue
176: * @param value
177: */
178: public void add(Object value) {
179: try {
180: Object retval = null;
181:
182: RspList rsp = disp.callRemoteMethods(null, "_add",
183: new Object[] { value }, add_signature,
184: GroupRequest.GET_ALL, 0);
185: Vector results = rsp.getResults();
186:
187: if (results.size() > 0) {
188: retval = results.elementAt(0);
189:
190: if (logger.isDebugEnabled()) {
191: checkResult(rsp, retval);
192: }
193: }
194: } catch (Exception e) {
195: logger.error("Unable to add value " + value, e);
196: }
197:
198: }
199:
200: /**
201: * Add the speficied element at the top of the queue
202: * @param value
203: */
204: public void addAtHead(Object value) {
205: try {
206: disp.callRemoteMethods(null, "_addAtHead",
207: new Object[] { value }, addAtHead_signature,
208: GroupRequest.GET_ALL, 0);
209: } catch (Exception e) {
210: logger.error("Unable to addAtHead value " + value, e);
211: }
212:
213: }
214:
215: /**
216: * Add the speficied collection to the top of the queue.
217: * Elements are added in the order that they are returned by the specified
218: * collection's iterator.
219: * @param values
220: */
221: public void addAll(Collection values) {
222: try {
223: disp.callRemoteMethods(null, "_addAll",
224: new Object[] { values }, addAll_signature,
225: GroupRequest.GET_ALL, 0);
226: } catch (Exception e) {
227: logger.error("Unable to addAll value: " + values, e);
228: }
229:
230: }
231:
232: public Vector getContents() {
233: Vector result = new Vector();
234:
235: for (Iterator e = internalQueue.iterator(); e.hasNext();)
236: result.add(e.next());
237:
238: return result;
239: }
240:
241: public int size() {
242: return internalQueue.size();
243: }
244:
245: /**
246: * returns the first object on the queue, without removing it.
247: * If the queue is empty this object blocks until the first queue object has
248: * been added
249: * @return the first object on the queue
250: */
251: public Object peek() {
252: Object retval = null;
253:
254: try {
255: retval = internalQueue.getFirst();
256: } catch (NoSuchElementException e) {
257: }
258:
259: return retval;
260: }
261:
262: public void reset() {
263: try {
264: disp.callRemoteMethods(null, "_reset", null,
265: reset_signature, GroupRequest.GET_ALL, 0);
266: } catch (Exception e) {
267: logger
268: .error("DistributedQueue.reset(" + groupname + ')',
269: e);
270: }
271: }
272:
273: protected void checkResult(RspList rsp, Object retval) {
274: if (logger.isDebugEnabled()) {
275: logger.debug("Value updated from " + groupname + " :"
276: + retval);
277: }
278:
279: Vector results = rsp.getResults();
280:
281: for (int i = 0; i < results.size(); i++) {
282: Object data = results.elementAt(i);
283:
284: if (!data.equals(retval)) {
285: logger
286: .error("Reference value differs from returned value "
287: + retval + " != " + data);
288: }
289: }
290: }
291:
292: /**
293: * Try to return the first objet in the queue.It does not wait for an object.
294: * @return the first object in the queue or null if none were found.
295: */
296: public Object remove() {
297: Object retval = null;
298: RspList rsp = disp.callRemoteMethods(null, "_remove", null,
299: remove_signature, GroupRequest.GET_ALL,
300: internal_timeout);
301: Vector results = rsp.getResults();
302:
303: if (results.size() > 0) {
304: retval = results.elementAt(0);
305:
306: if (logger.isDebugEnabled()) {
307: checkResult(rsp, retval);
308: }
309: }
310:
311: return retval;
312: }
313:
314: /**
315: * @param timeout The time to wait until an entry is retrieved in milliseconds. A value of 0 means wait forever.
316: * @return the first object in the queue or null if none were found
317: */
318: public Object remove(long timeout) {
319: Object retval = null;
320: long start = System.currentTimeMillis();
321:
322: if (timeout <= 0) {
323: while (!stopped && (retval == null)) {
324: RspList rsp = disp.callRemoteMethods(null, "_remove",
325: null, remove_signature, GroupRequest.GET_ALL,
326: internal_timeout);
327: Vector results = rsp.getResults();
328:
329: if (results.size() > 0) {
330: retval = results.elementAt(0);
331:
332: if (logger.isDebugEnabled()) {
333: checkResult(rsp, retval);
334: }
335: }
336:
337: if (retval == null) {
338: try {
339: synchronized (mutex) {
340: mutex.wait();
341: }
342: } catch (InterruptedException e) {
343: }
344: }
345: }
346: } else {
347: while (((System.currentTimeMillis() - start) < timeout)
348: && !stopped && (retval == null)) {
349: RspList rsp = disp.callRemoteMethods(null, "_remove",
350: null, remove_signature, GroupRequest.GET_ALL,
351: internal_timeout);
352: Vector results = rsp.getResults();
353:
354: if (results.size() > 0) {
355: retval = results.elementAt(0);
356:
357: if (logger.isDebugEnabled()) {
358: checkResult(rsp, retval);
359: }
360: }
361:
362: if (retval == null) {
363: try {
364: long delay = timeout
365: - (System.currentTimeMillis() - start);
366:
367: synchronized (mutex) {
368: if (delay > 0) {
369: mutex.wait(delay);
370: }
371: }
372: } catch (InterruptedException e) {
373: }
374: }
375: }
376: }
377:
378: return retval;
379: }
380:
381: public String toString() {
382: return internalQueue.toString();
383: }
384:
385: /*------------------------ Callbacks -----------------------*/
386: public void _add(Object value) {
387: if (logger.isDebugEnabled()) {
388: logger.debug(groupname + '@' + getLocalAddress() + " _add("
389: + value + ')');
390: }
391:
392: /*lock the queue from other threads*/
393: synchronized (mutex) {
394: internalQueue.add(value);
395:
396: /*wake up all the threads that are waiting for the lock to be released*/
397: mutex.notifyAll();
398: }
399:
400: for (int i = 0; i < notifs.size(); i++)
401: ((Notification) notifs.elementAt(i)).entryAdd(value);
402: }
403:
404: public void _addAtHead(Object value) {
405: /*lock the queue from other threads*/
406: synchronized (mutex) {
407: internalQueue.addFirst(value);
408:
409: /*wake up all the threads that are waiting for the lock to be released*/
410: mutex.notifyAll();
411: }
412:
413: for (int i = 0; i < notifs.size(); i++)
414: ((Notification) notifs.elementAt(i)).entryAdd(value);
415: }
416:
417: public void _reset() {
418: if (logger.isDebugEnabled()) {
419: logger.debug(groupname + '@' + getLocalAddress()
420: + " _reset()");
421: }
422:
423: _private_reset();
424:
425: for (int i = 0; i < notifs.size(); i++)
426: ((Notification) notifs.elementAt(i)).contentsCleared();
427: }
428:
429: protected void _private_reset() {
430: /*lock the queue from other threads*/
431: synchronized (mutex) {
432: internalQueue.clear();
433:
434: /*wake up all the threads that are waiting for the lock to be released*/
435: mutex.notifyAll();
436: }
437: }
438:
439: public Object _remove() {
440: Object retval = null;
441:
442: try {
443: /*lock the queue from other threads*/
444: synchronized (mutex) {
445: retval = internalQueue.removeFirst();
446:
447: /*wake up all the threads that are waiting for the lock to be released*/
448: mutex.notifyAll();
449: }
450:
451: if (logger.isDebugEnabled()) {
452: logger.debug(groupname + '@' + getLocalAddress()
453: + "_remove(" + retval + ')');
454: }
455:
456: for (int i = 0; i < notifs.size(); i++)
457: ((Notification) notifs.elementAt(i))
458: .entryRemoved(retval);
459: } catch (NoSuchElementException e) {
460: logger.debug(groupname + '@' + getLocalAddress()
461: + "_remove(): nothing to remove");
462: }
463:
464: return retval;
465: }
466:
467: public void _addAll(Collection c) {
468: if (logger.isDebugEnabled()) {
469: logger.debug(groupname + '@' + getLocalAddress()
470: + " _addAll(" + c + ')');
471: }
472:
473: /*lock the queue from other threads*/
474: synchronized (mutex) {
475: internalQueue.addAll(c);
476:
477: /*wake up all the threads that are waiting for the lock to be released*/
478: mutex.notifyAll();
479: }
480:
481: for (int i = 0; i < notifs.size(); i++)
482: ((Notification) notifs.elementAt(i)).contentsSet(c);
483: }
484:
485: /*----------------------------------------------------------*/
486: /*-------------------- State Exchange ----------------------*/
487: public void receive(Message msg) {
488: }
489:
490: public byte[] getState() {
491: Vector copy = (Vector) getContents().clone();
492:
493: try {
494: return Util.objectToByteBuffer(copy);
495: } catch (Throwable ex) {
496: logger
497: .error(
498: "DistributedQueue.getState(): exception marshalling state.",
499: ex);
500:
501: return null;
502: }
503: }
504:
505: public void setState(byte[] new_state) {
506: Vector new_copy;
507:
508: try {
509: new_copy = (Vector) Util.objectFromByteBuffer(new_state);
510:
511: if (new_copy == null) {
512: return;
513: }
514: } catch (Throwable ex) {
515: logger
516: .error(
517: "DistributedQueue.setState(): exception unmarshalling state.",
518: ex);
519:
520: return;
521: }
522:
523: _private_reset(); // remove all elements
524: _addAll(new_copy);
525: }
526:
527: /*------------------- Membership Changes ----------------------*/
528: public void viewAccepted(View new_view) {
529: Vector new_mbrs = new_view.getMembers();
530:
531: if (new_mbrs != null) {
532: sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left)
533: members.removeAllElements();
534:
535: for (int i = 0; i < new_mbrs.size(); i++)
536: members.addElement(new_mbrs.elementAt(i));
537: }
538: }
539:
540: /** Called when a member is suspected */
541: public void suspect(Address suspected_mbr) {
542: ;
543: }
544:
545: /** Block sending and receiving of messages until ViewAccepted is called */
546: public void block() {
547: }
548:
549: void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) {
550: Vector joined;
551: Vector left;
552: Object mbr;
553: Notification n;
554:
555: if ((notifs.size() == 0) || (old_mbrs == null)
556: || (new_mbrs == null) || (old_mbrs.size() == 0)
557: || (new_mbrs.size() == 0)) {
558: return;
559: }
560:
561: // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
562: joined = new Vector();
563:
564: for (int i = 0; i < new_mbrs.size(); i++) {
565: mbr = new_mbrs.elementAt(i);
566:
567: if (!old_mbrs.contains(mbr)) {
568: joined.addElement(mbr);
569: }
570: }
571:
572: // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
573: left = new Vector();
574:
575: for (int i = 0; i < old_mbrs.size(); i++) {
576: mbr = old_mbrs.elementAt(i);
577:
578: if (!new_mbrs.contains(mbr)) {
579: left.addElement(mbr);
580: }
581: }
582:
583: for (int i = 0; i < notifs.size(); i++) {
584: n = (Notification) notifs.elementAt(i);
585: n.viewChange(joined, left);
586: }
587: }
588:
589: final void initSignatures() {
590: try {
591: if (add_signature == null) {
592: add_signature = new Class[] { Object.class };
593: }
594:
595: if (addAtHead_signature == null) {
596: addAtHead_signature = new Class[] { Object.class };
597: }
598:
599: if (addAll_signature == null) {
600: addAll_signature = new Class[] { Collection.class };
601: }
602:
603: if (reset_signature == null) {
604: reset_signature = new Class[0];
605: }
606:
607: if (remove_signature == null) {
608: remove_signature = new Class[0];
609: }
610: } catch (Throwable ex) {
611: logger.error("DistributedQueue.initMethods()", ex);
612: }
613: }
614:
615: public static void main(String[] args) {
616: try {
617: // The setup here is kind of weird:
618: // 1. Create a channel
619: // 2. Create a DistributedQueue (on the channel)
620: // 3. Connect the channel (so the HT gets a VIEW_CHANGE)
621: // 4. Start the HT
622: //
623: // A simpler setup is
624: // DistributedQueue ht = new DistributedQueue("demo", null,
625: // "file://c:/JGroups-2.0/conf/total-token.xml", 5000);
626: JChannel c = new JChannel(
627: "file:/c:/JGroups-2.0/conf/conf/total-token.xml");
628:
629: DistributedQueue ht = new DistributedQueue(c);
630: c.connect("demo");
631: ht.start(5000);
632:
633: ht.add("name");
634: ht.add("Michelle Ban");
635:
636: Object old_key = ht.remove();
637: System.out.println("old key was " + old_key);
638: old_key = ht.remove();
639: System.out.println("old value was " + old_key);
640:
641: ht.add("name 'Michelle Ban'");
642:
643: System.out.println("queue is " + ht);
644: } catch (Throwable t) {
645: t.printStackTrace();
646: }
647: }
648: }
|