Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TTL在ForkJoinPool线程池中失效 #343

Closed
liudaolunboluo opened this issue Dec 30, 2021 · 7 comments
Closed

TTL在ForkJoinPool线程池中失效 #343

liudaolunboluo opened this issue Dec 30, 2021 · 7 comments
Assignees
Labels
❓question Further information is requested

Comments

@liudaolunboluo
Copy link

liudaolunboluo commented Dec 30, 2021

Executor executor = TtlExecutors.getTtlExecutor(new TracedExecutor(
						"TrainingResultPopupProcessor-" + entry.getKey()
							.name(), ForkJoinPool.commonPool()));

...

CompletableFuture.supplyAsync(() -> tuple.getKey()
						.stream()
						.parallel()
						.map(assembler -> assembler.process(param)), executor)
						.get();

以上代码里,在子线程中TTL 只有一个子线程能在ttl中拿到值,其他的为null。

@oldratlee
Copy link
Member

oldratlee commented Dec 30, 2021

@liudaolunboluo 给的代码片段有不少基本问题:

  • .map(assembler -> assembler.process(param)), executor)
    括号不匹配,不可编译
  • TracedExecutor,这是你的业务应用里的类。
    别人不可运行,不可分析(不知道这个类的逻辑会不会有影响、带来问题)。
  • 缩进不对
  • ……

@liudaolunboluo 给一个可以说明/复现问题的、极简可运行的Demo工程 ❤️;推荐:

有一个可以说明/复现问题的、极简可运行的Demo工程,也可以排除可能的使用上的问题。

@oldratlee oldratlee added ❓question Further information is requested 😖 no runnable reproducible demo 😵 please provide a simple runnable demo that reproduce the problem labels Dec 30, 2021
@liudaolunboluo
Copy link
Author

liudaolunboluo commented Dec 31, 2021

@liudaolunboluo 给的代码片段有不少基本问题:

  • .map(assembler -> assembler.process(param)), executor)
    括号不匹配,不可编译
  • TracedExecutor,这是你的业务应用里的类。
    别人不可运行,不可分析(不知道这个类的逻辑会不会有影响、带来问题)。
  • 缩进不对
  • ……

@liudaolunboluo 给一个可以说明/复现问题的、极简可运行的Demo工程 ❤️;推荐:

有一个可以说明/复现问题的、极简可运行的Demo工程,也可以排除可能的使用上的问题。

抱歉,前面的说的有点乱。以下代码:

  TransmittableThreadLocal<String> CONTEXT = new TransmittableThreadLocal<>();
            CONTEXT.set("成功啦");
            List<Integer> list = Lists.newArrayList(1, 2, 3, 4);
            Executor executor = TtlExecutors.getTtlExecutor(Executors.newFixedThreadPool(10));
            CompletableFuture.supplyAsync(() -> list.stream().filter(Objects::nonNull).parallel().map(assembler -> {
                try {
                    System.out.println(CONTEXT.get());
                    return CONTEXT.get();
                } catch (Exception e) {
                    return "1";
                }
            }).filter(Objects::nonNull).findFirst().orElse(null), executor).get();

好像第一次执行都可以打印出成功啦,但是后面重复执行的话都只有一个线程能打印出成功啦

@oldratlee
Copy link
Member

oldratlee commented Dec 31, 2021

@liudaolunboluo

Parallel StreamStream.parallel()的支持
需要 2.3 使用Java Agent来修饰JDK线程池实现类的使用方式。

文档中只有 2.3 说到了(并行)Stream的支持;

其它使用方式并不支持Parallel Stream


已有的Issue

@oldratlee
Copy link
Member

oldratlee commented Dec 31, 2021

整理了 极简(只依赖JDK/TTL)、可运行、并复现问题 的Demo代码。 @liudaolunboluo

  • 复现问题:第一次运行就能确定性的不正确。
    • 原因是 inheritable的影响;通过 提前扩充好线程池 来解决(避免其影响)。
      这个做法在TTL的单元测试中 在使用的,以保证单元测试能确定性地测试到功能。
    • TransmittableThreadLocalInheritableThreadLocal
  • 代码格式整理与重构,以方便阅读。
    • 缩进
    • 换行
    • 抽取局部Lambda变量supplier
      长链方法调用中的参数 又是个 长链调用,这样很不方便代码阅读
    • ……
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.threadpool.TtlExecutors;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelStreamUseCode {
    private static final TransmittableThreadLocal<String> CONTEXT = new TransmittableThreadLocal<>();

    private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors();

    public static void main(String[] args) throws Exception {
        //////////////////////////////////////////////////////////////////
        // Prepare executor service
        //////////////////////////////////////////////////////////////////

        final ExecutorService executor = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(THREAD_COUNT));
        expandThreadPool(executor);

        // Caution:
        // Parallel Stream use ForkJoinPool.commonPool underlay
        expandThreadPool(ForkJoinPool.commonPool());


        //////////////////////////////////////////////////////////////////
        // Biz logic
        //////////////////////////////////////////////////////////////////

        CONTEXT.set("value-set-in-main");

        final List<Integer> list = IntStream.range(0, THREAD_COUNT).boxed().collect(Collectors.toList());

        final Supplier<List<String>> supplier = () -> list.stream().parallel()
                .map(num -> "[" + Thread.currentThread().getName() + "] num: " + num + ", context: " + CONTEXT.get())
                .collect(Collectors.toList());

        final List<String> resultList = CompletableFuture.supplyAsync(supplier, executor).get();
        for (String r : resultList) {
            System.out.println(r);
        }


        //////////////////////////////////////////////////////////////////
        // Clean up thread pool
        //////////////////////////////////////////////////////////////////

        executor.shutdown();
        if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Fail to shutdown executor service!");
        }
    }

    /**
     * expand thread pool to avoid "inheritable" ability impact
     */
    private static void expandThreadPool(final ExecutorService executor) throws Exception {
        List<Future<?>> futureList = new ArrayList<>();
        for (int i = 0; i < 2 * THREAD_COUNT; i++) {
            Future<?> future = executor.submit(() -> {
                        try {
                            Thread.sleep(10);
                            // System.out.printf("expand thread %s for executor service %s.%n",
                            //        Thread.currentThread().getName(), executor);
                        } catch (InterruptedException e) {
                            throw new IllegalStateException("fail to sleep!");
                        }
                    }
            );
            futureList.add(future);
        }

        for (Future<?> future : futureList) {
            future.get();
        }
    }
}

/*

An output:

[ForkJoinPool.commonPool-worker-19] num: 0, context: null
[ForkJoinPool.commonPool-worker-17] num: 1, context: null
[ForkJoinPool.commonPool-worker-15] num: 2, context: null
[ForkJoinPool.commonPool-worker-7] num: 3, context: null
[ForkJoinPool.commonPool-worker-25] num: 4, context: null
[ForkJoinPool.commonPool-worker-5] num: 5, context: null
[ForkJoinPool.commonPool-worker-23] num: 6, context: null
[ForkJoinPool.commonPool-worker-31] num: 7, context: null
[ForkJoinPool.commonPool-worker-15] num: 8, context: null
[ForkJoinPool.commonPool-worker-25] num: 9, context: null
[pool-1-thread-4] num: 10, context: value-set-in-main
[ForkJoinPool.commonPool-worker-7] num: 11, context: null
[ForkJoinPool.commonPool-worker-21] num: 12, context: null
[ForkJoinPool.commonPool-worker-7] num: 13, context: null
[ForkJoinPool.commonPool-worker-23] num: 14, context: null
[ForkJoinPool.commonPool-worker-29] num: 15, context: null

 */

@oldratlee oldratlee removed the 😖 no runnable reproducible demo 😵 please provide a simple runnable demo that reproduce the problem label Dec 31, 2021
@oldratlee oldratlee self-assigned this Dec 31, 2021
@liudaolunboluo
Copy link
Author

liudaolunboluo commented Dec 31, 2021

整理了 极简(只依赖JDK/TTL)、可运行、并复现问题 的Demo代码。 @liudaolunboluo

  • 复现问题:第一次运行就能确定性的不正确。

    • 原因是 inheritable的影响;通过 提前扩充好线程池 解决(避免其影响)。
      这个做法在TTL的单元测试中 在使用的,以保证单元测试能确定性地测试到功能。
    • TransmittableThreadLocalInheritableThreadLocal
  • 代码格式整理与重构,以方便阅读。

    • 缩进
    • 换行
    • 抽取局部Lambda变量supplier
      长链方法调用中的参数 又是个 长链调用,这样很不方便代码阅读
    • ……
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.threadpool.TtlExecutors;

......

感谢大哥,这个是原来项目的老代码,这个代码目前线上运行没有问题,然后最近突然在测试环节报错了,就很疑惑。

我现在跟代码发现的是parallel会用到ForkJoinPool.commonPool里的线程即使我显式的指定了线程池,ForkJoinPool.commonPool里的线程貌似不会被TtlRunnable包裹去执行复制threadlocal里的代码,所以导致了没有值;这个不会走TtlRunnable里的代码所以要用java agent才能实现。

那为啥ForkJoinPool.commonPool里的线程不会走TtlRunnable里的代码呢?

@oldratlee
Copy link
Member

oldratlee commented Dec 31, 2021

那为啥ForkJoinPool.commonPool里的线程不会走TtlRunnable里的代码呢?

这个 Issue 的直接问题不是ForkJoinPool.commonPool,而是Parallel StreamStream.parallel()内部展开生成的任务没有走TtlRunnable里的代码。

简单地说,整个过程都没有写 Wrap 成TtlRunnable的逻辑。


并发Stream及其使用的执行器 都没有 设置/wrapAPI入口,所以简单TTL API使用方式 没有办法(相对的是TTL Agent使用方式,修改/增强了ForkJoinPool本身的实现逻辑)。

具体你看看源码吧~ 😄 @liudaolunboluo

@liudaolunboluo
Copy link
Author

liudaolunboluo commented Dec 31, 2021

那为啥ForkJoinPool.commonPool里的线程不会走TtlRunnable里的代码呢?

这个 Issue 的直接问题不是ForkJoinPool.commonPool,而是Parallel StreamStream.parallel()内部展开生成的任务没有走TtlRunnable里的代码。

简单地说,整个过程都没有写 Wrap 成TtlRunnable的逻辑。

并发Stream及其使用执行器 没有 设置/wrapAPI入口, 所以简单TTL API使用方式 没有办法(相对的是TTL Agent使用方式)。

具体你看看代码吧~ 😄 @liudaolunboluo

谢谢大哥,我自己后面再研究一下。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
❓question Further information is requested
Projects
None yet
Development

No branches or pull requests

2 participants