一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

Flink

 vnxy001 2022-10-10 發(fā)布于浙江

概要

        實(shí)時(shí)風(fēng)控解決方案
        總體架構(gòu)介紹
        第一版需求實(shí)現(xiàn)

1. 風(fēng)控背景

      互聯(lián)網(wǎng)場(chǎng)景中,典型的風(fēng)控場(chǎng)景包括:注冊(cè)風(fēng)控、登陸風(fēng)控、交易風(fēng)控、活動(dòng)風(fēng)控等,而風(fēng)控的最佳效果是防患于未然,所以事前事中和事后三種實(shí)現(xiàn)方案中,又以事前預(yù)警和事中控制最好。
      這要求風(fēng)控系統(tǒng)一定要有實(shí)時(shí)性。我們將實(shí)時(shí)風(fēng)控架構(gòu)作為重點(diǎn)講解。

2. 總體架構(gòu)

       風(fēng)控是業(yè)務(wù)場(chǎng)景的產(chǎn)物,風(fēng)控系統(tǒng)直接服務(wù)于業(yè)務(wù)系統(tǒng),與之相關(guān)的還有懲罰系統(tǒng)和分析系統(tǒng),各系統(tǒng)關(guān)系與角色如下:

 

風(fēng)控系統(tǒng)有規(guī)則和模型兩種技術(shù)路線,規(guī)則的優(yōu)點(diǎn)是簡(jiǎn)單直觀、可解釋性強(qiáng)、靈活,所以長(zhǎng)期活躍在風(fēng)控系統(tǒng)之中,但缺點(diǎn)是容易被攻破,一但被黑產(chǎn)猜中就會(huì)失效,于是在實(shí)際的風(fēng)控系統(tǒng)中,往往需要再結(jié)合上基于模型的風(fēng)控環(huán)節(jié)來(lái)增加健壯性。

  • 規(guī)則就是針對(duì)事物的條件判斷,我們針對(duì)注冊(cè)、登陸、交易、活動(dòng)分別假設(shè)幾條規(guī)則,比如:

    • 用戶名與身份證姓名不一致;

    • 某 IP 最近 1 小時(shí)注冊(cè)賬號(hào)數(shù)超過 10 個(gè);

    • 某賬號(hào)最近 3 分鐘登陸次數(shù)大于 5 次;

    • 某賬號(hào)群體最近 1 小時(shí)購(gòu)買優(yōu)惠商品超過 100 件;

    • 某賬號(hào)最近 3 分鐘領(lǐng)券超過 3 張;

  • 規(guī)則可以組合成規(guī)則組

    • 事實(shí):即被判斷的主體和屬性,如上面規(guī)則的賬號(hào)及登陸次數(shù)、IP 和注冊(cè)次數(shù)等;

    • 條件:判斷的邏輯,如某事實(shí)的某屬性大于某個(gè)指標(biāo);

    • 指標(biāo)閾值:判斷的依據(jù),比如登陸次數(shù)的臨界閾值,注冊(cè)賬號(hào)數(shù)的臨界閾值等;

    • 規(guī)則可由運(yùn)營(yíng)專家憑經(jīng)驗(yàn)填寫,也可由數(shù)據(jù)分析師根據(jù)歷史數(shù)據(jù)發(fā)掘,但因?yàn)橐?guī)則在與黑產(chǎn)的攻防之中會(huì)被猜中導(dǎo)致失效,所以無(wú)一例外都需要?jiǎng)討B(tài)調(diào)整。

  • 基于上邊的討論,我們?cè)O(shè)計(jì)一個(gè)風(fēng)控系統(tǒng)方案如下:

  • 該系統(tǒng)有三條數(shù)據(jù)流向:

    • 實(shí)時(shí)風(fēng)控?cái)?shù)據(jù)流:由紅線標(biāo)識(shí),同步調(diào)用,為風(fēng)控調(diào)用的核心鏈路

    • 準(zhǔn)實(shí)時(shí)指標(biāo)數(shù)據(jù)流:由藍(lán)線標(biāo)識(shí),異步寫入,為實(shí)時(shí)風(fēng)控部分準(zhǔn)備指標(biāo)數(shù)據(jù)

    • 準(zhǔn)實(shí)時(shí)/離線分析數(shù)據(jù)流:由綠線標(biāo)識(shí),異步寫入,為風(fēng)控系統(tǒng)的表現(xiàn)分析提供數(shù)據(jù)

實(shí)時(shí)風(fēng)控

        實(shí)時(shí)風(fēng)控是整個(gè)系統(tǒng)的核心,被業(yè)務(wù)系統(tǒng)同步調(diào)用,完成對(duì)應(yīng)的風(fēng)控判斷。
        前面提到規(guī)則往往由人編寫并且需要?jiǎng)討B(tài)調(diào)整,所以我們會(huì)把風(fēng)控判斷部分與規(guī)則管理部分拆開。規(guī)則管理后臺(tái)為運(yùn)營(yíng)服務(wù),由運(yùn)營(yíng)人員去進(jìn)行相關(guān)操作:
        場(chǎng)景管理:決定某個(gè)場(chǎng)景是否實(shí)施風(fēng)控,比如活動(dòng)場(chǎng)景,在活動(dòng)結(jié)束后可以關(guān)閉該場(chǎng)景;
        黑白名單:人工/程序找到系統(tǒng)的黑白名單,直接過濾
        規(guī)則管理:管理規(guī)則,包括增刪或修改,比如登陸新增 IP 地址判斷,比如下單新增頻率校驗(yàn)等;
        閾值管理:管理指標(biāo)的閾值,比如規(guī)則為某 IP 最近 1 小時(shí)注冊(cè)賬號(hào)數(shù)不能超過 10 個(gè),那 1 和 10 都屬于閾值;
        基于管理后臺(tái),那規(guī)則判斷部分的邏輯也就十分清晰了,分別包括前置過濾、事實(shí)數(shù)據(jù)準(zhǔn)備、規(guī)則判斷三個(gè)環(huán)節(jié)。

  • 前置過濾

    • 業(yè)務(wù)系統(tǒng)在特定事件(如注冊(cè)、登陸、下單、參加活動(dòng)等)被觸發(fā)后同步調(diào)用風(fēng)控系統(tǒng),附帶相關(guān)上下文,比如 IP 地址,事件標(biāo)識(shí)等,規(guī)則判斷部分會(huì)根據(jù)管理后臺(tái)的配置決定是否進(jìn)行判斷,如果是,接著進(jìn)行黑白名單過濾,都通過后進(jìn)入下一個(gè)環(huán)節(jié)。

  • 實(shí)時(shí)數(shù)據(jù)準(zhǔn)備

    • 在進(jìn)行判斷之前,系統(tǒng)必須要準(zhǔn)備一些事實(shí)數(shù)據(jù),比如:

      • 注冊(cè)場(chǎng)景,假如規(guī)則為單一 IP 最近 1 小時(shí)注冊(cè)賬號(hào)數(shù)不超過 10 個(gè),那系統(tǒng)需要根據(jù) IP 地址去 Redis/Hbase 找到該 IP 最近 1 小時(shí)注冊(cè)賬號(hào)的數(shù)目,比如 15;

      • 登陸場(chǎng)景,假如規(guī)則為單一賬號(hào)最近 3 分鐘登陸次數(shù)不超過 5 次,那系統(tǒng)需要根據(jù)賬號(hào)去 Redis/Hbase 找到該賬號(hào)最近 3 分鐘登陸的次數(shù),比如 8;

規(guī)則判斷

        在得到事實(shí)數(shù)據(jù)之后,系統(tǒng)會(huì)根據(jù)規(guī)則和閾值進(jìn)行判斷,然后返回結(jié)果,整個(gè)過程便結(jié)束了。
        整個(gè)過程邏輯上是清晰的,我們常說的規(guī)則引擎主要在這部分起作用,一般來(lái)說這個(gè)過程有兩種實(shí)現(xiàn)方式:
        借助成熟的規(guī)則引擎,比如 Drools,Drools 和 Java 環(huán)境結(jié)合的非常好,本身也非常完善,支持很多特性。
        基于 Groovy 等動(dòng)態(tài)語(yǔ)言完成
        這兩種方案都支持規(guī)則的動(dòng)態(tài)更新

  • 準(zhǔn)實(shí)時(shí)數(shù)據(jù)流

    • 這部分屬于后臺(tái)邏輯,為風(fēng)控系統(tǒng)服務(wù),準(zhǔn)備事實(shí)數(shù)據(jù)。

    • 把數(shù)據(jù)準(zhǔn)備與邏輯判斷拆分,是出于系統(tǒng)的性能/可擴(kuò)展性的角度考慮的。前邊提到,做規(guī)則判斷需要事實(shí)的相關(guān)指標(biāo),比如最近一小時(shí)登陸次數(shù),最近一小時(shí)注冊(cè)賬號(hào)數(shù)等等,這些指標(biāo)通常有一段時(shí)間跨度,是某種狀態(tài)或聚合,很難在實(shí)時(shí)風(fēng)控過程中根據(jù)原始數(shù)據(jù)進(jìn)行計(jì)算,因?yàn)轱L(fēng)控的規(guī)則引擎往往是無(wú)狀態(tài)的,不會(huì)記錄前面的結(jié)果。

    • 同時(shí),這部分原始數(shù)據(jù)量很大,因?yàn)橛脩艋顒?dòng)的原始數(shù)據(jù)都要傳過來(lái)進(jìn)行計(jì)算,所以這部分往往由一個(gè)流式大數(shù)據(jù)系統(tǒng)來(lái)完成。

    • 業(yè)務(wù)系統(tǒng)把埋點(diǎn)數(shù)據(jù)發(fā)送到 Kafka;

      • Flink 訂閱 Kafka,完成原子粒度的聚合;

    • Flink 僅完成原子粒度的聚合是和規(guī)則的動(dòng)態(tài)變更邏輯相關(guān)的。舉例來(lái)說,在注冊(cè)場(chǎng)景中,運(yùn)營(yíng)同學(xué)會(huì)根據(jù)效果一會(huì)要判斷某 IP 最近 1 小時(shí)的注冊(cè)賬號(hào)數(shù),一會(huì)要判斷最近 3 小時(shí)的注冊(cè)賬號(hào)數(shù),一會(huì)又要判斷最近 5 小時(shí)的注冊(cè)賬號(hào)數(shù)……也就是說這個(gè)最近 N 小時(shí)的 N 是動(dòng)態(tài)調(diào)整的。那 Flink 在計(jì)算時(shí)只應(yīng)該計(jì)算 1 小時(shí)的賬號(hào)數(shù),在判斷過程中根據(jù)規(guī)則來(lái)讀取最近 3 個(gè) 1 小時(shí)還是 5 個(gè) 1 小時(shí),然后聚合后進(jìn)行判斷。因?yàn)樵?Flink 的運(yùn)行機(jī)制中,作業(yè)提交后會(huì)持續(xù)運(yùn)行,如果調(diào)整邏輯需要停止作業(yè),修改代碼,然后重啟,相當(dāng)麻煩;同時(shí)因?yàn)?Flink 中間狀態(tài)的問題,重啟還面臨著中間狀態(tài)能否復(fù)用的問題。所以假如直接由 Flink 完成 N 小時(shí)的聚合的話,每次 N 的變動(dòng)都需要重復(fù)上面的操作,有時(shí)還需要追數(shù)據(jù),非常繁瑣。

      • Flink 把匯總的指標(biāo)結(jié)果寫入 Redis 或 Hbase,供實(shí)時(shí)風(fēng)控系統(tǒng)查詢。兩者問題都不大,根據(jù)場(chǎng)景選擇即可。

      • 通過把數(shù)據(jù)計(jì)算和邏輯判斷拆分開來(lái)并引入 Flink,我們的風(fēng)控系統(tǒng)可以應(yīng)對(duì)極大的用戶規(guī)模。

分析系統(tǒng)
       前面的東西靜態(tài)來(lái)看是一個(gè)完整的風(fēng)控系統(tǒng),但動(dòng)態(tài)來(lái)看就有缺失了,這種缺失不體現(xiàn)在功能性上,而是體現(xiàn)在演進(jìn)上。即如果從動(dòng)態(tài)的角度來(lái)看一個(gè)風(fēng)控系統(tǒng)的話,我們至少還需要兩部分,一是衡量系統(tǒng)的整體效果,一是為系統(tǒng)提供規(guī)則/邏輯升級(jí)的依據(jù)。
       在衡量整體效果方面,我們需要:
            判斷規(guī)則是否失效,比如攔截率的突然降低;
            判斷規(guī)則是否多余,比如某規(guī)則從來(lái)沒攔截過任何事件;
            判斷規(guī)則是否有漏洞,比如在舉辦某個(gè)促銷活動(dòng)或發(fā)放代金券后,福利被領(lǐng)完了,但沒有達(dá)到預(yù)期效果;
       在為系統(tǒng)提供規(guī)則/邏輯升級(jí)依據(jù)方面,我們需要:
            發(fā)現(xiàn)全局規(guī)則:比如某人在電子產(chǎn)品的花費(fèi)突然增長(zhǎng)了 100 倍,單獨(dú)來(lái)看是有問題的,但整體來(lái)看,可能很多人都出現(xiàn)了這個(gè)現(xiàn)象,原來(lái)是蘋果發(fā)新品了。
           識(shí)別某種行為的組合:?jiǎn)未涡袨槭钦5?,但組合是異常的,比如用戶買菜刀是正常的,買車票是正常的,買繩子也是正常的,去加油站加油也是正常的,但短時(shí)間內(nèi)同時(shí)做這些事情就不是正常的。
            群體識(shí)別:比如通過圖分析技術(shù),發(fā)現(xiàn)某個(gè)群體,然后給給這個(gè)群體的所有賬號(hào)都打上群體標(biāo)簽,防止出現(xiàn)那種每個(gè)賬號(hào)表現(xiàn)都正常,但整個(gè)群體卻在集中薅羊毛的情況。
            這便是分析系統(tǒng)的角色定位,在他的工作中有部分是確定性的,也有部分是探索性的,為了完成這種工作,該系統(tǒng)需要盡可能多的數(shù)據(jù)支持,如:
            業(yè)務(wù)系統(tǒng)的數(shù)據(jù),業(yè)務(wù)的埋點(diǎn)數(shù)據(jù),記錄詳細(xì)的用戶、交易或活動(dòng)數(shù)據(jù);
            風(fēng)控?cái)r截?cái)?shù)據(jù),風(fēng)控系統(tǒng)的埋點(diǎn)數(shù)據(jù),比如某個(gè)用戶在具有某些特征的狀態(tài)下因?yàn)槟硹l規(guī)則而被攔截,這條攔截本身就是一個(gè)事件數(shù)據(jù);
            這是一個(gè)典型的大數(shù)據(jù)分析場(chǎng)景,架構(gòu)也比較靈活

      相對(duì)來(lái)說這個(gè)系統(tǒng)是最開放的,既有固定的指標(biāo)分析,也可以使用機(jī)器學(xué)習(xí)/數(shù)據(jù)分析技術(shù)發(fā)現(xiàn)更多新的規(guī)則或模式。 

3.第一版需求開發(fā)

數(shù)據(jù)源介紹:

包名:com.star.engine.pojo

日志實(shí)體類

  1. package com.star.engine.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import lombok.ToString;
  6. import java.util.Map;
  7. @Data
  8. @NoArgsConstructor
  9. @AllArgsConstructor
  10. @ToString
  11. public class ClientLog {
  12. private String userNo; // 用戶ID
  13. private String userName; // 用戶名
  14. private String appId; // app的編號(hào)
  15. private String appVersion; // APP的版本
  16. private String addr; // 地址
  17. private String carrier; // 運(yùn)營(yíng)商
  18. private String imei; // 設(shè)備編號(hào)
  19. private String deviceType; // 設(shè)備類型
  20. private String ip; // 客戶端IP
  21. private String netType; // 網(wǎng)絡(luò)類型:WIFI,4G,5G
  22. private String osName; // 操作系統(tǒng)類型
  23. private String osVersion; // 操作系統(tǒng)版本
  24. private String sessionId; // 會(huì)話ID
  25. private String detailTime; // 創(chuàng)建詳細(xì)時(shí)間
  26. private String eventId; // 事件編號(hào)
  27. private String eventType; // 事件類型
  28. private String createTime; // 創(chuàng)建時(shí)間
  29. private String gps; // 經(jīng)緯度信息
  30. private Map<String,String> properties; // 事件詳細(xì)屬性
  31. }
  • 工具類介紹

包名:com.star.engine.utils;

Constants 常量類

  1. package com.star.engine.utils;
  2. /**
  3. * 常量信息
  4. */
  5. public class Constants {
  6. //topic
  7. public static String CLIENT_LOG = "client_log";
  8. public static String HBASE_TABLE = "events_db:users";
  9. public static String BROKERS = "star01:9092,star02:9092,star03:9092";
  10. public static Integer REDIS_PORT = 6379;
  11. public static String HOST = "star01";
  12. public static String REDIS_ADDR = "star01";
  13. public static String ZOOKEEPER_PORT = "2181";
  14. public static String RULE_TYPE_LOGIN = "login";
  15. }

FlinkKafkaUtils FlinkKafka工具類

  1. package com.star.engine.utils;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  4. import java.util.Properties;
  5. public class FlinkKafkaUtils {
  6. public static Properties getProducerProperties(String brokers) {
  7. Properties properties = getCommonProperties();
  8. properties.setProperty("bootstrap.servers", brokers);
  9. properties.setProperty("metadata.broker.list", brokers);
  10. properties.setProperty("zookeeper.connect", Constants.HOST+":"+Constants.ZOOKEEPER_PORT);
  11. return properties;
  12. }
  13. public static Properties getCommonProperties() {
  14. Properties properties = new Properties();
  15. properties.setProperty("linger.ms", "100");
  16. properties.setProperty("retries", "100");
  17. properties.setProperty("retry.backoff.ms", "200");
  18. properties.setProperty("buffer.memory", "524288");
  19. properties.setProperty("batch.size", "100");
  20. properties.setProperty("max.request.size", "524288");
  21. properties.setProperty("compression.type", "snappy");
  22. properties.setProperty("request.timeout.ms", "180000");
  23. properties.setProperty("max.block.ms", "180000");
  24. return properties;
  25. }
  26. public static FlinkKafkaConsumer<String> getKafkaEventSource(){
  27. Properties props = getProducerProperties(Constants.BROKERS);
  28. props.setProperty("auto.offset.reset", "latest");
  29. //指定Topic
  30. FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(Constants.CLIENT_LOG, new SimpleStringSchema(), props);
  31. return source;
  32. }
  33. public static FlinkKafkaConsumer<String> getKafkaRuleSource() {
  34. Properties props = getProducerProperties(Constants.BROKERS);
  35. props.setProperty("auto.offset.reset", "latest");
  36. FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>("yinew_drl_rule", new SimpleStringSchema(), props);
  37. return source;
  38. }
  39. }

HBaseUtils Hbase工具類

  1. package com.star.engine.utils;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.hbase.HBaseConfiguration;
  4. import org.apache.hadoop.hbase.HColumnDescriptor;
  5. import org.apache.hadoop.hbase.HTableDescriptor;
  6. import org.apache.hadoop.hbase.TableName;
  7. import org.apache.hadoop.hbase.client.*;
  8. import org.apache.hadoop.hbase.filter.FilterList;
  9. import org.apache.hadoop.hbase.util.Bytes;
  10. import java.io.IOException;
  11. import java.util.List;
  12. public class HBaseUtils {
  13. private static Connection connection;
  14. private static Configuration configuration;
  15. static {
  16. configuration = HBaseConfiguration.create();
  17. configuration.set("hbase.zookeeper.property.clientPort", Constants.ZOOKEEPER_PORT);
  18. configuration.set("hbase.zookeeper.quorum", Constants.HOST);
  19. try {
  20. connection = ConnectionFactory.createConnection(configuration);
  21. } catch (IOException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. public static HTable initHbaseClient(String tableName) {
  26. try {
  27. return new HTable(configuration, tableName);
  28. } catch (IOException e) {
  29. e.printStackTrace();
  30. }
  31. return null;
  32. }
  33. /**
  34. * 創(chuàng)建 HBase 表
  35. * @param tableName 表名
  36. * @param columnFamilies 列族的數(shù)組
  37. */
  38. public static boolean createTable(String tableName, List<String> columnFamilies) {
  39. try {
  40. HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
  41. if (admin.tableExists(tableName)) {
  42. return false;
  43. }
  44. HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
  45. columnFamilies.forEach(columnFamily -> {
  46. HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamily);
  47. columnDescriptor.setMaxVersions(2);
  48. tableDescriptor.addFamily(columnDescriptor);
  49. });
  50. admin.createTable(tableDescriptor);
  51. } catch (IOException e) {
  52. e.printStackTrace();
  53. }
  54. return true;
  55. }
  56. /**
  57. * 刪除 hBase 表
  58. * @param tableName 表名
  59. */
  60. public static boolean deleteTable(String tableName) {
  61. try {
  62. HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
  63. // 刪除表前需要先禁用表
  64. admin.disableTable(tableName);
  65. admin.deleteTable(tableName);
  66. } catch (Exception e) {
  67. e.printStackTrace();
  68. }
  69. return true;
  70. }
  71. /**
  72. * 插入數(shù)據(jù)
  73. *
  74. * @param tableName 表名
  75. * @param rowKey 唯一標(biāo)識(shí)
  76. * @param columnFamilyName 列族名
  77. * @param qualifier 列標(biāo)識(shí)
  78. * @param value 數(shù)據(jù)
  79. */
  80. public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier,
  81. String value) {
  82. try {
  83. Table table = connection.getTable(TableName.valueOf(tableName));
  84. Put put = new Put(Bytes.toBytes(rowKey));
  85. put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
  86. table.put(put);
  87. table.close();
  88. } catch (IOException e) {
  89. e.printStackTrace();
  90. }
  91. return true;
  92. }
  93. /**
  94. * 根據(jù) rowKey 獲取指定行的數(shù)據(jù)
  95. *
  96. * @param tableName 表名
  97. * @param rowKey 唯一標(biāo)識(shí)
  98. */
  99. public static Result getRow(String tableName, String rowKey) {
  100. try {
  101. Table table = connection.getTable(TableName.valueOf(tableName));
  102. Get get = new Get(Bytes.toBytes(rowKey));
  103. return table.get(get);
  104. } catch (IOException e) {
  105. e.printStackTrace();
  106. }
  107. return null;
  108. }
  109. /**
  110. * 獲取指定行指定列 (cell) 的最新版本的數(shù)據(jù)
  111. *
  112. * @param tableName 表名
  113. * @param rowKey 唯一標(biāo)識(shí)
  114. * @param columnFamily 列族
  115. * @param qualifier 列標(biāo)識(shí)
  116. */
  117. public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) {
  118. try {
  119. Table table = connection.getTable(TableName.valueOf(tableName));
  120. Get get = new Get(Bytes.toBytes(rowKey));
  121. if (!get.isCheckExistenceOnly()) {
  122. get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
  123. Result result = table.get(get);
  124. byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
  125. return Bytes.toString(resultValue);
  126. } else {
  127. return null;
  128. }
  129. } catch (IOException e) {
  130. e.printStackTrace();
  131. }
  132. return null;
  133. }
  134. /**
  135. * 檢索全表
  136. *
  137. * @param tableName 表名
  138. */
  139. public static ResultScanner getScanner(String tableName) {
  140. try {
  141. Table table = connection.getTable(TableName.valueOf(tableName));
  142. Scan scan = new Scan();
  143. return table.getScanner(scan);
  144. } catch (IOException e) {
  145. e.printStackTrace();
  146. }
  147. return null;
  148. }
  149. /**
  150. * 檢索表中指定數(shù)據(jù)
  151. *
  152. * @param tableName 表名
  153. * @param filterList 過濾器
  154. */
  155. public static ResultScanner getScanner(String tableName, FilterList filterList) {
  156. try {
  157. Table table = connection.getTable(TableName.valueOf(tableName));
  158. Scan scan = new Scan();
  159. scan.setFilter(filterList);
  160. return table.getScanner(scan);
  161. } catch (IOException e) {
  162. e.printStackTrace();
  163. }
  164. return null;
  165. }
  166. /**
  167. * 檢索表中指定數(shù)據(jù)
  168. *
  169. * @param tableName 表名
  170. * @param startRowKey 起始 RowKey
  171. * @param endRowKey 終止 RowKey
  172. * @param filterList 過濾器
  173. */
  174. public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,
  175. FilterList filterList) {
  176. try {
  177. Table table = connection.getTable(TableName.valueOf(tableName));
  178. Scan scan = new Scan();
  179. scan.setStartRow(Bytes.toBytes(startRowKey));
  180. scan.setStopRow(Bytes.toBytes(endRowKey));
  181. scan.setFilter(filterList);
  182. return table.getScanner(scan);
  183. } catch (IOException e) {
  184. e.printStackTrace();
  185. }
  186. return null;
  187. }
  188. /**
  189. * 刪除指定行記錄
  190. *
  191. * @param tableName 表名
  192. * @param rowKey 唯一標(biāo)識(shí)
  193. */
  194. public static boolean deleteRow(String tableName, String rowKey) {
  195. try {
  196. Table table = connection.getTable(TableName.valueOf(tableName));
  197. Delete delete = new Delete(Bytes.toBytes(rowKey));
  198. table.delete(delete);
  199. } catch (IOException e) {
  200. e.printStackTrace();
  201. }
  202. return true;
  203. }
  204. /**
  205. * 刪除指定行的指定列
  206. *
  207. * @param tableName 表名
  208. * @param rowKey 唯一標(biāo)識(shí)
  209. * @param familyName 列族
  210. * @param qualifier 列標(biāo)識(shí)
  211. */
  212. public static boolean deleteColumn(String tableName, String rowKey, String familyName,
  213. String qualifier) {
  214. try {
  215. Table table = connection.getTable(TableName.valueOf(tableName));
  216. Delete delete = new Delete(Bytes.toBytes(rowKey));
  217. delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
  218. table.delete(delete);
  219. table.close();
  220. } catch (IOException e) {
  221. e.printStackTrace();
  222. }
  223. return true;
  224. }
  225. }

KafkaProducerUtils Kafka生產(chǎn)者工具類

  1. package com.star.engine.utils;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.Producer;
  4. import java.util.Properties;
  5. public class KafkaProducerUtils {
  6. static Producer<String, String> producer;
  7. public static void init() {
  8. Properties props = new Properties();
  9. //此處配置的是kafka的端口
  10. props.put("metadata.broker.list", Constants.BROKERS);
  11. props.put("bootstrap.servers", Constants.BROKERS);
  12. //配置value的序列化類
  13. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  14. //配置key的序列化類
  15. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  16. props.put("producer.type", "async");
  17. props.put("request.required.acks", "-1");
  18. producer = new KafkaProducer<>(props);
  19. }
  20. public static Producer getProducer() {
  21. if (producer == null) {
  22. init();
  23. }
  24. return producer;
  25. }
  26. }
第一版需求:
     1、完成是否異地登錄判斷
  1. package com.star.engine.core;
  2. import com.alibaba.fastjson.JSON;
  3. import com.star.engine.pojo.ClientLog;
  4. import com.star.engine.utils.Constants;
  5. import com.star.engine.utils.FlinkKafkaUtils;
  6. import com.star.engine.utils.HBaseUtils;
  7. import org.apache.flink.api.common.state.ListState;
  8. import org.apache.flink.api.common.state.ListStateDescriptor;
  9. import org.apache.flink.configuration.Configuration;
  10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  11. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  13. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  14. import org.apache.flink.util.Collector;
  15. import org.apache.hadoop.hbase.client.HTable;
  16. import org.locationtech.spatial4j.distance.DistanceUtils;
  17. import java.text.SimpleDateFormat;
  18. import java.util.Iterator;
  19. /**
  20. * 第一版需求:
  21. * 1、完成是否異地登錄判斷
  22. *
  23. */
  24. public class Processl {
  25. public static void main(String[] args) throws Exception {
  26. StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
  27. // 1.獲取Kafka數(shù)據(jù)源
  28. DataStreamSource<String> source = environment.addSource(FlinkKafkaUtils.getKafkaEventSource());
  29. SingleOutputStreamOperator<ClientLog> clientlogSource = source.map(str -> JSON.parseObject(str, ClientLog.class));
  30. clientlogSource.keyBy(clientLog -> clientLog.getUserNo())
  31. .process(new KeyedProcessFunction<String, ClientLog, String>() {
  32. /**
  33. * 從hbase讀取用戶畫像數(shù)據(jù)
  34. * @param parameters
  35. * @throws Exception
  36. */
  37. HTable table;
  38. // 基于狀態(tài)存儲(chǔ)用戶上一次登錄的數(shù)據(jù)
  39. ListState<ClientLog> privousLoginData;
  40. @Override
  41. public void open(Configuration parameters) throws Exception {
  42. table = HBaseUtils.initHbaseClient(Constants.HBASE_TABLE);
  43. privousLoginData = getRuntimeContext().getListState(new ListStateDescriptor<ClientLog>("privoulsLoginData", ClientLog.class));
  44. }
  45. // 對(duì)每條數(shù)據(jù)進(jìn)行處理
  46. @Override
  47. public void processElement(ClientLog clientLog, Context context, Collector<String> out) throws Exception {
  48. String eventType = clientLog.getEventType();
  49. if (eventType.equals(Constants.RULE_TYPE_LOGIN)) {
  50. // 用戶登錄
  51. /**
  52. * 1、5分鐘登錄次數(shù)限制
  53. * 2、異地登錄
  54. * 3、不在常用地區(qū)登錄
  55. */
  56. // 5分鐘登錄次數(shù)限制
  57. // 異地登錄
  58. distanceProcess(clientLog,privousLoginData);
  59. // 在常用地區(qū)登錄
  60. }
  61. }
  62. }).print();
  63. environment.execute("Processl");
  64. }
  65. /**
  66. * 異地登錄
  67. * @param clientLog
  68. * @param privousLoginData
  69. */
  70. public static void distanceProcess(ClientLog clientLog, ListState<ClientLog> privousLoginData) {
  71. SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
  72. String score = "0";
  73. String reason = "第一次登錄:" + clientLog.getAddr() + " 本地登錄時(shí)間:" + clientLog.getDetailTime();
  74. try {
  75. // 判斷上一次是否登錄
  76. Iterator<ClientLog> iterator = privousLoginData.get().iterator();
  77. if (iterator.hasNext()) {
  78. ClientLog privousMessage = iterator.next();
  79. // 不是第一次登錄
  80. // 靜態(tài)規(guī)則
  81. String distanceRule = "500|1@400|2@300|3@200|4@100|5";
  82. String[] distanceRules = distanceRule.split("@");
  83. String oldTime = privousMessage.getDetailTime();
  84. String oldGps = privousMessage.getGps();
  85. String oldAddr = privousMessage.getAddr();
  86. // 計(jì)算距離
  87. double distanceReal = DistanceUtils.distHaversineRAD(Double.parseDouble(clientLog.getGps().split("\\,")[0]), Double.parseDouble(clientLog.getGps().split("\\,")[1]), Double.parseDouble(oldGps.split("\\,")[0]), Double.parseDouble(oldGps.split("\\,")[1]));
  88. // 時(shí)間差
  89. long time = simpleDateFormat.parse(clientLog.getDetailTime()).getTime() - simpleDateFormat.parse(oldTime).getTime();
  90. double speed = distanceReal / (time / (1000 * 3600.0));
  91. // 規(guī)則匹配
  92. for (String rule : distanceRules) {
  93. double speedLimit = Double.parseDouble(rule.split("\\|")[0]);
  94. String speedScore = rule.split("\\|")[1];
  95. if (speed >= speedLimit) {
  96. score = speedScore;
  97. reason += "=== 短時(shí)間內(nèi)速度為:"+ speed + " 規(guī)定約定速度為:" + speedLimit+" 當(dāng)前登錄地:" + clientLog.getAddr() + " 上一次登錄地:" + privousMessage.getAddr();
  98. }
  99. }
  100. } else {
  101. // 第一次登錄
  102. score = "5";
  103. reason = "第一次登錄:" + clientLog.getAddr() + " 登錄時(shí)間為:" + clientLog.getDetailTime();
  104. }
  105. privousLoginData.clear();
  106. privousLoginData.add(clientLog);
  107. } catch (Exception e) {
  108. e.printStackTrace();
  109. }
  110. }
  111. }

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多

    日本久久中文字幕免费| 在线日韩欧美国产自拍| 欧洲一级片一区二区三区| 日韩人妻中文字幕精品| 99久久精品视频一区二区| 99久久精品午夜一区| 美日韩一区二区精品系列| 国产麻豆视频一二三区| 国产欧美精品对白性色| 亚洲国产91精品视频| 91人妻人澡人人爽人人精品| 人体偷拍一区二区三区| 午夜小视频成人免费看| 日韩一区二区三区久久| 在线一区二区免费的视频| 冬爱琴音一区二区中文字幕| 少妇毛片一区二区三区| 亚洲成人精品免费在线观看| 91精品视频全国免费| 午夜福利92在线观看| 神马午夜福利免费视频| 亚洲精品国产福利在线| 色老汉在线视频免费亚欧| 黄色激情视频中文字幕| 少妇人妻无一区二区三区| 九九热在线免费在线观看| 欧美三级不卡在线观线看| 日本丰满大奶熟女一区二区| 黄色av尤物白丝在线播放网址 | 国产日产欧美精品大秀| 亚洲国产精品久久琪琪| 99热在线精品视频观看| 日本午夜一本久久久综合| 日韩人妻少妇一区二区| 日韩欧美第一页在线观看| 高清国产日韩欧美熟女| 亚洲黄色在线观看免费高清| 日韩一区二区三区嘿嘿| 黄片在线免费看日韩欧美| 国产成人精品国产成人亚洲 | 国产精品一区二区香蕉视频|