001: // Space4J(TM) - Object Persistence in RAM
002: // Copyright (C) 2003 Sergio Oliveira Junior
003: // This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License version 2.1 as published by the Free Software Foundation. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
004:
005: package org.space4j.implementation;
006:
007: import org.space4j.*;
008:
009: import java.io.*;
010: import java.util.*;
011: import java.net.*;
012:
013: /**
014: * This is complex, but it works.<br>
015: * Users of Space4J do not have to understand this to use the system.<br>
016: * The true power of encapsulation and simplicity.
017: */
018: public class SlaveSpace4J extends SimpleSpace4J implements Runnable {
019:
020: private int master_port = 4000;
021: private int slave_port = 4001;
022: private ServerSocket ss;
023: private String master_ip;
024: private Socket socket;
025: private ObjectOutputStream out;
026: private ObjectInputStream in;
027: private boolean takingsnapshot = false;
028: private Object snapshotLock = new Object();
029: private LinkedList list = new LinkedList();
030: private long lognumber = -1;
031: private int retval;
032: private long replica_id = 0L;
033:
034: /**
035: * Initializes the SlaveSpace4J.
036: * @param dirname The dir where the space is stored.
037: * @param master_ip The ip address of the master.
038: * @see SimpleSpace4J
039: */
040: public SlaveSpace4J(String dirname, String master_ip)
041: throws LoggerException, CommandException {
042: super (dirname);
043: this .master_ip = master_ip;
044: }
045:
046: /**
047: * Initializes the SlaveSpace4J.
048: * @param dirname The dir where the space is stored.
049: * @param master_ip The ip address of the master.
050: * @param master_port The port the master is listening.
051: * @param slave_port The port this slave will be listening.
052: * @see SimpleSpace4J
053: */
054: public SlaveSpace4J(String dirname, String master_ip,
055: int master_port, int slave_port) throws LoggerException,
056: CommandException {
057: this (dirname, master_ip);
058: this .master_port = master_port;
059: this .slave_port = slave_port;
060: }
061:
062: public void start() throws UnknownHostException, IOException,
063: ClassNotFoundException {
064: super .start();
065: Thread listener = new Thread(this );
066: listener.setDaemon(true);
067: listener.start();
068: Thread executer = new CommandExecuter(this , list);
069: executer.setDaemon(true);
070: executer.start();
071: connectToMaster();
072: }
073:
074: // connect to master and get an unique slave id...
075: private void connectToMaster() throws UnknownHostException,
076: IOException, ClassNotFoundException {
077: socket = new Socket(master_ip, master_port);
078: out = new ObjectOutputStream(socket.getOutputStream());
079: in = new ObjectInputStream(socket.getInputStream());
080: Long x = (Long) in.readObject();
081: this .replica_id = x.longValue();
082: }
083:
084: private void sendCommandToMaster(Command cmd) throws IOException {
085: cmd.setReplicaId(replica_id);
086: out.writeObject(cmd);
087: }
088:
089: public synchronized int executeCommand(Command cmd)
090: throws CommandException {
091: // local command being executed in the slave
092: // slave needs to send it to the master
093: try {
094: sendCommandToMaster(cmd);
095: wait(); // wait for the master to execute it...
096: return retval;
097: } catch (Exception e) {
098: e.printStackTrace();
099: throw new CommandException(e.getMessage());
100: }
101: }
102:
103: // execute a list of commands...
104: void executeCommands(ArrayList list) {
105: Iterator iter = list.iterator();
106: while (iter.hasNext()) {
107: Command cmd = (Command) iter.next();
108: try {
109: executeCommandLocally(cmd);
110: } catch (CommandException e) {
111: e.printStackTrace();
112: } catch (LoggerException e) {
113: e.printStackTrace();
114: }
115: }
116: }
117:
118: void executeCommandFromMaster(Command cmd) throws CommandException {
119: // add the command to the linkedlist and wake up the commandexecuter thread
120: synchronized (list) {
121: list.add(cmd);
122: synchronized (snapshotLock) {
123: if (!takingsnapshot) {
124: list.notify();
125: }
126: }
127: }
128: }
129:
130: synchronized int executeCommandLocally(Command cmd)
131: throws CommandException, LoggerException {
132: lognumber = cmd.getLogNumber();
133: retval = super .executeCommand(cmd, false);
134: if (cmd.getReplicaId() == replica_id)
135: notify();
136: return retval;
137: }
138:
139: public void executeSnapshot() throws LoggerException {
140: synchronized (snapshotLock) {
141: takingsnapshot = true;
142: }
143:
144: try {
145: logger.takeSnapshot(space, lognumber);
146: } catch (LoggerException e) {
147: e.printStackTrace();
148: throw e;
149: } finally {
150: synchronized (snapshotLock) {
151: takingsnapshot = false;
152: }
153: synchronized (list) {
154: list.notify();
155: }
156: }
157: }
158:
159: // accept connection from the master...
160: // master may die or it may need to reconnect
161: // master may also change...
162: // that's why this is threaded in an infinite loop...
163: public void run() {
164:
165: try {
166:
167: ss = new ServerSocket(slave_port);
168:
169: while (true) {
170:
171: try {
172:
173: Socket socket = ss.accept();
174: Thread client = new MasterThread(this , socket);
175: client.start();
176:
177: } catch (Exception e) {
178: e.printStackTrace();
179: }
180:
181: }
182:
183: } catch (Exception e) {
184: e.printStackTrace();
185: }
186: }
187:
188: }
189:
190: // the slave will receive commands from the master here...
191: class MasterThread extends Thread {
192:
193: private SlaveSpace4J space4j;
194: private Socket socket;
195: private ObjectOutputStream out;
196: private ObjectInputStream in;
197:
198: private boolean bThread = true;
199:
200: public MasterThread(SlaveSpace4J space4j, Socket socket) {
201: this .space4j = space4j;
202: this .socket = socket;
203: }
204:
205: public void run() {
206:
207: try {
208:
209: out = new ObjectOutputStream(socket.getOutputStream());
210: in = new ObjectInputStream(socket.getInputStream());
211:
212: while (bThread) {
213: Command cmd = (Command) in.readObject();
214: space4j.executeCommandFromMaster(cmd);
215: }
216:
217: } catch (Exception e) {
218: e.printStackTrace();
219: }
220: }
221: }
222:
223: // this is a producer/consumer thread, backed-up by a linked list...
224: class CommandExecuter extends Thread {
225:
226: private SlaveSpace4J space4j;
227: private LinkedList list;
228: private boolean bThread = true;
229:
230: public CommandExecuter(SlaveSpace4J space4j, LinkedList list) {
231: this .space4j = space4j;
232: this .list = list;
233: }
234:
235: public void run() {
236: while (bThread) {
237: try {
238: ArrayList al = null;
239: synchronized (list) {
240: while (list.size() == 0)
241: list.wait();
242: al = new ArrayList(list);
243: list.clear();
244: }
245: space4j.executeCommands(al);
246: } catch (Exception e) {
247: e.printStackTrace();
248: }
249: }
250: }
251: }
|