1 环境
SQLAlchemy 2.0.7
PyMySQL 1.0.2
Python 3.8.16
2 背景
SQLAlchemy 工具
-
实现多种数据库连接支持
-
MetaData、automap_base 实现已有数据表反射
-
对原有表的数据inster update delete等高级操作
3 数据库连接引擎即高级查询的相关实例
# coding: utf-8
"""
数据库链接引擎
实现:
1、数据库链接 支持Mysql、PostgreSQL、Oracle
2、sqlalchemy 数据 预览 插入 获取表信息
3、sqlalchemy 相关高级查询
"""
# 连接引擎 MetaData类
from sqlalchemy import create_engine,MetaData,Table
# 创建会话
from sqlalchemy.orm import sessionmaker
# 使用df 插入数据库
import pandas as pd
# automap_base 反射表
from sqlalchemy.ext.automap import automap_base
# 记录报错日志
import logging
logging.basicConfig(filename="test.log",filemode="w",format="%(asctime)s %(name)s:%(levelname)s:%(message)s",datefmt="%d-%M-%Y %H:%M:%S",level=logging.DEBUG)
class Dbbase(object):
"""
db_con : 数据库连接
get_table_dicts : 获取数据库表映射字典
db_show_tables : 获取数据库所有表
db_select_table_columns : 获取数据表的列信息
db_select_table : 查看数据表数据
table_insert_row : 数据表插入单条数据
table_insert_df : 通过df实现批量插入数据
table_insert_df_old : 通过df实现批量插入数据方法2 ,和前面方法调用方式略微不同
get_table_instantiation : 该方法可用于Table创建表对象
db_close :关闭数据库引擎
"""
def __init__(self,db_dict):
self.db_type = db_dict['db_type']
self.db_user = db_dict['db_user']
self.db_passwd = db_dict['db_passwd']
self.db_ip = db_dict['db_ip']
self.db_port = db_dict['db_port']
self.db_name = db_dict['db_name']
if db_dict['db_charset'] :
self.db_charset = db_dict['db_charset']
else:
self.db_charset = 'utf8'
def db_con(self):
if self.db_type == 'MySQL' :
self.engine = create_engine(f'mysql+pymysql://{self.db_user}:{self.db_passwd}@{self.db_ip}:{self.db_port}/{self.db_name}?charset={self.db_charset}')
elif self.db_type == 'PostgreSQL' :
self.engine = create_engine(f'postgresql://{self.db_user}:{self.db_passwd}@{self.db_ip}:{self.db_port}/{self.db_name}')
elif self.db_type == 'Oracle' :
self.engine = create_engine(f'cx_Oracle://{self.db_user}:{self.db_passwd}@{self.db_ip}:{self.db_port}/{self.db_name}')
else:
print('数据库类型暂不支持!!!')
self.session = sessionmaker(bind=self.engine)()
self.conn = self.engine.connect()
# base实列
self.Base = automap_base()
# reflect the tables
self.Base.prepare(self.engine,reflect=True)
# MetaData 实列
self.metadata = MetaData()
try:
# reflect 映射dash_base库下的表结构
self.metadata.reflect(schema=self.db_name,bind=self.engine)
print("数据库连接成功!!!")
except Exception as e:
# 主要错误日志
# logging.error(e)
# 详细日志信息
logging.exception(e)
self.session = ''
print("数据库连接失败!!!")
return self.session,self.conn
def get_table_dicts(self):
# 字典的形式将metadata.tables解析出来,方便检索,也不用再使用Table()类
self.table_dicts = {i.name: i for i in self.metadata.tables.values()}
return self.table_dicts
def db_show_tables(self):
table_name_list = self.table_dicts.keys()
print('table_name_list :\n',table_name_list,'\n')
return table_name_list
def db_select_table_columns(self,table_name):
table_columns_list = [ i.name for i in self.table_dicts[table_name].columns]
print('table_columns_list :\n',table_columns_list,'\n')
return table_columns_list
def get_table_class(self,table_name):
table_class = eval(f'self.Base.classes.{table_name}')
return table_class
def db_select_table(self,table_name,limit_size = 2 ):
if limit_size == -1:
rows = [instance for instance in self.session.query(self.table_dicts[table_name]).all()]
else:
rows = [instance for instance in self.session.query(self.table_dicts[table_name]).limit(limit_size)]
print('rows_list :\n',rows,'\n')
return rows
def table_insert_row(self,row_list):
row_data = dict(zip(self.db_select_table_columns(table_name),row_list))
self.conn.execute(self.get_table_dicts()[table_name].insert().values(row_data))
self.conn.commit()
def table_insert_df(self,data,chunksize = 10000,if_exists = 'append',index = False):
"""
:param table_name: 数据表
:param data: 列表数据
:param chunksize: 缓存值
如果data数据量大,需要设置合理的chunksize值,这和数据库缓存大小有关,
可以设置在50000-10000,如果提示数据库连接超时错误,就将size值调小。
:param if_exists: append 追加 replace 替换覆盖
:param index: True 插入index字段 False 不插入index字段
:return:
"""
"""
示例:
from sqlalchemy.types import DATE,CHAR,VARCHAR
DTYPES = {'col_1字段名称' : DATE,'col_2':CHAR(4),'col_3':VARCHAR(10)}
df.to_sql(....,dtype = DTYPES)
将写入数据表的df中,dtype 指定 根据列名对应的数据类型字段即可
如果使用.to_sql()需要指定dtype类型时,如果数据库中不存在目标表,则相应创建;如果数据库中已经存在目标表,则设置append追加模式写入数据库时,可能会引起字段类型冲突。
"""
df = pd.DataFrame(data,columns=db_base.db_select_table_columns(table_name))
df.to_sql(table_name,con=self.engine,chunksize=chunksize,if_exists=if_exists,index=index) # 暂时不用replace会卡死
def table_insert_df_old(self,index = False):
"""
:param table_name: 数据表
:param data: 列表数据
:param chunksize: 缓存值
如果data数据量大,需要设置合理的chunksize值,这和数据库缓存大小有关,
可以设置在50000-10000,如果提示数据库连接超时错误,就将size值调小。
:param if_exists: append 追加 replace 替换覆盖
:param index: True 插入index字段 False 不插入index字段
:return:
"""
df = pd.DataFrame(data,columns=db_base.db_select_table_columns(table_name))
pd.io.sql.to_sql(df,con = self.engine,schema = self.db_name,if_exists = if_exists,index=index)
def get_table_instantiation(self,table_name):
table_instantiation = Table(table_name,self.metadata,schema=self.db_name)
return table_instantiation
def table_truncate(self,table_name):
self.conn.execute(text(f'TRUNCATE TABLE {table_name}'))
self.conn.close()
def db_close(self):
self.conn.close()
self.session.close()
self.engine.dispose()
if __name__ == '__main__':
db_dict = {}
db_dict['db_type'] = 'MySQL'
db_dict['db_user'] = 'root'
db_dict['db_passwd'] = 'root'
db_dict['db_ip'] = '192.168.10.1'
db_dict['db_port'] = 3306
db_dict['db_name'] = 'test'
db_dict['db_charset'] = 'utf8'
# 实例化类
db_base = Dbbase(db_dict)
# 数据库连接
db_session,db_conn = db_base.db_con()
if db_session:
# 获取数据库表映射
db_table_dicts = db_base.get_table_dicts()
# 查看数据库的数据表
db_base.db_show_tables()
# 查看数据表列名
db_base.db_select_table_columns('users')
# 数据预览
db_base.db_select_table('users')
# # 设置查询前几条数据
# dbb.db_select_table('users',4)
# # 查询数据表所有数据
# dbb.db_select_table('users',-1)
# 数据插入
# 单条数据插入
# 创建列表数据对象
row_list = ['11','ed099099','fred',9]
db_base.table_insert_row('users',row_list)
# df 方式插入数据
# 创建列表数据对象
data = [[13,'13'],[14,'14'],[15,'15'],[16,'16']]
# 方式一
db_base.table_insert_df('users',data)
# 方式二
db_base.table_insert_df_old('users',data)
###############################################################
# automap_base 反射表 的使用
###############################################################
# 多条数据插入
# db_session 批量插入
user_data = [{'id': i,'name':'name%s' %i,'fullname':'fullname%s' %i,'nickname':'nickname%s' %i} for i in range(18,20)]
User = db_base.get_table_class('users')
db_session.bulk_insert_mappings(User,user_data)
# 多条数据更新
# db_session 批量更新
user_data = [{'id': i,'fullname':'test%s' %i,'nickname':'test%s' %i} for i in range(18,20)]
User = db_base.get_table_class('users')
db_session.bulk_update_mappings(User,user_data)
print('全表查询 sql\n:',db_session.query(User))
print('全表查询\n:',db_session.query(User).all(),'\n')
###############################################################
# MetaData 表字典的使用高级查询
#automap_base 反射表查询使用时 User对象替换db_table_dicts['users'],User.id,User.name 替换 db_table_dicts['users'].columns['id','name']
###############################################################
# 查询 可事先定义db_query 即全表查询
# # 自定义数据表查询
db_query_customer = db_session.query(db_table_dicts['users'].columns['id','name'])
# 全表查询
db_query = db_session.query(db_table_dicts['users'])
# 查询 all list
print("查询 all list sql :\n",db_query)
print("查询 all list :\n",db_query.all(),'\n')
# 查询 first() 第一个
print("查询 first() 第一个 :\n",db_query.first(),'\n')
# 筛选 查询 filter()
print("筛选 查询 filter() sql :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 1))
print("筛选 查询 filter() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 1).all(),'\n')
# 升\降序 查询 order_by()
# 升序
print("升序 查询 order_by() sql :\n",db_query.order_by(db_table_dicts['users'].columns['id']))
print("升序 查询 order_by() :\n",db_query.order_by(db_table_dicts['users'].columns['id']).all(),'\n')
# 降序
print("降序 查询 order_by() sql :\n",db_query.order_by(db_table_dicts['users'].columns['id'].desc()))
print("降序 查询 order_by() :\n",'\n')
# 查询 one() 如果这个结果集少于或者多于一条数据,结论有且只有一条数据的时候才会正常的返回,否则抛出异常
print("查询 one() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 1).one(),'\n')
# print("查询 one() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 7).one(),'\n')
# 查询 one_or_none() 在结果集中没有数据的时候也不会抛出异常
print("查询 one_or_none() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 6).one_or_none())
print("查询 one_or_none() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 7).one_or_none(),'\n')
# 查询 scalar() 底层调用one()方法,并且如果one()方法没有抛出异常,会返回查询到的第一列的数据
print("查询 scalar() :\n",db_query.filter(db_table_dicts['users'].columns['name'] == "fred").scalar(),'\n')
# 查询 text类型
from sqlalchemy import text
# 升序
print("升序 查询 text类型 sql:\n",db_query.filter(text("id< 3")).order_by(text('id')))
print("升序 查询 text类型:\n",db_query.filter(text("id< 3")).order_by(text('id')).all(),'\n')
# 降序
print("降序 查询 text类型 sql:\n",db_query.filter(text("id< 3")).order_by(text('id desc')))
print("降序 查询 text类型:\n",db_query.filter(text("id< 3")).order_by(text('id desc')).all(),'\n')
# text 带变量方式 用 :传入变量,用params接收
print("查询 text 带变量方式 sql:\n",db_query.filter(text("id< :value")).params(value=4))
print("查询 text 带变量方式:\n",db_query.filter(text("id< :value")).params(value=4).all(),'\n')
# from_statement 原生sql语句
print("查询 from_statement 原生sql语句 sql:\n",db_query.from_statement(text("select * from users where id>:value")).params(value=2))
print("查询 from_statement 原生sql语句:\n",db_query.from_statement(
text("select * from users where id>:value")).params(value=2).all(),'\n')
# and_ or_ 普通的表达式在这里面不好使
from sqlalchemy.sql import and_,asc,desc,or_
print("查询 and_ or_ 普通的表达式 sql:\n",db_query.filter(and_(db_table_dicts['users'].columns['id'] == 4,db_table_dicts['users'].columns['name'] == "ed")))
print("查询 and_ or_ 普通的表达式:\n",db_table_dicts['users'].columns['name'] == "ed")).first())
print("查询 and_ or_ 普通的表达式 sql:\n",db_query.filter(
or_(db_table_dicts['users'].columns['id'] == 4,db_table_dicts['users'].columns['name'] == "ed")).first(),'\n')
# between 大于多少小于多少
print("查询 between 大于多少小于多少 sql:\n",db_query.filter(db_table_dicts['users'].columns['id'].between(1,3)))
print("查询 between 大于多少小于多少:\n",3)).all())
# in_ 在里面对应还有not in等
print("查询 in_ 在里面对应还有not in等 sql:\n",db_query.filter(db_table_dicts['users'].columns['id'].in_([1])))
print("查询 in_ 在里面对应还有not in等:\n",db_query.filter(db_table_dicts['users'].columns['id'].in_([1])).all())
print("查询 notin_ 在里面对应还有not in等 sql:\n",db_query.filter(db_table_dicts['users'].columns['id'].notin_([1])))
print("查询 notin_ 在里面对应还有not in等:\n",db_query.filter(db_table_dicts['users'].columns['id'].notin_([1])).all(),'\n')
# synchronize_session 可以理解为 引用更新 注意:要使用全部查询,使用筛选字段时,更新操作会报错失败
# synchronize_session=False 在原有值的基础上增加和删除 099 str
db_query.filter(db_table_dicts['users'].columns['id'] > 0).update({db_table_dicts['users'].columns['name']: db_table_dicts['users'].columns['name'] + '099'},synchronize_session=False)
db_session.commit()
# synchronize_session=evaluate 在原有值的基础上增加和减少 11,必须是整数类型
db_query.filter(db_table_dicts['users'].columns['id'] > 0).update({db_table_dicts['users'].columns['nickname']: db_table_dicts['users'].columns['nickname'] + 1},synchronize_session="evaluate")
db_session.commit()
# 如果查询条件里有in_,需要在delete()中加如下参数: fetch 删除的更快
db_query.filter(db_table_dicts['users'].columns['id'].in_([1,2])).delete(synchronize_session='fetch')
db_session.commit()
# 计数 注意不能延用前面的db_query,使用db_session.query 查询
from sqlalchemy import func
# 简单func.count()计数查询
print('简单func.count()计数查询 sql:\n',db_session.query(func.count(db_table_dicts['users'].columns['id'])))
print('简单func.count()计数查询:\n',db_session.query(func.count(db_table_dicts['users'].columns['id'])).first(),'\n')
# 如果想实现select count(*) from users,可以通过以下方式来实现:
print('实现select count(*) sql:\n',db_session.query(func.count("*")).select_from(db_table_dicts['users']))
print('实现select count(*):\n',db_session.query(func.count("*")).select_from(db_table_dicts['users']).scalar(),'\n')
# 如果指定了要查找的表的字段,可以省略select_from()方法:
print('实现select count(*) sql:\n',db_session.query(func.count(db_table_dicts['users'].columns['id'])))
print('实现select count(*):\n',db_session.query(func.count(db_table_dicts['users'].columns['id'])).scalar(),'\n')
# limit、offset和切片
"""
limit:可以限制每次查询的时候只查询几条数据。
offset:可以限制查找数据的时候过滤掉前面多少条。
切片 slice :可以对Query对象使用切片操作,来获取想要的数据。
"""
print('实现 limit 查询 sql:\n',db_query.limit(2))
print('实现 limit 查询:\n',db_query.limit(2).all(),'\n')
print('实现 limit offset 查询 sql:\n',db_query.limit(2).offset(1))
print('实现 limit offset 查询:\n',db_query.limit(2).offset(1).all(),'\n')
print('实现 切片 查询 sql:\n',db_query.slice(2,5))
print('实现 切片 查询:\n',5).all(),'\n')
# group_by
# 比如我想根据名字来分组, 统计每个名字分组里面有多少人
from sqlalchemy import func
# 我想根据名字来分组, 统计每个名字分组里面有多少人
print('实现 group_by 查询 sql:\n',db_session.query(db_table_dicts['users'].columns['id'],func.count(db_table_dicts['users'].columns['id'])).group_by(db_table_dicts['users'].columns['id']))
print('实现 group_by 查询:\n',db_session.query(db_table_dicts['users'].columns['name'],func.count(db_table_dicts['users'].columns['name'])).group_by(db_table_dicts['users'].columns['name']).all(),'\n')
# having
# having是对查找结果进一步过滤。比如只想要看未成年人的数量,那么可以首先对年龄进行分组统计人数,然后再对分组进行having过滤
print('实现 having 查询 sql:\n',func.count(db_table_dicts['users'].columns['name'])).group_by(db_table_dicts['users'].columns['id']).having(db_table_dicts['users'].columns['id'] >= 2))
print('实现 having 查询:\n',func.count(db_table_dicts['users'].columns['name'])).group_by(db_table_dicts['users'].columns['id']).having(db_table_dicts['users'].columns['id'] >= 2).all(),'\n')
# join
print('实现 join 查询 sql:\n',db_session.query(db_table_dicts['users'],db_table_dicts['addresses']).join(db_table_dicts['addresses']))
print('实现 join 查询:\n',db_table_dicts['addresses']).join(db_table_dicts['addresses']).all(),'\n')
print('实现 outerjoin 查询 sql:\n',db_table_dicts['addresses']).outerjoin(db_table_dicts['addresses']))
print('实现 outerjoin 查询:\n',db_table_dicts['addresses']).outerjoin(db_table_dicts['addresses']).all(),'\n')
# 别名
# 当多表查询的时候,有时候同一个表要用到多次,这时候用别名就可以方便的解决命名冲突的问题了
from sqlalchemy.orm import aliased
adalias1 = aliased(db_table_dicts['users'])
adalias2 = aliased(db_table_dicts['addresses'])
print('实现 aliased 查询 sql:\n',db_session.query(adalias1.columns['id'],adalias1.columns['id'],adalias2.columns['id']).join(adalias1))
print('实现 aliased 查询:\n',adalias2.columns['id']).join(adalias1).all(),'\n')
# 子查询
from sqlalchemy.sql import func
print('创建查询 sql:\n',db_session.query( db_table_dicts['addresses'].columns['id'].label('user_id'),func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']))
print('创建查询:\n',func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']).all(),'\n')
print('创建子查询 sql:\n',db_session.query(db_table_dicts['addresses'].columns['id'].label('user_id'),func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']).subquery())
addr = db_session.query(db_table_dicts['addresses'].columns['id'].label('user_id'),func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']).subquery()
print('子查询带入父查询 sql:\n',addr.c.address_count).outerjoin(addr,db_table_dicts['users'].columns['id'] == addr.c.user_id ).order_by(db_table_dicts['users'].columns['id'] ))
print('子查询带入父查询:\n',db_table_dicts['users'].columns['id'] == addr.c.user_id ).order_by(db_table_dicts['users'].columns['id'] ).all(),'\n')
# update
print('update sql:\n',db_query.filter(db_table_dicts['users'].columns['id'] == '1').update({'name':'test'}))
db_session.commit()
print('update 结果:\n',db_query.filter(db_table_dicts['users'].columns['id'] == '1').all(),'\n')
# 条件删除 delete
print('delete sql:\n',db_query.filter(db_table_dicts['users'].columns['id'] == '2').delete())
db_session.commit()
print('delete 结果:\n',db_query.filter(db_table_dicts['users'].columns['id'] == '2').all(),'\n')
表数据清空
print('update sql:\n',db_query.delete())
db_session.commit()
print('update 结果:\n','\n')
TRUNCATE TABLE 实现
db_base.table_truncate('users_copy1')
# 关闭数据库
db_base.db_close()
运行结果:
数据库连接成功!!!
table_name_list :
dict_keys(['addresses','users','keywords','post_keywords','posts','users_copy1'])
table_columns_list :
['id','name','fullname','nickname']
rows_list :
[(1,'test','6'),(3,'6')]
全表查询 sql
: SELECT users.id AS users_id,users.name AS users_name,users.fullname AS users_fullname,users.nickname AS users_nickname
FROM users
全表查询
: [<sqlalchemy.ext.automap.users object at 0x7fe069bee580>,<sqlalchemy.ext.automap.users object at 0x7fe069bee520>,<sqlalchemy.ext.automap.users object at 0x7fe069bee640>,<sqlalchemy.ext.automap.users object at 0x7fe069bee6a0>,<sqlalchemy.ext.automap.users object at 0x7fe069bee700>,<sqlalchemy.ext.automap.users object at 0x7fe069bee760>,<sqlalchemy.ext.automap.users object at 0x7fe069bee7c0>,<sqlalchemy.ext.automap.users object at 0x7fe069bee820>,<sqlalchemy.ext.automap.users object at 0x7fe069bee880>,<sqlalchemy.ext.automap.users object at 0x7fe069bee8e0>,<sqlalchemy.ext.automap.users object at 0x7fe069bee940>,<sqlalchemy.ext.automap.users object at 0x7fe069bee9a0>,<sqlalchemy.ext.automap.users object at 0x7fe069beea00>,<sqlalchemy.ext.automap.users object at 0x7fe069beea60>]
查询 all list sql :
SELECT test.users.id AS test_users_id,test.users.name AS test_users_name,test.users.fullname AS test_users_fullname,test.users.nickname AS test_users_nickname
FROM test.users
查询 all list :
[(1,(4,(5,'fred099099','Fred Flintstone',(6,(9,'9'),(10,(11,(13,'13'),(14,'14'),(15,'15'),(16,'16'),(18,'name18','test18','test18'),(19,'name19','test19','test19')]
查询 first() 第一个 :
(1,'6')
筛选 查询 filter() sql :
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users
WHERE test.users.id = %(id_1)s
筛选 查询 filter() :
[(1,'6')]
升序 查询 order_by() sql :
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users ORDER BY test.users.id
升序 查询 order_by() :
[(1,'test19')]
降序 查询 order_by() sql :
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users ORDER BY test.users.id DESC
降序 查询 order_by() :
[(1,'test19')]
查询 one() :
(1,'6')
查询 one_or_none() :
(6,'6')
查询 one_or_none() :
None
查询 scalar() :
None
升序 查询 text类型 sql:
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users
WHERE id< 3 ORDER BY id
升序 查询 text类型:
[(1,'6')]
降序 查询 text类型 sql:
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users
WHERE id< 3 ORDER BY id desc
降序 查询 text类型:
[(1,'6')]
查询 text 带变量方式 sql:
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users
WHERE id< %(value)s
查询 text 带变量方式:
[(1,'6')]
查询 from_statement 原生sql语句 sql:
select * from users where id>%(value)s
查询 from_statement 原生sql语句:
[(3,'test19')]
查询 and_ or_ 普通的表达式 sql:
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users
WHERE test.users.id = %(id_1)s AND test.users.name = %(name_1)s
查询 and_ or_ 普通的表达式:
None
查询 and_ or_ 普通的表达式 sql:
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users
WHERE test.users.id = %(id_1)s OR test.users.name = %(name_1)s
查询 and_ or_ 普通的表达式:
(4,'6')
查询 between 大于多少小于多少 sql:
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users
WHERE test.users.id BETWEEN %(id_1)s AND %(id_2)s
查询 between 大于多少小于多少:
[(1,'6')]
查询 in_ 在里面对应还有not in等 sql:
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users
WHERE test.users.id IN (__[POSTCOMPILE_id_1])
查询 in_ 在里面对应还有not in等:
[(1,'6')]
查询 notin_ 在里面对应还有not in等 sql:
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users
WHERE (test.users.id NOT IN (__[POSTCOMPILE_id_1]))
查询 notin_ 在里面对应还有not in等:
[(3,'test19')]
简单func.count()计数查询 sql:
SELECT count(test.users.id) AS count_1
FROM test.users
简单func.count()计数查询:
(14,)
实现select count(*) sql:
SELECT count(%(count_2)s) AS count_1
FROM test.users
实现select count(*):
14
实现select count(*) sql:
SELECT count(test.users.id) AS count_1
FROM test.users
实现select count(*):
14
实现 limit 查询 sql:
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users
LIMIT %(param_1)s
实现 limit 查询:
[(1,'6')]
实现 limit offset 查询 sql:
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname
FROM test.users
LIMIT %(param_1)s,%(param_2)s
实现 limit offset 查询:
[(3,'6')]
实现 切片 查询 sql:
SELECT test.users.id AS test_users_id,%(param_2)s
实现 切片 查询:
[(4,'6')]
实现 group_by 查询 sql:
SELECT test.users.id AS test_users_id,count(test.users.id) AS count_1
FROM test.users GROUP BY test.users.id
实现 group_by 查询:
[('ed099099',10),('fred099099',1),('name18',('name19',('test',1)]
实现 having 查询 sql:
SELECT test.users.id AS test_users_id,count(test.users.name) AS count_1
FROM test.users GROUP BY test.users.id
HAVING test.users.id >= %(id_1)s
实现 having 查询:
[(3,1)]
实现 join 查询 sql:
SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname,test.addresses.id AS test_addresses_id,test.addresses.email_address AS test_addresses_email_address,test.addresses.user_id AS test_addresses_user_id
FROM test.users INNER JOIN test.addresses ON test.users.id = test.addresses.user_id
实现 join 查询:
[(1,'6',1,'11@qq.com',1)]
实现 outerjoin 查询 sql:
SELECT test.users.id AS test_users_id,test.addresses.user_id AS test_addresses_user_id
FROM test.users LEFT OUTER JOIN test.addresses ON test.users.id = test.addresses.user_id
实现 outerjoin 查询:
[(1,None,None),'9','13','14','15','16',None)]
实现 aliased 查询 sql:
SELECT users_1.id AS users_1_id,users_1.id AS users_1_id__1,addresses_1.id AS addresses_1_id
FROM test.addresses AS addresses_1 INNER JOIN test.users AS users_1 ON users_1.id = addresses_1.user_id
实现 aliased 查询:
[(1,1)]
创建查询 sql:
SELECT test.addresses.id AS user_id,count(%(count_1)s) AS address_count
FROM test.addresses GROUP BY test.addresses.id
创建查询:
[(1,1)]
创建子查询 sql:
SELECT test.addresses.id AS user_id,count(:count_1) AS address_count
FROM test.addresses GROUP BY test.addresses.id
子查询带入父查询 sql:
SELECT test.users.id AS test_users_id,anon_1.address_count AS anon_1_address_count
FROM test.users LEFT OUTER JOIN (SELECT test.addresses.id AS user_id,count(%(count_1)s) AS address_count
FROM test.addresses GROUP BY test.addresses.id) AS anon_1 ON test.users.id = anon_1.user_id ORDER BY test.users.id
子查询带入父查询:
[(1,None)]
update sql:
1
update 结果:
[(1,'6')]
delete sql:
0
delete 结果:
[]
原文地址:https://blog.csdn.net/wisdom_lp/article/details/130274494
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。