01: /**
02: * Licensed to the Apache Software Foundation (ASF) under one
03: * or more contributor license agreements. See the NOTICE file
04: * distributed with this work for additional information
05: * regarding copyright ownership. The ASF licenses this file
06: * to you under the Apache License, Version 2.0 (the
07: * "License"); you may not use this file except in compliance
08: * with the License. You may obtain a copy of the License at
09: *
10: * http://www.apache.org/licenses/LICENSE-2.0
11: *
12: * Unless required by applicable law or agreed to in writing,
13: * software distributed under the License is distributed on an
14: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15: * KIND, either express or implied. See the License for the
16: * specific language governing permissions and limitations
17: * under the License.
18: */package org.apache.cxf.ws.rm;
19:
20: import java.io.ByteArrayOutputStream;
21: import java.io.OutputStream;
22: import java.util.logging.Logger;
23:
24: import org.apache.cxf.common.logging.LogUtils;
25: import org.apache.cxf.interceptor.Fault;
26: import org.apache.cxf.io.CachedOutputStream;
27: import org.apache.cxf.io.CachedOutputStreamCallback;
28: import org.apache.cxf.message.Message;
29: import org.apache.cxf.message.MessageUtils;
30: import org.apache.cxf.ws.addressing.AddressingProperties;
31: import org.apache.cxf.ws.rm.persistence.RMMessage;
32: import org.apache.cxf.ws.rm.persistence.RMStore;
33:
34: /**
35: *
36: */
37: public class RetransmissionCallback implements
38: CachedOutputStreamCallback {
39:
40: private static final Logger LOG = LogUtils
41: .getL7dLogger(RetransmissionCallback.class);
42:
43: Message message;
44: RMManager manager;
45:
46: RetransmissionCallback(Message m, RMManager mgr) {
47: message = m;
48: manager = mgr;
49: }
50:
51: public void onClose(CachedOutputStream cos) {
52: OutputStream os = cos.getOut();
53:
54: if (os instanceof ByteArrayOutputStream) {
55: ByteArrayOutputStream bos = (ByteArrayOutputStream) os;
56: message.put(RMMessageConstants.SAVED_OUTPUT_STREAM, bos);
57: manager.getRetransmissionQueue().addUnacknowledged(message);
58:
59: RMStore store = manager.getStore();
60: if (null != store) {
61: Source s = manager.getSource(message);
62: RMProperties rmps = RMContextUtils
63: .retrieveRMProperties(message, true);
64: Identifier sid = rmps.getSequence().getIdentifier();
65: SourceSequence ss = s.getSequence(sid);
66: RMMessage msg = new RMMessage();
67: msg.setMessageNumber(rmps.getSequence()
68: .getMessageNumber());
69: if (!MessageUtils.isRequestor(message)) {
70: AddressingProperties maps = RMContextUtils
71: .retrieveMAPs(message, false, true);
72: if (null != maps && null != maps.getTo()) {
73: msg.setTo(maps.getTo().getValue());
74: }
75: }
76: msg.setContent(bos.toByteArray());
77: store.persistOutgoing(ss, msg);
78: }
79: } else {
80: throw new Fault(new org.apache.cxf.common.i18n.Message(
81: "NO_CACHED_STREAM", LOG, os.getClass()));
82: }
83: }
84:
85: public void onFlush(CachedOutputStream cos) {
86:
87: }
88: }
|