Flume笔记

news/2024/5/3 0:30:36/文章来源:https://blog.csdn.net/niki__/article/details/130020771

Flume

概念

  • 高可用、高可靠,分布式海量日志采集、聚合和传输的系统。

  • 主要作用:实时读取服务器本地磁盘的数据,将数据写入到HDFS

组成

在这里插入图片描述

  • Agent,JVM进程,以事件的形式将数据从源头送到目的地

  • Agent分为SourceChannelSink

    • Source负责接收数据到Flume Agent,Source组件可以处理各种类型、各种格式的日志数据(avro、thrift、exec、spooling、directory、netcat…)

    • Sink,不断轮询Channel中的事件且批量移除他们,将这些事件批量写入到存储或索引系统,或者被发送到另一个Flume Agent。Sink组件目的地包括:hdfs、logger、avro、file、HBase…

    • Channel,位于Source和Sink的缓冲区。Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。

    • Flume自带两种Channel:Memory Channel和File Channel。

      • Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
      • File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
  • Event

    • 传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。
    • Event由HeaderBody两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。

主要步骤

分析需要什么组件,去官方看文档,配置,启动

如果官方实在没有提供相关功能,就自定义Source或者Sink(也是看官方)

案例1

监控端口数据,将数据打印到控制台

端口,所以选用NetCat TCP Source

打印到控制台,所以使用Logger Sink

在这里插入图片描述

建立文件夹(自己建的),flume/job

配置环境变量FLUME_HOME,方便在其他地方调用命令

配置打印日志:vim /opt/module/flume-1.9.0/conf/log4j.properties

设置:flume.log.dir=/opt/module/flume-1.9.0/logs(如果不设置,那么每次在哪里运行的命令,日志就会出现在哪个文件夹下)

配置文件名:自己命名,job/netcat-flume-logger.conf

#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1#Source
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=6666#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100#Sink
a1.sinks.k1.type=logger#Bind
#一个sink只能连一个channel,一个channel可以连多个sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/job/flume-netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console

【-Dflume.root.logger=INFO,console的意思是,把文件打印到控制台】

案例2

实时监控单个追加文件,将监控到的内容追加到控制台

分析:Source选用Exec Source

配置文件:exec-flume-logger.conf

#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1#Source
# 监控tail.txt那个文件
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /opt/module/flume-1.9.0/job/tail.txt#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100#Sink
a1.sinks.k1.type=logger#Bind
#一个sink只能连一个channel,一个channel可以连多个sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

touch /opt/module/flume-1.9.0/job/tail.txt

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/exec-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

echo a>> tail.txt查看效果,只能追加,覆盖的话监控不到(会以为文件没变)

案例2.2

实时监控单个追加文件,将监控到的内容追加到HDFS中

配置文件:hdfs-flume-logger.conf

分析:Source选用Exec Source

​ Sink选用HDFS sink

​ Logger选用

#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1#Source
# 监控tail.txt那个文件
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /opt/module/flume-1.9.0/job/tail.txt#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0#Bind
#一个sink只能连一个channel,一个channel可以连多个sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/hdfs-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

案例3 实时监控目录下的多个新文件

新文件:只会采集一次

配置文件:spooling-flume-logger.conf

分析:Source选用Spooling Directory Source

​ Sink选用HDFS sink

​ Logger选用

#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1#Sourcea1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/opt/module/flume-1.9.0/job/spooling
#判断是不是新文件的方式:后缀名如果是.COMLETED就认为是新文件
a1.sources.r1.fileSuffix= .COMPLETED
#忽略tmp文件,正则
a1.sources.r1.ignorePattern=.*\.tmp#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0#Bind
#一个sink只能连一个channel,一个channel可以连多个sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/spooling-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

案例4 实时监控目录下的多个追加文件

追加多个文件:多次

配置文件:taildir-flume-logger.conf

监测文件和位置文件:

mkdir taildir mkdir position

touch file1.txt touch file2.txt touch log1.log touch log2.log

touch position.json

分析:Source选用TAILDIR

​ Sink选用HDFS sink

​ Logger选用

#Named
a1.sources=r1
a1.channels=c1
a1.sinks=k1#Sourcea1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/opt/module/flume-1.9.0/job/taildir/.*\.txt
a1.sources.r1.filegroups.f2=/opt/module/flume-1.9.0/job/taildir/.*\.log
# 记录每个文件所采集的位置。定期往position File中更新每个文件读取到的最新的位置,实现断点续传
a1.sources.r1.positionFile=/opt/module/flume-1.9.0/job/position/position.json#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0#Bind
#一个sink只能连一个channel,一个channel可以连多个sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/taildir-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

案例1 复制

Flume1监控文件内容的变动,将监控到的内容分别给到Flume2和Flume3。Flume2将内容写到HDFS,Flume3将数据写到本地文件系统

在这里插入图片描述

下游是服务器

为什么要两个Channel:一个Channel对应多个Sink的情况只能是负载均衡或者故障转移,所以用复制

Flume1.conf

#Named
a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2#Sourcea1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1 
a1.sources.r1.filegroups.f1=/opt/module/flume-1.9.0/job/taildir/.*\.txt
# 记录每个文件所采集的位置。定期往position File中更新每个文件读取到的最新的位置,实现断点续传
a1.sources.r1.positionFile=/opt/module/flume-1.9.0/job/position/position.json#Channel selector
a1.sources.r1.selector.type=replicating#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=10000
a1.channels.c2.transactionCapacity=100#Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=7777
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=localhost
a1.sinks.k2.port=8888#Bind
#一个sink只能连一个channel,一个channel可以连多个sink
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

Flume2.conf

#Named
a2.sources=r1
a2.channels=c1
a2.sinks=k1#Sourcea2.sources.r1.type=avro
a2.sources.r1.bind=localhost
a2.sources.r1.port=7777#Channels
a2.channels.c1.type=memory
a2.channels.c1.capacity=10000
a2.channels.c1.transactionCapacity=100a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = /flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0#Bind
#一个sink只能连一个channel,一个channel可以连多个sink
a2.sources.r1.channels=c1
a2.sinks.k1.channel=c1

Flume3.conf

#Named
a3.sources=r1
a3.channels=c1
a3.sinks=k1#Sourcea3.sources.r1.type=avro
a3.sources.r1.bind=localhost
a3.sources.r1.port=8888#Channels
a3.channels.c1.type=memory
a3.channels.c1.capacity=10000
a3.channels.c1.transactionCapacity=100#Sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory=/opt/module/flume-1.9.0/job/fileroll#Bind
a3.sources.r1.channels=c1
a3.sinks.k1.channel=c1

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO.console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO.console

案例2 负载均衡

Flume1监控数据,将监控到的内容通过轮询或者随机的方式给到Flume2和Flume3

Flume2将内容打印到控制台

Flume3将内容打印到控制台

FailoverSinkProcessor负责故障转移(load balancing processor)

在这里插入图片描述

Flume1.conf

#Named
a1.sources=r1
a1.channels=c1 
a1.sinks=k1 k2#Sourcea1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=6666#Channel selector
a1.sources.r1.selector.type=replicating#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=10000
a1.channels.c2.transactionCapacity=100#Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=7777
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=localhost
a1.sinks.k2.port=8888#Sink processor
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
# 分配策略:随机
a1.sinkgroups.g1.processor.selector=random
# 某个sink出问题了,在一段时间内自动选择另一个
a1.sinkgroups.g1.processor.backoff=false#Bind
#一个sink只能连一个channel,一个channel可以连多个sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

Flume2.conf

#Named
a2.sources=r1
a2.channels=c1
a2.sinks=k1#Sourcea2.sources.r1.type=avro
a2.sources.r1.bind=localhost
a2.sources.r1.port=7777#Channels
a2.channels.c1.type=memory
a2.channels.c1.capacity=10000
a2.channels.c1.transactionCapacity=100#Sink
a3.sinks.k1.type = logger#Bind
#一个sink只能连一个channel,一个channel可以连多个sink
a2.sources.r1.channels=c1
a2.sinks.k1.channel=c1

Flume3.conf

#Named
a3.sources=r1
a3.channels=c1
a3.sinks=k1#Sourcea3.sources.r1.type=avro
a3.sources.r1.bind=localhost
a3.sources.r1.port=8888#Channels
a3.channels.c1.type=memory
a3.channels.c1.capacity=10000
a3.channels.c1.transactionCapacity=100#Sink
a3.sinks.k1.type = logger#Bind
a3.sources.r1.channels=c1
a3.sinks.k1.channel=c1

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/load_balance/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/load_balance/flume2.conf -n a2 -Dflume.root.logger=INFO.console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/load_balance/flume1.conf -n a1 -Dflume.root.logger=INFO.console

案例2 2故障转移

Flume1监控数据,将监控到的内容发给Activate的Sink

Flume2将内容打印到控制台

Flume3将内容打印到控制台

FailoverSinkProcessor负责故障转移(load balancing processor)

在这里插入图片描述

Flume1.conf

#Named
a1.sources=r1
a1.channels=c1 
a1.sinks=k1 k2#Sourcea1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=6666#Channel selector
a1.sources.r1.selector.type=replicating#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=10000
a1.channels.c2.transactionCapacity=100#Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=7777
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=localhost
a1.sinks.k2.port=8888#Sink processor
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.processor.priority.k1=5
a1.sinkgroups.processor.priority.k2=10#Bind
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

Flume2.conf

#Named
a2.sources=r1
a2.channels=c1
a2.sinks=k1#Sourcea2.sources.r1.type=avro
a2.sources.r1.bind=localhost
a2.sources.r1.port=7777#Channels
a2.channels.c1.type=memory
a2.channels.c1.capacity=10000
a2.channels.c1.transactionCapacity=100#Sink
a2.sinks.k1.type = logger#Bind
#一个sink只能连一个channel,一个channel可以连多个sink
a2.sources.r1.channels=c1
a2.sinks.k1.channel=c1

Flume3.conf

#Named
a3.sources=r1
a3.channels=c1
a3.sinks=k1#Sourcea3.sources.r1.type=avro
a3.sources.r1.bind=localhost
a3.sources.r1.port=8888#Channels
a3.channels.c1.type=memory
a3.channels.c1.capacity=10000
a3.channels.c1.transactionCapacity=100#Sink
a3.sinks.k1.type = logger#Bind
a3.sources.r1.channels=c1
a3.sinks.k1.channel=c1

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/failover/flume2.conf -n a2 -Dflume.root.logger=INFO.console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/failover/flume1.conf -n a1 -Dflume.root.logger=INFO.console

案例3 聚合

Flume1监控文件内容,Flume2监控端口数据

Flume1(102)和Flume2(103)将监控到的数据发往Flume3

Flume3(104)将内容打印到控制台

在这里插入图片描述

Flume1.conf

#Named
a1.sources=r1
a1.channels=c1 
a1.sinks=k1#Source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1 
a1.sources.r1.filegroups.f1=/opt/module/flume-1.9.0/job/taildir/.*\.txt
a1.sources.r1.positionFile=/opt/module/flume-1.9.0/job/position/position.json#Channel selector
a1.sources.r1.selector.type=replicating#Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100#Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=hadoop104
a1.sinks.k1.port=8888#Bind
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Flume2.conf

#Named
a2.sources=r1
a2.channels=c1
a2.sinks=k1#Sourcea2.sources.r1.type=netcat
a2.sources.r1.bind=localhost
a2.sources.r1.port=7777#Channels
a2.channels.c1.type=memory
a2.channels.c1.capacity=10000
a2.channels.c1.transactionCapacity=100#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname=hadoop104
a2.sinks.k1.port=8888#Bind
a2.sources.r1.channels=c1
a2.sinks.k1.channel=c1

Flume3.conf

#Named
a3.sources=r1
a3.channels=c1
a3.sinks=k1#Sourcea3.sources.r1.type=avro
a3.sources.r1.bind=hadoop104
a3.sources.r1.port=8888#Channels
a3.channels.c1.type=memory
a3.channels.c1.capacity=10000
a3.channels.c1.transactionCapacity=100#Sink
a3.sinks.k1.type = logger#Bind
a3.sources.r1.channels=c1
a3.sinks.k1.channel=c1

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/aggr/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/aggr/flume2.conf -n a2 -Dflume.root.logger=INFO.console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/aggr/flume1.conf -n a1 -Dflume.root.logger=INFO.console

案例4 多路案例(拦截器

根据title判断数据该放在哪里

使用拦截器进行拦截处理

Flume Interceptor(看文档

步骤:

1.自定义拦截器(实习Flume提供的Interceptor接口

添加依赖

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>
public class EventHeaderInterceptor implements Interceptor {//flume怎么去使用?//通过反射来创建对象//或者先获取Builder类再获取拦截器对象// 使用的时候会先通过反射获取Builder类,// 再通过Builder类中的EventHeaderInterceptor创建拦截器对象public void initialize() {}/*** 拦截方法* @param event* @return*/public Event intercept(Event event) {// 1. 获取event的headersMap<String, String> headers = event.getHeaders();// 以前的编码是什么?String body = new String(event.getBody(), StandardCharsets.UTF_8);// 3. 判断body是否包含"atguigu" "shangguigu"if (body.contains("atguigu"))headers.put("title","at");else if (body.contains("shangguigu"))headers.put("title","st");return event;}public List<Event> intercept(List<Event> list) {// 把每个event放进拦截器for (Event event : list) {intercept(event);}return list;}public void close() {}public static class MyBuilder implements Builder{@Overridepublic Interceptor build() {return new EventHeaderInterceptor();
//            return null;}@Overridepublic void configure(Context context) {}}
}

2.打包上传至flume/lib

3.测试自定义拦截器

Flume1监控文件内容,对文件进行拦截,根据文件内容分发到

flume2,flume3,flume4

在这里插入图片描述

Flume1.conf

#Named
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555#channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = title
a1.sources.r1.selector.mapping.at = c2
a1.sources.r1.selector.mapping.st = c3
a1.sources.r1.selector.default = c1# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.jojo.interceptor.EventHeaderInterceptor$MyBuilder#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 100#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 6666a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777a1.sinks.k3.type = avro
a1.sinks.k3.hostname = localhost
a1.sinks.k3.port = 8888#Bind
a1.sources.r1.channels = c1 c2 c3   
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c2 
a1.sinks.k3.channel = c3 

Flume2.conf

#Named
a2.sources=r1
a2.channels=c1
a2.sinks=k1#Sourcea2.sources.r1.type=avro
a2.sources.r1.bind=localhost
a2.sources.r1.port=6666#Channels
a2.channels.c1.type=memory
a2.channels.c1.capacity=10000
a2.channels.c1.transactionCapacity=100#Sink
a2.sinks.k1.type = logger#Bind
a2.sources.r1.channels=c1
a2.sinks.k1.channel=c1

Flume3.conf

#Named
a3.sources=r1
a3.channels=c1
a3.sinks=k1#Sourcea3.sources.r1.type=avro
a3.sources.r1.bind=localhost
a3.sources.r1.port=7777#Channels
a3.channels.c1.type=memory
a3.channels.c1.capacity=10000
a3.channels.c1.transactionCapacity=100#Sink
a3.sinks.k1.type = logger#Bind
a3.sources.r1.channels=c1
a3.sinks.k1.channel=c1

Flume4.conf

#Named
a4.sources=r1
a4.channels=c1
a4.sinks=k1#Sourcea4.sources.r1.type=avro
a4.sources.r1.bind=localhost
a4.sources.r1.port=8888#Channels
a4.channels.c1.type=memory
a4.channels.c1.capacity=10000
a4.channels.c1.transactionCapacity=100#Sink
a4.sinks.k1.type = logger#Bind
a4.sources.r1.channels=c1
a4.sinks.k1.channel=c1

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume4.conf -n a4 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume1.conf -n a1 -Dflume.root.logger=INFO.console

Ganglia

Ganglia由gmond、gmetad和gweb三部分组成。

gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用gmond,你可以很容易收集很多系统指标数据,如CPU、内存、磁盘、网络和活跃进程的数据等。

gmetad(Ganglia Meta Daemon)整合所有信息,并将其以RRD格式存储至磁盘的服务。

.k1.channel=c1


Flume4.conf```conf
#Named
a4.sources=r1
a4.channels=c1
a4.sinks=k1#Sourcea4.sources.r1.type=avro
a4.sources.r1.bind=localhost
a4.sources.r1.port=8888#Channels
a4.channels.c1.type=memory
a4.channels.c1.capacity=10000
a4.channels.c1.transactionCapacity=100#Sink
a4.sinks.k1.type = logger#Bind
a4.sources.r1.channels=c1
a4.sinks.k1.channel=c1

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume4.conf -n a4 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/job/muti/flume1.conf -n a1 -Dflume.root.logger=INFO.console

Ganglia

Ganglia由gmond、gmetad和gweb三部分组成。

gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用gmond,你可以很容易收集很多系统指标数据,如CPU、内存、磁盘、网络和活跃进程的数据等。

gmetad(Ganglia Meta Daemon)整合所有信息,并将其以RRD格式存储至磁盘的服务。

gweb(Ganglia Web)Ganglia可视化工具,gweb是一种利用浏览器显示gmetad所存储数据的PHP前端。在Web界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_283287.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

李宏毅2021春季机器学习课程视频笔记5-模型训练不起来问题(当梯度很小的时候问题)

求解最小Loss的失败&#xff0c;不能得到最优的值&#xff0c;找不到Loss足够小的值。 1.Loss关于参数的梯度为0&#xff0c;不能继续更新参数。&#xff08;local minima 或者 saddle point&#xff09;如何知道走到了哪个点&#xff1f; 利用泰勒展开&#xff1a; Critical P…

免费ChatGPT接入-国内怎么玩chatGPT

免费ChatGPT中文版 OpenAI 的 GPT 模型目前并不提供中文版的免费使用&#xff0c;但是有许多机器学习平台和第三方服务提供商也提供了基于 GPT 技术的中文版模型和 API。下面是一些常见的免费中文版 ChatGPT&#xff1a; Hugging Face&#xff1a;Hugging Face 是一个开源社区…

Mysql主备一致性保证

大家知道 bin log 既可以用来归档&#xff0c;又可以用来做主备同步。有人可能会问&#xff0c;为什么备库执行了 bin log 就可以跟主库保持一致了呢&#xff1f;bin log的内容是什么样的呢&#xff1f;今天我们就来聊聊它。 在最开始&#xff0c;Mysql 是以容易学习和方便的高…

JDK1.8下载与安装完整教程

目录 一、获取安装资源 1、百度网盘共享 2、官方网站下载(百度网盘文件下载下来有问题情况下) 2.1、搜索jdk官方网站 2.2、进到官网下拉找到Java8&#xff0c;选择Windows 2.3、下载安装程序(下载要登录&#xff0c;没有账号就注册就行) 二、正式安装 1、先在D盘(不在C…

【模型复现】Network in Network,将1*1卷积引入网络设计,运用全局平均池化替代全连接层。模块化设计网络

《Network In Network》是一篇比较老的文章了&#xff08;2014年ICLR的一篇paper&#xff09;&#xff0c;是当时比较厉害的一篇论文&#xff0c;同时在现在看来也是一篇非常经典并且影响深远的论文&#xff0c;后续很多创新都有这篇文章的影子。[1312.4400] Network In Networ…

蓝桥杯刷题冲刺 | 倒计时1天

作者&#xff1a;指针不指南吗 专栏&#xff1a;蓝桥杯倒计时冲刺 &#x1f43e;蓝桥杯加油&#xff0c;大家一定可以&#x1f43e; 文章目录我是菜菜&#xff0c;最近容易我犯的错误总结 一些tips 各位蓝桥杯加油加油 当输入输出数据不超过 1e6 时&#xff0c;scanf printf 和…

elasticsearch基础6——head插件安装和web页面查询操作使用、ik分词器

文章目录一、基本了解1.1 插件分类1.2 插件管理命令二、分析插件2.1 es中的分析插件2.1.1 官方核心分析插件2.1.2 社区提供分析插件2.2 API扩展插件三、Head 插件3.1 安装3.2 web页面使用3.2.1 概览页3.2.1.1 unassigned问题解决3.2.2 索引页3.2.3 数据浏览页3.2.4 基本查询页3…

微服务+springcloud+springcloud alibaba学习笔记(1/9)

1.微服务简介 什么是微服务呢&#xff1f; 就是将一个大的应用&#xff0c;拆分成多个小的模块&#xff0c;每个模块都有自己的功能和职责&#xff0c;每个模块可以 进行交互&#xff0c;这就是微服务 简而言之&#xff0c;微服务架构的风格&#xff0c;就是将单一程序开发成…

项目管理案例分析有哪些?

项目管控中遇到的问题有哪些&#xff1f;这些问题是如何解决的&#xff1f; 在项目管理领域&#xff0c;案例分析是一种常见的方法来学习和理解项目管理实践&#xff0c;下面就来介绍几个成功案例&#xff0c;希望能给大家带来一些参考。 1、第六空间&#xff1a;快速响应个性…

1669_MIT 6.828 xv6代码的获取以及编译启动

全部学习汇总&#xff1a; GreyZhang/g_unix: some basic learning about unix operating system. (github.com) 6.828的学习的资料从开始基本信息的讲解&#xff0c;逐步往unix的一个特殊版本xv6过度了。这样&#xff0c;先得熟悉一下这个OS的基本代码以及环境。 在课程中其实…

最短路径算法及Python实现

最短路径问题 在图论中&#xff0c;最短路径问题是指在一个有向或无向的加权图中找到从一个起点到一个终点的最短路径。这个问题是计算机科学中的一个经典问题&#xff0c;也是许多实际问题的基础&#xff0c;例如路线规划、通信网络设计和交通流量优化等。在这个问题中&#…

Downloader工具配置参数并烧录到flash中

1 Downloader工具介绍 Downloader工具可以用来烧录固件到设备中&#xff0c;固件格式默认为*dcf。该工具还可以用来在线调试EQ或者进行系统设置。 2 配置参数 2.1 作用 当有一个dcf文件时&#xff0c;配合不同的配置文件*.setting&#xff0c;在不进行编译的情况下&#xff…

【毕业设计】ESP32通过MQTT协议连接服务器(二)

文章目录0 前期教程1 前言2 配置SSL证书3 配置用户名和密码4 配置客户端id&#xff08;client_id&#xff09;5 conf文件理解6 websocket配置7 其他资料0 前期教程 【毕业设计】ESP32通过MQTT协议连接服务器&#xff08;一&#xff09; 1 前言 上一篇教程简单讲述了怎么在虚拟…

【调试】ftrace(三)trace-cmd和kernelshark

之前使用ftrace的时候需要一系列的配置&#xff0c;使用起来有点繁琐&#xff0c;这里推荐一个ftrace的一个前端工具&#xff0c;它就是trace-cmd trace-cmd安装教程 安装trace-cmd及其依赖库 git clone https://git.kernel.org/pub/scm/libs/libtrace/libtraceevent.git/ c…

【Ruby学习笔记】19.Ruby 连接 Mysql - MySql2

Ruby 连接 Mysql - MySql2 前面一章节我们介绍了 Ruby DBI 的使用。这章节我们技术 Ruby 连接 Mysql 更高效的驱动 mysql2&#xff0c;目前也推荐使用这种方式连接 MySql。 安装 mysql2 驱动&#xff1a; gem install mysql2你需要使用 –with-mysql-config 配置 mysql_conf…

【DevOps】GitOps 初识(下) - 让DevOps变得更好

实践GitOps的五大难题 上一篇文章中&#xff0c;我们介绍了GitOps能为我们带来许多的好处&#xff0c;然而&#xff0c;任何新的探索都将不会是一帆风顺的。在开始之前&#xff0c;如果能了解实践GitOps通常会遇到的挑战&#xff0c;并对此作出合适的应对&#xff0c;可能会使…

数据结构和算法(一):复杂度、数组、链表、栈、队列

从广义上来讲&#xff1a;数据结构就是一组数据的存储结构 &#xff0c; 算法就是操作数据的方法 数据结构是为算法服务的&#xff0c;算法是要作用在特定的数据结构上的。 10个最常用的数据结构&#xff1a;数组、链表、栈、队列、散列表、二叉树、堆、跳表、图、Trie树 10…

StorageManagerService.java中的mVold.mount

android源码&#xff1a;android-11.0.0_r21&#xff08;网址&#xff1a;Search (aospxref.com)&#xff09; 一、问题 2243行mVold.mount执行的是哪个mount函数&#xff1f; 2239 private void mount(VolumeInfo vol) { 2240 try { 2241 // TOD…

【LeetCode】-- 108. 将有序数组转换为二叉搜索树

1. 题目 108. 将有序数组转换为二叉搜索树 - 力扣&#xff08;LeetCode&#xff09; 给你一个整数数组 nums &#xff0c;其中元素已经按升序排列&#xff0c;请你将其转换为一棵高度平衡二叉搜索树。高度平衡二叉树是一棵满足「每个节点的左右两个子树的高度差的绝对值不超过 …

mysql在CentOS7.x环境安装

查看当前环境的yum源 ls -l /etc/yum.repos.d/ 可以看到当前环境是没有下载mysql对应的yum源的, 所以需要去官网下载对应的yum源. 找mysql的yum源并安装 http://repo.mysql.com/ 在选择对应yum源之前, 需要看一下自己系统的版本: 进入官网后, 鼠标右击进入查看页面源代码, 因为…