001: /* Copyright (C) 2004 - 2007 db4objects Inc. http://www.db4o.com
002:
003: This file is part of the db4o open source object database.
004:
005: db4o is free software; you can redistribute it and/or modify it under
006: the terms of version 2 of the GNU General Public License as published
007: by the Free Software Foundation and as clarified by db4objects' GPL
008: interpretation policy, available at
009: http://www.db4o.com/about/company/legalpolicies/gplinterpretation/
010: Alternatively you can write to db4objects, Inc., 1900 S Norfolk Street,
011: Suite 350, San Mateo, CA 94403, USA.
012:
013: db4o is distributed in the hope that it will be useful, but WITHOUT ANY
014: WARRANTY; without even the implied warranty of MERCHANTABILITY or
015: FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
016: for more details.
017:
018: You should have received a copy of the GNU General Public License along
019: with this program; if not, write to the Free Software Foundation, Inc.,
020: 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
021: package com.db4o.internal.cs;
022:
023: import com.db4o.*;
024: import com.db4o.foundation.*;
025: import com.db4o.foundation.network.*;
026: import com.db4o.internal.*;
027: import com.db4o.internal.cs.messages.*;
028:
029: public final class ServerMessageDispatcherImpl extends Thread implements
030: ServerMessageDispatcher {
031:
032: private String _clientName;
033:
034: private boolean _loggedin;
035:
036: private boolean _closeMessageSent;
037:
038: private final ObjectServerImpl _server;
039:
040: private Socket4 _socket;
041:
042: private ClientTransactionHandle _transactionHandle;
043:
044: private Hashtable4 _queryResults;
045:
046: final int _threadID;
047:
048: private CallbackObjectInfoCollections _committedInfo;
049:
050: private boolean _caresAboutCommitted;
051:
052: private boolean _isClosed;
053:
054: private final Object _lock = new Object();
055:
056: private final Object _mainLock;
057:
058: ServerMessageDispatcherImpl(ObjectServerImpl server,
059: ClientTransactionHandle transactionHandle, Socket4 socket,
060: int threadID, boolean loggedIn, Object mainLock)
061: throws Exception {
062:
063: _mainLock = mainLock;
064: _transactionHandle = transactionHandle;
065:
066: setDaemon(true);
067:
068: _loggedin = loggedIn;
069:
070: _server = server;
071: _threadID = threadID;
072: setDispatcherName("" + threadID);
073: _socket = socket;
074: _socket.setSoTimeout(((Config4Impl) server.configure())
075: .timeoutServerSocket());
076:
077: // TODO: Experiment with packetsize and noDelay
078: // i_socket.setSendBufferSize(100);
079: // i_socket.setTcpNoDelay(true);
080: }
081:
082: public boolean close() {
083: synchronized (_lock) {
084: if (!isMessageDispatcherAlive()) {
085: return true;
086: }
087: _isClosed = true;
088: }
089: synchronized (_mainLock) {
090: _transactionHandle.releaseTransaction();
091: sendCloseMessage();
092: _transactionHandle.close();
093: closeSocket();
094: removeFromServer();
095: return true;
096: }
097: }
098:
099: public void closeConnection() {
100: synchronized (_lock) {
101: if (!isMessageDispatcherAlive()) {
102: return;
103: }
104: _isClosed = true;
105: }
106: synchronized (_mainLock) {
107: closeSocket();
108: removeFromServer();
109: }
110: }
111:
112: public boolean isMessageDispatcherAlive() {
113: synchronized (_lock) {
114: return !_isClosed;
115: }
116: }
117:
118: private void sendCloseMessage() {
119: try {
120: if (!_closeMessageSent) {
121: _closeMessageSent = true;
122: write(Msg.CLOSE);
123: }
124: } catch (Exception e) {
125: if (Debug.atHome) {
126: e.printStackTrace();
127: }
128: }
129: }
130:
131: private void removeFromServer() {
132: try {
133: _server.removeThread(this );
134: } catch (Exception e) {
135: if (Debug.atHome) {
136: e.printStackTrace();
137: }
138: }
139: }
140:
141: private void closeSocket() {
142: try {
143: if (_socket != null) {
144: _socket.close();
145: }
146: } catch (Db4oIOException e) {
147: if (Debug.atHome) {
148: e.printStackTrace();
149: }
150: }
151: }
152:
153: public Transaction getTransaction() {
154: return _transactionHandle.transaction();
155: }
156:
157: public void run() {
158: messageLoop();
159: close();
160: }
161:
162: private void messageLoop() {
163: while (isMessageDispatcherAlive()) {
164: try {
165: if (!messageProcessor()) {
166: return;
167: }
168: } catch (Db4oIOException e) {
169: return;
170: }
171: }
172: }
173:
174: private boolean messageProcessor() throws Db4oIOException {
175: Msg message = Msg.readMessage(this , getTransaction(), _socket);
176: if (message == null) {
177: return true;
178: }
179: if (!_loggedin && !Msg.LOGIN.equals(message)) {
180: return true;
181: }
182:
183: // TODO: COR-885 - message may process against closed server
184: // Checking aliveness just makes the issue less likely to occur. Naive synchronization against main lock is prohibitive.
185: if (isMessageDispatcherAlive()) {
186: return ((ServerSideMessage) message).processAtServer();
187: }
188: return false;
189: }
190:
191: public ObjectServerImpl server() {
192: return _server;
193: }
194:
195: public void queryResultFinalized(int queryResultID) {
196: _queryResults.remove(queryResultID);
197: }
198:
199: public void mapQueryResultToID(LazyClientObjectSetStub stub,
200: int queryResultID) {
201: if (_queryResults == null) {
202: _queryResults = new Hashtable4();
203: }
204: _queryResults.put(queryResultID, stub);
205: }
206:
207: public LazyClientObjectSetStub queryResultForID(int queryResultID) {
208: return (LazyClientObjectSetStub) _queryResults
209: .get(queryResultID);
210: }
211:
212: public void switchToFile(MSwitchToFile message) {
213: synchronized (_mainLock) {
214: String fileName = message.readString();
215: try {
216: _transactionHandle.releaseTransaction();
217: _transactionHandle.acquireTransactionForFile(fileName);
218: write(Msg.OK);
219: } catch (Exception e) {
220: if (Debug.atHome) {
221: System.out.println("Msg.SWITCH_TO_FILE failed.");
222: e.printStackTrace();
223: }
224: _transactionHandle.releaseTransaction();
225: write(Msg.ERROR);
226: }
227: }
228: }
229:
230: public void switchToMainFile() {
231: synchronized (_mainLock) {
232: _transactionHandle.releaseTransaction();
233: write(Msg.OK);
234: }
235: }
236:
237: public void useTransaction(MUseTransaction message) {
238: int threadID = message.readInt();
239: Transaction transToUse = _server.findTransaction(threadID);
240: _transactionHandle.transaction(transToUse);
241: }
242:
243: public boolean write(Msg msg) {
244: synchronized (_lock) {
245: if (!isMessageDispatcherAlive()) {
246: return false;
247: }
248: return msg.write(_socket);
249: }
250: }
251:
252: public Socket4 socket() {
253: return _socket;
254: }
255:
256: public String name() {
257: return _clientName;
258: }
259:
260: public void setDispatcherName(String name) {
261: _clientName = name;
262: // set thread name
263: setName("db4o server message dispatcher " + name);
264: }
265:
266: public int dispatcherID() {
267: return _threadID;
268: }
269:
270: public void login() {
271: _loggedin = true;
272: }
273:
274: public void startDispatcher() {
275: start();
276: }
277:
278: public boolean caresAboutCommitted() {
279: return _caresAboutCommitted;
280: }
281:
282: public void caresAboutCommitted(boolean care) {
283: _caresAboutCommitted = true;
284: server().checkCaresAboutCommitted();
285: }
286:
287: public CallbackObjectInfoCollections committedInfo() {
288: return _committedInfo;
289: }
290:
291: public void committedInfo(
292: CallbackObjectInfoCollections committedInfo) {
293: _committedInfo = committedInfo;
294: }
295:
296: }
|