Skip to main content

Async分析

huhxAbout 5 min

在SpringBoot中,使用异步调用很简单的一种方式就是在该方法上面使用@Async了。现在我们就它的使用,分析下其内部的原理。

使用@Async

只要@EnableAsync就可以使用多线程。使用@Async就可以定义一个线程任务。通过spring给我们提供的ThreadPoolTaskExecutor就可以使用线程池。

默认情况下Spring将要么在上下文中搜索唯一的TaskExecutor的Bean,要么搜索名为taskExecutor的Executor的Bean。如果两者都无法解析,则将使用SimpleAsyncTaskExecutor来处理异步方法调用。

配置Executor

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor getAsyncExecutor(){
        return new ThreadPoolTaskExecutor();
    }
}

@EnableAsync这个是必须的,因为它开启了对异步任务的支持。

线程池配置的另外一种写法:直接实现AsyncConfigurer接口,重写getAsyncExecutor方法即可,代码如下

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 这行是需要的,因为返回的Executor其实不是被Spring管理的Bean,initialize方法不会自动调用。
        // 如果在getAsyncExecutor方法上加上@Bean,那么这一行可以删除。因为一旦Executor成为Bean之后,spring会对其调用initialize方法进行初始化。
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomizeAsyncUncaughtExceptionHandler();
    }
}









 








通过实现AsyncConfigurer接口来自定义Executor有一个好处:如果其他地方也使用Executor,可以不被在@Async定义的Executor可以影响到。

使用例子

@Async注解来声明一个或多个异步任务,可以加在方法或者类上,加在类上表示这整个类都是使用这个自定义线程池进行操作

@Service
public class UserAsyncService {
    private final static Logger log = LoggerFactory.getLogger(UserAsyncService.class);

    @Async("taskExecutor")
    public void create() throws InterruptedException {
        log.info("--------start------------");
        Thread.sleep(5000);
        log.info("--------end------------");
    }
}

Async的分析

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
    Class<? extends Annotation> annotation() default Annotation.class;
    boolean proxyTargetClass() default false;
    AdviceMode mode() default AdviceMode.PROXY;
    int order() default Ordered.LOWEST_PRECEDENCE;
}



 






我们关注下@Import(AsyncConfigurationSelector.class),由于AsyncConfigurationSelector是继承了ImportSelector,所以会把selectImports方法中的返回注册成 Bean。
具体可参考:https://www.cnblogs.com/better-farther-world2099/articles/16020640.htmlopen in new window

在默认情况下adviceMode为PROXY,所以下面的ProxyAsyncConfiguration会生效。

@Override
@NonNull
public String[] selectImports(AdviceMode adviceMode) {
    return switch (adviceMode) {
        case PROXY -> new String[] {ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ -> new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
    };
}

第9行的AsyncAnnotationBeanPostProcessor,在这里面会定义 point 和 advice。

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.state(this.enableAsync != null, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}
}








 










于是在AsyncAnnotationBeanPostProcessor中继承了BeanFactoryAware,在初始化该 Bean之后会调用setBeanFactory方法。

@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}




 






在第5行的AsyncAnnotationAdvisor中是真正做切面和Aop的逻辑的。

public AsyncAnnotationAdvisor(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);

    ClassLoader classLoader = AsyncAnnotationAdvisor.class.getClassLoader();
    try {
        asyncAnnotationTypes.add((Class<? extends Annotation>)
                ClassUtils.forName("jakarta.ejb.Asynchronous", classLoader));
    }
    catch (ClassNotFoundException ex) {
        // If EJB API not present, simply ignore.
    }
    try {
        asyncAnnotationTypes.add((Class<? extends Annotation>)
                ClassUtils.forName("jakarta.enterprise.concurrent.Asynchronous", classLoader));
    }
    catch (ClassNotFoundException ex) {
        // If Jakarta Concurrent API not present, simply ignore.
    }

    this.advice = buildAdvice(executor, exceptionHandler);
    this.pointcut = buildPointcut(asyncAnnotationTypes);
}

buildPointcut方法中,定义了只要@Async 注解在 class 上或者方法上生效。

buildAdvice方法中,定义了AnnotationAsyncExecutionInterceptor,该类配置executorexceptionHandler,其中我们可以去看下默认的executor的逻辑:

protected Advice buildAdvice(
       @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

   AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
   interceptor.configure(executor, exceptionHandler);
   return interceptor;
}

AnnotationAsyncExecutionInterceptor的configure方法中

public void configure(@Nullable Supplier<Executor> defaultExecutor,
        @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
    this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}

接下来在getDefaultExecutor的方法中

@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

而其父类的getDefaultExecutor的方法如下:

@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            return beanFactory.getBean(TaskExecutor.class); // 尝试找TaskExecutor的Bean
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.debug("Could not find unique TaskExecutor bean. " +
                    "Continuing search for an Executor bean named 'taskExecutor'", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); // 接着尝试找 name 为taskExecutor 的 Executor
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                            "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                            "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.debug("Could not find default TaskExecutor bean. " +
                    "Continuing search for an Executor bean named 'taskExecutor'", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No task executor bean found for async processing: " +
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
            }
            // Giving up -> either using local default executor or none at all...
        }
    }
    return null;
}

至此,我们过了Spring中是如何查找默认的executor的流程。接下来我们看下核心的逻辑AOP的实现,这里的Aop实现原理其实就是MethodInterceptor。关于MethodInterceptor可以参考:https://juejin.cn/post/7158006110849335304open in new window

由于AnnotationAsyncExecutionInterceptor继承了MethodInterceptor。所以会代理被@Async标记的方法,以下是invoke方法的实现。

@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
   Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
   Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
   final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

   AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); // 拿到相应的 TaskExecutor
   if (executor == null) {
       throw new IllegalStateException(
               "No executor specified and no default executor set on AsyncExecutionInterceptor either");
   }

   Callable<Object> task = () -> {
       try {
           Object result = invocation.proceed();
           if (result instanceof Future<?> future) {
               return future.get();
           }
       }
       catch (ExecutionException ex) {
           handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
       }
       catch (Throwable ex) {
           handleError(ex, userDeclaredMethod, invocation.getArguments());
       }
       return null;
   };

   return doSubmit(task, executor, invocation.getMethod().getReturnType()); // 使用上述的 TaksExecutor 提交任务
}

AbstractAsyncConfiguration中,会把AsyncConfigurer的Bean取其第一个。然后设置 executor 和 exceptionHandler。这也是上面为什么可以通过继承AsyncConfigurer实现对 executor 和 exceptionHandler的配置。

@Autowired
void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
   Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
       List<AsyncConfigurer> candidates = configurers.stream().toList();
       if (CollectionUtils.isEmpty(candidates)) {
           return null;
       }
       if (candidates.size() > 1) {
           throw new IllegalStateException("Only one AsyncConfigurer may exist");
       }
       return candidates.get(0);
   });
   this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
   this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
}

这里面提到的ObjectProvider可以参考:https://zhuanlan.zhihu.com/p/647889226open in new window

总结

  • 要想使用异步,首先加上@EnableAsync开启异步的支持

  • 默认的Executor是从上下文中搜索唯一的TaskExecutor的Bean,要么搜索名为taskExecutor的Executor的Bean。如果两者都无法找到,则将使用SimpleAsyncTaskExecutor

  • 自定义Executor,可以声明taskExecutor的Bean,也可以通过实现AsyncConfigurer

FAQ

proxyBeanMethods是什么含义?

可以参考文章:https://www.cnblogs.com/krock/p/15743401.htmlopen in new window

参考