.Messaging is a key concept for distributed enterprise applications. There are a lot of use cases, where you don't want or need a sync response you get with e.g. blocking REST calls. The counterpart, async messaging, can make sense for the following: IoT (sensor data), event streaming, data duplication, etc. With Kafka and other highly-distributed messaging solutions, you may forget that there is already a proven messaging standard. This is part of Java/Jakarta EE and called JMS (Java Message Service). Most of the major application servers (like Payara) also provide an embedded broker.
For this Java EE standard, I'm providing a simple introduction for sending and receiving JSON messages with this blog post. The technology setup is the following: Java 8, Java EE 8, Payara 5.191 and H2 for storing the messages.
JMS prerequisites
As Payara already comes with OpenMQ, which implements the Java Message Service (JMS) standard, you don't have to set up an external JMS broker (e.g ActiveMQ, RabbitMQ ..) for this example and can use the embedded version (think twice if you use this in production).
The connection pool for the embedded OpenMQ is preconfigured and we can directly make use of it via its JNDI name jms/__defaultConnectionFactory
. If you want to connect to an external broker you would have to set up the connection manually (take a look at this excellent example for ActiveMQ from Steve Millidge itself).
With JMS you can make use of two different concepts for delivering our messages: Topics (publish & subscribe) and Queues (point-to-point). In this example, I'm using a javax.jms.Queue
but the code would look quite similar for using a Topic.
The JMS Queue or Topic has to be first configured within Payara as a JMS Destination Resource and can be configured either via the Payara admin panel (Resources – JMS Resources – Destination Resources) or using asadmin
:
1 | asadmin create-jms-resource --restype javax.jms.Queue --property Name=STOCKS jms/stocks |
You first have to specify the resource type (Topic or Queue). Next, the physical name of the resource within the broker (will be created if it doesn't exist). Last but not least the JNDI name of the resource.
Let's start coding
The Maven project for this showcase is a simple and thin Java EE project:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>de.rieckpil.blog</groupId> <artifactId>messaging-with-jms-using-payara</artifactId> <version>1.0-SNAPSHOT</version> <packaging>war</packaging> <dependencies> <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId> <version>8.0</version> <scope>provided</scope> </dependency> </dependencies> <build> <finalName>messaging-with-jms-using-payara</finalName> </build> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <failOnMissingWebXml>false</failOnMissingWebXml> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> </project> |
In this showcase I'm sending random stock information (using JSON with JSONP) every two seconds with a simple EJB timer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | @Singleton public class StockPublisher { @Resource(lookup = "jms/__defaultConnectionFactory") private ConnectionFactory jmsFactory; @Resource(lookup = "jms/stocks") private Queue jmsQueue; private String[] stockCodes = { "MSFT", "GOOGL", "AAPL", "AMZN" }; @Schedule(second = "*/2", minute = "*", hour = "*", persistent = false) public void sendStockInformation() { TextMessage message; try (Connection connection = jmsFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(jmsQueue)) { JsonObject stockInformation = Json.createObjectBuilder() .add("stockCode", stockCodes[ThreadLocalRandom.current().nextInt(stockCodes.length)]) .add("price", ThreadLocalRandom.current().nextDouble(1.0, 150.0)) .add("timestamp", Instant.now().toEpochMilli()).build(); message = session.createTextMessage(); message.setText(stockInformation.toString()); producer.send(message); } catch (JMSException e) { e.printStackTrace(); } } } |
For sending messages, you first have to create a connection via the JMS ConnectionFactory
, then create a Session
and finally a message producer for the concrete topic or queue. You can access the required resources via their JNDI names as configured before.
Receiving messages is achieved with so-called Message-Driven Beans (MDB) which implement the MessageListener
interface and are configured (which topic or queue to listen) using the @MessageDriven
annotation. In this example, I'm sending and receiving the message within the same application (for simplicity). Furthermore, I'm storing the message payload in the embedded H2 database of Payara:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | @MessageDriven(name = "stockmdb", activationConfig = { @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/stocks"), @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue") }) public class StockListener implements MessageListener { @PersistenceContext private EntityManager em; @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("A new stock information arrived: " + textMessage.getText()); JsonReader jsonReader = Json.createReader(new StringReader(textMessage.getText())); JsonObject stockInformation = jsonReader.readObject(); em.persist(new StockHistory(stockInformation)); } catch (JMSException e) { e.printStackTrace(); } } } |
Once deployed to Payara, the console output looks like the following:
1 2 3 4 5 6 7 8 | [#|2019-04-25T13:04:08.007+0000|INFO|Payara 5.191||_ThreadID=186;_ThreadName=orb-thread-pool-1 (pool #1): worker-2;_TimeMillis=1556197448007;_LevelValue=800;| A new stock information arrived: {"stockCode":"MSFT","price":148.55312721701924,"timestamp":1556197448002}|#] [#|2019-04-25T13:04:10.012+0000|INFO|Payara 5.191||_ThreadID=188;_ThreadName=orb-thread-pool-1 (pool #1): worker-3;_TimeMillis=1556197450012;_LevelValue=800;| A new stock information arrived: {"stockCode":"AMZN","price":77.60891905653475,"timestamp":1556197450003}|#] [#|2019-04-25T13:04:12.009+0000|INFO|Payara 5.191||_ThreadID=190;_ThreadName=orb-thread-pool-1 (pool #1): worker-4;_TimeMillis=1556197452009;_LevelValue=800;| A new stock information arrived: {"stockCode":"MSFT","price":8.593186941846369,"timestamp":1556197452002}|#] |
The JPA entity StockHistory
looks like the following:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | @Entity public class StockHistory { @Id @GeneratedValue private Long id; @Column(nullable = false) private String stockCode; @Column(nullable = false) private Double price; @Column(nullable = false) private Instant timestamp; public StockHistory(JsonObject json) { this.stockCode = json.getString("stockCode"); this.price = json.getJsonNumber("price").doubleValue(); this.timestamp = Instant.ofEpochMilli(json.getJsonNumber("timestamp").longValue()); } // further constructors, getters & setters } |
For the sake of completeness, this is the persistence.xml
for this small application:
1 2 3 4 5 6 7 8 9 10 | <?xml version="1.0" encoding="UTF-8"?> <persistence version="2.2" xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_2.xsd"> <persistence-unit name="prod" transaction-type="JTA"> <properties> <property name="javax.persistence.schema-generation.database.action" value="drop-and-create" /> </properties> </persistence-unit> </persistence> |
You can find the full code on GitHub with a step-by-step guide to run this example locally on your machine with Docker. If you are looking for a simple JMS quickstart with Open Liberty, have a look at one of my previous posts.
Keep sending/receiving messages,
Phil