001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.handler;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.ConfigurationContext;
009: import com.tc.async.api.EventContext;
010: import com.tc.async.api.Sink;
011: import com.tc.io.TCByteBufferOutputStream;
012: import com.tc.logging.TCLogger;
013: import com.tc.net.protocol.tcm.MessageChannel;
014: import com.tc.net.protocol.tcm.TCMessageType;
015: import com.tc.object.dna.impl.ObjectStringSerializer;
016: import com.tc.object.msg.ObjectsNotFoundMessage;
017: import com.tc.object.msg.RequestManagedObjectResponseMessage;
018: import com.tc.object.net.DSOChannelManager;
019: import com.tc.object.net.NoSuchChannelException;
020: import com.tc.objectserver.api.ObjectManager;
021: import com.tc.objectserver.context.ManagedObjectRequestContext;
022: import com.tc.objectserver.core.api.ManagedObject;
023: import com.tc.objectserver.core.api.ServerConfigurationContext;
024: import com.tc.objectserver.l1.api.ClientStateManager;
025: import com.tc.util.sequence.Sequence;
026: import com.tc.util.sequence.SimpleSequence;
027:
028: import gnu.trove.THashSet;
029:
030: import java.util.Collection;
031: import java.util.HashSet;
032: import java.util.Iterator;
033: import java.util.LinkedList;
034: import java.util.Set;
035:
036: public class RespondToObjectRequestHandler extends AbstractEventHandler {
037:
038: // XXX:: move to property file
039: private static final int MAX_OBJECTS_TO_LOOKUP = 50;
040:
041: private DSOChannelManager channelManager;
042: private ObjectManager objectManager;
043: private ClientStateManager stateManager;
044: private TCLogger logger;
045: private Sequence batchIDSequence = new SimpleSequence();
046: private Sink managedObjectRequestSink;
047:
048: public void handleEvent(EventContext context) {
049: long batchID = batchIDSequence.next();
050: ManagedObjectRequestContext morc = (ManagedObjectRequestContext) context;
051: Collection objs = morc.getObjects();
052: LinkedList objectsInOrder = new LinkedList();
053:
054: // Check to see if more objects needs to be looked for this request
055: createNewLookupRequestsIfNecessary(morc);
056:
057: Collection requestedObjectIDs = morc.getLookupIDs();
058: Set ids = new HashSet(Math.max((int) (objs.size() / .75f) + 1,
059: 16));
060: for (Iterator i = objs.iterator(); i.hasNext();) {
061: ManagedObject mo = (ManagedObject) i.next();
062: ids.add(mo.getID());
063: if (requestedObjectIDs.contains(mo.getID())) {
064: objectsInOrder.addLast(mo);
065: } else {
066: objectsInOrder.addFirst(mo);
067: }
068: }
069:
070: try {
071: MessageChannel channel = channelManager
072: .getActiveChannel(morc.getRequestedNodeID());
073:
074: // Only send objects that are NOT already there in the client. Look at the comment below.
075: Set newIds = stateManager.addReferences(morc
076: .getRequestedNodeID(), ids);
077: int sendCount = 0;
078: int batches = 0;
079: ObjectStringSerializer serializer = new ObjectStringSerializer();
080: TCByteBufferOutputStream out = new TCByteBufferOutputStream();
081: for (Iterator i = objectsInOrder.iterator(); i.hasNext();) {
082:
083: ManagedObject m = (ManagedObject) i.next();
084: i.remove();
085: // We dont want to send any object twice to the client even the client requested it 'coz it only means
086: // that the object is on its way to the client. This is true because we process the removeObjectIDs and
087: // lookups in Order. Earlier the if condition used to look like ...
088: // if (ids.contains(m.getID()) || morc.getObjectIDs().contains(m.getID())) {}
089: if (newIds.contains(m.getID())) {
090: m.toDNA(out, serializer);
091: sendCount++;
092: } else if (morc.getLookupIDs().contains(m.getID())) {
093: // logger.info("Ignoring request for look up from " + morc.getChannelID() + " for " + m.getID());
094: }
095: objectManager.releaseReadOnly(m);
096:
097: if (sendCount > 1000 || (sendCount > 0 && !i.hasNext())) {
098: batches++;
099: RequestManagedObjectResponseMessage responseMessage = (RequestManagedObjectResponseMessage) channel
100: .createMessage(TCMessageType.REQUEST_MANAGED_OBJECT_RESPONSE_MESSAGE);
101: responseMessage.initialize(out.toArray(),
102: sendCount, serializer, batchID,
103: i.hasNext() ? 0 : batches);
104: responseMessage.send();
105: if (i.hasNext()) {
106: sendCount = 0;
107: serializer = new ObjectStringSerializer();
108: out = new TCByteBufferOutputStream();
109: }
110: }
111: }
112: Set missingOids = morc.getMissingObjectIDs();
113: if (!missingOids.isEmpty()) {
114: ObjectsNotFoundMessage notFound = (ObjectsNotFoundMessage) channel
115: .createMessage(TCMessageType.OBJECTS_NOT_FOUND_RESPONSE_MESSAGE);
116: notFound.initialize(missingOids, batchID);
117: notFound.send();
118: }
119:
120: } catch (NoSuchChannelException e) {
121: logger
122: .info("Not sending response because channel is disconnected: "
123: + morc.getRequestedNodeID()
124: + ". Releasing all checked-out objects...");
125: for (Iterator i = objectsInOrder.iterator(); i.hasNext();) {
126: objectManager.releaseReadOnly((ManagedObject) i.next());
127: }
128: return;
129: }
130: }
131:
132: private void createNewLookupRequestsIfNecessary(
133: ManagedObjectRequestContext morc) {
134: Set oids = morc.getLookupPendingObjectIDs();
135: if (oids.isEmpty()) {
136: return;
137: }
138: int maxRequestDepth = morc.getMaxRequestDepth();
139: if (logger.isDebugEnabled()) {
140: logger
141: .debug("Creating Server initiated requests for : "
142: + morc.getRequestedNodeID()
143: + " org request Id length = "
144: + morc.getLookupIDs().size()
145: + " Reachable object(s) to be looked up length = "
146: + oids.size());
147: }
148: if (oids.size() <= MAX_OBJECTS_TO_LOOKUP) {
149: this .managedObjectRequestSink
150: .add(new ManagedObjectRequestContext(morc
151: .getRequestedNodeID(), morc.getRequestID(),
152: oids, -1, morc.getSink(),
153: "RespondToObjectRequestHandler"));
154: } else {
155: // split into multiple request
156: Set split = new HashSet(MAX_OBJECTS_TO_LOOKUP);
157: for (Iterator i = oids.iterator(); i.hasNext();) {
158: split.add(i.next());
159: if (split.size() >= MAX_OBJECTS_TO_LOOKUP) {
160: this .managedObjectRequestSink
161: .add(new ManagedObjectRequestContext(morc
162: .getRequestedNodeID(), morc
163: .getRequestID(), split, -1, morc
164: .getSink(),
165: "RespondToObjectRequestHandler"));
166: if (i.hasNext())
167: split = new THashSet(maxRequestDepth);
168: }
169: }
170: }
171: }
172:
173: public void initialize(ConfigurationContext context) {
174: super .initialize(context);
175: ServerConfigurationContext oscc = (ServerConfigurationContext) context;
176: this.channelManager = oscc.getChannelManager();
177: this.objectManager = oscc.getObjectManager();
178: this.logger = oscc.getLogger(getClass());
179: this.stateManager = oscc.getClientStateManager();
180: this.managedObjectRequestSink = oscc
181: .getStage(
182: ServerConfigurationContext.MANAGED_OBJECT_REQUEST_STAGE)
183: .getSink();
184: }
185: }
|