Spring Cloud Gateway自定义过滤器进行限流

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependencies>
<!-- Spring Cloud Gateway的依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!-- Bucket4j限流依赖-->
<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-core</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SpringBootApplication
public class GatewayApplication {

@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
return builder.routes()
.route(r -> r.path("/test/rateLimit")
.filters(f -> f.rewritePath("test", "hello")
.filter(new GatewayRateLimitByIpFilter(10, 1, Duration.ofSeconds(1))))
.uri("http://127.0.0.1:9001")
)
.build();
}

public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Bucket4j;
import io.github.bucket4j.Refill;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author yezhou
*/
@Slf4j
public class GatewayRateLimitByIpFilter implements GatewayFilter, Ordered {

/**
* 单机网关限流用一个ConcurrentHashMap来存储 bucket
* 如果是分布式集群限流的话,可以采用 Redis等分布式解决方案
*/
private static final Map<String, Bucket> LOCAL_CACHE = new ConcurrentHashMap<>();

/**
* 桶的最大容量,即能装载 Token 的最大数量
*/
@Getter
@Setter
int capacity;
/**
* 每次 Token 补充量
*/
@Getter
@Setter
int refillTokens;
/**
*补充 Token 的时间间隔
*/
@Getter
@Setter
Duration refillDuration;

public GatewayRateLimitByIpFilter() {
}

public GatewayRateLimitByIpFilter(int capacity, int refillTokens, Duration refillDuration) {
this.capacity = capacity;
this.refillTokens = refillTokens;
this.refillDuration = refillDuration;
}

private Bucket createNewBucket() {
Refill refill = Refill.of(refillTokens, refillDuration);
Bandwidth limit = Bandwidth.classic(capacity, refill);
return Bucket4j.builder().addLimit(limit).build();
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String ip = exchange.getRequest().getRemoteAddress().getAddress().getHostAddress();
Bucket bucket = LOCAL_CACHE.computeIfAbsent(ip, k -> createNewBucket());
log.info("IP:{}, 令牌通可用的Token数量:{}", ip, bucket.getAvailableTokens());
if (bucket.tryConsume(1)) {
return chain.filter(exchange);
} else {
//当可用的令牌书为0是,进行限流返回429状态码
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
return exchange.getResponse().setComplete();
}
}

@Override
public int getOrder() {
return -100;
}

public static Map<String, Bucket> getLocalCache() {
return LOCAL_CACHE;
}
}

Powered by AppBlog.CN     浙ICP备14037229号

Copyright © 2012 - 2021 APP开发技术博客 All Rights Reserved.

访客数 : | 访问量 :