python连接mysql、hive、clickhouse数据库封装类(2023年2月)

news/2024/4/23 19:47:16/文章来源:https://blog.csdn.net/my_name_is_learn/article/details/129243809

写这篇笔记的背景:

现在的数据库种类有很多,有数据仓库hive类型、有mysql数据库类型、也有ClickHouse数据库类型,针对不同的数据库类型,python读取的方式和代码都不一样。所以本文以读取数据库操作为例子,将这些连接不同数据库的操作整合成类。便于代码的使用和扩展。代码可以直接复制过去,修改ip、账号、密码、端口后直接使用

文章目录

    • 工具库sqlalchemy简单介绍
    • 读取mysql数据的最小单元例子
    • 连接mysql的代码封装成一个类
    • 读取hive数据的最小单元例子,需要用到impala
    • 连接hive的代码封装成一个类
    • 读取ClickHouse数据库的最小单元例子
    • 连接ClickHouse的代码封装成一个类

工具库sqlalchemy简单介绍

需要用到的工具包是pandas和sqlalchemy。pandas是python的一种数据分析库。sqlalchemy是Python中最有名的ORM工具,特点是操纵Python对象而不是SQL查询,也就是在代码层面考虑的是对象,而不是SQL,体现的是一种程序化思维,这样使得Python程序更加简洁易读。

例如下面的User类表示的就是

包含idnameuser

class User(object):def __init__(self, id, name):self.id = idself.name = name
# 下面就是user表
[User('1', 'Michael'),User('2', 'Bob'),User('3', 'Adam')
]

我们可以通过sqlalchemy连接mysql数据库,然后进行各种增删改查,这里只演示查询读取数据。

读取mysql数据的最小单元例子

  • 连接数据库,生成数据库引擎

    # 各种信息需要按照格式这样子编排
    connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(self.user, self.password, self.host, self.port, self.database)
    # 创建一个连接引擎
    engine = sqlalchemy.create_engine(connection_str)
  • 根据数据库引擎,创建连接

    conn = engine.connect()
    
  • 根据连接读取数据

    data = pd.read_sql(sql, conn)  # sql就是你自己的sql语句
    
  • 第四步,关闭连接,然后释放引擎

    conn.close()  # 关闭连接
    engine.dispose()  # 释放引擎
    

连接mysql的代码封装成一个类

封装成一个类的好处就是你想要用,直接把这个类拿出来即可。

相关的说明在代码里面有详细的注释,里面的ip、账号、密码、等信息是虚假的。可直接复制过去使用。

import logging
import pandas as pd
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import timeclass MySqlHelper(object):def __init__(self,host='192.168.15.124',port=3306,database='tert_ims',user='spkjz_wyiter',password='7cmoP3PDtueVJQj2q4Az',logger:logging.Logger=None):self.host = hostself.port = portself.database = databaseself.user = userself.password = passwordself.logger = loggerself.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(self.user, self.password, self.host, self.port, self.database)self.conn = Noneself.cursor = Noneself.engine = Noneself.session = Nonedef create_table_code(self, file_name):'''创建表类代码'''os.system(f'sqlacodegen {self.connection_str} > {file_name}')return self.conndef get_conn(self):'''创建连接或获取连接'''if self.conn is None:engine = self.get_engine()self.conn = engine.connect()return self.conndef get_engine(self):'''创建连接或获取连接'''if self.engine is None:self.engine = sqlalchemy.create_engine(self.connection_str)return self.enginedef get_cursor(self):'''创建连接或获取连接'''if self.cursor is None:self.cursor = self.conn.cursor()return self.cursordef get_session(self) -> sessionmaker:'''创建连接或获取连接'''if self.session is None:engine = self.get_engine()Session = sessionmaker(bind=engine)self.session = Session()return self.sessiondef close_conn(self):'''关闭连接'''if self.conn is not None:self.conn.close()self.conn = Noneself.dispose_engine()def close_session(self):'''关闭连接'''if self.session is not None:self.session.close()self.session = Noneself.dispose_engine()def dispose_engine(self):'''释放engine'''if self.engine is not None:# self.engine.dispose(close=False)self.engine.dispose()self.engine = Nonedef close_cursor(self):'''关闭cursor'''if self.cursor is not None:self.cursor.close()self.cursor = Nonedef get_data(self, sql, auto_close=True) -> pd.DataFrame:'''查询数据'''conn = self.get_conn()data = Nonetry:# 异常重试3次for i in range(3):try:data = pd.read_sql(sql, conn)breakexcept Exception as ex:if i == 2:raise ex # 往外抛出异常time.sleep(60) # 一分钟后重试except Exception as ex:self.logger.exception(ex)raise ex # 往外抛出异常finally:if auto_close:self.close_conn()return data

读取hive数据的最小单元例子,需要用到impala

  • 连接数据库,生成数据库引擎

    from impala.dbapi import connect
    import sqlalchemy
    impala_conn = connect(host=self.host,port=self.port,database=self.database,auth_mechanism=self.auth_mechanism,user=self.user,password=self.password)
    engine = sqlalchemy.create_engine('impala://', creator=get_impala_conn)
    
  • 根据数据库引擎,创建连接

    conn = engine.connect()
    
  • 根据连接读取数据

    data = pd.read_sql(sql, conn)  # sql就是你自己的sql语句
    
  • 第四步,关闭连接,然后释放引擎

    conn.close()  # 关闭连接
    engine.dispose()  # 释放引擎
    

可以看到,使用sqlalchemy的话,对于hive数据库而言,跟mysql唯一不同的只是在第一步生成引擎的时候有差异,其它用法都没有差异

连接hive的代码封装成一个类

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import timeclass HiveHelper(object):def __init__(self,host='10.2.32.12',port=21051,database='ur_ti_dw',auth_mechanism='LDAP',user='urberi',password='Ur#730exd',logger:logging.Logger=None):self.host = hostself.port = portself.database = databaseself.auth_mechanism = auth_mechanismself.user = userself.password = passwordself.logger = loggerself.impala_conn = Noneself.conn = Noneself.cursor = None  # 这里没有详细解释cursor的用法,感兴趣可以去sqlalchemy官网了解self.engine = Noneself.session = None  # 这里没有详细解释session的用法,感兴趣可以去sqlalchemy官网了解def create_table_code(self, file_name):'''创建表类代码'''os.system(f'sqlacodegen {self.connection_str} > {file_name}')return self.conndef get_conn(self):'''创建连接或获取连接'''if self.conn is None:engine = self.get_engine()self.conn = engine.connect()return self.conndef get_impala_conn(self):'''创建连接或获取连接'''if self.impala_conn is None:self.impala_conn = connect(host=self.host,port=self.port,database=self.database,auth_mechanism=self.auth_mechanism,user=self.user,password=self.password)return self.impala_conndef get_engine(self):'''创建连接或获取连接'''if self.engine is None:self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)return self.enginedef get_cursor(self):'''创建连接或获取连接'''if self.cursor is None:self.cursor = self.conn.cursor()return self.cursordef get_session(self) -> sessionmaker:'''创建连接或获取连接'''if self.session is None:engine = self.get_engine()Session = sessionmaker(bind=engine)self.session = Session()return self.sessiondef close_conn(self):'''关闭连接'''if self.conn is not None:self.conn.close()self.conn = Noneself.dispose_engine()self.close_impala_conn()def close_impala_conn(self):'''关闭impala连接'''if self.impala_conn is not None:self.impala_conn.close()self.impala_conn = Nonedef close_session(self):'''关闭连接'''if self.session is not None:self.session.close()self.session = Noneself.dispose_engine()def dispose_engine(self):'''释放engine'''if self.engine is not None:# self.engine.dispose(close=False)self.engine.dispose()self.engine = Nonedef close_cursor(self):'''关闭cursor'''if self.cursor is not None:self.cursor.close()self.cursor = Nonedef get_data(self, sql, auto_close=True) -> pd.DataFrame:'''查询数据'''conn = self.get_conn()data = Nonetry:# 异常重试3次for i in range(3):try:data = pd.read_sql(sql, conn)breakexcept Exception as ex:if i == 2:raise ex # 往外抛出异常time.sleep(60) # 一分钟后重试except Exception as ex:self.logger.exception(ex)raise ex # 往外抛出异常finally:if auto_close:self.close_conn()return data

读取ClickHouse数据库的最小单元例子

  • 连接数据库,生成数据库引擎

    
    import sqlalchemy
    connection_str = 'clickhouse://%s:%s@%s:%d/%s?ssl=True' %(self.user, self.password, self.host, self.port, self.database)
    engine = sqlalchemy.create_engine(self.connection_str)
    
  • 根据数据库引擎,创建连接

    conn = engine.connect()
    
  • 根据连接读取数据

    data = pd.read_sql(sql, conn)  # sql就是你自己的sql语句
    
  • 第四步,关闭连接,然后释放引擎

    conn.close()  # 关闭连接
    engine.dispose()  # 释放引擎
    

显而易见,更上面的差别也只是在生成引擎部分而已。不严谨得推测,不管是什么类型数据库,用上面这个流程,根据官网的提示去生成对应的引擎即可。这就是sqlalchemy的优点之处,对所有数据库进行了整合。

连接ClickHouse的代码封装成一个类

同学们,阅读一遍代码很重要,代码都有注释

import logging
import pandas as pd
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import timeclass ClickhouseHelper(object):def __init__(self,host='10.2.160.11',port=8123,database='urbidb',user='admin',password='UrClickhouse123admin',logger:logging.Logger=None):self.host = hostself.port = portself.database = databaseself.user = userself.password = passwordself.logger = loggerself.connection_str = 'clickhouse://%s:%s@%s:%d/%s?ssl=True' %(self.user, self.password, self.host, self.port, self.database)self.conn = Noneself.cursor = Noneself.engine = Noneself.session = Nonedef create_table_code(self, file_name):'''创建表类代码'''os.system(f'sqlacodegen {self.connection_str} > {file_name}')return self.conndef get_conn(self):'''创建连接或获取连接'''if self.conn is None:engine = self.get_engine()self.conn = engine.connect()return self.conndef get_engine(self):'''创建连接或获取连接'''if self.engine is None:self.engine = sqlalchemy.create_engine(self.connection_str)return self.enginedef get_cursor(self):'''创建连接或获取连接'''if self.cursor is None:self.cursor = self.conn.cursor()return self.cursordef get_session(self) -> sessionmaker:'''创建连接或获取连接'''if self.session is None:engine = self.get_engine()Session = sessionmaker(bind=engine)self.session = Session()return self.sessiondef close_conn(self):'''关闭连接'''if self.conn is not None:self.conn.close()self.conn = Noneself.dispose_engine()def close_session(self):'''关闭连接'''if self.session is not None:self.session.close()self.session = Noneself.dispose_engine()def dispose_engine(self):'''释放engine'''if self.engine is not None:# self.engine.dispose(close=False)self.engine.dispose()self.engine = Nonedef close_cursor(self):'''关闭cursor'''if self.cursor is not None:self.cursor.close()self.cursor = Nonedef get_data(self, sql, auto_close=True) -> pd.DataFrame:'''查询数据'''conn = self.get_conn()data = Nonetry:# 异常重试3次for i in range(3):try:data = pd.read_sql(sql, conn)breakexcept Exception as ex:if i == 2:raise ex # 往外抛出异常time.sleep(60) # 一分钟后重试except Exception as ex:self.logger.exception(ex)raise ex # 往外抛出异常finally:if auto_close:self.close_conn()return data

有了这个基础的连接类之后,你可以根据不同类的不同对象去实现数据库的所有增删改查。因为我是做人工智能算法的,所以写入操作和读取操作用得比较多,修改和更新操作用得不多。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_74850.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

能在软路由docker给部署搭建teamsperk服务器么?并且设置好ddns

参考链接(4条消息) 【个人学习总结】使用docker搭建Teamspeak服务器_blcurtain的博客-CSDN博客_teamspeak3 docker(⊙﹏⊙)哎呀,崩溃啦! (tdeh.top)TeamSpeak服务器搭建与使用 - 缘梦の镇 (cmsboy.cn)Openwrt X86 docker运行甜糖-软路由,x86系统,openwrt…

虚拟数字人直播带货相比人工有哪些优势?

新经济时代的到来,彻底改变了传统的消费方式。虚拟数字人的出现,标志着新一波的消费升级到来。虚拟数字人直播带货,不仅降低了商家的带货成本,拉近了商家与消费者的距离,也给消费者带来全新的消费方式。 花西子虚拟形象…

如何查看Spring Boot各版本的变化

目录 1.版本 2.基础特性和使用 3.新增特性和Bug修复 1.版本 打开Spring官网,点进Spring Boot项目我们会发现在不同版本后面会跟着不同的标签: 这些标签对应不同的版本,其意思如下: GA正式版本,通常意味着该版本已…

k8s学习之路 | Day16 k8s 中的容器初探

文章目录容器镜像镜像名称镜像拉取策略私有仓库的拉取策略容器的环境变量和启动命令容器的环境变量容器的启动命令容器的生命周期钩子postStartpreStop容器的探针startupProbelivenessProbereadinessProbek8s 集群中最小的管理单元就是一个Pod,而Pod里面才是容器&am…

利用GPT-3 Fine-tunes训练专属语言模型

利用GPT-3 Fine-tunes训练专属语言模型 文章目录什么是模型微调(fine-tuning)?为什么需要模型微调?微调 vs 重新训练微调 vs 提示设计训练专属模型数据准备清洗数据构建模型微调模型评估模型部署模型总结什么是模型微调&#xff0…

JavaScript split()方法

JavaScript split()方法 目录JavaScript split()方法一、定义和用法二、语法三、参数值四、返回值五、更多实例5.1 省略分割参数5.2 使用limit参数5.3 使用一个字符作为分割符一、定义和用法 split() 方法用于把一个字符串分割成字符串数组。 二、语法 string.split(separat…

NCRE计算机等级考试Python真题(四)

第四套试题1、以下选项中,不属于需求分析阶段的任务是:A.需求规格说明书评审B.确定软件系统的性能需求C.确定软件系统的功能需求D.制定软件集成测试计划正确答案: D2、关于数据流图(DFD)的描述,以下选项中正…

RTMP的工作原理及优缺点

一.什么是RTMP?RTMP(Real-Time Messaging Protocol,实时消息传输协议)是一种用于低延迟、实时音视频和数据传输的双向互联网通信协议,由Macromedia(后被Adobe收购)开发。RTMP的工作原理是&#…

IP-GUARD控制台账户输入多次错误密码锁定后该如何解锁?

其他管理员账户给锁定了,暂时只能等其锁定时间到了才可以再次输入,默认是设置是锁定30min; 1、如果急需此账户查看,可以使用admin系统管理员账户登录控制台,在工具-账户中清除这个账户的密码,重新登录设置密码。

NIO与零拷贝

目录 一、零拷贝的基本介绍 二、传统IO数据读写的劣势 三、mmap优化 四、sendFile优化 五、 mmap 和 sendFile 的区别 六、零拷贝实战 6.1 传统IO 6.2 NIO中的零拷贝 6.3 运行结果 一、零拷贝的基本介绍 零拷贝是网络编程的关键,很多性能优化都离不开。 在…

【云原生kubernetes】k8s 常用调度策略使用详解

一、前言 通过之前的学习,我们了解到k8s集群中最小工作单位是pod,对于k8s集群来说,一个pod的完整生命周期是由一系列调度策略来控制,这些调度策略具体是怎么工作的呢?本文将详细讨论下这个问题。 二、k8s调度策略简介…

【多目标优化算法】多目标蚱蜢优化算法(Matlab代码实现)

👨‍🎓个人主页:研学社的博客💥💥💞💞欢迎来到本博客❤️❤️💥💥🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密…

APP测试的7大注意点。

1. 运行 1) App安装完成后的试运行,可正常打开软件。 2) App打开测试,是否有加载状态进度提示。 3) App⻚面间的切换是否流畅,逻辑是否正确。 4) 注册 同表单编辑⻚面 用户名密码⻓度 …

快手电商新增商品信息诊断规则,对商家有何影响?

1、2022年快手短剧日活跃用户达2.6亿 新榜讯 近日,快手数据显示,2022年快手短剧日活跃用户达2.6亿,现在的付费用户数对比2022年4月增长超过480%,快手已经是最大的短剧消费市场。此外,2023年快手小游戏日活跃用户峰值超…

一文掌握项目管理工具 —— 时标网络图

一、认识时标网络图二、时标网络图的绘制方法三、自由时差、关键路径、总时差四、时标网络图可解决哪些问题进度调整的问题计算检测点时的 PV 数据资源平滑问题在系统集成项目管理工程师下午的案例分析考试中,有一道计算题分值达 20 分。整套试卷的总分为 75分,考试能否通过合…

ubuntu下用i686-w64-mingw32交叉编译支持SDL、Openssl的ffmpeg库

前言 本篇博客是基于前两篇关于ffmpeg交叉编译下,进行再次编译操作。ubuntu下ffmpeg的交叉编译环境搭建可以参看以下我的这篇博客:https://blog.csdn.net/linyibin_123/article/details/108759367 ; ubuntu下交叉编译openssl及交叉编译支持o…

20分钟6个示例4个动图教你学会Async Hooks

序幕 async_hooks模块提供了一个全新的功能世界,但作为 Node.js 爱好者,我最感兴趣的是,它可以让您轻松了解我们在应用程序中经常执行的一些任务的幕后情况。 在本文中,我将尝试借助async_hooks模块来演示和解释一个典型的异步资源的生命周期。 Async Hooks API 简介 as…

OSWatcher.sh脚本说明

OSWatcher.sh脚本位于oswbb目录下(Oracle 19c数据库中脚本的路径是: /u01/app/oracle/product/19.0.0/dbhome_1/suptools/tfa/release/tfa_home/ext/oswbb/),由脚本startOSWbb.sh和stopOSWbb.sh来调用启动和停止它。 1. startOSW…

《数据库系统概论》学习笔记——第七章 数据库设计

教材为数据库系统概论第五版(王珊) 这一章概念比较多。最重点就是7.4节。 7.1 数据库设计概述 数据库设计定义: 数据库设计是指对于一个给定的应用环境,构造(设计)优化的数据库逻辑模式和物理结构&#x…

C#窗口介绍

窗口就是打开程序我们所面对的一个面板,里面可以添加各种控件,如下图所示,我们可以在属性栏设置其标题名称、图标、大小等。图1 窗口图 图2 设置面板 图3 设置双击标题框,会生成Load函数,也可以到事件里面去找Load函数…