一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

flume interceptors flume攔截器

 行者花雕 2022-12-24 發(fā)布于北京

flume用戶自定義攔截器.創(chuàng)建flume-demo的maven項(xiàng)目.

創(chuàng)建項(xiàng)目文件POM.xml.

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
package com.kpwong.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    //單個(gè)事件攔截
    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String body = new String( event.getBody());

        if (body.contains("hello")){
            headers.put("topic","letter");
        }
        else
        {
            headers.put("topic","number");
        }

        return event;
    }

    //多個(gè)事件攔截
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

打包項(xiàng)目jar文件。拷貝文件到/flume/lib目錄下

 

 

 配置conf文件.準(zhǔn)備三臺(tái)機(jī)器(hadoop202,hadoop203,hadoop204)

在hadoop202上。配置flume2.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444

#channel interceptors
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type =com.kpwong.flume.interceptor.CustomInterceptor$Builder
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.letter = c1
a2.sources.r1.selector.mapping.number = c2

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop203
a2.sinks.k1.port = 4141

a2.sinks.k2.type=avro
a2.sinks.k2.hostname = hadoop204
a2.sinks.k2.port = 4142

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

 攔截器配置代碼:

a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type =com.kpwong.flume.interceptor.CustomInterceptor$Builder
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.letter = c1
a2.sources.r1.selector.mapping.number = c2
hadoop203上配置flume3.conf
a3.sources = r1
a3.sinks = k1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop203
a3.sources.r1.port = 4141
a3.sinks.k1.type = logger
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1
hadoop204上配置:
a4.sources = r1
a4.sinks = k1
a4.channels = c1
a4.sources.r1.type = avro
a4.sources.r1.bind = hadoop204
a4.sources.r1.port = 4142
a4.sinks.k1.type = logger
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100
a4.sinks.k1.channel = c1
a4.sources.r1.channels = c1

在hadoop204上運(yùn)行:

bin/flume-ng agent -c conf/ -f job/interceptor/flume4.conf  -n a4 -Dflume.root.logger=INFO,console

在hadoop203上運(yùn)行:

bin/flume-ng agent -c conf/ -f job/interceptor/flume3.conf -n a3 -Dflume.root.logger=INFO,console

在hadoop202上運(yùn)行:

bin/flume-ng agent -c conf/ -f job/interceptor/flume2.conf -n a2

 nc localhost 44444

實(shí)驗(yàn)結(jié)果:

 

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多

    麻豆欧美精品国产综合久久| 日韩黄色大片免费在线| 日韩人妻毛片中文字幕| 欧美三级大黄片免费看| 日韩18一区二区三区| 国产不卡免费高清视频| 日韩午夜老司机免费视频| 亚洲另类欧美综合日韩精品 | 内射精子视频欧美一区二区| 午夜福利激情性生活免费视频| 一区二区日韩欧美精品| 在线九月婷婷丁香伊人| 国产亚洲精品香蕉视频播放| 日韩一级毛一欧美一级乱| 日本久久精品在线观看| 亚洲清纯一区二区三区| 日本一级特黄大片国产| 欧美日韩国产综合在线| 69久久精品亚洲一区二区| 国产主播精品福利午夜二区| 亚洲欧美中文日韩综合| 亚洲视频一区自拍偷拍另类| 欧美日韩一区二区三区色拉拉| 永久福利盒子日韩日韩| 免费久久一级欧美特大黄孕妇| 黑丝国产精品一区二区| 国产精品偷拍一区二区| 99福利一区二区视频| 亚洲综合一区二区三区在线| 熟女乱一区二区三区四区| 国产精品一区二区日韩新区| 免费播放一区二区三区四区| 久久精视频免费视频观看| 樱井知香黑人一区二区| 中国一区二区三区人妻| 中文久久乱码一区二区| 精品伊人久久大香线蕉综合| 国产一区二区三区口爆在线| 亚洲天堂久久精品成人| 精品少妇一区二区三区四区| 亚洲女同一区二区另类|