记录一次微服务异步化实现过程

-


-

个人笔记,如有描述不当,欢迎留言指出~

前提

公司使用springcloud开发微服务,我在开发审批模块中,其中一个新增审批请求中请求了其他几个模块,使用feign接口依次请求,最后发现请求平均耗时1.5s。这速度怎么能忍,于是进行了异步化改造。

异步化

同步版关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
logger.info("rpc请求开始");
long rpcstart = System.currentTimeMillis();

StoreStaffDTO staffDTO = staffClient.getStaffDTO(userDTO.getUuid()); //获取门店员工信息
SupplierDTO supplierDTO = supplierClient.getSupplierDTO(userDTO.getSupplierCode()); //获取供应商信息
StoreDTO staffStoreDTO = storeClient.getStoreDTO(userDTO.getStoreUuid()); //获取员工所在门店信息
CustomerInfoDTO customerInfoDTO = customerInfoClient.getCustomerInfo(approval.getCustomerUuid()); //获取顾客信息
StoreDTO inStoreDto = ObjectUtils.isEmpty(approval.getInStoreUuid()) ? null : storeClient.getStoreDTO(approval.getInStoreUuid());//获取客户转入门店信息
ServerDto serverDto = ObjectUtils.isEmpty(approval.getServerUuid()) ? null : serveClient.getServeSpecTypeByuuid(approval.getServerUuid()); //获取服务信息
CustAccountDto custAccountDto = ObjectUtils.isEmpty(approval.getCustCardUuid()) ? null : accountClient.getCustAccount(approval.getCustCardUuid()); //获取顾客账户信息

long rpcend = System.currentTimeMillis();
logger.info("rpc请求结束,耗时:{}毫秒", (rpcend - rpcstart));

console
Alt text
Alt text
Alt text

从打印日志中可以看出,由于网络波动性,请求耗时可能只要1秒也可能将近4秒,但rpc请求总是会占总耗时的90%左右,所以影响新增审批的请求的瓶颈就是这些rpc请求,下面我使用异步线程发送这些rpc请求。

异步版关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
logger.info("rpc请求开始");
long rpcstart = System.currentTimeMillis();

Approval approvalFina = approval;
CompletableFuture<StoreDTO> storeDTOCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
return storeClient.getStoreDTO(userDTO.getStoreUuid());
} catch (Exception e) {
logger.error("获取门店信息rpc错误,原因:", e);
throw e;
}
});
CompletableFuture<StoreDTO> inStoreDTOCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
if (!ObjectUtils.isEmpty(approvalFina.getInStoreUuid())) {
return storeClient.getStoreDTO(approvalFina.getInStoreUuid());
}
return null;
} catch (Exception e) {
logger.error("获取转入门店信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture<SupplierDTO> supplierDTOCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
return supplierClient.getSupplierDTO(userDTO.getSupplierCode());
} catch (Exception e) {
logger.error("获取供应商信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture<StoreStaffDTO> staffDTOCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
return staffClient.getStaffDTO(userDTO.getUuid());
} catch (Exception e) {
logger.error("获取门店员工信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture<CustomerInfoDTO> customerInfoDTOCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
return customerInfoClient.getCustomerInfo(approvalFina.getCustomerUuid());
} catch (Exception e) {
logger.error("获取顾客信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture<ServerDto> serverDtoCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
if (!ObjectUtils.isEmpty(approvalFina.getServerUuid())) {
return serveClient.getServeSpecTypeByuuid(approvalFina.getServerUuid());
}
return null;
} catch (Exception e) {
logger.error("获取服务信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture<CustAccountDto> custAccountDtoCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
if (!ObjectUtils.isEmpty(approvalFina.getCustCardUuid())) {
return accountClient.getCustAccount(approvalFina.getCustCardUuid());
}
return null;
} catch (Exception e) {
logger.error("获取顾客账户信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture.allOf(storeDTOCompletableFuture, inStoreDTOCompletableFuture, serverDtoCompletableFuture,
supplierDTOCompletableFuture, staffDTOCompletableFuture, customerInfoDTOCompletableFuture, custAccountDtoCompletableFuture);

StoreStaffDTO staffDTO = null;//员工信息
SupplierDTO supplierDTO = null; //供应商信息
StoreDTO staffStoreDTO = null;//员工所在门店信息
CustomerInfoDTO customerInfoDTO = null; //顾客信息
StoreDTO inStoreDto = null;//顾客转入门店信息
ServerDto serverDto = null; //服务信息
CustAccountDto custAccountDto = null; //顾客账户信息
try {
staffStoreDTO = storeDTOCompletableFuture.get();
inStoreDto = inStoreDTOCompletableFuture.get();
staffDTO = staffDTOCompletableFuture.get();
supplierDTO = supplierDTOCompletableFuture.get();
customerInfoDTO = customerInfoDTOCompletableFuture.get();
serverDto = serverDtoCompletableFuture.get();
custAccountDto = custAccountDtoCompletableFuture.get();
} catch (InterruptedException e) {
logger.error("线程中断错误信息:{}", e);
} catch (ExecutionException e) {
logger.error("线程执行错误信息:{}", e);
}

long rpcend = System.currentTimeMillis();
logger.info("rpc请求结束,耗时:{}毫秒", (rpcend - rpcstart));

console
Alt text

exception分析

what😨!竟然报未授权登录异常,不可能,feign请求里应该是带了请求头阿,下面拓展一些知识。

知识拓展

completableFuture

completableFuture扩展了Future的功能,并且实现了线程间同步的功能,我们用它提供的语法,可以很简单的实现异步编程。

oauth2

框架里引用了spring security以及oauth2来实现授权认证服务,所以客户请求是要带上Authorization请求头的。

feign

feign的本质,其实就是使用httpclient帮我们封装好了http请求。所以调用feign接口的方法就是发起一次http请求而已。如果不做处理的,那么feign发送的http请求里是没有Authorization请求头的。

RequestInterceptor

RequestInterceptor是feign提供的一个拦截器
放出我的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class FeignOauth2RequestInterceptor implements RequestInterceptor {
private final String AUTHORIZATION_HEADER = "Authorization";
private final String BEARER_TOKEN_TYPE = "Bearer";

@Override
public void apply(RequestTemplate requestTemplate) {
SecurityContext securityContext = SecurityContextHolder.getContext();
Authentication authentication = securityContext.getAuthentication();
if (authentication != null && authentication.getDetails() instanceof OAuth2AuthenticationDetails) {
OAuth2AuthenticationDetails details = (OAuth2AuthenticationDetails) authentication.getDetails();
requestTemplate.header(AUTHORIZATION_HEADER, String.format("%s %s", BEARER_TOKEN_TYPE, details.getTokenValue()));
}
}
}

拦截器的作用就是从securityContext拿到当前请求用户的认证信息authentication,然后为feign的请求模板·requestTemplate·赋上Authorization请求头。

断点跟踪

断点1

打个断点跟进,看看拦截后feign执行了哪些操作,关键位置如下图:
Alt text
可以看到feign将requestTemplate封装成request对象,并最终被client执行得到response,里面详细细节就不深入。从这里我们可以知道,调用feign发送的请求是携带Authorization请求头的,这样模块间发起请求就不会报未授权错误了。
@完整的请求流程

断点2

所以按道理是不应该报用户未登录授权异常的,所以我们在拦截器打个端点,看看requestTemplate到底有没有被赋上请求头。
Alt text
what😮!这不是推翻了我的认知吗,眼神逐渐呆滞😑
开个玩笑,authentication从securityContext里获取,securityContext从SecurityContextHolder静态方法中获取
@SecurityContextHolder

从图中可以看出securityContext是从strategy里拿到的。
SecurityContextHolderStrategy是spring security安全上下文的存取策略。SecurityContextHolderStrategy接口有三个实现类,对应三种实现策略:

  • GlobalSecurityContextHolderStrategy:使用 一个静态变量存放securityContext
  • ThreadLocalSecurityContextHolderStrategy:使用ThreadLocal存放securityContext
  • InheritableThreadLocalSecurityContextHolderStrategy:使用InheritableThreadLocal存放securityContext
    断点3
    那我当前环境里SecurityContextHolder里使用的是哪个策略呢,打个断点
    @SecurityContextHolder

恍然大悟!🤣原来采用的是ThreadLocalSecurityContextHolderStrategy
还记得我异步改造里怎么写的吗

1
2
3
4
5
6
7
8
9
CompletableFuture<StoreDTO> storeDTOCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
return storeClient.getStoreDTO(userDTO.getStoreUuid());
} catch (Exception e) {
logger.error("获取门店信息rpc错误,原因:", e);
throw e;
}
});

我将调用feign请求代码写在CompletableFuture.supplyAsync(()->{})中,而CompletableFuture里默认线程池会分配新线程去执行任务,所以新线程里是没有securityContext的线程副本的!所以在拦截器里才会取到空的securityContext,最终报未授权登录异常!bingo😎

最终异步版关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
logger.info("rpc请求开始");
long rpcstart = System.currentTimeMillis();

SecurityContext securityContext = SecurityContextHolder.getContext();//从当前请求线程中拿到securityContext安全上下文
Approval approvalFina = approval;
CompletableFuture<StoreDTO> storeDTOCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
SecurityContextHolder.setContext(securityContext); //设置securityContext安全上下文线程副本
return storeClient.getStoreDTO(userDTO.getStoreUuid());
} catch (Exception e) {
logger.error("获取门店信息rpc错误,原因:", e);
throw e;
}
});
CompletableFuture<StoreDTO> inStoreDTOCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
if (!ObjectUtils.isEmpty(approvalFina.getInStoreUuid())) {
SecurityContextHolder.setContext(securityContext);
return storeClient.getStoreDTO(approvalFina.getInStoreUuid());
}
return null;
} catch (Exception e) {
logger.error("获取转入门店信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture<SupplierDTO> supplierDTOCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
SecurityContextHolder.setContext(securityContext);
return supplierClient.getSupplierDTO(userDTO.getSupplierCode());
} catch (Exception e) {
logger.error("获取供应商信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture<StoreStaffDTO> staffDTOCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
SecurityContextHolder.setContext(securityContext);
return staffClient.getStaffDTO(userDTO.getUuid());
} catch (Exception e) {
logger.error("获取门店员工信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture<CustomerInfoDTO> customerInfoDTOCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
SecurityContextHolder.setContext(securityContext);
return customerInfoClient.getCustomerInfo(approvalFina.getCustomerUuid());
} catch (Exception e) {
logger.error("获取顾客信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture<ServerDto> serverDtoCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
if (!ObjectUtils.isEmpty(approvalFina.getServerUuid())) {
SecurityContextHolder.setContext(securityContext);
return serveClient.getServeSpecTypeByuuid(approvalFina.getServerUuid());
}
return null;
} catch (Exception e) {
logger.error("获取服务信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture<CustAccountDto> custAccountDtoCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
logger.info("当前线程名:{}", Thread.currentThread().getName());
if (!ObjectUtils.isEmpty(approvalFina.getCustCardUuid())) {
SecurityContextHolder.setContext(securityContext);
return accountClient.getCustAccount(approvalFina.getCustCardUuid());
}
return null;
} catch (Exception e) {
logger.error("获取顾客账户信息rpc错误,原因:{}", e);
throw e;
}
});
CompletableFuture.allOf(storeDTOCompletableFuture, inStoreDTOCompletableFuture, serverDtoCompletableFuture,
supplierDTOCompletableFuture, staffDTOCompletableFuture, customerInfoDTOCompletableFuture, custAccountDtoCompletableFuture);


StoreStaffDTO staffDTO = null;//员工信息
SupplierDTO supplierDTO = null; //供应商信息
StoreDTO staffStoreDTO = null;//员工所在门店信息
CustomerInfoDTO customerInfoDTO = null; //顾客信息
StoreDTO inStoreDto = null;//顾客转入门店信息
ServerDto serverDto = null; //服务信息
CustAccountDto custAccountDto = null; //顾客账户信息

try {
staffStoreDTO = storeDTOCompletableFuture.get();
inStoreDto = inStoreDTOCompletableFuture.get();
staffDTO = staffDTOCompletableFuture.get();
supplierDTO = supplierDTOCompletableFuture.get();
customerInfoDTO = customerInfoDTOCompletableFuture.get();
serverDto = serverDtoCompletableFuture.get();
custAccountDto = custAccountDtoCompletableFuture.get();
} catch (InterruptedException e) {
logger.error("线程中断错误信息:{}", e);
} catch (ExecutionException e) {
logger.error("线程执行错误信息:{}", e);
}

long rpcend = System.currentTimeMillis();
logger.info("rpc请求结束,耗时:{}毫秒", (rpcend - rpcstart));

console
Alt text
Alt text

可以看到rpc请求的耗时被平均缩短到500毫秒内😁

CompletableFuture学习可以看这篇👉here,我觉得这位博主写的还行。

请博主喝咖啡