001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.lockmanager.impl;
006:
007: import com.tc.async.api.EventContext;
008: import com.tc.async.api.Sink;
009: import com.tc.async.impl.NullSink;
010: import com.tc.management.L2LockStatsManager;
011: import com.tc.net.groups.ClientID;
012: import com.tc.net.protocol.tcm.ChannelID;
013: import com.tc.object.lockmanager.api.ClientLockManager;
014: import com.tc.object.lockmanager.api.LockContext;
015: import com.tc.object.lockmanager.api.LockFlushCallback;
016: import com.tc.object.lockmanager.api.LockID;
017: import com.tc.object.lockmanager.api.LockRequest;
018: import com.tc.object.lockmanager.api.RemoteLockManager;
019: import com.tc.object.lockmanager.api.ThreadID;
020: import com.tc.object.lockmanager.api.TryLockContext;
021: import com.tc.object.lockmanager.api.TryLockRequest;
022: import com.tc.object.lockmanager.api.WaitContext;
023: import com.tc.object.lockmanager.api.WaitLockRequest;
024: import com.tc.object.session.SessionProvider;
025: import com.tc.object.tx.WaitInvocation;
026: import com.tc.objectserver.api.TestSink;
027: import com.tc.objectserver.context.LockResponseContext;
028: import com.tc.objectserver.lockmanager.api.NotifiedWaiters;
029: import com.tc.objectserver.lockmanager.api.NullChannelManager;
030: import com.tc.objectserver.lockmanager.impl.LockManagerImpl;
031:
032: import java.util.ArrayList;
033: import java.util.Collection;
034: import java.util.HashSet;
035: import java.util.Iterator;
036: import java.util.Set;
037:
038: public class ClientServerLockManagerGlue implements RemoteLockManager,
039: Runnable {
040:
041: private static final Sink NULL_SINK = new NullSink();
042:
043: private LockManagerImpl serverLockManager;
044: private ClientLockManager clientLockManager;
045:
046: private TestSink sink = new TestSink();
047: private ClientID clientID = new ClientID(new ChannelID(1));
048: private boolean stop = false;
049: private Thread eventNotifier;
050:
051: private final SessionProvider sessionProvider;
052:
053: public ClientServerLockManagerGlue(SessionProvider sessionProvider) {
054: super ();
055: this .sessionProvider = sessionProvider;
056: eventNotifier = new Thread(this , "ClientServerLockManagerGlue");
057: eventNotifier.setDaemon(true);
058: eventNotifier.start();
059: }
060:
061: public void requestLock(LockID lockID, ThreadID threadID,
062: int lockType) {
063: serverLockManager.requestLock(lockID, clientID, threadID,
064: lockType, sink);
065: }
066:
067: public void releaseLock(LockID lockID, ThreadID threadID) {
068: serverLockManager.unlock(lockID, clientID, threadID);
069: }
070:
071: public void releaseLockWait(LockID lockID, ThreadID threadID,
072: WaitInvocation call) {
073: serverLockManager.wait(lockID, clientID, threadID, call, sink);
074: }
075:
076: public void recallCommit(LockID lockID, Collection lockContext,
077: Collection waitContext, Collection pendingRequests,
078: Collection pendingTryLockRequests) {
079: Collection serverLC = new ArrayList();
080: for (Iterator i = lockContext.iterator(); i.hasNext();) {
081: LockRequest request = (LockRequest) i.next();
082: LockContext ctxt = new LockContext(request.lockID(),
083: clientID, request.threadID(), request.lockLevel());
084: serverLC.add(ctxt);
085: }
086:
087: Collection serverWC = new ArrayList();
088: for (Iterator i = waitContext.iterator(); i.hasNext();) {
089: WaitLockRequest request = (WaitLockRequest) i.next();
090: WaitContext ctxt = new WaitContext(request.lockID(),
091: clientID, request.threadID(), request.lockLevel(),
092: request.getWaitInvocation());
093: serverWC.add(ctxt);
094: }
095:
096: Collection serverPC = new ArrayList();
097: for (Iterator i = pendingRequests.iterator(); i.hasNext();) {
098: LockRequest request = (LockRequest) i.next();
099: LockContext ctxt = new LockContext(request.lockID(),
100: clientID, request.threadID(), request.lockLevel());
101: serverPC.add(ctxt);
102: }
103:
104: Collection serverPTC = new ArrayList();
105: for (Iterator i = pendingTryLockRequests.iterator(); i
106: .hasNext();) {
107: TryLockRequest request = (TryLockRequest) i.next();
108: LockContext ctxt = new TryLockContext(request.lockID(),
109: clientID, request.threadID(), request.lockLevel(),
110: request.getWaitInvocation());
111: serverPTC.add(ctxt);
112: }
113:
114: serverLockManager.recallCommit(lockID, clientID, serverLC,
115: serverWC, serverPC, serverPTC, sink);
116: }
117:
118: public void flush(LockID lockID) {
119: return;
120: }
121:
122: public boolean isTransactionsForLockFlushed(LockID lockID,
123: LockFlushCallback callback) {
124: return true;
125: }
126:
127: public void set(ClientLockManager clmgr, LockManagerImpl slmgr) {
128: this .clientLockManager = clmgr;
129: this .serverLockManager = slmgr;
130: this .serverLockManager.start();
131: }
132:
133: public void run() {
134: while (!stop) {
135: EventContext ec = null;
136: try {
137: ec = sink.take();
138: } catch (InterruptedException e) {
139: //
140: }
141: if (ec instanceof LockResponseContext) {
142: LockResponseContext lrc = (LockResponseContext) ec;
143: if (lrc.isLockAward()) {
144: clientLockManager.awardLock(sessionProvider
145: .getSessionID(), lrc.getLockID(), lrc
146: .getThreadID(), lrc.getLockLevel());
147: }
148: }
149: // ToDO :: implment WaitContext etc..
150: }
151: }
152:
153: public LockManagerImpl restartServer() {
154: int policy = this .serverLockManager.getLockPolicy();
155: this .serverLockManager = new LockManagerImpl(
156: new NullChannelManager(),
157: L2LockStatsManager.NULL_LOCK_STATS_MANAGER);
158: if (!clientLockManager.isStarting())
159: clientLockManager.starting();
160: for (Iterator i = clientLockManager.addAllHeldLocksTo(
161: new HashSet()).iterator(); i.hasNext();) {
162: LockRequest request = (LockRequest) i.next();
163: serverLockManager.reestablishLock(request.lockID(),
164: clientID, request.threadID(), request.lockLevel(),
165: NULL_SINK);
166: }
167:
168: for (Iterator i = clientLockManager.addAllWaitersTo(
169: new HashSet()).iterator(); i.hasNext();) {
170: WaitLockRequest request = (WaitLockRequest) i.next();
171: serverLockManager.reestablishWait(request.lockID(),
172: clientID, request.threadID(), request.lockLevel(),
173: request.getWaitInvocation(), sink);
174: }
175:
176: for (Iterator i = clientLockManager
177: .addAllPendingLockRequestsTo(new HashSet()).iterator(); i
178: .hasNext();) {
179: LockRequest request = (LockRequest) i.next();
180: serverLockManager.requestLock(request.lockID(), clientID,
181: request.threadID(), request.lockLevel(), sink);
182: }
183:
184: if (policy == LockManagerImpl.ALTRUISTIC_LOCK_POLICY) {
185: this .serverLockManager.setLockPolicy(policy);
186: }
187: this .serverLockManager.start();
188: return this .serverLockManager;
189: }
190:
191: public void notify(LockID lockID1, ThreadID tx2, boolean all) {
192: NotifiedWaiters waiters = new NotifiedWaiters();
193: serverLockManager.notify(lockID1, clientID, tx2, all, waiters);
194: Set s = waiters.getNotifiedFor(clientID);
195: for (Iterator i = s.iterator(); i.hasNext();) {
196: LockContext lc = (LockContext) i.next();
197: clientLockManager
198: .notified(lc.getLockID(), lc.getThreadID());
199: }
200: }
201:
202: public void stop() {
203: stop = true;
204: eventNotifier.interrupt();
205: }
206:
207: public void queryLock(LockID lockID, ThreadID threadID) {
208: serverLockManager.queryLock(lockID, clientID, threadID, sink);
209: }
210:
211: public void tryRequestLock(LockID lockID, ThreadID threadID,
212: int lockType) {
213: serverLockManager.tryRequestLock(lockID, clientID, threadID,
214: lockType, new WaitInvocation(0), sink);
215: }
216:
217: public void interrruptWait(LockID lockID, ThreadID threadID) {
218: serverLockManager.interrupt(lockID, clientID, threadID);
219:
220: }
221:
222: public void tryRequestLock(LockID lockID, ThreadID threadID,
223: WaitInvocation timeout, int lockType) {
224: serverLockManager.tryRequestLock(lockID, clientID, threadID,
225: lockType, timeout, sink);
226: }
227: }
|