001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2006 Continuent, Inc.
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: * Initial developer(s): Emmanuel Cecchet.
019: * Contributor(s): ______________________.
020: */package org.continuent.sequoia.controller.virtualdatabase.protocol;
021:
022: import java.io.Serializable;
023: import java.sql.SQLException;
024: import java.util.LinkedList;
025:
026: import org.continuent.hedera.common.Member;
027: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
028: import org.continuent.sequoia.common.exceptions.VirtualDatabaseStartingException;
029: import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
030: import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
031: import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
032: import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
033: import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
034:
035: /**
036: * This class defines a DistributedOpenPersistentConnection
037: *
038: * @author <a href="mailto:emmanuel.cecchet@continuent.com">Emmanuel Cecchet</a>
039: * @version 1.0
040: */
041: public class DistributedOpenPersistentConnection extends
042: DistributedVirtualDatabaseMessage {
043: private static final long serialVersionUID = -693544521730643721L;
044: private String login;
045: private long persistentConnectionId;
046:
047: /**
048: * Creates a new <code>DistributedOpenPersistentConnection</code> object
049: *
050: * @param login login to retrieve the connection manager
051: * @param persistentConnectionId persistent connection id
052: */
053: public DistributedOpenPersistentConnection(String login,
054: long persistentConnectionId) {
055: this .login = login;
056: this .persistentConnectionId = persistentConnectionId;
057: }
058:
059: /**
060: * Returns the login value.
061: *
062: * @return Returns the login.
063: */
064: public final String getLogin() {
065: return login;
066: }
067:
068: /**
069: * Returns the persistentConnectionId value.
070: *
071: * @return Returns the persistentConnectionId.
072: */
073: public final long getPersistentConnectionId() {
074: return persistentConnectionId;
075: }
076:
077: /**
078: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageSingleThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
079: * org.continuent.hedera.common.Member)
080: */
081: public Object handleMessageSingleThreaded(
082: DistributedVirtualDatabase dvdb, Member sender) {
083: if (!dvdb.isVirtualDatabaseStarted())
084: return new VirtualDatabaseStartingException();
085:
086: LinkedList totalOrderQueue = dvdb.getTotalOrderQueue();
087: synchronized (totalOrderQueue) {
088: totalOrderQueue.addLast(this );
089: }
090: return this ;
091: }
092:
093: /**
094: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageMultiThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
095: * org.continuent.hedera.common.Member, java.lang.Object)
096: */
097: public Serializable handleMessageMultiThreaded(
098: DistributedVirtualDatabase dvdb, Member sender,
099: Object handleMessageSingleThreadedResult) {
100: if (handleMessageSingleThreadedResult instanceof Exception)
101: return (Serializable) handleMessageSingleThreadedResult;
102:
103: dvdb.getRequestManager().getLoadBalancer().waitForTotalOrder(
104: this , true);
105:
106: DistributedRequestManager drm = ((DistributedRequestManager) dvdb
107: .getRequestManager());
108: AbstractScheduler scheduler = drm.getScheduler();
109: RecoveryLog recoveryLog = drm.getRecoveryLog();
110: long entryId = -1;
111: try {
112: boolean success = false;
113: try {
114: scheduler.scheduleOpenPersistentConnection(this );
115:
116: entryId = recoveryLog.logOpenPersistentConnection(
117: login, persistentConnectionId);
118:
119: drm.getLoadBalancer().openPersistentConnection(login,
120: persistentConnectionId);
121: success = true;
122: recoveryLog.logRequestCompletion(entryId, success, 0);
123: } catch (NoMoreBackendException e) {
124: throw e;
125: } catch (SQLException e) {
126: throw e;
127: } finally {
128: scheduler.openPersistentConnectionCompleted(
129: persistentConnectionId, success);
130: }
131: return Boolean.TRUE;
132: } catch (SQLException e) {
133: UnknownWriteRequest notifRequest = new UnknownWriteRequest(
134: "open " + persistentConnectionId, false, 0, null);
135: notifRequest.setLogId(entryId);
136: notifRequest.setPersistentConnection(true);
137: notifRequest
138: .setPersistentConnectionId(persistentConnectionId);
139: drm.addFailedOnAllBackends(notifRequest, false);
140: return e;
141: }
142: }
143:
144: /**
145: * @see java.lang.Object#equals(java.lang.Object)
146: */
147: public boolean equals(Object obj) {
148: if (obj instanceof DistributedOpenPersistentConnection) {
149: DistributedOpenPersistentConnection other = (DistributedOpenPersistentConnection) obj;
150: return persistentConnectionId == other.persistentConnectionId;
151: }
152: return false;
153: }
154:
155: /**
156: * @see java.lang.Object#hashCode()
157: */
158: public int hashCode() {
159: return (int) persistentConnectionId;
160: }
161: }
|