Skip to content

Commit

Permalink
Adding Jakarta Messaging 3.0 support (#674)
Browse files Browse the repository at this point in the history
* Initializing jakarta JMS with jakarta.jms-api 3.0.0

* Fixing imports + Adding pinned deps to scala steward.

* Updating config definition.

* Common artemis version.

* Removing JMS 2.0 comment
  • Loading branch information
samueleresca authored Jul 2, 2024
1 parent 9d05b3c commit deccaf7
Show file tree
Hide file tree
Showing 64 changed files with 11,736 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/check-build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ jobs:
- { connector: influxdb, pre_cmd: 'docker-compose up -d influxdb' }
# ironmq disabled while we resolve https://github.com/apache/pekko-connectors/issues/697
# - { connector: ironmq, pre_cmd: 'docker-compose up -d ironauth ironmq' }
- { connector: jakartams, pre_cmd: 'docker-compose up -d ibmmq' }
- { connector: jms, pre_cmd: 'docker-compose up -d ibmmq' }
- { connector: json-streaming }
- { connector: kinesis }
Expand Down
2 changes: 2 additions & 0 deletions .scala-steward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ updates.pin = [
{ groupId = "org.mockito", version = "4." }
# activemq 5.17+ requires Java 11 (only used in tests)
{ groupId = "org.apache.activemq", version = "5.16." }
# jakarta 3.0+ requires Java 11
{ groupId = "jakarta.jms", version="3.0."}
# wiremock 3.0+ requires Java 11 (only used in tests)
{ groupId = "com.github.tomakehurst", version = "2." }
# jetty 10.+ requires Java 11 (only used in tests - via wiremock)
Expand Down
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ lazy val ironmq = pekkoConnectorProject(

lazy val jms = pekkoConnectorProject("jms", "jms", Dependencies.Jms)

lazy val jakartams = pekkoConnectorProject("jakartams", "jakartams", Dependencies.JakartaMs)

lazy val jsonStreaming = pekkoConnectorProject("json-streaming", "json.streaming", Dependencies.JsonStreaming)

lazy val kinesis = pekkoConnectorProject("kinesis", "aws.kinesis", Dependencies.Kinesis)
Expand Down
134 changes: 134 additions & 0 deletions jakartams/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# SPDX-License-Identifier: Apache-2.0

# Settings for the Apache Pekko Connectors JMS connector
#
pekko.connectors.jakartams {
#connection-retry
# Connection Retry Settings
# these set the defaults for Consumer, Producer, and Browse settings
connection-retry {
# Time allowed to establish and start a connection.
connect-timeout = 10 seconds
# Wait time before retrying the connection the first time.
initial-retry = 100 millis
# Back-off factor for subsequent retries.
backoff-factor = 2
# Maximum back-off time for subsequent retries.
max-backoff = 1 minute
# Maximum number of retries allowed.
# "infinite", or positive integer
max-retries = 10
}

#connection-retry

#consumer
# Jms Consumer Settings
# sets default values
consumer {
# Configure connection retrying by providing settings for ConnectionRetrySettings.
connection-retry = ${pekko.connectors.jakartams.connection-retry}
# Credentials to connect to the JMS broker.
# credentials {
# username = "some text"
# password = "some text"
# }
# "off" to not use any credentials.
credentials = off
# Number of parallel sessions to use for receiving JMS messages.
session-count = 1
# Buffer size for maximum number for messages read from JMS when there is no demand.
buffer-size = 100
# JMS selector expression.
# See https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
# empty string for unset
selector = "" # optional
# Set an explicit acknowledge mode.
# (Consumers have specific defaults.)
# See eg. jakarta.jms.Session.AUTO_ACKNOWLEDGE
# Allowed values: "off", "auto", "client", "duplicates-ok", "session", integer value
acknowledge-mode = off
# Timeout for acknowledge.
# (Used by TX consumers.)
ack-timeout = 1 second
# For use with transactions, if true the stream fails if Apache Pekko Connectors rolls back the transaction
# when `ack-timeout` is hit.
fail-stream-on-ack-timeout = false
# Max interval before sending queued acknowledges back to the broker. (Used by AckSources.)
# max-ack-interval = 5 seconds
# Max number of acks queued by AckSource before they are sent to broker. (Unless MaxAckInterval is specified).
max-pending-acks = ${pekko.connectors.jakartams.consumer.buffer-size}
# How long the stage should preserve connection status events for the first subscriber before discarding them
connection-status-subscription-timeout = 5 seconds
}
#consumer

#send-retry
# Send Retry Settings
# these set the defaults for Producer settings
send-retry {
# Wait time before retrying the first time.
initial-retry = 20 millis
# Back-off factor for subsequent retries.
backoff-factor = 1.5
# Maximum back-off time allowed, after which all retries will happen after this delay.
max-backoff = 500 millis
# Maximum number of retries allowed.
# "infinite", or positive integer
max-retries = 10
}
#send-retry

# #producer
# Jms Producer Settings
# sets default values
producer {
# Configure connection retrying by providing settings for ConnectionRetrySettings.
connection-retry = ${pekko.connectors.jakartams.connection-retry}
# Configure re-sending by providing settings for SendRetrySettings.
send-retry = ${pekko.connectors.jakartams.send-retry}
# Credentials to connect to the JMS broker.
# credentials {
# username = "some text"
# password = "some text"
# }
# "off" to not use any credentials.
credentials = off
# Number of parallel sessions to use for sending JMS messages.
# Increasing the number of parallel sessions increases throughput at the cost of message ordering.
# While the messages may arrive out of order on the JMS broker, the producer flow outputs messages
# in the order they are received.
session-count = 1
# Time messages should be kept on the JMS broker.
# This setting can be overridden on individual messages.
# "off" to not let messages expire.
time-to-live = off
# How long the stage should preserve connection status events for the first subscriber before discarding them
connection-status-subscription-timeout = 5 seconds
}
# #producer

#browse
# Jms Browse Settings
# sets default values
browse {
# Configure connection retrying by providing settings for ConnectionRetrySettings.
connection-retry = ${pekko.connectors.jakartams.connection-retry}
# Credentials to connect to the JMS broker.
# credentials {
# username = "some text"
# password = "some text"
# }
# "off" to not use any credentials.
credentials = off
# JMS selector expression.
# See https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
# empty string for unset
selector = "" # optional
# Set an explicit acknowledge mode.
# See eg. jakarta.jms.Session.AUTO_ACKNOWLEDGE
# Allowed values: "auto", "client", "duplicates-ok", "session", integer value
acknowledge-mode = auto
}
#browse
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.stream.connectors.jakartams

import jakarta.jms

/**
* JMS acknowledge modes.
* See [[jakarta.jms.Connection#createSession-boolean-int-]]
*/
final class AcknowledgeMode(val mode: Int) {
override def equals(other: Any): Boolean = other match {
case that: AcknowledgeMode => this.mode == that.mode
case _ => false
}
override def hashCode: Int = mode
override def toString: String = s"AcknowledgeMode(${AcknowledgeMode.asString(this)})"
}

object AcknowledgeMode {
val AutoAcknowledge: AcknowledgeMode = new AcknowledgeMode(jms.Session.AUTO_ACKNOWLEDGE)
val ClientAcknowledge: AcknowledgeMode = new AcknowledgeMode(jms.Session.CLIENT_ACKNOWLEDGE)
val DupsOkAcknowledge: AcknowledgeMode = new AcknowledgeMode(jms.Session.DUPS_OK_ACKNOWLEDGE)
val SessionTransacted: AcknowledgeMode = new AcknowledgeMode(jms.Session.SESSION_TRANSACTED)

/**
* Interpret string to corresponding acknowledge mode.
*/
def from(s: String): AcknowledgeMode = s match {
case "auto" => AutoAcknowledge
case "client" => ClientAcknowledge
case "duplicates-ok" => DupsOkAcknowledge
case "session" => SessionTransacted
case other =>
try {
val mode = other.toInt
new AcknowledgeMode(mode)
} catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(
s"can't read AcknowledgeMode '$other', (known are auto, client, duplicates-ok, session, or an integer value)")
}
}

/**
* Convert to a string presentation.
*/
def asString(mode: AcknowledgeMode): String = mode match {
case AutoAcknowledge => "auto"
case ClientAcknowledge => "client"
case DupsOkAcknowledge => "duplicates-ok"
case SessionTransacted => "session"
case other => other.mode.toString
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.stream.connectors.jakartams

import com.typesafe.config.Config
import org.apache.pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
import org.apache.pekko.util.JavaDurationConverters._

import scala.concurrent.duration._

/**
* When a connection to a broker cannot be established and errors out, or is timing out being established or
* started, the connection can be retried.
* All JMS publishers, consumers, and browsers are configured with connection retry settings.
*/
final class ConnectionRetrySettings private (
val connectTimeout: scala.concurrent.duration.FiniteDuration,
val initialRetry: scala.concurrent.duration.FiniteDuration,
val backoffFactor: Double,
val maxBackoff: scala.concurrent.duration.FiniteDuration,
val maxRetries: Int) {

/** Time allowed to establish and start a connection. */
def withConnectTimeout(timeout: scala.concurrent.duration.FiniteDuration): ConnectionRetrySettings =
copy(connectTimeout = timeout)

/** Java API: Time allowed to establish and start a connection. */
def withConnectTimeout(timeout: java.time.Duration): ConnectionRetrySettings = copy(connectTimeout = timeout.asScala)

/** Wait time before retrying the first time. */
def withInitialRetry(timeout: scala.concurrent.duration.FiniteDuration): ConnectionRetrySettings =
copy(initialRetry = timeout)

/** Java API: Wait time before retrying the first time. */
def withInitialRetry(timeout: java.time.Duration): ConnectionRetrySettings = copy(initialRetry = timeout.asScala)

/** Back-off factor for subsequent retries. */
def withBackoffFactor(factor: Double): ConnectionRetrySettings = copy(backoffFactor = factor)

/** Maximum back-off time allowed, after which all retries will happen after this delay. */
def withMaxBackoff(backoffTime: scala.concurrent.duration.FiniteDuration): ConnectionRetrySettings =
copy(maxBackoff = backoffTime)

/** Java API: Maximum back-off time allowed, after which all retries will happen after this delay. */
def withMaxBackoff(backoffTime: java.time.Duration): ConnectionRetrySettings = copy(maxBackoff = backoffTime.asScala)

/** Maximum number of retries allowed. */
def withMaxRetries(value: Int): ConnectionRetrySettings = copy(maxRetries = value)

/** Do not limit the number of retries. */
def withInfiniteRetries(): ConnectionRetrySettings = withMaxRetries(ConnectionRetrySettings.infiniteRetries)

/** The wait time before the next attempt may be made. */
def waitTime(retryNumber: Int): FiniteDuration =
(initialRetry * Math.pow(retryNumber, backoffFactor)).asInstanceOf[FiniteDuration].min(maxBackoff)

private def copy(
connectTimeout: scala.concurrent.duration.FiniteDuration = connectTimeout,
initialRetry: scala.concurrent.duration.FiniteDuration = initialRetry,
backoffFactor: Double = backoffFactor,
maxBackoff: scala.concurrent.duration.FiniteDuration = maxBackoff,
maxRetries: Int = maxRetries): ConnectionRetrySettings = new ConnectionRetrySettings(
connectTimeout = connectTimeout,
initialRetry = initialRetry,
backoffFactor = backoffFactor,
maxBackoff = maxBackoff,
maxRetries = maxRetries)

override def toString: String =
"ConnectionRetrySettings(" +
s"connectTimeout=${connectTimeout.toCoarsest}," +
s"initialRetry=${initialRetry.toCoarsest}," +
s"backoffFactor=$backoffFactor," +
s"maxBackoff=${maxBackoff.toCoarsest}," +
s"maxRetries=${if (maxRetries == ConnectionRetrySettings.infiniteRetries) "infinite" else maxRetries}" +
")"
}

object ConnectionRetrySettings {
val configPath = "pekko.connectors.jakartams.connection-retry"

val infiniteRetries: Int = -1

/**
* Reads from the given config.
*/
def apply(c: Config): ConnectionRetrySettings = {
val connectTimeout = c.getDuration("connect-timeout").asScala
val initialRetry = c.getDuration("initial-retry").asScala
val backoffFactor = c.getDouble("backoff-factor")
val maxBackoff = c.getDuration("max-backoff").asScala
val maxRetries = if (c.getString("max-retries") == "infinite") infiniteRetries else c.getInt("max-retries")
new ConnectionRetrySettings(
connectTimeout,
initialRetry,
backoffFactor,
maxBackoff,
maxRetries)
}

/** Java API: Reads from the given config. */
def create(c: Config): ConnectionRetrySettings = apply(c)

/**
* Reads from the default config provided by the actor system at `pekko.connectors.jakartams.connection-retry`.
*
* @param actorSystem The actor system
*/
def apply(actorSystem: ActorSystem): ConnectionRetrySettings =
apply(actorSystem.settings.config.getConfig(configPath))

/**
* Reads from the default config provided by the actor system at `pekko.connectors.jakartams.connection-retry`.
*
* @param actorSystem The actor system
*/
def apply(actorSystem: ClassicActorSystemProvider): ConnectionRetrySettings =
apply(actorSystem.classicSystem.settings.config.getConfig(configPath))

/**
* Java API: Reads from the default config provided by the actor system at `pekko.connectors.jakartams.connection-retry`.
*
* @param actorSystem The actor system
*/
def create(actorSystem: ActorSystem): ConnectionRetrySettings = apply(actorSystem)

/**
* Java API: Reads from the default config provided by the actor system at `pekko.connectors.jakartams.connection-retry`.
*
* @param actorSystem The actor system
*/
def create(actorSystem: ClassicActorSystemProvider): ConnectionRetrySettings = apply(actorSystem)

}
Loading

0 comments on commit deccaf7

Please sign in to comment.