forked from apache/pekko-connectors
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathReferenceTest.java
204 lines (169 loc) · 6.67 KB
/
ReferenceTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/
/*
* Start package with 'docs' prefix when testing APIs as a user.
* This prevents any visibility issues that may be hidden.
*/
package docs.javadsl;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.connectors.reference.*;
import org.apache.pekko.stream.connectors.reference.javadsl.Reference;
import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.apache.pekko.util.ByteString;
import org.junit.*;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Java DSL test suite for the reference connector: compile-only checks for the settings, source
 * and flow factories, plus tests that run the source and flows against a shared actor system.
 *
 * <p>Append "Test" to every Java test suite.
 */
public class ReferenceTest {

  /** JUnit rule from the connectors testkit, applied around every test method. */
  @Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4();

  /** Actor system shared by all tests; created and shut down once per suite. */
  static ActorSystem system;

  /** Client id passed to every {@code SourceSettings.create} call in this suite. */
  static final String clientId = "test-client-id";

  /** Called before test suite. */
  @BeforeClass
  public static void setUpBeforeClass() {
    system = ActorSystem.create("ReferenceTest");
  }

  /** Called before every test. */
  @Before
  public void setUp() {}

  /**
   * Compile-only check of the settings API: both authentication variants must be accepted by
   * {@code withAuthentication}. Nothing is asserted and the returned values are deliberately
   * discarded.
   */
  @Test
  public void compileSettings() {
    final Authentication.Provided providedAuth =
        Authentication.createProvided().withVerifierPredicate(c -> true);

    final Authentication.None noAuth = Authentication.createNone();

    final SourceSettings settings = SourceSettings.create(clientId);

    settings.withAuthentication(providedAuth);
    settings.withAuthentication(noAuth);
  }

  /** Compile-only check that a source can be created from settings; the source is never run. */
  @Test
  public void compileSource() {
    // #source
    final SourceSettings settings = SourceSettings.create(clientId);

    final Source<ReferenceReadResult, CompletionStage<Done>> source = Reference.source(settings);
    // #source
  }

  /** Compile-only check of the flow factories; neither flow is materialized. */
  @Test
  public void compileFlow() {
    // #flow
    final Flow<ReferenceWriteMessage, ReferenceWriteResult, NotUsed> flow = Reference.flow();
    // #flow

    // The pool exists only so that flowAsyncMapped has an executor to accept. The flow is never
    // run in this test, so the pool is shut down as soon as the flow has been built instead of
    // being left behind for the rest of the test JVM's lifetime.
    final ExecutorService ex = Executors.newCachedThreadPool();
    try {
      final Flow<ReferenceWriteMessage, ReferenceWriteResult, NotUsed> flow2 =
          Reference.flowAsyncMapped(ex);
    } finally {
      ex.shutdown();
    }
  }

  /**
   * Runs the reference source and checks the first emitted element: its data, the number of bytes
   * read, and the absence of a read failure.
   *
   * @throws Exception if the first element does not arrive within five seconds or the stream fails
   */
  @Test
  public void runSource() throws Exception {
    final Source<ReferenceReadResult, CompletionStage<Done>> source =
        Reference.source(SourceSettings.create(clientId));

    final CompletionStage<ReferenceReadResult> stage = source.runWith(Sink.head(), system);
    final ReferenceReadResult msg = stage.toCompletableFuture().get(5, TimeUnit.SECONDS);

    Assert.assertEquals(Collections.singletonList(ByteString.fromString("one")), msg.getData());

    final OptionalInt expected = OptionalInt.of(100);
    Assert.assertEquals(expected, msg.getBytesRead());
    Assert.assertEquals(Optional.empty(), msg.getBytesReadFailure());
  }

  /**
   * Sends two write messages through the reference flow and checks that the payloads come back in
   * order and that the first result carries a {@code "total"} metric of 50.
   *
   * @throws Exception if the stream does not complete within five seconds or fails
   */
  @Test
  public void runFlow() throws Exception {
    final Flow<ReferenceWriteMessage, ReferenceWriteResult, NotUsed> flow = Reference.flow();

    // A plain HashMap rather than double-brace initialization, which would create an anonymous
    // subclass holding a reference to this test instance.
    final Map<String, Long> metrics = new HashMap<>();
    metrics.put("rps", 20L);
    metrics.put("rpm", 30L);

    final Source<ReferenceWriteMessage, NotUsed> source =
        Source.from(
            Arrays.asList(
                ReferenceWriteMessage.create()
                    .withData(Collections.singletonList(ByteString.fromString("one")))
                    .withMetrics(metrics),
                ReferenceWriteMessage.create()
                    .withData(
                        Arrays.asList(
                            ByteString.fromString("two"),
                            ByteString.fromString("three"),
                            ByteString.fromString("four")))));

    final CompletionStage<List<ReferenceWriteResult>> stage =
        source.via(flow).runWith(Sink.seq(), system);
    final List<ReferenceWriteResult> result = stage.toCompletableFuture().get(5, TimeUnit.SECONDS);

    final List<ByteString> bytes =
        result.stream()
            .flatMap(m -> m.getMessage().getData().stream())
            .collect(Collectors.toList());

    Assert.assertEquals(
        Arrays.asList(
            ByteString.fromString("one"),
            ByteString.fromString("two"),
            ByteString.fromString("three"),
            ByteString.fromString("four")),
        bytes);

    // The payload assertion above has already shown that results were emitted, so the first
    // element can be read directly. NOTE(review): 50 is presumably the sum of the "rps" (20) and
    // "rpm" (30) metrics supplied above; confirm against the flow implementation.
    final long actual = result.get(0).getMetrics().get("total");
    Assert.assertEquals(50L, actual);
  }

  /**
   * Runs one message through {@code Reference.flowWithResource()} without extra attributes and
   * expects the payload {@code "one default msg"}.
   *
   * @throws Exception if the stream does not complete within five seconds or fails
   */
  @Test
  public void resolveResourceFromApplicationConfig() throws Exception {
    final List<ReferenceWriteResult> result =
        Source.single(
                ReferenceWriteMessage.create()
                    .withData(Collections.singletonList(ByteString.fromString("one"))))
            .via(Reference.flowWithResource())
            .runWith(Sink.seq(), system)
            .toCompletableFuture()
            .get(5, TimeUnit.SECONDS);

    Assert.assertEquals(Collections.singletonList("one default msg"), utf8Payloads(result));
  }

  /**
   * Runs one message through {@code Reference.flowWithResource()} with a resource supplied via
   * stream attributes and expects the payload {@code "one attributes msg"}.
   *
   * @throws Exception if the stream does not complete within five seconds or fails
   */
  @Test
  public void useResourceFromAttributes() throws Exception {
    final List<ReferenceWriteResult> result =
        Source.single(
                ReferenceWriteMessage.create()
                    .withData(Collections.singletonList(ByteString.fromString("one"))))
            .via(
                Reference.flowWithResource()
                    .withAttributes(
                        ReferenceAttributes.resource(
                            Resource.create(ResourceSettings.create("attributes msg")))))
            .runWith(Sink.seq(), system)
            .toCompletableFuture()
            .get(5, TimeUnit.SECONDS);

    Assert.assertEquals(Collections.singletonList("one attributes msg"), utf8Payloads(result));
  }

  /** Flattens the data of every result's message into UTF-8 decoded strings, in order. */
  private static List<String> utf8Payloads(List<ReferenceWriteResult> results) {
    return results.stream()
        .flatMap(m -> m.getMessage().getData().stream())
        .map(ByteString::utf8String)
        .collect(Collectors.toList());
  }

  /** Called after every test. */
  @After
  public void tearDown() {}

  /** Called after test suite. */
  @AfterClass
  public static void tearDownAfterClass() {
    TestKit.shutdownActorSystem(system);
  }
}