Overview
Real-Time Risk Control Solution: Overall Architecture and First-Version Requirements
1. Risk Control Background
In Internet businesses, typical risk-control scenarios include registration, login, transaction, and promotional-activity risk control. Risk control works best when problems are prevented before they happen, so of the three intervention points (pre-event, in-event, and post-event), pre-event warning and in-event control are the most effective. This means a risk-control system must operate in real time. The real-time risk-control architecture is therefore the focus of this article.
2. Overall Architecture
Risk control is a product of business scenarios: the risk-control system serves the business system directly, and it works alongside a punishment system and an analysis system. The systems relate to each other as follows:
A risk-control system can follow two technical routes: rules or models. Rules are simple, intuitive, highly interpretable, and flexible, which is why they have long been a staple of risk-control systems. Their weakness is that they are easy to defeat: once fraudsters guess a rule, it stops working. In practice, therefore, a rules-based pipeline is usually combined with a model-based stage to make the system more robust.
Real-Time Risk Control
Real-time risk control is the core of the whole system; the business system calls it synchronously to obtain a risk verdict. As noted above, rules are usually written by people and need dynamic adjustment, so the risk-judgment part is split from the rule-management part. The rule-management console serves operations staff, who use it for:

Scenario management: decide whether a given scenario is under risk control; for example, a promotion scenario can be switched off once the promotion ends;
Black/white lists: lists of entities identified manually or by programs, which are filtered directly;
Rule management: add, remove, or modify rules, e.g. add an IP-address check to login, or a frequency check to order placement;
Threshold management: manage the thresholds inside rules; for a rule such as "one IP may register no more than 10 accounts in the last 1 hour", both 1 and 10 are thresholds.

With this console in place, the rule-judgment side is equally clear: it consists of three stages, namely pre-filtering, fact-data preparation, and rule evaluation.
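The managed concepts above (scenario switch, rules, editable thresholds) can be sketched as a small data model. This is a minimal illustration only; all names here (RuleConfig, exceeds, the threshold keys) are invented for the example and are not from the project:

```java
import java.util.HashMap;
import java.util.Map;

public class RuleConfigDemo {

    // Hypothetical model of one managed rule with ops-editable thresholds.
    public static class RuleConfig {
        public String scenario;   // e.g. "register"
        public boolean enabled;   // scenario on/off switch from the console
        public Map<String, Double> thresholds = new HashMap<>(); // named, editable thresholds

        // true when the observed value breaks the named threshold
        public boolean exceeds(String name, double observed) {
            Double limit = thresholds.get(name);
            return limit != null && observed > limit;
        }
    }

    public static void main(String[] args) {
        // Rule: one IP may register at most 10 accounts in the last 1 hour.
        RuleConfig rule = new RuleConfig();
        rule.scenario = "register";
        rule.enabled = true;
        rule.thresholds.put("windowHours", 1.0);
        rule.thresholds.put("maxAccountsPerIp", 10.0);

        System.out.println(rule.exceeds("maxAccountsPerIp", 12)); // 12 registrations, over the limit
        System.out.println(rule.exceeds("maxAccountsPerIp", 7));  // within the limit
    }
}
```

Keeping thresholds as named values separate from the rule logic is what lets operations staff tune 1 and 10 without redeploying code.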
Rule Evaluation
Once the fact data is ready, the system evaluates the rules against their thresholds and returns the verdict, which completes the flow. The logic is straightforward, and this is where the rule engine does its work. There are generally two ways to implement it:

Use a mature rule engine such as Drools; Drools integrates very well with the Java ecosystem, is itself quite complete, and supports many features.
Build on a dynamic language such as Groovy.

Both approaches support dynamic rule updates.
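To illustrate the dynamic-update idea without bringing in Drools or Groovy, the active rule can simply live behind an atomic reference that gets swapped when a new rule arrives (for example, from the rule topic). This is a hedged sketch of the concept, not the project's actual mechanism:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

public class DynamicRuleDemo {

    // The currently active rule: evaluation threads read it,
    // an updater thread swaps it when a rule update arrives.
    public static final AtomicReference<Predicate<Integer>> activeRule =
            new AtomicReference<>(count -> count > 10); // initial threshold: 10

    public static void main(String[] args) {
        System.out.println(activeRule.get().test(12)); // 12 > 10, intercepted

        // A rule update arrives: tighten the threshold to 5.
        activeRule.set(count -> count > 5);

        System.out.println(activeRule.get().test(7));  // 7 > 5, now intercepted too
    }
}
```

Drools and Groovy offer far richer rule expression (conditions, salience, working memory), but the hot-swap pattern is the same: evaluation always reads the latest published rule.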
Analysis System

Viewed statically, the parts above form a complete risk-control system; viewed dynamically, something is still missing. The gap is not in functionality but in evolution: over time, a risk-control system needs at least two more capabilities, one for measuring the system's overall effect and one for supplying the evidence on which its rules and logic are upgraded.

To measure overall effect, we need to:
judge whether a rule has stopped working, e.g. a sudden drop in its interception rate;
judge whether a rule is redundant, e.g. a rule that has never intercepted a single event;
judge whether the rules have loopholes, e.g. after a promotion or voucher campaign the benefits were all claimed, yet the expected effect was not achieved.

To supply evidence for upgrading rules and logic, we need to:
Discover global patterns: if one person's spending on electronics suddenly grows 100-fold, that looks abnormal in isolation; but if many people show the same jump, the explanation may simply be that Apple released a new product.
Recognize combinations of behaviors: each single action can be normal while the combination is abnormal. Buying a kitchen knife is normal, buying a ticket is normal, buying rope is normal, and filling up at a gas station is normal, but doing all of these within a short time is not.
Identify groups: for example, use graph analysis to find a group and tag every account in it, to catch cases where each individual account behaves normally but the group as a whole is concentrating on harvesting promotions.

This is the analysis system's role. Part of its work is deterministic and part is exploratory, and to do it the system needs as much supporting data as possible, such as:
business-system data: the business side's tracking events, recording detailed user, transaction, or activity data;
risk-control interception data: the risk-control system's own tracking events; for instance, "a user with certain features was intercepted by a certain rule" is itself an event record.

This is a typical big-data analysis scenario, and the architecture is fairly flexible.
Relatively speaking, this is the most open-ended of the systems: it runs fixed metric analyses, and it can also apply machine learning and data analysis to discover new rules and patterns.
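One of the deterministic checks listed above, flagging a rule whose interception rate suddenly drops (a hint it may have been defeated), can be sketched as follows. The 50% drop threshold and the sample numbers are illustrative assumptions, not values from the source:

```java
public class RuleHealthDemo {

    // true when today's interception rate fell below half the trailing average
    public static boolean suspiciousDrop(double[] dailyRates, double todayRate) {
        double sum = 0;
        for (double r : dailyRates) {
            sum += r;
        }
        double avg = sum / dailyRates.length;
        return todayRate < 0.5 * avg;
    }

    public static void main(String[] args) {
        // last week's daily interception rates for one rule (illustrative)
        double[] lastWeek = {0.041, 0.038, 0.045, 0.040, 0.042, 0.039, 0.043};

        System.out.println(suspiciousDrop(lastWeek, 0.012)); // sharp drop: worth investigating
        System.out.println(suspiciousDrop(lastWeek, 0.037)); // ordinary variation
    }
}
```

A real analysis system would use a more robust baseline (seasonality, traffic volume), but the shape of the check is the same: compare a rule's recent effect against its own history.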
3. First-Version Requirement Development
Data sources:
Package: com.star.engine.pojo
Log entity class
package com.star.engine.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

// @Data is assumed here: the Flink job below relies on the generated getters.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ClientLog {
    private String userNo;      // user ID
    private String userName;    // user name
    private String appId;       // app ID
    private String appVersion;  // app version
    private String addr;        // address
    private String carrier;     // carrier
    private String imei;        // device IMEI
    private String deviceType;  // device type
    private String ip;          // client IP
    private String netType;     // network type: WIFI, 4G, 5G
    private String osName;      // operating system type
    private String osVersion;   // operating system version
    private String sessionId;   // session ID
    private String detailTime;  // detailed creation time
    private String eventId;     // event ID
    private String eventType;   // event type
    private String createTime;  // creation time
    private String gps;         // latitude/longitude
    private Map<String, String> properties; // detailed event properties
}
Package: com.star.engine.utils
Constants (constants class)
package com.star.engine.utils;

public class Constants {
    public static String CLIENT_LOG = "client_log";                       // Kafka topic for client logs
    public static String HBASE_TABLE = "events_db:users";                 // HBase user-profile table
    public static String BROKERS = "star01:9092,star02:9092,star03:9092"; // Kafka brokers
    public static Integer REDIS_PORT = 6379;
    public static String HOST = "star01";
    public static String REDIS_ADDR = "star01";
    public static String ZOOKEEPER_PORT = "2181";
    public static String RULE_TYPE_LOGIN = "login";                       // login event type
}
FlinkKafkaUtils (Flink Kafka utility class)
package com.star.engine.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FlinkKafkaUtils {

    public static Properties getProducerProperties(String brokers) {
        Properties properties = getCommonProperties();
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("metadata.broker.list", brokers);
        properties.setProperty("zookeeper.connect", Constants.HOST + ":" + Constants.ZOOKEEPER_PORT);
        return properties;
    }

    public static Properties getCommonProperties() {
        Properties properties = new Properties();
        properties.setProperty("linger.ms", "100");
        properties.setProperty("retries", "100");
        properties.setProperty("retry.backoff.ms", "200");
        properties.setProperty("buffer.memory", "524288");
        properties.setProperty("batch.size", "100");
        properties.setProperty("max.request.size", "524288");
        properties.setProperty("compression.type", "snappy");
        properties.setProperty("request.timeout.ms", "180000");
        properties.setProperty("max.block.ms", "180000");
        return properties;
    }

    // Consumer for the client-log event stream
    public static FlinkKafkaConsumer<String> getKafkaEventSource() {
        Properties props = getProducerProperties(Constants.BROKERS);
        props.setProperty("auto.offset.reset", "latest");
        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>(Constants.CLIENT_LOG, new SimpleStringSchema(), props);
        return source;
    }

    // Consumer for the dynamic-rule stream
    public static FlinkKafkaConsumer<String> getKafkaRuleSource() {
        Properties props = getProducerProperties(Constants.BROKERS);
        props.setProperty("auto.offset.reset", "latest");
        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("yinew_drl_rule", new SimpleStringSchema(), props);
        return source;
    }
}
HBaseUtils (HBase utility class)
package com.star.engine.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;

public class HBaseUtils {

    private static Connection connection;
    private static Configuration configuration;

    static {
        try {
            configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.property.clientPort", Constants.ZOOKEEPER_PORT);
            configuration.set("hbase.zookeeper.quorum", Constants.HOST);
            connection = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static HTable initHbaseClient(String tableName) {
        try {
            return new HTable(configuration, tableName);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * @param columnFamilies column families to create
     */
    public static boolean createTable(String tableName, List<String> columnFamilies) {
        try {
            HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
            if (admin.tableExists(tableName)) {
                return false;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            columnFamilies.forEach(columnFamily -> {
                HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamily);
                columnDescriptor.setMaxVersions(2); // keep at most 2 versions per cell
                tableDescriptor.addFamily(columnDescriptor);
            });
            admin.createTable(tableDescriptor);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    public static boolean deleteTable(String tableName) {
        try {
            HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * @param rowKey           row key
     * @param columnFamilyName column family
     * @param qualifier        column qualifier
     * @param value            value to store
     */
    public static boolean putRow(String tableName, String rowKey, String columnFamilyName,
                                 String qualifier, String value) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Fetch a whole row by rowKey.
     * @param rowKey row key
     */
    public static Result getRow(String tableName, String rowKey) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            return table.get(get);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Fetch the latest version of a single cell.
     * @param rowKey    row key
     * @param qualifier column qualifier
     */
    public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            if (!get.isCheckExistenceOnly()) {
                get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            }
            Result result = table.get(get);
            byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            return Bytes.toString(resultValue);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    public static ResultScanner getScanner(String tableName) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    public static ResultScanner getScanner(String tableName, FilterList filterList) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            scan.setFilter(filterList);
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * @param startRowKey start row key
     * @param endRowKey   stop row key
     */
    public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,
                                           FilterList filterList) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            scan.setStartRow(Bytes.toBytes(startRowKey));
            scan.setStopRow(Bytes.toBytes(endRowKey));
            scan.setFilter(filterList);
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * @param rowKey row key
     */
    public static boolean deleteRow(String tableName, String rowKey) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            table.delete(delete);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * @param rowKey    row key
     * @param qualifier column qualifier
     */
    public static boolean deleteColumn(String tableName, String rowKey, String familyName,
                                       String qualifier) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
            table.delete(delete);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }
}
KafkaProducerUtils (Kafka producer utility class)
package com.star.engine.utils;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

import java.util.Properties;

public class KafkaProducerUtils {

    static Producer<String, String> producer;

    public static void init() {
        Properties props = new Properties();
        props.put("metadata.broker.list", Constants.BROKERS);
        props.put("bootstrap.servers", Constants.BROKERS);
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("producer.type", "async");
        props.put("request.required.acks", "-1");
        producer = new KafkaProducer<>(props);
    }

    public static Producer<String, String> getProducer() {
        if (producer == null) {
            init();
        }
        return producer;
    }
}
First-version requirements:
1. Detect remote (unusual-location) logins
package com.star.engine.core;

import com.alibaba.fastjson.JSON;
import com.star.engine.pojo.ClientLog;
import com.star.engine.utils.Constants;
import com.star.engine.utils.FlinkKafkaUtils;
import com.star.engine.utils.HBaseUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.HTable;
import org.locationtech.spatial4j.distance.DistanceUtils;

import java.text.SimpleDateFormat;
import java.util.Iterator;

// Class name assumed from the job name passed to execute().
public class Process1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> source = environment.addSource(FlinkKafkaUtils.getKafkaEventSource());
        SingleOutputStreamOperator<ClientLog> clientlogSource =
                source.map(str -> JSON.parseObject(str, ClientLog.class));

        clientlogSource.keyBy(clientLog -> clientLog.getUserNo())
                .process(new KeyedProcessFunction<String, ClientLog, String>() {

                    // HBase handle for reading user-profile data
                    HTable table;
                    // State holding the user's previous login event
                    ListState<ClientLog> privousLoginData;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        table = HBaseUtils.initHbaseClient(Constants.HBASE_TABLE);
                        privousLoginData = getRuntimeContext().getListState(
                                new ListStateDescriptor<>("privousLoginData", ClientLog.class));
                    }

                    // Handle each incoming event
                    @Override
                    public void processElement(ClientLog clientLog, Context context,
                                               Collector<String> out) throws Exception {
                        String eventType = clientLog.getEventType();
                        if (eventType.equals(Constants.RULE_TYPE_LOGIN)) {
                            distanceProcess(clientLog, privousLoginData);
                        }
                    }
                });

        environment.execute("Process1");
    }

    /**
     * Compare the current login with the previous one and flag impossible travel speeds.
     * @param privousLoginData state holding the previous login event
     */
    public static void distanceProcess(ClientLog clientLog,
                                       ListState<ClientLog> privousLoginData) throws Exception {
        // Timestamp format assumed; the original used the no-arg constructor.
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String reason = "First login: " + clientLog.getAddr()
                + " login time: " + clientLog.getDetailTime();

        Iterator<ClientLog> iterator = privousLoginData.get().iterator();
        if (iterator.hasNext()) {
            ClientLog privousMessage = iterator.next();
            // "speedLimit|score" pairs joined by '@', highest bracket first
            String distanceRule = "500|1@400|2@300|3@200|4@100|5";
            String[] distanceRules = distanceRule.split("@");

            String oldTime = privousMessage.getDetailTime();
            String oldGps = privousMessage.getGps();

            // distHaversineRAD expects radians and returns a radian arc, so convert
            // the GPS degrees and scale by the Earth's mean radius to get kilometers.
            double distanceReal = DistanceUtils.EARTH_MEAN_RADIUS_KM * DistanceUtils.distHaversineRAD(
                    Math.toRadians(Double.parseDouble(clientLog.getGps().split("\\,")[0])),
                    Math.toRadians(Double.parseDouble(clientLog.getGps().split("\\,")[1])),
                    Math.toRadians(Double.parseDouble(oldGps.split("\\,")[0])),
                    Math.toRadians(Double.parseDouble(oldGps.split("\\,")[1])));

            long time = simpleDateFormat.parse(clientLog.getDetailTime()).getTime()
                    - simpleDateFormat.parse(oldTime).getTime();
            double speed = distanceReal / (time / (1000 * 3600.0)); // km/h

            for (String rule : distanceRules) {
                double speedLimit = Double.parseDouble(rule.split("\\|")[0]);
                String speedScore = rule.split("\\|")[1]; // risk score for this bracket, unused in v1
                if (speed >= speedLimit) {
                    reason += " === travel speed: " + speed
                            + ", rule speed limit: " + speedLimit
                            + ", current login place: " + clientLog.getAddr()
                            + ", previous login place: " + privousMessage.getAddr();
                    break; // brackets are ordered, the first match is the highest
                }
            }
        } else {
            reason = "First login: " + clientLog.getAddr()
                    + " login time: " + clientLog.getDetailTime();
        }

        System.out.println(reason); // v1 simply logs the verdict

        // Remember only the latest login for the next comparison.
        privousLoginData.clear();
        privousLoginData.add(clientLog);
    }
}
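The core of the remote-login rule is the distance/speed computation. Below is a self-contained sketch of the same check using only the JDK (no spatial4j), with illustrative coordinates; the 500 km/h limit mirrors the first bracket of the distanceRule string above:

```java
public class SpeedCheckDemo {

    static final double EARTH_RADIUS_KM = 6371.0;

    // Great-circle distance in km between two (lat, lon) points, haversine formula.
    public static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                 + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                 * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    // true when the implied travel speed reaches the limit (km/h).
    public static boolean impossibleTravel(double distanceKm, long millisBetween, double speedLimitKmh) {
        double hours = millisBetween / 3_600_000.0;
        return distanceKm / hours >= speedLimitKmh;
    }

    public static void main(String[] args) {
        // Two logins one hour apart, roughly Beijing and Shanghai (over 1 000 km apart).
        double d = haversineKm(39.9042, 116.4074, 31.2304, 121.4737);
        System.out.println("distanceKm=" + d);
        System.out.println("impossible=" + impossibleTravel(d, 3_600_000L, 500));
    }
}
```

Covering 1 000+ km in one hour implies a speed well over the 500 km/h bracket, so the second line prints a positive verdict; two logins 40 km apart in the same window would not trigger it.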