Event 构成
Source
Flume的三个部分:Source,Channel,Sink
数据是存储在Event对象中在这三部分之间传递
Event 构成
Event 接口
public interface Event {
public Map<String, String> getHeaders();
public void setHeaders(Map<String, String> headers);
public byte[] getBody();
public void setBody(byte[] body);
}
以 最简单的实现类 SimpleEvent
为例子
public class SimpleEvent implements Event {
private Map<String, String> headers;
private byte[] body;
...
}
可见
- header
以
Map<String, String>
的形式存储键值对信息 - body
二进制形式存储,从Source接收到的数据会存储在这里
Source
带着问题
- header 里有什么
一些实现类
ExecSource
- ExecRunnable 主要处理
- 通过 EventBuilder.withBody 基础创建 Event , 一行一个 Event
- header 为空
- flushEventBatch 中
- ChannelProcessor :: processEventBatch
1. 依次通过所有拦截器 Intercept
2. 通过 ChannelSelector 分发
3. Event 放入 Channel 中
- SourceCounter :: addToEventAcceptedCount
- 监控器作用
- ChannelProcessor :: processEventBatch
- SpoolDirectorySource
- SpoolDirectoryRunnable 处理
- Channel
- MemoryChannel
- doPut 由 Source 放入
- 将传入的 Event 放入 putList
- Event 不做操作
- doTake 由 Sink 拿走
- queue.poll() – LinkedBlockingDeque
类型 - 放入 tackList 中, 缓冲充当,有大小 transCapacity 限制
- queue.poll() – LinkedBlockingDeque
- doPut 由 Source 放入
- KafkaChannel
- doPut 由 Source 放入
- 寻找 Event header 中的
- key 名称:key
- partitionHeader
Integer
类型。(名称 在配置文件中 由 partitionIdHeader 指定)用于 分区id
- parseAsFlumeEvent ( xml )
指定是否序列化 为 FlumeEvent
默认 true
- 创建 ProducerRecord
- 键为 Event header 中的 key
- 值为 对 event 进行 serializeValue , 根据 parseAsFlumeEvent 来 序列化
- Event 被 序列化 传给 kafka broker
- 寻找 Event header 中的
- doTake 由 Sink 拿走
- Event 由 kafka 反序列生成,再返回给 Sink 的
- 拿 ConsumerRecord 由 kafka 消费者
- deserializeValue 根据 parseAsFlumeEvent 反序列化,转化为 Event
- 如果 record 有 key, 对 Event header 加入 “key” -> record.key()
- doPut 由 Source 放入
- MemoryChannel
- Sink
-
channel.take();
调用的是 Channel 中的 toTake -
LoggerSink
- 直接拿
-
KafkaSink
- 找 Event header 中的
- topicHeader
- allowTopicOverride ( xml )
boolean
为 true - 否则
- kafka.topic ( xml )
- allowTopicOverride ( xml )
- key
- partitionIdHeader
Integer
用于 分区id
- useFlumeEventFormat
boolean
( xml )默认 false
- topicHeader
- 从 Channel 拿到 Event( take )
- 生成 ProducerRecord
对 event 进行 serializeValue , 根据 useFlumeEventFormat 来 序列化
- 生产者生产 ProducerRecord
- producer 加入 future 异步进程
- producer flush
- 提交channel事务
b
- 找 Event header 中的
-