001: /*
002: * $Id: AbstractEventAggregator.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.routing.inbound;
012:
013: import org.mule.DefaultMuleEvent;
014: import org.mule.MuleServer;
015: import org.mule.api.MessagingException;
016: import org.mule.api.MuleContext;
017: import org.mule.api.MuleEvent;
018: import org.mule.api.MuleException;
019: import org.mule.api.MuleMessage;
020: import org.mule.api.endpoint.EndpointBuilder;
021: import org.mule.api.endpoint.InboundEndpoint;
022: import org.mule.endpoint.EndpointURIEndpointBuilder;
023: import org.mule.routing.AggregationException;
024:
025: import java.util.LinkedList;
026:
027: import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
028: import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
029:
030: /**
031: * <code>AbstractEventAggregator</code> will aggregate a set of messages into a
032: * single message.
033: */
034:
035: public abstract class AbstractEventAggregator extends SelectiveConsumer {
036: public static final String NO_CORRELATION_ID = "no-id";
037:
038: private final ConcurrentMap eventGroups = new ConcurrentHashMap();
039:
040: // //@Override
041: public MuleEvent[] process(MuleEvent event)
042: throws MessagingException {
043: MuleEvent[] result = null;
044:
045: if (this .isMatch(event)) {
046: // indicates interleaved EventGroup removal (very rare)
047: boolean miss = false;
048:
049: // match event to its group
050: final Object groupId = this .getEventGroupIdForEvent(event);
051:
052: // spinloop for the EventGroup lookup
053: while (true) {
054: if (miss) {
055: try {
056: // recommended over Thread.yield()
057: Thread.sleep(1);
058: } catch (InterruptedException interrupted) {
059: Thread.currentThread().interrupt();
060: }
061: }
062:
063: // check for an existing group first
064: EventGroup group = this .getEventGroup(groupId);
065:
066: // does the group exist?
067: if (group == null) {
068: // ..apparently not, so create a new one & add it
069: group = this .addEventGroup(this .createEventGroup(
070: event, groupId));
071: }
072:
073: // ensure that only one thread at a time evaluates this EventGroup
074: synchronized (group) {
075: // make sure no other thread removed the group in the meantime
076: if (group != this .getEventGroup(groupId)) {
077: // if that is the (rare) case, spin
078: miss = true;
079: continue;
080: }
081:
082: // add the incoming event to the group
083: group.addEvent(event);
084:
085: if (this .shouldAggregateEvents(group)) {
086: MuleMessage returnMessage = this
087: .aggregateEvents(group);
088: InboundEndpoint endpoint;
089:
090: try {
091: MuleContext muleContext = MuleServer
092: .getMuleContext();
093: EndpointBuilder builder = new EndpointURIEndpointBuilder(
094: event.getEndpoint(), muleContext);
095: // TODO - is this correct? it stops other transformers from being used
096: builder.setTransformers(new LinkedList());
097: builder.setName(this .getClass().getName());
098: endpoint = muleContext.getRegistry()
099: .lookupEndpointFactory()
100: .getInboundEndpoint(builder);
101: } catch (MuleException e) {
102: throw new MessagingException(e
103: .getI18nMessage(), returnMessage, e);
104: }
105: MuleEvent returnEvent = new DefaultMuleEvent(
106: returnMessage, endpoint, event
107: .getService(), event);
108: result = new MuleEvent[] { returnEvent };
109: this .removeEventGroup(group);
110: }
111: // result or not: exit spinloop
112: break;
113: }
114: }
115: }
116:
117: return result;
118: }
119:
120: /**
121: * Create a new EventGroup with the specified groupId.
122: *
123: * @param event the event that caused creation of this group; can be used for
124: * additional information
125: * @param groupId the id to use for the new EventGroup
126: * @return a new EventGroup
127: */
128: protected EventGroup createEventGroup(MuleEvent event,
129: Object groupId) {
130: return new EventGroup(groupId);
131: }
132:
133: /**
134: * Returns the identifier by which events will be correlated. By default this is
135: * the value as returned by {@link org.mule.api.transport.MessageAdapter#getCorrelationId()}.
136: *
137: * @param event the event use for determining the correlation group id
138: * @return the id used to correlate related events
139: */
140: protected Object getEventGroupIdForEvent(MuleEvent event) {
141: String groupId = event.getMessage().getCorrelationId();
142:
143: if (groupId == null) {
144: groupId = NO_CORRELATION_ID;
145: }
146:
147: return groupId;
148: }
149:
150: /**
151: * Look up the existing EventGroup with the given id.
152: *
153: * @param groupId
154: * @return the EventGroup with the given id or <code>null</code> if the group
155: * does not exist.
156: */
157: protected EventGroup getEventGroup(Object groupId) {
158: return (EventGroup) eventGroups.get(groupId);
159: }
160:
161: /**
162: * Add the given EventGroup to this aggregator's "group store". Currently this is
163: * only a ConcurrentHashMap, and the group's id as returned by
164: * {@link EventGroup#getGroupId()} is used to match the group. Since group
165: * creation/lookup/storage can happen fully concurrent, we return the stored
166: * group. Callers are required to switch their method-local references when a
167: * different group is returned.
168: *
169: * @param group the EventGroup to "store"
170: * @return the stored EventGroup (may be different from the one passed as
171: * argument)
172: */
173: protected EventGroup addEventGroup(EventGroup group) {
174: // a parallel thread might have removed the EventGroup already,
175: // therefore we need to validate our current reference
176: EventGroup previous = (EventGroup) eventGroups.putIfAbsent(
177: group.getGroupId(), group);
178: return (previous != null ? previous : group);
179: }
180:
181: /**
182: * Remove the group from this aggregator's "store". The group's id as returned by
183: * {@link EventGroup#getGroupId()} is used to match the group.
184: *
185: * @param group the EventGroup to remove
186: */
187: protected void removeEventGroup(EventGroup group) {
188: eventGroups.remove(group.getGroupId());
189: }
190:
191: /**
192: * Determines if the event group is ready to be aggregated. if the group is ready
193: * to be aggregated (this is entirely up to the application. it could be
194: * determined by volume, last modified time or some oher criteria based on the
195: * last event received).
196: *
197: * @param events
198: * @return true if the group is ready for aggregation
199: */
200: protected abstract boolean shouldAggregateEvents(EventGroup events);
201:
202: /**
203: * This method is invoked if the shouldAggregate method is called and returns
204: * true. Once this method returns an aggregated message, the event group is
205: * removed from the router.
206: *
207: * @param events the event group for this request
208: * @return an aggregated message
209: * @throws AggregationException if the aggregation fails. in this scenario the
210: * whole event group is removed and passed to the exception handler
211: * for this componenet
212: */
213: protected abstract MuleMessage aggregateEvents(EventGroup events)
214: throws AggregationException;
215:
216: }
|