Skip to content

Make TransactionalApplicationListenerAdapter support fallbackExecution #34681

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
*
* @author Juergen Hoeller
* @author Oliver Drotbohm
* @author Réda Housni Alaoui
* @since 5.3
* @param <E> the specific {@code ApplicationEvent} subclass to listen to
* @see TransactionalEventListener
Expand Down Expand Up @@ -134,10 +135,26 @@ static <T> TransactionalApplicationListener<PayloadApplicationEvent<T>> forPaylo
*/
static <T> TransactionalApplicationListener<PayloadApplicationEvent<T>> forPayload(
TransactionPhase phase, Consumer<T> consumer) {
return forPayload(phase, false, consumer);
}

/**
* Create a new {@code TransactionalApplicationListener} for the given payload consumer.
* @param phase the transaction phase in which to invoke the listener
* @param fallbackExecution Whether the event should be handled if no transaction is running.
* @param consumer the event payload consumer
* @param <T> the type of the event payload
* @return a corresponding {@code TransactionalApplicationListener} instance
* @see PayloadApplicationEvent#getPayload()
* @see TransactionalApplicationListenerAdapter
*/
static <T> TransactionalApplicationListener<PayloadApplicationEvent<T>> forPayload(
TransactionPhase phase, boolean fallbackExecution, Consumer<T> consumer) {

TransactionalApplicationListenerAdapter<PayloadApplicationEvent<T>> listener =
new TransactionalApplicationListenerAdapter<>(event -> consumer.accept(event.getPayload()));
listener.setTransactionPhase(phase);
listener.setFallbackExecution(fallbackExecution);
return listener;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.Ordered;
Expand All @@ -35,6 +37,7 @@
* as a convenient alternative to custom usage of this adapter class.
*
* @author Juergen Hoeller
* @author Réda Housni Alaoui
* @since 5.3
* @param <E> the specific {@code ApplicationEvent} subclass to listen to
* @see TransactionalApplicationListener
Expand All @@ -44,11 +47,14 @@
public class TransactionalApplicationListenerAdapter<E extends ApplicationEvent>
implements TransactionalApplicationListener<E>, Ordered {

private final Log logger = LogFactory.getLog(getClass());

private final ApplicationListener<E> targetListener;

private int order = Ordered.LOWEST_PRECEDENCE;

private TransactionPhase transactionPhase = TransactionPhase.AFTER_COMMIT;
private boolean fallbackExecution;

private String listenerId = "";

Expand Down Expand Up @@ -97,6 +103,13 @@ public TransactionPhase getTransactionPhase() {
return this.transactionPhase;
}

/**
* @param fallbackExecution Whether the event should be handled if no transaction is running.
*/
public void setFallbackExecution(boolean fallbackExecution) {
this.fallbackExecution = fallbackExecution;
}

/**
* Specify an id to identify the listener with.
* <p>The default is an empty String.
Expand Down Expand Up @@ -127,7 +140,23 @@ public void processEvent(E event) {

@Override
public void onApplicationEvent(E event) {
TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks);
if (TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks)) {
if (logger.isDebugEnabled()) {
logger.debug("Registered transaction synchronization for " + event);
}
}
else if (fallbackExecution) {
if (getTransactionPhase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatRuntimeException;

import java.util.concurrent.atomic.AtomicReference;

/**
* @author Juergen Hoeller
* @author Réda Housni Alaoui
*/
class TransactionalApplicationListenerAdapterTests {

Expand Down Expand Up @@ -88,6 +91,44 @@ void useSpecifiedIdentifier() {
assertThat(adapter.getListenerId()).isEqualTo("identifier");
}

@Test
void invokesNothingOutsideOfTransaction() {
CapturingSynchronizationCallback callback = new CapturingSynchronizationCallback();
PayloadApplicationEvent<Object> event = new PayloadApplicationEvent<>(this, new Object());

AtomicReference<Object> consumedPayload = new AtomicReference<>();
TransactionalApplicationListener<PayloadApplicationEvent<Object>> adapter =
TransactionalApplicationListener.forPayload(consumedPayload::set);
adapter.addCallback(callback);
adapter.onApplicationEvent(event);

assertThat(consumedPayload.get()).isNull();
assertThat(callback.preEvent).isNull();
assertThat(callback.postEvent).isNull();
assertThat(callback.ex).isNull();
assertThat(adapter.getTransactionPhase()).isEqualTo(TransactionPhase.AFTER_COMMIT);
assertThat(adapter.getListenerId()).isEmpty();
}

@Test
void invokesConsumerOutsideOfTransactionIfFallbackExecutionEnabled() {
CapturingSynchronizationCallback callback = new CapturingSynchronizationCallback();
PayloadApplicationEvent<Object> event = new PayloadApplicationEvent<>(this, new Object());

AtomicReference<Object> consumedPayload = new AtomicReference<>();
TransactionalApplicationListener<PayloadApplicationEvent<Object>> adapter =
TransactionalApplicationListener.forPayload(TransactionPhase.AFTER_COMMIT, true, consumedPayload::set);
adapter.addCallback(callback);
adapter.onApplicationEvent(event);

assertThat(consumedPayload.get()).isSameAs(event.getPayload());
assertThat(callback.preEvent).isNull();
assertThat(callback.postEvent).isNull();
assertThat(callback.ex).isNull();
assertThat(adapter.getTransactionPhase()).isEqualTo(TransactionPhase.AFTER_COMMIT);
assertThat(adapter.getListenerId()).isEmpty();
}


private static void runInTransaction(Runnable runnable) {
TransactionSynchronizationManager.setActualTransactionActive(true);
Expand Down