001: /*
002: * $Id: EventGroup.java 10489 2008-01-23 17:53:38Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.routing.inbound;
012:
013: import org.mule.api.MuleEvent;
014: import org.mule.util.ClassUtils;
015:
016: import java.io.Serializable;
017: import java.util.ArrayList;
018: import java.util.Iterator;
019: import java.util.List;
020:
021: import edu.emory.mathcs.backport.java.util.concurrent.helpers.Utils;
022:
023: import org.apache.commons.collections.IteratorUtils;
024:
025: /**
026: * <code>EventGroup</code> is a holder over events grouped by a common group Id.
027: * This can be used by components such as routers to manage related events.
028: */
029: // @ThreadSafe
030: public class EventGroup implements Comparable, Serializable {
031: /**
032: * Serial version
033: */
034: private static final long serialVersionUID = 953739659615692697L;
035:
036: public static final MuleEvent[] EMPTY_EVENTS_ARRAY = new MuleEvent[0];
037:
038: private final Object groupId;
039: // @GuardedBy("this")
040: private final List events;
041: private final long created;
042: private final int expectedSize;
043:
044: public EventGroup(Object groupId) {
045: this (groupId, -1);
046: }
047:
048: public EventGroup(Object groupId, int expectedSize) {
049: super ();
050: this .created = Utils.nanoTime();
051: this .events = new ArrayList(expectedSize > 0 ? expectedSize
052: : 10);
053: this .expectedSize = expectedSize;
054: this .groupId = groupId;
055: }
056:
057: /**
058: * Compare this EventGroup to another one. If the receiver and the argument both
059: * have groupIds that are {@link Comparable}, they are used for the comparison;
060: * otherwise - since the id can be any object - the group creation time stamp is
061: * used as fallback. Older groups are considered "smaller".
062: *
063: * @see java.lang.Comparable#compareTo(java.lang.Object)
064: */
065: public int compareTo(Object o) {
066: EventGroup other = (EventGroup) o;
067: Object otherId = other.getGroupId();
068:
069: if (groupId instanceof Comparable
070: && otherId instanceof Comparable) {
071: return ((Comparable) groupId).compareTo(otherId);
072: } else {
073: long diff = created - other.getCreated();
074: return (diff > 0 ? 1 : (diff < 0 ? -1 : 0));
075: }
076: }
077:
078: /**
079: * Compares two EventGroups for equality. EventGroups are considered equal when
080: * their groupIds (as returned by {@link #getGroupId()}) are equal.
081: *
082: * @see java.lang.Object#equals(Object)
083: */
084: // //@Override
085: public boolean equals(Object obj) {
086: if (this == obj) {
087: return true;
088: }
089:
090: if (!(obj instanceof EventGroup)) {
091: return false;
092: }
093:
094: final EventGroup other = (EventGroup) obj;
095: if (groupId == null) {
096: return (other.groupId == null);
097: }
098:
099: return groupId.equals(other.groupId);
100: }
101:
102: /**
103: * The hashCode of an EventGroup is derived from the object returned by
104: * {@link #getGroupId()}.
105: *
106: * @see java.lang.Object#hashCode()
107: */
108: // //@Override
109: public int hashCode() {
110: return groupId.hashCode();
111: }
112:
113: /**
114: * Returns an identifier for this EventGroup. It is recommended that this id is
115: * unique and {@link Comparable} e.g. a UUID.
116: *
117: * @return the id of this event group
118: */
119: public Object getGroupId() {
120: return groupId;
121: }
122:
123: /**
124: * Returns an iterator over a snapshot copy of this group's collected events. If
125: * you need to iterate over the group and e.g. remove select events, do so via
126: * {@link #removeEvent(MuleEvent)}. If you need to do so atomically in order to
127: * prevent e.g. concurrent reception/aggregation of the group during iteration,
128: * wrap the iteration in a synchronized block on the group instance.
129: *
130: * @return an iterator over collected {@link MuleEvent}s.
131: */
132: public Iterator iterator() {
133: synchronized (this ) {
134: if (events.isEmpty()) {
135: return IteratorUtils.emptyIterator();
136: } else {
137: return IteratorUtils.arrayIterator(this .toArray());
138: }
139: }
140: }
141:
142: /**
143: * Returns a snapshot of collected events in this group.
144: *
145: * @return an array of collected {@link MuleEvent}s.
146: */
147: public MuleEvent[] toArray() {
148: synchronized (this ) {
149: if (events.isEmpty()) {
150: return EMPTY_EVENTS_ARRAY;
151: }
152:
153: return (MuleEvent[]) events.toArray(EMPTY_EVENTS_ARRAY);
154: }
155: }
156:
157: /**
158: * Add the given event to this group.
159: *
160: * @param event the event to add
161: */
162: public void addEvent(MuleEvent event) {
163: synchronized (this ) {
164: events.add(event);
165: }
166: }
167:
168: /**
169: * Remove the given event from the group.
170: *
171: * @param event the evnt to remove
172: */
173: public void removeEvent(MuleEvent event) {
174: synchronized (this ) {
175: events.remove(event);
176: }
177: }
178:
179: /**
180: * Return the creation timestamp of the current group.
181: *
182: * @return the timestamp when this group was instantiated.
183: * @see {@link Utils#nanoTime()}
184: */
185: public long getCreated() {
186: return created;
187: }
188:
189: /**
190: * Returns the number of events collected so far.
191: *
192: * @return number of events in this group or 0 if the group is empty.
193: */
194: public int size() {
195: synchronized (this ) {
196: return events.size();
197: }
198: }
199:
200: /**
201: * Returns the number of events that this EventGroup is expecting before
202: * correlation can proceed.
203: *
204: * @return expected number of events or -1 if no expected size was specified.
205: */
206: public int expectedSize() {
207: return expectedSize;
208: }
209:
210: /**
211: * Removes all events from this group.
212: */
213: public void clear() {
214: synchronized (this ) {
215: events.clear();
216: }
217: }
218:
219: // //@Override
220: public String toString() {
221: StringBuffer buf = new StringBuffer(80);
222: buf.append(ClassUtils.getSimpleName(this .getClass()));
223: buf.append(" {");
224: buf.append("id=").append(groupId);
225: buf.append(", expected size=").append(expectedSize);
226:
227: synchronized (this ) {
228: int currentSize = events.size();
229: buf.append(", current events=").append(currentSize);
230:
231: if (currentSize > 0) {
232: buf.append(" [");
233: Iterator i = events.iterator();
234: while (i.hasNext()) {
235: MuleEvent event = (MuleEvent) i.next();
236: buf.append(event.getMessage().getUniqueId());
237: if (i.hasNext()) {
238: buf.append(", ");
239: }
240: }
241: buf.append(']');
242: }
243: }
244:
245: buf.append('}');
246:
247: return buf.toString();
248: }
249:
250: }
|