深入理解Storm 之 TridentStrom

news/2024/5/15 1:39:01/文章来源:https://blog.csdn.net/gridlayout/article/details/129286345
从Demo讲起:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),new Values("the man went to the store and bought some candy"),new Values("four score and seven years ago"),new Values("how many apples can you eat"));
spout.setCycle(true);
 
TridentTopology topology = new TridentTopology();        
TridentState wordCounts =topology.newStream("spout1", spout).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                .parallelismHint(6);

Trident keeps track of a small amount of state for each input source (metadata about what it has consumed) in Zookeeper, and the "spout1" string here specifies the node in Zookeeper where Trident should keep that metadata.  (Trident 在zookeeper中为每一个输入资源(metaData)建立一系列状态监控和追踪.   "spout1" 指定了在zookeer中,Trident应该存储metaData的位置.)

the spout emits a stream containing one field called "sentence". The next line of the topology definition applies the Split function to each tuple in the stream, taking the "sentence" field and splitting it into words. Each sentence tuple creates potentially many word tuples – for instance, the sentence "the cow jumped over the moon" creates six "word" tuples.  (就是说每一个处理输入就是一个tuple,输出就是一个单独的tuple.)

One of the cool things about Trident is that it has fully fault-tolerant, exactly-once processing semantics. This makes it easy to reason about your realtime processing. Trident persists state in a way so that if failures occur and retries are necessary, it won't perform multiple updates to the database for the same source data.

Trident实现了容错和一次执行的原子性操作, 通过事务管理数据的整个处理过程。 并且失败后,可以重新尝试, 同一组数据不会多次更新。

DRPC

 a low latency distributed query on the word counts

低延迟的分布式查询

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
"words" 表示流名称.
topology.newDRPCStream("words").each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
wordCounts: 另外一个数据流拓扑对象.

Each DRPC request is treated as its own little batch processing job that takes as input a single tuple representing the request. The tuple contains one field called "args" that contains the argument provided by the client.

一个单独的tuple 代表一个 ERPC request.  tuple 包含一个“args”字段,里面包含客户端请求参数. 比如:"cat dog the man"

Trident is intelligent about how it executes a topology to maximize performance. 

1.Operations that read from or write to state (like persistentAggregate and stateQuery) automatically batch operations to that state.  从state中读或者向state中写,自动转换为批量处理. 也可以可以设置cache,最大程度限制读写次数,提高性能和提供方便.

2.Trident aggregators are heavily optimized.   聚合最优化.

Rather than transfer all tuples for a group to the same machine and then run the aggregator, Trident will do partial aggregations when possible before sending tuples over the network. For example, the Count aggregator computes the count on each partition, sends the partial count over the network, and then sums together all the partial counts to get the total count. This technique is similar to the use of combiners in MapReduce.

aggregators  并不是转换所有的tuple给一个相同的机器,然后再去做聚合. Trident 会对tuple做部分聚合,然后把部分聚合发送到其他集群节点,一起汇总一个总的聚合.

另外一个例子:

TridentState urlToTweeters =topology.newStaticState(getUrlToTweetersState()); //曾经转发url的tweeter用户.
TridentState tweetersToFollowers =topology.newStaticState(getTweeterToFollowersState()); //tweeter用户的粉丝数量.topology.newDRPCStream("reach").stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")) // args,DRPCClient的请求参数,url. 根据url获取到转发该url的tweeter用户集合. 一个tuple..each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) // 展开tweeters集合用户, tweeter, 很多个tuple,取决于用户集合数量..shuffle().stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) // 根据个体用户tweeter,查询他或她的粉丝群,followers集合,一个tuple..parallelismHint(200) // 为粉丝群开启200个并行任务.
       .each(new Fields("followers"), new ExpandList(), new Fields("follower")) //展开粉丝群, followers -> follower. 多个tuple..groupBy(new Fields("follower")) //对粉丝群进行分组..aggregate(new One(), new Fields("one")) //合并相同的粉丝为一个..parallelismHint(20) .aggregate(new Count(), new Fields("reach")); // 计算人数.
 

Fields and tuples

The Trident data model is the TridentTuple which is a named list of values. 

过滤操操作:

Consider this example. Suppose you have a stream called "stream" that contains the fields "x", "y", and "z". To run a filter MyFilter that takes in "y" as input, you would say:

stream.each(new Fields("y"), new MyFilter())

Suppose the implementation of MyFilter is this:

public class MyFilter extends BaseFilter {public boolean isKeep(TridentTuple tuple) {return tuple.getInteger(0) < 10;}
}

This will keep all tuples whose "y" field is less than 10.  会过滤掉所有y<10的tuple.

each 操作:

假设stream里面有x,y,z字段.

stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied")); // 运算完之后的字段是: x, y, z, added, multiplied

public class AddAndMultiply extends BaseFunction {public void execute(TridentTuple tuple, TridentCollector collector) { // 这个tuple里面仅仅有x,y两个字段.没有z字段.int i1 = tuple.getInteger(0);int i2 = tuple.getInteger(1);collector.emit(new Values(i1 + i2, i1 * i2));}
}

聚合操作:

With aggregators, on the other hand, the function fields replace the input tuples. So if you had a stream containing the fields "val1" and "val2", and you did this:

stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

The output stream would only contain a single tuple with a single field called "sum", representing the sum of all "val2" fields in that batch.

流输出结果仅包含一个tuple,tuple里面仅有一个字段sum. 代表在此轮批量处理中val2的总和.

分组聚合操作: 

With grouped streams, the output will contain the grouping fields followed by the fields emitted by the aggregator. For example:

stream.groupBy(new Fields("val1")) // 输出字段里包含正在分组的字段val1, 跟随着聚合字段sum..aggregate(new Fields("val2"), new Sum(), new Fields("sum")) // sum 字段替换val2字段.
 

In this example, the output will contain the fields "val1" and "sum".

State

如何保证每个消息仅仅执行一次:

Trident solves this problem by doing two things:

  1. Each batch is given a unique id called the "transaction id". If a batch is retried it will have the exact same transaction id.   给每个批次赋予交易id, 失败重试的时候,交易id是一样的.
  2. State updates are ordered among batches. That is, the state updates for batch 3 won't be applied until the state updates for batch 2 have succeeded.  //交易id按序列运行, 如果2没有成功,3也不会执行.

    存储交易id和计数value到数据库里面,

Then, when updating the count, you can just compare the transaction id in the database with the transaction id for the current batch. If they're the same, you skip the update – because of the strong ordering, you know for sure that the value in the database incorporates the current batch. If they're different, you increment the count.

交易id也不是必须得存储起来,Trident有一套自己的"至少一次消息处理"机制,保证失败的情况.

Execution of Trident topologies  ,  Trident 拓扑的执行流程: 

tuple 只有在shuffle或group的时候,才会进行网络同步.

Tuples are only sent over the network when a repartitioning of the data is required, such as if you do a groupBy or a shuffle.

Trident topologies compile down into as efficient of a Storm topology as possible. Tuples are only sent over the network when a repartitioning of the data is required, such as if you do a groupBy or a shuffle.

Trident拓扑会最优化的编译成最高效的Storm拓扑.

So if you had this Trident topology:

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_75881.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

环境搭建02-Ubuntu16.04 安装CUDA和CUDNN、CUDA多版本替换

1、CUDA安装 &#xff08;1&#xff09;下载需要的CUDA版本 https://developer.nvidia.com/cuda-toolkit-archive &#xff08;2&#xff09;安装 sudo sh cuda_8.0.61_375.26_linux.run&#xff08;3&#xff09;添加环境 gedit ~/.bashrc在文件末尾添加&#xff1a; ex…

【JVM】垃圾收集器理论算法

垃圾收集算法 垃圾收集算法是内存回收的方法理论&#xff08;理论方法&#xff0c;并未实际实现&#xff09; 分代收集理论 JVM虚拟机的垃圾收集是采用分代收集算法&#xff0c;根据对象的存活周期的不同将内存分块。java将堆分为新生代和老年代,就可以根据不同年龄的不同特点…

【Linux】理解文件系统

文章目录理解文件系统了解磁盘结构inode理解文件系统 了解磁盘结构 磁盘是计算机中的一个 机械设备 这个磁盘的盘片就像光盘一样,数据就在盘片上放着, 但是光盘是只读的,磁盘是可读可写的 机械硬盘的寻址的工作方式: 盘片不断旋转,磁头不断摆动,定位到特定的位置 我们可以把…

怕被AI取代快想办法“攒”个“数字第二大脑”

每日经济新闻发文:来自央视财经微博2月27日消息,美国《财富》杂志网站近日报道,美国一家提供就业服务的平台对1000家企业进行了调查。结果显示,美国最新调查显示50%企业已在用ChatGPT,其中48%已让其代替员工,有公司省下10多万美元!还有30%表示,有计划使用。

【IoT】2023裁员潮还在继续,构建规划能力也许是一剂良方

今天要分享的主题是华为的市场管理方法论。 市场管理这个词总体来说还是有些抽象&#xff0c;本质上来看或者说从个人的角度来看&#xff0c;其实就是一种规划的能力。 无论是创业&#xff0c;还是作为职场人&#xff0c;规划能力必将是你不可或缺的一种基础能力。 尤其是在这样…

某马程序员NodeJS速学笔记

文章目录前言一、什么是Node.js?二、fs文件系统模块三、Http模块四、模块化五、开发属于自己的包模块加载机制六、Express1.初识ExpressGET/POSTnodemon2.路由模块化3.中间件中间件分类自定义中间件4. 跨域问题七、Mysql模块安装与配置基本使用Web开发模式Session认证JWT八、m…

八、异步编程

文章目录异步编程FutureTask应用&源码分析FutureTask介绍FutureTask应用FutureTask源码分析FutureTask中的核心属性FutureTask的run方法FutureTask的set&setException方法FutureTask的cancel方法FutureTask的get方法FutureTask的finishCompletion方法CompletableFuture…

基于部标JT808的车载视频监控需求与EasyCVR视频融合平台解决方案设计

一、方案背景 众所周知&#xff0c;在TSINGSEE青犀视频解决方案中&#xff0c;EasyCVR视频智能融合共享平台主要作为视频汇聚平台使用&#xff0c;不仅能兼容安防标准协议RTSP/Onvif、国标GB28181&#xff0c;互联网直播协议RTMP&#xff0c;私有协议海康SDK、大华SDK&#xf…

虚拟局域网VLAN的实现机制

虚拟局域网VLAN的实现机制1.IEEE 802.1Q帧2.交换的端口类型AccessTrunkHybrid&#xff08;华为特有&#xff09;1.IEEE 802.1Q帧 IEEE802.1Q帧&#xff08;也称Dot One Q帧&#xff09;对以太网的MAC帧格式进行了扩展&#xff0c;插入了4字节的VLAN标记。 2.交换的端口类型 A…

Facebook广告成本过高?尝试这些成本控制技巧

在当今的数字营销领域中&#xff0c;Facebook广告已经成为许多企业的首选。但是&#xff0c;随着竞争的加剧&#xff0c;Facebook广告的成本也在不断攀升。如果您发现自己的Facebook广告成本过高&#xff0c;不要担心&#xff0c;下面将介绍一些成本控制技巧。一.利用Facebook的…

第四阶段05- 关于响应结果JsonResult对象,枚举,Spring MVC的统一处理异常机制

23. 关于响应结果 目前&#xff0c;当成功的添加相册后&#xff0c;服务器端响应的结果是&#xff1a; 添加相册成功&#xff01;如果相册名称已经被占用&#xff0c;服务器端响应的结果是&#xff1a; 添加相册失败&#xff0c;相册名称已经被占用&#xff01;以上的响应结…

机器学习100天(三十二):032 KD树的构造和搜索

机器学习100天,今天讲的是:KD树的构造和搜索! 《机器学习100天》完整目录:目录 在 K 近邻算法中,我们计算测试样本与所有训练样本的距离,类似于穷举法。如果数据量少的时候,算法运行时间没有大的影响,但是如果数据量很大,那么算法运行的时间就会很长。这在实际的应用…

4.排序算法之一:冒泡排序

排序算法稳定性假定在待排序的记录序列中&#xff0c;存在多个具有相同的关键字的记录&#xff0c;若经过排序&#xff0c;这些记录的相对次序保持不变&#xff0c;即在原序列中&#xff0c;r[i]r[j]&#xff0c;且r[i]在r[j]之前&#xff0c;而在排序后的序列中&#xff0c;r[…

柔性电路板的优点、分类和发展方向

柔性电路板是pcb电路板的一种&#xff0c;又称为软板、柔性印刷电路板&#xff0c;主要是由柔性基材制作而成的一种具有高可靠性、高可挠性的印刷电路板&#xff0c;具有厚度薄、可弯曲、配线密度高、重量轻、灵活度高等特点&#xff0c;主要用在手机、电脑、数码相机、家用电器…

二叉树——二叉树的最近公共祖先

二叉树的最近公共祖先 给定一个二叉树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为&#xff1a;“对于有根树 T 的两个节点 p、q&#xff0c;最近公共祖先表示为一个节点 x&#xff0c;满足 x 是 p、q 的祖先且 x 的深度尽可能大&#xff08;一…

Guitar Pro8免费吉他曲谱mySongBook

每周都会发布新的谱子&#xff0c;目前已有有数千首歌曲可供选择&#xff0c;在谱库中&#xff0c;您能找到 Guns N Roses&#xff0c;Miles Davis&#xff0c;Ed Sheeran 等人的经典曲目。开头我们先做一个小实验&#xff1a;现在打开你电脑里存放曲谱的文件夹&#xff0c;里面…

[busybox] busybox生成一个最精简rootfs(上)

这篇文章是承接着[rootfs]用busybox做一个rootfs(根文件系统)来的&#xff0c;再回看这篇我很久之前写的文章的时候&#xff0c;有一个问题出现在我的脑海中&#xff0c;创建了这个文件那个文件&#xff0c;但确实是每个文件都是必需的吗&#xff1f; 这篇文章我们就来讨论下这…

【用Group整理目录结构 Objective-C语言】

一、接下来,我们看另外一个知识点,怎么用Group把这一堆乱七八糟的文件给它整理一下,也算是封装一下吧, 1.这一堆杂乱无章的文件: 那么,哪些类是属于模型呢,哪些类是属于视图呢,哪些类是属于控制器呢, 我们接下来通过Group的方式,来给它们分一下类, 这样看起来就好…

蓝海彤翔执行副总裁张加廷接受【联播苏州】独家专访

今年春节档&#xff0c;科幻类电影《流浪地球2》票房口碑双丰收&#xff0c;截至目前&#xff0c;累计票房已破 38 亿&#xff0c;淘票票评分 9.6 &#xff0c;影片的特效质感可以媲美国际顶尖水平。其中&#xff0c;蓝海彤翔为影片的后期制作提供了出色的渲染服务。2月21日&am…

招投标管理系统-适合于招标代理、政府采购、企业采购、工程交易等业务的企业

招投标管理系统-适合于招标代理、政府采购、企业采购、工程交易等业务的企业 招投标管理系统是一个用于内部业务项目管理的应用平台。以项目为主线&#xff0c;从项目立项&#xff0c;资格预审&#xff0c;标书编制审核&#xff0c;招标公告&#xff0c;项目开标&#xff0c;项…