flume用戶自定義攔截器.創建flume-demo的maven項目. 創建項目文件POM.xml. <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.7.0</version> </dependency> package com.kpwong.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.List; import java.util.Map; public class CustomInterceptor implements Interceptor { @Override public void initialize() { } //單個事件攔截 @Override public Event intercept(Event event) { Map<String, String> headers = event.getHeaders(); String body = new String( event.getBody()); if (body.contains("hello")){ headers.put("topic","letter"); } else { headers.put("topic","number"); } return event; } //多個事件攔截 @Override public List<Event> intercept(List<Event> list) { for (Event event : list) { intercept(event); } return list; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new CustomInterceptor(); } @Override public void configure(Context context) { } } } 打包項目jar文件。拷貝文件到/flume/lib目錄下
配置conf文件.準備三臺機器(hadoop202,hadoop203,hadoop204) 在hadoop202上配置flume2.conf: # Name the components on this agent a2.sources = r1 a2.sinks = k1 k2 a2.channels = c1 c2 # Describe/configure the source a2.sources.r1.type = netcat a2.sources.r1.bind = localhost a2.sources.r1.port = 44444 #channel interceptors a2.sources.r1.interceptors = i1 a2.sources.r1.interceptors.i1.type =com.kpwong.flume.interceptor.CustomInterceptor$Builder a2.sources.r1.selector.type = multiplexing a2.sources.r1.selector.header = topic a2.sources.r1.selector.mapping.letter = c1 a2.sources.r1.selector.mapping.number = c2 # Describe the sink a2.sinks.k1.type = avro a2.sinks.k1.hostname = hadoop203 a2.sinks.k1.port = 4141 a2.sinks.k2.type=avro a2.sinks.k2.hostname = hadoop204 a2.sinks.k2.port = 4142 # Use a channel which buffers events in memory a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 c2 a2.sinks.k1.channel = c1 a2.sinks.k2.channel = c2 攔截器配置代碼: a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type =com.kpwong.flume.interceptor.CustomInterceptor$Builder
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.letter = c1
a2.sources.r1.selector.mapping.number = c2
hadoop203上配置: a3.sources = r1 a3.sinks = k1 a3.channels = c1 a3.sources.r1.type = avro a3.sources.r1.bind = hadoop203 a3.sources.r1.port = 4141 a3.sinks.k1.type = logger a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 a3.sinks.k1.channel = c1 a3.sources.r1.channels = c1 hadoop204上配置: a4.sources = r1 a4.sinks = k1 a4.channels = c1 a4.sources.r1.type = avro a4.sources.r1.bind = hadoop204 a4.sources.r1.port = 4142 a4.sinks.k1.type = logger a4.channels.c1.type = memory a4.channels.c1.capacity = 1000 a4.channels.c1.transactionCapacity = 100 a4.sinks.k1.channel = c1 a4.sources.r1.channels = c1 在hadoop204上運行: bin/flume-ng agent -c conf/ -f job/interceptor/flume4.conf -n a4 -Dflume.root.logger=INFO,console 在hadoop203上運行: bin/flume-ng agent -c conf/ -f job/interceptor/flume3.conf -n a3 -Dflume.root.logger=INFO,console 在hadoop202上運行: bin/flume-ng agent -c conf/ -f job/interceptor/flume2.conf -n a2
nc localhost 44444
實驗結果:
|
|