001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.msg;
006:
007: import com.tc.bytes.TCByteBuffer;
008: import com.tc.io.TCByteBufferOutput;
009: import com.tc.io.TCSerializable;
010: import com.tc.net.protocol.tcm.MessageChannel;
011: import com.tc.net.protocol.tcm.MessageMonitor;
012: import com.tc.net.protocol.tcm.TCMessageHeader;
013: import com.tc.net.protocol.tcm.TCMessageType;
014: import com.tc.object.lockmanager.api.LockContext;
015: import com.tc.object.lockmanager.api.LockID;
016: import com.tc.object.lockmanager.api.LockLevel;
017: import com.tc.object.lockmanager.api.ThreadID;
018: import com.tc.object.lockmanager.api.TryLockContext;
019: import com.tc.object.lockmanager.api.WaitContext;
020: import com.tc.object.session.SessionID;
021: import com.tc.object.tx.WaitInvocation;
022: import com.tc.object.tx.WaitInvocationFactory;
023: import com.tc.util.Assert;
024:
025: import java.io.IOException;
026: import java.util.ArrayList;
027: import java.util.Collection;
028: import java.util.HashSet;
029: import java.util.Iterator;
030: import java.util.List;
031: import java.util.Set;
032:
033: /**
034: * Message for obtaining/releasing locks, and for modifying them (ie. wait/notify)
035: *
036: * @author steve
037: */
038: public class LockRequestMessage extends DSOMessageBase implements
039: LockRequestMessageConsts {
040:
041: private static final WaitInvocationFactory waitInvocationFactory = new WaitInvocationFactory();
042:
043: private final static byte LOCK_ID = 1;
044: private final static byte LOCK_LEVEL = 2;
045: private final static byte THREAD_ID = 3;
046: private final static byte REQUEST_TYPE = 4;
047: private final static byte WITH_WAIT = 5;
048: private final static byte WAIT_MILLIS = 6;
049: private final static byte WAIT_NANOS = 7;
050: private final static byte NOTIFY_ALL = 8;
051: private static final byte WAIT_ARG_COUNT = 9;
052: private static final byte WAIT_CONTEXT = 10;
053: private static final byte LOCK_CONTEXT = 11;
054: private static final byte PENDING_LOCK_CONTEXT = 12;
055: private static final byte PENDING_TRY_LOCK_CONTEXT = 13;
056:
057: // request types
058: private final static byte UNITIALIZED_REQUEST_TYPE = -1;
059: private final static byte OBTAIN_LOCK_REQUEST_TYPE = 1;
060: private final static byte RELEASE_LOCK_REQUEST_TYPE = 2;
061: private final static byte RECALL_COMMIT_LOCK_REQUEST_TYPE = 3;
062: private final static byte QUERY_LOCK_REQUEST_TYPE = 4;
063: private final static byte TRY_OBTAIN_LOCK_REQUEST_TYPE = 5;
064: private final static byte INTERRUPT_WAIT_REQUEST_TYPE = 6;
065:
066: private final Set lockContexts = new HashSet();
067: private final Set waitContexts = new HashSet();
068: private final Set pendingLockContexts = new HashSet();
069: private final List pendingTryLockContexts = new ArrayList();
070:
071: private LockID lockID = LockID.NULL_ID;
072: private int lockLevel = LockLevel.NIL_LOCK_LEVEL;
073: private ThreadID threadID = ThreadID.NULL_ID;
074: private byte requestType = UNITIALIZED_REQUEST_TYPE;
075: private boolean withWait;
076: private long waitMillis = UNITIALIZED_WAIT_TIME;
077: private int waitNanos = UNITIALIZED_WAIT_TIME;
078: private boolean notifyAll;
079: private int waitArgCount = -1;
080:
081: public LockRequestMessage(SessionID sessionID,
082: MessageMonitor monitor, TCByteBufferOutput out,
083: MessageChannel channel, TCMessageType type) {
084: super (sessionID, monitor, out, channel, type);
085: }
086:
087: public LockRequestMessage(SessionID sessionID,
088: MessageMonitor monitor, MessageChannel channel,
089: TCMessageHeader header, TCByteBuffer[] data) {
090: super (sessionID, monitor, channel, header, data);
091: }
092:
093: protected void dehydrateValues() {
094: putNVPair(LOCK_ID, lockID.asString());
095: putNVPair(LOCK_LEVEL, lockLevel);
096: putNVPair(THREAD_ID, threadID.toLong());
097:
098: putNVPair(REQUEST_TYPE, requestType);
099:
100: putNVPair(WITH_WAIT, withWait);
101:
102: if (withWait || isTryObtainLockRequest()) {
103: putNVPair(WAIT_ARG_COUNT, this .waitArgCount);
104: putNVPair(WAIT_MILLIS, waitMillis);
105: putNVPair(WAIT_NANOS, waitNanos);
106: }
107:
108: putNVPair(NOTIFY_ALL, notifyAll);
109:
110: for (Iterator i = lockContexts.iterator(); i.hasNext();) {
111: putNVPair(LOCK_CONTEXT, (TCSerializable) i.next());
112: }
113:
114: for (Iterator i = waitContexts.iterator(); i.hasNext();) {
115: putNVPair(WAIT_CONTEXT, (TCSerializable) i.next());
116: }
117:
118: for (Iterator i = pendingLockContexts.iterator(); i.hasNext();) {
119: putNVPair(PENDING_LOCK_CONTEXT, (TCSerializable) i.next());
120: }
121:
122: for (Iterator i = pendingTryLockContexts.iterator(); i
123: .hasNext();) {
124: putNVPair(PENDING_TRY_LOCK_CONTEXT, (TCSerializable) i
125: .next());
126: }
127: }
128:
129: private static boolean isValidRequestType(byte type) {
130: if ((type == RELEASE_LOCK_REQUEST_TYPE)
131: || (type == OBTAIN_LOCK_REQUEST_TYPE)
132: || (type == RECALL_COMMIT_LOCK_REQUEST_TYPE)
133: || (type == QUERY_LOCK_REQUEST_TYPE)
134: || (type == TRY_OBTAIN_LOCK_REQUEST_TYPE)
135: || (type == INTERRUPT_WAIT_REQUEST_TYPE)) {
136: return true;
137: }
138:
139: return false;
140: }
141:
142: private static String getRequestTypeDescription(byte type) {
143: switch (type) {
144: case RELEASE_LOCK_REQUEST_TYPE:
145: return "Lock Release";
146: case OBTAIN_LOCK_REQUEST_TYPE:
147: return "Obtain Lock";
148: case RECALL_COMMIT_LOCK_REQUEST_TYPE:
149: return "Recall Lock Commit";
150: case QUERY_LOCK_REQUEST_TYPE:
151: return "Query Lock";
152: case TRY_OBTAIN_LOCK_REQUEST_TYPE:
153: return "Try Obtain Lock";
154: case INTERRUPT_WAIT_REQUEST_TYPE:
155: return "Interrupt Wait";
156: default:
157: return "UNKNOWN (" + type + ")";
158: }
159: }
160:
161: protected String describePayload() {
162: StringBuffer rv = new StringBuffer();
163:
164: rv.append("Request Type: ").append(
165: getRequestTypeDescription(this .requestType)).append(
166: '\n');
167: rv.append(lockID).append(' ').append(threadID).append(' ')
168: .append("Lock Type: ").append(
169: LockLevel.toString(lockLevel)).append('\n');
170:
171: if (isWaitRelease()) {
172: rv.append("Wait millis: ").append(waitMillis).append(
173: ", nanos: ").append(waitNanos).append('\n');
174: }
175: if (waitContexts.size() > 0) {
176: rv.append("Wait contexts size = ").append(
177: waitContexts.size()).append('\n');
178: }
179: if (lockContexts.size() > 0) {
180: rv.append("Lock contexts size = ").append(
181: lockContexts.size()).append('\n');
182: }
183: if (pendingLockContexts.size() > 0) {
184: rv.append("Pending Lock contexts size = ").append(
185: pendingLockContexts.size()).append('\n');
186: }
187:
188: return rv.toString();
189: }
190:
191: protected boolean hydrateValue(byte name) throws IOException {
192: switch (name) {
193: case LOCK_ID:
194: // TODO: Make this use a lockID factory so that we can avoid dups
195: lockID = new LockID(getStringValue());
196: return true;
197: case LOCK_LEVEL:
198: lockLevel = getIntValue();
199: return true;
200: case THREAD_ID:
201: threadID = new ThreadID(getLongValue());
202: return true;
203: case WITH_WAIT:
204: withWait = getBooleanValue();
205: return true;
206: case REQUEST_TYPE:
207: final byte req = getByteValue();
208: if (!isValidRequestType(req)) {
209: return false;
210: }
211: requestType = req;
212: return true;
213: case WAIT_MILLIS:
214: waitMillis = getLongValue();
215: return true;
216: case WAIT_NANOS:
217: waitNanos = getIntValue();
218: return true;
219: case NOTIFY_ALL:
220: notifyAll = getBooleanValue();
221: return true;
222: case WAIT_ARG_COUNT:
223: waitArgCount = getIntValue();
224: return true;
225: case LOCK_CONTEXT:
226: lockContexts.add(getObject(new LockContext()));
227: return true;
228: case WAIT_CONTEXT:
229: waitContexts.add(getObject(new WaitContext()));
230: return true;
231: case PENDING_LOCK_CONTEXT:
232: pendingLockContexts.add(getObject(new LockContext()));
233: return true;
234: case PENDING_TRY_LOCK_CONTEXT:
235: pendingTryLockContexts.add(getObject(new TryLockContext()));
236: return true;
237: default:
238: return false;
239: }
240: }
241:
242: public LockID getLockID() {
243: return lockID;
244: }
245:
246: public ThreadID getThreadID() {
247: return threadID;
248: }
249:
250: public int getLockLevel() {
251: return lockLevel;
252: }
253:
254: public boolean isNotifyAll() {
255: return notifyAll;
256: }
257:
258: public void addLockContext(LockContext ctxt) {
259: synchronized (lockContexts) {
260: lockContexts.add(ctxt);
261: }
262: }
263:
264: public Collection getLockContexts() {
265: synchronized (lockContexts) {
266: return new HashSet(lockContexts);
267: }
268: }
269:
270: public void addWaitContext(WaitContext ctxt) {
271: synchronized (waitContexts) {
272: waitContexts.add(ctxt);
273: }
274: }
275:
276: public Collection getWaitContexts() {
277: synchronized (waitContexts) {
278: return new HashSet(waitContexts);
279: }
280: }
281:
282: public void addPendingLockContext(LockContext ctxt) {
283: synchronized (pendingLockContexts) {
284: pendingLockContexts.add(ctxt);
285: }
286: }
287:
288: public Collection getPendingLockContexts() {
289: synchronized (pendingLockContexts) {
290: return new HashSet(pendingLockContexts);
291: }
292: }
293:
294: public void addPendingTryLockContext(LockContext ctxt) {
295: Assert.eval(ctxt instanceof TryLockContext);
296: synchronized (pendingTryLockContexts) {
297: pendingTryLockContexts.add(ctxt);
298: }
299: }
300:
301: public Collection getPendingTryLockContexts() {
302: synchronized (pendingTryLockContexts) {
303: return new ArrayList(pendingTryLockContexts);
304: }
305: }
306:
307: public boolean isInterruptWaitRequest() {
308: if (!isValidRequestType(requestType)) {
309: throw new AssertionError("Invalid request type: "
310: + requestType);
311: }
312: return requestType == INTERRUPT_WAIT_REQUEST_TYPE;
313: }
314:
315: public boolean isQueryLockRequest() {
316: if (!isValidRequestType(requestType)) {
317: throw new AssertionError("Invalid request type: "
318: + requestType);
319: }
320: return requestType == QUERY_LOCK_REQUEST_TYPE;
321: }
322:
323: public boolean isTryObtainLockRequest() {
324: if (!isValidRequestType(requestType)) {
325: throw new AssertionError("Invalid request type: "
326: + requestType);
327: }
328: return requestType == TRY_OBTAIN_LOCK_REQUEST_TYPE;
329: }
330:
331: public boolean isObtainLockRequest() {
332: if (!isValidRequestType(requestType)) {
333: throw new AssertionError("Invalid request type: "
334: + requestType);
335: }
336: return requestType == OBTAIN_LOCK_REQUEST_TYPE;
337: }
338:
339: public boolean isReleaseLockRequest() {
340: if (!isValidRequestType(requestType)) {
341: throw new AssertionError("Invalid request type: "
342: + requestType);
343: }
344: return requestType == RELEASE_LOCK_REQUEST_TYPE;
345: }
346:
347: public boolean isRecallCommitLockRequest() {
348: if (!isValidRequestType(requestType)) {
349: throw new AssertionError("Invalid request type: "
350: + requestType);
351: }
352: return requestType == RECALL_COMMIT_LOCK_REQUEST_TYPE;
353: }
354:
355: public WaitInvocation getWaitInvocation() {
356: if (!this .withWait && !isTryObtainLockRequest()) {
357: throw new IllegalStateException("not a wait request");
358: }
359: return waitInvocationFactory.newWaitInvocation(
360: this .waitArgCount, this .waitMillis, this .waitNanos);
361: }
362:
363: public boolean isWaitRelease() {
364: return this .withWait;
365: }
366:
367: public void initializeInterruptWait(LockID lid, ThreadID id) {
368: initialize(lid, id, LockLevel.NIL_LOCK_LEVEL,
369: INTERRUPT_WAIT_REQUEST_TYPE, false, false,
370: UNITIALIZED_WAIT_TIME, UNITIALIZED_WAIT_TIME, -1);
371: }
372:
373: public void initializeQueryLock(LockID lid, ThreadID id) {
374: initialize(lid, id, LockLevel.NIL_LOCK_LEVEL,
375: QUERY_LOCK_REQUEST_TYPE, false, false,
376: UNITIALIZED_WAIT_TIME, UNITIALIZED_WAIT_TIME, -1);
377: }
378:
379: public void initializeObtainLock(LockID lid, ThreadID id, int type) {
380: initialize(lid, id, type, OBTAIN_LOCK_REQUEST_TYPE, false,
381: false, UNITIALIZED_WAIT_TIME, UNITIALIZED_WAIT_TIME, -1);
382: }
383:
384: public void initializeTryObtainLock(LockID lid, ThreadID id,
385: WaitInvocation timeout, int type) {
386: initialize(lid, id, type, TRY_OBTAIN_LOCK_REQUEST_TYPE, false,
387: false, timeout.getMillis(), timeout.getNanos(), timeout
388: .getSignature().getArgCount());
389: }
390:
391: public void initializeLockRelease(LockID lid, ThreadID id) {
392: initialize(lid, id, LockLevel.NIL_LOCK_LEVEL,
393: RELEASE_LOCK_REQUEST_TYPE, false, false,
394: UNITIALIZED_WAIT_TIME, UNITIALIZED_WAIT_TIME, -1);
395: }
396:
397: public void initializeLockReleaseWait(LockID lid, ThreadID id,
398: WaitInvocation call) {
399: initialize(lid, id, LockLevel.NIL_LOCK_LEVEL,
400: RELEASE_LOCK_REQUEST_TYPE, true, false, call
401: .getMillis(), call.getNanos(), call
402: .getSignature().getArgCount());
403: }
404:
405: public void initializeLockRecallCommit(LockID lid) {
406: initialize(lid, ThreadID.VM_ID, LockLevel.NIL_LOCK_LEVEL,
407: RECALL_COMMIT_LOCK_REQUEST_TYPE, false, false,
408: UNITIALIZED_WAIT_TIME, UNITIALIZED_WAIT_TIME, -1);
409: }
410:
411: private void initialize(LockID lid, ThreadID id, int level,
412: byte reqType, boolean wait, boolean all, long millis,
413: int nanos, int waitArgs) {
414: this .lockID = lid;
415: this .lockLevel = level;
416: this .threadID = id;
417:
418: if (!isValidRequestType(reqType)) {
419: throw new IllegalArgumentException("Invalid request type: "
420: + reqType);
421: }
422: this .requestType = reqType;
423:
424: if (this .requestType == RELEASE_LOCK_REQUEST_TYPE
425: || this .requestType == RECALL_COMMIT_LOCK_REQUEST_TYPE) {
426: if (this .lockLevel != LockLevel.NIL_LOCK_LEVEL) {
427: // make the formatter happy
428: throw new IllegalArgumentException(
429: "Cannot specify a lock level for release or recall commit(yet)");
430: }
431: }
432:
433: this .withWait = wait;
434: this .notifyAll = all;
435:
436: if (wait || isTryObtainLockRequest()) {
437: if ((waitArgs < 0) || (waitArgs > 2)) {
438: throw new IllegalArgumentException(
439: "Wait argument count must be 0, 1 or 2");
440: }
441:
442: if (requestType != RELEASE_LOCK_REQUEST_TYPE
443: && requestType != TRY_OBTAIN_LOCK_REQUEST_TYPE) {
444: throw new IllegalArgumentException(
445: "Can't include withWait option for non lock release requests");
446: }
447:
448: this.waitArgCount = waitArgs;
449: this.waitMillis = millis;
450: this.waitNanos = nanos;
451: }
452: }
453:
454: }
|