001: /* Copyright (C) 2004 - 2007 db4objects Inc. http://www.db4o.com
002:
003: This file is part of the db4o open source object database.
004:
005: db4o is free software; you can redistribute it and/or modify it under
006: the terms of version 2 of the GNU General Public License as published
007: by the Free Software Foundation and as clarified by db4objects' GPL
008: interpretation policy, available at
009: http://www.db4o.com/about/company/legalpolicies/gplinterpretation/
010: Alternatively you can write to db4objects, Inc., 1900 S Norfolk Street,
011: Suite 350, San Mateo, CA 94403, USA.
012:
013: db4o is distributed in the hope that it will be useful, but WITHOUT ANY
014: WARRANTY; without even the implied warranty of MERCHANTABILITY or
015: FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
016: for more details.
017:
018: You should have received a copy of the GNU General Public License along
019: with this program; if not, write to the Free Software Foundation, Inc.,
020: 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
021: package com.db4o.internal.cs;
022:
023: import java.io.*;
024:
025: import com.db4o.*;
026: import com.db4o.config.*;
027: import com.db4o.ext.*;
028: import com.db4o.foundation.*;
029: import com.db4o.foundation.network.*;
030: import com.db4o.internal.*;
031: import com.db4o.internal.cs.messages.*;
032:
033: public class ObjectServerImpl implements ObjectServer, ExtObjectServer,
034: Runnable, LoopbackSocketServer {
035:
036: private static final int START_THREAD_WAIT_TIMEOUT = 5000;
037:
038: private final String _name;
039:
040: private ServerSocket4 _serverSocket;
041:
042: private int _port;
043:
044: private int i_threadIDGen = 1;
045:
046: private final Collection4 _dispatchers = new Collection4();
047:
048: LocalObjectContainer _container;
049: ClientTransactionPool _transactionPool;
050:
051: private final Object _startupLock = new Object();
052:
053: private Config4Impl _config;
054:
055: private BlockingQueue _committedInfosQueue = new BlockingQueue();
056:
057: private CommittedCallbacksDispatcher _committedCallbacksDispatcher;
058:
059: private boolean _caresAboutCommitted;
060:
061: private final NativeSocketFactory _socketFactory;
062:
063: private final boolean _isEmbeddedServer;
064:
065: public ObjectServerImpl(final LocalObjectContainer container,
066: int port, NativeSocketFactory socketFactory) {
067: this (container, (port < 0 ? 0 : port), port == 0, socketFactory);
068: }
069:
070: public ObjectServerImpl(final LocalObjectContainer container,
071: int port, boolean isEmbeddedServer,
072: NativeSocketFactory socketFactory) {
073: _isEmbeddedServer = isEmbeddedServer;
074: _socketFactory = socketFactory;
075: _container = container;
076: _transactionPool = new ClientTransactionPool(container);
077: _port = port;
078: _config = _container.configImpl();
079: _name = "db4o ServerSocket FILE: " + container.toString()
080: + " PORT:" + _port;
081:
082: _container.setServer(true);
083: configureObjectServer();
084:
085: boolean ok = false;
086: try {
087: ensureLoadStaticClass();
088: startCommittedCallbackThread(_committedInfosQueue);
089: startServer();
090: ok = true;
091: } finally {
092: if (!ok) {
093: close();
094: }
095: }
096: }
097:
098: private void startServer() {
099: if (isEmbeddedServer()) {
100: return;
101: }
102:
103: synchronized (_startupLock) {
104: startServerSocket();
105: startServerThread();
106: boolean started = false;
107: while (!started) {
108: try {
109: _startupLock.wait(START_THREAD_WAIT_TIMEOUT);
110: started = true;
111: }
112: // not specialized to InterruptException for .NET conversion
113: catch (Exception exc) {
114: }
115: }
116: }
117: }
118:
119: private void startServerThread() {
120: synchronized (_startupLock) {
121: final Thread thread = new Thread(this );
122: thread.setDaemon(true);
123: thread.start();
124: }
125: }
126:
127: private void startServerSocket() {
128: try {
129: _serverSocket = new ServerSocket4(_socketFactory, _port);
130: _port = _serverSocket.getLocalPort();
131: } catch (IOException e) {
132: throw new Db4oIOException(e);
133: }
134: _serverSocket.setSoTimeout(_config.timeoutServerSocket());
135: }
136:
137: private boolean isEmbeddedServer() {
138: return _isEmbeddedServer;
139: }
140:
141: private void ensureLoadStaticClass() {
142: _container
143: .produceClassMetadata(_container._handlers.ICLASS_STATICCLASS);
144: }
145:
146: private void configureObjectServer() {
147: _config.callbacks(false);
148: _config.isServer(true);
149: // the minium activation depth of com.db4o.User.class should be 1.
150: // Otherwise, we may get null password.
151: _config.objectClass(User.class).minimumActivationDepth(1);
152: }
153:
154: public void backup(String path) throws IOException {
155: _container.backup(path);
156: }
157:
158: final void checkClosed() {
159: if (_container == null) {
160: Exceptions4.throwRuntimeException(
161: Messages.CLOSED_OR_OPEN_FAILED, _name);
162: }
163: _container.checkClosed();
164: }
165:
166: public synchronized boolean close() {
167: closeServerSocket();
168: stopCommittedCallbacksDispatcher();
169: closeMessageDispatchers();
170: return closeFile();
171: }
172:
173: private void stopCommittedCallbacksDispatcher() {
174: if (_committedCallbacksDispatcher != null) {
175: _committedCallbacksDispatcher.stop();
176: }
177: }
178:
179: private boolean closeFile() {
180: if (_container != null) {
181: _transactionPool.close();
182: _container = null;
183: }
184: return true;
185: }
186:
187: private void closeMessageDispatchers() {
188: Iterator4 i = iterateDispatchers();
189: while (i.moveNext()) {
190: try {
191: ((ServerMessageDispatcher) i.current()).close();
192: } catch (Exception e) {
193: e.printStackTrace();
194: }
195: }
196: i = iterateDispatchers();
197: while (i.moveNext()) {
198: try {
199: ((Thread) i.current()).join();
200: } catch (Exception e) {
201: e.printStackTrace();
202: }
203: }
204: }
205:
206: public Iterator4 iterateDispatchers() {
207: synchronized (_dispatchers) {
208: return new Collection4(_dispatchers).iterator();
209: }
210: }
211:
212: private void closeServerSocket() {
213: try {
214: if (_serverSocket != null) {
215: _serverSocket.close();
216: }
217: } catch (Exception e) {
218: if (Deploy.debug) {
219: System.out
220: .println("YapServer.close() ServerSocket failed to close.");
221: }
222: }
223: _serverSocket = null;
224: }
225:
226: public Configuration configure() {
227: return _config;
228: }
229:
230: public ExtObjectServer ext() {
231: return this ;
232: }
233:
234: private ServerMessageDispatcherImpl findThread(int a_threadID) {
235: synchronized (_dispatchers) {
236: Iterator4 i = _dispatchers.iterator();
237: while (i.moveNext()) {
238: ServerMessageDispatcherImpl serverThread = (ServerMessageDispatcherImpl) i
239: .current();
240: if (serverThread._threadID == a_threadID) {
241: return serverThread;
242: }
243: }
244: }
245: return null;
246: }
247:
248: Transaction findTransaction(int threadID) {
249: ServerMessageDispatcherImpl dispatcher = findThread(threadID);
250: return (dispatcher == null ? null : dispatcher.getTransaction());
251: }
252:
253: public synchronized void grantAccess(String userName,
254: String password) {
255: checkClosed();
256: synchronized (_container._lock) {
257: User existing = getUser(userName);
258: if (existing != null) {
259: setPassword(existing, password);
260: } else {
261: addUser(userName, password);
262: }
263: _container.commit();
264: }
265: }
266:
267: private void addUser(String userName, String password) {
268: _container.set(new User(userName, password));
269: }
270:
271: private void setPassword(User existing, String password) {
272: existing.password = password;
273: _container.set(existing);
274: }
275:
276: public User getUser(String userName) {
277: final ObjectSet result = queryUsers(userName);
278: if (!result.hasNext()) {
279: return null;
280: }
281: return (User) result.next();
282: }
283:
284: private ObjectSet queryUsers(String userName) {
285: _container.showInternalClasses(true);
286: try {
287: return _container.get(new User(userName, null));
288: } finally {
289: _container.showInternalClasses(false);
290: }
291: }
292:
293: public ObjectContainer objectContainer() {
294: return _container;
295: }
296:
297: public ObjectContainer openClient() {
298: return openClient(Db4o.cloneConfiguration());
299: }
300:
301: public synchronized ObjectContainer openClient(Configuration config) {
302: checkClosed();
303: synchronized (_container._lock) {
304: return new EmbeddedClientObjectContainer(_container);
305: }
306:
307: // The following uses old embedded C/S mode:
308:
309: // ClientObjectContainer client = new ClientObjectContainer(config,
310: // openClientSocket(), Const4.EMBEDDED_CLIENT_USER
311: // + (i_threadIDGen - 1), "", false);
312: // client.blockSize(_container.blockSize());
313: // return client;
314:
315: }
316:
317: public LoopbackSocket openClientSocket() {
318: int timeout = _config.timeoutClientSocket();
319: LoopbackSocket clientFake = new LoopbackSocket(this , timeout);
320: LoopbackSocket serverFake = new LoopbackSocket(this , timeout,
321: clientFake);
322: try {
323: ServerMessageDispatcher messageDispatcher = new ServerMessageDispatcherImpl(
324: this ,
325: new ClientTransactionHandle(_transactionPool),
326: serverFake, newThreadId(), true, _container.lock());
327: addServerMessageDispatcher(messageDispatcher);
328: messageDispatcher.startDispatcher();
329: return clientFake;
330: } catch (Exception e) {
331: e.printStackTrace();
332: }
333: return null;
334:
335: }
336:
337: void removeThread(ServerMessageDispatcherImpl dispatcher) {
338: synchronized (_dispatchers) {
339: _dispatchers.remove(dispatcher);
340: checkCaresAboutCommitted();
341: }
342: }
343:
344: public synchronized void revokeAccess(String userName) {
345: checkClosed();
346: synchronized (_container._lock) {
347: deleteUsers(userName);
348: _container.commit();
349: }
350: }
351:
352: private void deleteUsers(String userName) {
353: ObjectSet set = queryUsers(userName);
354: while (set.hasNext()) {
355: _container.delete(set.next());
356: }
357: }
358:
359: public void run() {
360: setThreadName();
361: logListeningOnPort();
362: notifyThreadStarted();
363: listen();
364: }
365:
366: private void startCommittedCallbackThread(
367: BlockingQueue committedInfosQueue) {
368: _committedCallbacksDispatcher = new CommittedCallbacksDispatcher(
369: this , committedInfosQueue);
370: Thread thread = new Thread(_committedCallbacksDispatcher);
371: thread.setName("committed callback thread");
372: thread.setDaemon(true);
373: thread.start();
374: }
375:
376: private void setThreadName() {
377: Thread.currentThread().setName(_name);
378: }
379:
380: private void listen() {
381: while (_serverSocket != null) {
382: try {
383: ServerMessageDispatcher messageDispatcher = new ServerMessageDispatcherImpl(
384: this , new ClientTransactionHandle(
385: _transactionPool), _serverSocket
386: .accept(), newThreadId(), false,
387: _container.lock());
388: addServerMessageDispatcher(messageDispatcher);
389: messageDispatcher.startDispatcher();
390: } catch (Exception e) {
391: // e.printStackTrace();
392: }
393: }
394: }
395:
396: private void notifyThreadStarted() {
397: synchronized (_startupLock) {
398: _startupLock.notifyAll();
399: }
400: }
401:
402: private void logListeningOnPort() {
403: _container.logMsg(Messages.SERVER_LISTENING_ON_PORT, ""
404: + _serverSocket.getLocalPort());
405: }
406:
407: private int newThreadId() {
408: return i_threadIDGen++;
409: }
410:
411: private void addServerMessageDispatcher(
412: ServerMessageDispatcher thread) {
413: synchronized (_dispatchers) {
414: _dispatchers.add(thread);
415: checkCaresAboutCommitted();
416: }
417: }
418:
419: public void addCommittedInfoMsg(MCommittedInfo message) {
420: _committedInfosQueue.add(message);
421: }
422:
423: public void broadcastMsg(Msg message, BroadcastFilter filter) {
424: Iterator4 i = iterateDispatchers();
425: while (i.moveNext()) {
426: ServerMessageDispatcher dispatcher = (ServerMessageDispatcher) i
427: .current();
428: if (filter.accept(dispatcher)) {
429: dispatcher.write(message);
430: }
431: }
432: }
433:
434: public boolean caresAboutCommitted() {
435: return _caresAboutCommitted;
436: }
437:
438: public void checkCaresAboutCommitted() {
439: _caresAboutCommitted = anyDispatcherCaresAboutCommitted();
440: }
441:
442: private boolean anyDispatcherCaresAboutCommitted() {
443: Iterator4 i = iterateDispatchers();
444: while (i.moveNext()) {
445: ServerMessageDispatcher dispatcher = (ServerMessageDispatcher) i
446: .current();
447: if (dispatcher.caresAboutCommitted()) {
448: return true;
449: }
450: }
451: return false;
452: }
453:
454: public int port() {
455: return _port;
456: }
457:
458: public int clientCount() {
459: synchronized (_dispatchers) {
460: return _dispatchers.size();
461: }
462: }
463: }
|