001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.openjpa.datacache;
020:
021: import java.util.ArrayList;
022: import java.util.Collection;
023: import java.util.HashSet;
024: import java.util.Iterator;
025: import java.util.List;
026: import java.util.Set;
027:
028: import org.apache.openjpa.conf.OpenJPAConfiguration;
029: import org.apache.openjpa.event.RemoteCommitEvent;
030: import org.apache.openjpa.event.RemoteCommitListener;
031: import org.apache.openjpa.lib.conf.Configurable;
032: import org.apache.openjpa.lib.conf.Configuration;
033: import org.apache.openjpa.lib.log.Log;
034: import org.apache.openjpa.lib.util.Localizer;
035: import org.apache.openjpa.lib.util.concurrent.AbstractConcurrentEventManager;
036: import org.apache.openjpa.lib.util.concurrent.ConcurrentReferenceHashSet;
037: import org.apache.openjpa.meta.ClassMetaData;
038: import org.apache.openjpa.meta.MetaDataRepository;
039: import org.apache.openjpa.util.Id;
040:
041: /**
042: * Abstract {@link QueryCache} implementation that provides various
043: * statistics, logging, and timeout functionality common across cache
044: * implementations.
045: *
046: * @author Patrick Linskey
047: * @author Abe White
048: */
049: public abstract class AbstractQueryCache extends
050: AbstractConcurrentEventManager implements QueryCache,
051: Configurable {
052:
053: private static final Localizer s_loc = Localizer
054: .forPackage(AbstractQueryCache.class);
055:
056: /**
057: * The configuration set by the system.
058: */
059: protected OpenJPAConfiguration conf;
060:
061: /**
062: * The log to use.
063: */
064: protected Log log;
065:
066: private boolean _closed = false;
067:
068: public void initialize(DataCacheManager manager) {
069: }
070:
071: public void onTypesChanged(TypesChangedEvent ev) {
072: writeLock();
073: Collection keys = null;
074: try {
075: if (hasListeners())
076: fireEvent(ev);
077: keys = keySet();
078: } finally {
079: writeUnlock();
080: }
081:
082: QueryKey qk;
083: List removes = null;
084: for (Iterator iter = keys.iterator(); iter.hasNext();) {
085: qk = (QueryKey) iter.next();
086: if (qk.changeInvalidatesQuery(ev.getTypes())) {
087: if (removes == null)
088: removes = new ArrayList();
089: removes.add(qk);
090: }
091: }
092: if (removes != null)
093: removeAllInternal(removes);
094: }
095:
096: public QueryResult get(QueryKey key) {
097: QueryResult o = getInternal(key);
098: if (o != null && o.isTimedOut()) {
099: o = null;
100: removeInternal(key);
101: if (log.isTraceEnabled())
102: log.trace(s_loc.get("cache-timeout", key));
103: }
104:
105: if (log.isTraceEnabled()) {
106: if (o == null)
107: log.trace(s_loc.get("cache-miss", key));
108: else
109: log.trace(s_loc.get("cache-hit", key));
110: }
111: return o;
112: }
113:
114: public QueryResult put(QueryKey qk, QueryResult oids) {
115: QueryResult o = putInternal(qk, oids);
116: if (log.isTraceEnabled())
117: log.trace(s_loc.get("cache-put", qk));
118: return (o == null || o.isTimedOut()) ? null : o;
119: }
120:
121: public QueryResult remove(QueryKey key) {
122: QueryResult o = removeInternal(key);
123: if (o != null && o.isTimedOut())
124: o = null;
125: if (log.isTraceEnabled()) {
126: if (o == null)
127: log.trace(s_loc.get("cache-remove-miss", key));
128: else
129: log.trace(s_loc.get("cache-remove-hit", key));
130: }
131: return o;
132: }
133:
134: public boolean pin(QueryKey key) {
135: boolean bool = pinInternal(key);
136: if (log.isTraceEnabled()) {
137: if (bool)
138: log.trace(s_loc.get("cache-pin-hit", key));
139: else
140: log.trace(s_loc.get("cache-pin-miss", key));
141: }
142: return bool;
143: }
144:
145: public boolean unpin(QueryKey key) {
146: boolean bool = unpinInternal(key);
147: if (log.isTraceEnabled()) {
148: if (bool)
149: log.trace(s_loc.get("cache-unpin-hit", key));
150: else
151: log.trace(s_loc.get("cache-unpin-miss", key));
152: }
153: return bool;
154: }
155:
156: public void clear() {
157: clearInternal();
158: if (log.isTraceEnabled())
159: log.trace(s_loc.get("cache-clear", "<query-cache>"));
160: }
161:
162: public void close() {
163: close(true);
164: }
165:
166: protected void close(boolean clear) {
167: if (!_closed) {
168: if (clear)
169: clearInternal();
170: _closed = true;
171: }
172: }
173:
174: public boolean isClosed() {
175: return _closed;
176: }
177:
178: public void addTypesChangedListener(TypesChangedListener listen) {
179: addListener(listen);
180: }
181:
182: public boolean removeTypesChangedListener(
183: TypesChangedListener listen) {
184: return removeListener(listen);
185: }
186:
187: /**
188: * This method is part of the {@link RemoteCommitListener} interface. If
189: * your cache subclass relies on OpenJPA for clustering support, make it
190: * implement <code>RemoteCommitListener</code>. This method will take
191: * care of invalidating entries from remote commits, by delegating to
192: * {@link #onTypesChanged}.
193: */
194: public void afterCommit(RemoteCommitEvent event) {
195: if (_closed)
196: return;
197:
198: // drop all committed classes
199: Set classes = Caches.addTypesByName(conf, event
200: .getPersistedTypeNames(), null);
201: if (event.getPayloadType() == RemoteCommitEvent.PAYLOAD_EXTENTS) {
202: classes = Caches.addTypesByName(conf, event
203: .getUpdatedTypeNames(), classes);
204: classes = Caches.addTypesByName(conf, event
205: .getDeletedTypeNames(), classes);
206: } else {
207: classes = addTypes(event.getUpdatedObjectIds(), classes);
208: classes = addTypes(event.getDeletedObjectIds(), classes);
209: }
210: if (classes != null)
211: onTypesChanged(new TypesChangedEvent(this , classes));
212: }
213:
214: /**
215: * Build up a set of classes for the given oids.
216: */
217: private Set addTypes(Collection oids, Set classes) {
218: if (oids.isEmpty())
219: return classes;
220: if (classes == null)
221: classes = new HashSet();
222:
223: MetaDataRepository repos = conf.getMetaDataRepositoryInstance();
224: ClassMetaData meta;
225: Object oid;
226: for (Iterator itr = oids.iterator(); itr.hasNext();) {
227: oid = itr.next();
228: if (oid instanceof Id)
229: classes.add(((Id) oid).getType());
230: else {
231: // ok if no metadata for oid; that just means the pc type
232: // probably hasn't been loaded into this JVM yet, and therefore
233: // there's no chance that it's in the cache anyway
234: meta = repos.getMetaData(oid, null, false);
235: if (meta != null)
236: classes.add(meta.getDescribedType());
237: }
238: }
239: return classes;
240: }
241:
242: /**
243: * Return a threadsafe view of the keys in this cache. This collection
244: * must be iterable without risk of concurrent modification exceptions.
245: * It does not have to implement contains() efficiently or use set
246: * semantics.
247: */
248: protected abstract Collection keySet();
249:
250: /**
251: * Return the list for the given key.
252: */
253: protected abstract QueryResult getInternal(QueryKey qk);
254:
255: /**
256: * Add the given result to the cache, returning the old result under the
257: * given key.
258: */
259: protected abstract QueryResult putInternal(QueryKey qk,
260: QueryResult oids);
261:
262: /**
263: * Remove the result under the given key from the cache.
264: */
265: protected abstract QueryResult removeInternal(QueryKey qk);
266:
267: /**
268: * Remove all results under the given keys from the cache.
269: */
270: protected void removeAllInternal(Collection qks) {
271: for (Iterator iter = qks.iterator(); iter.hasNext();)
272: removeInternal((QueryKey) iter.next());
273: }
274:
275: /**
276: * Clear the cache.
277: */
278: protected abstract void clearInternal();
279:
280: /**
281: * Pin an object to the cache.
282: */
283: protected abstract boolean pinInternal(QueryKey qk);
284:
285: /**
286: * Unpin an object from the cache.
287: */
288: protected abstract boolean unpinInternal(QueryKey qk);
289:
290: // ---------- Configurable implementation ----------
291:
292: public void setConfiguration(Configuration conf) {
293: this .conf = (OpenJPAConfiguration) conf;
294: this .log = conf.getLog(OpenJPAConfiguration.LOG_DATACACHE);
295: }
296:
297: public void startConfiguration() {
298: }
299:
300: public void endConfiguration() {
301: }
302:
303: // ---------- AbstractEventManager implementation ----------
304:
305: protected void fireEvent(Object event, Object listener) {
306: TypesChangedListener listen = (TypesChangedListener) listener;
307: TypesChangedEvent ev = (TypesChangedEvent) event;
308: try {
309: listen.onTypesChanged(ev);
310: } catch (Exception e) {
311: if (log.isWarnEnabled())
312: log.warn(s_loc.get("exp-listener-ex"), e);
313: }
314: }
315:
316: /**
317: * Individual query results will be registered as types changed
318: * listeners. We want such query results to be gc'd once
319: * the only reference is held by the list of expiration listeners.
320: */
321: protected Collection newListenerCollection() {
322: return new ConcurrentReferenceHashSet(
323: ConcurrentReferenceHashSet.WEAK);
324: }
325: }
|