[flink] flink macm1pro 快速使用从零到一

news/2024/4/28 14:19:22/文章来源:https://blog.csdn.net/qq_45704048/article/details/137127328

文章目录

  • 快速使用

快速使用

  1. 打开 https://flink.apache.org/downloads/ 下载 flink

因为书籍介绍的是 1.12版本的,为避免不必要的问题,下载相同版本

image.png
image.png

  1. 解压
 tar -xzvf flink-1.11.2-bin-scala_2.11.tgz

image.png

  1. 启动 flink
./bin/start-cluster.sh

image.png

  1. 打开 flink web 页面 localhost:8081

image.png

  1. 编写结合 Kafka 词频统计程序

具体参考 https://weread.qq.com/web/reader/51032ac07236f8e05107816k1f032c402131f0e3dad99f3?

package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;import java.util.Properties;public class WordCountKafkaInStdOut {public static void main(String[] args) throws Exception {// 设置Flink执行环境 StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// Kafka参数 Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "flink-group");String inputTopic = "Shakespeare";String outputTopic = "WordCount";// Source FlinkKafkaConsumer<String> consumer =new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(),properties);DataStream<String> stream = env.addSource(consumer);// Transformation // 使用Flink  API对输入流的文本进行操作 // 按空格切词、计数、分区、设置时间窗口、聚合 DataStream<Tuple2<String, Integer>> wordCount = stream.flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {String[] tokens = line.split("\\s");// 输出结果  for (String token : tokens) {if (token.length() > 0) {collector.collect(new Tuple2<>(token, 1));}}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(0).timeWindow(Time.seconds(5)).sum(1);// Sink wordCount.print();// execute env.execute("kafka streaming word count");}
} 
  1. 打包应用(当然在这之前需要本地调试一下,至少得运行通吧😄)
  2. 使用Flink提供的命令行工具flink,将打包好的作业提交到集群上。命令行的参数 --class 用来指定哪个主类作为入口。
./bin/flink run --class org.example.WordCountKafkaInStdOut xxtarget/flink_study-1.0-SNAPSHOT.jar

class 建议直接拷贝引用
image.png

  1. web 页面查看作业提交成功

image.png

  1. kafka 生产者随便发点消息

image.png

  1. 查看作业日志,词频统计结果

image.png
image.png

  1. 关闭 flink
./bin/stop-cluster.sh

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1027374.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RTOS线程切换的过程和原理

0 前言 RTOS中最重要的一个概念就是线程&#xff0c;线程的按需切换能够满足RTOS的实时性要求&#xff0c;同时能将复杂的需求分解成一个个线程执行减轻我们开发负担。 本文从栈的角度出发&#xff0c;详细介绍RTOS线程切换的过程和原理。 注&#xff1a;本文参考的RTOS是RT-T…

<QT基础(5)>事件监听

事件监听 事件监听&#xff08;Event Handling&#xff09;是在程序中监视和响应发生的事件的一种机制。在Qt中&#xff0c;事件监听是一种常见的用于处理用户输入、系统事件以及其他类型事件的方法。通过事件监听&#xff0c;您可以在发生特定事件时捕获事件并执行相应的操作…

【AI】在本地 Docker 环境中搭建使用 Hugging Face 托管的 Llama 模型

目录 Hugging Face 和 LLMs 简介利用 Docker 进行 ML格式的类型请求 Llama 模型访问创建 Hugging Face 令牌设置 Docker 环境快速演示访问页面入门克隆项目构建镜像运行容器结论推荐超级课程: Docker快速入门到精通Kubernetes入门到大师通关课AWS云服务快速入门实战Hugging Fa…

7、鸿蒙学习-共享包概述

HarmonyOS提供了两种共享包&#xff0c;HAR&#xff08;Harmony Archive&#xff09;静态共享包&#xff0c;和HSP&#xff08;Harmony Shared Package&#xff09;动态共享包。 HAR与HSR都是为了实现代码和资源的共享&#xff0c;都可以包含代码、C库、资源和配置文件&#xf…

iPhone用GPT替代Siri

shigen坚持更新文章的博客写手&#xff0c;擅长Java、python、vue、shell等编程语言和各种应用程序、脚本的开发。记录成长&#xff0c;分享认知&#xff0c;留住感动。 个人IP&#xff1a;shigen 前一段时间&#xff0c;因为iCloud协议的更新&#xff0c;我的云盘空间无法正常…

RISC-V特权架构 - 中断定义

RISC-V特权架构 - 中断定义 1 中断类型1.1 外部中断1.2 计时器中断1.3 软件中断1.4 调试中断 2 中断屏蔽3 中断等待4 中断优先级与仲裁5 中断嵌套6 异常相关寄存器 本文属于《 RISC-V指令集基础系列教程》之一&#xff0c;欢迎查看其它文章。 1 中断类型 RISC-V 架构定义的中…

helm 部署 Kube-Prometheus + Grafana + 钉钉告警部署 Kube-Prometheus

背景 角色IPK8S 版本容器运行时k8s-master-1172.16.16.108v1.24.1containerd://1.6.8k8s-node-1172.16.16.109v1.24.1containerd://1.6.8k8s-node-2172.16.16.110v1.24.1containerd://1.6.8 安装 kube-prometheus mkdir -p /data/yaml/kube-prometheus/prometheus &&…

CDH集群hive初始化元数据库失败

oracle数据库操作&#xff1a; 报错如下&#xff1a;命令 (Validate Hive Metastore schema (237)) 已失败 截图如下&#xff1a; 后台日志部分摘录&#xff1a; WARNING: Use “yarn jar” to launch YARN applications. SLF4J: Class path contains multiple SLF4J binding…

UE RPC 外网联机(1)

技术&#xff1a;RPC TCP通信 设计&#xff1a;大厅服务<---TCP--->房间服务<---RPC--->客户端&#xff08;Creator / Participator&#xff09; 1. PlayerController 用于RPC通信控制 2.GameMode 用于数据同步 3.类图 4. 注意 &#xff08;1&#xff09;RPC&a…

机器学习之决策树现成的模型使用

目录 须知 DecisionTreeClassifier sklearn.tree.plot_tree cost_complexity_pruning_path(X_train, y_train) CART分类树算法 基尼指数 分类树的构建思想 对于离散的数据 对于连续值 剪枝策略 剪枝是什么 剪枝的分类 预剪枝 后剪枝 后剪枝策略体现之威斯康辛州乳…

GIMP - GNU 图像处理程序 - 工具栏窗口 (Toolbox)

GIMP - GNU 图像处理程序 - 工具栏窗口 [Toolbox] 1. GNU Image Manipulation Program2. Windows -> Recently Closed Docks -> ToolboxReferences 1. GNU Image Manipulation Program 2. Windows -> Recently Closed Docks -> Toolbox References [1] Yongqiang …

软件概要设计说明书word原件(实际项目)

一、 引言 &#xff08;一&#xff09; 编写目的 &#xff08;二&#xff09; 范围 &#xff08;三&#xff09; 文档约定 &#xff08;四&#xff09; 术语 二、 项目概要 &#xff08;一&#xff09; 建设背景 &#xff08;二&#xff09; 建设目标 &#xff08;三&a…

Typora字数过多的时候造成卡顿现象如何解决?

Typora字数过多的时候造成卡顿现象如何解决&#xff1f; 点击 、切换、滚动、打字都有点卡顿&#xff0c;下面介绍三种方法&#xff0c;三种方法都可以尝试&#xff0c;建议先尝试方法一&#xff0c;效果不满意就用方法二&#xff0c;实在不行就最后一个取巧的办法。 方法1&a…

图像处理与视觉感知---期末复习重点(5)

文章目录 一、膨胀与腐蚀1.1 膨胀1.2 腐蚀 二、开操作与闭操作 一、膨胀与腐蚀 1.1 膨胀 1. 集合 A A A 被集合 B B B 膨胀&#xff0c;定义式如下。其中集合 B B B 也称为结构元素&#xff1b; ( B ^ ) z (\hat{B})z (B^)z 表示 B B B 的反射平移 z z z 后得到的新集合。…

《Vision mamba》论文笔记

原文出处&#xff1a; [2401.09417] Vision Mamba: Efficient Visual Representation Learning with Bidirectional State Space Model (arxiv.org) 原文笔记&#xff1a; What&#xff1a; Vision Mamba: Efficient Visual Representation Learning with Bidirectional St…

llama-index 结合chatglm3-6B 利用RAG 基于文档智能问答

简介 llamaindex结合chatglm3使用 import os import torch from llama_index.core import VectorStoreIndex, ServiceContext from llama_index.core.callbacks import CallbackManager from llama_index.core.llms.callbacks import llm_completion_callback from llama_ind…

Groovy基础入门

一、Groovy简介 Groovy是运行在JVM中的一种动态语言&#xff0c;可以在Java平台上进行编程&#xff0c;使用方式基本与使用Java代码的方式相同&#xff0c;它的语法与Java语言的语法很相似&#xff0c;与Java相比&#xff0c;Groovy更加灵活、简洁&#xff0c;而且完成同样的功…

C语言例4-6:格式字符d的使用例子

代码如下&#xff1a; //格式字符d的使用例子 #include<stdio.h> int main(void) {int num1123;long num2123456;printf("num1%d,num1%5d,num1%-5d,num1%2d\n",num1,num1,num1,num1);//以四种不同格式&#xff0c;输出int型数据num1的值printf("num2%ld,…

Mybatis别名 动态sql语句 分页查询

给Mybatis的实体类起别名 给Mybatis的xml文件注册mapper映射文件 动态sql语句 1 if 2 choose 3 where 4 foreach 一&#xff09;if 查询指定名称商品信息 语法&#xff1a; SELECT * FROM goods where 11 <if test "gName!null"> and g.g_name like co…

linux:线程同步

个人主页 &#xff1a; 个人主页 个人专栏 &#xff1a; 《数据结构》 《C语言》《C》《Linux》 文章目录 前言线程同步条件变量接口简单示例pthread_cond_wait为什么要有mutex伪唤醒问题的解决 (if->while) 总结 前言 本文作为我对于线程同步知识总结 线程同步 同步&…