001: /*
002: * JacORB - a free Java ORB
003: *
004: * Copyright (C) 1999-2004 Gerald Brose
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Library General Public
008: * License as published by the Free Software Foundation; either
009: * version 2 of the License, or (at your option) any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Library General Public License for more details.
015: *
016: * You should have received a copy of the GNU Library General Public
017: * License along with this library; if not, write to the Free
018: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
019: *
020: */
021:
022: package org.jacorb.notification.servant;
023:
024: import org.jacorb.notification.conf.Default;
025: import org.omg.CosEventComm.Disconnected;
026:
027: import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
028: import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
029:
030: public class PullMessagesOperation {
031: /**
032: * Total number of pull-Operations
033: */
034: private int pullCounter_;
035:
036: /**
037: * Total time spent within pull-Operations
038: */
039: private long timeSpentInPull_;
040:
041: /**
042: * Total number of successful pull-Operations
043: */
044: private int successfulPullCounter_;
045:
046: private final MessageSupplierDelegate delegate_;
047:
048: private final Semaphore pullSync_ = new Semaphore(
049: Default.DEFAULT_CONCURRENT_PULL_OPERATIONS_ALLOWED);
050:
051: public PullMessagesOperation(MessageSupplierDelegate delegate) {
052: delegate_ = delegate;
053: }
054:
055: public void runPull() throws Disconnected {
056: if (!delegate_.getConnected()) {
057: throw new Disconnected();
058: }
059:
060: if (delegate_.isSuspended()) {
061: return;
062: }
063:
064: runPullInternal();
065: }
066:
067: private void runPullInternal() throws Disconnected {
068: try {
069: boolean _acquired = pullSync_.tryAcquire(1000,
070: TimeUnit.MILLISECONDS);
071:
072: if (_acquired) {
073: final MessageSupplierDelegate.PullResult _data;
074: final long _now = System.currentTimeMillis();
075:
076: try {
077: _data = delegate_.pullMessages();
078: } finally {
079: pullSync_.release();
080: timeSpentInPull_ += (System.currentTimeMillis() - _now);
081: }
082:
083: ++pullCounter_;
084:
085: if (_data.success_) {
086: ++successfulPullCounter_;
087: delegate_.queueMessages(_data);
088: }
089: }
090: } catch (InterruptedException e) {
091: // ignored
092: // TODO log
093: }
094: }
095:
096: public int getPullCounter() {
097: return pullCounter_;
098: }
099:
100: public int getSuccessfulPullCounter() {
101: return successfulPullCounter_;
102: }
103:
104: public long getTimeSpentInPull() {
105: return timeSpentInPull_;
106: }
107: }
|