Skip to content

Commit

Permalink
Release 1.0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
Liang Mei committed Jul 6, 2018
1 parent 7d406dc commit b9db8c5
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 143 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ googleJavaFormat {
}

group = 'com.uber.cadence'
version = '1.0.4'
version = '1.0.5'

description = """Uber Cadence Java Client"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,12 @@ public void processSignal(String signalName, byte[] input, long eventId) {
signalMethod.invoke(workflow, args);
} catch (IllegalAccessException e) {
throw new Error("Failure processing \"" + signalName + "\" at eventID " + eventId, e);
} catch (DataConverterException e){
} catch (DataConverterException e) {
logSerializationException(signalName, eventId, e);
} catch (InvocationTargetException e) {
Throwable targetException = e.getTargetException();
if (targetException instanceof DataConverterException) {
logSerializationException(signalName, eventId, (DataConverterException)targetException);
logSerializationException(signalName, eventId, (DataConverterException) targetException);
} else {
throw new Error(
"Failure processing \"" + signalName + "\" at eventID " + eventId, targetException);
Expand All @@ -305,14 +305,15 @@ public void processSignal(String signalName, byte[] input, long eventId) {
}
}

void logSerializationException(String signalName, Long eventId, DataConverterException exception){
void logSerializationException(
String signalName, Long eventId, DataConverterException exception) {
log.error(
"Failure deserializing signal input for \""
+ signalName
+ "\" at eventID "
+ eventId
+ ". Dropping it.",
exception);
"Failure deserializing signal input for \""
+ signalName
+ "\" at eventID "
+ eventId
+ ". Dropping it.",
exception);
metricsScope.counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,23 @@ public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) {

@Override
public Worker newWorker(String taskList) {
return newWorker(taskList, x->x);
return newWorker(taskList, x -> x);
}

@Override
public Worker newWorker(String taskList, Function<WorkerOptions.Builder,WorkerOptions.Builder> overrideOptions) {
public Worker newWorker(
String taskList, Function<WorkerOptions.Builder, WorkerOptions.Builder> overrideOptions) {
WorkerOptions.Builder builder =
new WorkerOptions.Builder()
.setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory())
.setMetricsScope(testEnvironmentOptions.getMetricsScope())
.setEnableLoggingInReplay(testEnvironmentOptions.isLoggingEnabledInReplay());
new WorkerOptions.Builder()
.setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory())
.setMetricsScope(testEnvironmentOptions.getMetricsScope())
.setEnableLoggingInReplay(testEnvironmentOptions.isLoggingEnabledInReplay());
if (testEnvironmentOptions.getDataConverter() != null) {
builder.setDataConverter(testEnvironmentOptions.getDataConverter());
}
builder = overrideOptions.apply(builder);
Worker result =
new Worker(service, testEnvironmentOptions.getDomain(), taskList, builder.build());
new Worker(service, testEnvironmentOptions.getDomain(), taskList, builder.build());
workers.add(result);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) {
* @param taskList task list to poll.
* @param overrideOptions is used to override the default worker options.
*/
Worker newWorker(String taskList, Function<WorkerOptions.Builder,WorkerOptions.Builder> overrideOptions);
Worker newWorker(
String taskList, Function<WorkerOptions.Builder, WorkerOptions.Builder> overrideOptions);

/** Creates a WorkflowClient that is connected to the in-memory test Cadence service. */
WorkflowClient newWorkflowClient();
Expand Down
71 changes: 38 additions & 33 deletions src/test/java/com/uber/cadence/workflow/MetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.time.Duration;
import java.util.Map;
import java.util.function.Function;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
Expand Down Expand Up @@ -116,10 +115,12 @@ public interface ReceiveSignalObjectChildWorkflow {
void close();
}

public static class ReceiveSignalObjectChildWorkflowImpl implements ReceiveSignalObjectChildWorkflow {
public static class ReceiveSignalObjectChildWorkflowImpl
implements ReceiveSignalObjectChildWorkflow {
private String receivedSignal = "Initial State";
// Keep workflow open so that we can send signal
CompletablePromise<Void> promise = Workflow.newPromise();

@Override
public String execute() {
promise.get();
Expand Down Expand Up @@ -147,7 +148,7 @@ public static class SendSignalObjectWorkflowImpl implements SendSignalObjectWork
@Override
public String execute() {
ReceiveSignalObjectChildWorkflow child =
Workflow.newChildWorkflowStub(ReceiveSignalObjectChildWorkflow.class);
Workflow.newChildWorkflowStub(ReceiveSignalObjectChildWorkflow.class);
Promise<String> greeting = Async.function(child::execute);
Signal sig = new Signal();
sig.value = "Hello World";
Expand All @@ -162,15 +163,12 @@ public static class Signal {
public String value;
}

public void setUp(com.uber.m3.util.Duration reportingFrequecy){
public void setUp(com.uber.m3.util.Duration reportingFrequecy) {
reporter = mock(StatsReporter.class);
Scope scope =
new RootScopeBuilder()
.reporter(reporter)
.reportEvery(reportingFrequecy);
Scope scope = new RootScopeBuilder().reporter(reporter).reportEvery(reportingFrequecy);

TestEnvironmentOptions testOptions =
new Builder().setDomain(WorkflowTest.DOMAIN).setMetricsScope(scope).build();
new Builder().setDomain(WorkflowTest.DOMAIN).setMetricsScope(scope).build();
testEnvironment = TestWorkflowEnvironment.newInstance(testOptions);
}

Expand Down Expand Up @@ -222,45 +220,52 @@ public void testWorkflowMetrics() throws InterruptedException {
public void testCorruptedSignalMetrics() throws InterruptedException {
setUp(com.uber.m3.util.Duration.ofMillis(300));

Worker worker = testEnvironment.newWorker(taskList, builder ->
builder.setInterceptorFactory(new CorruptedSignalWorkflowInterceptorFactory()));
Worker worker =
testEnvironment.newWorker(
taskList,
builder ->
builder.setInterceptorFactory(new CorruptedSignalWorkflowInterceptorFactory()));

worker.registerWorkflowImplementationTypes(
SendSignalObjectWorkflowImpl.class, ReceiveSignalObjectChildWorkflowImpl.class);
SendSignalObjectWorkflowImpl.class, ReceiveSignalObjectChildWorkflowImpl.class);
worker.start();

WorkflowOptions options =
new WorkflowOptions.Builder()
.setExecutionStartToCloseTimeout(Duration.ofSeconds(1000))
.setTaskList(taskList)
.build();
new WorkflowOptions.Builder()
.setExecutionStartToCloseTimeout(Duration.ofSeconds(1000))
.setTaskList(taskList)
.build();

WorkflowClient workflowClient = testEnvironment.newWorkflowClient();
SendSignalObjectWorkflow workflow = workflowClient.newWorkflowStub(SendSignalObjectWorkflow.class, options);
SendSignalObjectWorkflow workflow =
workflowClient.newWorkflowStub(SendSignalObjectWorkflow.class, options);
workflow.execute();

//Wait for reporter
// Wait for reporter
Thread.sleep(600);

Map<String, String> tags =
new ImmutableMap.Builder<String, String>(2)
.put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN)
.put(MetricsTag.TASK_LIST, taskList)
.build();
new ImmutableMap.Builder<String, String>(2)
.put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN)
.put(MetricsTag.TASK_LIST, taskList)
.build();
verify(reporter, times(1)).reportCounter(MetricsType.CORRUPTED_SIGNALS_COUNTER, tags, 2);
}

private static class CorruptedSignalWorkflowInterceptorFactory
implements Function<WorkflowInterceptor, WorkflowInterceptor> {

@Override
public WorkflowInterceptor apply(WorkflowInterceptor next) {
return new SignalWorkflowInterceptor(args -> {
if(args != null && args.length > 0){
return new Object [] {"Corrupted Signal"};
}
return args;
}, sig->sig, next);
}
implements Function<WorkflowInterceptor, WorkflowInterceptor> {

@Override
public WorkflowInterceptor apply(WorkflowInterceptor next) {
return new SignalWorkflowInterceptor(
args -> {
if (args != null && args.length > 0) {
return new Object[] {"Corrupted Signal"};
}
return args;
},
sig -> sig,
next);
}
}
}
Loading

0 comments on commit b9db8c5

Please sign in to comment.