001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.openjpa.event;
020:
021: import java.util.ArrayList;
022: import java.util.Collection;
023: import java.util.HashSet;
024: import java.util.Iterator;
025: import java.util.List;
026: import java.util.Collections;
027:
028: import org.apache.openjpa.conf.OpenJPAConfiguration;
029: import org.apache.openjpa.kernel.Broker;
030: import org.apache.openjpa.kernel.OpenJPAStateManager;
031: import org.apache.openjpa.lib.util.Closeable;
032: import org.apache.openjpa.lib.util.Localizer;
033: import org.apache.openjpa.lib.util.concurrent.AbstractConcurrentEventManager;
034: import org.apache.openjpa.util.UserException;
035:
036: /**
037: * Manager that can be used to track and notify
038: * {@link RemoteCommitListener}s on remote commit events. If remote events
039: * are enabled, this manager should be installed as a transaction listener on
040: * all brokers so that it knows when commits are made.
041: *
042: * @author Patrick Linskey
043: * @author Abe White
044: * @since 0.3.0
045: */
046: public class RemoteCommitEventManager extends
047: AbstractConcurrentEventManager implements
048: EndTransactionListener, Closeable {
049:
050: private static final Localizer _loc = Localizer
051: .forPackage(RemoteCommitEventManager.class);
052:
053: private final RemoteCommitProvider _provider;
054: private boolean _transmitPersIds = false;
055:
056: /**
057: * Constructor. Supply configuration.
058: */
059: public RemoteCommitEventManager(OpenJPAConfiguration conf) {
060: _provider = conf.newRemoteCommitProviderInstance();
061: if (_provider != null) {
062: _provider.setRemoteCommitEventManager(this );
063: }
064: }
065:
066: /**
067: * Return true if remote events are enabled.
068: */
069: public boolean areRemoteEventsEnabled() {
070: return _provider != null;
071: }
072:
073: /**
074: * Return the {@link RemoteCommitProvider} that this manager uses.
075: *
076: * @since 0.3.1
077: */
078: public RemoteCommitProvider getRemoteCommitProvider() {
079: return _provider;
080: }
081:
082: /**
083: * Whether the oids of added instances will be transmitted.
084: */
085: public boolean getTransmitPersistedObjectIds() {
086: return _transmitPersIds;
087: }
088:
089: /**
090: * Whether the oids of added instances will be transmitted.
091: */
092: public void setTransmitPersistedObjectIds(boolean transmit) {
093: _transmitPersIds = transmit;
094: }
095:
096: /**
097: * Adds an OpenJPA-internal listener to this RemoteCommitEventManager.
098: * Listeners so registered will be fired before any that are registered
099: * via {@link #addListener}. This means that the external listeners can
100: * rely on internal caches and data structures being up-to-date by the
101: * time that they are invoked.
102: *
103: * @since 1.0.0
104: */
105: public void addInternalListener(RemoteCommitListener listen) {
106: if (_provider == null)
107: throw new UserException(_loc.get("no-provider"));
108: ((List) _listeners).add(0, listen);
109: }
110:
111: public void addListener(RemoteCommitListener listen) {
112: if (_provider == null)
113: throw new UserException(_loc.get("no-provider"));
114: super .addListener(listen);
115: }
116:
117: /**
118: * Close this manager and all registered listeners.
119: */
120: public void close() {
121: if (_provider != null) {
122: _provider.close();
123: Collection listeners = getListeners();
124: for (Iterator itr = listeners.iterator(); itr.hasNext();)
125: ((RemoteCommitListener) itr.next()).close();
126: }
127: }
128:
129: protected void fireEvent(Object event, Object listener) {
130: RemoteCommitListener listen = (RemoteCommitListener) listener;
131: RemoteCommitEvent ev = (RemoteCommitEvent) event;
132: listen.afterCommit(ev);
133: }
134:
135: /**
136: * Fire an event to local listeners only notifying them of a detected
137: * stale record.
138: *
139: * @since 1.0.0
140: */
141: public void fireLocalStaleNotification(Object oid) {
142: RemoteCommitEvent ev = new RemoteCommitEvent(
143: RemoteCommitEvent.PAYLOAD_LOCAL_STALE_DETECTION, null,
144: null, Collections.singleton(oid), null);
145: fireEvent(ev);
146: }
147:
148: //////////////////////////////////////
149: // TransactionListener implementation
150: //////////////////////////////////////
151:
152: public void afterCommit(TransactionEvent event) {
153: if (_provider != null) {
154: RemoteCommitEvent rce = createRemoteCommitEvent(event);
155: if (rce != null)
156: _provider.broadcast(rce);
157: }
158: }
159:
160: /**
161: * Create a remote commit event from the given transaction event.
162: */
163: private RemoteCommitEvent createRemoteCommitEvent(
164: TransactionEvent event) {
165: Broker broker = (Broker) event.getSource();
166: int payload;
167: Collection persIds = null;
168: Collection addClassNames = null;
169: Collection updates = null;
170: Collection deletes = null;
171:
172: if (broker.isTrackChangesByType()) {
173: payload = RemoteCommitEvent.PAYLOAD_EXTENTS;
174: addClassNames = toClassNames(event.getPersistedTypes());
175: updates = toClassNames(event.getUpdatedTypes());
176: deletes = toClassNames(event.getDeletedTypes());
177: if (addClassNames == null && updates == null
178: && deletes == null)
179: return null;
180: } else {
181: Collection trans = event.getTransactionalObjects();
182: if (trans.isEmpty())
183: return null;
184:
185: payload = (_transmitPersIds) ? RemoteCommitEvent.PAYLOAD_OIDS_WITH_ADDS
186: : RemoteCommitEvent.PAYLOAD_OIDS;
187: Object oid;
188: Object obj;
189: OpenJPAStateManager sm;
190: for (Iterator itr = trans.iterator(); itr.hasNext();) {
191: obj = itr.next();
192: sm = broker.getStateManager(obj);
193:
194: if (sm == null || !sm.isPersistent() || !sm.isDirty())
195: continue;
196: if (sm.isNew() && sm.isDeleted())
197: continue;
198:
199: oid = sm.fetchObjectId();
200: if (sm.isNew()) {
201: if (_transmitPersIds) {
202: if (persIds == null)
203: persIds = new ArrayList();
204: persIds.add(oid);
205: }
206: if (addClassNames == null)
207: addClassNames = new HashSet();
208: addClassNames.add(obj.getClass().getName());
209: } else if (sm.isDeleted()) {
210: if (deletes == null)
211: deletes = new ArrayList();
212: deletes.add(oid);
213: } else {
214: if (updates == null)
215: updates = new ArrayList();
216: updates.add(oid);
217: }
218: }
219: if (addClassNames == null && updates == null
220: && deletes == null)
221: return null;
222: }
223: return new RemoteCommitEvent(payload, persIds, addClassNames,
224: updates, deletes);
225: }
226:
227: /**
228: * Transform a collection of classes to class names.
229: */
230: private static Collection toClassNames(Collection clss) {
231: if (clss.isEmpty())
232: return null;
233:
234: List names = new ArrayList(clss);
235: for (int i = 0; i < names.size(); i++)
236: names.set(i, ((Class) names.get(i)).getName());
237: return names;
238: }
239:
240: public void beforeCommit(TransactionEvent event) {
241: }
242:
243: public void afterRollback(TransactionEvent event) {
244: }
245:
246: public void afterCommitComplete(TransactionEvent event) {
247: }
248:
249: public void afterRollbackComplete(TransactionEvent event) {
250: }
251:
252: public void afterStateTransitions(TransactionEvent event) {
253: }
254: }
|