Source Code Cross Referenced for AgentXCommandProcessor.java in  » Net » snmp4j » org » snmp4j » agent » agentx » master » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » snmp4j » org.snmp4j.agent.agentx.master 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*_############################################################################
0002:          _##
0003:          _##  SNMP4J-AgentX - AgentXCommandProcessor.java
0004:          _##
0005:          _##  Copyright (C) 2005-2007  Frank Fock (SNMP4J.org)
0006:          _##
0007:          _##  This program is free software; you can redistribute it and/or modify
0008:          _##  it under the terms of the GNU General Public License version 2 as
0009:          _##  published by the Free Software Foundation.
0010:          _##
0011:          _##  This program is distributed in the hope that it will be useful,
0012:          _##  but WITHOUT ANY WARRANTY; without even the implied warranty of
0013:          _##  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0014:          _##  GNU General Public License for more details.
0015:          _##
0016:          _##  You should have received a copy of the GNU General Public License
0017:          _##  along with this program; if not, write to the Free Software
0018:          _##  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
0019:          _##  MA  02110-1301  USA
0020:          _##
0021:          _##########################################################################*/
0022:
0023:        package org.snmp4j.agent.agentx.master;
0024:
0025:        import java.io.IOException;
0026:        import java.util.*;
0027:
0028:        import org.snmp4j.CommandResponderEvent;
0029:        import org.snmp4j.PDU;
0030:        import org.snmp4j.TransportMapping;
0031:        import org.snmp4j.agent.*;
0032:        import org.snmp4j.agent.agentx.*;
0033:        import org.snmp4j.agent.agentx.master.AgentXQueue.AgentXQueueEntry;
0034:        import org.snmp4j.agent.agentx.master.index.AgentXIndexRegistry;
0035:        import org.snmp4j.agent.mo.snmp.AgentCapabilityList;
0036:        import org.snmp4j.agent.mo.snmp.SNMPv2MIB.SysUpTimeImpl;
0037:        import org.snmp4j.agent.mo.snmp.SysUpTime;
0038:        import org.snmp4j.agent.request.*;
0039:        import org.snmp4j.agent.security.VACM;
0040:        import org.snmp4j.log.LogAdapter;
0041:        import org.snmp4j.log.LogFactory;
0042:        import org.snmp4j.mp.SnmpConstants;
0043:        import org.snmp4j.smi.*;
0044:        import org.snmp4j.transport.ConnectionOrientedTransportMapping;
0045:        import org.snmp4j.transport.TransportStateEvent;
0046:        import org.snmp4j.transport.TransportStateListener;
0047:
0048:        public class AgentXCommandProcessor extends CommandProcessor implements 
0049:                AgentXCommandListener, TransportStateListener,
0050:                AgentXResponseListener {
0051:
0052:            public static final int MAX_REPROCESSING_DEFAULT = 100;
0053:
0054:            private static final LogAdapter LOGGER = LogFactory
0055:                    .getLogger(AgentXCommandProcessor.class);
0056:
0057:            private static final OctetString DEFAULT_CONTEXT = new OctetString();
0058:
0059:            private AgentXQueue agentXQueue;
0060:            private AgentX agentX;
0061:            private Map sessions = new HashMap();
0062:            private Map peers = new HashMap(10);
0063:            private Set registrations = new TreeSet(
0064:                    new AgentXRegEntryComparator());
0065:            private MOServer server;
0066:            private int nextSessionID = 1;
0067:            private byte defaultTimeout = AgentXProtocol.DEFAULT_TIMEOUT_SECONDS;
0068:            private int maxConsecutiveTimeouts = AgentXProtocol.DEFAULT_MAX_CONSECUTIVE_TIMEOUTS;
0069:            private Map contextInfo = new HashMap(10);
0070:            private boolean acceptNewContexts = false;
0071:
0072:            private int nextPacketID = 0;
0073:
0074:            protected AgentXIndexRegistry indexRegistry = new AgentXIndexRegistry();
0075:
0076:            private transient Vector agentXMasterListeners;
0077:
0078:            private int maxReprocessing = MAX_REPROCESSING_DEFAULT;
0079:
/**
 * Creates an AgentX master command processor.
 * <p>
 * If the supplied queue has no server configured for bulk optimization yet,
 * the given <code>server</code> is assigned to it.
 *
 * @param contextEngineID
 *    the context engine ID passed on to the super class constructor.
 * @param queue
 *    the queue from which pending AgentX requests are looked up.
 * @param agentX
 *    the AgentX protocol instance used to send AgentX PDUs.
 * @param server
 *    the managed object server of this master agent.
 */
public AgentXCommandProcessor(OctetString contextEngineID,
        AgentXQueue queue, AgentX agentX, MOServer server) {
    super(contextEngineID);
    this.server = server;
    this.agentX = agentX;
    this.agentXQueue = queue;
    if (queue.getServer4BulkOptimization() == null) {
        queue.setServer4BulkOptimization(server);
    }
}
0090:
0091:            private synchronized int createNextPacketID() {
0092:                return nextPacketID++;
0093:            }
0094:
0095:            public void setMaxReprocessing(int maxReprocessing) {
0096:                this .maxReprocessing = maxReprocessing;
0097:            }
0098:
0099:            public int getMaxReprocessing() {
0100:                return maxReprocessing;
0101:            }
0102:
0103:            protected void finalizeRequest(CommandResponderEvent command,
0104:                    Request req, MOServer server) {
0105:                boolean complete = req.isComplete();
0106:                AgentXQueueEntry entry = agentXQueue
0107:                        .get(req.getTransactionID());
0108:                if (entry != null) {
0109:                    Collection pending = entry.getPending();
0110:                    entry.updateTimestamp();
0111:                    for (Iterator it = pending.iterator(); it.hasNext();) {
0112:                        AgentXPending p = (AgentXPending) it.next();
0113:                        if (pending != null) {
0114:                            AgentXPDU agentXPDU = p.getAgentXPDU();
0115:                            AgentXMasterSession session = p.getSession();
0116:                            agentXPDU.setSessionID(session.getSessionID());
0117:                            agentXPDU.setTransactionID(req.getTransactionID());
0118:                            agentXPDU.setPacketID(createNextPacketID());
0119:                            p.updateTimestamp();
0120:                            try {
0121:                                agentX.send(agentXPDU, session
0122:                                        .createAgentXTarget(), session
0123:                                        .getPeer().getTransport(), p, this );
0124:                            } catch (IOException ex) {
0125:                                LOGGER
0126:                                        .error("Failed to send AgentX subrequest: "
0127:                                                + ex.getMessage());
0128:                                ((SubRequest) p.getReferences().next())
0129:                                        .getStatus().setErrorStatus(PDU.genErr);
0130:                                break;
0131:                            }
0132:                        }
0133:                    }
0134:                } else {
0135:                    if (complete) {
0136:                        agentXQueue.removeAll(req.getTransactionID());
0137:                    } else {
0138:                        // there are still incomplete sub-requests -> reprocess them
0139:                        if (req.getReprocessCounter() < this .maxReprocessing) {
0140:                            reprocessRequest(server, (SnmpRequest) req);
0141:                        } else {
0142:                            req.setErrorStatus(PDU.genErr);
0143:                            LOGGER
0144:                                    .warn("The following request has been repeocessed "
0145:                                            + req.getReprocessCounter()
0146:                                            + " which exceeds the agent's "
0147:                                            + "upper limit of "
0148:                                            + this .maxReprocessing + ": " + req);
0149:                        }
0150:                    }
0151:                    super .finalizeRequest(command, req, server);
0152:                }
0153:            }
0154:
0155:            protected synchronized int getNextSessionID() {
0156:                return nextSessionID++;
0157:            }
0158:
0159:            public MOServer getServer() {
0160:                return server;
0161:            }
0162:
0163:            public byte getDefaultTimeout() {
0164:                return defaultTimeout;
0165:            }
0166:
0167:            public int getMaxConsecutiveTimeouts() {
0168:                return maxConsecutiveTimeouts;
0169:            }
0170:
0171:            /**
0172:             * Indicates whether subagents can register contexts that are not yet
0173:             * supported by this master agent.
0174:             * @return
0175:             *    <code>true</code> if subagents can register objects for new contexts.
0176:             */
0177:            public boolean isAcceptNewContexts() {
0178:                return acceptNewContexts;
0179:            }
0180:
0181:            public void setDefaultTimeout(byte defaultTimeout) {
0182:                this .defaultTimeout = defaultTimeout;
0183:            }
0184:
0185:            public void setMaxConsecutiveTimeouts(int maxConsecutiveTimeouts) {
0186:                this .maxConsecutiveTimeouts = maxConsecutiveTimeouts;
0187:            }
0188:
0189:            /**
0190:             * Enables or disables accepting new contexts from subagents.
0191:             * @param acceptNewContexts
0192:             *    <code>true</code> if subagents are allowed to register objects for new
0193:             *    contexts, <code>false</code> otherwise. Default is <code>false</code>.
0194:             */
0195:            public void setAcceptNewContexts(boolean acceptNewContexts) {
0196:                this .acceptNewContexts = acceptNewContexts;
0197:            }
0198:
0199:            public void processCommand(AgentXCommandEvent event) {
0200:                boolean pendingClose = false;
0201:                if (event.isException()) {
0202:                    /**@todo implement parse exception handling*/
0203:                    LOGGER.warn("AgentX parse exception: "
0204:                            + event.getException());
0205:                } else {
0206:                    AgentXPDU pdu = event.getCommand();
0207:                    AgentXMasterSession session = getSession(pdu);
0208:                    AgentXResponsePDU response = null;
0209:                    if (LOGGER.isDebugEnabled()) {
0210:                        LOGGER.debug("Processing AgentX PDU " + pdu
0211:                                + " for session " + session);
0212:                    }
0213:                    switch (pdu.getType()) {
0214:                    case AgentXPDU.AGENTX_RESPONSE_PDU: {
0215:                        LOGGER
0216:                                .error("Internal error: received AgentX response without request");
0217:                        return;
0218:                    }
0219:                    case AgentXPDU.AGENTX_OPEN_PDU: {
0220:                        response = openSession((AgentXOpenPDU) pdu, event);
0221:                        session = getSession(response.getSessionID());
0222:                        break;
0223:                    }
0224:                    case AgentXPDU.AGENTX_CLOSE_PDU: {
0225:                        response = closeSession((AgentXClosePDU) pdu, session);
0226:                        pendingClose = true;
0227:                        break;
0228:                    }
0229:                    case AgentXPDU.AGENTX_REGISTER_PDU: {
0230:                        response = register((AgentXRegisterPDU) pdu, event,
0231:                                session);
0232:                        break;
0233:                    }
0234:                    case AgentXPDU.AGENTX_UNREGISTER_PDU: {
0235:                        response = unregister((AgentXUnregisterPDU) pdu, event,
0236:                                session);
0237:                        break;
0238:                    }
0239:                    case AgentXPDU.AGENTX_ADDAGENTCAPS_PDU: {
0240:                        response = addAgentCaps((AgentXAddAgentCapsPDU) pdu,
0241:                                session);
0242:                        break;
0243:                    }
0244:                    case AgentXPDU.AGENTX_REMOVEAGENTCAPS_PDU: {
0245:                        response = removeAgentCaps(
0246:                                (AgentXRemoveAgentCapsPDU) pdu, session);
0247:                        break;
0248:                    }
0249:                    case AgentXPDU.AGENTX_NOTIFY_PDU: {
0250:                        response = notify((AgentXNotifyPDU) pdu, session);
0251:                        break;
0252:                    }
0253:                    case AgentXPDU.AGENTX_PING_PDU: {
0254:                        response = ping((AgentXPingPDU) pdu, session);
0255:                        break;
0256:                    }
0257:                    case AgentXPDU.AGENTX_INDEXALLOCATE_PDU: {
0258:                        response = indexAllocate((AgentXIndexAllocatePDU) pdu,
0259:                                session);
0260:                        break;
0261:                    }
0262:                    case AgentXPDU.AGENTX_INDEXDEALLOCATE_PDU: {
0263:                        response = indexDeallocate(
0264:                                (AgentXIndexDeallocatePDU) pdu, session);
0265:                        break;
0266:                    }
0267:                    default:
0268:                        LOGGER.warn("Unknown AgentX PDU type received: " + pdu);
0269:                    }
0270:                    if ((response != null) && (session != null)) {
0271:                        sendResponse(response, session);
0272:                    }
0273:                    if (pendingClose) {
0274:                        if (session != null) {
0275:                            closePeer(session.getPeer());
0276:                        }
0277:                    }
0278:                }
0279:                event.setProcessed(true);
0280:            }
0281:
0282:            private void closePeer(AgentXPeer peer) {
0283:                TransportMapping transport = peer.getTransport();
0284:                if (transport instanceof  ConnectionOrientedTransportMapping) {
0285:                    try {
0286:                        if (((ConnectionOrientedTransportMapping) transport)
0287:                                .close(peer.getAddress())) {
0288:                            if (LOGGER.isInfoEnabled()) {
0289:                                LOGGER.info("Closed sub-agent connection to "
0290:                                        + peer.getAddress());
0291:                            }
0292:                        } else {
0293:                            LOGGER
0294:                                    .warn("Failed to close sub-agent connection to "
0295:                                            + peer.getAddress());
0296:                        }
0297:                    } catch (IOException ex) {
0298:                        LOGGER.error("Failed to close transport mapping "
0299:                                + peer.getTransport() + " because: "
0300:                                + ex.getMessage(), ex);
0301:                    }
0302:                }
0303:            }
0304:
0305:            public AgentXResponsePDU indexDeallocate(
0306:                    AgentXIndexDeallocatePDU pdu, AgentXMasterSession session) {
0307:                AgentXResponsePDU response = createResponse(pdu, session);
0308:                boolean contextSupported = server.isContextSupported(pdu
0309:                        .getContext());
0310:                if (contextSupported) {
0311:                    VariableBinding[] vbs = pdu.getVariableBindings();
0312:                    // test index allocation
0313:                    deallocateIndexes(response, pdu, session, vbs, true);
0314:                    if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) {
0315:                        // do it on success
0316:                        deallocateIndexes(response, pdu, session, vbs, false);
0317:                        response.setVariableBindings(vbs);
0318:                    }
0319:                } else {
0320:                    response
0321:                            .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0322:                }
0323:                return response;
0324:            }
0325:
0326:            private boolean checkIfContextIsSupported(OctetString context) {
0327:                boolean contextSupported = server.isContextSupported(context);
0328:                if (LOGGER.isDebugEnabled()) {
0329:                    LOGGER.debug("Checking context '" + context
0330:                            + "' is supported");
0331:                }
0332:                if (isAcceptNewContexts() && !contextSupported) {
0333:                    server.addContext(context);
0334:                    contextSupported = server.isContextSupported(context);
0335:                    if (LOGGER.isInfoEnabled()) {
0336:                        LOGGER.info("Adding new context '" + context
0337:                                + "' on subagent request returned: "
0338:                                + contextSupported);
0339:                    }
0340:                }
0341:                return contextSupported;
0342:            }
0343:
0344:            public AgentXResponsePDU indexAllocate(AgentXIndexAllocatePDU pdu,
0345:                    AgentXMasterSession session) {
0346:                AgentXResponsePDU response = createResponse(pdu, session);
0347:                response.setVariableBindings(pdu.getVariableBindings());
0348:                boolean contextSupported = checkIfContextIsSupported(pdu
0349:                        .getContext());
0350:                if (contextSupported) {
0351:                    VariableBinding[] vbs = pdu.getVariableBindings();
0352:                    // test index allocation
0353:                    allocateIndexes(response, pdu, session, vbs, true);
0354:                    if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) {
0355:                        // do it on success
0356:                        allocateIndexes(response, pdu, session, vbs, false);
0357:                        response.setVariableBindings(vbs);
0358:                    }
0359:                } else {
0360:                    response
0361:                            .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0362:                }
0363:                return response;
0364:            }
0365:
0366:            private int allocateIndexes(AgentXResponsePDU response,
0367:                    AgentXIndexAllocatePDU pdu, AgentXMasterSession session,
0368:                    VariableBinding[] vbs, boolean testOnly) {
0369:                int status = AgentXProtocol.AGENTX_SUCCESS;
0370:                int i = 0;
0371:                for (; (i < vbs.length)
0372:                        && (status == AgentXProtocol.AGENTX_SUCCESS); i++) {
0373:                    VariableBinding vb = vbs[i];
0374:                    if (pdu.isFlagSet(AgentXProtocol.FLAG_ANY_INDEX)) {
0375:                        status = indexRegistry.anyIndex(session.getSessionID(),
0376:                                pdu.getContext(), vb, testOnly);
0377:                    } else if (pdu.isFlagSet(AgentXProtocol.FLAG_NEW_INDEX)) {
0378:                        status = indexRegistry.newIndex(session.getSessionID(),
0379:                                pdu.getContext(), vb, testOnly);
0380:                    } else {
0381:                        status = indexRegistry.allocate(session.getSessionID(),
0382:                                pdu.getContext(), vb, testOnly);
0383:                    }
0384:                }
0385:                response.setErrorStatus(status);
0386:                if (status != AgentXProtocol.AGENTX_SUCCESS) {
0387:                    response.setErrorIndex(i);
0388:                }
0389:                return status;
0390:            }
0391:
0392:            private int deallocateIndexes(AgentXResponsePDU response,
0393:                    AgentXIndexDeallocatePDU pdu, AgentXMasterSession session,
0394:                    VariableBinding[] vbs, boolean testOnly) {
0395:                int status = AgentXProtocol.AGENTX_SUCCESS;
0396:                int i = 0;
0397:                for (; (i < vbs.length)
0398:                        && (status == AgentXProtocol.AGENTX_SUCCESS); i++) {
0399:                    VariableBinding vb = vbs[i];
0400:                    status = indexRegistry.release(session.getSessionID(), pdu
0401:                            .getContext(), vb, testOnly);
0402:                }
0403:                response.setErrorStatus(status);
0404:                if (status != AgentXProtocol.AGENTX_SUCCESS) {
0405:                    response.setErrorIndex(i);
0406:                }
0407:                return status;
0408:            }
0409:
0410:            protected void processAgentXSearchResponse(AgentXPending pending,
0411:                    AgentXResponsePDU pdu) {
0412:                if (pdu.getErrorStatus() != PDU.noError) {
0413:                    processsErrorResponse(pending, pdu);
0414:                } else {
0415:                    // no error -> normal processing
0416:                    if (pending.getAgentXPDU().getType() == AgentXPDU.AGENTX_GETBULK_PDU) {
0417:                        processAgentXNextResponse(pending, pdu,
0418:                                Integer.MAX_VALUE);
0419:                    } else {
0420:                        processAgentXNextResponse(pending, pdu,
0421:                                ((AgentXRequestPDU) pending.getAgentXPDU())
0422:                                        .getRanges().length);
0423:                    }
0424:                }
0425:            }
0426:
0427:            private SubRequestIterator processAgentXNextResponse(
0428:                    AgentXPending pending, AgentXResponsePDU pdu,
0429:                    int subRequestIndexUpperBound)
0430:                    throws NoSuchElementException {
0431:                VariableBinding[] vbs = pdu.getVariableBindings();
0432:                AgentXRequestPDU axReqPDU = (AgentXRequestPDU) pending
0433:                        .getAgentXPDU();
0434:                SubRequestIterator subRequests = pending.getReferences();
0435:                for (int i = 0; (i < subRequestIndexUpperBound)
0436:                        && subRequests.hasNext(); i++) {
0437:                    SnmpSubRequest sreq = (SnmpSubRequest) subRequests
0438:                            .nextSubRequest();
0439:                    processNextSubRequest(vbs, axReqPDU, i, i, sreq);
0440:                }
0441:                return subRequests;
0442:            }
0443:
0444:            private void processNextSubRequest(VariableBinding[] vbs,
0445:                    AgentXRequestPDU axReqPDU, int vbIndex, int rangeIndex,
0446:                    SnmpSubRequest sreq) {
0447:                MOScope srange = axReqPDU.getRanges()[rangeIndex];
0448:                if (vbIndex < vbs.length) {
0449:                    VariableBinding vb = vbs[vbIndex];
0450:                    if (vb.getSyntax() == SMIConstants.EXCEPTION_END_OF_MIB_VIEW) {
0451:                        processEndOfMibView(sreq, srange, vb.getOid());
0452:                    } else if (!srange.covers(vb.getOid())) {
0453:                        processEndOfMibView(sreq, srange, null);
0454:                    } else if ((vb.isException())
0455:                            || (super .vacm.isAccessAllowed(sreq
0456:                                    .getSnmpRequest().getViewName(), vb
0457:                                    .getOid()) != VACM.VACM_OK)) {
0458:                        DefaultMOContextScope nscope = (DefaultMOContextScope) sreq
0459:                                .getScope();
0460:                        nscope.substractScope(srange);
0461:                        nscope.setUpperBound(null);
0462:                        nscope.setUpperIncluded(true);
0463:                        // reset query because scope changed!
0464:                        sreq.setQuery(null);
0465:                        sreq.getStatus().setProcessed(false);
0466:                    } else {
0467:                        sreq.getVariableBinding().setOid(vb.getOid());
0468:                        sreq.getVariableBinding().setVariable(vb.getVariable());
0469:                        sreq.getStatus().setPhaseComplete(true);
0470:                        if (LOGGER.isDebugEnabled()) {
0471:                            LOGGER.debug("Assigned next subrequest " + sreq);
0472:                        }
0473:                        // Not needed here because bulk processing does it anyway:
0474:                        sreq.updateNextRepetition();
0475:                    }
0476:                } else {
0477:                    // less VBs than expected
0478:                    processEndOfMibView(sreq, srange, null);
0479:                }
0480:            }
0481:
0482:            private static void processEndOfMibView(SnmpSubRequest sreq,
0483:                    MOScope srange, OID oid) {
0484:                if (srange.getUpperBound() == null) {
0485:                    // unbounded
0486:                    // set also all following repetitions to endOfMibView
0487:                    SubRequestIterator tail = sreq.repetitions();
0488:                    while (tail.hasNext()) {
0489:                        SubRequest sr = tail.nextSubRequest();
0490:                        if (oid == null) {
0491:                            sr.getVariableBinding().setOid(
0492:                                    srange.getLowerBound());
0493:                        } else {
0494:                            sreq.getVariableBinding().setOid(oid);
0495:                        }
0496:                        sr.getVariableBinding().setVariable(Null.endOfMibView);
0497:                        sr.getStatus().setPhaseComplete(true);
0498:                    }
0499:                    return;
0500:                } else {
0501:                    sreq.getStatus().setProcessed(false);
0502:                }
0503:                DefaultMOContextScope nscope = (DefaultMOContextScope) sreq
0504:                        .getScope();
0505:                nscope.substractScope(srange);
0506:                nscope.setUpperBound(null);
0507:                nscope.setUpperIncluded(true);
0508:                // reset query because scope changed!
0509:                sreq.setQuery(null);
0510:            }
0511:
0512:            protected void processAgentXBulkResponse(AgentXPending pending,
0513:                    AgentXResponsePDU pdu) {
0514:                if (pdu.getErrorStatus() != PDU.noError) {
0515:                    processsErrorResponse(pending, pdu);
0516:                } else {
0517:                    AgentXGetBulkPDU requestPDU = (AgentXGetBulkPDU) pending
0518:                            .getAgentXPDU();
0519:                    VariableBinding[] vbs = pdu.getVariableBindings();
0520:                    int numBindings = vbs.length;
0521:                    int repeaters = requestPDU.getRanges().length
0522:                            - requestPDU.getNonRepeaters();
0523:                    if (numBindings - requestPDU.getNonRepeaters() > requestPDU
0524:                            .getMaxRepetitions()
0525:                            * repeaters) {
0526:                        LOGGER
0527:                                .warn("Bulk response with more repetitions ("
0528:                                        + ((numBindings - requestPDU
0529:                                                .getNonRepeaters()) / repeaters)
0530:                                        + ") than max rep. "
0531:                                        + requestPDU.getMaxRepetitions());
0532:                        numBindings = requestPDU.getMaxRepetitions()
0533:                                * repeaters + requestPDU.getNonRepeaters();
0534:                    }
0535:                    if (numBindings == 0) {
0536:                        // this is IMHO outside the AgentX/SNMP spec but it is in fact
0537:                        // needed to be interoperable with NET-SNMP sub-agent
0538:                        AgentXRequestPDU axReqPDU = (AgentXRequestPDU) pending
0539:                                .getAgentXPDU();
0540:                        SubRequestIterator subRequests = pending
0541:                                .getReferences();
0542:                        for (int i = 0; subRequests.hasNext(); i++) {
0543:                            SnmpSubRequest sreq = (SnmpSubRequest) subRequests
0544:                                    .nextSubRequest();
0545:                            MOScope srange = axReqPDU.getRanges()[i];
0546:                            processEndOfMibView(sreq, srange, null);
0547:                        }
0548:                    } else {
0549:                        // process non repeaters first
0550:                        SubRequestIterator it = processAgentXNextResponse(
0551:                                pending, pdu, requestPDU.getNonRepeaters());
0552:                        int nonRep = requestPDU.getNonRepeaters();
0553:                        for (int c = 0; (c + nonRep < requestPDU.getRanges().length)
0554:                                && it.hasNext(); c++) {
0555:                            int rangeIndex = c + nonRep;
0556:                            SnmpSubRequest sreq = (SnmpSubRequest) it
0557:                                    .nextSubRequest();
0558:                            SubRequestIterator rsreq = sreq.repetitions();
0559:                            for (int r = 0; (nonRep + (r * repeaters) + c < numBindings)
0560:                                    && rsreq.hasNext(); r++) {
0561:                                SnmpSubRequest repetition = (SnmpSubRequest) rsreq
0562:                                        .nextSubRequest();
0563:                                /*
0564:                                 System.err.println("nr="+nonRep+",r="+r+",repeaters="+repeaters+
0565:                                 ",c="+c+",rangeIndex="+rangeIndex+",rep="+repetition);
0566:                                 */
0567:                                processNextSubRequest(vbs, requestPDU, nonRep
0568:                                        + (r * repeaters) + c, rangeIndex,
0569:                                        repetition);
0570:                            }
0571:                        }
0572:                    }
0573:                }
0574:            }
0575:
0576:            protected static void processsErrorResponse(AgentXPending pending,
0577:                    AgentXResponsePDU pdu) throws NoSuchElementException {
0578:                SubRequestIterator subRequests = pending.getReferences();
0579:                for (int i = 1; i < pdu.getErrorIndex(); i++) {
0580:                    if (subRequests.hasNext()) {
0581:                        subRequests.next();
0582:                    } else {
0583:                        pending.getRequest().setErrorStatus(PDU.genErr);
0584:                        return;
0585:                    }
0586:                }
0587:                if (subRequests.hasNext()) {
0588:                    SubRequest sreq = subRequests.nextSubRequest();
0589:                    RequestStatus status = sreq.getStatus();
0590:                    status.setErrorStatus(pdu.getErrorStatus());
0591:                } else {
0592:                    pending.getRequest().setErrorStatus(PDU.genErr);
0593:                }
0594:            }
0595:
0596:            private static boolean checkAgentXResponse(AgentXResponsePDU pdu,
0597:                    AgentXPending pending) {
0598:                switch (pending.getAgentXPDU().getType()) {
0599:                case AgentXPDU.AGENTX_GET_PDU:
0600:                case AgentXPDU.AGENTX_GETNEXT_PDU: {
0601:                    if (((AgentXRequestPDU) pending.getAgentXPDU()).getRanges().length != pdu
0602:                            .size()) {
0603:                        pending.getRequest().setErrorStatus(PDU.genErr);
0604:                        return false;
0605:                    }
0606:                    break;
0607:                }
0608:                default: {
0609:                    // no check?
0610:                }
0611:                }
0612:                return true;
0613:            }
0614:
0615:            protected AgentXResponsePDU ping(AgentXPingPDU pdu,
0616:                    AgentXMasterSession session) {
0617:                AgentXResponsePDU response = createResponse(pdu, session);
0618:                if (!checkIfContextIsSupported(pdu.getContext())) {
0619:                    response
0620:                            .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0621:                    return response;
0622:                }
0623:                return response;
0624:            }
0625:
0626:            protected AgentXResponsePDU notify(AgentXNotifyPDU pdu,
0627:                    AgentXMasterSession session) {
0628:                AgentXResponsePDU response = createResponse(pdu, session);
0629:                if (session != null) {
0630:                    if (!checkIfContextIsSupported(pdu.getContext())) {
0631:                        response
0632:                                .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0633:                        return response;
0634:                    }
0635:                    VariableBinding[] vbs = pdu.getVariableBindings();
0636:                    response.setVariableBindings(vbs);
0637:                    int payloadIndex = 1;
0638:                    OID trapoid = null;
0639:                    TimeTicks timestamp = new TimeTicks(
0640:                            getContextSysUpTime(DEFAULT_CONTEXT));
0641:
0642:                    if (vbs.length >= 1) {
0643:                        if (SnmpConstants.sysUpTime.equals(vbs[0].getOid())) {
0644:                            payloadIndex++;
0645:                            if ((vbs.length < 2)
0646:                                    || (!SnmpConstants.snmpTrapOID
0647:                                            .equals(vbs[1].getOid()))) {
0648:                                response
0649:                                        .setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
0650:                                response.setErrorIndex(2);
0651:                            } else {
0652:                                timestamp = (TimeTicks) vbs[0].getVariable();
0653:                                trapoid = (OID) vbs[1].getVariable();
0654:                            }
0655:                        } else if (SnmpConstants.snmpTrapOID.equals(vbs[0]
0656:                                .getOid())) {
0657:                            trapoid = (OID) vbs[0].getVariable();
0658:                        } else {
0659:                            response
0660:                                    .setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
0661:                            response.setErrorIndex(1);
0662:                        }
0663:                    }
0664:                    if (trapoid != null) {
0665:                        VariableBinding[] pvbs = new VariableBinding[vbs.length
0666:                                - payloadIndex];
0667:                        System.arraycopy(vbs, payloadIndex, pvbs, 0,
0668:                                pvbs.length);
0669:                        notify(pdu.getContext(), trapoid, timestamp, pvbs);
0670:                    }
0671:                }
0672:                return response;
0673:            }
0674:
0675:            protected TimeTicks getContextSysUpTime(OctetString context) {
0676:                MasterContextInfo info = (MasterContextInfo) contextInfo
0677:                        .get(context);
0678:                SysUpTime contextSysUpTime;
0679:                if (info == null) {
0680:                    MOContextScope scope = new DefaultMOContextScope(context,
0681:                            SnmpConstants.sysUpTime, true,
0682:                            SnmpConstants.sysUpTime, true);
0683:                    ManagedObject mo = server.lookup(new DefaultMOQuery(scope));
0684:                    if (mo instanceof  SysUpTime) {
0685:                        contextSysUpTime = (SysUpTime) mo;
0686:                    } else {
0687:                        /**@todo May be we can use an integer of the found object to
0688:                         * initialize the time?
0689:                         */
0690:                        LOGGER.warn("SysUpTime could not be found in '"
0691:                                + context
0692:                                + "' context, using a new instance instead");
0693:                        contextSysUpTime = new SysUpTimeImpl();
0694:                    }
0695:                    contextInfo.put(context, new MasterContextInfo(context,
0696:                            contextSysUpTime));
0697:                } else {
0698:                    contextSysUpTime = info.getUpTime();
0699:                }
0700:                if (contextSysUpTime != null) {
0701:                    return contextSysUpTime.get();
0702:                }
0703:                return null;
0704:            }
0705:
0706:            public AgentXResponsePDU addAgentCaps(AgentXAddAgentCapsPDU pdu,
0707:                    AgentXMasterSession session) {
0708:                AgentXResponsePDU response = createResponse(pdu, session);
0709:                if (session != null) {
0710:                    if (!checkIfContextIsSupported(pdu.getContext())) {
0711:                        response
0712:                                .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0713:                        return response;
0714:                    }
0715:                    AgentCapabilityList agentCaps = getAgentCaps(pdu
0716:                            .getContext());
0717:                    if (agentCaps != null) {
0718:                        OID index = agentCaps.addSysOREntry(pdu.getId(), pdu
0719:                                .getDescr());
0720:                        session.addAgentCaps(pdu.getId(), index);
0721:                    }
0722:                }
0723:                return response;
0724:            }
0725:
0726:            protected AgentCapabilityList getAgentCaps(OctetString contextName) {
0727:                MOContextScope scope = new DefaultMOContextScope(contextName,
0728:                        SnmpConstants.sysOREntry, true,
0729:                        SnmpConstants.sysOREntry, true);
0730:                ManagedObject mo = server.lookup(new DefaultMOQuery(scope));
0731:                if (mo instanceof  AgentCapabilityList) {
0732:                    return (AgentCapabilityList) mo;
0733:                } else {
0734:                    LOGGER.warn("SysOREntry managed object for context "
0735:                            + contextName + " not found, instead found: " + mo);
0736:                }
0737:                return null;
0738:            }
0739:
0740:            public AgentXResponsePDU removeAgentCaps(
0741:                    AgentXRemoveAgentCapsPDU pdu, AgentXMasterSession session) {
0742:                AgentXResponsePDU response = createResponse(pdu, session);
0743:                if (session != null) {
0744:                    OID index = session.removeAgentCaps(pdu.getId());
0745:                    AgentCapabilityList agentCaps = getAgentCaps(pdu
0746:                            .getContext());
0747:                    if (agentCaps != null) {
0748:                        Object ac = agentCaps.removeSysOREntry(index);
0749:                        if (ac == null) {
0750:                            response
0751:                                    .setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_AGENTCAPS);
0752:                        }
0753:                    } else {
0754:                        response
0755:                                .setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_AGENTCAPS);
0756:                    }
0757:                }
0758:                return response;
0759:            }
0760:
0761:            public AgentXResponsePDU closeSession(AgentXClosePDU pdu,
0762:                    AgentXMasterSession session) {
0763:                if (LOGGER.isInfoEnabled()) {
0764:                    LOGGER.info("Subagent is closing session " + session
0765:                            + " because " + pdu.getReason());
0766:                }
0767:                AgentXResponsePDU response = createResponse(pdu, session);
0768:                if (session != null) {
0769:                    removeSession(session.getSessionID());
0770:                    removeAllRegistrations(session);
0771:                    session.setClosed(true);
0772:                }
0773:                return response;
0774:            }
0775:
0776:            public void closeSession(AgentXMasterSession session, byte reason) {
0777:                if (LOGGER.isInfoEnabled()) {
0778:                    LOGGER.info("Closing sub-agent session " + session
0779:                            + " because " + reason);
0780:                }
0781:                AgentXClosePDU closePDU = new AgentXClosePDU(reason);
0782:                try {
0783:                    agentX.send(closePDU, session.createAgentXTarget(), session
0784:                            .getPeer().getTransport(), new AgentXPendingClose(
0785:                            session, closePDU), this );
0786:                } catch (IOException ex) {
0787:                    LOGGER.error(
0788:                            "Failed to send CloseSessionPDU to close session "
0789:                                    + session + ": " + ex.getMessage(), ex);
0790:                }
0791:                removeSession(session.getSessionID());
0792:                removeAllRegistrations(session);
0793:                session.setClosed(true);
0794:            }
0795:
0796:            protected synchronized void removeAllRegistrations(
0797:                    AgentXMasterSession session) {
0798:                if (LOGGER.isDebugEnabled()) {
0799:                    LOGGER.debug("Removing all registrations (out of "
0800:                            + registrations.size() + ") of session " + session);
0801:                }
0802:                for (Iterator it = registrations.iterator(); it.hasNext();) {
0803:                    AgentXRegEntry r = (AgentXRegEntry) it.next();
0804:                    if (r.getSession().equals(session)) {
0805:                        removeRegistration(r, it);
0806:                    }
0807:                }
0808:            }
0809:
0810:            protected AgentXMasterSession getSession(int sessionID) {
0811:                return (AgentXMasterSession) sessions
0812:                        .get(new Integer(sessionID));
0813:            }
0814:
0815:            protected synchronized AgentXMasterSession getSession(AgentXPDU pdu) {
0816:                int sessionID = pdu.getSessionID();
0817:                return getSession(sessionID);
0818:            }
0819:
0820:            protected AgentXResponsePDU register(AgentXRegisterPDU pdu,
0821:                    AgentXCommandEvent command, AgentXMasterSession session) {
0822:                AgentXResponsePDU response = createResponse(pdu, session);
0823:                if (session != null) {
0824:                    if (!checkIfContextIsSupported(pdu.getContext())) {
0825:                        response
0826:                                .setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
0827:                        return response;
0828:                    }
0829:                    AgentXRegEntry regEntry = new AgentXRegEntry(session, pdu
0830:                            .getRegion(), pdu.getPriority(), pdu.getContext(),
0831:                            pdu.getTimeout());
0832:                    if (isDuplicate(regEntry)) {
0833:                        response
0834:                                .setErrorStatus(AgentXProtocol.AGENTX_DUPLICATE_REGISTRATION);
0835:                        return response;
0836:                    }
0837:                    AgentXMasterEvent event = new AgentXMasterEvent(this ,
0838:                            AgentXMasterEvent.REGISTRATION_TO_ADD, regEntry);
0839:                    fireMasterChanged(event);
0840:                    if (event.getVetoReason() == AgentXProtocol.AGENTX_SUCCESS) {
0841:                        try {
0842:                            addRegistration(regEntry);
0843:                        } catch (DuplicateRegistrationException drex) {
0844:                            if (LOGGER.isDebugEnabled()) {
0845:                                drex.printStackTrace();
0846:                            }
0847:                            response
0848:                                    .setErrorStatus(AgentXProtocol.AGENTX_DUPLICATE_REGISTRATION);
0849:                            return response;
0850:                        }
0851:                    } else {
0852:                        response.setErrorStatus(event.getVetoReason());
0853:                    }
0854:                }
0855:                return response;
0856:            }
0857:
0858:            protected AgentXResponsePDU unregister(AgentXUnregisterPDU pdu,
0859:                    AgentXCommandEvent event, AgentXMasterSession session) {
0860:                AgentXResponsePDU response = createResponse(pdu, session);
0861:                if (session != null) {
0862:                    AgentXRegEntry regEntry = new AgentXRegEntry(session, pdu
0863:                            .getRegion(), pdu.getPriority(), pdu.getContext(),
0864:                            pdu.getTimeout());
0865:                    boolean found = false;
0866:                    for (Iterator it = registrations.iterator(); it.hasNext();) {
0867:                        AgentXRegEntry r = (AgentXRegEntry) it.next();
0868:                        if (r.equals(regEntry)) {
0869:                            found = true;
0870:                            if (!removeRegistration(r, it)) {
0871:                                response
0872:                                        .setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_REGISTRATION);
0873:                            }
0874:                            break;
0875:                        }
0876:                    }
0877:                    if (!found) {
0878:                        response
0879:                                .setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_REGISTRATION);
0880:                    }
0881:                }
0882:                return response;
0883:            }
0884:
0885:            protected synchronized boolean isDuplicate(
0886:                    AgentXRegEntry registration) {
0887:                if (registrations.contains(registration)) {
0888:                    if (LOGGER.isDebugEnabled()) {
0889:                        LOGGER.debug("Identical registration attempt for "
0890:                                + registration);
0891:                    }
0892:                    return true;
0893:                }
0894:                AgentXNodeQuery query = new AgentXNodeQuery(registration
0895:                        .getContext(), registration.getRegion(),
0896:                        AgentXNodeQuery.QUERY_NON_AGENTX_NODES);
0897:                ManagedObject mo = server.lookup(query);
0898:                if (mo != null) {
0899:                    // overlaps non AgentX --> return duplicate region error
0900:                    if (LOGGER.isDebugEnabled()) {
0901:                        LOGGER
0902:                                .debug("New registration is rejected as duplicate because it "
0903:                                        + "overlaps with non AgentX managed object: "
0904:                                        + mo);
0905:                    }
0906:                    return true;
0907:                }
0908:                return false;
0909:            }
0910:
0911:            protected synchronized void addRegistration(
0912:                    AgentXRegEntry registration)
0913:                    throws DuplicateRegistrationException {
0914:                registrations.add(registration);
0915:                if (registration.getRegion().isRange()) {
0916:                    AgentXRegion r = registration.getRegion();
0917:                    long start = r.getLowerBoundSubID() & 0xFFFFFFFFL;
0918:                    long stop = r.getUpperBoundSubID() & 0xFFFFFFFFL;
0919:                    if (start > stop) {
0920:                        LOGGER.warn("Empty range registration " + registration);
0921:                    } else {
0922:                        for (long s = start; s <= stop; s++) {
0923:                            OID root = new OID(r.getLowerBound());
0924:                            root.set(r.getRangeSubID() - 1, (int) s);
0925:                            AgentXRegion sr = new AgentXRegion(root, root
0926:                                    .nextPeer());
0927:                            addRegion(registration, sr);
0928:                        }
0929:                    }
0930:                } else {
0931:                    addRegion(registration, registration.getRegion());
0932:                }
0933:                AgentXMasterEvent e = new AgentXMasterEvent(this ,
0934:                        AgentXMasterEvent.REGISTRATION_ADDED, registration);
0935:                fireMasterChanged(e);
0936:            }
0937:
0938:            private static AgentXNodeQuery nextQuery(AgentXNodeQuery lastQuery,
0939:                    AgentXNode lastNode) {
0940:                if (lastNode != null) {
0941:                    lastQuery.getMutableScope().setLowerBound(
0942:                            lastNode.getScope().getUpperBound());
0943:                    lastQuery.getMutableScope().setLowerIncluded(
0944:                            !lastNode.getScope().isUpperIncluded());
0945:                }
0946:                return lastQuery;
0947:            }
0948:
0949:            protected synchronized void addRegion(AgentXRegEntry registration,
0950:                    AgentXRegion region) throws DuplicateRegistrationException {
0951:                if (region.isRange()) {
0952:                    String errText = "Regions with range cannot be added";
0953:                    LOGGER.error(errText);
0954:                    throw new IllegalArgumentException(errText);
0955:                }
0956:                AgentXNodeQuery query = new AgentXNodeQuery(registration
0957:                        .getContext(), region,
0958:                        AgentXNodeQuery.QUERY_AGENTX_NODES);
0959:                AgentXNode lastNode = null;
0960:                AgentXNode node = (AgentXNode) server.lookup(query);
0961:                if (node != null) {
0962:                    LinkedList splitted = new LinkedList();
0963:                    AgentXRegion r1 = new AgentXRegion(region);
0964:                    for (; (node != null); node = (AgentXNode) server
0965:                            .lookup(nextQuery(query, lastNode))) {
0966:                        AgentXRegion r2 = new AgentXRegion(node.getScope()
0967:                                .getLowerBound(), node.getScope()
0968:                                .getUpperBound());
0969:                        if (LOGGER.isDebugEnabled()) {
0970:                            LOGGER.debug("Affected region r2=" + r2
0971:                                    + " from registered region r1=" + r1);
0972:                        }
0973:                        if (r2.covers(r1)) {
0974:                            if (LOGGER.isDebugEnabled()) {
0975:                                LOGGER.debug("Region r2 covers r1 (r1=" + r1
0976:                                        + ",r2=" + r2 + ")");
0977:                            }
0978:                            oldRegionCoversNew(registration, node, splitted,
0979:                                    r1, r2);
0980:                            r1 = null;
0981:                        } else if (r1.covers(r2)) {
0982:                            if (LOGGER.isDebugEnabled()) {
0983:                                LOGGER.debug("Region r1 covers r2 (r1=" + r1
0984:                                        + ",r2=" + r2 + ")");
0985:                            }
0986:                            r1 = newRegionCoversOld(registration, lastNode,
0987:                                    node, splitted, r1, r2);
0988:                        } else if ((r1.isOverlapping(r2))
0989:                                && (r2.getLowerBound().compareTo(
0990:                                        r1.getLowerBound()) < 0)) {
0991:                            if (LOGGER.isDebugEnabled()) {
0992:                                LOGGER
0993:                                        .debug("Region r1 ovelaps r2 and r2 < r1 (r1="
0994:                                                + r1 + ",r2=" + r2 + ")");
0995:                            }
0996:                            if (LOGGER.isDebugEnabled()) {
0997:                                LOGGER.debug("Shrinking node " + node + " to "
0998:                                        + r1.getLowerBound());
0999:                            }
1000:                            node.shrink(r1.getLowerBound());
1001:                            AgentXNode r2b = node.getClone(new AgentXRegion(r1
1002:                                    .getLowerBound(), r2.getUpperBound()));
1003:                            r2b.addRegistration(registration);
1004:                            splitted.add(r2b);
1005:                            r1 = new AgentXRegion(r2.getUpperBound(), r1
1006:                                    .getLowerBound());
1007:                        }
1008:                        // r1.overlaps(r2) and (r1.get_lower() < r2.get_lower())
1009:                        else {
1010:                            if (LOGGER.isDebugEnabled()) {
1011:                                LOGGER
1012:                                        .debug("Region r1 ovelaps r2 and r1 < r2 (r1="
1013:                                                + r1 + ",r2=" + r2 + ")");
1014:                            }
1015:                            if (LOGGER.isDebugEnabled()) {
1016:                                LOGGER.debug("Shrinking node " + node + " to "
1017:                                        + r1.getUpperBound());
1018:                            }
1019:                            node.shrink(r1.getUpperBound());
1020:                            AgentXNode r2b = node.getClone(new AgentXRegion(r1
1021:                                    .getUpperBound(), r2.getUpperBound()));
1022:                            node.addRegistration(registration);
1023:                            splitted.add(r2b);
1024:                            AgentXNode r1a = new AgentXNode(new AgentXRegion(r1
1025:                                    .getLowerBound(), r2.getLowerBound()),
1026:                                    registration);
1027:                            splitted.add(r1a);
1028:                            r1 = null;
1029:                        }
1030:                        if (r1 != null) {
1031:                            if (r1.isEmpty()) {
1032:                                splitted.add(new AgentXNode(region,
1033:                                        registration));
1034:                            } else {
1035:                                splitted.add(new AgentXNode(r1, registration));
1036:                            }
1037:                        }
1038:                        lastNode = node;
1039:                    }
1040:                    for (Iterator it = splitted.iterator(); it.hasNext();) {
1041:                        AgentXNode n = (AgentXNode) it.next();
1042:                        server.register(n, registration.getContext());
1043:                        if (LOGGER.isDebugEnabled()) {
1044:                            LOGGER.debug("Registered splitted AgentX node: "
1045:                                    + n);
1046:                        }
1047:                    }
1048:                } else {
1049:                    node = new AgentXNode(region, registration);
1050:                    server.register(node, registration.getContext());
1051:                    if (LOGGER.isDebugEnabled()) {
1052:                        LOGGER.debug("Registered AgentX node: " + node);
1053:                    }
1054:                }
1055:            }
1056:
1057:            protected boolean removeRegistration(AgentXRegEntry registration,
1058:                    Iterator regIterator) {
1059:                LinkedList remove = new LinkedList();
1060:                AgentXRegion queryRegion = new AgentXRegion(registration
1061:                        .getRegion());
1062:                queryRegion.setUpperIncluded(true);
1063:                AgentXNodeQuery query = new AgentXNodeQuery(registration
1064:                        .getContext(), queryRegion,
1065:                        AgentXNodeQuery.QUERY_AGENTX_NODES);
1066:                AgentXNode lastNode = null;
1067:                AgentXNode node = (AgentXNode) server.lookup(query);
1068:                if (node != null) {
1069:                    for (; (node != null); node = (AgentXNode) server
1070:                            .lookup(nextQuery(query, lastNode))) {
1071:                        if (node == lastNode) {
1072:                            break;
1073:                        }
1074:                        if ((node.removeRegistration(registration))
1075:                                && (node.getRegistrationCount() == 0)) {
1076:                            remove.add(node);
1077:                        } else {
1078:                            if ((lastNode != null)
1079:                                    && (lastNode.getRegistrationCount() == 1)
1080:                                    && (node.getRegistrationCount() == 1)
1081:                                    && (lastNode.getScope().getUpperBound()
1082:                                            .equals(node.getScope()
1083:                                                    .getLowerBound()))
1084:                                    && (node.getActiveRegistration()
1085:                                            .equals(lastNode
1086:                                                    .getActiveRegistration()))) {
1087:                                AgentXRegion r = new AgentXRegion(node
1088:                                        .getScope().getLowerBound(), lastNode
1089:                                        .getScope().getUpperBound());
1090:                                if (node.getActiveRegistration().getRegion()
1091:                                        .covers(r)) {
1092:                                    remove.add(node);
1093:                                    lastNode.expand(node.getScope()
1094:                                            .getUpperBound(), false);
1095:                                }
1096:                            }
1097:                        }
1098:                        lastNode = node;
1099:                    }
1100:                } else {
1101:                    LOGGER
1102:                            .warn("A registration is removed with not associated subtree: "
1103:                                    + registration);
1104:                }
1105:                for (Iterator it = remove.iterator(); it.hasNext();) {
1106:                    AgentXNode rnode = (AgentXNode) it.next();
1107:                    server.unregister(rnode, registration.getContext());
1108:                }
1109:                if (regIterator != null) {
1110:                    regIterator.remove();
1111:                    if (LOGGER.isDebugEnabled()) {
1112:                        LOGGER.debug("Removed registration " + registration
1113:                                + " by session close, " + registrations.size()
1114:                                + " left.");
1115:                    }
1116:                    fireMasterChanged(new AgentXMasterEvent(this ,
1117:                            AgentXMasterEvent.REGISTRATION_REMOVED,
1118:                            registration));
1119:                    return true;
1120:                } else if (registrations.remove(registration)) {
1121:                    if (LOGGER.isDebugEnabled()) {
1122:                        LOGGER.debug("Removed registration " + registration
1123:                                + ", " + registrations.size() + " left.");
1124:                    }
1125:                    fireMasterChanged(new AgentXMasterEvent(this ,
1126:                            AgentXMasterEvent.REGISTRATION_REMOVED,
1127:                            registration));
1128:                    return true;
1129:                }
1130:                return false;
1131:            }
1132:
1133:            private static AgentXRegion newRegionCoversOld(
1134:                    AgentXRegEntry registration, AgentXNode lastNode,
1135:                    AgentXNode node, LinkedList splitted, AgentXRegion r1,
1136:                    AgentXRegion r2) {
1137:                AgentXNode r1a = null;
1138:                if (lastNode != null) {
1139:                    AgentXRegion r = new AgentXRegion(lastNode.getScope()
1140:                            .getUpperBound(), r2.getLowerBound());
1141:                    r1a = new AgentXNode(r, registration);
1142:                } else {
1143:                    AgentXRegion r = new AgentXRegion(r1.getLowerBound(), r2
1144:                            .getLowerBound());
1145:                    r1a = new AgentXNode(r, registration);
1146:                }
1147:                if (!splitted.isEmpty()) {
1148:                    if (LOGGER.isDebugEnabled()) {
1149:                        LOGGER.debug("Shrinking node " + splitted.getLast()
1150:                                + " to " + r2.getLowerBound());
1151:                    }
1152:                    ((AgentXNode) splitted.getLast())
1153:                            .shrink(r2.getLowerBound());
1154:                }
1155:                node.addRegistration(registration);
1156:                if ((r1.getLowerBound().equals(r2.getLowerBound()))
1157:                        || ((!splitted.isEmpty()) && (((AgentXNode) splitted
1158:                                .getLast()).getScope().equals(r1a.getScope())))) {
1159:                    r1a = null;
1160:                } else {
1161:                    splitted.add(r1a);
1162:                }
1163:                return new AgentXRegion(r2.getUpperBound(), r1.getUpperBound());
1164:            }
1165:
1166:            private static void oldRegionCoversNew(AgentXRegEntry registration,
1167:                    AgentXNode node, List splitted, AgentXRegion r1,
1168:                    AgentXRegion r2) {
1169:                AgentXRegion r = new AgentXRegion(r1.getUpperBound(), node
1170:                        .getScope().getUpperBound());
1171:                AgentXNode r2c = node.getClone(r);
1172:                if (r2.getLowerBound().equals(r1.getLowerBound())) {
1173:                    if (LOGGER.isDebugEnabled()) {
1174:                        LOGGER.debug("Shrinking node " + node + " to "
1175:                                + r1.getUpperBound());
1176:                    }
1177:                    node.shrink(r1.getUpperBound());
1178:                    node.addRegistration(registration);
1179:                } else {
1180:                    if (LOGGER.isDebugEnabled()) {
1181:                        LOGGER.debug("Shrinking node " + node + " to "
1182:                                + r1.getLowerBound());
1183:                    }
1184:                    node.shrink(r1.getLowerBound());
1185:                    AgentXNode r2b = node.getClone(r1);
1186:                    r2b.addRegistration(registration);
1187:                    splitted.add(r2b);
1188:                }
1189:                splitted.add(r2c);
1190:            }
1191:
1192:            public AgentXResponsePDU openSession(AgentXOpenPDU pdu,
1193:                    AgentXCommandEvent event) {
1194:                AgentXMasterSession session = new AgentXMasterSession(
1195:                        getNextSessionID(), agentXQueue, pdu.getSubagentID(),
1196:                        pdu.getSubagentDescr());
1197:                AgentXPeer peer = getPeer(event.getPeerAddress());
1198:                if (peer == null) {
1199:                    peer = new AgentXPeer(event.getPeerTransport(), event
1200:                            .getPeerAddress());
1201:                    addPeer(peer);
1202:                    LOGGER.warn("Added peer during session opening: " + peer
1203:                            + " (peer should have been there already due "
1204:                            + "to connection setup)");
1205:                }
1206:                session.setPeer(peer);
1207:                session.setAgentXVersion(pdu.getVersion() & 0xFF);
1208:                if (pdu.getTimeout() != 0) {
1209:                    session.setTimeout(pdu.getTimeout());
1210:                } else {
1211:                    session.setTimeout(defaultTimeout);
1212:                }
1213:                int sessionAccepted = acceptSession(session);
1214:                if (sessionAccepted == AgentXProtocol.AGENTX_SUCCESS) {
1215:                    addSession(session);
1216:                    if (LOGGER.isInfoEnabled()) {
1217:                        LOGGER.info("Session " + session + " opened from "
1218:                                + peer.getAddress());
1219:                    }
1220:                } else {
1221:                    LOGGER.warn("Session open rejected because "
1222:                            + sessionAccepted + " for " + session + " from "
1223:                            + event.getPeerAddress());
1224:                }
1225:                AgentXResponsePDU response = createResponse(pdu, session);
1226:                response.setErrorStatus((short) sessionAccepted);
1227:                return response;
1228:            }
1229:
1230:            protected synchronized void addPeer(AgentXPeer peer) {
1231:                peers.put(peer.getAddress(), peer);
1232:                fireMasterChanged(new AgentXMasterEvent(this ,
1233:                        AgentXMasterEvent.PEER_ADDED, peer));
1234:            }
1235:
1236:            protected synchronized AgentXPeer getPeer(Address address) {
1237:                return (AgentXPeer) peers.get(address);
1238:            }
1239:
1240:            protected int acceptSession(AgentXMasterSession session) {
1241:                AgentXMasterEvent event = new AgentXMasterEvent(this ,
1242:                        AgentXMasterEvent.SESSION_ADDED, session);
1243:                fireMasterChanged(event);
1244:                return event.getVetoReason();
1245:            }
1246:
1247:            protected synchronized void addSession(AgentXMasterSession session) {
1248:                sessions.put(new Integer(session.getSessionID()), session);
1249:                fireMasterChanged(new AgentXMasterEvent(this ,
1250:                        AgentXMasterEvent.SESSION_ADDED, session));
1251:            }
1252:
1253:            protected synchronized AgentXMasterSession removeSession(
1254:                    int sessionID) {
1255:                AgentXMasterSession session = (AgentXMasterSession) sessions
1256:                        .remove(new Integer(sessionID));
1257:                if (session != null) {
1258:                    fireMasterChanged(new AgentXMasterEvent(this ,
1259:                            AgentXMasterEvent.SESSION_REMOVED, session));
1260:                }
1261:                return session;
1262:            }
1263:
1264:            protected AgentXResponsePDU createResponse(AgentXPDU request,
1265:                    AgentXSession session) {
1266:                OctetString context = DEFAULT_CONTEXT;
1267:                if (request instanceof  AgentXContextPDU) {
1268:                    OctetString reqContext = ((AgentXContextPDU) request)
1269:                            .getContext();
1270:                    if (server.isContextSupported(reqContext)) {
1271:                        context = reqContext;
1272:                    }
1273:                }
1274:                AgentXResponsePDU response = new AgentXResponsePDU(
1275:                        getContextSysUpTime(context).toInt(), (short) 0,
1276:                        (short) 0);
1277:                if (session == null) {
1278:                    response.setSessionID(request.getSessionID());
1279:                    response.setErrorStatus(AgentXProtocol.AGENTX_NOT_OPEN);
1280:                } else {
1281:                    response.setSessionID(session.getSessionID());
1282:                }
1283:                response.setPacketID(request.getPacketID());
1284:                response.setTransactionID(request.getTransactionID());
1285:                response.setByteOrder(request.getByteOrder());
1286:                return response;
1287:            }
1288:
1289:            protected void sendResponse(AgentXPDU response,
1290:                    AgentXSession session) {
1291:                if (LOGGER.isDebugEnabled()) {
1292:                    LOGGER.debug("Sending AgentX response " + response
1293:                            + " to session " + session);
1294:                }
1295:                try {
1296:                    agentX.send(response, session.createAgentXTarget(), session
1297:                            .getPeer().getTransport());
1298:                } catch (IOException ex) {
1299:                    if (LOGGER.isDebugEnabled()) {
1300:                        ex.printStackTrace();
1301:                    }
1302:                    LOGGER.error("Failed to send AgentX response " + response
1303:                            + " to session " + session + " because: "
1304:                            + ex.getMessage(), ex);
1305:                }
1306:            }
1307:
1308:            public synchronized void connectionStateChanged(
1309:                    TransportStateEvent change) {
1310:                Address peerAddress = change.getPeerAddress();
1311:                switch (change.getNewState()) {
1312:                case TransportStateEvent.STATE_CLOSED:
1313:                case TransportStateEvent.STATE_DISCONNECTED_REMOTELY:
1314:                case TransportStateEvent.STATE_DISCONNECTED_TIMEOUT: {
1315:                    AgentXPeer removedPeer = removePeer(peerAddress);
1316:                    fireMasterChanged(new AgentXMasterEvent(this ,
1317:                            AgentXMasterEvent.PEER_REMOVED, removedPeer));
1318:                    break;
1319:                }
1320:                default: {
1321:                    AgentXPeer newPeer = new AgentXPeer(
1322:                            (TransportMapping) change.getSource(), peerAddress);
1323:                    addPeer(newPeer);
1324:                }
1325:                }
1326:            }
1327:
1328:            protected synchronized AgentXPeer removePeer(Address peerAddress) {
1329:                AgentXPeer peer = (AgentXPeer) peers.remove(peerAddress);
1330:                if (peer != null) {
1331:                    peer.setClosing(true);
1332:                    for (Iterator it = sessions.values().iterator(); it
1333:                            .hasNext();) {
1334:                        AgentXMasterSession session = (AgentXMasterSession) it
1335:                                .next();
1336:                        if (session.getPeer().equals(peer)) {
1337:                            it.remove();
1338:                            fireMasterChanged(new AgentXMasterEvent(this ,
1339:                                    AgentXMasterEvent.SESSION_REMOVED, session));
1340:                            indexRegistry.release(session.getSessionID());
1341:                            removeAllRegistrations(session);
1342:                            session.setClosed(true);
1343:                            if (peer.getTransport() instanceof  ConnectionOrientedTransportMapping) {
1344:                                try {
1345:                                    ((ConnectionOrientedTransportMapping) peer
1346:                                            .getTransport()).close(peer
1347:                                            .getAddress());
1348:                                } catch (IOException iox) {
1349:                                    LOGGER
1350:                                            .warn("Caught exception while closing transport: "
1351:                                                    + iox.getMessage());
1352:                                }
1353:                            }
1354:                        }
1355:                    }
1356:                    /* Optional code for debugging of registry issues:
1357:                     if (LOGGER.isDebugEnabled()) {
1358:                     if (server instanceof DefaultMOServer) {
1359:                     SortedMap registry = ((DefaultMOServer) server).getRegistry();
1360:                     System.err.println(registry.toString());
1361:                     }
1362:                     }
1363:                     */
1364:                } else {
1365:                    LOGGER.warn("Tried to remove peer with address "
1366:                            + peerAddress + " which is not part of peer list: "
1367:                            + peers);
1368:                }
1369:                return peer;
1370:            }
1371:
1372:            public byte getAgentXVersion() {
1373:                return AgentXProtocol.VERSION_1_0;
1374:            }
1375:
1376:            public synchronized void addAgentXMasterListener(
1377:                    AgentXMasterListener l) {
1378:                if (agentXMasterListeners == null) {
1379:                    agentXMasterListeners = new Vector(2);
1380:                }
1381:                agentXMasterListeners.add(l);
1382:            }
1383:
1384:            public synchronized void removeAgentXMasterListener(
1385:                    AgentXMasterListener l) {
1386:                if (agentXMasterListeners != null) {
1387:                    agentXMasterListeners.remove(l);
1388:                }
1389:            }
1390:
1391:            protected void fireMasterChanged(AgentXMasterEvent event) {
1392:                if (agentXMasterListeners != null) {
1393:                    Vector listeners = agentXMasterListeners;
1394:                    int count = listeners.size();
1395:                    for (int i = 0; i < count; i++) {
1396:                        try {
1397:                            ((AgentXMasterListener) listeners.get(i))
1398:                                    .masterChanged(event);
1399:                        } catch (Exception ex) {
1400:                            LOGGER.error("AgentXMasterListener "
1401:                                    + listeners.get(i) + " threw exception on "
1402:                                    + event + ": " + ex.getMessage(), ex);
1403:                        }
1404:                    }
1405:                }
1406:            }
1407:
1408:            protected static class AgentXRegEntryComparator implements 
1409:                    Comparator {
1410:
1411:                public int compare(Object o1, Object o2) {
1412:                    AgentXRegEntry a = (AgentXRegEntry) o1;
1413:                    AgentXRegEntry b = (AgentXRegEntry) o2;
1414:                    int c = a.getRegion().compareTo(b.getRegion());
1415:                    if (c == 0) {
1416:                        c = a.getContext().compareTo(b.getContext());
1417:                    }
1418:                    return c;
1419:                }
1420:            }
1421:
1422:            public void onResponse(AgentXResponseEvent event) {
1423:                AgentXResponsePDU pdu = event.getResponse();
1424:                AgentXPending pending = (AgentXPending) event.getUserObject();
1425:                if (LOGGER.isDebugEnabled()) {
1426:                    LOGGER.debug("Processing AgentX response " + pdu
1427:                            + " for request " + pending);
1428:                }
1429:                if (pending.getRequest() != null) {
1430:                    AgentXPending p = agentXQueue.remove(pending.getAgentXPDU()
1431:                            .getSessionID(), pending.getRequest()
1432:                            .getTransactionID());
1433:                    if (p == null) {
1434:                        LOGGER
1435:                                .warn("Pending AgentX request not found (may be timed out already): "
1436:                                        + "Received AgentX response from "
1437:                                        + event.getPeerAddress()
1438:                                        + " for request "
1439:                                        + event.getUserObject()
1440:                                        + " does not match any pending request:"
1441:                                        + pdu);
1442:                        return;
1443:                    }
1444:                }
1445:                if ((pdu == null)
1446:                        && (pending.getAgentXPDU().getType() != AgentXPDU.AGENTX_CLOSE_PDU)) {
1447:                    pending.getSession().incConsecutiveTimeouts();
1448:                    pending.getReferences().nextSubRequest().getStatus()
1449:                            .setErrorStatus(PDU.genErr);
1450:                    if (pending.getSession().getConsecutiveTimeouts() > maxConsecutiveTimeouts) {
1451:                        closeSession(pending.getSession(),
1452:                                AgentXProtocol.REASON_TIMEOUTS);
1453:                    }
1454:                }
1455:                if (pdu != null) {
1456:                    pending.getSession().clearConsecutiveTimeouts();
1457:                }
1458:                if (requestList.contains(pending.getRequest())) {
1459:                    if (pdu != null) {
1460:                        if (checkAgentXResponse(pdu, pending)) {
1461:                            switch (pending.getAgentXPDU().getType()) {
1462:                            case AgentXPDU.AGENTX_GET_PDU: {
1463:                                processAgentXGetResponse(pending, pdu);
1464:                                break;
1465:                            }
1466:                            case AgentXPDU.AGENTX_GETNEXT_PDU: {
1467:                                processAgentXGetNextResponse(pending, pdu);
1468:                                break;
1469:                            }
1470:                            case AgentXPDU.AGENTX_GETBULK_PDU: {
1471:                                processAgentXBulkResponse(pending, pdu);
1472:                                break;
1473:                            }
1474:                            case AgentXPDU.AGENTX_CLEANUPSET_PDU:
1475:                            case AgentXPDU.AGENTX_UNDOSET_PDU:
1476:                            case AgentXPDU.AGENTX_COMMITSET_PDU:
1477:                            case AgentXPDU.AGENTX_TESTSET_PDU: {
1478:                                processAgentXSetResponse(pending, pdu);
1479:                                break;
1480:                            }
1481:                            default: {
1482:                                LOGGER.warn("Unhandled AgentX response " + pdu);
1483:                            }
1484:                            }
1485:                        } else {
1486:                            LOGGER.warn("Invalid AgentX response " + pdu
1487:                                    + " on request " + pending);
1488:                        }
1489:                    }
1490:                    // reprocess SNMP request
1491:                    if (!pending.getRequest().isComplete()) {
1492:                        reprocessRequest(server, pending.getRequest());
1493:                    }
1494:                    finalizeRequest((CommandResponderEvent) pending
1495:                            .getRequest().getSource(), pending.getRequest(),
1496:                            server);
1497:                } else {
1498:                    if (pending.getAgentXPDU().getType() == AgentXPDU.AGENTX_CLOSE_PDU) {
1499:                        if (pdu != null) {
1500:                            LOGGER
1501:                                    .info("Subagent "
1502:                                            + event.getPeerAddress()
1503:                                            + " confirmed close, disconnection transport now");
1504:                        } else {
1505:                            LOGGER.info("Subagent " + event.getPeerAddress()
1506:                                    + " did not answered on session close, "
1507:                                    + "disconnection now");
1508:                        }
1509:                        AgentXPeer peer = pending.getSession().getPeer();
1510:                        if (peer != null) {
1511:                            closePeer(peer);
1512:                        }
1513:                    } else {
1514:                        LOGGER.info("Received late response " + pdu
1515:                                + " on AgentX request: " + pending);
1516:                        super .release(server, pending.getRequest());
1517:                    }
1518:                }
1519:            }
1520:
1521:            protected void processAgentXGetResponse(AgentXPending pending,
1522:                    AgentXResponsePDU pdu) {
1523:                if (pdu.getErrorStatus() != PDU.noError) {
1524:                    processsErrorResponse(pending, pdu);
1525:                } else {
1526:                    VariableBinding[] vbs = pdu.getVariableBindings();
1527:                    SubRequestIterator subRequests = pending.getReferences();
1528:                    for (int i = 0; (i < pending.getRequest().size())
1529:                            && subRequests.hasNext(); i++) {
1530:                        SnmpSubRequest sreq = (SnmpSubRequest) subRequests
1531:                                .nextSubRequest();
1532:                        sreq.getVariableBinding().setVariable(
1533:                                vbs[i].getVariable());
1534:                        sreq.getStatus().setPhaseComplete(true);
1535:                    }
1536:                }
1537:            }
1538:
1539:            protected void processAgentXGetNextResponse(AgentXPending pending,
1540:                    AgentXResponsePDU pdu) {
1541:                if (pdu.getErrorStatus() != PDU.noError) {
1542:                    processsErrorResponse(pending, pdu);
1543:                } else {
1544:                    processAgentXNextResponse(pending, pdu, pending
1545:                            .getRequest().size());
1546:                }
1547:            }
1548:
1549:            protected void processAgentXSetResponse(AgentXPending pending,
1550:                    AgentXResponsePDU pdu) {
1551:                if (pdu.getErrorStatus() != PDU.noError) {
1552:                    processsErrorResponse(pending, pdu);
1553:                } else {
1554:                    SubRequestIterator it = pending.getReferences();
1555:                    while (it.hasNext()) {
1556:                        SubRequest sreq = it.nextSubRequest();
1557:                        sreq.getStatus().setPhaseComplete(true);
1558:                    }
1559:                }
1560:            }
1561:
1562:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.