001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import java.util.concurrent.atomic.AtomicBoolean; 020 021import javax.jms.Connection; 022import javax.jms.Destination; 023import javax.jms.JMSException; 024import javax.jms.Message; 025import javax.jms.MessageConsumer; 026import javax.jms.MessageListener; 027import javax.jms.MessageProducer; 028 029import org.apache.activemq.Service; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * A Destination bridge is used to bridge between to different JMS systems 035 */ 036public abstract class DestinationBridge implements Service, MessageListener { 037 038 private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class); 039 040 protected MessageConsumer consumer; 041 protected AtomicBoolean started = new AtomicBoolean(false); 042 protected JmsMesageConvertor jmsMessageConvertor; 043 protected boolean doHandleReplyTo = true; 044 protected JmsConnector jmsConnector; 045 046 /** 047 * @return Returns the consumer. 048 */ 049 public MessageConsumer getConsumer() { 050 return consumer; 051 } 052 053 /** 054 * @param consumer The consumer to set. 055 */ 056 public void setConsumer(MessageConsumer consumer) { 057 this.consumer = consumer; 058 } 059 060 /** 061 * @param connector 062 */ 063 public void setJmsConnector(JmsConnector connector) { 064 this.jmsConnector = connector; 065 } 066 067 /** 068 * @return Returns the inboundMessageConvertor. 069 */ 070 public JmsMesageConvertor getJmsMessageConvertor() { 071 return jmsMessageConvertor; 072 } 073 074 /** 075 * @param jmsMessageConvertor 076 */ 077 public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) { 078 this.jmsMessageConvertor = jmsMessageConvertor; 079 } 080 081 protected Destination processReplyToDestination(Destination destination) { 082 return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer()); 083 } 084 085 @Override 086 public void start() throws Exception { 087 if (started.compareAndSet(false, true)) { 088 createConsumer(); 089 createProducer(); 090 } 091 } 092 093 @Override 094 public void stop() throws Exception { 095 started.set(false); 096 } 097 098 @Override 099 public void onMessage(Message message) { 100 101 int attempt = 0; 102 final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries(); 103 104 while (started.get() && message != null && (maxRetries == ReconnectionPolicy.INFINITE || attempt <= maxRetries)) { 105 106 try { 107 108 if (attempt++ > 0) { 109 try { 110 Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt)); 111 } catch(InterruptedException e) { 112 break; 113 } 114 } 115 116 Message converted; 117 if (jmsMessageConvertor != null) { 118 if (doHandleReplyTo) { 119 Destination replyTo = message.getJMSReplyTo(); 120 if (replyTo != null) { 121 converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo)); 122 } else { 123 converted = jmsMessageConvertor.convert(message); 124 } 125 } else { 126 message.setJMSReplyTo(null); 127 converted = jmsMessageConvertor.convert(message); 128 } 129 } else { 130 // The Producer side is not up or not yet configured, retry. 131 continue; 132 } 133 134 try { 135 sendMessage(converted); 136 } catch(Exception e) { 137 jmsConnector.handleConnectionFailure(getConnectionForProducer()); 138 continue; 139 } 140 141 try { 142 message.acknowledge(); 143 } catch(Exception e) { 144 jmsConnector.handleConnectionFailure(getConnnectionForConsumer()); 145 continue; 146 } 147 148 // if we got here then it made it out and was ack'd 149 return; 150 151 } catch (Exception e) { 152 LOG.info("failed to forward message on attempt: {} reason: {} message: {}", new Object[]{ attempt, e, message }, e); 153 } 154 } 155 } 156 157 /** 158 * @return Returns the doHandleReplyTo. 159 */ 160 public boolean isDoHandleReplyTo() { 161 return doHandleReplyTo; 162 } 163 164 /** 165 * @param doHandleReplyTo The doHandleReplyTo to set. 166 */ 167 public void setDoHandleReplyTo(boolean doHandleReplyTo) { 168 this.doHandleReplyTo = doHandleReplyTo; 169 } 170 171 protected abstract MessageConsumer createConsumer() throws JMSException; 172 173 protected abstract MessageProducer createProducer() throws JMSException; 174 175 protected abstract void sendMessage(Message message) throws JMSException; 176 177 protected abstract Connection getConnnectionForConsumer(); 178 179 protected abstract Connection getConnectionForProducer(); 180 181}