Skip to content

Commit

Permalink
Merge pull request #19699 from njr-11/18669-transaction-context-for-M…
Browse files Browse the repository at this point in the history
…anagedExecutorDefinition

transaction context for ContextServiceDefinition
  • Loading branch information
njr-11 authored Jan 1, 2022
2 parents ff246eb + 6fa9c65 commit ff672d6
Show file tree
Hide file tree
Showing 11 changed files with 250 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2017,2021 IBM Corporation and others.
* Copyright (c) 2017,2022 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand Down Expand Up @@ -27,7 +27,12 @@ public interface WSManagedExecutorService {
* or creates new thread context as determined by the execution properties.
* Do not expect the captured context to be serializable.</p>
*
* @param props execution properties. Custom property keys must not begin with "javax.enterprise.concurrent."
* @param props execution properties. Custom property keys must not begin with
* "javax.enterprise.concurrent." or "jakarta.enterprise.concurrent.".
* Null indicates to use execution properties that are consistent with
* with the managed executor's ContextServiceDefinition, or lacking a
* ContextServiceDefinition use execution properties that suspend the
* transaction on the thread of execution.
* @return captured thread context.
*/
ThreadContextDescriptor captureThreadContext(Map<String, String> props);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021 IBM Corporation and others.
* Copyright (c) 2021,2022 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand Down Expand Up @@ -39,7 +39,7 @@ public class AsyncMethod<I, T> extends ManagedCompletableFuture<T> {
* that is returned to the application, but it has the choice of
* returning a different stage.
*/
private final BiFunction<I, CompletableFuture<T>, CompletionStage<T>> action;
private final BiFunction<I, CompletableFuture<T>, CompletionStage<T>> asyncMethodImpl;

/**
* Thread context that is captured when the asynchronous method is requested,
Expand Down Expand Up @@ -78,10 +78,8 @@ public AsyncMethod(BiFunction<I, CompletableFuture<T>, CompletionStage<T>> invok
if (JAVA8)
throw new UnsupportedOperationException();

rejectManagedTask(invoker);

this.action = invoker;
this.contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(XPROPS_SUSPEND_TRAN);
this.asyncMethodImpl = invoker;
this.contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(null);
this.invocation = invocation;

((Executor) futureRef).execute(this::runIfNotStarted);
Expand Down Expand Up @@ -120,22 +118,22 @@ public T join() {
@FFDCIgnore({ CompletionException.class, Error.class, RuntimeException.class })
private void runIfNotStarted() {
if (!isDone() && started.compareAndSet(false, true)) {
CompletionStage<T> asyncMethodResultStage = null;
Throwable failure = null;
ArrayList<ThreadContext> contextApplied = null;
try {
if (contextDescriptor != null)
contextApplied = contextDescriptor.taskStarting();
asyncMethodResultStage = action.apply(invocation, this);

CompletionStage<T> asyncMethodResultStage = asyncMethodImpl.apply(invocation, this);

// The asynchronous method implementation can return a different stage or null if it wants to
if (asyncMethodResultStage != this)
if (asyncMethodResultStage == null) {
complete(null);
} else if (asyncMethodResultStage instanceof ManagedCompletableFuture) {
// bypass thread context capture & propagation because it is unnecessary here
((ManagedCompletableFuture<T>) asyncMethodResultStage).super_whenComplete(this::complete);
} else {
// TODO inefficient if a ManagedCompletableFuture. Instead do:
//} else if (asyncMethodResultStage instanceof ManagedCompletableFuture) {
// ((ManagedCompletableFuture) asyncMethodResultStage).super_whenComplete(this::complete);
asyncMethodResultStage.whenComplete(this::complete);
}
} catch (CompletionException x) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2012, 2021 IBM Corporation and others.
* Copyright (c) 2012, 2022 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand Down Expand Up @@ -136,6 +136,13 @@ public class ContextServiceImpl implements ContextService, //
*/
private ServiceReference<JavaEEVersion> eeVersionRef;

/**
* Execution properties.
* If ContextServiceDefinition is used, the execution properties are populated upon activate
* to control which context types are cleared vs left unchanged. Otherwise, it remains empty.
*/
Map<String, String> execProps = Collections.emptyMap();

/**
* Hash code for this instance.
*/
Expand Down Expand Up @@ -242,6 +249,15 @@ protected void activate(ComponentContext context) {
if (contextSvcName == null)
contextSvcName = (String) props.get(CONFIG_ID);

if (!"file".equals(props.get("config.source"))) {
// execution properties for ContextServiceDefinition
execProps = new TreeMap<String, String>();
execProps.put(WSContextService.DEFAULT_CONTEXT, WSContextService.UNCONFIGURED_CONTEXT_TYPES);
String contextToSkip = (String) props.get("context.unchanged");
if (contextToSkip != null)
execProps.put(WSContextService.SKIP_CONTEXT_PROVIDERS, contextToSkip);
}

lock.writeLock().lock();
try {
componentContext = context;
Expand Down Expand Up @@ -362,7 +378,7 @@ public <R> Callable<R> contextualCallable(Callable<R> callable) {
throw new IllegalArgumentException(ContextualCallable.class.getSimpleName());

@SuppressWarnings("unchecked")
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
return new ContextualCallable<R>(contextDescriptor, callable);
}

Expand All @@ -372,7 +388,7 @@ public <T, U> BiConsumer<T, U> contextualConsumer(BiConsumer<T, U> consumer) {
throw new IllegalArgumentException(ContextualBiConsumer.class.getSimpleName());

@SuppressWarnings("unchecked")
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
return new ContextualBiConsumer<T, U>(contextDescriptor, consumer);
}

Expand All @@ -382,7 +398,7 @@ public <T> Consumer<T> contextualConsumer(Consumer<T> consumer) {
throw new IllegalArgumentException(ContextualConsumer.class.getSimpleName());

@SuppressWarnings("unchecked")
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
return new ContextualConsumer<T>(contextDescriptor, consumer);
}

Expand All @@ -392,7 +408,7 @@ public <T, U, R> BiFunction<T, U, R> contextualFunction(BiFunction<T, U, R> func
throw new IllegalArgumentException(ContextualBiFunction.class.getSimpleName());

@SuppressWarnings("unchecked")
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
return new ContextualBiFunction<T, U, R>(contextDescriptor, function);
}

Expand All @@ -402,7 +418,7 @@ public <T, R> Function<T, R> contextualFunction(Function<T, R> function) {
throw new IllegalArgumentException(ContextualFunction.class.getSimpleName());

@SuppressWarnings("unchecked")
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
return new ContextualFunction<T, R>(contextDescriptor, function);
}

Expand All @@ -412,7 +428,7 @@ public Runnable contextualRunnable(Runnable runnable) {
throw new IllegalArgumentException(ContextualRunnable.class.getSimpleName());

@SuppressWarnings("unchecked")
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
return new ContextualRunnable(contextDescriptor, runnable);
}

Expand All @@ -422,7 +438,7 @@ public <R> Supplier<R> contextualSupplier(Supplier<R> supplier) {
throw new IllegalArgumentException(ContextualSupplier.class.getSimpleName());

@SuppressWarnings("unchecked")
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
return new ContextualSupplier<R>(contextDescriptor, supplier);
}

Expand Down Expand Up @@ -556,7 +572,7 @@ public Object createResource(ResourceInfo ref) throws Exception {
@Override
public Executor currentContextExecutor() {
@SuppressWarnings("unchecked")
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
return new ContextualExecutor(contextDescriptor);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2017, 2021 IBM Corporation and others.
* Copyright (c) 2017, 2022 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand Down Expand Up @@ -118,15 +118,6 @@ public class ManagedCompletableFuture<T> extends CompletableFuture<T> {
}
}

/**
* Execution property that indicates a task should run with any previous transaction suspended.
*/
static final Map<String, String> XPROPS_SUSPEND_TRAN = new TreeMap<String, String>();
static {
XPROPS_SUSPEND_TRAN.put("jakarta.enterprise.concurrent.TRANSACTION", "SUSPEND");
XPROPS_SUSPEND_TRAN.put("javax.enterprise.concurrent.TRANSACTION", "SUSPEND");
}

/**
* Privileged action that obtains the Liberty non-deferrable ScheduledExecutorService.
*/
Expand Down Expand Up @@ -466,7 +457,7 @@ public static CompletableFuture<Void> runAsync(Runnable action, Executor executo
contextDescriptor = r.getContextDescriptor();
action = r.getAction();
} else if (executor instanceof WSManagedExecutorService) {
contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(XPROPS_SUSPEND_TRAN);
contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(null);
} else {
contextDescriptor = null;
}
Expand Down Expand Up @@ -515,7 +506,7 @@ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> action, Executor
contextDescriptor = s.getContextDescriptor();
action = s.getAction();
} else if (executor instanceof WSManagedExecutorService) {
contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(XPROPS_SUSPEND_TRAN);
contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(null);
} else {
contextDescriptor = null;
}
Expand Down Expand Up @@ -697,7 +688,7 @@ private ThreadContextDescriptor captureThreadContext(Executor executor) {
if (managedExecutor == null)
return null;

return managedExecutor.captureThreadContext(XPROPS_SUSPEND_TRAN);
return managedExecutor.captureThreadContext(null);
}

/**
Expand Down Expand Up @@ -1410,6 +1401,17 @@ final boolean super_completeExceptionally(Throwable x) {
return super.completeExceptionally(x);
}

/**
* Invokes whenComplete on the superclass, bypassing thread context capture and
* propagation.
*/
final void super_whenComplete(BiConsumer<? super T, ? super Throwable> action) {
if (JAVA8)
throw new UnsupportedOperationException();
else
super.whenComplete(action);
}

/**
* Convenience method to validate that an executor supports running asynchronously
* and to wrap the executor, if an ExecutorService, with FutureRefExecutor.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2012, 2021 IBM Corporation and others.
* Copyright (c) 2012, 2022 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand Down Expand Up @@ -117,6 +117,15 @@ public Void run() {
*/
private static final Map<String, String> JAVAX_SUSPEND_TRAN = Collections.singletonMap("javax.enterprise.concurrent.TRANSACTION", "SUSPEND");

/**
* Execution properties that specify to suspend the current transaction.
*/
private static final Map<String, String> XPROPS_SUSPEND_TRAN = new TreeMap<String, String>();
static {
XPROPS_SUSPEND_TRAN.putAll(JAKARTA_SUSPEND_TRAN);
XPROPS_SUSPEND_TRAN.putAll(JAVAX_SUSPEND_TRAN);
}

private final boolean allowLifeCycleMethods;

/**
Expand Down Expand Up @@ -302,12 +311,15 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE

@Override
public ThreadContextDescriptor captureThreadContext(Map<String, String> props) {
WSContextService contextSvc;
ContextServiceImpl contextSvc;
if (mpContextService == null)
contextSvc = contextSvcRef.getServiceWithException();
contextSvc = (ContextServiceImpl) contextSvcRef.getServiceWithException();
else
contextSvc = mpContextService;

if (props == null)
props = contextSvc.execProps.isEmpty() ? XPROPS_SUSPEND_TRAN : contextSvc.execProps;

@SuppressWarnings("unchecked")
ThreadContextDescriptor threadContext = contextSvc.captureThreadContext(props);
return threadContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2017,2021 IBM Corporation and others.
* Copyright (c) 2017,2022 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand Down Expand Up @@ -81,13 +81,12 @@ public char getCharacter() {
* Obtain the transaction key and status and then commit the active transaction.
*/
@Asynchronous(executor = "java:module/concurrent/txexecutor")
public CompletableFuture<Entry<Object, Integer>> getTransactionInfoAndCommit() {
public CompletableFuture<Entry<Object, Integer>> getTransactionInfoAndCommit(TransactionSynchronizationRegistry tranSyncRegistry,
UserTransaction tx) {
try {
TransactionSynchronizationRegistry tranSyncRegistry = InitialContext.doLookup("java:comp/TransactionSynchronizationRegistry");
Object txKey = tranSyncRegistry.getTransactionKey();
int txStatus = tranSyncRegistry.getTransactionStatus();

UserTransaction tx = InitialContext.doLookup("java:comp/UserTransaction");
tx.commit();

return Asynchronous.Result.complete(new SimpleEntry<Object, Integer>(txKey, txStatus));
Expand Down
Loading

0 comments on commit ff672d6

Please sign in to comment.