Async分析
在SpringBoot中,使用异步调用很简单的一种方式就是在该方法上面使用@Async了。现在我们就它的使用,分析下其内部的原理。
使用@Async
只要@EnableAsync就可以使用多线程。使用@Async就可以定义一个线程任务。通过spring给我们提供的ThreadPoolTaskExecutor就可以使用线程池。
默认情况下Spring将要么在上下文中搜索唯一的TaskExecutor的Bean,要么搜索名为taskExecutor
的Executor的Bean。如果两者都无法解析,则将使用SimpleAsyncTaskExecutor
来处理异步方法调用。
配置Executor
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
@Bean("taskExecutor")
public ThreadPoolTaskExecutor getAsyncExecutor(){
return new ThreadPoolTaskExecutor();
}
}
@EnableAsync
这个是必须的,因为它开启了对异步任务的支持。
线程池配置的另外一种写法:直接实现AsyncConfigurer接口,重写getAsyncExecutor方法即可,代码如下
@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 这行是需要的,因为返回的Executor其实不是被Spring管理的Bean,initialize方法不会自动调用。
// 如果在getAsyncExecutor方法上加上@Bean,那么这一行可以删除。因为一旦Executor成为Bean之后,spring会对其调用initialize方法进行初始化。
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomizeAsyncUncaughtExceptionHandler();
}
}
通过实现AsyncConfigurer
接口来自定义Executor有一个好处:如果其他地方也使用Executor,可以不被在@Async定义的Executor可以影响到。
使用例子
@Async注解来声明一个或多个异步任务,可以加在方法或者类上,加在类上表示这整个类都是使用这个自定义线程池进行操作
@Service
public class UserAsyncService {
private final static Logger log = LoggerFactory.getLogger(UserAsyncService.class);
@Async("taskExecutor")
public void create() throws InterruptedException {
log.info("--------start------------");
Thread.sleep(5000);
log.info("--------end------------");
}
}
Async的分析
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
Class<? extends Annotation> annotation() default Annotation.class;
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
}
我们关注下@Import(AsyncConfigurationSelector.class)
,由于AsyncConfigurationSelector
是继承了ImportSelector
,所以会把selectImports
方法中的返回注册成 Bean。
具体可参考:https://www.cnblogs.com/better-farther-world2099/articles/16020640.html
在默认情况下adviceMode为PROXY
,所以下面的ProxyAsyncConfiguration
会生效。
@Override
@NonNull
public String[] selectImports(AdviceMode adviceMode) {
return switch (adviceMode) {
case PROXY -> new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ -> new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
};
}
第9行的AsyncAnnotationBeanPostProcessor
,在这里面会定义 point 和 advice。
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.state(this.enableAsync != null, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
于是在AsyncAnnotationBeanPostProcessor
中继承了BeanFactoryAware
,在初始化该 Bean之后会调用setBeanFactory
方法。
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
在第5行的AsyncAnnotationAdvisor
中是真正做切面和Aop的逻辑的。
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
ClassLoader classLoader = AsyncAnnotationAdvisor.class.getClassLoader();
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("jakarta.ejb.Asynchronous", classLoader));
}
catch (ClassNotFoundException ex) {
// If EJB API not present, simply ignore.
}
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("jakarta.enterprise.concurrent.Asynchronous", classLoader));
}
catch (ClassNotFoundException ex) {
// If Jakarta Concurrent API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
buildPointcut
方法中,定义了只要@Async 注解在 class 上或者方法上生效。
buildAdvice
方法中,定义了AnnotationAsyncExecutionInterceptor
,该类配置executor
和exceptionHandler
,其中我们可以去看下默认的executor的逻辑:
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
在AnnotationAsyncExecutionInterceptor
的configure方法中
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
接下来在getDefaultExecutor的方法中
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
而其父类的getDefaultExecutor的方法如下:
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
return beanFactory.getBean(TaskExecutor.class); // 尝试找TaskExecutor的Bean
}
catch (NoUniqueBeanDefinitionException ex) {
logger.debug("Could not find unique TaskExecutor bean. " +
"Continuing search for an Executor bean named 'taskExecutor'", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); // 接着尝试找 name 为taskExecutor 的 Executor
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named " +
"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
logger.debug("Could not find default TaskExecutor bean. " +
"Continuing search for an Executor bean named 'taskExecutor'", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
logger.info("No task executor bean found for async processing: " +
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
至此,我们过了Spring中是如何查找默认的executor的流程。接下来我们看下核心的逻辑AOP的实现,这里的Aop实现原理其实就是MethodInterceptor
。关于MethodInterceptor
可以参考:https://juejin.cn/post/7158006110849335304
由于AnnotationAsyncExecutionInterceptor继承了MethodInterceptor
。所以会代理被@Async标记的方法,以下是invoke
方法的实现。
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); // 拿到相应的 TaskExecutor
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future<?> future) {
return future.get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
return doSubmit(task, executor, invocation.getMethod().getReturnType()); // 使用上述的 TaksExecutor 提交任务
}
在AbstractAsyncConfiguration
中,会把AsyncConfigurer
的Bean取其第一个。然后设置 executor 和 exceptionHandler
。这也是上面为什么可以通过继承AsyncConfigurer
实现对 executor 和 exceptionHandler的配置。
@Autowired
void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
List<AsyncConfigurer> candidates = configurers.stream().toList();
if (CollectionUtils.isEmpty(candidates)) {
return null;
}
if (candidates.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
return candidates.get(0);
});
this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
}
这里面提到的ObjectProvider
可以参考:https://zhuanlan.zhihu.com/p/647889226
总结
要想使用异步,首先加上@EnableAsync开启异步的支持
默认的Executor是从上下文中搜索唯一的TaskExecutor的Bean,要么搜索名为
taskExecutor
的Executor的Bean。如果两者都无法找到,则将使用SimpleAsyncTaskExecutor
自定义Executor,可以声明
taskExecutor
的Bean,也可以通过实现AsyncConfigurer
FAQ
proxyBeanMethods是什么含义?
可以参考文章:https://www.cnblogs.com/krock/p/15743401.html