001: /*
002: * HA-JDBC: High-Availability JDBC
003: * Copyright (c) 2004-2007 Paul Ferraro
004: *
005: * This library is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU Lesser General Public License as published by the
007: * Free Software Foundation; either version 2.1 of the License, or (at your
008: * option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful, but WITHOUT
011: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
013: * for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public License
016: * along with this library; if not, write to the Free Software Foundation,
017: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018: *
019: * Contact: ferraro@users.sourceforge.net
020: */
021: package net.sf.hajdbc.distributable;
022:
023: import java.lang.reflect.Method;
024: import java.text.MessageFormat;
025: import java.util.Collection;
026: import java.util.HashMap;
027: import java.util.Map;
028: import java.util.TreeMap;
029: import java.util.Vector;
030: import java.util.concurrent.ConcurrentHashMap;
031: import java.util.concurrent.TimeUnit;
032: import java.util.concurrent.locks.Condition;
033: import java.util.concurrent.locks.Lock;
034:
035: import net.sf.hajdbc.DatabaseCluster;
036: import net.sf.hajdbc.LockManager;
037:
038: import org.jgroups.Address;
039: import org.jgroups.Message;
040: import org.jgroups.MessageListener;
041: import org.jgroups.blocks.GroupRequest;
042: import org.jgroups.blocks.MethodCall;
043: import org.jgroups.blocks.RpcDispatcher;
044: import org.jgroups.util.Rsp;
045: import org.slf4j.Logger;
046: import org.slf4j.LoggerFactory;
047:
048: /**
049: * LockManager implementation that leverages a JGroups 2-phase voting adapter for obtain remote write locks.
050: *
051: * @author Paul Ferraro
052: */
053: public class DistributableLockManager extends
054: AbstractMembershipListener implements LockManager,
055: MessageListener {
056: private static final String CHANNEL = "{0}-lock"; //$NON-NLS-1$
057:
058: static Logger logger = LoggerFactory
059: .getLogger(DistributableLockManager.class);
060:
061: protected RpcDispatcher dispatcher;
062: protected int timeout;
063: private LockManager lockManager;
064: private Map<Address, Map<String, Lock>> addressMap = new ConcurrentHashMap<Address, Map<String, Lock>>();
065:
066: /**
067: * Constructs a new DistributableLock.
068: * @param <D> either java.sql.Driver or javax.sql.Datasource
069: * @param databaseCluster a database cluster
070: * @param decorator a decorator
071: * @throws Exception
072: */
073: public <D> DistributableLockManager(
074: DatabaseCluster<D> databaseCluster,
075: DistributableDatabaseClusterDecorator decorator)
076: throws Exception {
077: super (decorator.createChannel(MessageFormat.format(CHANNEL,
078: databaseCluster.getId())));
079:
080: this .lockManager = databaseCluster.getLockManager();
081:
082: this .timeout = decorator.getTimeout();
083:
084: this .dispatcher = new RpcDispatcher(this .channel, this , this ,
085: this );
086: }
087:
088: @Override
089: public void start() throws Exception {
090: this .channel.connect(this .channel.getClusterName());
091:
092: this .lockManager.start();
093: }
094:
095: @Override
096: public void stop() {
097: this .channel.close();
098:
099: this .lockManager.stop();
100: }
101:
102: /**
103: * Read locks are local.
104: * @see net.sf.hajdbc.LockManager#readLock(java.lang.String)
105: */
106: @Override
107: public Lock readLock(String object) {
108: return this .lockManager.readLock(object);
109: }
110:
111: /**
112: * Write locks are distributed.
113: * @see net.sf.hajdbc.LockManager#writeLock(java.lang.String)
114: */
115: @Override
116: public Lock writeLock(String object) {
117: return new DistributableLock(object, this .lockManager
118: .writeLock(object));
119: }
120:
121: public boolean vote(LockDecree decree) {
122: Map<String, Lock> lockMap = this .addressMap.get(decree
123: .getAddress());
124:
125: // Vote negatively for decrees from non-members
126: if (lockMap == null) {
127: return false;
128: }
129:
130: return decree.vote(this .lockManager, lockMap);
131: }
132:
133: /**
134: * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberJoined(org.jgroups.Address)
135: */
136: @Override
137: protected void memberJoined(Address address) {
138: this .addressMap.put(address, new HashMap<String, Lock>());
139: }
140:
141: /**
142: * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberLeft(org.jgroups.Address)
143: */
144: @Override
145: protected void memberLeft(Address address) {
146: Map<String, Lock> lockMap = this .addressMap.remove(address);
147:
148: for (Lock lock : lockMap.values()) {
149: lock.unlock();
150: }
151: }
152:
153: /**
154: * @see org.jgroups.MessageListener#getState()
155: */
156: @Override
157: public byte[] getState() {
158: return null;
159: }
160:
161: /**
162: * @see org.jgroups.MessageListener#receive(org.jgroups.Message)
163: */
164: @Override
165: public void receive(Message message) {
166: // Do nothing
167: }
168:
169: /**
170: * @see org.jgroups.MessageListener#setState(byte[])
171: */
172: @Override
173: public void setState(byte[] arg0) {
174: // Do nothing
175: }
176:
177: private class DistributableLock implements Lock {
178: private LockDecree acquireDecree;
179: private LockDecree releaseDecree;
180: private Lock lock;
181:
182: public DistributableLock(String object, Lock lock) {
183: Address address = DistributableLockManager.this .channel
184: .getLocalAddress();
185:
186: this .acquireDecree = new AcquireLockDecree(object, address);
187: this .releaseDecree = new ReleaseLockDecree(object, address);
188:
189: this .lock = lock;
190: }
191:
192: /**
193: * @see java.util.concurrent.locks.Lock#lock()
194: */
195: @Override
196: public void lock() {
197: while (!DistributableLockManager.this .isMembershipEmpty()) {
198: if (this .tryLock()) {
199: return;
200: }
201: }
202:
203: this .lock.lock();
204: }
205:
206: /**
207: * @see java.util.concurrent.locks.Lock#lockInterruptibly()
208: */
209: @Override
210: public void lockInterruptibly() throws InterruptedException {
211: while (!DistributableLockManager.this .isMembershipEmpty()) {
212: if (this .tryLock()) {
213: return;
214: }
215:
216: if (Thread.currentThread().isInterrupted()) {
217: throw new InterruptedException();
218: }
219: }
220:
221: this .lock.lockInterruptibly();
222: }
223:
224: /**
225: * @see java.util.concurrent.locks.Lock#tryLock()
226: */
227: @Override
228: public boolean tryLock() {
229: if (this .lock.tryLock()) {
230: if (this .tryRemoteLock()) {
231: return true;
232: }
233:
234: this .lock.unlock();
235: }
236:
237: return false;
238: }
239:
240: /**
241: * @see java.util.concurrent.locks.Lock#tryLock(long, java.util.concurrent.TimeUnit)
242: */
243: @Override
244: public boolean tryLock(long timeout, TimeUnit unit)
245: throws InterruptedException {
246: // Convert timeout to milliseconds
247: long ms = unit.toMillis(timeout);
248:
249: long stopTime = System.currentTimeMillis() + ms;
250:
251: do {
252: if (DistributableLockManager.this .isMembershipEmpty()) {
253: return this .lock.tryLock(ms, TimeUnit.MILLISECONDS);
254: }
255:
256: if (this .tryLock()) {
257: return true;
258: }
259:
260: if (Thread.currentThread().isInterrupted()) {
261: throw new InterruptedException();
262: }
263:
264: ms = stopTime - System.currentTimeMillis();
265: } while (ms >= 0);
266:
267: return false;
268: }
269:
270: /**
271: * @see java.util.concurrent.locks.Lock#unlock()
272: */
273: @Override
274: public void unlock() {
275: this .remoteUnlock();
276:
277: this .lock.unlock();
278: }
279:
280: private boolean tryRemoteLock() {
281: Map<Boolean, Vector<Address>> map = null;
282:
283: try {
284: map = this .remoteLock();
285:
286: return map.get(false).isEmpty();
287: } finally {
288: if (map != null) {
289: this .remoteUnlock(map.get(true));
290: }
291: }
292: }
293:
294: private Map<Boolean, Vector<Address>> remoteLock() {
295: return DistributableLockManager.this .remoteVote(
296: this .acquireDecree, null,
297: DistributableLockManager.this .timeout);
298: }
299:
300: private Map<Boolean, Vector<Address>> remoteUnlock() {
301: return this .remoteUnlock(null);
302: }
303:
304: private Map<Boolean, Vector<Address>> remoteUnlock(
305: Vector<Address> address) {
306: return DistributableLockManager.this .remoteVote(
307: this .releaseDecree, address, 0);
308: }
309:
310: /**
311: * @see java.util.concurrent.locks.Lock#newCondition()
312: */
313: @Override
314: public Condition newCondition() {
315: throw new UnsupportedOperationException();
316: }
317: }
318:
319: public Map<Boolean, Vector<Address>> remoteVote(LockDecree decree,
320: Vector<Address> addresses, long timeout) {
321: Map<Boolean, Vector<Address>> map = new TreeMap<Boolean, Vector<Address>>();
322:
323: int size = (addresses != null) ? addresses.size() : this
324: .getMembershipSize();
325:
326: map.put(true, new Vector<Address>(size));
327: map.put(false, new Vector<Address>(size));
328:
329: if (size > 0) {
330: try {
331: Method method = this .getClass().getMethod(
332: "vote", LockDecree.class); //$NON-NLS-1$
333:
334: MethodCall call = new MethodCall(method,
335: new Object[] { decree });
336:
337: Collection<Rsp> responses = this .dispatcher
338: .callRemoteMethods(addresses, call,
339: GroupRequest.GET_ALL, timeout).values();
340:
341: for (Rsp response : responses) {
342: Object value = response.wasReceived() ? response
343: .getValue() : false;
344:
345: map.get((value != null) ? value : false).add(
346: response.getSender());
347: }
348: } catch (NoSuchMethodException e) {
349: throw new IllegalStateException(e);
350: }
351: }
352:
353: return map;
354: }
355: }
|