Flume(四)【配置文件总结】

时间:2020-07-06 01:18:20   收藏:0   阅读:133

Agent的配置文件最好根据Flume的拓扑架构,依次写好每个节点的配置文件;

一.Agent

开头都是先要定义agent,sorce,channel,sink名

# Name the components on this agent( 描述这个Agent,给各个组件取名字)
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

二.Source

taildir

# Describe/configure the source
a1.sources.r3.type = TAILDIR
#维护这每个文件读取到的最新的位置
a1.sources.r3.positionFile = /opt/module/flume/tail_dir.json
#可配置多目录
a1.sources.r3.filegroups = f1 f2
#正则匹配文件名
a1.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a1.sources.r3.filegroups.f2 = /opt/module/flume/files/.*log.*

arvo

# Describe/configure the source
# source端的avro是一个数据接收服务
a1.sources.r1.type = avro
#接收的主机
a1.sources.r1.bind = hadoop102
#要和上级的avro的sink的端口一致
a1.sources.r1.port = 4141

netstat

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

exec

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

spooldir

# Describe/configure the source
a1.sources.r3.type = spooldir
# 指定文件夹
a1.sources.r3.spoolDir = /opt/module/flume/upload
#指定文件上传后的后缀
a1.sources.r3.fileSuffix = .COMPLETED
a1.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a1.sources.r3.ignorePattern = ([^ ]*\.tmp)

三.Sink

hdfs

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否对时间戳取整
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#创建文件夹的时间单位
a1.sinks.k1.hdfs.roundUnit = day
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件,单位:s
a1.sinks.k1.hdfs.rollInterval = 3600
#设置每个文件的滚动大小,一般略小于128M
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

kafka(待续)

hbase(待续)

arvo

# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
#发送的目的主机ip
a1.sinks.k1.hostname = hadoop102 
a1.sinks.k1.port = 4141

logger

# Describe the sink
a1.sinks.k1.type = logger

本地目录(file_roll)

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3

注意:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。

四.Channel

# Describe the channel
#channel的类型为memory或者file
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

五.组件绑定

# Bind the source and sink to the channel
#组件绑定,1个source,2个channel
a1.sources.r1.channels = c1 c2 
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

六.自定义拦截器和channle选择器

channel选择器有两种:replicating(默认),multiplexing

a1.sources.r1.interceptors = i1
#自定义拦截器的全类名
a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TypeInterceptor$Builder
#channel选择器选用multiplexing类型
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.hello = c1
a1.sources.r1.selector.mapping.nohello = c2

七.负载均衡和故障转移

# Name the components on this agent
a1.sources = r1
a1.channels = c1
#添加sink组
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#配置为故障转移(failover)
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
#sink组的绑定
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

八.启动flume

#启动脚本           flume的conf目录   agent名字       执行的配置文件
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf

logger 打印控制台

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

#缩写形式
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!