网站首页 > 技术文章 正文
前言
在我们扩展scg时,获取requestbody也是一个挺常见的需求了,比如记录日志,我们要获取请求体里面的内容。在HTTP协议中,服务器接收到客户端的请求时,请求体(RequestBody)通常是以流的形式传输的。这个流在设计上是只读且不可重复读取的。即request body只能读取一次,但我们很多时候是更希望这个requestbody可以被多次读取,那我们今天就来聊下这个话题
实现思路
通常我们会实现一个全局过滤器,并将过滤器的优先级调到最高。
该过滤器调到最高的原因是防止一些内置过滤器优先读取到requestbody,会导致我们这个过滤器读取到requestbody,就已经报body只能读取一次的异常。
异常如下
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Only one connection receive subscriber allowed.
Caused by: java.lang.IllegalStateException: Only one connection receive subscriber allowed.
在这个过滤器里面我们要实现的功能如下
- 将原有的request请求中的body内容读出来
- 使用ServerHttpRequestDecorator这个请求装饰器对request进行包装,重写getBody方法
- 将包装后的请求放到过滤器链中传递下去
示例
@RequiredArgsConstructor
public class RequestBodyParamsFetchGlobalFilter implements Ordered, GlobalFilter {
private final GwCommonProperty gwCommonProperty;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
if (isSkipFetchRequestBodyParams(exchange)) {
return chain.filter(exchange);
} else {
return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux
.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
exchange.getAttributes().put(REQUEST_BODY_PARAMS_ATRR_NAME, RouteUtil.getRequestBodyParams(exchange));
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
return chain.filter(exchange.mutate().request(mutatedRequest).build());
});
}
}
private boolean isSkipFetchRequestBodyParams(ServerWebExchange exchange){
if(!gwCommonProperty.isFetchRequestBodyParams()){
return true;
}
if(exchange.getRequest().getHeaders().getContentType() == null && !HttpMethod.POST.name().equalsIgnoreCase(Objects.requireNonNull(exchange.getRequest().getMethod()).name())){
return true;
}else{
return false;
}
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
大家如果搜索一下,scg获取请求体,有很大一部分都是这种写法。这种写法基本上是可以满足我们的需求。但是在请求压力比较大的情况下,可能会堆外内存溢出问题
reactor.netty.ReactorNetty$InternalNettyException: io.netty.util.internal.OutOfDirectMemoryError:failed to allocate
有没有更好的实现方式
我这边使用的springcloud版本是Hoxton.SR3,在这个版本我发现了一个挺好玩的过滤器
org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter
见名之意,这就是一个自适应的缓存body全局过滤器。这个过滤器的代码如下
public class AdaptCachedBodyGlobalFilter
implements GlobalFilter, Ordered, ApplicationListener<EnableBodyCachingEvent> {
private ConcurrentMap<String, Boolean> routesToCache = new ConcurrentHashMap<>();
/**
* Cached request body key.
*/
@Deprecated
public static final String CACHED_REQUEST_BODY_KEY = CACHED_REQUEST_BODY_ATTR;
@Override
public void onApplicationEvent(EnableBodyCachingEvent event) {
this.routesToCache.putIfAbsent(event.getRouteId(), true);
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// the cached ServerHttpRequest is used when the ServerWebExchange can not be
// mutated, for example, during a predicate where the body is read, but still
// needs to be cached.
ServerHttpRequest cachedRequest = exchange
.getAttributeOrDefault(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, null);
if (cachedRequest != null) {
exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
return chain.filter(exchange.mutate().request(cachedRequest).build());
}
//
DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (body != null || !this.routesToCache.containsKey(route.getId())) {
return chain.filter(exchange);
}
return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> {
// don't mutate and build if same request object
if (serverHttpRequest == exchange.getRequest()) {
return chain.filter(exchange);
}
return chain.filter(exchange.mutate().request(serverHttpRequest).build());
});
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 1000;
}
}
看到这个源码,是不是有种豁然开朗的感觉,它的实现套路不就是我们上文说的实现思路吗,根据源码,我们仅需发布EnableBodyCachingEvent事件,并将要监听的routeId送入EnableBodyCachingEvent,剩下缓存requestbody的事情,就交给AdaptCachedBodyGlobalFilter来帮我们处理
示例
**
* @see AdaptCachedBodyGlobalFilter
*/
@Configuration
@AutoConfigureAfter(GatewayAutoConfiguration.class)
@RequiredArgsConstructor
public class RequestBodyCacheConfig implements ApplicationContextAware, CommandLineRunner {
private final RouteLocator routeDefinitionRouteLocator;
private ApplicationContext applicationContext;
@Override
public void run(String... args) throws Exception {
List<Signal<Route>> routes = routeDefinitionRouteLocator.getRoutes().materialize()
.collect(Collectors.toList()).block();
assert routes != null;
routes.forEach(routeSignal -> {
if (routeSignal.get() != null) {
Route route = routeSignal.get();
System.out.println(route.getId());
publishEnableBodyCachingEvent(route.getId());
}
});
}
@EventListener
public void refreshRoutesEvent(RefreshRoutesEvent refreshRoutesEvent){
if(refreshRoutesEvent.getSource() instanceof NewRouteId){
publishEnableBodyCachingEvent(((NewRouteId) refreshRoutesEvent.getSource()).getRouteId());
}else{
routeDefinitionRouteLocator.getRoutes().subscribe(route -> {
publishEnableBodyCachingEvent(route.getId());
});
}
}
private void publishEnableBodyCachingEvent(String routeId){
EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(this, routeId);
applicationContext.publishEvent(enableBodyCachingEvent);
}
public void addRouteRouteDefinition(RouteDefinition routeDefinition){
NewRouteId source = NewRouteId.builder().routeId(routeDefinition.getId()).source(this).build();
applicationContext.publishEvent(new RefreshRoutesEvent(source));
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
这个代码的意思就是在项目启动时,遍历一下路由,发送EnableBodyCachingEvent。并再监听RefreshRoutesEvent 事件,当有路由新增时,再次发送EnableBodyCachingEvent事件。其业务语义是让每个route都能被AdaptCachedBodyGlobalFilter处理,并缓存requestbody
发布EnableBodyCachingEvent事件的核心代码如下
private void publishEnableBodyCachingEvent(String routeId){
EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(this, routeId);
applicationContext.publishEvent(enableBodyCachingEvent);
}
做完上述的事情后,我们仅需在我们需要获取requestbody的地方,写下如下代码即可
String bodyContent = null;
DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
if(body != null){
bodyContent = body.toString(StandardCharsets.UTF_8);
}
总结
框架也是不断在演进,因此对于我们日常使用的框架,要多多关注下,有现成的轮子,就使用现成的轮子,现成轮子满不足不了,先看下该轮子是否有预留扩展点,如果没有,我们再考虑自己制造轮子
猜你喜欢
- 2024-11-06 JavaScript学习笔记(二十五)——HTTP
- 2024-11-06 原生js实现文件下载并设置请求头header
- 2024-11-06 干货-Http请求get、post工具类(get和post请求的区别是什么)
- 2024-11-06 python接口自动化-发送get请求(python get请求 url传参)
- 2024-11-06 想测试HTTP响应不知道如何开展怎么办?
- 2024-11-06 接口测试遇到500报错?别慌,你的头部可能有点问题
- 2024-11-06 一文讲清HPP的请求方法和过程(hp partsufer)
- 2024-11-06 HTTP请求对象(获取用户请求信息)(如何查看http请求的头部信息)
- 2024-11-06 学习笔记-HTTP 请求方法详解(学习笔记-HTTP 请求方法详解pdf)
- 2024-11-06 Servlet的创建和获取Get请求数据(如何在servlet中获取请求参数)
- 最近发表
-
- 使用Knative部署基于Spring Native的微服务
- 阿里p7大佬首次分享Spring Cloud学习笔记,带你从0搭建微服务
- ElasticSearch进阶篇之搞定在SpringBoot项目中的实战应用
- SpringCloud微服务架构实战:类目管理微服务开发
- SpringBoot+SpringCloud题目整理
- 《github精选系列》——SpringBoot 全家桶
- Springboot2.0学习2 超详细创建restful服务步骤
- SpringCloud系列:多模块聚合工程基本环境搭建「1」
- Spring Cloud Consul快速入门Demo
- Spring Cloud Contract快速入门Demo
- 标签列表
-
- cmd/c (57)
- c++中::是什么意思 (57)
- sqlset (59)
- ps可以打开pdf格式吗 (58)
- phprequire_once (61)
- localstorage.removeitem (74)
- routermode (59)
- vector线程安全吗 (70)
- & (66)
- java (73)
- org.redisson (64)
- log.warn (60)
- cannotinstantiatethetype (62)
- js数组插入 (83)
- resttemplateokhttp (59)
- gormwherein (64)
- linux删除一个文件夹 (65)
- mac安装java (72)
- reader.onload (61)
- outofmemoryerror是什么意思 (64)
- flask文件上传 (63)
- eacces (67)
- 查看mysql是否启动 (70)
- java是值传递还是引用传递 (58)
- 无效的列索引 (74)