001: /*
002: * HA-JDBC: High-Availability JDBC
003: * Copyright (c) 2004-2007 Paul Ferraro
004: *
005: * This library is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU Lesser General Public License as published by the
007: * Free Software Foundation; either version 2.1 of the License, or (at your
008: * option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful, but WITHOUT
011: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
013: * for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public License
016: * along with this library; if not, write to the Free Software Foundation,
017: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018: *
019: * Contact: ferraro@users.sourceforge.net
020: */
021: package net.sf.hajdbc.distributable;
022:
023: import java.text.MessageFormat;
024: import java.util.Collection;
025: import java.util.Set;
026:
027: import net.sf.hajdbc.DatabaseCluster;
028: import net.sf.hajdbc.Messages;
029: import net.sf.hajdbc.StateManager;
030:
031: import org.jgroups.Address;
032: import org.jgroups.Channel;
033: import org.jgroups.Message;
034: import org.jgroups.MessageListener;
035: import org.jgroups.blocks.GroupRequest;
036: import org.jgroups.blocks.MessageDispatcher;
037: import org.jgroups.blocks.RequestHandler;
038: import org.jgroups.util.Rsp;
039: import org.slf4j.Logger;
040: import org.slf4j.LoggerFactory;
041:
042: /**
043: * StateManager implementation that broadcasts database activations and deactivations to other group members
044: * and retrieves initial state from another group member.
045: *
046: * @author Paul Ferraro
047: */
048: public class DistributableStateManager extends
049: AbstractMembershipListener implements StateManager,
050: MessageListener, RequestHandler {
051: private static final String CHANNEL = "{0}-state"; //$NON-NLS-1$
052:
053: private static Logger logger = LoggerFactory
054: .getLogger(DistributableStateManager.class);
055:
056: private int timeout;
057: private MessageDispatcher dispatcher;
058: private DatabaseCluster<?> databaseCluster;
059: private StateManager stateManager;
060:
061: public DistributableStateManager(
062: DatabaseCluster<?> databaseCluster,
063: DistributableDatabaseClusterDecorator decorator)
064: throws Exception {
065: super (decorator.createChannel(MessageFormat.format(CHANNEL,
066: databaseCluster.getId())));
067:
068: this .databaseCluster = databaseCluster;
069:
070: this .dispatcher = new MessageDispatcher(this .channel, this ,
071: this , this );
072:
073: this .timeout = decorator.getTimeout();
074: this .stateManager = databaseCluster.getStateManager();
075: }
076:
077: /**
078: * @see org.jgroups.blocks.RequestHandler#handle(org.jgroups.Message)
079: */
080: @SuppressWarnings("unchecked")
081: @Override
082: public Object handle(Message message) {
083: try {
084: Command<Object> command = (Command) message.getObject();
085:
086: logger.info(Messages.getMessage(Messages.COMMAND_RECEIVED,
087: command));
088:
089: return command.marshalResult(command.execute(
090: this .databaseCluster, this .stateManager));
091: } catch (Throwable e) {
092: logger.error(e.getMessage(), e);
093:
094: return e;
095: }
096: }
097:
098: /**
099: * @see net.sf.hajdbc.StateManager#getInitialState()
100: */
101: @Override
102: public Set<String> getInitialState() {
103: Command<Set<String>> command = new QueryInitialStateCommand();
104:
105: Collection<Rsp> responses = this .send(command,
106: GroupRequest.GET_FIRST, this .timeout);
107:
108: for (Rsp response : responses) {
109: Object result = response.getValue();
110:
111: if (result != null) {
112: Set<String> state = command.unmarshalResult(result);
113:
114: logger.info(Messages.getMessage(
115: Messages.INITIAL_CLUSTER_STATE_REMOTE, state,
116: response.getSender()));
117:
118: return state;
119: }
120: }
121:
122: return this .stateManager.getInitialState();
123: }
124:
125: /**
126: * @see net.sf.hajdbc.StateManager#add(java.lang.String)
127: */
128: @Override
129: public void add(String databaseId) {
130: if (this .databaseCluster.isActive()) {
131: // Send synchronous notification
132: this .send(new ActivateCommand(databaseId),
133: GroupRequest.GET_ALL, 0);
134: }
135:
136: this .stateManager.add(databaseId);
137: }
138:
139: /**
140: * @see net.sf.hajdbc.StateManager#remove(java.lang.String)
141: */
142: @Override
143: public void remove(String databaseId) {
144: // Send asynchronous notification
145: this .send(new DeactivateCommand(databaseId),
146: GroupRequest.GET_NONE, this .timeout);
147:
148: this .stateManager.remove(databaseId);
149: }
150:
151: private Collection<Rsp> send(Command<?> command, int mode,
152: long timeout) {
153: return this .dispatcher.castMessage(null,
154: this .createMessage(command), mode, timeout).values();
155: }
156:
157: private Message createMessage(Command<?> command) {
158: return new Message(null, this .dispatcher.getChannel()
159: .getLocalAddress(), command);
160: }
161:
162: /**
163: * @see net.sf.hajdbc.StateManager#start()
164: */
165: @Override
166: public void start() throws Exception {
167: Channel channel = this .dispatcher.getChannel();
168:
169: channel.connect(channel.getClusterName());
170:
171: this .dispatcher.start();
172:
173: this .stateManager.start();
174: }
175:
176: /**
177: * @see net.sf.hajdbc.StateManager#stop()
178: */
179: @Override
180: public void stop() {
181: this .dispatcher.stop();
182:
183: this .dispatcher.getChannel().close();
184:
185: this .stateManager.stop();
186: }
187:
188: /**
189: * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberJoined(org.jgroups.Address)
190: */
191: @Override
192: protected void memberJoined(Address address) {
193: logger.info(Messages.getMessage(Messages.GROUP_MEMBER_JOINED,
194: address, this .databaseCluster));
195: }
196:
197: /**
198: * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberLeft(org.jgroups.Address)
199: */
200: @Override
201: protected void memberLeft(Address address) {
202: logger.info(Messages.getMessage(Messages.GROUP_MEMBER_LEFT,
203: address, this .databaseCluster));
204: }
205:
206: /**
207: * @see org.jgroups.MessageListener#getState()
208: */
209: @Override
210: public byte[] getState() {
211: return null;
212: }
213:
214: /**
215: * @see org.jgroups.MessageListener#setState(byte[])
216: */
217: @Override
218: public void setState(byte[] state) {
219: // Do nothing
220: }
221:
222: /**
223: * @see org.jgroups.MessageListener#receive(org.jgroups.Message)
224: */
225: @Override
226: public void receive(Message message) {
227: // Do nothing
228: }
229: }
|