[Performance | Optimization] Analyzing a TB-scale Flink job failure: Could not compute the container Resource

Article source: https://blog.csdn.net/hiliang521/article/details/128089291

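Contents

    • I. Introducing the problem
      • 1. Scenario
      • 2. A quick look at the logs
    • II. First-pass analysis and fix
      • 1. Analysis
        • 1.1. YARN scheduler settings
        • 1.2. Program settings
      • 2. Fix
    • III. (Performance) A new problem
      • 1. Description
      • 2. The idealized optimal plan
      • 3. The "Plan B" solution
    • IV. Reflection and iteration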

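I. Introducing the problem

1. Scenario

A Flink job moves data from HDFS into Hive. The HDFS source consists of 4,000 files, and the job was submitted with a parallelism of 20.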

 

2. A quick look at the logs

The job failed after submission. A brief walk through the YARN logs:

1. The requested resources exceed YARN's maximum container size; that is, a single TaskExecutor asks for too much:

   Could not compute the container Resource from the given TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.

2. The ResourceManager then requests a new worker (a worker here presumably corresponds to a container), and the pending count becomes 1:

   Requesting new worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 1.

3. That container also exceeds the YARN limit, so the allocation is abandoned outright:

   Requested container resource (<memory:12288, vCores:10>) exceeds the max limitation of the Yarn cluster (<memory:12288, vCores:4>). Will not allocate resource.

4. These steps repeat over and over, as if in an infinite loop, until the deployment times out and the job fails.

The logs are fairly explicit: the requested resources exceed YARN's maximum container size. Note that the memory request (12288 MB) just fits; it is the 10 vCores against a limit of 4 that go over.
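The comparison YARN makes in step 3 can be sketched in a few lines of shell. This is only an illustration: `fits_in_container` is a hypothetical helper, not part of Flink or YARN, and the numbers are copied from the log line above.

```shell
# Hypothetical helper: succeed only if a request fits within the
# per-container maximum. Arguments: req_mb req_vcores max_mb max_vcores
fits_in_container() {
    [ "$1" -le "$3" ] && [ "$2" -le "$4" ]
}

# Requested <memory:12288, vCores:10> against the limit <memory:12288, vCores:4>.
if fits_in_container 12288 10 12288 4; then
    echo "request fits"
else
    echo "request exceeds the YARN container limit"
fi
```

With these inputs the memory check passes and the vCores check fails, so the second branch is taken.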
 
 
Here is more of the log, including the stack trace, for future reference:

2022-11-18 10:12:12,938 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Failed requesting worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 0
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not compute the container Resource from the given TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.
    ...
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.7.jar:1.12.7]
2022-11-18 10:12:12,939 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 1.
2022-11-18 10:12:12,940 WARN  org.apache.flink.yarn.TaskExecutorProcessSpecContainerResourcePriorityAdapter [] - Requested container resource (<memory:12288, vCores:10>) exceeds the max limitation of the Yarn cluster (<memory:12288, vCores:4>). Will not allocate resource.
2022-11-18 10:12:12,940 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Failed requesting worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 0
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not compute the container Resource from the given TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.
    ...
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.7.jar:1.12.7]
2022-11-18 10:12:12,941 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 1.
2022-11-18 10:12:12,941 WARN  org.apache.flink.yarn.TaskExecutorProcessSpecContainerResourcePriorityAdapter [] - Requested container resource (<memory:12288, vCores:10>) exceeds the max limitation of the Yarn cluster (<memory:12288, vCores:4>). Will not allocate resource.
2022-11-18 10:12:12,941 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Failed requesting worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 0
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not compute the container Resource from the given TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.
    at org.apache.flink.yarn.YarnResourceManagerDriver.requestResource(YarnResourceManagerDriver.java:254) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:249) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.startNewWorker(ActiveResourceManager.java:160) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.ResourceManager$ResourceActionsImpl.allocateResource(ResourceManager.java:1382) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.allocateResource(SlotManagerImpl.java:1058) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:954) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:943) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:51) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:941) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:410) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:529) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_152]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_152]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_152]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.7.jar:1.12.7]

 
 

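II. First-pass analysis and fix

1. Analysis

The error is clear: we asked YARN for more than it allows. Two things therefore deserve attention: the YARN scheduler settings (which define the queues and the limit on each request) and how the program sets its parallelism.

1.1. YARN scheduler settings

The YARN web UI shows a Maximum Allocation of <memory:12288, vCores:4>. In other words, a single container can request at most 12 GB of memory and at most 4 cores.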
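Besides the web UI, the cluster-wide limits are usually configured through the standard properties `yarn.scheduler.maximum-allocation-mb` and `yarn.scheduler.maximum-allocation-vcores` in yarn-site.xml. The sketch below reads them from such a file. It rests on several assumptions: `hadoop_conf_get` is a hypothetical helper, the sample file is made up to match the numbers above, and a scheduler queue may impose its own limit that this file does not show.

```shell
# Hypothetical helper: print the <value> of one property in a Hadoop-style
# XML file. Assumes the <value> follows its <name> (same line or a later one).
hadoop_conf_get() {
    awk -v key="$2" '
        index($0, "<name>" key "</name>") { found = 1 }
        found && /<value>/ {
            gsub(/.*<value>|<\/value>.*/, "")
            print
            exit
        }
    ' "$1"
}

# Made-up sample standing in for the cluster's real yarn-site.xml.
conf=$(mktemp)
cat > "$conf" <<'EOF'
<configuration>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>12288</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-vcores</name>
    <value>4</value>
  </property>
</configuration>
EOF

max_mb=$(hadoop_conf_get "$conf" yarn.scheduler.maximum-allocation-mb)
max_vcores=$(hadoop_conf_get "$conf" yarn.scheduler.maximum-allocation-vcores)
echo "max container: ${max_mb} MB, ${max_vcores} vCores"
rm -f "$conf"
```

Reading the limit at submit time, rather than hard-coding it, is what lets a launcher script adapt when the cluster configuration changes.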
 

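1.2. Program settings

Here is pseudocode for the shell script that launches the job:

# tmp_value is 1 when the requested parallelism is greater than 10, otherwise 0
tmp_value=`echo $parallelism 10 | awk '{if($1 > $2) print 1; else print 0;}'`
if [ $tmp_value -eq 1 ]; then
    tm=12288
    vcores=10
    numberOfTaskSlots=10
fi
...
"yarn.containers.vcores":${vcores}
"taskmanager.numberOfTaskSlots":${numberOfTaskSlots}
...

The conflict is visible here: once the parallelism exceeds 10, vcores is set to 10, but YARN allows at most 4, hence the error.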

 

2. Fix

The fix is simple: keep vcores at no more than 4, then run the job again.
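One way to express that cap in the launcher script is sketched below. `clamp` and `yarn_max_vcores` are hypothetical names introduced for illustration; the original script's actual fix is not shown in the article. Capping the slot count together with vcores mirrors the one-slot-per-core setup used in the rest of this section.

```shell
# Hypothetical helper: print the smaller of a requested value and a limit.
clamp() {
    if [ "$1" -le "$2" ]; then echo "$1"; else echo "$2"; fi
}

yarn_max_vcores=4   # assumed to have been looked up from YARN beforehand

# Values the original script picks when parallelism > 10.
vcores=10
numberOfTaskSlots=10

vcores=$(clamp "$vcores" "$yarn_max_vcores")
numberOfTaskSlots=$(clamp "$numberOfTaskSlots" "$yarn_max_vcores")
echo "vcores=${vcores} numberOfTaskSlots=${numberOfTaskSlots}"
```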

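The job's overall parallelism stays at 20, and each container now requests vcores=4. Following the rule of one thread per unit of parallelism, a TaskManager (container) with 4 cores has 4 slots, so 20/4=5 TaskManagers are started.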
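The TaskManager count is a ceiling division of parallelism by slots per TaskManager. A minimal sketch, assuming every parallel subtask occupies exactly one slot; `taskmanagers_needed` is a hypothetical helper name:

```shell
# Hypothetical helper: ceil(parallelism / slots_per_tm) using integer arithmetic.
taskmanagers_needed() {
    echo $(( ($1 + $2 - 1) / $2 ))
}

taskmanagers_needed 20 4      # parallelism 20, 4 slots each
taskmanagers_needed 4000 4    # the parallelism tried in section III.2
taskmanagers_needed 500 4     # the parallelism settled on in section III.3
```

The ceiling matters only when parallelism is not a multiple of the slot count; all three settings in this article divide evenly.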

 
 

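III. (Performance) A new problem

1. Description

The job now runs, but a performance problem has appeared.

First, how the job ran.

Consumption rate:

  • With parallelism 20 and 12 GB of memory per TaskManager, the job consumed 112,800 records per second, i.e. 6.7689 million records per minute, or 406 million records per hour

  • 2032.766839793899649 GB per hour on average

  • 5793.385493412613869 GB cumulative over 2.85 hours
     

Final result:

  • 5.314 billion records
  • Data volume: 3.03 TB
  • Average size of each record consumed: 626 bytes
  • 1 GB = 1,073,741,824 bytes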

The whole job took 11 hours to finish.
The customer requires it to finish within 1 hour, so this is far too slow.
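To get a feel for the gap, the required throughput can be estimated from the record count and the one-hour target. This is rough arithmetic on the figures quoted above, not a measurement; the helper names are hypothetical, and the 112,800 records/s rate is a sampled value, so the ratio is only indicative.

```shell
# Hypothetical helpers; awk does the floating-point arithmetic.
required_rate() {   # total_records target_seconds -> records per second
    awk -v total="$1" -v secs="$2" 'BEGIN { printf "%.0f\n", total / secs }'
}
speedup() {         # needed_rate current_rate -> ratio with one decimal
    awk -v need="$1" -v have="$2" 'BEGIN { printf "%.1f\n", need / have }'
}

need=$(required_rate 5314000000 3600)
echo "required: ${need} records/s"
echo "speedup over 112800 records/s: $(speedup "$need" 112800)x"
```

Under these assumptions the job would need well over ten times its observed throughput, which is why simply tuning the 20-way job was not an option.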
 

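2. The idealized optimal plan

The customer's cluster has enough resources, so resources are not a concern. In that case, since there are 4,000 HDFS files and YARN's maximum allocation is 4 cores per container, a job deployed with parallelism 4000 should run the fastest!

With parallelism set to 4000, however, 1,000 TaskManagers are started. Wait a moment. 1,000? Spare a thought for the JobManager.

Sure enough, the job failed before scheduling had even finished.

So the ideal was too simple, and there is no silver bullet.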

 

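3. The "Plan B" solution

Since a single JobManager cannot manage that many TaskManagers, the number of TaskManagers has to come down.
After many attempts, the Plan B settings ended up as follows:

Parallelism 500, which started 125 TaskManagers.

The job's total processing time dropped to 23 minutes.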
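Read the other way round, a cap on the number of TaskManagers determines the highest usable parallelism and, from that, roughly how many input files each subtask handles. The sketch below assumes 4 slots per TaskManager and an even spread of files across subtasks (which depends on how the source assigns splits); both helper names are hypothetical.

```shell
# Hypothetical helpers for sizing a job under a TaskManager cap.
max_parallelism() {     # max_taskmanagers slots_per_tm
    echo $(( $1 * $2 ))
}
files_per_subtask() {   # file_count parallelism -> ceil(files / parallelism)
    echo $(( ($1 + $2 - 1) / $2 ))
}

p=$(max_parallelism 125 4)
echo "parallelism=${p}, about $(files_per_subtask 4000 "$p") files per subtask"
```

The cap of 125 here is simply the value found by trial in this article, not a general Flink limit.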

*********************************************
nErrors              |  0
nullErrors           |  0
duplicateErrors      |  0
conversionErrors     |  0
otherErrors          |  0
numWrite             |  3164002700
byteWrite            |  17123302298224
numRead              |  3164002700
writeDuration        |  545313434
byteRead             |  1983716362408
readDuration         |  539805057
snapshotWrite        |  6328005400
*********************************************

2022-11-21 11:48:56 Start to stop flink job(3734d90205a13eff7cfa61b5c0c9686b)
2022-11-21 11:50:11 Success to stop flink job(3734d90205a13eff7cfa61b5c0c9686b)
2022-11-21 11:50:11 Flink process exit code is :0

 
 

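IV. Reflection and iteration

Looking back at the process above, a few points are worth thinking about and iterating on:

  1. The program could communicate with YARN periodically to pick up the scheduler settings in time, then set the maximum core count dynamically. That makes full use of resources while keeping the program stable.
  2. When setting parallelism, we need to account for how many TaskManagers the JobManager can coordinate, rather than finding out by trial and error.
     
    The lesson from this run is that the JobManager holds up with 125 TaskManagers. A next step is to study Flink's communication-related source code, so that the cluster's resources can be used to the fullest.
     
    In addition, with the JobManager managing this many nodes, it is worth considering Flink high availability to make the job more stable.

One more small detail:

  3. TaskManager memory is somewhat wasted: a screenshot in the original post showed that memory was far from fully used. How to optimize this also needs some thought.

Looking closely, we can also see that:

  4. When the container's core count is capped at 4, the numberOfTaskSlots setting effectively no longer takes effect.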
