Python ORM之SQLAlchemy 数据库连接引擎实现Mysql、PostgreSQL、Oracle连接以及高级查询的相关实例

1 环境

SQLAlchemy              2.0.7
PyMySQL                 1.0.2
Python 3.8.16

2 背景

SQLAlchemy 工具

  • 实现多种数据库连接支持

  • MetaData、automap_base  实现已有数据表反射

  • 对原有表的数据inster update delete等高级操作

3 数据库连接引擎即高级查询的相关实例

# coding: utf-8
"""
数据库链接引擎
实现:
1、数据库链接 支持Mysql、PostgreSQL、Oracle
2、sqlalchemy 数据 预览 插入 获取表信息
3、sqlalchemy 相关高级查询
"""
# 连接引擎 MetaData类
from sqlalchemy import create_engine,MetaData,Table
# 创建会话
from sqlalchemy.orm import sessionmaker
# 使用df 插入数据库
import pandas as pd
#  automap_base 反射表
from sqlalchemy.ext.automap import automap_base

# 记录报错日志
import logging
logging.basicConfig(filename="test.log",filemode="w",format="%(asctime)s %(name)s:%(levelname)s:%(message)s",datefmt="%d-%M-%Y %H:%M:%S",level=logging.DEBUG)


class Dbbase(object):
    """
    db_con : 数据库连接
    get_table_dicts : 获取数据库表映射字典
    db_show_tables : 获取数据库所有表
    db_select_table_columns : 获取数据表的列信息
    db_select_table : 查看数据表数据
    table_insert_row : 数据表插入单条数据
    table_insert_df : 通过df实现批量插入数据
    table_insert_df_old : 通过df实现批量插入数据方法2 ,和前面方法调用方式略微不同
    get_table_instantiation : 该方法可用于Table创建表对象
    db_close :关闭数据库引擎
    """
    def __init__(self,db_dict):
        self.db_type = db_dict['db_type']
        self.db_user = db_dict['db_user']
        self.db_passwd = db_dict['db_passwd']
        self.db_ip = db_dict['db_ip']
        self.db_port = db_dict['db_port']
        self.db_name = db_dict['db_name']
        if db_dict['db_charset'] :
            self.db_charset = db_dict['db_charset']
        else:
            self.db_charset = 'utf8'

    def db_con(self):
        if self.db_type == 'MySQL' :
            self.engine = create_engine(f'mysql+pymysql://{self.db_user}:{self.db_passwd}@{self.db_ip}:{self.db_port}/{self.db_name}?charset={self.db_charset}')
        elif self.db_type == 'PostgreSQL' :
            self.engine = create_engine(f'postgresql://{self.db_user}:{self.db_passwd}@{self.db_ip}:{self.db_port}/{self.db_name}')
        elif self.db_type == 'Oracle' :
            self.engine = create_engine(f'cx_Oracle://{self.db_user}:{self.db_passwd}@{self.db_ip}:{self.db_port}/{self.db_name}')
        else:
            print('数据库类型暂不支持!!!')
        self.session = sessionmaker(bind=self.engine)()
        self.conn = self.engine.connect()

        # base实列
        self.Base = automap_base()
        # reflect the tables
        self.Base.prepare(self.engine,reflect=True)

        # MetaData 实列
        self.metadata = MetaData()
        try:
            # reflect 映射dash_base库下的表结构
            self.metadata.reflect(schema=self.db_name,bind=self.engine)
            print("数据库连接成功!!!")
        except Exception as e:
            # 主要错误日志
            # logging.error(e)
            # 详细日志信息
            logging.exception(e)
            self.session = ''
            print("数据库连接失败!!!")

        return self.session,self.conn

    def get_table_dicts(self):
        # 字典的形式将metadata.tables解析出来,方便检索,也不用再使用Table()类
        self.table_dicts = {i.name: i for i in self.metadata.tables.values()}
        return self.table_dicts

    def db_show_tables(self):
        table_name_list = self.table_dicts.keys()
        print('table_name_list :\n',table_name_list,'\n')
        return table_name_list

    def db_select_table_columns(self,table_name):
        table_columns_list = [ i.name for i in self.table_dicts[table_name].columns]
        print('table_columns_list :\n',table_columns_list,'\n')
        return table_columns_list

    def get_table_class(self,table_name):
        table_class = eval(f'self.Base.classes.{table_name}')
        return table_class


    def db_select_table(self,table_name,limit_size = 2 ):
        if limit_size == -1:
            rows = [instance for instance in self.session.query(self.table_dicts[table_name]).all()]
        else:
            rows = [instance for instance in self.session.query(self.table_dicts[table_name]).limit(limit_size)]
        print('rows_list :\n',rows,'\n')
        return rows

    def table_insert_row(self,row_list):
        row_data = dict(zip(self.db_select_table_columns(table_name),row_list))
        self.conn.execute(self.get_table_dicts()[table_name].insert().values(row_data))
        self.conn.commit()

    def table_insert_df(self,data,chunksize = 10000,if_exists = 'append',index = False):
        """

        :param table_name: 数据表
        :param data: 列表数据
        :param chunksize: 缓存值
        如果data数据量大,需要设置合理的chunksize值,这和数据库缓存大小有关,
        可以设置在50000-10000,如果提示数据库连接超时错误,就将size值调小。
        :param if_exists: append 追加 replace 替换覆盖
        :param index: True 插入index字段 False 不插入index字段
        :return:
        """
        """
        示例:
        from sqlalchemy.types import DATE,CHAR,VARCHAR 
        DTYPES = {'col_1字段名称' : DATE,'col_2':CHAR(4),'col_3':VARCHAR(10)}
        df.to_sql(....,dtype = DTYPES)
        将写入数据表的df中,dtype 指定 根据列名对应的数据类型字段即可
        如果使用.to_sql()需要指定dtype类型时,如果数据库中不存在目标表,则相应创建;如果数据库中已经存在目标表,则设置append追加模式写入数据库时,可能会引起字段类型冲突。
        """
        df = pd.DataFrame(data,columns=db_base.db_select_table_columns(table_name))
        df.to_sql(table_name,con=self.engine,chunksize=chunksize,if_exists=if_exists,index=index) # 暂时不用replace会卡死

    def table_insert_df_old(self,index = False):
        """

        :param table_name: 数据表
        :param data: 列表数据
        :param chunksize: 缓存值
        如果data数据量大,需要设置合理的chunksize值,这和数据库缓存大小有关,
        可以设置在50000-10000,如果提示数据库连接超时错误,就将size值调小。
        :param if_exists: append 追加 replace 替换覆盖
        :param index: True 插入index字段 False 不插入index字段
        :return:
        """
        df = pd.DataFrame(data,columns=db_base.db_select_table_columns(table_name))
        pd.io.sql.to_sql(df,con = self.engine,schema = self.db_name,if_exists = if_exists,index=index)

    def get_table_instantiation(self,table_name):
        table_instantiation = Table(table_name,self.metadata,schema=self.db_name)
        return table_instantiation

    def table_truncate(self,table_name):
        self.conn.execute(text(f'TRUNCATE TABLE {table_name}'))
        self.conn.close()

    def db_close(self):
        self.conn.close()
        self.session.close()
        self.engine.dispose()



if __name__ == '__main__':
    db_dict = {}
    db_dict['db_type'] = 'MySQL'
    db_dict['db_user'] = 'root'
    db_dict['db_passwd'] = 'root'
    db_dict['db_ip'] = '192.168.10.1'
    db_dict['db_port'] = 3306
    db_dict['db_name'] = 'test'
    db_dict['db_charset'] = 'utf8'

    # 实例化类
    db_base = Dbbase(db_dict)
    # 数据库连接
    db_session,db_conn = db_base.db_con()
    if db_session:
        # 获取数据库表映射
        db_table_dicts = db_base.get_table_dicts()
        # 查看数据库的数据表
        db_base.db_show_tables()
        # 查看数据表列名
        db_base.db_select_table_columns('users')
        # 数据预览
        db_base.db_select_table('users')
        # # 设置查询前几条数据
        # dbb.db_select_table('users',4)
        # # 查询数据表所有数据
        # dbb.db_select_table('users',-1)

        # 数据插入
        # 单条数据插入
        # 创建列表数据对象
        row_list = ['11','ed099099','fred',9]
        db_base.table_insert_row('users',row_list)

        # df 方式插入数据
        # 创建列表数据对象
        data = [[13,'13'],[14,'14'],[15,'15'],[16,'16']]
        # 方式一
        db_base.table_insert_df('users',data)
        # 方式二
        db_base.table_insert_df_old('users',data)

        ###############################################################
        # automap_base 反射表 的使用
        ###############################################################
        # 多条数据插入
        # db_session 批量插入
        user_data = [{'id': i,'name':'name%s' %i,'fullname':'fullname%s' %i,'nickname':'nickname%s' %i} for i in range(18,20)]
        User = db_base.get_table_class('users')
        db_session.bulk_insert_mappings(User,user_data)

        # 多条数据更新
        # db_session 批量更新
        user_data = [{'id': i,'fullname':'test%s' %i,'nickname':'test%s' %i} for i in range(18,20)]
        User = db_base.get_table_class('users')
        db_session.bulk_update_mappings(User,user_data)

        print('全表查询 sql\n:',db_session.query(User))
        print('全表查询\n:',db_session.query(User).all(),'\n')


        ###############################################################
        # MetaData 表字典的使用高级查询
        #automap_base 反射表查询使用时 User对象替换db_table_dicts['users'],User.id,User.name 替换 	db_table_dicts['users'].columns['id','name']
        ###############################################################
        # 查询 可事先定义db_query 即全表查询

        # # 自定义数据表查询
        db_query_customer = db_session.query(db_table_dicts['users'].columns['id','name'])
        # 全表查询
        db_query = db_session.query(db_table_dicts['users'])
        # 查询 all list
        print("查询 all list sql :\n",db_query)
        print("查询 all list :\n",db_query.all(),'\n')

        # 查询 first() 第一个
        print("查询 first() 第一个 :\n",db_query.first(),'\n')

        # 筛选 查询 filter()
        print("筛选 查询 filter() sql :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 1))
        print("筛选 查询 filter() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 1).all(),'\n')

        # 升\降序 查询 order_by()
        # 升序
        print("升序 查询 order_by() sql :\n",db_query.order_by(db_table_dicts['users'].columns['id']))
        print("升序 查询 order_by() :\n",db_query.order_by(db_table_dicts['users'].columns['id']).all(),'\n')
        # 降序
        print("降序 查询 order_by() sql :\n",db_query.order_by(db_table_dicts['users'].columns['id'].desc()))
        print("降序 查询 order_by() :\n",'\n')

        # 查询 one()  如果这个结果集少于或者多于一条数据,结论有且只有一条数据的时候才会正常的返回,否则抛出异常
        print("查询 one() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 1).one(),'\n')
        # print("查询 one() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 7).one(),'\n')

        # 查询 one_or_none() 在结果集中没有数据的时候也不会抛出异常
        print("查询 one_or_none() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 6).one_or_none())
        print("查询 one_or_none() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 7).one_or_none(),'\n')

        # 查询 scalar()  底层调用one()方法,并且如果one()方法没有抛出异常,会返回查询到的第一列的数据
        print("查询 scalar() :\n",db_query.filter(db_table_dicts['users'].columns['name'] == "fred").scalar(),'\n')

        # 查询 text类型
        from sqlalchemy import text
        # 升序
        print("升序 查询 text类型 sql:\n",db_query.filter(text("id< 3")).order_by(text('id')))
        print("升序 查询 text类型:\n",db_query.filter(text("id< 3")).order_by(text('id')).all(),'\n')
        # 降序
        print("降序 查询 text类型 sql:\n",db_query.filter(text("id< 3")).order_by(text('id desc')))
        print("降序 查询 text类型:\n",db_query.filter(text("id< 3")).order_by(text('id desc')).all(),'\n')

        # text 带变量方式 用 :传入变量,用params接收
        print("查询 text 带变量方式 sql:\n",db_query.filter(text("id< :value")).params(value=4))
        print("查询 text 带变量方式:\n",db_query.filter(text("id< :value")).params(value=4).all(),'\n')

        # from_statement 原生sql语句
        print("查询 from_statement 原生sql语句 sql:\n",db_query.from_statement(text("select * from users where id>:value")).params(value=2))
        print("查询 from_statement 原生sql语句:\n",db_query.from_statement(
            text("select * from users where id>:value")).params(value=2).all(),'\n')

        # and_ or_ 普通的表达式在这里面不好使
        from sqlalchemy.sql import and_,asc,desc,or_
        print("查询 and_ or_ 普通的表达式 sql:\n",db_query.filter(and_(db_table_dicts['users'].columns['id'] == 4,db_table_dicts['users'].columns['name'] == "ed")))
        print("查询 and_ or_ 普通的表达式:\n",db_table_dicts['users'].columns['name'] == "ed")).first())
        print("查询 and_ or_ 普通的表达式 sql:\n",db_query.filter(
            or_(db_table_dicts['users'].columns['id'] == 4,db_table_dicts['users'].columns['name'] == "ed")).first(),'\n')

        # between 大于多少小于多少
        print("查询 between 大于多少小于多少 sql:\n",db_query.filter(db_table_dicts['users'].columns['id'].between(1,3)))
        print("查询 between 大于多少小于多少:\n",3)).all())

        # in_ 在里面对应还有not in等
        print("查询 in_ 在里面对应还有not in等 sql:\n",db_query.filter(db_table_dicts['users'].columns['id'].in_([1])))
        print("查询 in_ 在里面对应还有not in等:\n",db_query.filter(db_table_dicts['users'].columns['id'].in_([1])).all())
        print("查询 notin_ 在里面对应还有not in等 sql:\n",db_query.filter(db_table_dicts['users'].columns['id'].notin_([1])))
        print("查询 notin_ 在里面对应还有not in等:\n",db_query.filter(db_table_dicts['users'].columns['id'].notin_([1])).all(),'\n')

        # synchronize_session 可以理解为 引用更新 注意:要使用全部查询,使用筛选字段时,更新操作会报错失败

        # synchronize_session=False 在原有值的基础上增加和删除 099 str
        db_query.filter(db_table_dicts['users'].columns['id'] > 0).update({db_table_dicts['users'].columns['name']: db_table_dicts['users'].columns['name'] + '099'},synchronize_session=False)
        db_session.commit()

        # synchronize_session=evaluate 在原有值的基础上增加和减少 11,必须是整数类型
        db_query.filter(db_table_dicts['users'].columns['id'] > 0).update({db_table_dicts['users'].columns['nickname']: db_table_dicts['users'].columns['nickname'] + 1},synchronize_session="evaluate")
        db_session.commit()

        # 如果查询条件里有in_,需要在delete()中加如下参数: fetch  删除的更快
        db_query.filter(db_table_dicts['users'].columns['id'].in_([1,2])).delete(synchronize_session='fetch')
        db_session.commit()

        # 计数 注意不能延用前面的db_query,使用db_session.query 查询

        from sqlalchemy import func
        # 简单func.count()计数查询
        print('简单func.count()计数查询 sql:\n',db_session.query(func.count(db_table_dicts['users'].columns['id'])))
        print('简单func.count()计数查询:\n',db_session.query(func.count(db_table_dicts['users'].columns['id'])).first(),'\n')

        # 如果想实现select count(*) from users,可以通过以下方式来实现:
        print('实现select count(*) sql:\n',db_session.query(func.count("*")).select_from(db_table_dicts['users']))
        print('实现select count(*):\n',db_session.query(func.count("*")).select_from(db_table_dicts['users']).scalar(),'\n')

        # 如果指定了要查找的表的字段,可以省略select_from()方法:
        print('实现select count(*) sql:\n',db_session.query(func.count(db_table_dicts['users'].columns['id'])))
        print('实现select count(*):\n',db_session.query(func.count(db_table_dicts['users'].columns['id'])).scalar(),'\n')


        # limit、offset和切片

        """
        limit:可以限制每次查询的时候只查询几条数据。  
        offset:可以限制查找数据的时候过滤掉前面多少条。  
        切片 slice :可以对Query对象使用切片操作,来获取想要的数据。 
        """

        print('实现 limit 查询 sql:\n',db_query.limit(2))
        print('实现 limit 查询:\n',db_query.limit(2).all(),'\n')

        print('实现 limit offset 查询 sql:\n',db_query.limit(2).offset(1))
        print('实现 limit offset 查询:\n',db_query.limit(2).offset(1).all(),'\n')

        print('实现 切片 查询 sql:\n',db_query.slice(2,5))
        print('实现 切片 查询:\n',5).all(),'\n')

        # group_by

        # 比如我想根据名字来分组, 统计每个名字分组里面有多少人

        from sqlalchemy import func
        # 我想根据名字来分组, 统计每个名字分组里面有多少人
        print('实现 group_by 查询 sql:\n',db_session.query(db_table_dicts['users'].columns['id'],func.count(db_table_dicts['users'].columns['id'])).group_by(db_table_dicts['users'].columns['id']))
        print('实现 group_by 查询:\n',db_session.query(db_table_dicts['users'].columns['name'],func.count(db_table_dicts['users'].columns['name'])).group_by(db_table_dicts['users'].columns['name']).all(),'\n')

        # having

        # having是对查找结果进一步过滤。比如只想要看未成年人的数量,那么可以首先对年龄进行分组统计人数,然后再对分组进行having过滤
        print('实现 having 查询 sql:\n',func.count(db_table_dicts['users'].columns['name'])).group_by(db_table_dicts['users'].columns['id']).having(db_table_dicts['users'].columns['id'] >= 2))
        print('实现 having 查询:\n',func.count(db_table_dicts['users'].columns['name'])).group_by(db_table_dicts['users'].columns['id']).having(db_table_dicts['users'].columns['id'] >= 2).all(),'\n')

        # join
        print('实现 join 查询 sql:\n',db_session.query(db_table_dicts['users'],db_table_dicts['addresses']).join(db_table_dicts['addresses']))
        print('实现 join 查询:\n',db_table_dicts['addresses']).join(db_table_dicts['addresses']).all(),'\n')

        print('实现 outerjoin 查询 sql:\n',db_table_dicts['addresses']).outerjoin(db_table_dicts['addresses']))
        print('实现 outerjoin 查询:\n',db_table_dicts['addresses']).outerjoin(db_table_dicts['addresses']).all(),'\n')

        # 别名

        # 当多表查询的时候,有时候同一个表要用到多次,这时候用别名就可以方便的解决命名冲突的问题了
        from sqlalchemy.orm import aliased

        adalias1 = aliased(db_table_dicts['users'])
        adalias2 = aliased(db_table_dicts['addresses'])

        print('实现 aliased 查询 sql:\n',db_session.query(adalias1.columns['id'],adalias1.columns['id'],adalias2.columns['id']).join(adalias1))
        print('实现 aliased 查询:\n',adalias2.columns['id']).join(adalias1).all(),'\n')

        # 子查询

        from sqlalchemy.sql import func

        print('创建查询 sql:\n',db_session.query( db_table_dicts['addresses'].columns['id'].label('user_id'),func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']))
        print('创建查询:\n',func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']).all(),'\n')

        print('创建子查询 sql:\n',db_session.query(db_table_dicts['addresses'].columns['id'].label('user_id'),func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']).subquery())
        addr = db_session.query(db_table_dicts['addresses'].columns['id'].label('user_id'),func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']).subquery()
        print('子查询带入父查询 sql:\n',addr.c.address_count).outerjoin(addr,db_table_dicts['users'].columns['id'] == addr.c.user_id ).order_by(db_table_dicts['users'].columns['id'] ))
        print('子查询带入父查询:\n',db_table_dicts['users'].columns['id'] == addr.c.user_id ).order_by(db_table_dicts['users'].columns['id'] ).all(),'\n')

        # update
        print('update sql:\n',db_query.filter(db_table_dicts['users'].columns['id'] == '1').update({'name':'test'}))
        db_session.commit()
        print('update 结果:\n',db_query.filter(db_table_dicts['users'].columns['id'] == '1').all(),'\n')

        # 条件删除 delete
        print('delete sql:\n',db_query.filter(db_table_dicts['users'].columns['id'] == '2').delete())
        db_session.commit()
        print('delete 结果:\n',db_query.filter(db_table_dicts['users'].columns['id'] == '2').all(),'\n')

        表数据清空
        print('update sql:\n',db_query.delete())
        db_session.commit()
        print('update 结果:\n','\n')
        TRUNCATE TABLE 实现
        db_base.table_truncate('users_copy1')

        # 关闭数据库
        db_base.db_close()

运行结果:

数据库连接成功!!!
table_name_list :
 dict_keys(['addresses','users','keywords','post_keywords','posts','users_copy1']) 

table_columns_list :
 ['id','name','fullname','nickname'] 

rows_list :
 [(1,'test','6'),(3,'6')] 

全表查询 sql
: SELECT users.id AS users_id,users.name AS users_name,users.fullname AS users_fullname,users.nickname AS users_nickname 
FROM users
全表查询
: [<sqlalchemy.ext.automap.users object at 0x7fe069bee580>,<sqlalchemy.ext.automap.users object at 0x7fe069bee520>,<sqlalchemy.ext.automap.users object at 0x7fe069bee640>,<sqlalchemy.ext.automap.users object at 0x7fe069bee6a0>,<sqlalchemy.ext.automap.users object at 0x7fe069bee700>,<sqlalchemy.ext.automap.users object at 0x7fe069bee760>,<sqlalchemy.ext.automap.users object at 0x7fe069bee7c0>,<sqlalchemy.ext.automap.users object at 0x7fe069bee820>,<sqlalchemy.ext.automap.users object at 0x7fe069bee880>,<sqlalchemy.ext.automap.users object at 0x7fe069bee8e0>,<sqlalchemy.ext.automap.users object at 0x7fe069bee940>,<sqlalchemy.ext.automap.users object at 0x7fe069bee9a0>,<sqlalchemy.ext.automap.users object at 0x7fe069beea00>,<sqlalchemy.ext.automap.users object at 0x7fe069beea60>] 

查询 all list sql :
 SELECT test.users.id AS test_users_id,test.users.name AS test_users_name,test.users.fullname AS test_users_fullname,test.users.nickname AS test_users_nickname 
FROM test.users
查询 all list :
 [(1,(4,(5,'fred099099','Fred Flintstone',(6,(9,'9'),(10,(11,(13,'13'),(14,'14'),(15,'15'),(16,'16'),(18,'name18','test18','test18'),(19,'name19','test19','test19')] 

查询 first() 第一个 :
 (1,'6') 

筛选 查询 filter() sql :
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users 
WHERE test.users.id = %(id_1)s
筛选 查询 filter() :
 [(1,'6')] 

升序 查询 order_by() sql :
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users ORDER BY test.users.id
升序 查询 order_by() :
 [(1,'test19')] 

降序 查询 order_by() sql :
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users ORDER BY test.users.id DESC
降序 查询 order_by() :
 [(1,'test19')] 

查询 one() :
 (1,'6') 

查询 one_or_none() :
 (6,'6')
查询 one_or_none() :
 None 

查询 scalar() :
 None 

升序 查询 text类型 sql:
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users 
WHERE id< 3 ORDER BY id
升序 查询 text类型:
 [(1,'6')] 

降序 查询 text类型 sql:
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users 
WHERE id< 3 ORDER BY id desc
降序 查询 text类型:
 [(1,'6')] 

查询 text 带变量方式 sql:
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users 
WHERE id< %(value)s
查询 text 带变量方式:
 [(1,'6')] 

查询 from_statement 原生sql语句 sql:
 select * from users where id>%(value)s
查询 from_statement 原生sql语句:
 [(3,'test19')] 

查询 and_ or_ 普通的表达式 sql:
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users 
WHERE test.users.id = %(id_1)s AND test.users.name = %(name_1)s
查询 and_ or_ 普通的表达式:
 None
查询 and_ or_ 普通的表达式 sql:
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users 
WHERE test.users.id = %(id_1)s OR test.users.name = %(name_1)s
查询 and_ or_ 普通的表达式:
 (4,'6') 

查询 between 大于多少小于多少 sql:
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users 
WHERE test.users.id BETWEEN %(id_1)s AND %(id_2)s
查询 between 大于多少小于多少:
 [(1,'6')]
查询 in_ 在里面对应还有not in等 sql:
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users 
WHERE test.users.id IN (__[POSTCOMPILE_id_1])
查询 in_ 在里面对应还有not in等:
 [(1,'6')]
查询 notin_ 在里面对应还有not in等 sql:
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users 
WHERE (test.users.id NOT IN (__[POSTCOMPILE_id_1]))
查询 notin_ 在里面对应还有not in等:
 [(3,'test19')] 

简单func.count()计数查询 sql:
 SELECT count(test.users.id) AS count_1 
FROM test.users
简单func.count()计数查询:
 (14,) 

实现select count(*) sql:
 SELECT count(%(count_2)s) AS count_1 
FROM test.users
实现select count(*):
 14 

实现select count(*) sql:
 SELECT count(test.users.id) AS count_1 
FROM test.users
实现select count(*):
 14 

实现 limit 查询 sql:
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users 
 LIMIT %(param_1)s
实现 limit 查询:
 [(1,'6')] 

实现 limit offset 查询 sql:
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname 
FROM test.users 
 LIMIT %(param_1)s,%(param_2)s
实现 limit offset 查询:
 [(3,'6')] 

实现 切片 查询 sql:
 SELECT test.users.id AS test_users_id,%(param_2)s
实现 切片 查询:
 [(4,'6')] 

实现 group_by 查询 sql:
 SELECT test.users.id AS test_users_id,count(test.users.id) AS count_1 
FROM test.users GROUP BY test.users.id
实现 group_by 查询:
 [('ed099099',10),('fred099099',1),('name18',('name19',('test',1)] 

实现 having 查询 sql:
 SELECT test.users.id AS test_users_id,count(test.users.name) AS count_1 
FROM test.users GROUP BY test.users.id 
HAVING test.users.id >= %(id_1)s
实现 having 查询:
 [(3,1)] 

实现 join 查询 sql:
 SELECT test.users.id AS test_users_id,test.users.nickname AS test_users_nickname,test.addresses.id AS test_addresses_id,test.addresses.email_address AS test_addresses_email_address,test.addresses.user_id AS test_addresses_user_id 
FROM test.users INNER JOIN test.addresses ON test.users.id = test.addresses.user_id
实现 join 查询:
 [(1,'6',1,'11@qq.com',1)] 

实现 outerjoin 查询 sql:
 SELECT test.users.id AS test_users_id,test.addresses.user_id AS test_addresses_user_id 
FROM test.users LEFT OUTER JOIN test.addresses ON test.users.id = test.addresses.user_id
实现 outerjoin 查询:
 [(1,None,None),'9','13','14','15','16',None)] 

实现 aliased 查询 sql:
 SELECT users_1.id AS users_1_id,users_1.id AS users_1_id__1,addresses_1.id AS addresses_1_id 
FROM test.addresses AS addresses_1 INNER JOIN test.users AS users_1 ON users_1.id = addresses_1.user_id
实现 aliased 查询:
 [(1,1)] 

创建查询 sql:
 SELECT test.addresses.id AS user_id,count(%(count_1)s) AS address_count 
FROM test.addresses GROUP BY test.addresses.id
创建查询:
 [(1,1)] 

创建子查询 sql:
 SELECT test.addresses.id AS user_id,count(:count_1) AS address_count 
FROM test.addresses GROUP BY test.addresses.id
子查询带入父查询 sql:
 SELECT test.users.id AS test_users_id,anon_1.address_count AS anon_1_address_count 
FROM test.users LEFT OUTER JOIN (SELECT test.addresses.id AS user_id,count(%(count_1)s) AS address_count 
FROM test.addresses GROUP BY test.addresses.id) AS anon_1 ON test.users.id = anon_1.user_id ORDER BY test.users.id
子查询带入父查询:
 [(1,None)] 

update sql:
 1
update 结果:
 [(1,'6')] 

delete sql:
 0
delete 结果:
 []

原文地址:https://blog.csdn.net/wisdom_lp/article/details/130274494

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读773次,点赞6次,收藏9次。【代码】c# json字符串转Oracle的insert into的小程序。
文章浏览阅读8.7k次,点赞2次,收藏17次。此现象一般定位到远端的监听服务来找问题,在远端查看监听服务状态(具体看下面的解决方案会详细呈现),服务是否开启,另外查看监听端点概要是否存在host未指向到计算名的,如无直接进入监听配置文件listener.ora内添加指向即可。2、查看监听服务状态 lsnrctl status,右边为远端端点状态,未添加host指向到计算名;1、本地及远端安装好Oracle并配置好连接,Oracle服务和监听已启动;1、远程Oracle数据库:Oracle11g R2。或者进入下述服务手动重启。,再进行远程连接即可。_ora-12541:tns:无监听程序
文章浏览阅读2.8k次。mysql脚本转化为oracle脚本_mysql建表语句转oracle
文章浏览阅读2.2k次。cx_Oracle报错:cx_Oracle DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library_cx_oracle.databaseerror: dpi-1047: cannot locate a 64-bit oracle client libr
文章浏览阅读1.1k次,点赞38次,收藏35次。本文深入探讨了Oracle数据库的核心要素,包括体系结构、存储结构以及各类参数。通过解析Oracle数据库的体系结构,读者可以深入了解其内部组成和工作原理。存储结构部分介绍了数据在Oracle中的存储方式,从表空间到数据文件的层层逻辑。最后,我们深入探讨了Oracle数据库中各类参数的作用和配置方法,帮助读者更好地理解和优化数据库性能。本文旨在帮助读者全面理解Oracle数据库的运作机制,为其在实践中的应用提供基础和指导。
文章浏览阅读1.5k次。默认自动收集统计信息的时间为晚上10点(周一到周五,4个小时),早上6点(周六,周日,20个小时)由于平时默认每天只收集4小时,时间有点短了,改成每天可收集8小时。oracle 18c中默认是打开的。查看当前自动收集统计信息的时间。_oracle自动收集统计信息
文章浏览阅读929次,点赞18次,收藏20次。只有assm(Automatic Shared Memory Management)模式可以使用大页,需要关闭amm(Memory Manager Process)HugePages_Free: 306 (空闲306页,已使用306-306=0页)防止oracle使用的内存交换,所以设置的大小与oracle配置的sga、pga相关。HugePages_Rsvd: 0 (操作系统承诺给oracle预留的页数)HugePages_Total: 306 (总共306页)_oracle11g 大页
文章浏览阅读801次。例如:10046:0,1,4,8,12。默认redo日志有三个,大小为50M,循环覆盖使用。redo log再覆盖之前,会被归档,形成归档日志。答:不同事件,不同级别。trace的不同级别?_oracle 日志
文章浏览阅读4.2k次,点赞84次,收藏77次。主要讲解MySQL中SQL的DDL语句,其中包括对数据库和表的一系列操作。_sql ddl 新增字段 mysql
文章浏览阅读1.1k次。ON DEMAND:仅在该物化视图“需要”被刷新了,才进行刷新(REFRESH),即更新物化视图,以保证和基表数据的一致性;ON COMMIT:一旦基表有了COMMIT,即事务提交,则立刻刷新,立刻更新物化视图,使得数据和基表一致。Method =>'C',物化视图有三种刷新方式:COMPLETE、FAST和FORCE。物化视图会占用空间,一半可用于大量数据查询时,减缓主表的查询压力使用。例如创建一个物化视图,让对接单位查询。_oracle物化视图定时刷新
文章浏览阅读713次,点赞21次,收藏18次。1.背景介绍在当今的大数据时代,数据量越来越大,传统的关系型数据库已经无法满足业务需求。因此,NoSQL数据库技术迅速崛起,成为企业和开发者的首选。Oracle NoSQL Database是Oracle公司推出的一款分布式NoSQL数据库产品,具有高性能、高可用性和易于扩展等特点。在本文中,我们将深入了解Oracle NoSQL Database的集成与开发者工具,帮助您更好地掌握这款产品的...
文章浏览阅读2.5k次,点赞2次,收藏4次。今天遇见一个问题需要将字段中包含中文字符串的筛选出来。_oracle查询包含中文字符
文章浏览阅读802次。arcmap 在oracle删除表重新创建提示表名存在解决放啊
文章浏览阅读4.3k次,点赞2次,收藏4次。Oracle连接数据库提示 ORA-12638:身份证明检索失败_ora-12638
文章浏览阅读3.4k次,点赞6次,收藏25次。etc/profile是一个全局配置文件,所有用户登录都会使用该文件构建用户环境。与windows配置环境变量是一个道理。选择Linux系统,找到适合自己系统的安装包,我的是CentOS 8 x64。接下来需要登陆Oracle账户才能下载,无账户的可以自己注册一个。Linux中export 命令用于设置或显示环境变量。模式,利用上下键到文档最后,添加以下代码。出现如图所示版本号字样,则说明安装成功。点击下载,勾选1,点击2。记住完整路径用于后面配置。找到Java并点击进去。往下翻,找到Java8。_linux安装jdk1.8
文章浏览阅读2.4w次,点赞26次,收藏109次。JDK 是的简称,也就是 Java 开发工具包。JDK 是整个 Java 的核心,其中JDK包含了 Java 运行环境(Java Runtime Envirnment,简称 JRE),Java 工具(比如 javac、java、javap 等等),以及 Java 基础类库(比如 rt.jar)。最主流的 JDK 是Oracle公司发布的 JDK,除了 Oracle JDK(商业化,更稳定)之外,还有很多公司和组织开发了属于自己的 JDK,比较有名的有IBM JDK(更适合 IBM) 和OpenJDK。_jdk安装教程
文章浏览阅读7.5w次。出现 “java.sql.SQLNonTransientConnectionException:Could not create connection to database server” 的错误通常是由于无法连接到数据库服务器引起的。_java.sql.sqlnontransientconnectionexception: could not create connection to
文章浏览阅读849次,点赞7次,收藏10次。在ClickHouse中创建用户、数据库并进行权限分配是一个重要的管理任务,它涉及到安全性和访问控制。下面是一个基本的指南来帮助你完成这些操作:1. 创建数据库首先,需要创建一个数据库。使用以下命令:CREATE DATABASE IF NOT EXISTS your_database_name;将 your_database_name 替换为你想要的数据库名。2. 创建用户接下来,创建一个新用户。使用以下命令:CREATE USER your_username IDENTIFIED WIT_在clickhouse中如何创建用户 赋权
文章浏览阅读1.2k次,点赞53次,收藏39次。本文是一篇关于Oracle数据库安装和使用的博文摘要。作者以轻松幽默的笔调介绍了自己在实验中掌握的Oracle数据库基本操作,包括使用组件查看命令、配置数据库监听器等。作者也分享了在实验中遇到的一些有趣问题,如SQL语句缺少分号导致的意外错误。此外,作者还强调了登录sys用户和启动实例加载数据库的注意事项,并鼓励读者面对挑战时保持乐观,不断提升自己的能力。整体风格风趣严谨,引人入胜。
文章浏览阅读820次,点赞17次,收藏16次。KingbaseES、xml、dbms_xmlgen、SETSKIPROWS、人大金仓、KingbaseES兼容Oracle包dbms_xmlgen的功能是通过SQL查询将关系表中数据转化为XML文档。转化方式一共有两种:(1)通过查询字符串直接转化。(2)通过上下文句柄转化。对于通过查询字符串直接转化的方式,无法跳过若干行进行查询,只能直接将表格中的所有数据转化为XML文档。