SparkStreaming学习之——无状态与有状态转化、遍历kafka的topic消息、WindowOperations

news/2024/4/25 18:51:05/文章来源:https://blog.csdn.net/Helen_1997_1997/article/details/130363644

目录

一、状态转化

二、kafka topic A→SparkStreaming→kafka topic B

(一)rdd.foreach与rdd.foreachPartition

(二)案例实操1

1.需求:

2.代码实现:

3.运行结果

(三)案例实操2

1.需求:

2.代码实现:

3.运行结果

三、WindowOperations

1.WindowOperations 窗口概述

2.代码示例

3.运行结果


一、状态转化

        无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。

        有状态转化操作就是窗口与窗口之间的数据有关系。上次一UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指 定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkStreamingKafkaSource {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream").setMaster("local[*]")val streamingContext = new StreamingContext(conf, Seconds(5))streamingContext.checkpoint("checkpoint")val kafkaParams = Map((ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.GROUP_ID_CONFIG -> "sparkstreamgroup1"))val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe(Set("sparkkafkastu"), kafkaParams))// TODO 无状态:每个窗口数据独立/*val wordCountStream: DStream[(String, Int)] = kafkaStream.flatMap(_.value().toString.split("\\s+")).map((_, 1)).reduceByKey(_ + _)wordCountStream.print()*/// TODO 有状态:窗口与窗口之间的数据有关系val sumStateStream: DStream[(String, Int)] = kafkaStream.flatMap(x => x.value().toString.split("\\s+")).map((_, 1)).updateStateByKey {case (seq, buffer) => {println("进入到updateStateByKey函数中")println("seqvalue:", seq.toList.toString())println("buffer:", buffer.getOrElse(0).toString)val sum: Int = buffer.getOrElse(0) + seq.sumOption(sum)}}sumStateStream.print()streamingContext.start()streamingContext.awaitTermination()}
}

有状态转化会将之前的历史记录与当前输入的数据进行计算:

二、kafka topic A→SparkStreaming→kafka topic B

(一)rdd.foreach与rdd.foreachPartition

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}import java.util/*** 将数据从kafka的topic A取出数据后加工处理,之后再输出到kafka的topic B中*/
object SparkStreamKafkaSourceToKafkaSink {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream2").setMaster("local[*]")val streamingContext = new StreamingContext(conf, Seconds(5))streamingContext.checkpoint("checkpoint")streamingContext.checkpoint("checkpoint")val kafkaParams = Map( // TODO 连接生产者端的topic(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.GROUP_ID_CONFIG -> "kfkgroup2"))val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent,// 如果没有topic需要创建// kafka-topics.sh --create --zookeeper lxm147:2181 --topic sparkkafkademoin --partitions 1 --replication-factor 1ConsumerStrategies.Subscribe(Set("sparkkafkademoin"), kafkaParams))println("1.配置spark消费kafkatopic")// TODO 使用foreachRDD太过消耗资源——不推荐kafkaStream.foreachRDD( // 遍历rdd => {println("2.遍历spark DStream中每个RDD")// 每隔5秒输出一次/* rdd.foreach(y => { // y:kafka中的keyValue对象println(y.getClass + " 遍历RDD中的每一条kafka的记录")val props = new util.HashMap[String, Object]()// TODO 连接消费者端的topicprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lxm147:9092")props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)val words: Array[String] = y.value().toString.trim.split("\\s+") // hello worldfor (word <- words) {val record = new ProducerRecord[String, String]("sparkkafkademoout", word + ",1")producer.send(record)}})  */rdd.foreachPartition(rdds => { // rdds是包含rdd某个分区内的所有元素println("3.rdd 每个分区内的所有kafka记录集合")val props = new util.HashMap[String, Object]() // TODO 连接消费者端的topicprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lxm147:9092")props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)rdds.foreach(y => {println("4.遍历获取rdd某一个分区内的每一条消息")val words: Array[String] = y.value().trim.split("\\s+")for (word <- words) {val record = new ProducerRecord[String, String]("sparkkafkademoout", word + ",1")producer.send(record)}})})})streamingContext.start()streamingContext.awaitTermination()}
}

(二)案例实操1

1.需求:

清洗前:
user            ,                        friends
3197468391,1346449342 3873244116 4226080662 1222907620

清洗后:
user             ,friends                  目标topic:user_friends2
3197468391,1346449342
3197468391,3873244116
3197468391,4226080662
3197468391,1222907620

2.代码实现:

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import java.utilobject SparkStreamUserFriendrawToUserFriend {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("sparkufStream2").setMaster("local[2]")val streamingContext = new StreamingContext(conf, Seconds(5))streamingContext.checkpoint("checkpoint")val kafkaParams = Map( // TODO 连接生产者端的topic(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.GROUP_ID_CONFIG -> "sparkuf3"),(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"))val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent,// 如果没有topic需要创建// kafka-topics.sh --create --zookeeper lxm147:2181 --topic user_friends2 --partitions 1 --replication-factor 1ConsumerStrategies.Subscribe(Set("user_friends_raw"), kafkaParams))kafkaStream.foreachRDD(rdd => {rdd.foreachPartition(x => {val props = new util.HashMap[String, Object]() // TODO 连接消费者端的topicprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lxm147:9092")props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)x.foreach(y => {val splits: Array[String] = y.value().split(",")if (splits.length == 2) {val userid: String = splits(0)val friends: Array[String] = splits(1).split("\\s+")for (friend <- friends) {val record = new ProducerRecord[String, String]("user_friends2", userid + "," + friend)producer.send(record)}}})})})streamingContext.start()streamingContext.awaitTermination()}
}

3.运行结果

(三)案例实操2

1.需求:

清洗前:

event           ,                   yes               ,        maybe   ,              invited               ,no
1159822043,1975964455 3973364512,2733420590 ,1723091036 795873583,3575574655

清洗前后:

eventid        ,friendid        ,status
1159822043,1975964455,yes
1159822043,3973364512,yes
1159822043,2733420590,maybe
1159822043,1723091036,invited

1159822043,795873583,invited

1159822043,3575574655,no

2.代码实现:

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}import java.utilobject SparkStreamEventAttToEvent2 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("sparkufStream2").setMaster("local[2]")val streamingContext = new StreamingContext(conf, Seconds(5))streamingContext.checkpoint("checkpoint")val kafkaParams = Map( // TODO 连接生产者端的topic(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.GROUP_ID_CONFIG -> "sparkevent"),(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"))val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent,// 如果没有topic需要创建// kafka-topics.sh --create --zookeeper lxm147:2181 --topic event2 --partitions 1 --replication-factor 1ConsumerStrategies.Subscribe(Set("event_attendees_raw"), kafkaParams))kafkaStream.foreachRDD(rdd => {rdd.foreachPartition(x => {val props = new util.HashMap[String, Object]() // TODO 连接消费者端的topicprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lxm147:9092")props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)x.foreach(y => { // todo 遍历获取rdd某一个分区内的每一条消息val splits: Array[String] = y.value().split(",")val eventID: String = splits(0)if (eventID.trim.nonEmpty) {if (splits.length >= 2) {val yesarr: Array[String] = splits(1).split("\\s+")for (yesID <- yesarr) {val yes = new ProducerRecord[String, String]("event2", eventID + "," + yesID + ",yes")producer.send(yes)}}if (splits.length >= 3) {val maybearr: Array[String] = splits(2).split("\\s+")for (maybeID <- maybearr) {val yes = new ProducerRecord[String, String]("event2", eventID + "," + maybeID + ",maybe")producer.send(yes)}}if (splits.length >= 4) {val invitedarr: Array[String] = splits(3).split("\\s+")for (invitedID <- invitedarr) {val invited = new ProducerRecord[String, String]("event2", eventID + "," + invitedID + ",invited")producer.send(invited)}}if (splits.length >= 5) {val noarr: Array[String] = splits(4).split("\\s+")for (noID <- noarr) {val no = new ProducerRecord[String, String]("event2", eventID + "," + noID + ",no")producer.send(no)}}}})})})streamingContext.start()streamingContext.awaitTermination()}
}

3.运行结果

三、WindowOperations

1.WindowOperations 窗口概述

        Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

➢ 窗口时长:计算内容的时间范围;

➢ 滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集周期大小的整数倍。

2.代码示例

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object SparkWindowDemo1 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("sparkwindow1").setMaster("local[*]")val streamingContext = new StreamingContext(conf, Seconds(3))streamingContext.checkpoint("checkpoint")val kafkaParams = Map( // TODO 连接生产者端的topic(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.GROUP_ID_CONFIG -> "sparkwindow"),(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"))val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe(Set("sparkkafkastu"), kafkaParams))val winStream: DStream[(String, Int)] = kafkaStream.flatMap(x => x.value().trim.split("\\s+")).map((_, 1)).window(Seconds(9), Seconds(3))winStream.print()streamingContext.start()streamingContext.awaitTermination()}
}

注意:window的步长不进行设置,默认是采集周期

3.运行结果

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_103492.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Eclipse代码提示突然失灵的解决方案

不知道改动了啥&#xff0c;突然间Eclipse的代码提示就失效了&#xff0c;发现缺少后极不方便。 使用快捷键&#xff1a;Alt/ 提示 No Default Proposals 为什么使用快捷键&#xff1a;Alt/ 会提示“No Default Proposals。”呢&#xff1f; 网上提示可能是热键冲突 但是一套…

数据可视化大屏电商数据展示平台开发实录(Echarts柱图曲线图、mysql筛选统计语句、时间计算、大数据量统计)

数据可视化大屏电商数据展示平台 一、前言二、项目介绍三、项目展示四、项目经验分享4.1 翻牌器4.1.1 翻牌器-今日实时交易4.1.2.翻牌器后端统计SUM函数的使用 4.2 不同时间指标的数据MySql内部的时间计算 4.3 实时交易播报MySql联表查询和内部遍历循环 4.4 每日交易量4.4.1.近…

5.5 高斯型求积公式简历

学习目标&#xff1a; 我会按照以下步骤学习高斯求积公式简介&#xff1a; 理解积分的概念&#xff1a;学习什么是积分以及积分的几何和物理意义&#xff0c;如面积、质量、电荷等概念。 掌握基本的积分技巧&#xff1a;掌握基本的积分公式和技巧&#xff0c;如换元法、分部积…

流辰信息微服务平台:数字化转型的优良工具!

在互联网迅猛发展的今天&#xff0c;越来越多的企业倾向于新兴领域带来的便利性和灵活性了&#xff0c;其中&#xff0c;微服务平台就是其中之一了。流辰信息微服务平台是专注于研发系统开发、数据治理、数据分析的平台&#xff0c;致力于为各中大小型企业提供优质的微服务解决…

Java——字符串的排列

题目链接 牛客网在线oj题——字符串的排列 题目描述 输入一个长度为 n 字符串&#xff0c;打印出该字符串中字符的所有排列&#xff0c;你可以以任意顺序返回这个字符串数组。 例如输入字符串ABC,则输出由字符A,B,C所能排列出来的所有字符串ABC,ACB,BAC,BCA,CBA和CAB。 数…

打造卓越游戏 | 2023 Google 游戏开发者峰会

一款游戏从初始构想的开发到辉煌赛季的策划&#xff0c;开发者们每时每刻都在倾注心血潜心钻研&#xff0c;Google 也致力于在整个开发和发布生命周期中为您提供帮助。我们很高兴能在今年如约而至的 Google 游戏开发者峰会中与您分享诸多更新&#xff0c;展示我们为助力您打造精…

如何有效的开展接口自动化测试,一篇就行

一、简介 接口自动化测试是指使用自动化测试工具和脚本对软件系统中的接口进行测试的过程。其目的是在软件开发过程中&#xff0c;通过对接口的自动化测试来提高测试效率和测试质量&#xff0c;减少人工测试的工作量和测试成本&#xff0c;并且能够快速发现和修复接口错误&…

PCL点云库(2) — IO模块

目录 2.1 IO模块接口 2.2 PCD数据读写 &#xff08;1&#xff09; PCD数据解析 &#xff08;2&#xff09;PCD文件读写示例 2.3 PLY数据读写 &#xff08;1&#xff09;PLY数据解析 &#xff08;2&#xff09;PLY文件读写示例 2.4 OBJ数据读写 &#xff08;1&#xff…

C语言指针2大问题:指针类型有什么用?指针如何运算?

如题&#xff0c;本篇博客主要解决2个疑点&#xff1a;指针类型的用处&#xff0c;指针如何运算。 1.指针类型 C语言中的指针类型&#xff0c;在X86环境下大小是4个字节&#xff0c;在X64环境下大小是8个字节。既然指针的大小和指针类型无关&#xff0c;那么指针类型究竟有什么…

银行系统【GUI/Swing+MySQL】(Java课设)

系统类型 Swing窗口类型Mysql数据库存储数据 使用范围 适合作为Java课设&#xff01;&#xff01;&#xff01; 部署环境 jdk1.8Mysql8.0Idea或eclipsejdbc 运行效果 ​​​​​​​ 本系统源码地址&#xff1a;​​​​​​​https://download.csdn.net/download/qq_50…

从零开始写ChatGLM大模型的微调代码

cursor 的下载及安装&#xff08;免费版每月100次&#xff0c;升级pro 20刀/月&#xff09; cursor是一款与openai合作的&#xff0c;使用gpt-4的一款编程工具&#xff0c;它可以让你通过gpt-4进行辅助编程&#xff0c;以此提高效率。 下载地址&#xff1a;https://www.curso…

USART串口协议和USART串口外设(USART串口发送串口发送和接收)

1、通信接口 • 通信的目的&#xff1a;将一个设备的数据传送到另一个设备&#xff0c;扩展硬件系统 • 通信协议&#xff1a;制定通信的规则&#xff0c;通信双方按照协议规则进行数据收发 异步&#xff1a;需要双方约定一个频率 2、 硬件电路 • 简单双向串口通信有两根通信…

【Unity-ML】Unity机器学习(一)

安装环境&#xff1a;Windows10 Anaconda3(64-bit)&#xff0c;网上很多教程&#xff0c;例如这个anaconda下载及安装(保姆级教程) - 知乎anaconda包管理器和环境管理器&#xff0c;强烈建议食用 1.下载官网下载太慢可选用镜像下载 官网下载&#xff1a; Anaconda | Individua…

〖ChatGPT实践指南 - 零基础扫盲篇④〗- OpenAI API 相关介绍、提示-Prompt 与 完成-Completion

文章目录 ⭐ OpenAI API介绍⭐ 提示-Prompt 与 完成-Completion 介绍 这一章节将为各位小伙伴介绍一下 OpenAI 的 API 相关内容&#xff0c;以及在 ChatGPT 中两个经常被用来比较的名词&#xff1a;“提示-prompt” 与 “完成-completion”。 ⭐ OpenAI API介绍 OpenAI API 概…

JavaScript常用方法整理

文章目录 前言1.栈方法&#xff1a;push()、pop()2.队列方法&#xff1a;unshift()、shift()3.indexof()、lastIndexOf()、includes()4.操作方法&#xff1a;concat()、slice()、splice()5.Array.isArray()6.排序方法:sort()、reverse()7.转换方法&#xff1a;toString()、join…

【Winform学习笔记(二)】TextBox文本框实现按回车键触发Button事件

TextBox文本框实现按回车键触发Button事件 前言正文1、实现方法2、具体代码3、实现效果 前言 在本文中主要介绍 如何基于 Winform 框架实现 TextBox 文本框实现按回车键触发 Button 事件&#xff0c;该功能可实现在文本框中输入密码后不需要按登录或确定按钮&#xff0c;直接回…

vue页面内嵌iframe使用postMessage进行数据交互(postMessage跨域通信)

什么是postMessage postMessage是html5引入的API,它允许来自不同源的脚本采用异步方式进行有效的通信,可以实现跨文本文档,多窗口,跨域消息传递.多用于窗口间数据通信,这也使它成为跨域通信的一种有效的解决方案. vue父页面&#xff08;嵌入iframe的页面&#xff09; 在vue中…

webAPI学习笔记2(DOM事件高级)

1. 注册事件&#xff08;绑定事件&#xff09; 1.1 注册事件概述 给元素添加事件&#xff0c;称为注册事件或者绑定事件。 注册事件有两种方式&#xff1a;传统方式和方法监听注册方式 传统注册方式 利用 on 开头的事件 onclick <button οnclick“alert(hi~)”><…

如何构建可靠的台账数据——详解台账管理系统的使用方法

随着数字化的发展&#xff0c;越来越多的企业开始采用电子台账管理&#xff0c;实现了对各项业务数据的及时准确保存和管理。而在台账管理应用中&#xff0c;发票管理、工单管理和库房台账是三大重要方面。下面我将详细介绍一下台账管理系统。 一、发票管理 1.收票台账报表 …

【Python小技巧】使用Gradio构建基于ChatGPT的 Web 应用(附源码)

文章目录 前言一、Gradio是什么&#xff1f;二、使用Gradio构建基于ChatGPT的 Web 应用1. 安装gradio库2. 安装openai库&#xff08;ChatGPT的python库&#xff09;3. Web 应用示例&#xff08;源代码&#xff09; 总结 前言 随着人工智能的不断发展&#xff0c;各种智能算法越…