001: /**
002: * Copyright (C) 2006 NetMind Consulting Bt.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 3 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */package hu.netmind.persistence.node;
018:
019: import hu.netmind.persistence.*;
020: import java.util.*;
021: import org.apache.log4j.Logger;
022:
023: /**
024: * Tracks remote locks. There are two views of tracked data: the index
025: * of the node it came from, and the object's id itself. There is also
026: * a transaction associated with the lock.
027: * @author Brautigam Robert
028: * @version Revision: $Revision$
029: */
030: public class RemoteLockTracker {
031: private static Logger logger = Logger
032: .getLogger(RemoteLockTracker.class);
033: private ModificationTracker modificationTracker;
034:
035: private HashMap lockEntriesByName;
036: private HashMap lockEntriesByIndex;
037: private TreeSet lockEntriesSorted;
038: private HashMap indirectLockEntriesByClass;
039:
040: public RemoteLockTracker(ModificationTracker modificationTracker) {
041: this .modificationTracker = modificationTracker;
042: lockEntriesByName = new HashMap();
043: lockEntriesByIndex = new HashMap();
044: lockEntriesSorted = new TreeSet();
045: indirectLockEntriesByClass = new HashMap();
046: }
047:
048: private void modifyIndirectEntries(LockEntry entry, boolean add) {
049: modifyIndirectEntries(entry.objectClass, entry, add);
050: }
051:
052: private void modifyIndirectEntries(Class currentClass,
053: LockEntry entry, boolean add) {
054: // If there is no class, then return
055: if (currentClass == null)
056: return;
057: // Mark class
058: Set entries = (Set) indirectLockEntriesByClass
059: .get(currentClass);
060: if (entries == null) {
061: entries = new HashSet();
062: indirectLockEntriesByClass.put(currentClass, entries);
063: }
064: if (add) {
065: // Add entry to indicate that it indirectly uses this class
066: entries.add(entry);
067: } else {
068: // Remove that entry
069: entries.remove(entry);
070: if (entries.size() == 0)
071: indirectLockEntriesByClass.remove(currentClass);
072: }
073: // Mark superclass
074: modifyIndirectEntries(currentClass.getSuperclass(), entry, add);
075: // Mark interfaces
076: Class[] interfaces = currentClass.getInterfaces();
077: for (int i = 0; i < interfaces.length; i++)
078: modifyIndirectEntries(interfaces[i], entry, add);
079: }
080:
081: public synchronized void unlockAll(int index) {
082: if (logger.isDebugEnabled())
083: logger.debug("unlocking all from: " + index);
084: List entries = (List) lockEntriesByIndex.remove(new Integer(
085: index));
086: if (entries == null)
087: return;
088: for (int i = 0; i < entries.size(); i++) {
089: LockEntry entry = (LockEntry) entries.get(i);
090: lockEntriesByName.remove(entry.name);
091: lockEntriesSorted.remove(entry);
092: modifyIndirectEntries(entry, false);
093: }
094: // Notify waiting threads, that locks became unlocked.
095: notifyAll();
096: }
097:
098: /**
099: * Wait a given amount of time for an unlock event.
100: * @return The new wait interval that's left of the input wait period, or -1 if during
101: * the wait period no unlock events were generated.
102: */
103: private int waitForUnlock(int wait) {
104: if (wait < 0)
105: throw new StoreException(
106: "wait called with negative wit period, this should not happen");
107: // Start wait
108: long startTime = System.currentTimeMillis();
109: try {
110: // Wait for unlock notification
111: if (wait > 0)
112: wait(wait);
113: else
114: wait();
115: } catch (Throwable e) {
116: throw new StoreException("wait interrupted", e);
117: }
118: if (wait > 0) {
119: long endTime = System.currentTimeMillis();
120: wait -= (endTime - startTime);
121: if (wait <= 0) // Would mean infinite
122: wait = -1; // Expired instead
123: }
124: // Return the modified time
125: return wait;
126: }
127:
128: /**
129: * Get the lock entry for a class or any of it's superclasses or superinterfaces.
130: */
131: private LockEntry getClassLockEntry(Class currentClass) {
132: logger.debug("checking lock entry for class: " + currentClass);
133: if (currentClass == null)
134: return null;
135: // Check class
136: LockEntry entry = (LockEntry) lockEntriesByName
137: .get(currentClass.getName());
138: if (entry != null)
139: return entry;
140: // Check superclass
141: entry = getClassLockEntry(currentClass.getSuperclass());
142: if (entry != null)
143: return entry;
144: // Check interfaces
145: Class[] interfaces = currentClass.getInterfaces();
146: for (int i = 0; i < interfaces.length; i++) {
147: entry = getClassLockEntry(interfaces[i]);
148: if (entry != null)
149: return entry;
150: }
151: // Fallback
152: return null;
153: }
154:
155: /**
156: * Get the composite name for a lock metadata.
157: */
158: private String getEntryName(LockMetaData meta) {
159: return meta.getObjectClass().getName()
160: + (meta.getObjectId() == null ? "" : (":" + meta
161: .getObjectId()));
162: }
163:
164: private SessionInfo lock(int index, long threadId, long txSerial,
165: LockMetaData meta, SessionInfo info, int wait,
166: boolean ensureCurrent) {
167: if (logger.isDebugEnabled())
168: logger.debug("locking from: " + index + ":" + threadId
169: + ":" + txSerial + ", meta: " + meta + ", info: "
170: + info + ", wait: " + wait + ", ensure current: "
171: + ensureCurrent);
172: // Check whether object can have a lock engaged. To do this we require the
173: // following checks _in a single interation_, because if we yield to another
174: // thread, the situation may change, and every check has to be made again.
175: // The checks:
176: // - Check if lock is explicitly given using lockEntriesByName (if object)
177: // - Check if lock explicitly set for any class or superclass.
178: // - Check whether class has implicit usage by other lock (if class).
179: boolean allChecksOk = false;
180: while (!allChecksOk) {
181: LockEntry lockingEntry = null;
182: // Check whether object is locked
183: if (meta.getObjectId() != null) {
184: LockEntry entry = (LockEntry) lockEntriesByName
185: .get(getEntryName(meta));
186: if (entry != null) {
187: if ((entry.index == index)
188: && (entry.threadId == threadId)
189: && ((entry.txSerial == 0)
190: || (txSerial == 0) || (entry.txSerial == txSerial))) {
191: logger
192: .debug("there is already a lock for that object, but from the same node and thread, lock success.");
193: // Lock comes from the same node, and from the same thread,
194: // so it's the owner of the lock. Increase depth level.
195: entry.lockDepth++;
196: return null;
197: } else {
198: logger
199: .debug("lock is present from another node or thread, lock unsuccessful.");
200: lockingEntry = entry;
201: }
202: } else {
203: logger.debug("no lock for object is present.");
204: }
205: }
206: // Check class and superclasses, whether they are locked
207: if (lockingEntry == null) {
208: lockingEntry = getClassLockEntry(meta.getObjectClass());
209: if ((lockingEntry != null)
210: && (lockingEntry.index == index)
211: && (lockingEntry.threadId == threadId)
212: && ((lockingEntry.txSerial == 0)
213: || (txSerial == 0) || (lockingEntry.txSerial == txSerial)))
214: lockingEntry = null;
215: }
216: // If this is a class, then check, if it's indirectly used by
217: // already activated locks, which are no owned by this thread.
218: if ((lockingEntry == null) && (meta.getObjectId() == null)) {
219: Set entries = (Set) indirectLockEntriesByClass.get(meta
220: .getObjectClass());
221: if (entries != null) {
222: Iterator entriesIterator = entries.iterator();
223: while ((lockingEntry == null)
224: && (entriesIterator.hasNext())) {
225: LockEntry entry = (LockEntry) entriesIterator
226: .next();
227: if (!((entry.index == index)
228: && (entry.threadId == threadId) && ((entry.txSerial == 0)
229: || (txSerial == 0) || (entry.txSerial == txSerial))))
230: lockingEntry = entry;
231: }
232: }
233: }
234: // If a locking entry is found, then try to wait
235: if (lockingEntry != null) {
236: if (wait < 0) {
237: // Tried to lock from another node or another thread in the same node
238: logger.debug("object already locked: "
239: + getEntryName(meta) + ", from: "
240: + lockingEntry.index + ":"
241: + lockingEntry.threadId + ":"
242: + lockingEntry.txSerial);
243: return lockingEntry.info;
244: } else {
245: // Wait for an unlock event
246: wait = waitForUnlock(wait);
247: }
248: allChecksOk = false;
249: } else {
250: allChecksOk = true;
251: }
252: }
253: // The lock is ready to be engaged. Now we check (if necessary)
254: // if the object is current. If it is not, we can still throw
255: // an exception, because the lock is not yet inserted in the
256: // datamodel. On the other hand, the object is surely not locked,
257: // or owned by caller, so it's sure, there won't be any operations
258: // on the object now, during this call.
259: if (ensureCurrent) {
260: SessionInfo failInfo = modificationTracker.isCurrent(meta);
261: if (failInfo != null)
262: return failInfo;
263: }
264: // Create lock
265: LockEntry entry = new LockEntry();
266: entry.id = meta.getObjectId();
267: entry.objectClass = meta.getObjectClass();
268: entry.name = getEntryName(meta);
269: entry.index = index;
270: entry.threadId = threadId;
271: entry.txSerial = txSerial;
272: entry.info = info;
273: entry.lockDepth = 1;
274: entry.lastModificationTime = System.currentTimeMillis();
275: // Update data model
276: List entries = (List) lockEntriesByIndex
277: .get(new Integer(index));
278: if (entries == null) {
279: entries = new LinkedList();
280: lockEntriesByIndex.put(new Integer(index), entries);
281: }
282: entries.add(entry);
283: lockEntriesByName.put(getEntryName(meta), entry);
284: lockEntriesSorted.add(entry);
285: modifyIndirectEntries(entry, true);
286: // Return
287: return null;
288: }
289:
290: private void unlock(int index, long threadId, long txSerial,
291: LockMetaData meta) {
292: if (logger.isDebugEnabled())
293: logger.debug("unlocking from: " + index + ":" + threadId
294: + ":" + txSerial + ", name: " + getEntryName(meta));
295: // Get the entry first
296: LockEntry entry = (LockEntry) lockEntriesByName
297: .get(getEntryName(meta));
298: if (entry == null)
299: return; // No lock present
300: // If lock is not from owner, then return. Note: owner is the same
301: // if it's the same index and thread. Transactions do not matter here.
302: if ((entry.index != index) || (entry.threadId != threadId))
303: return;
304: // Decrease lock depth
305: entry.lockDepth--;
306: if (entry.lockDepth > 0)
307: return; // No unlock yet
308: // Full remove of the entry
309: List entries = (List) lockEntriesByIndex
310: .get(new Integer(index));
311: if (entries == null)
312: return;
313: entries.remove(entry);
314: if (entries.size() == 0)
315: lockEntriesByIndex.remove(new Integer(index));
316: lockEntriesByName.remove(getEntryName(meta));
317: lockEntriesSorted.remove(entry);
318: modifyIndirectEntries(entry, false);
319: }
320:
321: /**
322: * Lock given ids.
323: */
324: public synchronized SessionInfo lock(int index, long threadId,
325: long txSerial, List metas, SessionInfo info, int wait,
326: boolean ensureCurrent) {
327: Vector lockedMetas = new Vector();
328: // Go through all ids, and lock'em all
329: SessionInfo result = null;
330: boolean ended = false;
331: try {
332: for (int i = 0; (i < metas.size()) && (result == null); i++) {
333: LockMetaData meta = (LockMetaData) metas.get(i);
334: long startTime = System.currentTimeMillis();
335: result = lock(index, threadId, txSerial, meta, info,
336: wait, ensureCurrent);
337: if (wait > 0) {
338: long endTime = System.currentTimeMillis();
339: wait -= (endTime - startTime);
340: if (wait <= 0) // Would mean infinite
341: wait = -1; // Expired instead
342: }
343: if (result != null)
344: lockedMetas.add(meta);
345: }
346: ended = true;
347: } finally {
348: if ((!ended) || (result != null)) {
349: // Some error happened, or a lock could not be established,
350: // so unlock all successfully locked ones, because either
351: // all locks are established, or none of them when the method
352: // returns.
353: for (int o = 0; o < lockedMetas.size(); o++)
354: unlock(index, threadId, txSerial,
355: (LockMetaData) lockedMetas.get(o));
356: }
357: }
358: return result;
359: }
360:
361: /**
362: * Unlock given ids.
363: */
364: public synchronized void unlock(int index, long threadId,
365: long txSerial, List metas) {
366: // Go through all ids, and unlock'em all
367: for (int i = 0; i < metas.size(); i++)
368: unlock(index, threadId, txSerial, (LockMetaData) metas
369: .get(i));
370: // Notify waiting threads, that locks became unlocked.
371: notifyAll();
372: }
373:
374: public class LockEntry implements Comparable {
375: public Long id;
376: public String name;
377: public Class objectClass;
378:
379: public long threadId;
380: public long txSerial;
381: public int index;
382:
383: public SessionInfo info;
384: public long lastModificationTime;
385: public int lockDepth;
386:
387: public int compareTo(Object obj) {
388: return name.compareTo(((LockEntry) obj).name);
389: }
390:
391: public int hashCode() {
392: return name.hashCode();
393: }
394:
395: public boolean equals(Object o) {
396: return name.equals(((LockEntry) o).name);
397: }
398: }
399:
400: }
|