写这篇笔记的背景:
现在的数据库种类有很多,有数据仓库hive类型、有mysql数据库类型、也有ClickHouse数据库类型,针对不同的数据库类型,python读取的方式和代码都不一样。所以本文以读取数据库操作为例子,将这些连接不同数据库的操作整合成类。便于代码的使用和扩展。代码可以直接复制过去,修改ip、账号、密码、端口后直接使用
文章目录
- 工具库sqlalchemy简单介绍
- 读取mysql数据的最小单元例子
- 连接mysql的代码封装成一个类
- 读取hive数据的最小单元例子,需要用到impala
- 连接hive的代码封装成一个类
- 读取ClickHouse数据库的最小单元例子
- 连接ClickHouse的代码封装成一个类
工具库sqlalchemy简单介绍
需要用到的工具包是pandas和sqlalchemy。pandas是python的一种数据分析库。sqlalchemy是Python中最有名的ORM工具,特点是操纵Python对象而不是SQL查询,也就是在代码层面考虑的是对象,而不是SQL,体现的是一种程序化思维,这样使得Python程序更加简洁易读。
例如,下面的`User`类表示的就是包含`id`和`name`两个字段的`user`表:
class User:
    """One row of the ``user`` table, modelled as a plain Python object."""

    def __init__(self, id, name):
        # The parameter names mirror the columns of the ``user`` table.
        self.name = name
        self.id = id


# The ``user`` table itself: a list of User objects, one per row.
[
    User('1', 'Michael'),
    User('2', 'Bob'),
    User('3', 'Adam'),
]
我们可以通过sqlalchemy连接mysql数据库,然后进行各种增删改查,这里只演示查询读取数据。
读取mysql数据的最小单元例子
- 第一步,连接数据库,生成数据库引擎
from urllib.parse import quote

import sqlalchemy

# Connection settings -- placeholders, replace them with your own.
host, port, database = 'your_host', 3306, 'your_database'
user, password = 'your_user', 'your_password'

# The URL must follow this layout: dialect+driver://user:password@host:port/database.
# The credentials are percent-encoded because an '@', ':', '/' or '#' inside
# them would otherwise corrupt the URL.
connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' % (
    quote(user, safe=''), quote(password, safe=''), host, port, database)
# Create the engine from the URL built above.
engine = sqlalchemy.create_engine(connection_str)
- 第二步,根据数据库引擎,创建连接
# Open a connection from the engine created in the previous step.
conn = engine.connect()
- 第三步,根据连接读取数据
data = pd.read_sql(sql, conn)  # `sql` is your own SQL statement
- 第四步,关闭连接,然后释放引擎
# Close the connection first, then release the engine.
# (On a single line, `engine.dispose()` would sit inside the comment and never run.)
conn.close()  # close the connection
engine.dispose()  # release the engine
连接mysql的代码封装成一个类
封装成一个类的好处就是你想要用,直接把这个类拿出来即可。
相关的说明在代码里面有详细的注释。代码中的ip、账号、密码等信息都是虚假的,复制过去后替换成你自己的信息即可直接使用。
import logging
import pandas as pd
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import subprocess
import time
from urllib.parse import quote


class MySqlHelper(object):
    """Lazily create and cache SQLAlchemy resources for one MySQL database.

    Every ``get_*`` method creates its resource on the first call and returns the
    cached object afterwards; the matching ``close_*`` / ``dispose_engine`` method
    releases it and resets the attribute to ``None``.
    """

    def __init__(self, host='192.168.15.124', port=3306, database='tert_ims',
                 user='spkjz_wyiter', password='7cmoP3PDtueVJQj2q4Az',
                 logger: logging.Logger = None):
        """Store the connection settings; no connection is opened here.

        :param host: MySQL server address.
        :param port: server port (formatted with ``%d``, so it must be an int).
        :param database: database (schema) name.
        :param user: account name.
        :param password: account password.
        :param logger: logger used to report failures; this module's logger is
            used when it is ``None``.

        NOTE: the default address/account/password are placeholders; pass real
        values explicitly instead of editing the defaults.
        """
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.logger = logger
        # Percent-encode the credentials: an '@', ':', '/' or '#' in the user
        # name or password would otherwise corrupt the URL.
        self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' % (
            quote(str(self.user), safe=''), quote(str(self.password), safe=''),
            self.host, self.port, self.database)
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None

    def _get_logger(self):
        """Return the configured logger, or this module's logger if none was given."""
        return self.logger if self.logger is not None else logging.getLogger(__name__)

    def create_table_code(self, file_name):
        """Write SQLAlchemy model classes for this database to *file_name*.

        Runs the ``sqlacodegen`` command-line tool. A missing tool or a non-zero
        exit status is logged rather than raised.

        :return: ``self.conn`` (kept for backward compatibility).
        """
        # No shell is involved, so characters in the URL or the file name cannot
        # be interpreted as shell syntax.
        # NOTE: the URL (including the password) is still visible on the child
        # process's command line.
        with open(file_name, 'wb') as out:
            try:
                result = subprocess.run(['sqlacodegen', self.connection_str], stdout=out)
            except FileNotFoundError:
                self._get_logger().error('sqlacodegen is not installed')
                return self.conn
        if result.returncode != 0:
            self._get_logger().warning('sqlacodegen exited with code %d', result.returncode)
        return self.conn

    def get_conn(self):
        """Return the cached SQLAlchemy connection, opening it on the first call."""
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn

    def get_engine(self):
        """Return the cached engine, creating it from ``connection_str`` on the first call."""
        if self.engine is None:
            self.engine = sqlalchemy.create_engine(self.connection_str)
        return self.engine

    def get_cursor(self):
        """Return the cached raw DB-API cursor, creating it on the first call.

        SQLAlchemy's ``Connection`` has no ``cursor()`` method, so the cursor is
        taken from the underlying DB-API connection (``Connection.connection``).
        """
        if self.cursor is None:
            self.cursor = self.get_conn().connection.cursor()
        return self.cursor

    def get_session(self) -> 'sqlalchemy.orm.Session':
        """Return the cached ORM session bound to the engine, creating it on the first call."""
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session

    def close_conn(self):
        """Close the cursor and the connection, then dispose of the engine."""
        # The cursor belongs to the connection, so close it first.
        self.close_cursor()
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()

    def close_session(self):
        """Close the ORM session, then dispose of the engine."""
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()

    def dispose_engine(self):
        """Dispose of the engine (if any) and forget it."""
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None

    def close_cursor(self):
        """Close the cursor (if any) and forget it."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None

    def get_data(self, sql, auto_close=True, *, retries=3, retry_interval=60) -> pd.DataFrame:
        """Run *sql* and return the result as a DataFrame.

        :param sql: query passed to ``pandas.read_sql``.
        :param auto_close: if true, call :meth:`close_conn` afterwards (even on failure).
        :param retries: total number of attempts (values below 1 are treated as 1).
        :param retry_interval: seconds to wait between attempts.
        :raises Exception: the error from the last attempt, after it has been logged.
        """
        attempts = max(1, retries)
        try:
            for attempt in range(attempts):
                try:
                    return pd.read_sql(sql, self.get_conn())
                except Exception:
                    if attempt == attempts - 1:
                        raise
                    # A failed query can leave the connection unusable, so the
                    # next attempt starts from a fresh connection and engine.
                    self.close_conn()
                    time.sleep(retry_interval)
        except Exception as ex:
            self._get_logger().exception(ex)
            raise
        finally:
            if auto_close:
                self.close_conn()
读取hive数据的最小单元例子,需要用到impala
- 第一步,连接数据库,生成数据库引擎
import sqlalchemy
from impala.dbapi import connect

# Connection settings -- placeholders, replace them with your own.
host, port, database = 'your_host', 21051, 'your_database'
auth_mechanism, user, password = 'LDAP', 'your_user', 'your_password'


def get_impala_conn():
    """Open a new Impala DB-API connection; used as the engine's ``creator``."""
    return connect(host=host, port=port, database=database,
                   auth_mechanism=auth_mechanism, user=user, password=password)


# 'impala://' only selects the dialect; ``creator`` must be a callable, and the
# engine calls it whenever it needs a DB-API connection.
engine = sqlalchemy.create_engine('impala://', creator=get_impala_conn)
- 第二步,根据数据库引擎,创建连接
# Open a connection from the engine created in the previous step.
conn = engine.connect()
- 第三步,根据连接读取数据
data = pd.read_sql(sql, conn)  # `sql` is your own SQL statement
- 第四步,关闭连接,然后释放引擎
# Close the connection first, then release the engine.
# (On a single line, `engine.dispose()` would sit inside the comment and never run.)
conn.close()  # close the connection
engine.dispose()  # release the engine
可以看到,使用sqlalchemy时,hive与mysql唯一的区别在于第一步生成引擎的方式,其余用法完全相同。
连接hive的代码封装成一个类
import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time


class HiveHelper(object):
    """Lazily create and cache SQLAlchemy resources for Hive, through Impala.

    The engine has no connection URL: it obtains its DB-API connection from
    :meth:`get_impala_conn` via SQLAlchemy's ``creator`` hook, so the engine and
    ``self.impala_conn`` share one cached connection.
    """

    def __init__(self, host='10.2.32.12', port=21051, database='ur_ti_dw',
                 auth_mechanism='LDAP', user='urberi', password='Ur#730exd',
                 logger: logging.Logger = None):
        """Store the connection settings; no connection is opened here.

        :param host: Impala/Hive server address.
        :param port: server port.
        :param database: database name.
        :param auth_mechanism: value passed to ``impala.dbapi.connect``.
        :param user: account name.
        :param password: account password.
        :param logger: logger used to report failures; this module's logger is
            used when it is ``None``.

        NOTE: the default address/account/password are placeholders; pass real
        values explicitly instead of editing the defaults.
        """
        self.host = host
        self.port = port
        self.database = database
        self.auth_mechanism = auth_mechanism
        self.user = user
        self.password = password
        self.logger = logger
        self.impala_conn = None  # DB-API connection from impala.dbapi.connect
        self.conn = None  # SQLAlchemy connection
        self.cursor = None  # raw DB-API cursor, see get_cursor()
        self.engine = None
        self.session = None  # ORM session, see get_session()

    def _get_logger(self):
        """Return the configured logger, or this module's logger if none was given."""
        return self.logger if self.logger is not None else logging.getLogger(__name__)

    def create_table_code(self, file_name):
        """Not supported for Hive/Impala.

        This class builds its engine from a ``creator`` callable and has no
        connection URL that could be handed to ``sqlacodegen``.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError(
            'HiveHelper has no connection URL to pass to sqlacodegen; '
            'generate the table classes with another tool')

    def get_conn(self):
        """Return the cached SQLAlchemy connection, opening it on the first call."""
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn

    def get_impala_conn(self):
        """Return the cached Impala DB-API connection, opening it on the first call."""
        if self.impala_conn is None:
            self.impala_conn = connect(host=self.host, port=self.port,
                                       database=self.database,
                                       auth_mechanism=self.auth_mechanism,
                                       user=self.user, password=self.password)
        return self.impala_conn

    def get_engine(self):
        """Return the cached engine, creating it on the first call.

        'impala://' only selects the dialect; the connection itself comes from
        :meth:`get_impala_conn`.
        """
        if self.engine is None:
            self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)
        return self.engine

    def get_cursor(self):
        """Return the cached raw DB-API cursor, creating it on the first call.

        SQLAlchemy's ``Connection`` has no ``cursor()`` method, so the cursor is
        taken from the underlying DB-API connection (``Connection.connection``).
        """
        if self.cursor is None:
            self.cursor = self.get_conn().connection.cursor()
        return self.cursor

    def get_session(self) -> 'sqlalchemy.orm.Session':
        """Return the cached ORM session bound to the engine, creating it on the first call."""
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session

    def close_conn(self):
        """Close cursor and connection, dispose of the engine, close the Impala connection."""
        # The cursor belongs to the connection, so close it first.
        self.close_cursor()
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
        self.close_impala_conn()

    def close_impala_conn(self):
        """Close the Impala DB-API connection (if any) and forget it."""
        if self.impala_conn is not None:
            self.impala_conn.close()
            self.impala_conn = None

    def close_session(self):
        """Close the ORM session, then dispose of the engine."""
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()

    def dispose_engine(self):
        """Dispose of the engine (if any) and forget it."""
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None

    def close_cursor(self):
        """Close the cursor (if any) and forget it."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None

    def get_data(self, sql, auto_close=True, *, retries=3, retry_interval=60) -> pd.DataFrame:
        """Run *sql* and return the result as a DataFrame.

        :param sql: query passed to ``pandas.read_sql``.
        :param auto_close: if true, call :meth:`close_conn` afterwards (even on failure).
        :param retries: total number of attempts (values below 1 are treated as 1).
        :param retry_interval: seconds to wait between attempts.
        :raises Exception: the error from the last attempt, after it has been logged.
        """
        attempts = max(1, retries)
        try:
            for attempt in range(attempts):
                try:
                    return pd.read_sql(sql, self.get_conn())
                except Exception:
                    if attempt == attempts - 1:
                        raise
                    # A failed query can leave the connection unusable, so the
                    # next attempt reconnects from scratch (close_conn also drops
                    # the cached Impala connection the engine would reuse).
                    self.close_conn()
                    time.sleep(retry_interval)
        except Exception as ex:
            self._get_logger().exception(ex)
            raise
        finally:
            if auto_close:
                self.close_conn()
读取ClickHouse数据库的最小单元例子
- 第一步,连接数据库,生成数据库引擎
from urllib.parse import quote

import sqlalchemy

# Connection settings -- placeholders, replace them with your own.
host, port, database = 'your_host', 8123, 'your_database'
user, password = 'your_user', 'your_password'

# The credentials are percent-encoded because an '@', ':', '/' or '#' inside
# them would otherwise corrupt the URL.
connection_str = 'clickhouse://%s:%s@%s:%d/%s?ssl=True' % (
    quote(user, safe=''), quote(password, safe=''), host, port, database)
# Create the engine from the URL built just above.
engine = sqlalchemy.create_engine(connection_str)
- 第二步,根据数据库引擎,创建连接
# Open a connection from the engine created in the previous step.
conn = engine.connect()
- 第三步,根据连接读取数据
data = pd.read_sql(sql, conn)  # `sql` is your own SQL statement
- 第四步,关闭连接,然后释放引擎
# Close the connection first, then release the engine.
# (On a single line, `engine.dispose()` would sit inside the comment and never run.)
conn.close()  # close the connection
engine.dispose()  # release the engine
显而易见,跟上面相比,差别也只在生成引擎这一步。不严谨地推测,不管是什么类型的数据库,都可以沿用上面这个流程,只需根据官方文档的提示生成对应的引擎即可。这正是sqlalchemy的优点所在:它对各种数据库的访问方式做了统一的封装。
连接ClickHouse的代码封装成一个类
同学们,阅读一遍代码很重要,代码都有注释
import logging
import pandas as pd
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import subprocess
import time
from urllib.parse import quote


class ClickhouseHelper(object):
    """Lazily create and cache SQLAlchemy resources for one ClickHouse database.

    Every ``get_*`` method creates its resource on the first call and returns the
    cached object afterwards; the matching ``close_*`` / ``dispose_engine`` method
    releases it and resets the attribute to ``None``.
    """

    def __init__(self, host='10.2.160.11', port=8123, database='urbidb',
                 user='admin', password='UrClickhouse123admin',
                 logger: logging.Logger = None):
        """Store the connection settings; no connection is opened here.

        :param host: ClickHouse server address.
        :param port: server port (formatted with ``%d``, so it must be an int).
        :param database: database name.
        :param user: account name.
        :param password: account password.
        :param logger: logger used to report failures; this module's logger is
            used when it is ``None``.

        NOTE: the default address/account/password are placeholders; pass real
        values explicitly instead of editing the defaults.
        """
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.logger = logger
        # Percent-encode the credentials: an '@', ':', '/' or '#' in the user
        # name or password would otherwise corrupt the URL.
        self.connection_str = 'clickhouse://%s:%s@%s:%d/%s?ssl=True' % (
            quote(str(self.user), safe=''), quote(str(self.password), safe=''),
            self.host, self.port, self.database)
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None

    def _get_logger(self):
        """Return the configured logger, or this module's logger if none was given."""
        return self.logger if self.logger is not None else logging.getLogger(__name__)

    def create_table_code(self, file_name):
        """Write SQLAlchemy model classes for this database to *file_name*.

        Runs the ``sqlacodegen`` command-line tool. A missing tool or a non-zero
        exit status is logged rather than raised.

        :return: ``self.conn`` (kept for backward compatibility).
        """
        # No shell is involved, so characters in the URL or the file name cannot
        # be interpreted as shell syntax.
        # NOTE: the URL (including the password) is still visible on the child
        # process's command line.
        with open(file_name, 'wb') as out:
            try:
                result = subprocess.run(['sqlacodegen', self.connection_str], stdout=out)
            except FileNotFoundError:
                self._get_logger().error('sqlacodegen is not installed')
                return self.conn
        if result.returncode != 0:
            self._get_logger().warning('sqlacodegen exited with code %d', result.returncode)
        return self.conn

    def get_conn(self):
        """Return the cached SQLAlchemy connection, opening it on the first call."""
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn

    def get_engine(self):
        """Return the cached engine, creating it from ``connection_str`` on the first call."""
        if self.engine is None:
            self.engine = sqlalchemy.create_engine(self.connection_str)
        return self.engine

    def get_cursor(self):
        """Return the cached raw DB-API cursor, creating it on the first call.

        SQLAlchemy's ``Connection`` has no ``cursor()`` method, so the cursor is
        taken from the underlying DB-API connection (``Connection.connection``).
        """
        if self.cursor is None:
            self.cursor = self.get_conn().connection.cursor()
        return self.cursor

    def get_session(self) -> 'sqlalchemy.orm.Session':
        """Return the cached ORM session bound to the engine, creating it on the first call."""
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session

    def close_conn(self):
        """Close the cursor and the connection, then dispose of the engine."""
        # The cursor belongs to the connection, so close it first.
        self.close_cursor()
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()

    def close_session(self):
        """Close the ORM session, then dispose of the engine."""
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()

    def dispose_engine(self):
        """Dispose of the engine (if any) and forget it."""
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None

    def close_cursor(self):
        """Close the cursor (if any) and forget it."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None

    def get_data(self, sql, auto_close=True, *, retries=3, retry_interval=60) -> pd.DataFrame:
        """Run *sql* and return the result as a DataFrame.

        :param sql: query passed to ``pandas.read_sql``.
        :param auto_close: if true, call :meth:`close_conn` afterwards (even on failure).
        :param retries: total number of attempts (values below 1 are treated as 1).
        :param retry_interval: seconds to wait between attempts.
        :raises Exception: the error from the last attempt, after it has been logged.
        """
        attempts = max(1, retries)
        try:
            for attempt in range(attempts):
                try:
                    return pd.read_sql(sql, self.get_conn())
                except Exception:
                    if attempt == attempts - 1:
                        raise
                    # A failed query can leave the connection unusable, so the
                    # next attempt starts from a fresh connection and engine.
                    self.close_conn()
                    time.sleep(retry_interval)
        except Exception as ex:
            self._get_logger().exception(ex)
            raise
        finally:
            if auto_close:
                self.close_conn()
有了这个基础的连接类之后,你可以根据不同类的不同对象去实现数据库的所有增删改查。因为我是做人工智能算法的,所以写入操作和读取操作用得比较多,修改和更新操作用得不多。