【Flume】尚硅谷学习笔记

news/2024/6/16 11:38:58/文章来源:https://blog.csdn.net/m0_64280569/article/details/137184870

实时监控目录下多个新文件

本案例是将虚拟机本地文件进行实时监控,并将上传的数据实时上传到HDFS中。

TAILDIR SOURCE【实现多目录监控、断点续传】

监视指定的文件,一旦检测到附加到每个文件的新行,就几乎实时地跟踪它们。如果正在写入新行,则该源将在等待写入完成时重试读取它们。它以JSON格式定期将每个文件的最后读位置写入给定的位置文件。如果Flume因某种原因停止或关闭,它可以从现有位置文件上写入的位置重新开始跟踪。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = TAILDIR
# 多文件监控
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*
# 断点续传
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

文件改名、vim修改文件等操作(本地文件发生改变),Flume均要进行重新读取,其本质原因是因为inode发生了变化。

HDFS Sink

该sink将事件写入HDFS,可以根据经过的时间、数据大小或事件数量定期滚动文件(关闭当前文件并创建新文件)。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = TAILDIR
# 多文件监控
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*
# 断点续传
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix = log-
# 实际开发使用时间需要调整为1h 学习使用改为10s
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Flume事务

在这里插入图片描述

多路复用及拦截器的使用

在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义 interceptor 区分数字和字母,将其分别发往不同的分析系统(Channel)。

Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。

在这里插入图片描述

首先编写拦截器,我觉得起主要作用是对header打标签,后传给channel选择器来进行分通道传输。

package com.atguigu.flume;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.List;
import java.util.Map;/*** 1、 继承flume的拦截器接口* 2、 重写4个抽象方法* 3、 编写静态内部类 builder*/public class MyInterceptor implements Interceptor {public void initialize() {}// 处理单个eventpublic Event intercept(Event event) {// 在event的头信息里面添加标记// 提供给channel selector 选择发送到不同的channelMap<String, String> headers = event.getHeaders();String log = new String(event.getBody());// 判断第一个单符 如果是字母发送到channel1, 如果是数字发送到channel2char c = log.charAt(0);if (c >= '0' && c <= '9') {// 判断c为数字headers.put("type", "number");} else if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {// 判断c为字母headers.put("type", "letter");}event.setHeaders(headers);return event;}// 处理多个eventpublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}public void close() {}public static class Builder implements Interceptor.Builder{// 创建一个拦截器对象public Interceptor build() {return new MyInterceptor();}// 读配置文件public void configure(Context context) {}}
}

其次完成配置文件的编写:

①为 hadoop102 上 的 Flume1 配 置 1 个 netcat source , 1 个 sink group ( 2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444a1.sources.r1.selector.type = multiplexing
# 填写标记的key
a1.sources.r1.selector.header = type
# value对应的channel
a1.sources.r1.selector.mapping.number = c2
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.default = c2a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.MyInterceptor$Builder# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

②为 hadoop102 上的 Flume4 配置一个 avro source 和一个 logger sink。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4141# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

③为 hadoop102 上的 Flume4 配置一个 avro source 和一个 logger sink。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Flume聚合案例

在这里插入图片描述

(1)准备工作

分发 Flume xsync flume

在 hadoop102、hadoop103 以及 hadoop104 的/opt/module/flume/job 目录下创建一个group3 文件夹。 mkdir group3

(2)创建配置文件

配置 Source 用于监控文件,配置 Sink 输出数据到下一级 Flume。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position4.json# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4141# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行配置文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1034890.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

空间数据结构(四叉树,八叉树,BVH树,BSP树,K-d树)

下文参考&#xff1a;https://www.cnblogs.com/KillerAery/p/10878367.html 游戏编程知识课程 - 四分树(quadtree)_哔哩哔哩_bilibili 利用空间数据结构可以加速计算&#xff0c;是重要的优化思想。空间数据结构常用于场景管理&#xff0c;渲染&#xff0c;物理&#xff0c;游…

CSS使用clip-path实现元素动画

前言&#xff1a; 在日常开发当中&#xff0c;如果想要开发多边形&#xff0c;一般都需要多个盒子或者伪元素的帮助&#xff0c;有没有一直办法能只使用一个盒子实现呢&#xff1f; 有的&#xff1a;css裁剪 目录 前言&#xff1a; clip-path到底是什么&#xff1f; clip-pa…

基于springboot+vue+Mysql的企业客户信息反馈平台

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

Windows 电脑麦克风 自动启用/禁用 小玩具!

WinMicrophone Windows 系统的 麦克风设备&#xff08;启用/禁用&#xff09;切换驱动&#xff01;它是小巧且快速的&#xff0c;它能够自动的检测并切换麦克风的情况。 您可以在软件包仓库中找到发布版本的exe包&#xff0c;无需安装&#xff01;其能够大大增大您在Windows中…

6.3物联网RK3399项目开发实录-驱动开发之I2C 使用(wulianjishu666)

物联网开发源码案例集&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1kfPDpYZpm_G0GBLAup3KTQ?pwdvgvv I2C 使用 简介 AIO-3399J 开发板上有 9 个片上 I2C 控制器&#xff0c;各个 I2C 的使用情况如下表&#xff1a; 本文主要描述如何在该开发板上配置 I2C。 配置…

WebCopilot:一款功能强大的子域名枚举和安全漏洞扫描工具

关于WebCopilot WebCopilot是一款功能强大的子域名枚举和安全漏洞扫描工具&#xff0c;该工具能够枚举目标域名下的子域名&#xff0c;并使用不同的开源工具检测目标存在的安全漏洞。 工具运行机制 WebCopilot首先会使用assetsfinder、submaster、subfinder、accumt、finddom…

大数据学习第十二天(hadoop概念)

1、服务器之间数据文件传递 1&#xff09;服务器之间传递数据&#xff0c;依赖ssh协议 2&#xff09;http协议是web网站之间的通讯协议&#xff0c;用户可已通过http网址访问到对应网站数据 3&#xff09;ssh协议是服务器之间&#xff0c;或windos和服务器之间传递的数据的协议…

C语言第三十九弹---预处理(上)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 预处理 1、预定义符号 2、#define定义常量 3、#define定义宏 4、带有副作用的宏参数 5、宏替换的规则 6、宏和函数的对比 总结 在C语言中&#xff0c;预处…

阿里云2核4G云服务器支持多少人同时在线?并发数计算?

阿里云2核4G服务器多少钱一年&#xff1f;2核4G配置1个月多少钱&#xff1f;2核4G服务器30元3个月、轻量应用服务器2核4G4M带宽165元一年、企业用户2核4G5M带宽199元一年。可以在阿里云CLUB中心查看 aliyun.club 当前最新2核4G服务器精准报价、优惠券和活动信息。 阿里云官方2…

蓝奏云直链获取在线解析网站源码

源码简介 蓝奏云直链获取在线解析网站源码 蓝奏云链接解析 本地API接口 支持有无密码和短期直链和永久直链&#xff0c;同时还可以显示文件名和大小。 这个解析器无需数据库即可搭建&#xff0c;API接口已经本地化&#xff0c;非常简单易用。 安装环境 php5.6 搭建教程 …

在编程中使用中文到底该不该??

看到知乎上有个热门问题&#xff0c;为什么很多人反对中文在编程中的使用&#xff1f; 这个问题有几百万的浏览热度&#xff0c;其中排名第一的回答非常简洁&#xff0c;我深以为然&#xff1a; 在国内做开发&#xff0c;用中文写注释、写文档&#xff0c;是非常好的习惯&…

前端工程师————HTML5学习

HTML5基础 开发工具很多&#xff0c;其中Hbulider较好用&#xff0c;下载网址如下&#xff1a; DCloud - HBuilder、HBuilderX、uni-app、uniapp、5、5plus、mui、wap2app、流应用、HTML5、小程序开发、跨平台App、多端框架 html表示整个页面 head表示搜素框 body表示内容 ti…

Java练习

这个练习我用到了继承&#xff0c;多态和封装。 1.继承&#xff1a; Animal 类是一个抽象类&#xff0c;它有两个子类 Dog 和 Cat。 Dog 和 Cat 分别继承自 Animal 类&#xff0c;因此它们可以使用 Animal 类中定义的属性和方法&#xff0c;同时也可以有自己特有的属性和方法。…

【Java代码审计】SSTI模板注入篇

【Java代码审计】SSTI模板注入篇 1.概述2.Velocity 模板引擎3.Thymeleaf 模板注入复现普通url作为视图 4.SSTI 漏洞修复白名单控制跳转模版设置response参数 1.概述 模板引擎支持使用静态模板文件&#xff0c;在运行时用 HTML 页面中的实际值替换变量/占位符&#xff0c;从而让…

C++要学到什么程度才能找到实习?

在考虑 C 学习到何种程度可以找到实习时&#xff0c;以下是一些具体的方向和建议。我这里有一套编程入门教程&#xff0c;不仅包含了详细的视频讲解&#xff0c;项目实战。如果你渴望学习编程&#xff0c;不妨点个关注&#xff0c;给个评论222&#xff0c;私信22&#xff0c;我…

java学习之路-数组定义与使用

目录 ​编辑 1.什么是数组 2.数组的创建及其初始化 2.1数组的创建 2.2数组的初始化 3.数组的使用 3.1数组元素访问 3.2遍历数组 4.数组是引用类型 4.1jvm的内存分布 4.2基本类型变量与引用类型变量的区别 4.3引用变量详解 4.4 null 5.数组的使用场景 5.1存储数据 5…

使用Vue实现CSS过渡和动画

01-初识动画和过渡 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>使用vue实现css过渡和动画&l…

数据结构——遍历二叉树和线索二叉树,树和森林

目录 1.遍历的算法实现 1.先序遍历 代码示例&#xff1a; 2.中序遍历 代码示例&#xff1a; 3.后序遍历 代码示例&#xff1a; 4.遍历算法的分析 2.遍历二叉树的非递归算法 1.中序遍历非递归算法 代码示例&#xff1a; 3.二叉树的层次遍历 代码示例&#xff1a; 4.二…

c#仿ppt案例

画曲线 namespace ppt2024 {public partial class Form1 : Form{public Form1(){InitializeComponent();}//存放所有点的位置信息List<Point> lstPosition new List<Point>();//控制开始画的时机bool isDrawing false;//鼠标点击开始画private void Form1_MouseD…

网络原理 - HTTP / HTTPS(2)——http请求

目录 一、认识 “方法”&#xff08;method&#xff09; 1、GET方法 2、POST方法 &#xff08;1&#xff09;登录 &#xff08;2&#xff09;上传 &#xff08;3&#xff09;GET和POST使用习惯 3、GET方法和POST方法的区别 正确滴 关于一些网上的说法&#xff0c;错误滴…