001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.handler;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.ConfigurationContext;
009: import com.tc.async.api.EventContext;
010: import com.tc.async.api.Sink;
011: import com.tc.net.groups.ClientID;
012: import com.tc.net.groups.NodeID;
013: import com.tc.net.protocol.tcm.MessageChannel;
014: import com.tc.net.protocol.tcm.TCMessageType;
015: import com.tc.object.ObjectRequestID;
016: import com.tc.object.dmi.DmiDescriptor;
017: import com.tc.object.msg.BroadcastTransactionMessage;
018: import com.tc.object.net.DSOChannelManager;
019: import com.tc.object.tx.TransactionID;
020: import com.tc.objectserver.context.BroadcastChangeContext;
021: import com.tc.objectserver.context.ManagedObjectRequestContext;
022: import com.tc.objectserver.core.api.ServerConfigurationContext;
023: import com.tc.objectserver.l1.api.ClientStateManager;
024: import com.tc.objectserver.tx.ServerTransactionManager;
025:
026: import java.util.ArrayList;
027: import java.util.Collections;
028: import java.util.HashSet;
029: import java.util.List;
030: import java.util.Map;
031: import java.util.Set;
032:
033: /**
034: * Broadcast the change to all connected clients
035: *
036: * @author steve
037: */
038: public class BroadcastChangeHandler extends AbstractEventHandler {
039: private DSOChannelManager channelManager;
040: private ClientStateManager clientStateManager;
041: private ServerTransactionManager transactionManager;
042: private Sink managedObjectRequestSink;
043: private Sink respondObjectRequestSink;
044:
045: public void handleEvent(EventContext context) {
046: BroadcastChangeContext bcc = (BroadcastChangeContext) context;
047:
048: final NodeID committerID = bcc.getNodeID();
049: final TransactionID txnID = bcc.getTransactionID();
050:
051: final MessageChannel[] channels = channelManager
052: .getActiveChannels();
053:
054: for (int i = 0; i < channels.length; i++) {
055: MessageChannel client = channels[i];
056: // TODO:: make message channel return clientID and short channelManager call.
057: ClientID clientID = channelManager.getClientIDFor(client
058: .getChannelID());
059:
060: Map newRoots = bcc.getNewRoots();
061: Set notifiedWaiters = bcc.getNewlyPendingWaiters()
062: .getNotifiedFor(clientID);
063: List prunedChanges = Collections.EMPTY_LIST;
064: Set lookupObjectIDs = new HashSet();
065:
066: if (!clientID.equals(committerID)) {
067: prunedChanges = clientStateManager
068: .createPrunedChangesAndAddObjectIDTo(bcc
069: .getChanges(), bcc.getIncludeIDs(),
070: clientID, lookupObjectIDs);
071: }
072:
073: DmiDescriptor[] prunedDmis = pruneDmiDescriptors(bcc
074: .getDmiDescriptors(), clientID, clientStateManager);
075: final boolean includeDmi = !clientID.equals(committerID)
076: && prunedDmis.length > 0;
077: if (!prunedChanges.isEmpty() || !lookupObjectIDs.isEmpty()
078: || !notifiedWaiters.isEmpty()
079: || !newRoots.isEmpty() || includeDmi) {
080: transactionManager.addWaitingForAcknowledgement(
081: committerID, txnID, clientID);
082: if (lookupObjectIDs.size() > 0) {
083: // TODO:: Request ID is not used anywhere. RemoveIT.
084: // XXX:: It is important to keep the maxReachableSize to <= 0 so that we dont go into recursive lookups @see
085: // ObjectManagerImpl
086: this .managedObjectRequestSink
087: .add(new ManagedObjectRequestContext(
088: clientID, ObjectRequestID.NULL_ID,
089: lookupObjectIDs, -1,
090: this .respondObjectRequestSink,
091: "BroadcastChangeHandler"));
092: }
093: final DmiDescriptor[] dmi = (includeDmi) ? prunedDmis
094: : DmiDescriptor.EMPTY_ARRAY;
095: BroadcastTransactionMessage responseMessage = (BroadcastTransactionMessage) client
096: .createMessage(TCMessageType.BROADCAST_TRANSACTION_MESSAGE);
097: responseMessage.initialize(prunedChanges,
098: lookupObjectIDs, bcc.getSerializer(), bcc
099: .getLockIDs(),
100: getNextChangeIDFor(clientID), txnID,
101: committerID, bcc.getGlobalTransactionID(), bcc
102: .getTransactionType(), bcc
103: .getLowGlobalTransactionIDWatermark(),
104: notifiedWaiters, newRoots, dmi);
105:
106: responseMessage.send();
107: }
108: }
109: transactionManager.broadcasted(committerID, txnID);
110: }
111:
112: private static DmiDescriptor[] pruneDmiDescriptors(
113: DmiDescriptor[] dmiDescriptors, ClientID clientID,
114: ClientStateManager clientStateManager) {
115: if (dmiDescriptors.length == 0) {
116: return dmiDescriptors;
117: }
118:
119: List list = new ArrayList();
120: for (int i = 0; i < dmiDescriptors.length; i++) {
121: DmiDescriptor dd = dmiDescriptors[i];
122: if (dd.isFaultReceiver()
123: || clientStateManager.hasReference(clientID, dd
124: .getReceiverId())) {
125: list.add(dd);
126: }
127: }
128: DmiDescriptor[] rv = new DmiDescriptor[list.size()];
129: list.toArray(rv);
130: return rv;
131: }
132:
133: private synchronized long getNextChangeIDFor(ClientID clientID) {
134: // FIXME Fix this facility. Should keep a counter for every client and
135: // increment on every
136: return 0;
137: }
138:
139: protected void initialize(ConfigurationContext context) {
140: ServerConfigurationContext scc = (ServerConfigurationContext) context;
141: this.channelManager = scc.getChannelManager();
142: this.clientStateManager = scc.getClientStateManager();
143: this.transactionManager = scc.getTransactionManager();
144: this.managedObjectRequestSink = scc
145: .getStage(
146: ServerConfigurationContext.MANAGED_OBJECT_REQUEST_STAGE)
147: .getSink();
148: this.respondObjectRequestSink = scc
149: .getStage(
150: ServerConfigurationContext.RESPOND_TO_OBJECT_REQUEST_STAGE)
151: .getSink();
152: }
153: }
|