0001: /*_############################################################################
0002: _##
0003: _## SNMP4J-AgentX - AgentXCommandProcessor.java
0004: _##
0005: _## Copyright (C) 2005-2007 Frank Fock (SNMP4J.org)
0006: _##
0007: _## This program is free software; you can redistribute it and/or modify
0008: _## it under the terms of the GNU General Public License version 2 as
0009: _## published by the Free Software Foundation.
0010: _##
0011: _## This program is distributed in the hope that it will be useful,
0012: _## but WITHOUT ANY WARRANTY; without even the implied warranty of
0013: _## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
0014: _## GNU General Public License for more details.
0015: _##
0016: _## You should have received a copy of the GNU General Public License
0017: _## along with this program; if not, write to the Free Software
0018: _## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
0019: _## MA 02110-1301 USA
0020: _##
0021: _##########################################################################*/
0022:
0023: package org.snmp4j.agent.agentx.master;
0024:
0025: import java.io.IOException;
0026: import java.util.*;
0027:
0028: import org.snmp4j.CommandResponderEvent;
0029: import org.snmp4j.PDU;
0030: import org.snmp4j.TransportMapping;
0031: import org.snmp4j.agent.*;
0032: import org.snmp4j.agent.agentx.*;
0033: import org.snmp4j.agent.agentx.master.AgentXQueue.AgentXQueueEntry;
0034: import org.snmp4j.agent.agentx.master.index.AgentXIndexRegistry;
0035: import org.snmp4j.agent.mo.snmp.AgentCapabilityList;
0036: import org.snmp4j.agent.mo.snmp.SNMPv2MIB.SysUpTimeImpl;
0037: import org.snmp4j.agent.mo.snmp.SysUpTime;
0038: import org.snmp4j.agent.request.*;
0039: import org.snmp4j.agent.security.VACM;
0040: import org.snmp4j.log.LogAdapter;
0041: import org.snmp4j.log.LogFactory;
0042: import org.snmp4j.mp.SnmpConstants;
0043: import org.snmp4j.smi.*;
0044: import org.snmp4j.transport.ConnectionOrientedTransportMapping;
0045: import org.snmp4j.transport.TransportStateEvent;
0046: import org.snmp4j.transport.TransportStateListener;
0047:
0048: public class AgentXCommandProcessor extends CommandProcessor implements
0049: AgentXCommandListener, TransportStateListener,
0050: AgentXResponseListener {
0051:
0052: public static final int MAX_REPROCESSING_DEFAULT = 100;
0053:
0054: private static final LogAdapter LOGGER = LogFactory
0055: .getLogger(AgentXCommandProcessor.class);
0056:
0057: private static final OctetString DEFAULT_CONTEXT = new OctetString();
0058:
0059: private AgentXQueue agentXQueue;
0060: private AgentX agentX;
0061: private Map sessions = new HashMap();
0062: private Map peers = new HashMap(10);
0063: private Set registrations = new TreeSet(
0064: new AgentXRegEntryComparator());
0065: private MOServer server;
0066: private int nextSessionID = 1;
0067: private byte defaultTimeout = AgentXProtocol.DEFAULT_TIMEOUT_SECONDS;
0068: private int maxConsecutiveTimeouts = AgentXProtocol.DEFAULT_MAX_CONSECUTIVE_TIMEOUTS;
0069: private Map contextInfo = new HashMap(10);
0070: private boolean acceptNewContexts = false;
0071:
0072: private int nextPacketID = 0;
0073:
0074: protected AgentXIndexRegistry indexRegistry = new AgentXIndexRegistry();
0075:
0076: private transient Vector agentXMasterListeners;
0077:
0078: private int maxReprocessing = MAX_REPROCESSING_DEFAULT;
0079:
0080: public AgentXCommandProcessor(OctetString contextEngineID,
0081: AgentXQueue queue, AgentX agentX, MOServer server) {
0082: super (contextEngineID);
0083: this .agentXQueue = queue;
0084: this .agentX = agentX;
0085: this .server = server;
0086: if (this .agentXQueue.getServer4BulkOptimization() == null) {
0087: this .agentXQueue.setServer4BulkOptimization(server);
0088: }
0089: }
0090:
0091: private synchronized int createNextPacketID() {
0092: return nextPacketID++;
0093: }
0094:
0095: public void setMaxReprocessing(int maxReprocessing) {
0096: this .maxReprocessing = maxReprocessing;
0097: }
0098:
0099: public int getMaxReprocessing() {
0100: return maxReprocessing;
0101: }
0102:
0103: protected void finalizeRequest(CommandResponderEvent command,
0104: Request req, MOServer server) {
0105: boolean complete = req.isComplete();
0106: AgentXQueueEntry entry = agentXQueue
0107: .get(req.getTransactionID());
0108: if (entry != null) {
0109: Collection pending = entry.getPending();
0110: entry.updateTimestamp();
0111: for (Iterator it = pending.iterator(); it.hasNext();) {
0112: AgentXPending p = (AgentXPending) it.next();
0113: if (pending != null) {
0114: AgentXPDU agentXPDU = p.getAgentXPDU();
0115: AgentXMasterSession session = p.getSession();
0116: agentXPDU.setSessionID(session.getSessionID());
0117: agentXPDU.setTransactionID(req.getTransactionID());
0118: agentXPDU.setPacketID(createNextPacketID());
0119: p.updateTimestamp();
0120: try {
0121: agentX.send(agentXPDU, session
0122: .createAgentXTarget(), session
0123: .getPeer().getTransport(), p, this );
0124: } catch (IOException ex) {
0125: LOGGER
0126: .error("Failed to send AgentX subrequest: "
0127: + ex.getMessage());
0128: ((SubRequest) p.getReferences().next())
0129: .getStatus().setErrorStatus(PDU.genErr);
0130: break;
0131: }
0132: }
0133: }
0134: } else {
0135: if (complete) {
0136: agentXQueue.removeAll(req.getTransactionID());
0137: } else {
0138: // there are still incomplete sub-requests -> reprocess them
0139: if (req.getReprocessCounter() < this .maxReprocessing) {
0140: reprocessRequest(server, (SnmpRequest) req);
0141: } else {
0142: req.setErrorStatus(PDU.genErr);
0143: LOGGER
0144: .warn("The following request has been repeocessed "
0145: + req.getReprocessCounter()
0146: + " which exceeds the agent's "
0147: + "upper limit of "
0148: + this .maxReprocessing + ": " + req);
0149: }
0150: }
0151: super .finalizeRequest(command, req, server);
0152: }
0153: }
0154:
0155: protected synchronized int getNextSessionID() {
0156: return nextSessionID++;
0157: }
0158:
0159: public MOServer getServer() {
0160: return server;
0161: }
0162:
0163: public byte getDefaultTimeout() {
0164: return defaultTimeout;
0165: }
0166:
0167: public int getMaxConsecutiveTimeouts() {
0168: return maxConsecutiveTimeouts;
0169: }
0170:
0171: /**
0172: * Indicates whether subagents can register contexts that are not yet
0173: * supported by this master agent.
0174: * @return
0175: * <code>true</code> if subagents can register objects for new contexts.
0176: */
0177: public boolean isAcceptNewContexts() {
0178: return acceptNewContexts;
0179: }
0180:
0181: public void setDefaultTimeout(byte defaultTimeout) {
0182: this .defaultTimeout = defaultTimeout;
0183: }
0184:
0185: public void setMaxConsecutiveTimeouts(int maxConsecutiveTimeouts) {
0186: this .maxConsecutiveTimeouts = maxConsecutiveTimeouts;
0187: }
0188:
0189: /**
0190: * Enables or disables accepting new contexts from subagents.
0191: * @param acceptNewContexts
0192: * <code>true</code> if subagents are allowed to register objects for new
0193: * contexts, <code>false</code> otherwise. Default is <code>false</code>.
0194: */
0195: public void setAcceptNewContexts(boolean acceptNewContexts) {
0196: this .acceptNewContexts = acceptNewContexts;
0197: }
0198:
0199: public void processCommand(AgentXCommandEvent event) {
0200: boolean pendingClose = false;
0201: if (event.isException()) {
0202: /**@todo implement parse exception handling*/
0203: LOGGER.warn("AgentX parse exception: "
0204: + event.getException());
0205: } else {
0206: AgentXPDU pdu = event.getCommand();
0207: AgentXMasterSession session = getSession(pdu);
0208: AgentXResponsePDU response = null;
0209: if (LOGGER.isDebugEnabled()) {
0210: LOGGER.debug("Processing AgentX PDU " + pdu
0211: + " for session " + session);
0212: }
0213: switch (pdu.getType()) {
0214: case AgentXPDU.AGENTX_RESPONSE_PDU: {
0215: LOGGER
0216: .error("Internal error: received AgentX response without request");
0217: return;
0218: }
0219: case AgentXPDU.AGENTX_OPEN_PDU: {
0220: response = openSession((AgentXOpenPDU) pdu, event);
0221: session = getSession(response.getSessionID());
0222: break;
0223: }
0224: case AgentXPDU.AGENTX_CLOSE_PDU: {
0225: response = closeSession((AgentXClosePDU) pdu, session);
0226: pendingClose = true;
0227: break;
0228: }
0229: case AgentXPDU.AGENTX_REGISTER_PDU: {
0230: response = register((AgentXRegisterPDU) pdu, event,
0231: session);
0232: break;
0233: }
0234: case AgentXPDU.AGENTX_UNREGISTER_PDU: {
0235: response = unregister((AgentXUnregisterPDU) pdu, event,
0236: session);
0237: break;
0238: }
0239: case AgentXPDU.AGENTX_ADDAGENTCAPS_PDU: {
0240: response = addAgentCaps((AgentXAddAgentCapsPDU) pdu,
0241: session);
0242: break;
0243: }
0244: case AgentXPDU.AGENTX_REMOVEAGENTCAPS_PDU: {
0245: response = removeAgentCaps(
0246: (AgentXRemoveAgentCapsPDU) pdu, session);
0247: break;
0248: }
0249: case AgentXPDU.AGENTX_NOTIFY_PDU: {
0250: response = notify((AgentXNotifyPDU) pdu, session);
0251: break;
0252: }
0253: case AgentXPDU.AGENTX_PING_PDU: {
0254: response = ping((AgentXPingPDU) pdu, session);
0255: break;
0256: }
0257: case AgentXPDU.AGENTX_INDEXALLOCATE_PDU: {
0258: response = indexAllocate((AgentXIndexAllocatePDU) pdu,
0259: session);
0260: break;
0261: }
0262: case AgentXPDU.AGENTX_INDEXDEALLOCATE_PDU: {
0263: response = indexDeallocate(
0264: (AgentXIndexDeallocatePDU) pdu, session);
0265: break;
0266: }
0267: default:
0268: LOGGER.warn("Unknown AgentX PDU type received: " + pdu);
0269: }
0270: if ((response != null) && (session != null)) {
0271: sendResponse(response, session);
0272: }
0273: if (pendingClose) {
0274: if (session != null) {
0275: closePeer(session.getPeer());
0276: }
0277: }
0278: }
0279: event.setProcessed(true);
0280: }
0281:
0282: private void closePeer(AgentXPeer peer) {
0283: TransportMapping transport = peer.getTransport();
0284: if (transport instanceof ConnectionOrientedTransportMapping) {
0285: try {
0286: if (((ConnectionOrientedTransportMapping) transport)
0287: .close(peer.getAddress())) {
0288: if (LOGGER.isInfoEnabled()) {
0289: LOGGER.info("Closed sub-agent connection to "
0290: + peer.getAddress());
0291: }
0292: } else {
0293: LOGGER
0294: .warn("Failed to close sub-agent connection to "
0295: + peer.getAddress());
0296: }
0297: } catch (IOException ex) {
0298: LOGGER.error("Failed to close transport mapping "
0299: + peer.getTransport() + " because: "
0300: + ex.getMessage(), ex);
0301: }
0302: }
0303: }
0304:
0305: public AgentXResponsePDU indexDeallocate(
0306: AgentXIndexDeallocatePDU pdu, AgentXMasterSession session) {
0307: AgentXResponsePDU response = createResponse(pdu, session);
0308: boolean contextSupported = server.isContextSupported(pdu
0309: .getContext());
0310: if (contextSupported) {
0311: VariableBinding[] vbs = pdu.getVariableBindings();
0312: // test index allocation
0313: deallocateIndexes(response, pdu, session, vbs, true);
0314: if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) {
0315: // do it on success
0316: deallocateIndexes(response, pdu, session, vbs, false);
0317: response.setVariableBindings(vbs);
0318: }
0319: } else {
0320: response
0321: .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0322: }
0323: return response;
0324: }
0325:
0326: private boolean checkIfContextIsSupported(OctetString context) {
0327: boolean contextSupported = server.isContextSupported(context);
0328: if (LOGGER.isDebugEnabled()) {
0329: LOGGER.debug("Checking context '" + context
0330: + "' is supported");
0331: }
0332: if (isAcceptNewContexts() && !contextSupported) {
0333: server.addContext(context);
0334: contextSupported = server.isContextSupported(context);
0335: if (LOGGER.isInfoEnabled()) {
0336: LOGGER.info("Adding new context '" + context
0337: + "' on subagent request returned: "
0338: + contextSupported);
0339: }
0340: }
0341: return contextSupported;
0342: }
0343:
0344: public AgentXResponsePDU indexAllocate(AgentXIndexAllocatePDU pdu,
0345: AgentXMasterSession session) {
0346: AgentXResponsePDU response = createResponse(pdu, session);
0347: response.setVariableBindings(pdu.getVariableBindings());
0348: boolean contextSupported = checkIfContextIsSupported(pdu
0349: .getContext());
0350: if (contextSupported) {
0351: VariableBinding[] vbs = pdu.getVariableBindings();
0352: // test index allocation
0353: allocateIndexes(response, pdu, session, vbs, true);
0354: if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) {
0355: // do it on success
0356: allocateIndexes(response, pdu, session, vbs, false);
0357: response.setVariableBindings(vbs);
0358: }
0359: } else {
0360: response
0361: .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0362: }
0363: return response;
0364: }
0365:
0366: private int allocateIndexes(AgentXResponsePDU response,
0367: AgentXIndexAllocatePDU pdu, AgentXMasterSession session,
0368: VariableBinding[] vbs, boolean testOnly) {
0369: int status = AgentXProtocol.AGENTX_SUCCESS;
0370: int i = 0;
0371: for (; (i < vbs.length)
0372: && (status == AgentXProtocol.AGENTX_SUCCESS); i++) {
0373: VariableBinding vb = vbs[i];
0374: if (pdu.isFlagSet(AgentXProtocol.FLAG_ANY_INDEX)) {
0375: status = indexRegistry.anyIndex(session.getSessionID(),
0376: pdu.getContext(), vb, testOnly);
0377: } else if (pdu.isFlagSet(AgentXProtocol.FLAG_NEW_INDEX)) {
0378: status = indexRegistry.newIndex(session.getSessionID(),
0379: pdu.getContext(), vb, testOnly);
0380: } else {
0381: status = indexRegistry.allocate(session.getSessionID(),
0382: pdu.getContext(), vb, testOnly);
0383: }
0384: }
0385: response.setErrorStatus(status);
0386: if (status != AgentXProtocol.AGENTX_SUCCESS) {
0387: response.setErrorIndex(i);
0388: }
0389: return status;
0390: }
0391:
0392: private int deallocateIndexes(AgentXResponsePDU response,
0393: AgentXIndexDeallocatePDU pdu, AgentXMasterSession session,
0394: VariableBinding[] vbs, boolean testOnly) {
0395: int status = AgentXProtocol.AGENTX_SUCCESS;
0396: int i = 0;
0397: for (; (i < vbs.length)
0398: && (status == AgentXProtocol.AGENTX_SUCCESS); i++) {
0399: VariableBinding vb = vbs[i];
0400: status = indexRegistry.release(session.getSessionID(), pdu
0401: .getContext(), vb, testOnly);
0402: }
0403: response.setErrorStatus(status);
0404: if (status != AgentXProtocol.AGENTX_SUCCESS) {
0405: response.setErrorIndex(i);
0406: }
0407: return status;
0408: }
0409:
0410: protected void processAgentXSearchResponse(AgentXPending pending,
0411: AgentXResponsePDU pdu) {
0412: if (pdu.getErrorStatus() != PDU.noError) {
0413: processsErrorResponse(pending, pdu);
0414: } else {
0415: // no error -> normal processing
0416: if (pending.getAgentXPDU().getType() == AgentXPDU.AGENTX_GETBULK_PDU) {
0417: processAgentXNextResponse(pending, pdu,
0418: Integer.MAX_VALUE);
0419: } else {
0420: processAgentXNextResponse(pending, pdu,
0421: ((AgentXRequestPDU) pending.getAgentXPDU())
0422: .getRanges().length);
0423: }
0424: }
0425: }
0426:
0427: private SubRequestIterator processAgentXNextResponse(
0428: AgentXPending pending, AgentXResponsePDU pdu,
0429: int subRequestIndexUpperBound)
0430: throws NoSuchElementException {
0431: VariableBinding[] vbs = pdu.getVariableBindings();
0432: AgentXRequestPDU axReqPDU = (AgentXRequestPDU) pending
0433: .getAgentXPDU();
0434: SubRequestIterator subRequests = pending.getReferences();
0435: for (int i = 0; (i < subRequestIndexUpperBound)
0436: && subRequests.hasNext(); i++) {
0437: SnmpSubRequest sreq = (SnmpSubRequest) subRequests
0438: .nextSubRequest();
0439: processNextSubRequest(vbs, axReqPDU, i, i, sreq);
0440: }
0441: return subRequests;
0442: }
0443:
0444: private void processNextSubRequest(VariableBinding[] vbs,
0445: AgentXRequestPDU axReqPDU, int vbIndex, int rangeIndex,
0446: SnmpSubRequest sreq) {
0447: MOScope srange = axReqPDU.getRanges()[rangeIndex];
0448: if (vbIndex < vbs.length) {
0449: VariableBinding vb = vbs[vbIndex];
0450: if (vb.getSyntax() == SMIConstants.EXCEPTION_END_OF_MIB_VIEW) {
0451: processEndOfMibView(sreq, srange, vb.getOid());
0452: } else if (!srange.covers(vb.getOid())) {
0453: processEndOfMibView(sreq, srange, null);
0454: } else if ((vb.isException())
0455: || (super .vacm.isAccessAllowed(sreq
0456: .getSnmpRequest().getViewName(), vb
0457: .getOid()) != VACM.VACM_OK)) {
0458: DefaultMOContextScope nscope = (DefaultMOContextScope) sreq
0459: .getScope();
0460: nscope.substractScope(srange);
0461: nscope.setUpperBound(null);
0462: nscope.setUpperIncluded(true);
0463: // reset query because scope changed!
0464: sreq.setQuery(null);
0465: sreq.getStatus().setProcessed(false);
0466: } else {
0467: sreq.getVariableBinding().setOid(vb.getOid());
0468: sreq.getVariableBinding().setVariable(vb.getVariable());
0469: sreq.getStatus().setPhaseComplete(true);
0470: if (LOGGER.isDebugEnabled()) {
0471: LOGGER.debug("Assigned next subrequest " + sreq);
0472: }
0473: // Not needed here because bulk processing does it anyway:
0474: sreq.updateNextRepetition();
0475: }
0476: } else {
0477: // less VBs than expected
0478: processEndOfMibView(sreq, srange, null);
0479: }
0480: }
0481:
0482: private static void processEndOfMibView(SnmpSubRequest sreq,
0483: MOScope srange, OID oid) {
0484: if (srange.getUpperBound() == null) {
0485: // unbounded
0486: // set also all following repetitions to endOfMibView
0487: SubRequestIterator tail = sreq.repetitions();
0488: while (tail.hasNext()) {
0489: SubRequest sr = tail.nextSubRequest();
0490: if (oid == null) {
0491: sr.getVariableBinding().setOid(
0492: srange.getLowerBound());
0493: } else {
0494: sreq.getVariableBinding().setOid(oid);
0495: }
0496: sr.getVariableBinding().setVariable(Null.endOfMibView);
0497: sr.getStatus().setPhaseComplete(true);
0498: }
0499: return;
0500: } else {
0501: sreq.getStatus().setProcessed(false);
0502: }
0503: DefaultMOContextScope nscope = (DefaultMOContextScope) sreq
0504: .getScope();
0505: nscope.substractScope(srange);
0506: nscope.setUpperBound(null);
0507: nscope.setUpperIncluded(true);
0508: // reset query because scope changed!
0509: sreq.setQuery(null);
0510: }
0511:
0512: protected void processAgentXBulkResponse(AgentXPending pending,
0513: AgentXResponsePDU pdu) {
0514: if (pdu.getErrorStatus() != PDU.noError) {
0515: processsErrorResponse(pending, pdu);
0516: } else {
0517: AgentXGetBulkPDU requestPDU = (AgentXGetBulkPDU) pending
0518: .getAgentXPDU();
0519: VariableBinding[] vbs = pdu.getVariableBindings();
0520: int numBindings = vbs.length;
0521: int repeaters = requestPDU.getRanges().length
0522: - requestPDU.getNonRepeaters();
0523: if (numBindings - requestPDU.getNonRepeaters() > requestPDU
0524: .getMaxRepetitions()
0525: * repeaters) {
0526: LOGGER
0527: .warn("Bulk response with more repetitions ("
0528: + ((numBindings - requestPDU
0529: .getNonRepeaters()) / repeaters)
0530: + ") than max rep. "
0531: + requestPDU.getMaxRepetitions());
0532: numBindings = requestPDU.getMaxRepetitions()
0533: * repeaters + requestPDU.getNonRepeaters();
0534: }
0535: if (numBindings == 0) {
0536: // this is IMHO outside the AgentX/SNMP spec but it is in fact
0537: // needed to be interoperable with NET-SNMP sub-agent
0538: AgentXRequestPDU axReqPDU = (AgentXRequestPDU) pending
0539: .getAgentXPDU();
0540: SubRequestIterator subRequests = pending
0541: .getReferences();
0542: for (int i = 0; subRequests.hasNext(); i++) {
0543: SnmpSubRequest sreq = (SnmpSubRequest) subRequests
0544: .nextSubRequest();
0545: MOScope srange = axReqPDU.getRanges()[i];
0546: processEndOfMibView(sreq, srange, null);
0547: }
0548: } else {
0549: // process non repeaters first
0550: SubRequestIterator it = processAgentXNextResponse(
0551: pending, pdu, requestPDU.getNonRepeaters());
0552: int nonRep = requestPDU.getNonRepeaters();
0553: for (int c = 0; (c + nonRep < requestPDU.getRanges().length)
0554: && it.hasNext(); c++) {
0555: int rangeIndex = c + nonRep;
0556: SnmpSubRequest sreq = (SnmpSubRequest) it
0557: .nextSubRequest();
0558: SubRequestIterator rsreq = sreq.repetitions();
0559: for (int r = 0; (nonRep + (r * repeaters) + c < numBindings)
0560: && rsreq.hasNext(); r++) {
0561: SnmpSubRequest repetition = (SnmpSubRequest) rsreq
0562: .nextSubRequest();
0563: /*
0564: System.err.println("nr="+nonRep+",r="+r+",repeaters="+repeaters+
0565: ",c="+c+",rangeIndex="+rangeIndex+",rep="+repetition);
0566: */
0567: processNextSubRequest(vbs, requestPDU, nonRep
0568: + (r * repeaters) + c, rangeIndex,
0569: repetition);
0570: }
0571: }
0572: }
0573: }
0574: }
0575:
0576: protected static void processsErrorResponse(AgentXPending pending,
0577: AgentXResponsePDU pdu) throws NoSuchElementException {
0578: SubRequestIterator subRequests = pending.getReferences();
0579: for (int i = 1; i < pdu.getErrorIndex(); i++) {
0580: if (subRequests.hasNext()) {
0581: subRequests.next();
0582: } else {
0583: pending.getRequest().setErrorStatus(PDU.genErr);
0584: return;
0585: }
0586: }
0587: if (subRequests.hasNext()) {
0588: SubRequest sreq = subRequests.nextSubRequest();
0589: RequestStatus status = sreq.getStatus();
0590: status.setErrorStatus(pdu.getErrorStatus());
0591: } else {
0592: pending.getRequest().setErrorStatus(PDU.genErr);
0593: }
0594: }
0595:
0596: private static boolean checkAgentXResponse(AgentXResponsePDU pdu,
0597: AgentXPending pending) {
0598: switch (pending.getAgentXPDU().getType()) {
0599: case AgentXPDU.AGENTX_GET_PDU:
0600: case AgentXPDU.AGENTX_GETNEXT_PDU: {
0601: if (((AgentXRequestPDU) pending.getAgentXPDU()).getRanges().length != pdu
0602: .size()) {
0603: pending.getRequest().setErrorStatus(PDU.genErr);
0604: return false;
0605: }
0606: break;
0607: }
0608: default: {
0609: // no check?
0610: }
0611: }
0612: return true;
0613: }
0614:
0615: protected AgentXResponsePDU ping(AgentXPingPDU pdu,
0616: AgentXMasterSession session) {
0617: AgentXResponsePDU response = createResponse(pdu, session);
0618: if (!checkIfContextIsSupported(pdu.getContext())) {
0619: response
0620: .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0621: return response;
0622: }
0623: return response;
0624: }
0625:
0626: protected AgentXResponsePDU notify(AgentXNotifyPDU pdu,
0627: AgentXMasterSession session) {
0628: AgentXResponsePDU response = createResponse(pdu, session);
0629: if (session != null) {
0630: if (!checkIfContextIsSupported(pdu.getContext())) {
0631: response
0632: .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0633: return response;
0634: }
0635: VariableBinding[] vbs = pdu.getVariableBindings();
0636: response.setVariableBindings(vbs);
0637: int payloadIndex = 1;
0638: OID trapoid = null;
0639: TimeTicks timestamp = new TimeTicks(
0640: getContextSysUpTime(DEFAULT_CONTEXT));
0641:
0642: if (vbs.length >= 1) {
0643: if (SnmpConstants.sysUpTime.equals(vbs[0].getOid())) {
0644: payloadIndex++;
0645: if ((vbs.length < 2)
0646: || (!SnmpConstants.snmpTrapOID
0647: .equals(vbs[1].getOid()))) {
0648: response
0649: .setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
0650: response.setErrorIndex(2);
0651: } else {
0652: timestamp = (TimeTicks) vbs[0].getVariable();
0653: trapoid = (OID) vbs[1].getVariable();
0654: }
0655: } else if (SnmpConstants.snmpTrapOID.equals(vbs[0]
0656: .getOid())) {
0657: trapoid = (OID) vbs[0].getVariable();
0658: } else {
0659: response
0660: .setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
0661: response.setErrorIndex(1);
0662: }
0663: }
0664: if (trapoid != null) {
0665: VariableBinding[] pvbs = new VariableBinding[vbs.length
0666: - payloadIndex];
0667: System.arraycopy(vbs, payloadIndex, pvbs, 0,
0668: pvbs.length);
0669: notify(pdu.getContext(), trapoid, timestamp, pvbs);
0670: }
0671: }
0672: return response;
0673: }
0674:
0675: protected TimeTicks getContextSysUpTime(OctetString context) {
0676: MasterContextInfo info = (MasterContextInfo) contextInfo
0677: .get(context);
0678: SysUpTime contextSysUpTime;
0679: if (info == null) {
0680: MOContextScope scope = new DefaultMOContextScope(context,
0681: SnmpConstants.sysUpTime, true,
0682: SnmpConstants.sysUpTime, true);
0683: ManagedObject mo = server.lookup(new DefaultMOQuery(scope));
0684: if (mo instanceof SysUpTime) {
0685: contextSysUpTime = (SysUpTime) mo;
0686: } else {
0687: /**@todo May be we can use an integer of the found object to
0688: * initialize the time?
0689: */
0690: LOGGER.warn("SysUpTime could not be found in '"
0691: + context
0692: + "' context, using a new instance instead");
0693: contextSysUpTime = new SysUpTimeImpl();
0694: }
0695: contextInfo.put(context, new MasterContextInfo(context,
0696: contextSysUpTime));
0697: } else {
0698: contextSysUpTime = info.getUpTime();
0699: }
0700: if (contextSysUpTime != null) {
0701: return contextSysUpTime.get();
0702: }
0703: return null;
0704: }
0705:
0706: public AgentXResponsePDU addAgentCaps(AgentXAddAgentCapsPDU pdu,
0707: AgentXMasterSession session) {
0708: AgentXResponsePDU response = createResponse(pdu, session);
0709: if (session != null) {
0710: if (!checkIfContextIsSupported(pdu.getContext())) {
0711: response
0712: .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0713: return response;
0714: }
0715: AgentCapabilityList agentCaps = getAgentCaps(pdu
0716: .getContext());
0717: if (agentCaps != null) {
0718: OID index = agentCaps.addSysOREntry(pdu.getId(), pdu
0719: .getDescr());
0720: session.addAgentCaps(pdu.getId(), index);
0721: }
0722: }
0723: return response;
0724: }
0725:
0726: protected AgentCapabilityList getAgentCaps(OctetString contextName) {
0727: MOContextScope scope = new DefaultMOContextScope(contextName,
0728: SnmpConstants.sysOREntry, true,
0729: SnmpConstants.sysOREntry, true);
0730: ManagedObject mo = server.lookup(new DefaultMOQuery(scope));
0731: if (mo instanceof AgentCapabilityList) {
0732: return (AgentCapabilityList) mo;
0733: } else {
0734: LOGGER.warn("SysOREntry managed object for context "
0735: + contextName + " not found, instead found: " + mo);
0736: }
0737: return null;
0738: }
0739:
0740: public AgentXResponsePDU removeAgentCaps(
0741: AgentXRemoveAgentCapsPDU pdu, AgentXMasterSession session) {
0742: AgentXResponsePDU response = createResponse(pdu, session);
0743: if (session != null) {
0744: OID index = session.removeAgentCaps(pdu.getId());
0745: AgentCapabilityList agentCaps = getAgentCaps(pdu
0746: .getContext());
0747: if (agentCaps != null) {
0748: Object ac = agentCaps.removeSysOREntry(index);
0749: if (ac == null) {
0750: response
0751: .setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_AGENTCAPS);
0752: }
0753: } else {
0754: response
0755: .setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_AGENTCAPS);
0756: }
0757: }
0758: return response;
0759: }
0760:
0761: public AgentXResponsePDU closeSession(AgentXClosePDU pdu,
0762: AgentXMasterSession session) {
0763: if (LOGGER.isInfoEnabled()) {
0764: LOGGER.info("Subagent is closing session " + session
0765: + " because " + pdu.getReason());
0766: }
0767: AgentXResponsePDU response = createResponse(pdu, session);
0768: if (session != null) {
0769: removeSession(session.getSessionID());
0770: removeAllRegistrations(session);
0771: session.setClosed(true);
0772: }
0773: return response;
0774: }
0775:
0776: public void closeSession(AgentXMasterSession session, byte reason) {
0777: if (LOGGER.isInfoEnabled()) {
0778: LOGGER.info("Closing sub-agent session " + session
0779: + " because " + reason);
0780: }
0781: AgentXClosePDU closePDU = new AgentXClosePDU(reason);
0782: try {
0783: agentX.send(closePDU, session.createAgentXTarget(), session
0784: .getPeer().getTransport(), new AgentXPendingClose(
0785: session, closePDU), this );
0786: } catch (IOException ex) {
0787: LOGGER.error(
0788: "Failed to send CloseSessionPDU to close session "
0789: + session + ": " + ex.getMessage(), ex);
0790: }
0791: removeSession(session.getSessionID());
0792: removeAllRegistrations(session);
0793: session.setClosed(true);
0794: }
0795:
0796: protected synchronized void removeAllRegistrations(
0797: AgentXMasterSession session) {
0798: if (LOGGER.isDebugEnabled()) {
0799: LOGGER.debug("Removing all registrations (out of "
0800: + registrations.size() + ") of session " + session);
0801: }
0802: for (Iterator it = registrations.iterator(); it.hasNext();) {
0803: AgentXRegEntry r = (AgentXRegEntry) it.next();
0804: if (r.getSession().equals(session)) {
0805: removeRegistration(r, it);
0806: }
0807: }
0808: }
0809:
0810: protected AgentXMasterSession getSession(int sessionID) {
0811: return (AgentXMasterSession) sessions
0812: .get(new Integer(sessionID));
0813: }
0814:
0815: protected synchronized AgentXMasterSession getSession(AgentXPDU pdu) {
0816: int sessionID = pdu.getSessionID();
0817: return getSession(sessionID);
0818: }
0819:
0820: protected AgentXResponsePDU register(AgentXRegisterPDU pdu,
0821: AgentXCommandEvent command, AgentXMasterSession session) {
0822: AgentXResponsePDU response = createResponse(pdu, session);
0823: if (session != null) {
0824: if (!checkIfContextIsSupported(pdu.getContext())) {
0825: response
0826: .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0827: return response;
0828: }
0829: AgentXRegEntry regEntry = new AgentXRegEntry(session, pdu
0830: .getRegion(), pdu.getPriority(), pdu.getContext(),
0831: pdu.getTimeout());
0832: if (isDuplicate(regEntry)) {
0833: response
0834: .setErrorStatus(AgentXProtocol.AGENTX_DUPLICATE_REGISTRATION);
0835: return response;
0836: }
0837: AgentXMasterEvent event = new AgentXMasterEvent(this ,
0838: AgentXMasterEvent.REGISTRATION_TO_ADD, regEntry);
0839: fireMasterChanged(event);
0840: if (event.getVetoReason() == AgentXProtocol.AGENTX_SUCCESS) {
0841: try {
0842: addRegistration(regEntry);
0843: } catch (DuplicateRegistrationException drex) {
0844: if (LOGGER.isDebugEnabled()) {
0845: drex.printStackTrace();
0846: }
0847: response
0848: .setErrorStatus(AgentXProtocol.AGENTX_DUPLICATE_REGISTRATION);
0849: return response;
0850: }
0851: } else {
0852: response.setErrorStatus(event.getVetoReason());
0853: }
0854: }
0855: return response;
0856: }
0857:
0858: protected AgentXResponsePDU unregister(AgentXUnregisterPDU pdu,
0859: AgentXCommandEvent event, AgentXMasterSession session) {
0860: AgentXResponsePDU response = createResponse(pdu, session);
0861: if (session != null) {
0862: AgentXRegEntry regEntry = new AgentXRegEntry(session, pdu
0863: .getRegion(), pdu.getPriority(), pdu.getContext(),
0864: pdu.getTimeout());
0865: boolean found = false;
0866: for (Iterator it = registrations.iterator(); it.hasNext();) {
0867: AgentXRegEntry r = (AgentXRegEntry) it.next();
0868: if (r.equals(regEntry)) {
0869: found = true;
0870: if (!removeRegistration(r, it)) {
0871: response
0872: .setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_REGISTRATION);
0873: }
0874: break;
0875: }
0876: }
0877: if (!found) {
0878: response
0879: .setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_REGISTRATION);
0880: }
0881: }
0882: return response;
0883: }
0884:
0885: protected synchronized boolean isDuplicate(
0886: AgentXRegEntry registration) {
0887: if (registrations.contains(registration)) {
0888: if (LOGGER.isDebugEnabled()) {
0889: LOGGER.debug("Identical registration attempt for "
0890: + registration);
0891: }
0892: return true;
0893: }
0894: AgentXNodeQuery query = new AgentXNodeQuery(registration
0895: .getContext(), registration.getRegion(),
0896: AgentXNodeQuery.QUERY_NON_AGENTX_NODES);
0897: ManagedObject mo = server.lookup(query);
0898: if (mo != null) {
0899: // overlaps non AgentX --> return duplicate region error
0900: if (LOGGER.isDebugEnabled()) {
0901: LOGGER
0902: .debug("New registration is rejected as duplicate because it "
0903: + "overlaps with non AgentX managed object: "
0904: + mo);
0905: }
0906: return true;
0907: }
0908: return false;
0909: }
0910:
0911: protected synchronized void addRegistration(
0912: AgentXRegEntry registration)
0913: throws DuplicateRegistrationException {
0914: registrations.add(registration);
0915: if (registration.getRegion().isRange()) {
0916: AgentXRegion r = registration.getRegion();
0917: long start = r.getLowerBoundSubID() & 0xFFFFFFFFL;
0918: long stop = r.getUpperBoundSubID() & 0xFFFFFFFFL;
0919: if (start > stop) {
0920: LOGGER.warn("Empty range registration " + registration);
0921: } else {
0922: for (long s = start; s <= stop; s++) {
0923: OID root = new OID(r.getLowerBound());
0924: root.set(r.getRangeSubID() - 1, (int) s);
0925: AgentXRegion sr = new AgentXRegion(root, root
0926: .nextPeer());
0927: addRegion(registration, sr);
0928: }
0929: }
0930: } else {
0931: addRegion(registration, registration.getRegion());
0932: }
0933: AgentXMasterEvent e = new AgentXMasterEvent(this ,
0934: AgentXMasterEvent.REGISTRATION_ADDED, registration);
0935: fireMasterChanged(e);
0936: }
0937:
0938: private static AgentXNodeQuery nextQuery(AgentXNodeQuery lastQuery,
0939: AgentXNode lastNode) {
0940: if (lastNode != null) {
0941: lastQuery.getMutableScope().setLowerBound(
0942: lastNode.getScope().getUpperBound());
0943: lastQuery.getMutableScope().setLowerIncluded(
0944: !lastNode.getScope().isUpperIncluded());
0945: }
0946: return lastQuery;
0947: }
0948:
0949: protected synchronized void addRegion(AgentXRegEntry registration,
0950: AgentXRegion region) throws DuplicateRegistrationException {
0951: if (region.isRange()) {
0952: String errText = "Regions with range cannot be added";
0953: LOGGER.error(errText);
0954: throw new IllegalArgumentException(errText);
0955: }
0956: AgentXNodeQuery query = new AgentXNodeQuery(registration
0957: .getContext(), region,
0958: AgentXNodeQuery.QUERY_AGENTX_NODES);
0959: AgentXNode lastNode = null;
0960: AgentXNode node = (AgentXNode) server.lookup(query);
0961: if (node != null) {
0962: LinkedList splitted = new LinkedList();
0963: AgentXRegion r1 = new AgentXRegion(region);
0964: for (; (node != null); node = (AgentXNode) server
0965: .lookup(nextQuery(query, lastNode))) {
0966: AgentXRegion r2 = new AgentXRegion(node.getScope()
0967: .getLowerBound(), node.getScope()
0968: .getUpperBound());
0969: if (LOGGER.isDebugEnabled()) {
0970: LOGGER.debug("Affected region r2=" + r2
0971: + " from registered region r1=" + r1);
0972: }
0973: if (r2.covers(r1)) {
0974: if (LOGGER.isDebugEnabled()) {
0975: LOGGER.debug("Region r2 covers r1 (r1=" + r1
0976: + ",r2=" + r2 + ")");
0977: }
0978: oldRegionCoversNew(registration, node, splitted,
0979: r1, r2);
0980: r1 = null;
0981: } else if (r1.covers(r2)) {
0982: if (LOGGER.isDebugEnabled()) {
0983: LOGGER.debug("Region r1 covers r2 (r1=" + r1
0984: + ",r2=" + r2 + ")");
0985: }
0986: r1 = newRegionCoversOld(registration, lastNode,
0987: node, splitted, r1, r2);
0988: } else if ((r1.isOverlapping(r2))
0989: && (r2.getLowerBound().compareTo(
0990: r1.getLowerBound()) < 0)) {
0991: if (LOGGER.isDebugEnabled()) {
0992: LOGGER
0993: .debug("Region r1 ovelaps r2 and r2 < r1 (r1="
0994: + r1 + ",r2=" + r2 + ")");
0995: }
0996: if (LOGGER.isDebugEnabled()) {
0997: LOGGER.debug("Shrinking node " + node + " to "
0998: + r1.getLowerBound());
0999: }
1000: node.shrink(r1.getLowerBound());
1001: AgentXNode r2b = node.getClone(new AgentXRegion(r1
1002: .getLowerBound(), r2.getUpperBound()));
1003: r2b.addRegistration(registration);
1004: splitted.add(r2b);
1005: r1 = new AgentXRegion(r2.getUpperBound(), r1
1006: .getLowerBound());
1007: }
1008: // r1.overlaps(r2) and (r1.get_lower() < r2.get_lower())
1009: else {
1010: if (LOGGER.isDebugEnabled()) {
1011: LOGGER
1012: .debug("Region r1 ovelaps r2 and r1 < r2 (r1="
1013: + r1 + ",r2=" + r2 + ")");
1014: }
1015: if (LOGGER.isDebugEnabled()) {
1016: LOGGER.debug("Shrinking node " + node + " to "
1017: + r1.getUpperBound());
1018: }
1019: node.shrink(r1.getUpperBound());
1020: AgentXNode r2b = node.getClone(new AgentXRegion(r1
1021: .getUpperBound(), r2.getUpperBound()));
1022: node.addRegistration(registration);
1023: splitted.add(r2b);
1024: AgentXNode r1a = new AgentXNode(new AgentXRegion(r1
1025: .getLowerBound(), r2.getLowerBound()),
1026: registration);
1027: splitted.add(r1a);
1028: r1 = null;
1029: }
1030: if (r1 != null) {
1031: if (r1.isEmpty()) {
1032: splitted.add(new AgentXNode(region,
1033: registration));
1034: } else {
1035: splitted.add(new AgentXNode(r1, registration));
1036: }
1037: }
1038: lastNode = node;
1039: }
1040: for (Iterator it = splitted.iterator(); it.hasNext();) {
1041: AgentXNode n = (AgentXNode) it.next();
1042: server.register(n, registration.getContext());
1043: if (LOGGER.isDebugEnabled()) {
1044: LOGGER.debug("Registered splitted AgentX node: "
1045: + n);
1046: }
1047: }
1048: } else {
1049: node = new AgentXNode(region, registration);
1050: server.register(node, registration.getContext());
1051: if (LOGGER.isDebugEnabled()) {
1052: LOGGER.debug("Registered AgentX node: " + node);
1053: }
1054: }
1055: }
1056:
1057: protected boolean removeRegistration(AgentXRegEntry registration,
1058: Iterator regIterator) {
1059: LinkedList remove = new LinkedList();
1060: AgentXRegion queryRegion = new AgentXRegion(registration
1061: .getRegion());
1062: queryRegion.setUpperIncluded(true);
1063: AgentXNodeQuery query = new AgentXNodeQuery(registration
1064: .getContext(), queryRegion,
1065: AgentXNodeQuery.QUERY_AGENTX_NODES);
1066: AgentXNode lastNode = null;
1067: AgentXNode node = (AgentXNode) server.lookup(query);
1068: if (node != null) {
1069: for (; (node != null); node = (AgentXNode) server
1070: .lookup(nextQuery(query, lastNode))) {
1071: if (node == lastNode) {
1072: break;
1073: }
1074: if ((node.removeRegistration(registration))
1075: && (node.getRegistrationCount() == 0)) {
1076: remove.add(node);
1077: } else {
1078: if ((lastNode != null)
1079: && (lastNode.getRegistrationCount() == 1)
1080: && (node.getRegistrationCount() == 1)
1081: && (lastNode.getScope().getUpperBound()
1082: .equals(node.getScope()
1083: .getLowerBound()))
1084: && (node.getActiveRegistration()
1085: .equals(lastNode
1086: .getActiveRegistration()))) {
1087: AgentXRegion r = new AgentXRegion(node
1088: .getScope().getLowerBound(), lastNode
1089: .getScope().getUpperBound());
1090: if (node.getActiveRegistration().getRegion()
1091: .covers(r)) {
1092: remove.add(node);
1093: lastNode.expand(node.getScope()
1094: .getUpperBound(), false);
1095: }
1096: }
1097: }
1098: lastNode = node;
1099: }
1100: } else {
1101: LOGGER
1102: .warn("A registration is removed with not associated subtree: "
1103: + registration);
1104: }
1105: for (Iterator it = remove.iterator(); it.hasNext();) {
1106: AgentXNode rnode = (AgentXNode) it.next();
1107: server.unregister(rnode, registration.getContext());
1108: }
1109: if (regIterator != null) {
1110: regIterator.remove();
1111: if (LOGGER.isDebugEnabled()) {
1112: LOGGER.debug("Removed registration " + registration
1113: + " by session close, " + registrations.size()
1114: + " left.");
1115: }
1116: fireMasterChanged(new AgentXMasterEvent(this ,
1117: AgentXMasterEvent.REGISTRATION_REMOVED,
1118: registration));
1119: return true;
1120: } else if (registrations.remove(registration)) {
1121: if (LOGGER.isDebugEnabled()) {
1122: LOGGER.debug("Removed registration " + registration
1123: + ", " + registrations.size() + " left.");
1124: }
1125: fireMasterChanged(new AgentXMasterEvent(this ,
1126: AgentXMasterEvent.REGISTRATION_REMOVED,
1127: registration));
1128: return true;
1129: }
1130: return false;
1131: }
1132:
1133: private static AgentXRegion newRegionCoversOld(
1134: AgentXRegEntry registration, AgentXNode lastNode,
1135: AgentXNode node, LinkedList splitted, AgentXRegion r1,
1136: AgentXRegion r2) {
1137: AgentXNode r1a = null;
1138: if (lastNode != null) {
1139: AgentXRegion r = new AgentXRegion(lastNode.getScope()
1140: .getUpperBound(), r2.getLowerBound());
1141: r1a = new AgentXNode(r, registration);
1142: } else {
1143: AgentXRegion r = new AgentXRegion(r1.getLowerBound(), r2
1144: .getLowerBound());
1145: r1a = new AgentXNode(r, registration);
1146: }
1147: if (!splitted.isEmpty()) {
1148: if (LOGGER.isDebugEnabled()) {
1149: LOGGER.debug("Shrinking node " + splitted.getLast()
1150: + " to " + r2.getLowerBound());
1151: }
1152: ((AgentXNode) splitted.getLast())
1153: .shrink(r2.getLowerBound());
1154: }
1155: node.addRegistration(registration);
1156: if ((r1.getLowerBound().equals(r2.getLowerBound()))
1157: || ((!splitted.isEmpty()) && (((AgentXNode) splitted
1158: .getLast()).getScope().equals(r1a.getScope())))) {
1159: r1a = null;
1160: } else {
1161: splitted.add(r1a);
1162: }
1163: return new AgentXRegion(r2.getUpperBound(), r1.getUpperBound());
1164: }
1165:
1166: private static void oldRegionCoversNew(AgentXRegEntry registration,
1167: AgentXNode node, List splitted, AgentXRegion r1,
1168: AgentXRegion r2) {
1169: AgentXRegion r = new AgentXRegion(r1.getUpperBound(), node
1170: .getScope().getUpperBound());
1171: AgentXNode r2c = node.getClone(r);
1172: if (r2.getLowerBound().equals(r1.getLowerBound())) {
1173: if (LOGGER.isDebugEnabled()) {
1174: LOGGER.debug("Shrinking node " + node + " to "
1175: + r1.getUpperBound());
1176: }
1177: node.shrink(r1.getUpperBound());
1178: node.addRegistration(registration);
1179: } else {
1180: if (LOGGER.isDebugEnabled()) {
1181: LOGGER.debug("Shrinking node " + node + " to "
1182: + r1.getLowerBound());
1183: }
1184: node.shrink(r1.getLowerBound());
1185: AgentXNode r2b = node.getClone(r1);
1186: r2b.addRegistration(registration);
1187: splitted.add(r2b);
1188: }
1189: splitted.add(r2c);
1190: }
1191:
1192: public AgentXResponsePDU openSession(AgentXOpenPDU pdu,
1193: AgentXCommandEvent event) {
1194: AgentXMasterSession session = new AgentXMasterSession(
1195: getNextSessionID(), agentXQueue, pdu.getSubagentID(),
1196: pdu.getSubagentDescr());
1197: AgentXPeer peer = getPeer(event.getPeerAddress());
1198: if (peer == null) {
1199: peer = new AgentXPeer(event.getPeerTransport(), event
1200: .getPeerAddress());
1201: addPeer(peer);
1202: LOGGER.warn("Added peer during session opening: " + peer
1203: + " (peer should have been there already due "
1204: + "to connection setup)");
1205: }
1206: session.setPeer(peer);
1207: session.setAgentXVersion(pdu.getVersion() & 0xFF);
1208: if (pdu.getTimeout() != 0) {
1209: session.setTimeout(pdu.getTimeout());
1210: } else {
1211: session.setTimeout(defaultTimeout);
1212: }
1213: int sessionAccepted = acceptSession(session);
1214: if (sessionAccepted == AgentXProtocol.AGENTX_SUCCESS) {
1215: addSession(session);
1216: if (LOGGER.isInfoEnabled()) {
1217: LOGGER.info("Session " + session + " opened from "
1218: + peer.getAddress());
1219: }
1220: } else {
1221: LOGGER.warn("Session open rejected because "
1222: + sessionAccepted + " for " + session + " from "
1223: + event.getPeerAddress());
1224: }
1225: AgentXResponsePDU response = createResponse(pdu, session);
1226: response.setErrorStatus((short) sessionAccepted);
1227: return response;
1228: }
1229:
1230: protected synchronized void addPeer(AgentXPeer peer) {
1231: peers.put(peer.getAddress(), peer);
1232: fireMasterChanged(new AgentXMasterEvent(this ,
1233: AgentXMasterEvent.PEER_ADDED, peer));
1234: }
1235:
1236: protected synchronized AgentXPeer getPeer(Address address) {
1237: return (AgentXPeer) peers.get(address);
1238: }
1239:
1240: protected int acceptSession(AgentXMasterSession session) {
1241: AgentXMasterEvent event = new AgentXMasterEvent(this ,
1242: AgentXMasterEvent.SESSION_ADDED, session);
1243: fireMasterChanged(event);
1244: return event.getVetoReason();
1245: }
1246:
1247: protected synchronized void addSession(AgentXMasterSession session) {
1248: sessions.put(new Integer(session.getSessionID()), session);
1249: fireMasterChanged(new AgentXMasterEvent(this ,
1250: AgentXMasterEvent.SESSION_ADDED, session));
1251: }
1252:
1253: protected synchronized AgentXMasterSession removeSession(
1254: int sessionID) {
1255: AgentXMasterSession session = (AgentXMasterSession) sessions
1256: .remove(new Integer(sessionID));
1257: if (session != null) {
1258: fireMasterChanged(new AgentXMasterEvent(this ,
1259: AgentXMasterEvent.SESSION_REMOVED, session));
1260: }
1261: return session;
1262: }
1263:
1264: protected AgentXResponsePDU createResponse(AgentXPDU request,
1265: AgentXSession session) {
1266: OctetString context = DEFAULT_CONTEXT;
1267: if (request instanceof AgentXContextPDU) {
1268: OctetString reqContext = ((AgentXContextPDU) request)
1269: .getContext();
1270: if (server.isContextSupported(reqContext)) {
1271: context = reqContext;
1272: }
1273: }
1274: AgentXResponsePDU response = new AgentXResponsePDU(
1275: getContextSysUpTime(context).toInt(), (short) 0,
1276: (short) 0);
1277: if (session == null) {
1278: response.setSessionID(request.getSessionID());
1279: response.setErrorStatus(AgentXProtocol.AGENTX_NOT_OPEN);
1280: } else {
1281: response.setSessionID(session.getSessionID());
1282: }
1283: response.setPacketID(request.getPacketID());
1284: response.setTransactionID(request.getTransactionID());
1285: response.setByteOrder(request.getByteOrder());
1286: return response;
1287: }
1288:
1289: protected void sendResponse(AgentXPDU response,
1290: AgentXSession session) {
1291: if (LOGGER.isDebugEnabled()) {
1292: LOGGER.debug("Sending AgentX response " + response
1293: + " to session " + session);
1294: }
1295: try {
1296: agentX.send(response, session.createAgentXTarget(), session
1297: .getPeer().getTransport());
1298: } catch (IOException ex) {
1299: if (LOGGER.isDebugEnabled()) {
1300: ex.printStackTrace();
1301: }
1302: LOGGER.error("Failed to send AgentX response " + response
1303: + " to session " + session + " because: "
1304: + ex.getMessage(), ex);
1305: }
1306: }
1307:
1308: public synchronized void connectionStateChanged(
1309: TransportStateEvent change) {
1310: Address peerAddress = change.getPeerAddress();
1311: switch (change.getNewState()) {
1312: case TransportStateEvent.STATE_CLOSED:
1313: case TransportStateEvent.STATE_DISCONNECTED_REMOTELY:
1314: case TransportStateEvent.STATE_DISCONNECTED_TIMEOUT: {
1315: AgentXPeer removedPeer = removePeer(peerAddress);
1316: fireMasterChanged(new AgentXMasterEvent(this ,
1317: AgentXMasterEvent.PEER_REMOVED, removedPeer));
1318: break;
1319: }
1320: default: {
1321: AgentXPeer newPeer = new AgentXPeer(
1322: (TransportMapping) change.getSource(), peerAddress);
1323: addPeer(newPeer);
1324: }
1325: }
1326: }
1327:
1328: protected synchronized AgentXPeer removePeer(Address peerAddress) {
1329: AgentXPeer peer = (AgentXPeer) peers.remove(peerAddress);
1330: if (peer != null) {
1331: peer.setClosing(true);
1332: for (Iterator it = sessions.values().iterator(); it
1333: .hasNext();) {
1334: AgentXMasterSession session = (AgentXMasterSession) it
1335: .next();
1336: if (session.getPeer().equals(peer)) {
1337: it.remove();
1338: fireMasterChanged(new AgentXMasterEvent(this ,
1339: AgentXMasterEvent.SESSION_REMOVED, session));
1340: indexRegistry.release(session.getSessionID());
1341: removeAllRegistrations(session);
1342: session.setClosed(true);
1343: if (peer.getTransport() instanceof ConnectionOrientedTransportMapping) {
1344: try {
1345: ((ConnectionOrientedTransportMapping) peer
1346: .getTransport()).close(peer
1347: .getAddress());
1348: } catch (IOException iox) {
1349: LOGGER
1350: .warn("Caught exception while closing transport: "
1351: + iox.getMessage());
1352: }
1353: }
1354: }
1355: }
1356: /* Optional code for debugging of registry issues:
1357: if (LOGGER.isDebugEnabled()) {
1358: if (server instanceof DefaultMOServer) {
1359: SortedMap registry = ((DefaultMOServer) server).getRegistry();
1360: System.err.println(registry.toString());
1361: }
1362: }
1363: */
1364: } else {
1365: LOGGER.warn("Tried to remove peer with address "
1366: + peerAddress + " which is not part of peer list: "
1367: + peers);
1368: }
1369: return peer;
1370: }
1371:
1372: public byte getAgentXVersion() {
1373: return AgentXProtocol.VERSION_1_0;
1374: }
1375:
1376: public synchronized void addAgentXMasterListener(
1377: AgentXMasterListener l) {
1378: if (agentXMasterListeners == null) {
1379: agentXMasterListeners = new Vector(2);
1380: }
1381: agentXMasterListeners.add(l);
1382: }
1383:
1384: public synchronized void removeAgentXMasterListener(
1385: AgentXMasterListener l) {
1386: if (agentXMasterListeners != null) {
1387: agentXMasterListeners.remove(l);
1388: }
1389: }
1390:
1391: protected void fireMasterChanged(AgentXMasterEvent event) {
1392: if (agentXMasterListeners != null) {
1393: Vector listeners = agentXMasterListeners;
1394: int count = listeners.size();
1395: for (int i = 0; i < count; i++) {
1396: try {
1397: ((AgentXMasterListener) listeners.get(i))
1398: .masterChanged(event);
1399: } catch (Exception ex) {
1400: LOGGER.error("AgentXMasterListener "
1401: + listeners.get(i) + " threw exception on "
1402: + event + ": " + ex.getMessage(), ex);
1403: }
1404: }
1405: }
1406: }
1407:
1408: protected static class AgentXRegEntryComparator implements
1409: Comparator {
1410:
1411: public int compare(Object o1, Object o2) {
1412: AgentXRegEntry a = (AgentXRegEntry) o1;
1413: AgentXRegEntry b = (AgentXRegEntry) o2;
1414: int c = a.getRegion().compareTo(b.getRegion());
1415: if (c == 0) {
1416: c = a.getContext().compareTo(b.getContext());
1417: }
1418: return c;
1419: }
1420: }
1421:
1422: public void onResponse(AgentXResponseEvent event) {
1423: AgentXResponsePDU pdu = event.getResponse();
1424: AgentXPending pending = (AgentXPending) event.getUserObject();
1425: if (LOGGER.isDebugEnabled()) {
1426: LOGGER.debug("Processing AgentX response " + pdu
1427: + " for request " + pending);
1428: }
1429: if (pending.getRequest() != null) {
1430: AgentXPending p = agentXQueue.remove(pending.getAgentXPDU()
1431: .getSessionID(), pending.getRequest()
1432: .getTransactionID());
1433: if (p == null) {
1434: LOGGER
1435: .warn("Pending AgentX request not found (may be timed out already): "
1436: + "Received AgentX response from "
1437: + event.getPeerAddress()
1438: + " for request "
1439: + event.getUserObject()
1440: + " does not match any pending request:"
1441: + pdu);
1442: return;
1443: }
1444: }
1445: if ((pdu == null)
1446: && (pending.getAgentXPDU().getType() != AgentXPDU.AGENTX_CLOSE_PDU)) {
1447: pending.getSession().incConsecutiveTimeouts();
1448: pending.getReferences().nextSubRequest().getStatus()
1449: .setErrorStatus(PDU.genErr);
1450: if (pending.getSession().getConsecutiveTimeouts() > maxConsecutiveTimeouts) {
1451: closeSession(pending.getSession(),
1452: AgentXProtocol.REASON_TIMEOUTS);
1453: }
1454: }
1455: if (pdu != null) {
1456: pending.getSession().clearConsecutiveTimeouts();
1457: }
1458: if (requestList.contains(pending.getRequest())) {
1459: if (pdu != null) {
1460: if (checkAgentXResponse(pdu, pending)) {
1461: switch (pending.getAgentXPDU().getType()) {
1462: case AgentXPDU.AGENTX_GET_PDU: {
1463: processAgentXGetResponse(pending, pdu);
1464: break;
1465: }
1466: case AgentXPDU.AGENTX_GETNEXT_PDU: {
1467: processAgentXGetNextResponse(pending, pdu);
1468: break;
1469: }
1470: case AgentXPDU.AGENTX_GETBULK_PDU: {
1471: processAgentXBulkResponse(pending, pdu);
1472: break;
1473: }
1474: case AgentXPDU.AGENTX_CLEANUPSET_PDU:
1475: case AgentXPDU.AGENTX_UNDOSET_PDU:
1476: case AgentXPDU.AGENTX_COMMITSET_PDU:
1477: case AgentXPDU.AGENTX_TESTSET_PDU: {
1478: processAgentXSetResponse(pending, pdu);
1479: break;
1480: }
1481: default: {
1482: LOGGER.warn("Unhandled AgentX response " + pdu);
1483: }
1484: }
1485: } else {
1486: LOGGER.warn("Invalid AgentX response " + pdu
1487: + " on request " + pending);
1488: }
1489: }
1490: // reprocess SNMP request
1491: if (!pending.getRequest().isComplete()) {
1492: reprocessRequest(server, pending.getRequest());
1493: }
1494: finalizeRequest((CommandResponderEvent) pending
1495: .getRequest().getSource(), pending.getRequest(),
1496: server);
1497: } else {
1498: if (pending.getAgentXPDU().getType() == AgentXPDU.AGENTX_CLOSE_PDU) {
1499: if (pdu != null) {
1500: LOGGER
1501: .info("Subagent "
1502: + event.getPeerAddress()
1503: + " confirmed close, disconnection transport now");
1504: } else {
1505: LOGGER.info("Subagent " + event.getPeerAddress()
1506: + " did not answered on session close, "
1507: + "disconnection now");
1508: }
1509: AgentXPeer peer = pending.getSession().getPeer();
1510: if (peer != null) {
1511: closePeer(peer);
1512: }
1513: } else {
1514: LOGGER.info("Received late response " + pdu
1515: + " on AgentX request: " + pending);
1516: super .release(server, pending.getRequest());
1517: }
1518: }
1519: }
1520:
1521: protected void processAgentXGetResponse(AgentXPending pending,
1522: AgentXResponsePDU pdu) {
1523: if (pdu.getErrorStatus() != PDU.noError) {
1524: processsErrorResponse(pending, pdu);
1525: } else {
1526: VariableBinding[] vbs = pdu.getVariableBindings();
1527: SubRequestIterator subRequests = pending.getReferences();
1528: for (int i = 0; (i < pending.getRequest().size())
1529: && subRequests.hasNext(); i++) {
1530: SnmpSubRequest sreq = (SnmpSubRequest) subRequests
1531: .nextSubRequest();
1532: sreq.getVariableBinding().setVariable(
1533: vbs[i].getVariable());
1534: sreq.getStatus().setPhaseComplete(true);
1535: }
1536: }
1537: }
1538:
1539: protected void processAgentXGetNextResponse(AgentXPending pending,
1540: AgentXResponsePDU pdu) {
1541: if (pdu.getErrorStatus() != PDU.noError) {
1542: processsErrorResponse(pending, pdu);
1543: } else {
1544: processAgentXNextResponse(pending, pdu, pending
1545: .getRequest().size());
1546: }
1547: }
1548:
1549: protected void processAgentXSetResponse(AgentXPending pending,
1550: AgentXResponsePDU pdu) {
1551: if (pdu.getErrorStatus() != PDU.noError) {
1552: processsErrorResponse(pending, pdu);
1553: } else {
1554: SubRequestIterator it = pending.getReferences();
1555: while (it.hasNext()) {
1556: SubRequest sreq = it.nextSubRequest();
1557: sreq.getStatus().setPhaseComplete(true);
1558: }
1559: }
1560: }
1561:
1562: }
|