SpringBoot+Redis布隆过滤器防恶意流量击穿缓存

布隆过滤器

BloomFilter是一种空间效率的概率型数据结构,由Burton Howard Bloom 1970年提出的。通常用来判断一个元素是否在集合中。具有极高的空间效率,但是会带来假阳性(False positive)的错误。

  • False positive && False negatives: 由于BloomFiter牺牲了一定的准确率换取空间效率。所以带来了False positive的问题
  • False positive: BloomFilter在判断一个元素在集合中的时候,会出现一定的错误率,这个错误率称为False positive的。通常缩写为fpp
  • False negatives: BloomFilter判断一个元素不在集合中的时候的错误率。BloomFilter判断该元素不在集合中,则该元素一定不再集合中。故False negatives概率为0

BloomFilter使用长度为m bit的字节数组,使用k个hash函数,增加一个元素: 通过k次hash将元素映射到字节数组中k个位置中,并设置对应位置的字节为1。
查询元素是否存在: 将元素k次hash得到k个位置,如果对应k个位置的bit是1则认为存在,反之则认为不存在。

由于它里面存的都是bit,因此这个数据量会很小很小,小到什么样的程度呢?在写本博客时我插了100万条email信息进入Redis的bloom filter也只占用了3Mb不到。

Bloom Filter会有几比较关键的值,根据这个值你是大致可以算出放多少条数据然后它的误伤率在多少时会占用多少系统资源的。这个算法有一个网址:https://krisives.github.io/bloom-calculator ,我们放入100万条数据,假设误伤率在0.001%,它自动得出Redis需要申请的系统内存资源是多少?

Bloom Filter Calculator

那么怎么解决这个误伤率呢?很简单的,当有误伤时业务或者是运营会来报误伤率,这时只要添加一个白名单即可,相对于100万条数据来说,1000个白名单不是问题。并且bloom filter的返回速度超块,80-100毫秒内即返回调用端该Key存在或者是不存在。

给Redis安装Bloom Filter

Redis从4.0才开始支持bloom filter,因此本例中我们使用的是Redis 5.4

Redis的bloom filter下载地址:https://github.com/RedisLabsModules/redisbloom.git

git clone https://github.com/RedisLabsModules/redisbloom.git
cd redisbloom
make # 编译

让Redis启动时可以加载bloom filter有两种方式:

(1)手工加载式

redis-server --loadmodule ./redisbloom/rebloom.so

(2)每次启动自加载

编辑Redis的redis.conf文件,加入:

loadmodule /soft/redisbloom/redisbloom.so

在Redis里使用Bloom Filter

基本指令:

  • bf.reserve {key} {error_rate} {size}
127.0.0.1:6379> bf.reserve userid 0.01 100000
OK

上面这条命令就是:创建一个空的布隆过滤器,并设置一个期望的错误率和初始大小。{error_rate}过滤器的错误率在0-1之间,如果要设置0.1%,则应该是0.001。该数值越接近0,内存消耗越大,对cpu利用率越高

  • bf.add {key} {item}
127.0.0.1:6379> bf.add userid '181920'
(integer) 1

上面这条命令就是:往过滤器中添加元素。如果key不存在,过滤器会自动创建

  • bf.exists {key} {item}
127.0.0.1:6379> bf.exists userid '101310299'
(integer) 1

上面这条命令就是:判断指定key的value是否在bloomfilter里存在。存在:返回1,不存在:返回0

结合SpringBoot使用

BloomFilter实现

借用google的guava包来实现核心算法,核心代码如下:

BloomFilterHelper.java

import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;

public class BloomFilterHelper<T> {
    private int numHashFunctions;

    private int bitSize;

    private Funnel<T> funnel;

    public BloomFilterHelper(Funnel<T> funnel, int expectedInsertions, double fpp) {
        Preconditions.checkArgument(funnel != null, "funnel不能为空");
        this.funnel = funnel;
        bitSize = optimalNumOfBits(expectedInsertions, fpp);
        numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, bitSize);
    }

    int[] murmurHashOffset(T value) {
        int[] offset = new int[numHashFunctions];

        long hash64 = Hashing.murmur3_128().hashObject(value, funnel).asLong();
        int hash1 = (int) hash64;
        int hash2 = (int) (hash64 >>> 32);
        for (int i = 1; i <= numHashFunctions; i++) {
            int nextHash = hash1 + i * hash2;
            if (nextHash < 0) {
                nextHash = ~nextHash;
            }
            offset[i - 1] = nextHash % bitSize;
        }

        return offset;
    }

    /**
     * 计算bit数组的长度
     */
    private int optimalNumOfBits(long n, double p) {
        if (p == 0) {
            p = Double.MIN_VALUE;
        }
        return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /**
     * 计算hash方法执行次数
     */
    private int optimalNumOfHashFunctions(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }
}

项目Redis配置

(1)Redis连接配置

spring.redis.database=0
spring.redis.host=192.168.56.101
spring.redis.port=6379
spring.redis.password=111111
spring.redis.pool.max-active=10
spring.redis.pool.max-wait=-1
spring.redis.pool.max-idle=10
spring.redis.pool.min-idle=0
spring.redis.timeout=1000

(2)RedisConfig.java

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
    /**
     * 选择redis作为默认缓存工具
     * 
     * @param redisTemplate
     * @return
     */
    @Bean
    public CacheManager cacheManager(RedisTemplate redisTemplate) {
        RedisCacheManager rcm = new RedisCacheManager(redisTemplate);
        return rcm;
    }

    /**
     * retemplate相关配置
     * 
     * @param factory
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 配置连接工厂
        template.setConnectionFactory(factory);

        // 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
        Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();
        // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jacksonSeial.setObjectMapper(om);

        // 值采用json序列化
        template.setValueSerializer(jacksonSeial);
        // 使用StringRedisSerializer来序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());

        // 设置hash key 和value序列化模式
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(jacksonSeial);
        template.afterPropertiesSet();

        return template;
    }

    /**
     * 对hash类型的数据操作
     *
     * @param redisTemplate
     * @return
     */
    @Bean
    public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForHash();
    }

    /**
     * 对redis字符串类型数据操作
     *
     * @param redisTemplate
     * @return
     */
    @Bean
    public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForValue();
    }

    /**
     * 对链表类型的数据操作
     *
     * @param redisTemplate
     * @return
     */
    @Bean
    public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForList();
    }

    /**
     * 对无序集合类型的数据操作
     *
     * @param redisTemplate
     * @return
     */
    @Bean
    public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForSet();
    }

    /**
     * 对有序集合类型的数据操作
     *
     * @param redisTemplate
     * @return
     */
    @Bean
    public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForZSet();
    }
}

(3)RedisUtil.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.base.Preconditions;
import org.springframework.data.redis.core.RedisTemplate;

@Component
public class RedisUtil {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 默认过期时长,单位:秒
     */
    public static final long DEFAULT_EXPIRE = 60 * 60 * 24;

    /**
     * 不设置过期时长
     */
    public static final long NOT_EXPIRE = -1;

    public boolean existsKey(String key) {
        return redisTemplate.hasKey(key);
    }

    /**
     * 重名名key,如果newKey已经存在,则newKey的原值被覆盖
     *
     * @param oldKey
     * @param newKey
     */
    public void renameKey(String oldKey, String newKey) {
        redisTemplate.rename(oldKey, newKey);
    }

    /**
     * newKey不存在时才重命名
     *
     * @param oldKey
     * @param newKey
     * @return 修改成功返回true
     */
    public boolean renameKeyNotExist(String oldKey, String newKey) {
        return redisTemplate.renameIfAbsent(oldKey, newKey);
    }

    /**
     * 删除key
     *
     * @param key
     */
    public void deleteKey(String key) {
        redisTemplate.delete(key);
    }

    /**
     * 删除多个key
     *
     * @param keys
     */
    public void deleteKey(String... keys) {
        Set<String> kSet = Stream.of(keys).map(k -> k).collect(Collectors.toSet());
        redisTemplate.delete(kSet);
    }

    /**
     * 删除Key的集合
     *
     * @param keys
     */
    public void deleteKey(Collection<String> keys) {
        Set<String> kSet = keys.stream().map(k -> k).collect(Collectors.toSet());
        redisTemplate.delete(kSet);
    }

    /**
     * 设置key的生命周期
     *
     * @param key
     * @param time
     * @param timeUnit
     */
    public void expireKey(String key, long time, TimeUnit timeUnit) {
        redisTemplate.expire(key, time, timeUnit);
    }

    /**
     * 指定key在指定的日期过期
     *
     * @param key
     * @param date
     */
    public void expireKeyAt(String key, Date date) {
        redisTemplate.expireAt(key, date);
    }

    /**
     * 查询key的生命周期
     *
     * @param key
     * @param timeUnit
     * @return
     */
    public long getKeyExpire(String key, TimeUnit timeUnit) {
        return redisTemplate.getExpire(key, timeUnit);
    }

    /**
     * 将key设置为永久有效
     *
     * @param key
     */
    public void persistKey(String key) {
        redisTemplate.persist(key);
    }

    /**
     * 根据给定的布隆过滤器添加值
     */
    public <T> void addByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) {
        Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能为空");
        int[] offset = bloomFilterHelper.murmurHashOffset(value);
        for (int i : offset) {
            redisTemplate.opsForValue().setBit(key, i, true);
        }
    }

    /**
     * 根据给定的布隆过滤器判断值是否存在
     */
    public <T> boolean includeByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) {
        Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能为空");
        int[] offset = bloomFilterHelper.murmurHashOffset(value);
        for (int i : offset) {
            if (!redisTemplate.opsForValue().getBit(key, i)) {
                return false;
            }
        }
        return true;
    }
}

(4)RedisKeyUtil.java

public class RedisKeyUtil {
    /**
     * redis的key 形式为: 表名:主键名:主键值:列名
     *
     * @param tableName     表名
     * @param majorKey      主键名
     * @param majorKeyValue 主键值
     * @param column        列名
     * @return
     */
    public static String getKeyWithColumn(String tableName, String majorKey, String majorKeyValue, String column) {
        StringBuffer buffer = new StringBuffer();
        buffer.append(tableName).append(":");
        buffer.append(majorKey).append(":");
        buffer.append(majorKeyValue).append(":");
        buffer.append(column);
        return buffer.toString();
    }

    /**
     * redis的key 形式为: 表名:主键名:主键值
     *
     * @param tableName     表名
     * @param majorKey      主键名
     * @param majorKeyValue 主键值
     * @return
     */
    public static String getKey(String tableName, String majorKey, String majorKeyValue) {
        StringBuffer buffer = new StringBuffer();
        buffer.append(tableName).append(":");
        buffer.append(majorKey).append(":");
        buffer.append(majorKeyValue).append(":");
        return buffer.toString();
    }
}

测试接口

UserVO.java

@Data
public class UserVO implements Serializable {
    private String name;
    private String address;
    private Integer age;
    private String email;
}

UserController.java

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;

import org.sky.platform.util.BloomFilterHelper;
import org.sky.platform.util.RedisUtil;
import org.sky.vo.UserVO;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.hash.Funnel;

@RestController
@RequestMapping("user")
public class UserController extends BaseController {

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private RedisUtil redisUtil;

    @PostMapping(value = "/addEmailToBloom", produces = "application/json")
    public ResponseEntity<String> addUser(@RequestBody String params) {
        ResponseEntity<String> response = null;
        String returnResultStr;
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
        Map<String, Object> result = new HashMap<>();
        try {
            JSONObject requestJsonObj = JSON.parseObject(params);
            UserVO inputUser = getUserFromJson(requestJsonObj);
            BloomFilterHelper<String> myBloomFilterHelper = new BloomFilterHelper<>((Funnel<String>) (from,
                    into) -> into.putString(from, Charsets.UTF_8).putString(from, Charsets.UTF_8), 1500000, 0.00001);
            redisUtil.addByBloomFilter(myBloomFilterHelper, "email_existed_bloom", inputUser.getEmail());
            result.put("code", HttpStatus.OK.value());
            result.put("message", "add into bloomFilter successfully");
            result.put("email", inputUser.getEmail());
            returnResultStr = JSON.toJSONString(result);
            logger.info("returnResultStr======>" + returnResultStr);
            response = new ResponseEntity<>(returnResultStr, headers, HttpStatus.OK);
        } catch (Exception e) {
            logger.error("add a new product with error: " + e.getMessage(), e);
            result.put("message", "add a new product with error: " + e.getMessage());
            returnResultStr = JSON.toJSONString(result);
            response = new ResponseEntity<>(returnResultStr, headers, HttpStatus.INTERNAL_SERVER_ERROR);
        }
        return response;
    }

    @PostMapping(value = "/checkEmailInBloom", produces = "application/json")
    public ResponseEntity<String> findEmailInBloom(@RequestBody String params) {
        ResponseEntity<String> response = null;
        String returnResultStr;
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
        Map<String, Object> result = new HashMap<>();
        try {
            JSONObject requestJsonObj = JSON.parseObject(params);
            UserVO inputUser = getUserFromJson(requestJsonObj);
            BloomFilterHelper<String> myBloomFilterHelper = new BloomFilterHelper<>((Funnel<String>) (from,
                    into) -> into.putString(from, Charsets.UTF_8).putString(from, Charsets.UTF_8), 1500000, 0.00001);
            boolean answer = redisUtil.includeByBloomFilter(myBloomFilterHelper, "email_existed_bloom",
                    inputUser.getEmail());
            result.put("code", HttpStatus.OK.value());
            result.put("email", inputUser.getEmail());
            result.put("exist", answer);
            returnResultStr = JSON.toJSONString(result);
            logger.info("returnResultStr======>" + returnResultStr);
            response = new ResponseEntity<>(returnResultStr, headers, HttpStatus.OK);
        } catch (Exception e) {
            logger.error("add a new product with error: " + e.getMessage(), e);
            result.put("message", "add a new product with error: " + e.getMessage());
            returnResultStr = JSON.toJSONString(result);
            response = new ResponseEntity<>(returnResultStr, headers, HttpStatus.INTERNAL_SERVER_ERROR);
        }
        return response;
    }

    private UserVO getUserFromJson(JSONObject requestObj) {
        String userName = requestObj.getString("username");
        String userAddress = requestObj.getString("address");
        String userEmail = requestObj.getString("email");
        int userAge = requestObj.getInteger("age");
        UserVO u = new UserVO();
        u.setName(userName);
        u.setAge(userAge);
        u.setEmail(userEmail);
        u.setAddress(userAddress);
        return u;
    }
}

版权声明:
作者:Joe.Ye
链接:https://www.appblog.cn/index.php/2023/04/02/spring-boot-and-redis-bloom-filter-anti-malicious-traffic-breakdown-cache/
来源:APP全栈技术分享
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
打赏
海报
SpringBoot+Redis布隆过滤器防恶意流量击穿缓存
布隆过滤器 BloomFilter是一种空间效率的概率型数据结构,由Burton Howard Bloom 1970年提出的。通常用来判断一个元素是否在集合中。具有极高的空间效率,但是……
<<上一篇
下一篇>>
文章目录
关闭
目 录