001: package distributedcache;
002:
003: import java.io.IOException;
004: import java.io.Serializable;
005: import java.util.ArrayList;
006: import java.util.Arrays;
007: import java.util.Collections;
008: import java.util.HashMap;
009: import java.util.List;
010: import java.util.Map;
011: import java.util.Set;
012: import java.util.logging.Level;
013: import java.util.logging.Logger;
014:
015: import org.xsocket.group.Address;
016: import org.xsocket.group.IGroupEndpoint;
017: import org.xsocket.group.ObjectMessage;
018:
019: final class MultiRpcCaller {
020:
021: private static final Logger LOG = Logger
022: .getLogger(MultiRpcCaller.class.getName());
023:
024: private IGroupEndpoint endpoint = null;
025: private final Map<Long, Call> openRequests = Collections
026: .synchronizedMap(new HashMap<Long, Call>());
027:
028: MultiRpcCaller(IGroupEndpoint endpoint) {
029: this .endpoint = endpoint;
030: }
031:
032: List<Response> call(Request request) throws IOException {
033: Set<Address> peers = endpoint.getPeerAddresses();
034: return call(request, peers.toArray(new Address[peers.size()]));
035: }
036:
037: List<Response> call(Request request,
038: Address... destinationAddresses) throws IOException {
039: Call call = new Call(request, destinationAddresses);
040: return call.perform();
041: }
042:
043: boolean handleResponse(ObjectMessage responseMsg)
044: throws IOException {
045: if (responseMsg.getObject() instanceof Response) {
046: Response response = (Response) responseMsg.getObject();
047: Call call = openRequests.get(response.getRequestMsgId());
048: if (call != null) {
049: call.registerResponse(response, responseMsg
050: .getSourceAddress());
051: } else {
052: LOG
053: .warning("got response message for unknown request (msgID="
054: + response.getRequestMsgId() + ")");
055: }
056: return true;
057:
058: } else {
059: return false;
060: }
061: }
062:
063: interface Request extends Serializable {
064:
065: }
066:
067: interface Response extends Serializable {
068: long getRequestMsgId();
069: }
070:
071: private final class Call {
072:
073: private final List<Address> notYetResponded = new ArrayList<Address>();
074: private final List<Response> responses = new ArrayList<Response>();
075: private ObjectMessage requestMsg = null;
076:
077: Call(Request request, Address... destinationAddresses)
078: throws IOException {
079: this .notYetResponded.addAll(Arrays
080: .asList(destinationAddresses));
081:
082: requestMsg = endpoint.createObjectMessage(request);
083: for (Address address : destinationAddresses) {
084: requestMsg.addDestinationAddress(address);
085: }
086: }
087:
088: synchronized List<Response> perform() throws IOException {
089: openRequests.put(requestMsg.getId(), this );
090:
091: endpoint.send(requestMsg);
092:
093: while (!notYetResponded.isEmpty()) {
094: try {
095: wait();
096: } catch (InterruptedException ignore) {
097: }
098: }
099:
100: openRequests.remove(this );
101: return responses;
102: }
103:
104: synchronized void registerResponse(Response response,
105: Address sourceAddress) {
106: responses.add(response);
107: notYetResponded.remove(sourceAddress);
108: if (LOG.isLoggable(Level.FINE)) {
109: LOG.fine("got response for msgId "
110: + response.getRequestMsgId()
111: + " waiting for remaining "
112: + notYetResponded.size());
113: }
114:
115: if (notYetResponded.isEmpty()) {
116: notifyAll();
117: }
118: }
119: }
120: }
|