|
| 1 | +package com.fasterxml.jackson.dataformat.smile.async; |
| 2 | + |
| 3 | +import java.io.IOException; |
| 4 | +import java.util.*; |
| 5 | +import java.util.concurrent.CompletableFuture; |
| 6 | +import java.util.concurrent.ExecutorService; |
| 7 | +import java.util.concurrent.Executors; |
| 8 | + |
| 9 | +import com.fasterxml.jackson.core.*; |
| 10 | +import com.fasterxml.jackson.core.async.ByteArrayFeeder; |
| 11 | +import com.fasterxml.jackson.core.type.TypeReference; |
| 12 | +import com.fasterxml.jackson.databind.*; |
| 13 | +import com.fasterxml.jackson.databind.util.TokenBuffer; |
| 14 | +import com.fasterxml.jackson.dataformat.smile.SmileFactory; |
| 15 | + |
| 16 | +// for [dataformats-binary#384] |
| 17 | +public class ConcurrentAsyncTest extends AsyncTestBase |
| 18 | +{ |
| 19 | + public void testConcurrentHandling() throws Exception |
| 20 | + { |
| 21 | + Map<String, Map<String, String>> tags = new HashMap<>(); |
| 22 | + for (int i = 0; i < 10; i++) { |
| 23 | + Map<String, String> value = new HashMap<>(); |
| 24 | + for (int j = 0; j < 10; j++) { |
| 25 | + value.put("key_" + j, "val" + j); |
| 26 | + } |
| 27 | + tags.put("elt_" + i, value); |
| 28 | + } |
| 29 | + |
| 30 | + JsonFactory jsonFactory = new SmileFactory(); |
| 31 | + ObjectMapper objectMapper = new ObjectMapper(); |
| 32 | + ObjectWriter objectWriter = objectMapper.writer().with(jsonFactory); |
| 33 | + jsonFactory.setCodec(objectMapper); |
| 34 | + byte[] json = objectWriter.writeValueAsBytes(tags); |
| 35 | + TypeReference<Map<String, Map<String, String>>> typeReference = new TypeReference<Map<String, Map<String, String>>>() { |
| 36 | + }; |
| 37 | + |
| 38 | + ExecutorService executorService = Executors.newFixedThreadPool(10); |
| 39 | + List<CompletableFuture<?>> futures = new ArrayList<>(); |
| 40 | + |
| 41 | + // Exact count varies but this seems to be enough to produce the problem |
| 42 | + int count = 10_000; |
| 43 | + for (int i = 0; i < count; i++) { |
| 44 | + JsonParser parser = jsonFactory.createNonBlockingByteArrayParser(); |
| 45 | + ByteArrayFeeder inputFeeder = (ByteArrayFeeder) parser.getNonBlockingInputFeeder(); |
| 46 | + futures.add(CompletableFuture.supplyAsync(() -> { |
| 47 | + try { |
| 48 | + inputFeeder.feedInput(json, 0, json.length); |
| 49 | + @SuppressWarnings("resource") |
| 50 | + TokenBuffer tokenBuffer = new TokenBuffer(parser); |
| 51 | + while (true) { |
| 52 | + JsonToken token = parser.nextToken(); |
| 53 | + if (token == JsonToken.NOT_AVAILABLE || token == null) { |
| 54 | + break; |
| 55 | + } |
| 56 | + |
| 57 | + tokenBuffer.copyCurrentEvent(parser); |
| 58 | + } |
| 59 | + return tokenBuffer.asParser(jsonFactory.getCodec()).readValueAs(typeReference); |
| 60 | + } catch (IOException e) { |
| 61 | + throw new RuntimeException(e); |
| 62 | + } finally { |
| 63 | + try { |
| 64 | + inputFeeder.endOfInput(); |
| 65 | + parser.close(); |
| 66 | + } catch (IOException e) { |
| 67 | + throw new RuntimeException(e); |
| 68 | + } |
| 69 | + } |
| 70 | + }, executorService)); |
| 71 | + } |
| 72 | + |
| 73 | + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).get(); |
| 74 | + } |
| 75 | +} |
0 commit comments