Spring JMS


JMS – technologia, którą ugryzłem podczas przygotowań do SCBCD, lecz później odeszła w niepamięć. Na szkoleniu “Developing Architectures for Enterprise Java Applications” wspominano o niej namiętnie w ramach zagadnień integracji systemów B2B. Stąd ten post i wewnętrzna konieczność poznania Spring JMS. Zaprezentuję jak za pomocą framework’u Spring wstawiać i pobierać wiadomości z kolejki serwera aplikacyjnego JBoss 5.1.0. GA z poziomu “grubego” klienta. W dalszej części artykułu poruszę także zagadnienie “współdzielenia” transakcji między systemami JMS i bazami danych RDBMS.

Tworzenie kolejki na serwerze JBoss

Przejdź do katalogu $JBOSS_HOME/server/default/deploy/, a następnie wyedytuj nowy plik o nazwie hrjob-destination-service.xml. Jego finalną zawartość prezentuje poniższy listing.

<?xml version="1.0" encoding="UTF-8"?>
<server>
  <mbean code="org.jboss.jms.server.destination.QueueService"
         name="jboss.messaging.destination:service=Queue,name=HrJobQueue"
         xmbean-dd="xmdesc/Queue-xmbean.xml">
    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
  </mbean>
</server>

Tym samym wykorzystaliśmy funkcję Hot Deploy’u serwisów JBoss (dokumentacja). Mam nadzieję, że wszystko poszło pomyślnie. Jeśli nie, tzn. nie widzicie nowej kolejki w panelu administracyjnym, wymagany jest restart serwera.

Wstawianie wiadomości do kolejki

Referencję do obiektu kolejki HrJobQueue pobierzemy z kontenera JNDI serwera JBoss. W tym celu posłużymy się Spring’owym JndiTemplate.

<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
  <property name="environment">
    <props>
      <prop key="java.naming.factory.initial">org.jnp.interfaces.NamingContextFactory</prop>
      <prop key="java.naming.provider.url">jnp://localhost:1099</prop>
      <prop key="java.naming.factory.url.pkgs">org.jboss.naming:org.jnp.interfaces</prop>
    </props>
  </property>
</bean>

Powyższa konfiguracja może różnić się dla poszczególnych serwerów aplikacyjnych. Następnie zdefiniujemy ziarna symbolizujące ConnectionFactory oraz JmsTemplate.

<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
  <property name="jndiTemplate" ref="jndiTemplate"/>
  <property name="jndiName" value="ConnectionFactory"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="jmsConnectionFactory"/>
  <property name="receiveTimeout" value="100"/>
  <property name="defaultDestination" ref="defaultDestination"/>
  <property name="messageConverter" ref="oxmMessageConverter"/>
</bean>

Na wzór postu Gordon’a Dickens’a (http://gordondickens.com/wordpress/2011/02/07/sending-beans-as-xml-with-jmstemplate), wiadomości przesyłać będę w postaci XML’owej. Podejście to uniezależnia pod względem architektury innych użytkowników kolejki. Przed wysłaniem, obiekt POJO języka Java zostanie zserializowany do formatu XML za pomocą standardu JAXB. Przy odbiorze nastąpi automatyczna deserializacja. Parametr receiveTimeout JmsTemplate’a określa maksymalny czas oczekiwania na wiadomość podczas korzystania z metod typu JmsTemplate.receiveAndConvert(). Podejście to omówię szerzej w sekcji dotyczącej odbioru komunikatu. defaultDestination to kolejny bean pobrany ze zdalnego serwisu JNDI prezentujący domyślną lokację docelową – kolejkę HrJobQueue.

<bean name="defaultDestination" class="org.springframework.jndi.JndiObjectFactoryBean">
  <property name="jndiTemplate" ref="jndiTemplate"/>
  <property name="jndiName" value="queue/HrJobQueue"/>
</bean>

messageConverter odpowiada za serializację i deserializację wiadomości. W naszym przypadku całość implementacji sprowadza się do stworzenia klasy POJO z adnotacjami JAXB oraz niewielką konfiguracją Spring’ową.

@XmlRootElement(name = "payload")
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType(name= "PayloadClass", propOrder={"firstName", "lastName"})
public class PayloadClass implements Serializable {
   @XmlElement(required=true, nillable=false)
   private String firstName = null;

   @XmlElement(required=true, nillable=false)
   private String lastName = null;

   /* Getters & Setters */
   ...
}
<oxm:jaxb2-marshaller id="marshaller">
  <oxm:class-to-be-bound name="edu.lantoniak.spring.jms.pojo.PayloadClass"/>
</oxm:jaxb2-marshaller>
<bean id="oxmMessageConverter"
      class="org.springframework.jms.support.converter.MarshallingMessageConverter">
  <property name="marshaller" ref="marshaller"/>
  <property name="unmarshaller" ref="marshaller"/>
</bean>

Serwis wysyłający wiadomość prezentuje się bardzo prosto (patrz niżej). Za wstrzyknięcie odpowiedniej referencji do jmsTemplate odpowiada Spring.

public class JMSSender {
   private JmsTemplate jmsTemplate = null;

   public void sendObjectMessage() {
      PayloadClass payload = new PayloadClass();
      payload.setFirstName("Lukasz");
      payload.setLastName("Antoniak");
      jmsTemplate.convertAndSend(payload);
   }

   public void setJmsTemplate(JmsTemplate jmsTemplate) {
      this.jmsTemplate = jmsTemplate;
   }

   public JmsTemplate getJmsTemplate() {
      return jmsTemplate;
   }
}

Pobieranie wiadomości z kolejki

Spring JMS umożliwia pobranie wiadomości z kolejki na dwa główne sposoby. Pierwszy polega na wywołaniu funkcji JmsTemplate.receiveXXX(). Metoda ta oczekuje maksymalnie receiveTimeout milisekund. Jeśli w zadanym czasie wiadomość nie nadejdzie – zwraca null. JmsTemplate.receiveAndConvert() automatycznie konwertuje komunikat wykorzystując wpięty w JmsTemplate messageConverter. Fragment kodu źródłowego zamieściłem poniżej.

PayloadClass payload = (PayloadClass) jmsSender.getJmsTemplate().receiveAndConvert();
if (payload != null) {
   System.out.println("First Name: " + payload.getFirstName());
   System.out.println("Last Name: " + payload.getLastName());
}

Drugi sposób pobierania wiadomości polega na implementacji interfejsu MessageListener (zupełnie jak MDB). Klasa implementująca musi zostać wpięta w Spring’owy komponent DefaultMessageListenerContainer (czytaj dokumentację i JavaDoc!). Niezbędną konfigurację przedstawia kolejny listing.

<bean id="messageListener" class="edu.lantoniak.spring.jms.services.JMSReceiver">
  <property name="messageConverter" ref="oxmMessageConverter"/>
</bean>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="jmsConnectionFactory"/>
  <property name="destination" ref="defaultDestination"/>
  <property name="messageListener" ref="messageListener"/>
</bean>

Nie widziałem żeby Spring udostępniał abstrakcyjną klasę implementującą MessageListener zapewniającą automatyczną konwersję wiadomości. Stąd konieczność wstrzyknięcia obiektu oxmMessageConverter. Poniżej zamieszczam kompletny kod źródłowy listener’a.

public class JMSReceiver implements MessageListener {
   private MessageConverter messageConverter = null;

   public void onMessage(Message message) {
      if (message instanceof BytesMessage) {
         try {
            PayloadClass payload = (PayloadClass) messageConverter.fromMessage(message);
            System.out.println("First Name: " + payload.getFirstName());
            System.out.println("Last Name: " + payload.getLastName());
         } catch (JMSException e) {
            throw new RuntimeException(e);
         }
      } else {
         throw new IllegalArgumentException("Message must be of type BytesMessage.");
      }
   }

   public void setMessageConverter(MessageConverter messageConverter) {
      this.messageConverter = messageConverter;
   }
}

Prezentowany MessageListener zostanie uruchomiony (zacznie nasłuchiwać na kolejce) od razu po starcie kontenera Spring. W celu jego tymczasowego wyłączenia, podczas zabaw własnych z aplikacją, należy wykomentować deklarację bean’a jmsContainer. Istnieje ponadto możliwość zniszczenia wspomnianego bean’a w dowolnym momencie.

DefaultMessageListenerContainer jmsContainer = (DefaultMessageListenerContainer) ctx.getBean("jmsContainer");
jmsContainer.destroy();

Warto wykonać ową metodę podczas sekwencji kończącej działanie aplikacji. Dzięki temu listener elegancko zajmknie połączenie JMS. W przeciwnym wypadku JBoss po jakimś czasie zgłosi w logach, iż istnieje problem z komunikacją z określonym klientem JMS i nastąpi jego wyrejestrowanie.

Transakcyjność, czyli JMS i RDBMS

W aplikacjach enterprise rzadko kiedy wykorzystuje się samą technologię JMS. Najczęściej współistnieje ona z bazą danych, gdzie informacje przekazane w komunikatach są przechowywane. Jeśli niepowodzeniem zakończy się rozlokowanie danych w schemacie RDBMS, to ferelna wiadomość nie powinna zniknąć z kolejki. Być może problem stanowi jedynie ilość wolnego miejsca w przestrzeni tabel, a ta zostanie niebawem rozszerzona. Obsługa transakcji w Spring Framework została dokładnie opisana w dokumentacji (http://static.springsource.org/spring/docs/2.5.x/reference/jms.html) i każdy powinien się z nią zapoznać. Ogromne znaczenie ma fakt łączenia się do kolejki z wewnątrz kontenera Java EE lub ze zdalnego klienta. W naszym przypadku (zdalny klient) należy ustawić parametr sessionTransacted obiektu JmsTemplate na true. Operacja ta spowoduje wysłanie bądź nie wiadomości w zależności od Spring’owego zarządcy transakcji. Konfiguracja przedstawia się następująco.

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="jmsConnectionFactory"/>
  <property name="receiveTimeout" value="100"/>
  <property name="defaultDestination" ref="defaultDestination"/>
  <property name="messageConverter" ref="oxmMessageConverter"/>
  <property name="sessionTransacted" value="true"/>
</bean>
<tx:annotation-driven transaction-manager="transactionManager"/>
<bean id="transactionManager"
      class="org.springframework.orm.hibernate3.HibernateTransactionManager">
  <property name="sessionFactory" ref="hibernateSessionFactory"/>
</bean>

Założyłem, iż posiadamy gotową fabrykę sesji Hibernate. Po Internecie krążą stosy materiałów na temat integracji Spring z Hibernate (słowa kluczowe: HibernateDaoSupport, HibernateTemplate) oraz obsłudze transakcji przez Spring (http://slawekturowicz.blogspot.com/2011/01/transakcyjnosc-w-springu.html). Na łamach niniejszego artykułu nie zamierzam skupiać się na owych zagadnieniach. Spójrzmy na poniższy serwis wstawiający komunikat do kolejki JMS, a następnie rekord do bazy danych.

<bean id="jmsAndJdbcService" class="edu.lantoniak.spring.jms.services.JmsAndJdbcServiceImpl">
  <property name="jmsTemplate" ref="jmsTemplate"/>
  <property name="sessionFactory" ref="hibernateSessionFactory"/>
</bean>
public class JmsAndJdbcServiceImpl extends HibernateDaoSupport implements JmsAndJdbcService {
   private JmsTemplate jmsTemplate = null;

   @Transactional
   public void insertJmsMessageAndGenerateDatabaseFault() {
      jmsTemplate.send(new MessageCreator() {
         public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage("This message shall not be seen.");
         }
      });

      /* Try to persist a JOB record that will cause unique key constraint violation. */
      JobEntity job = new JobEntity();
      job.setJobId("AD_PRES");
      job.setJobTitle("President");
      getHibernateTemplate().persist(job);
   }

   public void setJmsTemplate(JmsTemplate jmsTemplate) {
      this.jmsTemplate = jmsTemplate;
   }
}

Jeśli proces utrwalania obiektu typu JobEntity nie powiedzie się, całość operacji insertJmsMessageAndGenerateDatabaseFault zakończy się niepowodzeniem (komunikat JMS nie zostanie wysłany). W listingu powyżej staram się wstawić stanowisko o istniejącym wcześniej identyfikatorze (domyślny schemat HR bazy danych Oracle).

Transakcyjne odbieranie komunikatów JMS okazuje się równie przyjemne (czyt. proste) :). Wspomnianego wcześniej zarządce transakcji wpinamy do obiektu jmsContainer. Taka obsługa transakcji poprawna jest dla grubego klienta, gdzie transakcją zarządza Spring, a nie kontener Java EE (podrozdział “19.4.5. Processing messages within transactions” dokumentacji)! W innym przypadku manager’em transakcji powinien zostać obiekt typu JtaTransactionManager.

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="jmsConnectionFactory"/>
  <property name="destination" ref="defaultDestination"/>
  <property name="messageListener" ref="messageListener"/>
  <property name="transactionManager" ref="transactionManager"/>
</bean>

Zmodyfikowana klasa JMSReceiver wygląda następująco.

public class JMSReceiver extends HibernateDaoSupport implements MessageListener {
   private MessageConverter messageConverter = null;

   @Transactional
   public void onMessage(Message message) {
      JobEntity job = new JobEntity();
      job.setJobId("New");
      job.setJobTitle("New Title");
      getHibernateTemplate().persist(job);

      if (message instanceof BytesMessage) {
         try {
            PayloadClass payload = (PayloadClass) messageConverter.fromMessage(message);
            System.out.println("First Name: " + payload.getFirstName());
            System.out.println("Last Name: " + payload.getLastName());
         } catch (JMSException e) {
            throw new RuntimeException(e);
         }
      } else {
         throw new IllegalArgumentException("Message must be of type BytesMessage.");
      }
   }

   public void setMessageConverter(MessageConverter messageConverter) {
      this.messageConverter = messageConverter;
   }
}
<bean id="messageListener" class="edu.lantoniak.spring.jms.services.JMSReceiver">
  <property name="messageConverter" ref="oxmMessageConverter"/>
  <property name="sessionFactory" ref="hibernateSessionFactory"/>
</bean>

Jeśli utrwalenie obiektu JobEntity zakończy się niepowodzeniem, przetwarzany komunikat zostanie zwrócony do kolejki.

Komunikacja spoza kontenera J2EE

Gruntownie przemyślana architektura aplikacji typu enterprise narzuca sztywne ograniczenia na korzystanie z zasobów współdzielonych. Powodów należy doszukiwać się oczywiście w aspektach wydajności. Co by się stało gdyby każda aplikacja grubego klienta używała dowolnej ilości połączeń do zdalnej bazy danych? Katastrofa. Aby zapewnić wykorzystanie tylko jednego połączenia JMS (przez producenta i konsumenta) zastosować należy klasę SingleConnectionFactory opakowującą standardową fabrykę połączeń.

<bean id="jmsSingleConnectionFactory"
      class="org.springframework.jms.connection.SingleConnectionFactory">
  <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
  <property name="reconnectOnException" value="true"/>
</bean>

Zamiast jmsConnectionFactory do jmsTemplate i jmsContainer wpinamy jmsSingleConnectionFactory.

JMS i WebLogic

Serwer aplikacyjny WebLogic udostępnia spory wachlarz rozszerzeń technologii JMS. Za przykład posłuży mi funkcja Unit of Order (UOO) zapewniająca chronologiczną kolejność przychodzących komunikatów. Polecam odwiedzenie WebLogic YouTube Channel (http://www.youtube.com/oracleweblogic) oraz zabawę z przykładami z projektu wls-1034-examples (https://www.samplecode.oracle.com/sf/projects/wls-1034-examples).

Na koniec zamieszczam oczywiście kompletną wersję przykładowego projektu mojego autorstwa (tutaj). Długie godziny męczyłem się z dopasowaniem poszczególnych bibliotek zależnych (głównie JMS i JNDI) do JBoss 5.1.0 GA, więc zachęcam do pobrania źródeł.

<bean id=”jndiTemplate” class=”org.springframework.jndi.JndiTemplate”>
<property name=”environment”>
<props>
<prop key=”java.naming.factory.initial”>org.jnp.interfaces.NamingContextFactory</prop>
<prop key=”java.naming.provider.url”>jnp://localhost:1099</prop>
<prop key=”java.naming.factory.url.pkgs”>org.jboss.naming:org.jnp.interfaces</prop>
</props>
</property>
</bean>
Advertisements

One Response to Spring JMS

  1. Dusty says:

    If some one wishes expert view about blogging and site-building afterward i suggest him/her
    to pay a visit this weblog, Keep up the good job.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: