Sample application that shows how to use the JMS client to send and receive messages via a topic with SAP Event Mesh.
- Installed Java 8 → Java download
- Installed Git → Git download
- Installed Maven 3.x → Maven download
- Optional: Installed Postman
- An SAP BTP account with the Event Mesh service is required. For more detailed information and help on getting started with SAP Event Mesh, please check the SAP help page.
- Optional: Installed Cloud Foundry CLI → Installing the cf CLI
  - The CLI must also be fully configured for the corresponding Cloud Foundry landscape to be able to do a cf push.
- Created Event Mesh service instance
  - e.g. via CLI: cf cs enterprise-messaging default emjapi-samples-sapbtp -c <contents of default descriptor>
  - The service descriptors can be found here
  - Remember to adjust the manifest file of the application
- Create queues (e.g. NameOfQueue) and subscribe a queue to a topic of choice (e.g. NameOfTopic), e.g. via the MM API or the UI
  - For queue creation with the MM API you need to provide a fully qualified queue name including the namespace
- Installed IDE of choice (e.g. Visual Studio with installed Java language support plugin)
The publish and subscribe (pubsub) sample is a small Spring Boot (version 2.0.6) application that provides three different REST endpoints to send and receive messages via a topic.
- Check that all prerequisites are fulfilled (see section Prerequisites above)
- Clone the repository via git clone
- Build the project with Maven (mvn clean install)
- Push to Cloud Foundry via cf push (if the service was renamed, please adapt the manifest accordingly)
- The queue name has to be URL-encoded. The sample provides an endpoint that URL-encodes a name:
  - curl -X POST -H "Content-Type: text/plain" -H "Cache-Control: no-cache" -d '<queue-name>' "https://<application-path>/encode" (the body must contain the queue name)
- Send a message with an HTTP POST to https://<application-path>/topic/<topic-name>/message (the body must contain the message)
  - Note that a client or a queue must be subscribed to the topic before the message is sent
  - The queue name has to be URL-encoded.
  - curl -X POST -H "Content-Type: text/plain" -H "Cache-Control: no-cache" -d '<message>' "https://<application-path>/topic/<topic-name>/message"
- Receive a message from a queue with an HTTP GET on https://<application-path>/queue/<queue-name>/message
  - Note that a queue subscription must exist beforehand. A queue subscription can be created e.g. via the UI.
  - curl -X GET -H "Content-Type: text/plain" -H "Cache-Control: no-cache" "https://<application-path>/queue/<queue-name>/message"
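The queue endpoint above can also be addressed from Java. The following is a minimal sketch, using only the JDK, of how a client might URL-encode a queue name and assemble the receive URL. The class name EndpointUrls, the base URL, and the fully qualified queue name my/namespace/NameOfQueue are illustrative assumptions and not part of the sample. The sample's own /encode endpoint may encode differently (URLEncoder, for example, writes a space as + rather than %20).

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not part of the sample application.
final class EndpointUrls {

    private EndpointUrls() {
    }

    // URL-encodes a queue name so that it fits into a single path segment.
    static String encode(String name) {
        try {
            return URLEncoder.encode(name, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is available on every Java platform, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    // Builds the GET target for receiving a message from a queue.
    static String receiveUrl(String applicationPath, String queueName) {
        return applicationPath + "/queue/" + encode(queueName) + "/message";
    }

    public static void main(String[] args) {
        // Placeholder for https://<application-path>
        String applicationPath = "https://example.invalid";
        System.out.println(receiveUrl(applicationPath, "my/namespace/NameOfQueue"));
    }
}
```

Encoding matters here because a fully qualified queue name contains slashes, which would otherwise be read as additional path segments.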
To use the messaging service, the Spring Cloud service connector can be used to obtain a MessagingService.
ServiceConnectorConfig config = null; // currently no configurations are supported for the MessagingServiceFactory
Cloud cloud = new CloudFactory().getCloud();
// get the messaging service via the service connector
MessagingService messagingService = cloud.getSingletonServiceConnector(MessagingService.class, config);
Create the MessagingServiceFactory object with the help of the MessagingServiceFactoryCreator and get a MessagingServiceJmsConnectionFactory. The connection factory can be configured with the MessagingServiceJmsSettings. If the reconnection feature is not needed and an individual connection mechanism (e.g. a connection cache) is used, the settings can be skipped. The connection factory can be built with messagingServiceFactory.createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings).
MessagingServiceJmsSettings settings = new MessagingServiceJmsSettings(); // settings are preset with default values (see JavaDoc)
settings.setMaxReconnectAttempts(5); // use -1 for unlimited attempts
settings.setInitialReconnectDelay(3000);
settings.setReconnectDelay(3000);
MessagingServiceFactory messagingServiceFactory = MessagingServiceFactoryCreator.createFactory(messagingService);
MessagingServiceJmsConnectionFactory connectionFactory = messagingServiceFactory.createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings);
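To make the meaning of such reconnect settings more concrete, here is a small JDK-only sketch of a retry loop driven by the same kind of parameters: a maximum number of attempts (-1 for unlimited), an initial delay, and a delay between later attempts. It illustrates the general pattern only. The class RetrySketch and its method are hypothetical, and the client library's actual reconnect logic (including how it counts attempts) may differ.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration, not the client library's implementation.
final class RetrySketch {

    private RetrySketch() {
    }

    // Calls the action until it succeeds or maxAttempts total attempts have failed.
    // A negative maxAttempts means "retry without limit".
    static <T> T callWithRetry(Callable<T> action, int maxAttempts,
                               long initialDelayMillis, long delayMillis) throws Exception {
        Exception last = null;
        for (int attempt = 1; maxAttempts < 0 || attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                boolean attemptsLeft = maxAttempts < 0 || attempt < maxAttempts;
                if (attemptsLeft) {
                    // Wait the initial delay after the first failure, the regular delay afterwards.
                    Thread.sleep(attempt == 1 ? initialDelayMillis : delayMillis);
                }
            }
        }
        // Reached only when every attempt failed, or when maxAttempts was 0.
        throw last != null ? last : new IllegalArgumentException("maxAttempts must not be 0");
    }
}
```

With the values from the settings above, a call such as RetrySketch.callWithRetry(action, 5, 3000, 3000) would give up after five failed attempts, waiting three seconds between them.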
For sending messages, a Connection and a Session are needed first. Note that those resources must be closed when they are no longer needed. As both objects implement the AutoCloseable interface, they are closed automatically at the end of the try-with-resources block. Now a BytesMessage can be created. In the next steps, a topic is bound (not created) to a producer. Note that the prefix "topic:" is mandatory. Finally, the message can be sent to the topic.
try (Connection connection = connectionFactory.createConnection();
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
    connection.start();
    Topic topic = session.createTopic("topic:" + "<topic-name>");
    BytesMessage byteMessage = session.createBytesMessage();
    byteMessage.writeBytes(message.getBytes());
    MessageProducer producer = session.createProducer(topic);
    producer.send(byteMessage);
} catch (JMSException e) {
    LOG.error("Could not send message={}.", message, e);
}
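Because the "topic:" prefix is mandatory, it can help to add it in one place. The following is a minimal sketch, assuming a hypothetical helper class TopicNames that is not part of the sample or the client library:

```java
// Hypothetical helper, not part of the sample or the client library.
final class TopicNames {

    static final String TOPIC_PREFIX = "topic:";

    private TopicNames() {
    }

    // Returns the topic name with the mandatory prefix, adding it only if it is missing.
    static String withPrefix(String topicName) {
        if (topicName == null || topicName.isEmpty()) {
            throw new IllegalArgumentException("topicName must not be empty");
        }
        return topicName.startsWith(TOPIC_PREFIX) ? topicName : TOPIC_PREFIX + topicName;
    }

    public static void main(String[] args) {
        System.out.println(withPrefix("NameOfTopic"));
    }
}
```

The result could then be passed to session.createTopic(...), so that a caller cannot forget the prefix or add it twice.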
Currently, direct topic subscription is not supported for the default plan.
In this example, a consumer is subscribed to a specific topic. Again, a Connection and a Session are needed. Note that those resources must be closed when they are no longer needed. First, a topic with the mandatory prefix "topic:" is bound (not created) to a consumer. As the messages are sent as a BytesMessage, the received message needs to be converted, e.g. to a String.
try (Connection connection = connectionFactory.createConnection();
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
    connection.start();
    Topic topic = session.createTopic(TOPIC_PREFIX + topicName);
    MessageConsumer consumer = session.createConsumer(topic);
    // Blocking call. Define a timeout or use a Message Listener
    BytesMessage message = (BytesMessage) consumer.receive();
    // Read the raw payload of the message
    byte[] byteData = new byte[(int) message.getBodyLength()];
    message.readBytes(byteData);
    // Convert the payload to a String (platform default charset, matching getBytes() on the sending side)
    String text = new String(byteData);
} catch (JMSException e) {
    LOG.error("Could not receive message.", e);
}
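Once the bytes have been read, the conversion mentioned above is plain JDK code. The sketch below shows the round trip with an explicit charset. Choosing UTF-8 is an assumption: the sending code above calls getBytes() without a charset, which uses the platform default, so sender and receiver only agree if both sides use the same charset. The class PayloadCodec is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the sample application.
final class PayloadCodec {

    private PayloadCodec() {
    }

    // Encodes the text for use as the body of a bytes message.
    static byte[] toBytes(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes a payload that was produced with toBytes.
    static String toText(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = toBytes("Hello Event Mesh");
        System.out.println(payload.length + " bytes: " + toText(payload));
    }
}
```

Using the same explicit charset on both sides avoids garbled text when producer and consumer run on platforms with different default encodings.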
The Messaging Management API (MM API) provides functionality for creating, deleting, and updating queues and queue subscriptions. Furthermore, it provides APIs to get information on queues and queue subscriptions. The MM API documentation can be found here. The MM API has to be enabled in the service descriptor. A description of how to enable the MM API can be found here.
Queues can be created through the SAP Business Technology Platform Cockpit UI. More information regarding the creation of queues through the UI can be found here.
Examples for the different service descriptors can be found here on the help site and in the config folder of this project.
This project is 'as-is' with no support, no changes being made.
You are welcome to make changes to improve it but we are not available for questions or support of any kind.
Copyright (c) 2017 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the SAP SAMPLE CODE LICENSE AGREEMENT, v1.0-071618 except as noted otherwise in the LICENSE file.