001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.handler;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.ConfigurationContext;
009: import com.tc.async.api.EventContext;
010: import com.tc.async.api.Sink;
011: import com.tc.logging.TCLogger;
012: import com.tc.logging.TCLogging;
013: import com.tc.net.groups.ClientID;
014: import com.tc.net.protocol.tcm.MessageChannel;
015: import com.tc.object.msg.RequestManagedObjectMessage;
016: import com.tc.object.net.ChannelStats;
017: import com.tc.objectserver.api.ObjectRequestManager;
018: import com.tc.objectserver.context.ManagedObjectRequestContext;
019: import com.tc.objectserver.core.api.ServerConfigurationContext;
020: import com.tc.objectserver.l1.api.ClientStateManager;
021: import com.tc.stats.counter.Counter;
022:
023: import java.util.Collection;
024: import java.util.Set;
025:
026: /**
027: * Converts the request into a call to the objectManager with the proper next steps initialized I'm not convinced that
028: * this stage is necessary. May be able to merge it with another stage.
029: *
030: * @author steve
031: */
032: public class ManagedObjectRequestHandler extends AbstractEventHandler {
033:
034: private ClientStateManager stateManager;
035: private ChannelStats channelStats;
036: private Sink respondObjectRequestSink;
037:
038: private final Counter globalObjectRequestCounter;
039: private final Counter globalObjectFlushCounter;
040: private final ObjectRequestManager objectRequestManager;
041:
042: private static final TCLogger logger = TCLogging
043: .getLogger(ManagedObjectRequestHandler.class);
044:
045: public ManagedObjectRequestHandler(
046: Counter globalObjectRequestCounter,
047: Counter globalObjectFlushCounter,
048: ObjectRequestManager objectRequestManager) {
049: this .globalObjectRequestCounter = globalObjectRequestCounter;
050: this .globalObjectFlushCounter = globalObjectFlushCounter;
051: this .objectRequestManager = objectRequestManager;
052: }
053:
054: public void handleEvent(EventContext context) {
055: if (context instanceof RequestManagedObjectMessage) {
056: handleEventFromClient((RequestManagedObjectMessage) context);
057: } else if (context instanceof ManagedObjectRequestContext) {
058: handleEventFromServer((ManagedObjectRequestContext) context);
059: }
060: }
061:
062: private void handleEventFromServer(
063: ManagedObjectRequestContext context) {
064: Collection ids = context.getLookupIDs();
065: // XXX::TODO:: Server initiated lookups are not updated to the channel counter for now
066: final int numObjectsRequested = ids.size();
067: if (numObjectsRequested != 0) {
068: globalObjectRequestCounter.increment(numObjectsRequested);
069: }
070: objectRequestManager.requestObjects(context, context
071: .getMaxRequestDepth());
072: }
073:
074: private void handleEventFromClient(RequestManagedObjectMessage rmom) {
075:
076: MessageChannel channel = rmom.getChannel();
077: Set requestedIDs = rmom.getObjectIDs();
078: ClientID clientID = rmom.getClientID();
079: Set removedIDs = rmom.getRemoved();
080: int maxRequestDepth = rmom.getRequestDepth();
081:
082: final int numObjectsRequested = requestedIDs.size();
083: if (numObjectsRequested != 0) {
084: globalObjectRequestCounter.increment(numObjectsRequested);
085: channelStats.getCounter(channel,
086: ChannelStats.OBJECT_REQUEST_RATE).increment(
087: numObjectsRequested);
088: }
089:
090: final int numObjectsRemoved = removedIDs.size();
091: if (numObjectsRemoved != 0) {
092: globalObjectFlushCounter.increment(numObjectsRemoved);
093: channelStats.getCounter(channel,
094: ChannelStats.OBJECT_FLUSH_RATE).increment(
095: numObjectsRemoved);
096: }
097:
098: long t = System.currentTimeMillis();
099: stateManager.removeReferences(clientID, removedIDs);
100: t = System.currentTimeMillis() - t;
101: if (t > 1000 || numObjectsRemoved > 100000) {
102: logger.warn("Time to Remove " + numObjectsRemoved + " is "
103: + t + " ms");
104: }
105: if (numObjectsRequested > 0) {
106: ManagedObjectRequestContext reqContext = new ManagedObjectRequestContext(
107: clientID, rmom.getRequestID(), requestedIDs,
108: maxRequestDepth, this .respondObjectRequestSink,
109: rmom.getRequestingThreadName());
110: objectRequestManager.requestObjects(reqContext,
111: maxRequestDepth);
112: }
113: }
114:
115: public void initialize(ConfigurationContext context) {
116: super .initialize(context);
117: ServerConfigurationContext oscc = (ServerConfigurationContext) context;
118: stateManager = oscc.getClientStateManager();
119: channelStats = oscc.getChannelStats();
120: this.respondObjectRequestSink = oscc
121: .getStage(
122: ServerConfigurationContext.RESPOND_TO_OBJECT_REQUEST_STAGE)
123: .getSink();
124: }
125:
126: }
|