001: /* Copyright (C) 2004 - 2007 db4objects Inc. http://www.db4o.com
002:
003: This file is part of the db4o open source object database.
004:
005: db4o is free software; you can redistribute it and/or modify it under
006: the terms of version 2 of the GNU General Public License as published
007: by the Free Software Foundation and as clarified by db4objects' GPL
008: interpretation policy, available at
009: http://www.db4o.com/about/company/legalpolicies/gplinterpretation/
010: Alternatively you can write to db4objects, Inc., 1900 S Norfolk Street,
011: Suite 350, San Mateo, CA 94403, USA.
012:
013: db4o is distributed in the hope that it will be useful, but WITHOUT ANY
014: WARRANTY; without even the implied warranty of MERCHANTABILITY or
015: FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
016: for more details.
017:
018: You should have received a copy of the GNU General Public License along
019: with this program; if not, write to the Free Software Foundation, Inc.,
020: 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
021: package com.db4o;
022:
023: import com.db4o.ext.*;
024: import com.db4o.internal.*;
025: import com.db4o.internal.replication.*;
026: import com.db4o.query.*;
027: import com.db4o.replication.*;
028:
029: /**
030: * @exclude
031: * @deprecated
032: */
033: public class ReplicationImpl implements ReplicationProcess {
034:
035: final ObjectContainerBase _peerA;
036:
037: final Transaction _transA;
038:
039: final ObjectContainerBase _peerB;
040:
041: final Transaction _transB;
042:
043: final ReplicationConflictHandler _conflictHandler;
044:
045: final ReplicationRecord _record;
046:
047: private int _direction;
048:
049: private static final int IGNORE = 0;
050:
051: private static final int TO_B = -1;
052:
053: private static final int TO_A = 1;
054:
055: private static final int CHECK_CONFLICT = -99;
056:
057: public ReplicationImpl(ObjectContainerBase peerA,
058: ObjectContainerBase peerB,
059: ReplicationConflictHandler conflictHandler) {
060:
061: if (conflictHandler == null) {
062: // We don't allow starting replication without a
063: // conflict handler, so we don't get late failures.
064: throw new NullPointerException();
065: }
066:
067: synchronized (peerA.lock()) {
068: synchronized (peerB.lock()) {
069:
070: _peerA = peerA;
071: _transA = peerA.checkTransaction();
072:
073: _peerB = peerB;
074: _transB = _peerB.checkTransaction(null);
075:
076: MigrationConnection mgc = new MigrationConnection(
077: _peerA, _peerB);
078:
079: _peerA._handlers.migrationConnection(mgc);
080: _peerA._handlers.replication(this );
081: _peerA.replicationCallState(Const4.OLD);
082:
083: _peerB._handlers.migrationConnection(mgc);
084: _peerB._handlers.replication(this );
085: _peerB.replicationCallState(Const4.OLD);
086:
087: _conflictHandler = conflictHandler;
088:
089: _record = ReplicationRecord.beginReplication(_transA,
090: _transB);
091: }
092: }
093:
094: }
095:
096: private int bindAndSet(Transaction trans, ObjectContainerBase peer,
097: ObjectReference ref, Object sourceObject) {
098: if (sourceObject instanceof Db4oTypeImpl) {
099: Db4oTypeImpl db4oType = (Db4oTypeImpl) sourceObject;
100: if (!db4oType.canBind()) {
101: Db4oTypeImpl targetObject = (Db4oTypeImpl) ref
102: .getObject();
103: targetObject.replicateFrom(sourceObject);
104: return ref.getID();
105: }
106: }
107: peer.bind2(trans, ref, sourceObject);
108: return peer.setAfterReplication(trans, sourceObject, 1, true);
109: }
110:
111: public void checkConflict(Object obj) {
112: int temp = _direction;
113: _direction = CHECK_CONFLICT;
114: replicate(obj);
115: _direction = temp;
116: }
117:
118: public void commit() {
119: synchronized (_peerA.lock()) {
120: synchronized (_peerB.lock()) {
121:
122: _peerA.commit(_transA);
123: _peerB.commit(_transB);
124:
125: endReplication();
126:
127: long versionA = _peerA.currentVersion();
128: long versionB = _peerB.currentVersion();
129:
130: _record._version = (versionA > versionB) ? versionA
131: : versionB;
132:
133: _peerA.raiseVersion(_record._version + 1);
134: _peerB.raiseVersion(_record._version + 1);
135:
136: _record.store(_peerA);
137: _record.store(_peerB);
138: }
139: }
140: }
141:
142: private void endReplication() {
143:
144: _peerA.replicationCallState(Const4.NONE);
145: _peerA._handlers.migrationConnection(null);
146: _peerA._handlers.replication(null);
147:
148: _peerA.replicationCallState(Const4.NONE);
149: _peerB._handlers.migrationConnection(null);
150: _peerB._handlers.replication(null);
151: }
152:
153: private int idInCaller(ObjectContainerBase caller,
154: ObjectReference referenceA, ObjectReference referenceB) {
155: return (caller == _peerA) ? referenceA.getID() : referenceB
156: .getID();
157: }
158:
159: private int ignoreOrCheckConflict() {
160: if (_direction == CHECK_CONFLICT) {
161: return CHECK_CONFLICT;
162: }
163: return IGNORE;
164: }
165:
166: private boolean isInConflict(long versionA, long versionB) {
167: if (versionA > _record._version && versionB > _record._version) {
168: return true;
169: }
170: if (versionB > _record._version && _direction == TO_B) {
171: return true;
172: }
173: if (versionA > _record._version && _direction == TO_A) {
174: return true;
175: }
176: return false;
177: }
178:
179: private long lastSynchronization() {
180: return _record._version;
181: }
182:
183: public ObjectContainer peerA() {
184: return (ObjectContainer) _peerA;
185: }
186:
187: public ObjectContainer peerB() {
188: return (ObjectContainer) _peerB;
189: }
190:
191: public void replicate(Object obj) {
192:
193: // When there is an active replication process, the set() method
194: // will call back to the #process() method in this class.
195:
196: // This detour is necessary, since #set() has to handle all cases
197: // anyway, for members of the replicated object, especially the
198: // prevention of endless loops in case of circular references.
199:
200: ObjectContainerBase container = _peerB;
201: Transaction trans = _transB;
202:
203: if (_peerB.isStored(_transB, obj)) {
204: if (!_peerA.isStored(_transA, obj)) {
205: container = _peerA;
206: trans = _transA;
207: }
208: }
209:
210: container.set(trans, obj);
211: }
212:
213: public void rollback() {
214: _peerA.rollback(_transA);
215: _peerB.rollback(_transB);
216: endReplication();
217: }
218:
219: public void setDirection(ObjectContainer replicateFrom,
220: ObjectContainer replicateTo) {
221: if (replicateFrom == _peerA && replicateTo == _peerB) {
222: _direction = TO_B;
223: }
224: if (replicateFrom == _peerB && replicateTo == _peerA) {
225: _direction = TO_A;
226: }
227: }
228:
229: private void shareBinding(ObjectReference sourceReference,
230: ObjectReference referenceA, Object objectA,
231: ObjectReference referenceB, Object objectB) {
232: if (sourceReference == null) {
233: return;
234: }
235: if (objectA instanceof Db4oTypeImpl) {
236: if (!((Db4oTypeImpl) objectA).canBind()) {
237: return;
238: }
239: }
240:
241: if (sourceReference == referenceA) {
242: _peerB.bind2(_transB, referenceB, objectA);
243: } else {
244: _peerA.bind2(_transA, referenceA, objectB);
245: }
246: }
247:
248: private int toA() {
249: if (_direction == CHECK_CONFLICT) {
250: return CHECK_CONFLICT;
251: }
252: if (_direction != TO_B) {
253: return TO_A;
254: }
255: return IGNORE;
256: }
257:
258: private int toB() {
259: if (_direction == CHECK_CONFLICT) {
260: return CHECK_CONFLICT;
261: }
262: if (_direction != TO_A) {
263: return TO_B;
264: }
265: return IGNORE;
266: }
267:
268: /**
269: * called by YapStream.set()
270: * @return id of reference in caller or 0 if not handled or -1
271: * if #set() should stop processing because of a direction
272: * setting.
273: */
274: public int tryToHandle(ObjectContainerBase caller, Object obj) {
275:
276: int notProcessed = 0;
277: ObjectContainerBase other = null;
278: ObjectReference sourceReference = null;
279:
280: if (caller == _peerA) {
281: other = _peerB;
282: if (_direction == TO_B) {
283: notProcessed = -1;
284: }
285: } else {
286: other = _peerA;
287: if (_direction == TO_A) {
288: notProcessed = -1;
289: }
290: }
291:
292: synchronized (other._lock) {
293:
294: Object objectA = obj;
295: Object objectB = obj;
296:
297: ObjectReference referenceA = _transA
298: .referenceForObject(obj);
299: ObjectReference referenceB = _transB
300: .referenceForObject(obj);
301:
302: VirtualAttributes attA = null;
303: VirtualAttributes attB = null;
304:
305: if (referenceA == null) {
306: if (referenceB == null) {
307: return notProcessed;
308: }
309:
310: sourceReference = referenceB;
311:
312: attB = referenceB.virtualAttributes(_transB);
313: if (attB == null) {
314: return notProcessed;
315: }
316:
317: HardObjectReference hardRef = _transA
318: .getHardReferenceBySignature(attB.i_uuid,
319: attB.i_database.i_signature);
320: if (hardRef._object == null) {
321: return notProcessed;
322: }
323:
324: referenceA = hardRef._reference;
325: objectA = hardRef._object;
326:
327: attA = referenceA.virtualAttributes(_transA);
328: } else {
329:
330: attA = referenceA.virtualAttributes(_transA);
331: if (attA == null) {
332: return notProcessed;
333: }
334:
335: if (referenceB == null) {
336:
337: sourceReference = referenceA;
338:
339: HardObjectReference hardRef = _transB
340: .getHardReferenceBySignature(attA.i_uuid,
341: attA.i_database.i_signature);
342:
343: if (hardRef._object == null) {
344: return notProcessed;
345: }
346:
347: referenceB = hardRef._reference;
348: objectB = hardRef._object;
349:
350: }
351:
352: attB = referenceB.virtualAttributes(_transB);
353: }
354:
355: if (attA == null || attB == null) {
356: return notProcessed;
357: }
358:
359: if (objectA == objectB) {
360: if (caller == _peerA && _direction == TO_B) {
361: return -1;
362: }
363: if (caller == _peerB && _direction == TO_A) {
364: return -1;
365: }
366: return idInCaller(caller, referenceA, referenceB);
367: }
368:
369: _peerA.refresh(_transA, objectA, 1);
370: _peerB.refresh(_transB, objectB, 1);
371:
372: if (attA.i_version <= _record._version
373: && attB.i_version <= _record._version) {
374:
375: if (_direction != CHECK_CONFLICT) {
376: shareBinding(sourceReference, referenceA, objectA,
377: referenceB, objectB);
378: }
379: return idInCaller(caller, referenceA, referenceB);
380: }
381:
382: int direction = ignoreOrCheckConflict();
383:
384: if (isInConflict(attA.i_version, attB.i_version)) {
385:
386: Object prevailing = _conflictHandler.resolveConflict(
387: this , objectA, objectB);
388:
389: if (prevailing == objectA) {
390: direction = (_direction == TO_A) ? IGNORE : toB();
391: }
392:
393: if (prevailing == objectB) {
394: direction = (_direction == TO_B) ? IGNORE : toA();
395: }
396:
397: if (direction == IGNORE) {
398: return -1;
399: }
400:
401: } else {
402: direction = attB.i_version > _record._version ? toA()
403: : toB();
404: }
405:
406: if (direction == TO_A) {
407: if (!referenceB.isActive()) {
408: referenceB.activate(_transB, objectB, 1, false);
409: }
410: int idA = bindAndSet(_transA, _peerA, referenceA,
411: objectB);
412: if (caller == _peerA) {
413: return idA;
414: }
415: }
416:
417: if (direction == TO_B) {
418: if (!referenceA.isActive()) {
419: referenceA.activate(_transA, objectA, 1, false);
420: }
421: int idB = bindAndSet(_transB, _peerB, referenceB,
422: objectA);
423: if (caller == _peerB) {
424: return idB;
425: }
426: }
427:
428: return idInCaller(caller, referenceA, referenceB);
429: }
430:
431: }
432:
433: public void whereModified(Query query) {
434: query.descend(VirtualField.VERSION).constrain(
435: new Long(lastSynchronization())).greater();
436: }
437:
438: }
|