Flume
Concepts
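- A highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive volumes of log data.
- Main use: read data from a server's local disk in real time and write it to HDFS.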
Components
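- Agent: a JVM process that carries data from the source to the destination in the form of events.
- An Agent is made up of a Source, a Channel, and a Sink.
- Source: receives data into the Flume Agent. Sources can handle log data of many types and formats (avro, thrift, exec, spooling directory, netcat, ...).
- Sink: continuously polls the Channel for events and removes them in batches. Each batch is written to a storage or indexing system or sent on to another Flume Agent. Sink destinations include hdfs, logger, avro, file, HBase, ...
- Channel: the buffer between the Source and the Sink, which lets them run at different rates. Channels are thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.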
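- Flume's two most commonly used built-in Channels are Memory Channel and File Channel.
  - Memory Channel is an in-memory queue. Use it only where data loss is acceptable. If the process dies, or the machine crashes or restarts, the buffered events are lost.
  - File Channel writes all events to disk, so no data is lost when the process shuts down or the machine crashes.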
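- Event
  - The unit of transfer: the Event is Flume's basic unit of data transfer, and data travels from source to destination as Events.
  - An Event consists of a Header and a Body. The Header stores the event's attributes as key-value pairs. The Body holds the data itself as a byte array.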
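The Event and Channel descriptions above can be made concrete with a minimal plain-Java sketch. It uses nothing from Flume: ToyEvent and ToyChannel are hypothetical classes, not Flume's Event or MemoryChannel.

- The event carries K-V headers and a byte[] body.
- The channel is a bounded, thread-safe queue. Its capacity limits how many events it can buffer.
- Its transactionCapacity limits how many events a sink removes in one batch. Both names mirror the config keys used in the cases below.

Real Flume channels also wrap puts and takes in transactions that can roll back. This sketch leaves that out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for a Flume Event: K-V headers plus a byte[] body. */
class ToyEvent {
    private final Map<String, String> headers = new HashMap<>();
    private final byte[] body;

    ToyEvent(byte[] body) {
        this.body = body;
    }

    /** Wraps one line of log text as a UTF-8 encoded body. */
    static ToyEvent ofLine(String line) {
        return new ToyEvent(line.getBytes(StandardCharsets.UTF_8));
    }

    Map<String, String> getHeaders() {
        return headers;
    }

    byte[] getBody() {
        return body;
    }
}

/**
 * Hypothetical toy channel: a bounded, thread-safe buffer between Source and Sink.
 * capacity / transactionCapacity mirror the config keys, but this is NOT
 * Flume's MemoryChannel (no transactions, no rollback).
 */
class ToyChannel {
    private final BlockingQueue<ToyEvent> queue;
    private final int transactionCapacity;

    ToyChannel(int capacity, int transactionCapacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.transactionCapacity = transactionCapacity;
    }

    /** Source side: returns false instead of blocking when the buffer is full. */
    boolean put(ToyEvent event) {
        return queue.offer(event);
    }

    /** Sink side: removes at most transactionCapacity events as one batch. */
    List<ToyEvent> takeBatch() {
        List<ToyEvent> batch = new ArrayList<>();
        queue.drainTo(batch, transactionCapacity);
        return batch;
    }

    int size() {
        return queue.size();
    }
}
```

With capacity 3 and transactionCapacity 2, a fourth put is rejected, and one takeBatch() drains two events, leaving one behind.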
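Main steps
Work out which components you need, read the official documentation, write the configuration, and start the agent.
If none of the official components provides the feature you need, write a custom Source or Sink (again following the official docs).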
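Case 1
Monitor data arriving on a port and print it to the console.
The data arrives on a port, so use the NetCat TCP Source.
It should be printed to the console, so use the Logger Sink.
Create your own working directory, flume/job (i.e. /opt/module/flume-1.9.0/job).
Set the FLUME_HOME environment variable so the commands can be run from any directory.
Configure where the log goes: vim /opt/module/flume-1.9.0/conf/log4j.properties
Set: flume.log.dir=/opt/module/flume-1.9.0/logs
(If this is not set, the log file is written to whatever directory the command is run from.)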
Config file name (your choice): job/netcat-flume-logger.conf
#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Source
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=6666
#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
#Sink
a1.sinks.k1.type=logger
#Bind
#A sink connects to exactly one channel; a channel can feed several sinks
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
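Start (the first line is the generic form from the official user guide; the second is the command for this case):
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/job/netcat-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
[-Dflume.root.logger=INFO,console sends Flume's own log output, at INFO level, to the console. That is also where the Logger Sink prints the events.]
Then, from another terminal, send a few lines to the port, e.g. with nc localhost 6666.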
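Case 2
Monitor a single file that is being appended to in real time, and print the appended content to the console.
Analysis: use the Exec Source.
Config file: exec-flume-logger.conf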
#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Source
# Follow the file tail.txt
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /opt/module/flume-1.9.0/job/tail.txt
#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
#Sink
a1.sinks.k1.type=logger
#Bind
#A sink connects to exactly one channel; a channel can feed several sinks
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
touch /opt/module/flume-1.9.0/job/tail.txt
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/exec-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
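echo a >> tail.txt   (run in the job directory)
Check the result. Only appended content is captured. If the file is overwritten instead of appended to, the change may go unnoticed because Flume treats the file as unchanged.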
Case 2.2
Monitor a single file that is being appended to in real time, and write the appended content to HDFS.
Config file: hdfs-flume-logger.conf
Analysis: use the Exec Source and the HDFS Sink.
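#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Source
# Follow the file tail.txt
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /opt/module/flume-1.9.0/job/tail.txt
#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/%Y%m%d/%H
#Prefix for the uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
#Whether to round the timestamp down when building time-based directories
a1.sinks.k1.hdfs.round = true
#How many time units per new directory
a1.sinks.k1.hdfs.roundValue = 1
#The time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
#Use the local time instead of a timestamp header on the event
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#Number of events written before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
#File type: DataStream writes uncompressed output (CompressedStream plus hdfs.codeC enables compression)
a1.sinks.k1.hdfs.fileType = DataStream
#Roll to a new file every N seconds
a1.sinks.k1.hdfs.rollInterval = 60
#Roll when a file reaches this many bytes (just under 128 MB = 134217728 bytes)
a1.sinks.k1.hdfs.rollSize = 134217700
#0 = do not roll based on the number of events
a1.sinks.k1.hdfs.rollCount = 0
#Bind
#A sink connects to exactly one channel; a channel can feed several sinks
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1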
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/hdfs-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
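The HDFS Sink settings above combine two ideas:

- Time-based directory bucketing: the hdfs.path escapes plus round/roundValue/roundUnit.
- File rolling: rollInterval/rollSize/rollCount, where 0 disables a trigger.

The sketch below is a simplified, hypothetical model of both in plain Java. HdfsSinkToy is not a Flume class, and Flume checks the time trigger with a timer rather than on demand. The sketch only illustrates which directory an event lands in and when a new file would be started.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper mimicking two HDFS Sink behaviours; not Flume code. */
final class HdfsSinkToy {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter HOUR = DateTimeFormatter.ofPattern("HH");

    private HdfsSinkToy() {}

    /**
     * Resolves "/flume/%Y%m%d/%H" for a timestamp, rounding the time down to a
     * multiple of roundValue hours (round=true, roundUnit=hour).
     */
    static String bucketPath(long epochMillis, int roundValue, ZoneId zone) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(zone).truncatedTo(ChronoUnit.HOURS);
        int roundedHour = t.getHour() - (t.getHour() % roundValue);
        t = t.withHour(roundedHour);
        return "/flume/" + DAY.format(t) + "/" + HOUR.format(t);
    }

    /**
     * Roll decision: each trigger set to 0 is disabled, so with rollInterval=60,
     * rollSize=134217700 and rollCount=0 only elapsed time and file size matter.
     */
    static boolean shouldRoll(long openSeconds, long bytesWritten, long eventsWritten,
                              long rollInterval, long rollSize, long rollCount) {
        boolean byTime = rollInterval > 0 && openSeconds >= rollInterval;
        boolean bySize = rollSize > 0 && bytesWritten >= rollSize;
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        return byTime || bySize || byCount;
    }
}
```

For example, the instant 1700000000000 ms (2023-11-14 22:13:20 UTC) lands in /flume/20231114/22 with roundValue=1. With roundValue=6 it would land in /flume/20231114/18.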
Case 3 Monitor multiple new files in a directory in real time
New files: each file is collected only once.
Config file: spooling-flume-logger.conf
Analysis: use the Spooling Directory Source and the HDFS Sink.
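#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/opt/module/flume-1.9.0/job/spooling
#Once a file has been fully ingested, Flume renames it by appending this suffix; files carrying the suffix are treated as already processed
a1.sources.r1.fileSuffix=.COMPLETED
#Ignore .tmp files (regular expression)
a1.sources.r1.ignorePattern=.*\.tmp
#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/%Y%m%d/%H
#Prefix for the uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
#Whether to round the timestamp down when building time-based directories
a1.sinks.k1.hdfs.round = true
#How many time units per new directory
a1.sinks.k1.hdfs.roundValue = 1
#The time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
#Use the local time instead of a timestamp header on the event
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#Number of events written before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
#File type: DataStream writes uncompressed output (CompressedStream plus hdfs.codeC enables compression)
a1.sinks.k1.hdfs.fileType = DataStream
#Roll to a new file every N seconds
a1.sinks.k1.hdfs.rollInterval = 60
#Roll when a file reaches this many bytes (just under 128 MB = 134217728 bytes)
a1.sinks.k1.hdfs.rollSize = 134217700
#0 = do not roll based on the number of events
a1.sinks.k1.hdfs.rollCount = 0
#Bind
#A sink connects to exactly one channel; a channel can feed several sinks
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1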
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/spooling-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
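The Spooling Directory Source only picks up files it has not yet finished. Once a file has been read completely, Flume renames it with fileSuffix, and files matching ignorePattern are skipped. Because each file is read only once, files should not be modified after they are placed in the spool directory.

The sketch below shows that selection rule. It works on a plain list of file names instead of a real directory scan, and SpoolDirToy is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of how a spooling source chooses files; not Flume code. */
final class SpoolDirToy {
    private SpoolDirToy() {}

    /** A file is ingested only if it is not already marked done and is not ignored. */
    static boolean shouldIngest(String fileName, String fileSuffix, Pattern ignorePattern) {
        if (fileName.endsWith(fileSuffix)) {
            return false; // fully read and renamed earlier
        }
        return !ignorePattern.matcher(fileName).matches();
    }

    /** Filters a directory listing down to the files still waiting to be read. */
    static List<String> pending(List<String> fileNames, String fileSuffix, Pattern ignorePattern) {
        List<String> result = new ArrayList<>();
        for (String name : fileNames) {
            if (shouldIngest(name, fileSuffix, ignorePattern)) {
                result.add(name);
            }
        }
        return result;
    }

    /** The name a file gets after it has been fully read. */
    static String completedName(String fileName, String fileSuffix) {
        return fileName + fileSuffix;
    }
}
```

With fileSuffix=.COMPLETED and ignorePattern=.*\.tmp, the listing [a.log, b.log.COMPLETED, c.tmp, d.txt] leaves a.log and d.txt to be read.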
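Case 4 Monitor multiple appended files in a directory in real time
Files that keep growing: new content is collected every time something is appended.
Config file: taildir-flume-logger.conf
Create the monitored files and the position file (run in /opt/module/flume-1.9.0/job):
mkdir taildir
mkdir position
touch taildir/file1.txt
touch taildir/file2.txt
touch taildir/log1.log
touch taildir/log2.log
touch position/position.json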
Analysis: use the TAILDIR Source and the HDFS Sink.
#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/opt/module/flume-1.9.0/job/taildir/.*\.txt
a1.sources.r1.filegroups.f2=/opt/module/flume-1.9.0/job/taildir/.*\.log
# Records how far each file has been read: Flume periodically writes each file's latest read position to the position file, so collection resumes where it left off after a restart
a1.sources.r1.positionFile=/opt/module/flume-1.9.0/job/position/position.json
#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/%Y%m%d/%H
#Prefix for the uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
#Whether to round the timestamp down when building time-based directories
a1.sinks.k1.hdfs.round = true
#How many time units per new directory
a1.sinks.k1.hdfs.roundValue = 1
#The time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
#Use the local time instead of a timestamp header on the event
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#Number of events written before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
#File type: DataStream writes uncompressed output (CompressedStream plus hdfs.codeC enables compression)
a1.sinks.k1.hdfs.fileType = DataStream
#Roll to a new file every N seconds
a1.sinks.k1.hdfs.rollInterval = 60
#Roll when a file reaches this many bytes (just under 128 MB = 134217728 bytes)
a1.sinks.k1.hdfs.rollSize = 134217700
#0 = do not roll based on the number of events
a1.sinks.k1.hdfs.rollCount = 0
#Bind
#A sink connects to exactly one channel; a channel can feed several sinks
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/taildir-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
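TAILDIR can collect from the same file many times because it remembers how far it has read in each file and persists that offset in positionFile. The sketch below models the idea in memory. PositionTracker is hypothetical, and offsets are counted in characters rather than bytes. Flume's real position file format and its handling of rotated or truncated files are not reproduced.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of TAILDIR-style resumable tailing: remember, per file,
 * how much has already been shipped, and only emit what follows.
 */
final class PositionTracker {
    private final Map<String, Integer> positions;

    PositionTracker(Map<String, Integer> savedPositions) {
        // In Flume the saved state would be loaded from positionFile.
        this.positions = new HashMap<>(savedPositions);
    }

    /** Returns the newly appended part of the file and advances the stored position. */
    String collect(String file, String currentContent) {
        // Guard for this toy only; truncation/rotation is not modelled.
        int pos = Math.min(positions.getOrDefault(file, 0), currentContent.length());
        String fresh = currentContent.substring(pos);
        positions.put(file, currentContent.length());
        return fresh;
    }

    /** Copy of the offsets, i.e. what would be written back to the position file. */
    Map<String, Integer> snapshot() {
        return new HashMap<>(positions);
    }
}
```

Suppose a tracker is rebuilt from snapshot() after a "restart". It then emits only lines appended since the last save, which is the resume-after-restart behaviour the positionFile comment describes.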
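Case 1 Replicating
Flume1 monitors changes to file contents and passes the data to both Flume2 and Flume3. Flume2 writes it to HDFS; Flume3 writes it to the local file system.
The downstream agents (Flume2 and Flume3) act as servers, because their Avro Sources listen on ports. Start them before Flume1.
Why two Channels: when one Channel feeds several Sinks, each event is taken by only one Sink, which is how load balancing and failover work. To give every downstream agent a full copy, use the replicating selector with one Channel per Sink.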
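The reasoning above can be checked with two plain queues standing in for channels. In this hypothetical sketch (ReplicatingDemo is not Flume code), replicating writes the same event into every channel. Two sinks that share one channel compete for each event, so only one of them receives it.

```java
import java.util.List;
import java.util.Queue;

/** Hypothetical illustration of why replicating needs one channel per sink. */
final class ReplicatingDemo {
    private ReplicatingDemo() {}

    /** Replicating selector: the same event is written into every channel. */
    static void replicate(String event, List<Queue<String>> channels) {
        for (Queue<String> channel : channels) {
            channel.offer(event);
        }
    }

    /** A sink takes from its channel; the event is removed, so no other sink sees it. */
    static String take(Queue<String> channel) {
        return channel.poll();
    }
}
```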
job/replicating/flume1.conf
#Named
a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2
#Source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/module/flume-1.9.0/job/taildir/.*\.txt
# Records how far each file has been read: Flume periodically writes each file's latest read position to the position file, so collection resumes where it left off after a restart
a1.sources.r1.positionFile=/opt/module/flume-1.9.0/job/position/position.json
#Channel selector
a1.sources.r1.selector.type=replicating
#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=10000
a1.channels.c2.transactionCapacity=100
#Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=7777
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=localhost
a1.sinks.k2.port=8888
#Bind
#A sink connects to exactly one channel; a channel can feed several sinks
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2
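job/replicating/flume2.conf
#Named
a2.sources=r1
a2.channels=c1
a2.sinks=k1
#Source
a2.sources.r1.type=avro
a2.sources.r1.bind=localhost
a2.sources.r1.port=7777
#Channels
a2.channels.c1.type=memory
a2.channels.c1.capacity=10000
a2.channels.c1.transactionCapacity=100
#Sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = /flume/%Y%m%d/%H
#Prefix for the uploaded files
a2.sinks.k1.hdfs.filePrefix = logs-
#Whether to round the timestamp down when building time-based directories
a2.sinks.k1.hdfs.round = true
#How many time units per new directory
a2.sinks.k1.hdfs.roundValue = 1
#The time unit used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
#Use the local time instead of a timestamp header on the event
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#Number of events written before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
#File type: DataStream writes uncompressed output (CompressedStream plus hdfs.codeC enables compression)
a2.sinks.k1.hdfs.fileType = DataStream
#Roll to a new file every N seconds
a2.sinks.k1.hdfs.rollInterval = 60
#Roll when a file reaches this many bytes (just under 128 MB = 134217728 bytes)
a2.sinks.k1.hdfs.rollSize = 134217700
#0 = do not roll based on the number of events
a2.sinks.k1.hdfs.rollCount = 0
#Bind
#A sink connects to exactly one channel; a channel can feed several sinks
a2.sources.r1.channels=c1
a2.sinks.k1.channel=c1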
job/replicating/flume3.conf
#Named
a3.sources=r1
a3.channels=c1
a3.sinks=k1
#Source
a3.sources.r1.type=avro
a3.sources.r1.bind=localhost
a3.sources.r1.port=8888
#Channels
a3.channels.c1.type=memory
a3.channels.c1.capacity=10000
a3.channels.c1.transactionCapacity=100
#Sink
a3.sinks.k1.type = file_roll
#Local directory that receives the output files
a3.sinks.k1.sink.directory=/opt/module/flume-1.9.0/job/fileroll
#Bind
a3.sources.r1.channels=c1
a3.sinks.k1.channel=c1
Start the downstream agents first (Flume3, then Flume2), then Flume1:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console
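Case 2 Load balancing
Flume1 monitors data and distributes it to Flume2 and Flume3, either round-robin or at random.
Flume2 prints the data to the console.
Flume3 prints the data to the console.
LoadBalancingSinkProcessor handles load balancing (processor.type=load_balance).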
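The sketch below shows how a load-balancing processor might choose a sink. It assumes Flume's two selection strategies, round_robin and random. It treats backoff as "skip sinks currently marked as failed". LoadBalanceToy is hypothetical. Flume's real processor also tries another sink when a delivery fails and lengthens the blacklist time exponentially, which is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Hypothetical sketch of load_balance sink selection; not Flume code. */
final class LoadBalanceToy {
    private final List<String> sinks;
    private final boolean backoff;
    private int next = 0;

    LoadBalanceToy(List<String> sinks, boolean backoff) {
        this.sinks = new ArrayList<>(sinks);
        this.backoff = backoff;
    }

    /** With backoff=true, sinks marked as failed are left out while they are blacklisted. */
    private List<String> candidates(Set<String> failed) {
        List<String> result = new ArrayList<>();
        for (String s : sinks) {
            if (!backoff || !failed.contains(s)) {
                result.add(s);
            }
        }
        return result.isEmpty() ? sinks : result; // toy fallback: try everything
    }

    /** selector=round_robin: cycle through the candidates in order. */
    String roundRobin(Set<String> failed) {
        List<String> c = candidates(failed);
        String chosen = c.get(next % c.size());
        next++;
        return chosen;
    }

    /** selector=random: pick any candidate. */
    String random(Set<String> failed, Random rnd) {
        List<String> c = candidates(failed);
        return c.get(rnd.nextInt(c.size()));
    }
}
```

With sinks k1 and k2, round-robin yields k1, k2, k1, and so on. If k1 is marked failed and backoff is on, every pick goes to k2.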
job/load_balance/flume1.conf
#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1 k2
#Source
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=6666
#Channel selector (replicating is the default; with a single channel it changes nothing)
a1.sources.r1.selector.type=replicating
#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
#Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=7777
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=localhost
a1.sinks.k2.port=8888
#Sink processor
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
# Selection strategy: random (the default is round_robin)
a1.sinkgroups.g1.processor.selector=random
# backoff=true temporarily blacklists a failed sink so the others are used for a while; false disables this
a1.sinkgroups.g1.processor.backoff=false
#Bind
#A sink connects to exactly one channel; a channel can feed several sinks
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1
job/load_balance/flume2.conf
#Named
a2.sources=r1
a2.channels=c1
a2.sinks=k1
#Source
a2.sources.r1.type=avro
a2.sources.r1.bind=localhost
a2.sources.r1.port=7777
#Channels
a2.channels.c1.type=memory
a2.channels.c1.capacity=10000
a2.channels.c1.transactionCapacity=100
#Sink
a2.sinks.k1.type = logger
#Bind
#A sink connects to exactly one channel; a channel can feed several sinks
a2.sources.r1.channels=c1
a2.sinks.k1.channel=c1
job/load_balance/flume3.conf
#Named
a3.sources=r1
a3.channels=c1
a3.sinks=k1
#Source
a3.sources.r1.type=avro
a3.sources.r1.bind=localhost
a3.sources.r1.port=8888
#Channels
a3.channels.c1.type=memory
a3.channels.c1.capacity=10000
a3.channels.c1.transactionCapacity=100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels=c1
a3.sinks.k1.channel=c1
Start Flume3 and Flume2 first, then Flume1:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/load_balance/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/load_balance/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/load_balance/flume1.conf -n a1 -Dflume.root.logger=INFO,console
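Case 2.2 Failover
Flume1 monitors data and sends it to the active Sink (the one with the highest priority).
Flume2 prints the data to the console.
Flume3 prints the data to the console.
FailoverSinkProcessor handles failover (processor.type=failover).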
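With the failover processor, the sink with the highest priority value is active. With priority.k1=5 and priority.k2=10 as in the config that follows, that is k2 (Flume3 on port 8888), and k1 (Flume2) takes over if Flume3 goes down.

The sketch below shows that choice. FailoverToy is hypothetical. Flume also holds failed sinks in a cool-down period before retrying them, which is omitted here.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of failover selection: the healthy sink with the highest priority wins. */
final class FailoverToy {
    private FailoverToy() {}

    static String active(Map<String, Integer> priorities, Set<String> failed) {
        String best = null;
        int bestPriority = Integer.MIN_VALUE;
        for (Map.Entry<String, Integer> e : priorities.entrySet()) {
            if (failed.contains(e.getKey())) {
                continue; // skip sinks that are currently down
            }
            if (e.getValue() > bestPriority) {
                best = e.getKey();
                bestPriority = e.getValue();
            }
        }
        return best; // null when every sink has failed
    }
}
```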
job/failover/flume1.conf
#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1 k2
#Source
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=6666
#Channel selector (replicating is the default; with a single channel it changes nothing)
a1.sources.r1.selector.type=replicating
#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
#Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=7777
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=localhost
a1.sinks.k2.port=8888
#Sink processor
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
# Higher value = higher priority: k2 is active, k1 takes over if k2 fails
a1.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.g1.processor.priority.k2=10
#Bind
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1
job/failover/flume2.conf
#Named
a2.sources=r1
a2.channels=c1
a2.sinks=k1
#Source
a2.sources.r1.type=avro
a2.sources.r1.bind=localhost
a2.sources.r1.port=7777
#Channels
a2.channels.c1.type=memory
a2.channels.c1.capacity=10000
a2.channels.c1.transactionCapacity=100
#Sink
a2.sinks.k1.type = logger
#Bind
#A sink connects to exactly one channel; a channel can feed several sinks
a2.sources.r1.channels=c1
a2.sinks.k1.channel=c1
job/failover/flume3.conf
#Named
a3.sources=r1
a3.channels=c1
a3.sinks=k1
#Source
a3.sources.r1.type=avro
a3.sources.r1.bind=localhost
a3.sources.r1.port=8888
#Channels
a3.channels.c1.type=memory
a3.channels.c1.capacity=10000
a3.channels.c1.transactionCapacity=100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels=c1
a3.sinks.k1.channel=c1
Start Flume3 and Flume2 first, then Flume1:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console
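Case 3 Aggregation
Flume1 monitors file contents; Flume2 monitors data on a port.
Flume1 (on hadoop102) and Flume2 (on hadoop103) send what they collect to Flume3.
Flume3 (on hadoop104) prints the data to the console.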
job/aggr/flume1.conf (runs on hadoop102)
#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/module/flume-1.9.0/job/taildir/.*\.txt
a1.sources.r1.positionFile=/opt/module/flume-1.9.0/job/position/position.json
#Channel selector
a1.sources.r1.selector.type=replicating
#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
#Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=hadoop104
a1.sinks.k1.port=8888
#Bind
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
job/aggr/flume2.conf (runs on hadoop103)
#Named
a2.sources=r1
a2.channels=c1
a2.sinks=k1
#Source
a2.sources.r1.type=netcat
a2.sources.r1.bind=localhost
a2.sources.r1.port=7777
#Channels
a2.channels.c1.type=memory
a2.channels.c1.capacity=10000
a2.channels.c1.transactionCapacity=100
#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname=hadoop104
a2.sinks.k1.port=8888
#Bind
a2.sources.r1.channels=c1
a2.sinks.k1.channel=c1
job/aggr/flume3.conf (runs on hadoop104)
#Named
a3.sources=r1
a3.channels=c1
a3.sinks=k1
#Source
a3.sources.r1.type=avro
a3.sources.r1.bind=hadoop104
a3.sources.r1.port=8888
#Channels
a3.channels.c1.type=memory
a3.channels.c1.capacity=10000
a3.channels.c1.transactionCapacity=100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels=c1
a3.sinks.k1.channel=c1
Start Flume3 first (on hadoop104), then Flume2 (on hadoop103) and Flume1 (on hadoop102):
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/aggr/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/aggr/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/aggr/flume1.conf -n a1 -Dflume.root.logger=INFO,console
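Case 4 Multiplexing (with an interceptor)
Decide where each event should go based on its title header.
Use an interceptor to tag the events.
Flume Interceptors (see the official docs)
Steps:
1. Write a custom interceptor (implement the Interceptor interface provided by Flume).
Add the dependency: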
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
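package com.jojo.interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class EventHeaderInterceptor implements Interceptor {
    // How does Flume use this class? It does not call the constructor directly:
    // it first obtains the Builder class via reflection, then uses the Builder
    // (MyBuilder below) to create the EventHeaderInterceptor instance.

    @Override
    public void initialize() {}

    /**
     * Intercepts a single event: tags it with a "title" header based on its body.
     * @param event the incoming event
     * @return the same event, possibly with a new header
     */
    @Override
    public Event intercept(Event event) {
        // 1. Get the event's headers
        Map<String, String> headers = event.getHeaders();
        // 2. Decode the body. What encoding was it written in? UTF-8 is assumed here.
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // 3. Check whether the body contains "atguigu" or "shangguigu"
        if (body.contains("atguigu")) {
            headers.put("title", "at");
        } else if (body.contains("shangguigu")) {
            headers.put("title", "st");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // Run every event in the batch through the single-event interceptor
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {}

    public static class MyBuilder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new EventHeaderInterceptor();
        }

        @Override
        public void configure(Context context) {}
    }
}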
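2. Package it as a JAR and upload it to flume/lib.
3. Test the custom interceptor.
Flume1 receives data on a port (netcat source, port 5555), passes every event through the interceptor, and routes it to Flume2, Flume3, or Flume4 according to its content.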
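Case 4 works in two stages:

1. The interceptor writes a title header: at for bodies containing "atguigu", st for "shangguigu".
2. The multiplexing selector in the config that follows maps that header to a channel:
   - at → c2 → port 7777 → Flume3
   - st → c3 → port 8888 → Flume4
   - anything else → c1 → port 6666 → Flume2

The sketch below replays that routing in plain Java, so the logic can be checked without a running agent. MultiplexingToy is hypothetical and hard-codes the mapping from the config.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, Flume-free replay of case 4: interceptor rule, then multiplexing selector. */
final class MultiplexingToy {
    private MultiplexingToy() {}

    /** Same rule as EventHeaderInterceptor: returns the title header value, or null. */
    static String titleFor(String body) {
        if (body.contains("atguigu")) {
            return "at";
        } else if (body.contains("shangguigu")) {
            return "st";
        }
        return null;
    }

    /** selector.header=title, mapping.at=c2, mapping.st=c3, default=c1. */
    static String channelFor(Map<String, String> headers) {
        Map<String, String> mapping = new HashMap<>();
        mapping.put("at", "c2");
        mapping.put("st", "c3");
        String title = headers.get("title");
        return (title != null && mapping.containsKey(title)) ? mapping.get(title) : "c1";
    }

    /** Full path for one line of input: tag it, then pick its channel. */
    static String route(String body) {
        Map<String, String> headers = new HashMap<>();
        String title = titleFor(body);
        if (title != null) {
            headers.put("title", title);
        }
        return channelFor(headers);
    }
}
```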
job/muti/flume1.conf
#Named
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555
#Channel selector: route by the value of the "title" header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = title
a1.sources.r1.selector.mapping.at = c2
a1.sources.r1.selector.mapping.st = c3
a1.sources.r1.selector.default = c1
#Interceptor: the custom class that sets the title header ($MyBuilder refers to its nested Builder class)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.jojo.interceptor.EventHeaderInterceptor$MyBuilder
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100
a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 6666
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = localhost
a1.sinks.k3.port = 8888
#Bind
a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
job/muti/flume2.conf
#Named
a2.sources=r1
a2.channels=c1
a2.sinks=k1
#Source
a2.sources.r1.type=avro
a2.sources.r1.bind=localhost
a2.sources.r1.port=6666
#Channels
a2.channels.c1.type=memory
a2.channels.c1.capacity=10000
a2.channels.c1.transactionCapacity=100
#Sink
a2.sinks.k1.type = logger
#Bind
a2.sources.r1.channels=c1
a2.sinks.k1.channel=c1
job/muti/flume3.conf
#Named
a3.sources=r1
a3.channels=c1
a3.sinks=k1
#Source
a3.sources.r1.type=avro
a3.sources.r1.bind=localhost
a3.sources.r1.port=7777
#Channels
a3.channels.c1.type=memory
a3.channels.c1.capacity=10000
a3.channels.c1.transactionCapacity=100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels=c1
a3.sinks.k1.channel=c1
job/muti/flume4.conf
#Named
a4.sources=r1
a4.channels=c1
a4.sinks=k1
#Source
a4.sources.r1.type=avro
a4.sources.r1.bind=localhost
a4.sources.r1.port=8888
#Channels
a4.channels.c1.type=memory
a4.channels.c1.capacity=10000
a4.channels.c1.transactionCapacity=100
#Sink
a4.sinks.k1.type = logger
#Bind
a4.sources.r1.channels=c1
a4.sinks.k1.channel=c1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume4.conf -n a4 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume1.conf -n a1 -Dflume.root.logger=INFO,console
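The interceptor is registered as com.jojo.interceptor.EventHeaderInterceptor$MyBuilder because "$" is how Java writes the binary name of a nested class. As the comment in the interceptor says, Flume creates the Builder via reflection and then asks it for the interceptor.

The sketch below shows only that reflection step in plain Java. ToyInterceptor and ReflectiveLoader are hypothetical, and java.util.function.Supplier stands in for Flume's Interceptor.Builder.

```java
import java.util.function.Supplier;

/** Hypothetical product type, standing in for EventHeaderInterceptor. */
class ToyInterceptor {
    String name() {
        return "toy";
    }

    /** Nested builder, analogous to EventHeaderInterceptor.MyBuilder. */
    public static class Builder implements Supplier<ToyInterceptor> {
        @Override
        public ToyInterceptor get() {
            return new ToyInterceptor();
        }
    }
}

/** Hypothetical loader: the reflection step a config-driven framework performs. */
final class ReflectiveLoader {
    private ReflectiveLoader() {}

    /**
     * Loads a builder class by its binary name (nested classes use '$'),
     * creates it with the no-arg constructor, and asks it for the product.
     */
    static Object build(String builderClassName) {
        try {
            Class<?> cls = Class.forName(builderClassName);
            Supplier<?> builder = (Supplier<?>) cls.getDeclaredConstructor().newInstance();
            return builder.get();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot build " + builderClassName, e);
        }
    }
}
```

ToyInterceptor.Builder.class.getName() ends in "ToyInterceptor$Builder", the same shape as the EventHeaderInterceptor$MyBuilder value in the config.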
Ganglia
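Ganglia consists of three parts: gmond, gmetad, and gweb.
gmond (Ganglia Monitoring Daemon) is a lightweight service installed on every host whose metrics you want to collect. With gmond you can easily collect many system metrics, such as CPU, memory, disk, network, and active-process data.
gmetad (Ganglia Meta Daemon) is the service that aggregates all of this information and stores it on disk in RRD format.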
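gweb (Ganglia Web) is Ganglia's visualization tool: a PHP front end that displays the data stored by gmetad in a browser. Its web interface charts the many different metrics collected from the running cluster.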