001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.ExceptionListener;
022import javax.jms.JMSException;
023import javax.jms.Queue;
024import javax.jms.QueueConnection;
025import javax.jms.QueueConnectionFactory;
026import javax.jms.QueueSession;
027import javax.jms.Session;
028import javax.naming.NamingException;
029
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 */
035public class SimpleJmsQueueConnector extends JmsConnector {
036    private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsQueueConnector.class);
037    private String outboundQueueConnectionFactoryName;
038    private String localConnectionFactoryName;
039    private QueueConnectionFactory outboundQueueConnectionFactory;
040    private QueueConnectionFactory localQueueConnectionFactory;
041    private InboundQueueBridge[] inboundQueueBridges;
042    private OutboundQueueBridge[] outboundQueueBridges;
043
044    /**
045     * @return Returns the inboundQueueBridges.
046     */
047    public InboundQueueBridge[] getInboundQueueBridges() {
048        return inboundQueueBridges;
049    }
050
051    /**
052     * @param inboundQueueBridges The inboundQueueBridges to set.
053     */
054    public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
055        this.inboundQueueBridges = inboundQueueBridges;
056    }
057
058    /**
059     * @return Returns the outboundQueueBridges.
060     */
061    public OutboundQueueBridge[] getOutboundQueueBridges() {
062        return outboundQueueBridges;
063    }
064
065    /**
066     * @param outboundQueueBridges The outboundQueueBridges to set.
067     */
068    public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
069        this.outboundQueueBridges = outboundQueueBridges;
070    }
071
072    /**
073     * @return Returns the localQueueConnectionFactory.
074     */
075    public QueueConnectionFactory getLocalQueueConnectionFactory() {
076        return localQueueConnectionFactory;
077    }
078
079    /**
080     * @param localConnectionFactory The localQueueConnectionFactory to
081     *                set.
082     */
083    public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
084        this.localQueueConnectionFactory = localConnectionFactory;
085    }
086
087    /**
088     * @return Returns the outboundQueueConnectionFactory.
089     */
090    public QueueConnectionFactory getOutboundQueueConnectionFactory() {
091        return outboundQueueConnectionFactory;
092    }
093
094    /**
095     * @return Returns the outboundQueueConnectionFactoryName.
096     */
097    public String getOutboundQueueConnectionFactoryName() {
098        return outboundQueueConnectionFactoryName;
099    }
100
101    /**
102     * @param foreignQueueConnectionFactoryName The
103     *                foreignQueueConnectionFactoryName to set.
104     */
105    public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
106        this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
107    }
108
109    /**
110     * @return Returns the localConnectionFactoryName.
111     */
112    public String getLocalConnectionFactoryName() {
113        return localConnectionFactoryName;
114    }
115
116    /**
117     * @param localConnectionFactoryName The localConnectionFactoryName to set.
118     */
119    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
120        this.localConnectionFactoryName = localConnectionFactoryName;
121    }
122
123    /**
124     * @return Returns the localQueueConnection.
125     */
126    public QueueConnection getLocalQueueConnection() {
127        return (QueueConnection) localConnection.get();
128    }
129
130    /**
131     * @param localQueueConnection The localQueueConnection to set.
132     */
133    public void setLocalQueueConnection(QueueConnection localQueueConnection) {
134        this.localConnection.set(localQueueConnection);
135    }
136
137    /**
138     * @return Returns the outboundQueueConnection.
139     */
140    public QueueConnection getOutboundQueueConnection() {
141        return (QueueConnection) foreignConnection.get();
142    }
143
144    /**
145     * @param foreignQueueConnection The foreignQueueConnection to set.
146     */
147    public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
148        this.foreignConnection.set(foreignQueueConnection);
149    }
150
151    /**
152     * @param foreignQueueConnectionFactory The foreignQueueConnectionFactory to set.
153     */
154    public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
155        this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
156    }
157
158    @Override
159    protected void initializeForeignConnection() throws NamingException, JMSException {
160
161        QueueConnection newConnection = null;
162
163        try {
164            if (foreignConnection.get() == null) {
165                // get the connection factories
166                if (outboundQueueConnectionFactory == null) {
167                    // look it up from JNDI
168                    if (outboundQueueConnectionFactoryName != null) {
169                        outboundQueueConnectionFactory = jndiOutboundTemplate
170                            .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
171                        if (outboundUsername != null) {
172                            newConnection = outboundQueueConnectionFactory
173                                .createQueueConnection(outboundUsername, outboundPassword);
174                        } else {
175                            newConnection = outboundQueueConnectionFactory.createQueueConnection();
176                        }
177                    } else {
178                        throw new JMSException("Cannot create foreignConnection - no information");
179                    }
180                } else {
181                    if (outboundUsername != null) {
182                        newConnection = outboundQueueConnectionFactory
183                            .createQueueConnection(outboundUsername, outboundPassword);
184                    } else {
185                        newConnection = outboundQueueConnectionFactory.createQueueConnection();
186                    }
187                }
188            } else {
189                // Clear if for now in case something goes wrong during the init.
190                newConnection = (QueueConnection) foreignConnection.getAndSet(null);
191            }
192
193            // Register for any async error notifications now so we can reset in the
194            // case where there's not a lot of activity and a connection drops.
195            newConnection.setExceptionListener(new ExceptionListener() {
196                @Override
197                public void onException(JMSException exception) {
198                    handleConnectionFailure(foreignConnection.get());
199                }
200            });
201
202            if (outboundClientId != null && outboundClientId.length() > 0) {
203                newConnection.setClientID(getOutboundClientId());
204            }
205            newConnection.start();
206
207            outboundMessageConvertor.setConnection(newConnection);
208
209            // Configure the bridges with the new Outbound connection.
210            initializeInboundDestinationBridgesOutboundSide(newConnection);
211            initializeOutboundDestinationBridgesOutboundSide(newConnection);
212
213            // At this point all looks good, so this our current connection now.
214            foreignConnection.set(newConnection);
215        } catch (Exception ex) {
216            if (newConnection != null) {
217                try {
218                    newConnection.close();
219                } catch (Exception ignore) {}
220            }
221
222            throw ex;
223        }
224    }
225
226    @Override
227    protected void initializeLocalConnection() throws NamingException, JMSException {
228
229        QueueConnection newConnection = null;
230
231        try {
232            if (localConnection.get() == null) {
233                // get the connection factories
234                if (localQueueConnectionFactory == null) {
235                    if (embeddedConnectionFactory == null) {
236                        // look it up from JNDI
237                        if (localConnectionFactoryName != null) {
238                            localQueueConnectionFactory = jndiLocalTemplate
239                                .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
240                            if (localUsername != null) {
241                                newConnection = localQueueConnectionFactory
242                                    .createQueueConnection(localUsername, localPassword);
243                            } else {
244                                newConnection = localQueueConnectionFactory.createQueueConnection();
245                            }
246                        } else {
247                            throw new JMSException("Cannot create localConnection - no information");
248                        }
249                    } else {
250                        newConnection = embeddedConnectionFactory.createQueueConnection();
251                    }
252                } else {
253                    if (localUsername != null) {
254                        newConnection = localQueueConnectionFactory.
255                                createQueueConnection(localUsername, localPassword);
256                    } else {
257                        newConnection = localQueueConnectionFactory.createQueueConnection();
258                    }
259                }
260
261            } else {
262                // Clear if for now in case something goes wrong during the init.
263                newConnection = (QueueConnection) localConnection.getAndSet(null);
264            }
265
266            // Register for any async error notifications now so we can reset in the
267            // case where there's not a lot of activity and a connection drops.
268            newConnection.setExceptionListener(new ExceptionListener() {
269                @Override
270                public void onException(JMSException exception) {
271                    handleConnectionFailure(localConnection.get());
272                }
273            });
274
275            if (localClientId != null && localClientId.length() > 0) {
276                newConnection.setClientID(getLocalClientId());
277            }
278            newConnection.start();
279
280            inboundMessageConvertor.setConnection(newConnection);
281
282            // Configure the bridges with the new Local connection.
283            initializeInboundDestinationBridgesLocalSide(newConnection);
284            initializeOutboundDestinationBridgesLocalSide(newConnection);
285
286            // At this point all looks good, so this our current connection now.
287            localConnection.set(newConnection);
288        } catch (Exception ex) {
289            if (newConnection != null) {
290                try {
291                    newConnection.close();
292                } catch (Exception ignore) {}
293            }
294
295            throw ex;
296        }
297    }
298
299    protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
300        if (inboundQueueBridges != null) {
301            QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
302
303            for (InboundQueueBridge bridge : inboundQueueBridges) {
304                String queueName = bridge.getInboundQueueName();
305                Queue foreignQueue = createForeignQueue(outboundSession, queueName);
306                bridge.setConsumer(null);
307                bridge.setConsumerQueue(foreignQueue);
308                bridge.setConsumerConnection(connection);
309                bridge.setJmsConnector(this);
310                addInboundBridge(bridge);
311            }
312            outboundSession.close();
313        }
314    }
315
316    protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
317        if (inboundQueueBridges != null) {
318            QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
319
320            for (InboundQueueBridge bridge : inboundQueueBridges) {
321                String localQueueName = bridge.getLocalQueueName();
322                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
323                bridge.setProducerQueue(activemqQueue);
324                bridge.setProducerConnection(connection);
325                if (bridge.getJmsMessageConvertor() == null) {
326                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
327                }
328                bridge.setJmsConnector(this);
329                addInboundBridge(bridge);
330            }
331            localSession.close();
332        }
333    }
334
335    protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
336        if (outboundQueueBridges != null) {
337            QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
338
339            for (OutboundQueueBridge bridge : outboundQueueBridges) {
340                String queueName = bridge.getOutboundQueueName();
341                Queue foreignQueue = createForeignQueue(outboundSession, queueName);
342                bridge.setProducerQueue(foreignQueue);
343                bridge.setProducerConnection(connection);
344                if (bridge.getJmsMessageConvertor() == null) {
345                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
346                }
347                bridge.setJmsConnector(this);
348                addOutboundBridge(bridge);
349            }
350            outboundSession.close();
351        }
352    }
353
354    protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
355        if (outboundQueueBridges != null) {
356            QueueSession localSession =
357                    connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
358
359            for (OutboundQueueBridge bridge : outboundQueueBridges) {
360                String localQueueName = bridge.getLocalQueueName();
361                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
362                bridge.setConsumer(null);
363                bridge.setConsumerQueue(activemqQueue);
364                bridge.setConsumerConnection(connection);
365                bridge.setJmsConnector(this);
366                addOutboundBridge(bridge);
367            }
368            localSession.close();
369        }
370    }
371
372    @Override
373    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
374                                              Connection replyToConsumerConnection) {
375        Queue replyToProducerQueue = (Queue)destination;
376        boolean isInbound = replyToProducerConnection.equals(localConnection.get());
377
378        if (isInbound) {
379            InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
380            if (bridge == null) {
381                bridge = new InboundQueueBridge() {
382                    @Override
383                    protected Destination processReplyToDestination(Destination destination) {
384                        return null;
385                    }
386                };
387                try {
388                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
389                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
390                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
391                    replyToConsumerSession.close();
392                    bridge.setConsumerQueue(replyToConsumerQueue);
393                    bridge.setProducerQueue(replyToProducerQueue);
394                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
395                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
396                    bridge.setDoHandleReplyTo(false);
397                    if (bridge.getJmsMessageConvertor() == null) {
398                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
399                    }
400                    bridge.setJmsConnector(this);
401                    bridge.start();
402                    LOG.info("Created replyTo bridge for {}", replyToProducerQueue);
403                } catch (Exception e) {
404                    LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e);
405                    return null;
406                }
407                replyToBridges.put(replyToProducerQueue, bridge);
408            }
409            return bridge.getConsumerQueue();
410        } else {
411            OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
412            if (bridge == null) {
413                bridge = new OutboundQueueBridge() {
414                    @Override
415                    protected Destination processReplyToDestination(Destination destination) {
416                        return null;
417                    }
418                };
419                try {
420                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
421                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
422                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
423                    replyToConsumerSession.close();
424                    bridge.setConsumerQueue(replyToConsumerQueue);
425                    bridge.setProducerQueue(replyToProducerQueue);
426                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
427                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
428                    bridge.setDoHandleReplyTo(false);
429                    if (bridge.getJmsMessageConvertor() == null) {
430                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
431                    }
432                    bridge.setJmsConnector(this);
433                    bridge.start();
434                    LOG.info("Created replyTo bridge for {}", replyToProducerQueue);
435                } catch (Exception e) {
436                    LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e);
437                    return null;
438                }
439                replyToBridges.put(replyToProducerQueue, bridge);
440            }
441            return bridge.getConsumerQueue();
442        }
443    }
444
445    protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
446        return session.createQueue(queueName);
447    }
448
449    protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
450        Queue result = null;
451
452        if (preferJndiDestinationLookup) {
453            try {
454                // look-up the Queue
455                result = jndiOutboundTemplate.lookup(queueName, Queue.class);
456            } catch (NamingException e) {
457                try {
458                    result = session.createQueue(queueName);
459                } catch (JMSException e1) {
460                    String errStr = "Failed to look-up or create Queue for name: " + queueName;
461                    LOG.error(errStr, e);
462                    JMSException jmsEx = new JMSException(errStr);
463                    jmsEx.setLinkedException(e1);
464                    throw jmsEx;
465                }
466            }
467        } else {
468            try {
469                result = session.createQueue(queueName);
470            } catch (JMSException e) {
471                // look-up the Queue
472                try {
473                    result = jndiOutboundTemplate.lookup(queueName, Queue.class);
474                } catch (NamingException e1) {
475                    String errStr = "Failed to look-up Queue for name: " + queueName;
476                    LOG.error(errStr, e);
477                    JMSException jmsEx = new JMSException(errStr);
478                    jmsEx.setLinkedException(e1);
479                    throw jmsEx;
480                }
481            }
482        }
483
484        return result;
485    }
486}