Skip to content

Commit f91e8b4

Browse files
committed
Add a failing test for #476
1 parent c2b6942 commit f91e8b4

File tree

2 files changed

+164
-0
lines changed

2 files changed

+164
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package com.fasterxml.jackson.core.json.async;
2+
3+
import java.io.IOException;
4+
import java.util.ArrayList;
5+
import java.util.concurrent.*;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
import java.util.concurrent.atomic.AtomicReference;
8+
9+
import com.fasterxml.jackson.core.*;
10+
import com.fasterxml.jackson.core.async.AsyncTestBase;
11+
import com.fasterxml.jackson.core.testsupport.AsyncReaderWrapper;
12+
13+
public class AsyncConcurrencyTest extends AsyncTestBase
14+
{
15+
private final static JsonFactory JSON_F = new JsonFactory();
16+
static {
17+
// To make it pass, try:
18+
// JSON_F.disable(JsonFactory.Feature.USE_THREAD_LOCAL_FOR_BUFFER_RECYCLING);
19+
}
20+
21+
private final static String TEXT1 = "Short";
22+
private final static String TEXT2 = "Some longer text";
23+
private final static String TEXT3 = "and yet more";
24+
private final static String TEXT4 = "... Longest yet although not superbly long still (see 'apos'?)";
25+
26+
private final static byte[] JSON_DOC = utf8Bytes(String.format(
27+
"[\"%s\", \"%s\",\n\"%s\",\"%s\" ]", TEXT1, TEXT2, TEXT3, TEXT4));
28+
29+
private class WorkUnit {
30+
31+
private int stage = 0;
32+
33+
private AsyncReaderWrapper parser;
34+
35+
private boolean errored = false;
36+
37+
public boolean process() throws Exception {
38+
// short-cut through if this instance has already failed
39+
if (errored) {
40+
return false;
41+
}
42+
try {
43+
switch (stage++) {
44+
case 0:
45+
parser = asyncForBytes(JSON_F, 100, JSON_DOC, 0);
46+
break;
47+
case 1:
48+
_assert(JsonToken.START_ARRAY);
49+
break;
50+
case 2:
51+
_assert(TEXT1);
52+
break;
53+
case 3:
54+
_assert(TEXT2);
55+
break;
56+
case 4:
57+
_assert(TEXT3);
58+
break;
59+
case 5:
60+
_assert(TEXT4);
61+
break;
62+
case 6:
63+
_assert(JsonToken.END_ARRAY);
64+
break;
65+
default:
66+
/*
67+
if (parser.nextToken() != null) {
68+
throw new IOException("Unexpected token at "+stage+"; expected `null`, got "+parser.currentToken());
69+
}
70+
*/
71+
parser.close();
72+
parser = null;
73+
stage = 0;
74+
return true;
75+
}
76+
} catch (Exception e) {
77+
errored = true;
78+
throw e;
79+
}
80+
return false;
81+
}
82+
83+
private void _assert(String exp) throws IOException {
84+
_assert(JsonToken.VALUE_STRING);
85+
String str = parser.currentText();
86+
if (!exp.equals(str)) {
87+
throw new IOException("Unexpected VALUE_STRING: expected '"+exp+"', got '"+str+"'");
88+
}
89+
}
90+
91+
private void _assert(JsonToken exp) throws IOException {
92+
JsonToken t = parser.nextToken();
93+
if (t != exp) {
94+
throw new IOException("Unexpected token at "+stage+"; expected "+exp+", got "+t);
95+
}
96+
}
97+
}
98+
99+
public void testConcurrentAsync() throws Exception
100+
{
101+
for (int i = 0; i < 50; ++i) {
102+
_testConcurrentAsyncOnce(i, 50);
103+
}
104+
}
105+
106+
private void _testConcurrentAsyncOnce(final int round, final int maxRounds) throws Exception
107+
{
108+
final int numThreads = 3;
109+
final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
110+
final AtomicInteger errorCount = new AtomicInteger(0);
111+
final AtomicInteger completedCount = new AtomicInteger(0);
112+
final AtomicReference<String> errorRef = new AtomicReference<String>();
113+
114+
// First, add a few shared work units
115+
final ArrayBlockingQueue<WorkUnit> q = new ArrayBlockingQueue<WorkUnit>(20);
116+
for (int i = 0; i < 7; ++i) {
117+
q.add(new WorkUnit());
118+
}
119+
120+
// then invoke swarm of workers on it...
121+
122+
final int REP_COUNT = 99000;
123+
ArrayList<Future<?>> futures = new ArrayList<Future<?>>();
124+
for (int i = 0; i < REP_COUNT; i++) {
125+
Callable<Void> c = new Callable<Void>() {
126+
@Override
127+
public Void call() throws Exception {
128+
WorkUnit w = q.take();
129+
try {
130+
if (w.process()) {
131+
completedCount.incrementAndGet();
132+
}
133+
} catch (Throwable t) {
134+
if (errorCount.getAndIncrement() == 0) {
135+
errorRef.set(t.toString());
136+
}
137+
} finally {
138+
q.add(w);
139+
}
140+
return null;
141+
}
142+
143+
};
144+
futures.add(executor.submit(c));
145+
}
146+
executor.shutdown();
147+
executor.awaitTermination(5, TimeUnit.SECONDS);
148+
int count = errorCount.get();
149+
150+
if (count > 0) {
151+
fail("Expected no problems (round "+round+"/"+maxRounds
152+
+"); got "+count+", first with: "+errorRef.get());
153+
}
154+
final int EXP_COMPL = ((REP_COUNT + 7) / 8);
155+
int compl = completedCount.get();
156+
157+
if (compl < (EXP_COMPL-10) || compl > EXP_COMPL) {
158+
fail("Expected about "+EXP_COMPL+" completed rounds, got: "+compl);
159+
}
160+
}
161+
}

src/test/java/com/fasterxml/jackson/core/json/async/AsyncTokenFilterTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public class AsyncTokenFilterTest extends AsyncTestBase
1515
private final static String INPUT_STRING = aposToQuotes("{'a': 1, 'b': [2, {'c': 3}]}");
1616
private final static byte[] INPUT_BYTES = utf8Bytes(INPUT_STRING);
1717
private final static TokenFilter TOKEN_FILTER = new TokenFilter() {
18+
@Override
1819
public TokenFilter includeProperty(String name) {
1920
return name == "a" ? TokenFilter.INCLUDE_ALL : null;
2021
}
@@ -45,6 +46,7 @@ public void testFilteredNonBlockingParserAllContent() throws IOException
4546
}
4647

4748
filteredParser.close();
49+
nonBlockingParser.close();
4850
}
4951

5052
public void testSkipChildrenFailOnSplit() throws IOException
@@ -63,5 +65,6 @@ public void testSkipChildrenFailOnSplit() throws IOException
6365
verifyException(e, "skipChildren()");
6466
}
6567
nbParser.close();
68+
filteredParser.close();
6669
}
6770
}

0 commit comments

Comments
 (0)