001: // Space4J(TM) - Object Persistence in RAM
002: // Copyright (C) 2003 Sergio Oliveira Junior
003: // This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License version 2.1 as published by the Free Software Foundation. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
004:
005: package org.space4j.implementation;
006:
007: import org.space4j.*;
008:
009: import java.io.*;
010: import java.util.*;
011: import java.net.*;
012:
013: /**
014: * The master space4j in the center of the replication ring.<br>
015: * It will execute all commands and send them to the slaves, so they can execute them too.
016: */
017: public class MasterSpace4J extends SimpleSpace4J implements Runnable {
018:
019: private int master_port = 4000;
020: private int slave_port = 4001;
021:
022: private ServerSocket ss;
023: private HashMap slaves = new HashMap();
024: private Thread listener;
025:
026: private long[] unique_replica_id = { 0L };
027:
028: /**
029: * Initializes the MasterSpace4J.
030: * @param dirname The dir where the space is stored.
031: * @see SimpleSpace4J
032: */
033: public MasterSpace4J(String dirname) throws LoggerException,
034: CommandException {
035: super (dirname);
036: }
037:
038: /**
039: * Initializes the MasterSpace4J.
040: * @param dirname The dir where the space is stored.
041: * @param master_port The port where the master will be listening.
042: * @param slave_port The port where the slaves will be listening.
043: * @see SimpleSpace4J
044: */
045: public MasterSpace4J(String dirname, int master_port, int slave_port)
046: throws LoggerException, CommandException {
047: this (dirname);
048: this .master_port = master_port;
049: this .slave_port = slave_port;
050: }
051:
052: /**
053: * Start listening for slaves.
054: */
055: public void start() throws UnknownHostException, IOException,
056: ClassNotFoundException {
057: super .start();
058: listener = new Thread(this );
059: listener.setDaemon(true);
060: listener.start();
061: }
062:
063: // connect back to slaves...
064: // the master will notify the slaves of changes (commands) through here...
065: void connectToSlave(InetAddress slave_ip) throws SocketException,
066: IOException {
067: Socket socket = new Socket(slave_ip, slave_port);
068: ObjectOutputStream oos = new ObjectOutputStream(socket
069: .getOutputStream());
070: synchronized (slaves) {
071: slaves.put(slave_ip, oos);
072: }
073: }
074:
075: // send a just executed command to the slaves, so they can execute it too...
076: private void notifySlaves(Command cmd) {
077: synchronized (slaves) {
078: Iterator iter = slaves.values().iterator();
079: while (iter.hasNext()) {
080: ObjectOutputStream oos = (ObjectOutputStream) iter
081: .next();
082: try {
083: oos.writeObject(cmd);
084: } catch (IOException e) {
085: // we lost a slave
086: // close the connection and remove it from the ring...
087: e.printStackTrace();
088: try {
089: oos.close();
090: } catch (Exception exc) {
091: }
092: iter.remove();
093: }
094: }
095: }
096: }
097:
098: // if you are reapplying, there is no need for logging, and also no need for notifying the slaves...
099: public synchronized int executeCommand(Command cmd, boolean log)
100: throws CommandException, LoggerException {
101: int x = super .executeCommand(cmd, log);
102: if (log)
103: notifySlaves(cmd);
104: return x;
105: }
106:
107: // get an unique id for the slave...
108: long getUniqueReplicaId() {
109: synchronized (unique_replica_id) {
110: return ++unique_replica_id[0];
111: }
112: }
113:
114: public void run() {
115:
116: try {
117:
118: ss = new ServerSocket(master_port);
119:
120: while (true) {
121:
122: try {
123:
124: Socket socket = ss.accept();
125: Thread client = new SlaveThread(this , socket);
126: client.start();
127: //System.out.println("Slave connected !!!");
128:
129: } catch (Exception e) {
130: e.printStackTrace();
131: }
132:
133: }
134:
135: } catch (Exception e) {
136: e.printStackTrace();
137: }
138: }
139:
140: }
141:
142: /*
143: * This thread will wait for commands coming from the slaves.
144: * All commands must be executed first in the slaves.
145: */
146: class SlaveThread extends Thread {
147:
148: private MasterSpace4J space4j;
149: private Socket socket;
150: private ObjectOutputStream out;
151: private ObjectInputStream in;
152: private InetAddress slave_ip;
153:
154: private boolean bThread = true;
155:
156: public SlaveThread(MasterSpace4J space4j, Socket socket) {
157: this .space4j = space4j;
158: this .socket = socket;
159: }
160:
161: public void run() {
162:
163: try {
164:
165: slave_ip = socket.getInetAddress();
166:
167: out = new ObjectOutputStream(socket.getOutputStream());
168: in = new ObjectInputStream(socket.getInputStream());
169:
170: // send a unique id to the slave...
171: out.writeObject(new Long(space4j.getUniqueReplicaId()));
172:
173: space4j.connectToSlave(slave_ip);
174:
175: // just wait for commands from the slave...
176: while (bThread) {
177: Command cmd = (Command) in.readObject();
178: space4j.executeCommand(cmd);
179: }
180:
181: } catch (Exception e) {
182: e.printStackTrace();
183: }
184: }
185: }
|