diff --git a/README.md b/README.md index 4dcba05..0cdb50a 100644 --- a/README.md +++ b/README.md @@ -277,37 +277,38 @@ For troubleshooting, or to better understand the handshake performed by the IBM The configuration options for the Kafka Connect source connector for IBM MQ are as follows: -| Name | Description | Type | Default | Valid values | -| --------------------------------------- | ----------------------------------------------------------------------------------------------------------------------- | ------- | -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| topic | The name of the target Kafka topic | string | | Topic name | -| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name | -| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings | -| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] | -| mq.channel.name | The name of the server-connection channel | string | | MQ channel name | -| mq.queue | The name of the source MQ queue | string | | MQ queue name | -| mq.exactly.once.state.queue | The name of the MQ queue used to store state when running with exactly-once semantics | string | | MQ state queue name | -| mq.user.name | The user name for authenticating with the queue manager | string | | User name | -| mq.password | The password for authenticating with the queue manager | string | | Password | -| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | | -| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file | -| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder | -| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | | -| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination | -| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | | -| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite | -| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern | -| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file | -| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | | -| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file | -| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | | -| mq.ssl.use.ibm.cipher.mappings | Whether to set system property to control use of IBM cipher mappings | boolean | | | -| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater | -| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | | -| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. | -| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED | -| mq.message.receive.timeout | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater | -| mq.reconnect.delay.min.ms | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater | -| mq.reconnect.delay.max.ms | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater | +| Name | Description | Type | Default | Valid values | +| --------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- | ------- | -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| topic | The name of the target Kafka topic | string | | Topic name | +| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name | +| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings | +| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] | +| mq.channel.name | The name of the server-connection channel | string | | MQ channel name | +| mq.queue | The name of the source MQ queue | string | | MQ queue name | +| mq.exactly.once.state.queue | The name of the MQ queue used to store state when running with exactly-once semantics | string | | MQ state queue name | +| mq.user.name | The user name for authenticating with the queue manager | string | | User name | +| mq.password | The password for authenticating with the queue manager | string | | Password | +| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | | +| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file | +| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder | +| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | | +| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination | +| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | | +| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite | +| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern | +| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file | +| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | | +| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file | +| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | | +| mq.ssl.use.ibm.cipher.mappings | Whether to set system property to control use of IBM cipher mappings | boolean | | | +| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater | +| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | | +| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. | +| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED | +| mq.message.receive.timeout | The timeout (in milliseconds) for the first request to receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater | +| mq.receive.subsequent.timeout.ms | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater | +| mq.reconnect.delay.min.ms | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater | +| mq.reconnect.delay.max.ms | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater | ### Using a CCDT file diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index b1c4fae..f391e6b 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -87,6 +87,7 @@ private Map createDefaultConnectorProperties() { props.put("mq.user.authentication.mqcsp", "false"); props.put("topic", "mytopic"); props.put("mq.message.receive.timeout", "5000"); + props.put("mq.receive.subsequent.timeout.ms", "2000"); props.put("mq.reconnect.delay.min.ms", "100"); props.put("mq.reconnect.delay.max.ms", "10000"); return props; @@ -666,7 +667,8 @@ public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() t connectorConfigProps.put("mq.message.body.jms", "true"); connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); - connectorConfigProps.put("mq.message.receive.timeout", "5000"); + connectorConfigProps.put("mq.message.receive.timeout", "2000"); + connectorConfigProps.put("mq.receive.subsequent.timeout.ms", "3000"); connectorConfigProps.put("mq.reconnect.delay.min.ms", "100"); connectorConfigProps.put("mq.reconnect.delay.max.ms", "10000"); @@ -688,7 +690,8 @@ public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() t shared.attemptRollback(); assertThat(stateMsgs1.size()).isEqualTo(1); - assertEquals(5000L, shared.getReceiveTimeout()); + assertEquals(2000L, shared.getInitialReceiveTimeoutMs()); + assertEquals(3000L, shared.getSubsequentReceiveTimeoutMs()); assertEquals(100L, shared.getReconnectDelayMillisMin()); assertEquals(10000L, shared.getReconnectDelayMillisMax()); } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java index 4e93659..85fb78d 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java @@ -71,12 +71,17 @@ public class JMSWorker { private boolean connected = false; // Whether connected to MQ private AtomicBoolean closeNow; // Whether close has been requested private AbstractConfig config; - private long receiveTimeout; // Receive timeout for the jms consumer + private long initialReceiveTimeoutMs; // Receive timeout for the jms consumer on the first call in each Connect poll + private long subsequentReceiveTimeoutMs; // Receive timeout for the jms consumer on the subsequent calls private long reconnectDelayMillisMin; // Delay between repeated reconnect attempts min private long reconnectDelayMillisMax; // Delay between repeated reconnect attempts max - long getReceiveTimeout() { - return receiveTimeout; + long getInitialReceiveTimeoutMs() { + return initialReceiveTimeoutMs; + } + + long getSubsequentReceiveTimeoutMs() { + return subsequentReceiveTimeoutMs; } long getReconnectDelayMillisMin() { @@ -144,7 +149,8 @@ public void configure(final AbstractConfig config) { userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME); password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD); topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC); - receiveTimeout = config.getLong(MQSourceConnector.CONFIG_MAX_RECEIVE_TIMEOUT); + initialReceiveTimeoutMs = config.getLong(MQSourceConnector.CONFIG_MAX_RECEIVE_TIMEOUT); + subsequentReceiveTimeoutMs = config.getLong(MQSourceConnector.CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT); reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN); reconnectDelayMillisMax = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX); } catch (JMSException | JMSRuntimeException jmse) { @@ -222,10 +228,14 @@ public void connect() { * * @param queueName The name of the queue to get messages from * @param queueConfig Any particular queue configuration that should be applied - * @param wait Whether to wait indefinitely for a message + * @param initialCall Indicates whether this is the initial receive call in the polling cycle. + * Determines which configured timeout to use: + * - If true, uses the initial receive timeout. + * - If false, uses the subsequent receive timeout. + * A timeout value of 0 results in a non-blocking receiveNoWait() call. * @return The Message retrieved from MQ */ - public Message receive(final String queueName, final QueueConfig queueConfig, final boolean wait) throws JMSRuntimeException, JMSException { + public Message receive(final String queueName, final QueueConfig queueConfig, final boolean initialCall) throws JMSRuntimeException, JMSException { log.trace("[{}] Entry {}.receive", Thread.currentThread().getId(), this.getClass().getName()); if (!maybeReconnect()) { @@ -243,21 +253,25 @@ public Message receive(final String queueName, final QueueConfig queueConfig, fi jmsConsumers.put(queueName, internalConsumer); } - Message message = null; - if (wait) { - log.debug("Waiting {} ms for message", receiveTimeout); - message = internalConsumer.receive(receiveTimeout); + final long timeoutMs = initialCall + ? initialReceiveTimeoutMs + : subsequentReceiveTimeoutMs; + + Message message = null; + if (timeoutMs > 0) { + // block up to timeoutMs + message = internalConsumer.receive(timeoutMs); if (message == null) { - log.debug("No message received"); + log.debug("No message received within {} ms on queue={}", timeoutMs, queueName); } } else { + // non‐blocking message = internalConsumer.receiveNoWait(); } log.trace("[{}] Exit {}.receive, retval={}", Thread.currentThread().getId(), this.getClass().getName(), message); - return message; } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index cbf320a..de2f53c 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -165,19 +165,25 @@ public class MQSourceConnector extends SourceConnector { public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.message.receive.timeout"; public static final String CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT = "How long the connector should wait (in milliseconds) for a message to arrive if no message is available immediately"; - public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "message receive timeout"; + public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "Initial receive timeout (ms)"; public static final long CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT = 2000L; public static final long CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM = 1L; + public static final String CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT = "mq.receive.subsequent.timeout.ms"; + public static final String CONFIG_DOCUMENTATION_SUBSEQUENT_RECEIVE_TIMEOUT = "How long (in milliseconds) the connector should wait for subsequent receives, " + + "defaults to 0 (no-wait) and uses receiveNoWait()."; + public static final String CONFIG_DISPLAY_SUBSEQUENT_RECEIVE_TIMEOUT = "Subsequent receive timeout (ms)"; + public static final long CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT = 0L; + public static final String CONFIG_RECONNECT_DELAY_MIN = "mq.reconnect.delay.min.ms"; public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN = "The minimum delay in milliseconds for reconnect attempts."; - public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MIN = "reconnect minimum delay"; + public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MIN = "Reconnect minimum delay"; public static final long CONFIG_RECONNECT_DELAY_MIN_DEFAULT = 64L; public static final long CONFIG_RECONNECT_DELAY_MIN_MINIMUM = 1L; public static final String CONFIG_RECONNECT_DELAY_MAX = "mq.reconnect.delay.max.ms"; public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX = "The maximum delay in milliseconds for reconnect attempts."; - public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MAX = "reconnect maximum delay"; + public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MAX = "Reconnect maximum delay"; public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L; public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L; @@ -576,19 +582,30 @@ null, new ReadableFile(), Width.SHORT, CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS); CONFIGDEF.define(CONFIG_MAX_RECEIVE_TIMEOUT, - Type.LONG, - CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM), - Importance.MEDIUM, + ConfigDef.Type.LONG, + CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT, + ConfigDef.Range.atLeast(CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM), + ConfigDef.Importance.MEDIUM, CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT, - CONFIG_GROUP_MQ, 26, - Width.MEDIUM, + CONFIG_GROUP_MQ, + 26, + ConfigDef.Width.MEDIUM, CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT); + CONFIGDEF.define(CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT, + ConfigDef.Type.LONG, + CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT, + ConfigDef.Importance.LOW, + CONFIG_DOCUMENTATION_SUBSEQUENT_RECEIVE_TIMEOUT, + CONFIG_GROUP_MQ, + 27, + ConfigDef.Width.MEDIUM, + CONFIG_DISPLAY_SUBSEQUENT_RECEIVE_TIMEOUT); CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MIN, Type.LONG, CONFIG_RECONNECT_DELAY_MIN_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MIN_MINIMUM), Importance.MEDIUM, CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN, - CONFIG_GROUP_MQ, 27, + CONFIG_GROUP_MQ, 28, Width.MEDIUM, CONFIG_DISPLAY_RECONNECT_DELAY_MIN); CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MAX, @@ -596,7 +613,7 @@ null, new ReadableFile(), CONFIG_RECONNECT_DELAY_MAX_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MAX_MINIMUM), Importance.MEDIUM, CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX, - CONFIG_GROUP_MQ, 28, + CONFIG_GROUP_MQ, 29, Width.MEDIUM, CONFIG_DISPLAY_RECONNECT_DELAY_MAX);