Python定时备份数据库(支持整库、单库、多库备份、邮件通知、定时删除旧备份)

在平时的运维过程中,备份数据库这一项是我们必不可少的一个工作,这个事情好比是一把双刃剑,运用好了,会给工作和项目上带来很大帮助;反之,如果没有一个好的备份方案,那这将会是一个........(提桶跑路)

需求由来:公司内,备份数据库的方式可谓是多种多样,每一种备份方式都带有每一个人对它的理解。如果这些备份方案需要给各个项目中的同事去使用,难免会发生使用上的问题,通用性不强。领导后来选择了一位同事的方案(shell脚本+crontab+mailx)来集成了一个定时备份后并发送邮件通知。不久,因为好多同事对Linux的服务配置和shell语言不太精通,导致很多问题的出现。为了使方案更简单、更人性化,我用了一个python脚本集成了这三个动作,使用起来,也都是人性化的配置,请大家使用前务必仔细读配置文件的注释!

一:配置文件(config.py)

#!encoding=utf-8

"""连接数据库信息"""
DB_information ={
    'db_ip':'192.168.x.x','db_user':'postgres','db_password':'xxxx','db_database':'postgres','db_port':5432
}

#数据库容器ID
container_id = 'a74ee76ee144'

"""备份目录路径,结尾加 / (必须是已存在的目录)"""
Backup_path = "/home/backup_pgsql/"

"""定时执行的时间"""
#week:星期(0-6;0为周一,6为周日) hour:小时(24小时制) minute:分(如果是05分执行,则写5;00分执行,则写0)
#举个栗子:week:3 hour:20 minute:5代表:每周四晚上的8点05分执行任务
cron_time = {
    'week':2,'hour':11,'minute':41
}

"""选择备份的方式:1:单/多模式备份 2:单库备份 3:单/多表备份 4:整库备份 5:多库备份"""
way = '5'

"""1:单/多模式备份"""
#单个模式写一个,多个递增即可
db_schema = 'shilidb'
schemas = [
    'kfpt_ht','xh_yw'
]

"""3:单/多表备份"""
#单个表写一个,多个递增即可(格式:模式名.表名)
db_table = 'shilidb'
table_schemas = 'xh_yw'
tables = [
    'qxz_tb','qxz_py'
]

"""2:单库备份,填库名"""
backup_one = 'shilidb'

"""5:多库备份,填库名"""
backup_double = [
    'data','shilidb','postgres'
]

"""执行备份时的缓存时间(单位秒)"""
#根据库的大小进行调整,库越大,应当将缓存时间调高;最小值建议不低于10;
time_buffer = 10

"""定义删除多少天之前的备份目录"""
del_dirtime = 30

"""发送邮件"""
#username_send:发送人的用户名  password:发送人邮箱的授权码  username_recv:收件人用户名  port:邮箱端口
email_information = {
    'mailserver':'smtp.163.com','username_send':'xxxxxxxxxx@163.com','password':'xxxxxxxxxx','port':25
}
#username_to:收件人邮箱,多个收件人按照格式递增添加
username_to = [
	'xxxxxxxxx@qq.com','xxxxxxxxx@qq.com','xxxxxxxxx@163.com' 
]

"""邮件内容"""
#mail_head:邮件标题  mail_body:邮件内容
email_text = {
    'mail_head':'数据库服务器备份通知','mail_body':'数据库已备份完成'
}

二、主程序代码(Postgres_backup.py)

本次版本采用的是针对启动在docker中的postgres数据库进行备份,后续我还会出一个mysql版本!

运行方式:nohup python3 Postgres_backup.py  &

#!/usr/bin/python3
#!encoding=utf-8
"""作者:陈浩
   更新时间:2022.5.25
   名称:Python定时备份数据库(整库、模式、表多功能备份)
"""
import os
import os.path
import logging
import time
import psycopg2 as pgsql
import traceback
import smtplib
from config import *
from email.mime.text import MIMEText
from apscheduler.schedulers.blocking import BlockingScheduler


class DatabaseBR(object):
    """备份、恢复、定时执行的类"""
    Time = time.strftime('%Y-%m-%d %H:%M:%S')
    Dirtime = time.strftime('%Y%m%d%H')
    Dirbackup = 'pg_back_' + Dirtime
    sql = "select datname from pg_database;"

    logging.basicConfig(level=logging.DEBUG,filename=os.path.abspath(os.path.dirname(__file__)) + '/pgbackup.log',format='%(asctime)s - %(funcName)s{} -%(levelname)s : %(message)s'.format("方法"))
    logging.info("检测备份目录是否存在……")
    if os.path.exists(Backup_path):
        if not os.path.exists(Backup_path + Dirbackup):
            logging.warning("正在创建备份目录")
            os.mkdir(Backup_path + Dirbackup)
        else:
            logging.info("备份目录 %s 已存在"%Dirbackup)
    else:
        logging.error("目录 %s 不存在"%Backup_path)

    def conn_pgsql(self):
        '''连接数据库'''
        logging.info("进入连接数据库的方法")
        conn = None
        try:
            conn = pgsql.connect(database=DB_information['db_database'],user=DB_information['db_user'],password=DB_information['db_password'],host=DB_information['db_ip'],port=DB_information['db_port'])
            logging.info("连接Postgres:{}".format(conn))
        except Exception as e:
            logging.error("pgsql连接报错\n%s"%traceback.format_exc())
        if conn != None:
            return conn

    def execute_time(func):
        """装饰器:计算函数的执行时间"""
        from time import time
        def wrapper(*args,**kwargs):
            start = time()
            func_return = func(*args,**kwargs)
            end = time()
            logging.warning(f'{func.__name__}方法执行的时间为: {end - start}s')
            return func_return
        return wrapper

    def execute_rmdir(func):
        """装饰器:删除指定日期前的备份目录"""
        import os,datetime,time,shutil
        def rm_daydir(*args,**kwargs):
            all_dir = []
            for f in list(os.listdir(Backup_path)):
                dirs = "{0}{1}".format(Backup_path,f)
                if os.path.isdir(dirs):
                    all_dir.append(dirs)
            for i in range(len(all_dir)):
                dir_createtime = time.strftime("%Y%m%d",time.localtime(os.path.getctime(all_dir[i])))
                del_time = datetime.date.today() - datetime.timedelta(days=del_dirtime)
                del_time_str = del_time.strftime("%Y%m%d")
                if int(dir_createtime) < int(del_time_str):
                    logging.warning("正在删除备份目录……")
                    shutil.rmtree(all_dir[i])
                    logging.info("已删除{}目录".format(all_dir[i]))
            else:
                logging.info("没有检测到符合删除日期的备份目录")
            func(*args,**kwargs)
        return rm_daydir

    @execute_rmdir
    @execute_time
    def backing(self):
        """备份所有库的方法"""
        con_pg = None
        logging.info("进入执行备份所有库的方法")
        logging.info("正在连接数据库……")
        try:
            con_pg = self.conn_pgsql()
            curs = con_pg.cursor()
            curs.execute(self.sql)
            results = curs.fetchall()
            databases = []
            for result in results:
                result = list(result)
                databases.append(result)
            logging.info("当前的可备份的数据库有\n{}".format(databases))
            for database in databases:
                logging.info("开始备份 %s 库"%database[0])
                dumps = "docker exec -i -d {0} /bin/bash -c 'pg_dump -U {1} {2} > /{3}.sql'".format(
                        container_id,DB_information['db_user'],database[0],database[0])
                os.popen(dumps)
                time.sleep(time_buffer)
            for cpsql in databases:
                copy_sql = "docker cp {0}:/{1}.sql '{2}'".format(
                            container_id,cpsql[0],Backup_path + self.Dirbackup + '/')
                logging.warning("开始从容器%s中备份到宿主机"%container_id)
                os.popen(copy_sql)
                time.sleep(time_buffer)
            curs.close()
            logging.warning("提交事务")
            con_pg.commit()
        except Exception as e:
            logging.error("执行备份所有库的方法报错%s"% traceback.format_exc())
        finally:
            con_pg.close()
            logging.info("关闭数据库连接")
            self.emails()
    @execute_rmdir
    @execute_time
    def backup_onedb(self):
        """备份单库的方法"""
        logging.info("进入备份单库的方法")
        con_pg = None
        logging.info("获取到要备份的数据库名称%s"%backup_one)
        logging.info("正在连接数据库……")
        try:
            con_pg = self.conn_pgsql()
            curs = con_pg.cursor()
            curs.execute(self.sql)
            results = curs.fetchall()
            databases = []
            for result in results:
                result = list(result)
                databases.append(result[0])
            if backup_one in databases:
                logging.info("开始备份 %s 库"%backup_one)
                dumps = "docker exec -i -d {0} /bin/bash -c 'pg_dump -U {1} {2} > /{3}.sql'".format(
                        container_id,backup_one,backup_one)
                os.popen(dumps)
                time.sleep(time_buffer)
                copy_sql = "docker cp {0}:/{1}.sql {2}".format(
                            container_id,Backup_path + self.Dirbackup + '/')
                logging.warning("开始从容器%s中备份到宿主机" % container_id)
                os.popen(copy_sql)
            else:
                logging.error("数据库 %s 不存在,请检查后重新运行!"%backup_one)
        except Exception as e:
            logging.error("执行备份单库的方法报错%s"%traceback.format_exc())
        finally:
            con_pg.close()
            logging.info("关闭数据库连接")
            self.emails()
    @execute_rmdir
    @execute_time
    def backup_doubledb(self):
        """备份多个库的方法"""
        con_pg = None
        logging.info("进入备份多个库的方法")
        logging.info("正在连接数据库……")
        try:
            con_pg = self.conn_pgsql()
            curs = con_pg.cursor()
            curs.execute(self.sql)
            results = curs.fetchall()
            database = []
            for result in results:
                result = list(result)
                database.append(result[0])
            for backup_doubles in backup_double:
                if backup_doubles in database:
                    logging.info("开始备份 %s 库"%backup_doubles)
                    dumps = "docker exec -i -d {0} /bin/bash -c 'pg_dump -U {1} {2} > /{3}.sql'".format(
                        container_id,backup_doubles,backup_doubles)
                    os.popen(dumps)
                    time.sleep(time_buffer)
                    copy_sql = "docker cp {0}:/{1}.sql {2}".format(
                        container_id,Backup_path + self.Dirbackup + '/')
                    logging.warning("开始从容器%s中备份到宿主机" % container_id)
                    os.popen(copy_sql)
                    time.sleep(time_buffer)
                else:
                    logging.error("数据库 %s 不存在,请检查后重新运行!"%backup_doubles)

        except Exception as e:
            logging.error("执行备份多库方法报错%s"%traceback.format_exc())
        finally:
            con_pg.close()
            logging.info("关闭数据库连接")
            self.emails()
    @execute_rmdir
    @execute_time
    def schemas(self):
        """备份单/多模式的方法"""
        logging.info("进入执行备份单/多模式的方法")
        con_pg = None
        try:
            logging.info("正在连接数据库……")
            con_pg = self.conn_pgsql()
            curs = con_pg.cursor()
            curs.execute(self.sql)
            results = curs.fetchall()
            databases = []
            for result in results:
                result = list(result)
                databases.append(result[0])
            if db_schema in databases:
                logging.info("开始备份 %s 模式"%schemas)
                for schema in schemas:
                    dumps = "docker exec -i -d {0} /bin/bash -c 'pg_dump  -U postgres {1} --schema={2} > /{3}.sql'".format(
                        container_id,db_schema,schema,schema)
                    os.popen(dumps)
                    time.sleep(time_buffer)
                    copy_sql = "docker cp {0}:/{1}.sql {2}".format(
                        container_id,Backup_path + self.Dirbackup + '/')
                    logging.warning("开始从容器%s中备份到宿主机"% container_id)
                    os.popen(copy_sql)
                    time.sleep(time_buffer)
            else:
                logging.error("数据库 %s 不存在,请检查后重新运行!"%db_schema)
        except Exception as e:
            logging.error("执行备份单/多模式的方法报错\n%s"%traceback.format_exc())
        finally:
            con_pg.close()
            logging.info("关闭数据库连接")
            self.emails()
    @execute_rmdir
    @execute_time
    def tables(self):
        """备份单/多表的方法"""
        logging.info("进入执行备份单/多表的方法")
        con_pg = None
        try:
            logging.info("正在连接数据库……")
            con_pg = self.conn_pgsql()
            curs = con_pg.cursor()
            curs.execute(self.sql)
            results = curs.fetchall()
            databases = []
            for result in results:
                result = list(result)
                databases.append(result[0])
            if db_table in databases:
                logging.info("开始备份 %s 表"%tables)
                for table in tables:
                    dumps = "docker exec -i -d {0} /bin/bash -c 'pg_dump -U postgres {1} -t {2}.{3} > /{4}.sql'".format(
                        container_id,db_table,table_schemas,table,table)
                    os.popen(dumps)
                    time.sleep(time_buffer)
                    copy_sql = "docker cp {0}:/{1}.sql {2}".format(
                                container_id,Backup_path + self.Dirbackup + '/')
                    logging.warning("开始从容器%s中备份到宿主机" % container_id)
                    os.popen(copy_sql)
                    time.sleep(time_buffer)
            else:
                logging.error("数据库 %s 不存在,请检查后重新运行!" % db_table)
        except Exception as e:
            logging.error("执行备份单/多表的方法报错\n%s"%traceback.format_exc())
        finally:
            con_pg.close()
            logging.info("关闭数据库连接")
            self.emails()
    @execute_rmdir
    @execute_time
    def emails(self):
        """发送邮件的方法"""
        logging.info("进入执行发送邮件的方法")
        try:
            mailserver = email_information['mailserver']
            username_send = email_information['username_send']
            password = email_information['password']
            for user in username_to:
                username_recv = user
                mail = MIMEText(email_text['mail_body'])
                mail['Subject'] = email_text['mail_head']
                mail['From'] = username_send
                mail['To'] = username_recv

                smtp = smtplib.SMTP(mailserver,port=email_information['port'])
                smtp.login(username_send,password)
                smtp.sendmail(username_send,username_recv,mail.as_string())
                smtp.quit()
        except Exception as e:
            logging.error("执行发送邮件的方法报错%s"%traceback.format_exc())
        else:
            logging.info("执行发送邮件的方法成功,请到邮箱查收!")
    @execute_time
    def con_task(self,func):
        """执行定时任务的方法"""
        logging.info("进入执行定时任务的方法")
        try:
            blocking = BlockingScheduler() #实例化父类
            blocking.add_job(func,'cron',day_of_week=cron_time['week'],hour=cron_time['hour'],minute=cron_time['minute'])
            blocking.start()
        except (Exception,SystemExit,KeyboardInterrupt) as e:
            logging.error("定时任务执行发生错误%s"%traceback.format_exc())

    def rmdir(self,day):
        """执行删除指定日期之前备份的方法"""

if __name__ == '__main__':
    """主程序"""
    databasebr = DatabaseBR()
    if way == '1':
        '''单/多模式备份'''
        logging.info("===选择了'单/多模式备份',等待定时器执行===")
        databasebr.con_task(databasebr.schemas)
    elif way == '2':
        '''单库'''
        logging.info("===选择了'单库备份',等待定时器执行===")
        databasebr.con_task(databasebr.backup_onedb)
    elif way == '3':
        '''单/多表备份'''
        logging.info("===选择了'单/多表备份',等待定时器执行===")
        databasebr.con_task(databasebr.tables)
    elif way == '4':
        '''整库备份'''
        logging.info("===选择了'整库备份',等待定时器执行===")
        databasebr.con_task(databasebr.backing)
    elif way == '5':
        '''多库备份'''
        logging.info("===选择了'多库备份',等待定时器执行===")
        databasebr.con_task(databasebr.backup_doubledb)
    else:
        logging.error("config配置中的'way'选择的备份模式错误!已结束")

三、日志信息

2022-05-19 13:43:01,436 - DatabaseBR方法 -INFO : 检测备份目录是否存在……
2022-05-19 13:43:01,437 - DatabaseBR方法 -INFO : 备份目录 pg_back_2022051913 已存在
2022-05-19 13:43:01,437 - backing方法 -INFO : 进入执行备份数据库的方法
2022-05-19 13:43:01,437 - backing方法 -INFO : 正在连接数据库……
2022-05-19 13:43:01,437 - conn_pgsql方法 -INFO : 进入连接数据库的方法
2022-05-19 13:43:01,449 - conn_pgsql方法 -INFO : 连接Postgres:<connection object at 0x00000133C1E50160; dsn: 'user=postgres password=xxx dbname=postgres host=192.168.1.198 port=5432',closed: 0>
2022-05-19 13:43:01,454 - backing方法 -WARNING : 正在执行sql……
2022-05-19 13:43:01,454 - backing方法 -INFO : 当前的可备份的数据库有
[['postgres'],['template1'],['template0'],['template_postgis'],['postgres001'],['data'],['for study'],['shilidb']]
2022-05-19 13:43:01,454 - backing方法 -INFO : ['postgres']
2022-05-19 13:43:01,454 - backing方法 -INFO : ['template1']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['template0']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['template_postgis']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['postgres001']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['data']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['for study']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['shilidb']
2022-05-19 13:43:01,455 - backing方法 -WARNING : 提交事务
2022-05-19 13:43:01,457 - backing方法 -INFO : 关闭数据库连接

本次内容就这多,使用起来是不是更简单、更方便呢,值得注意的是:我这里是正对pgsql进行备份的,而且是运行在docker容器当中,如果读者需要使用内容,还需把代码读懂后再进行修改,感谢阅读

原文地址:https://blog.csdn.net/weixin_49291148/article/details/124882598

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读601次。Oracle的数据导入导出是一项基本的技能,但是对于懂数据库却不熟悉Oracle的同学可能会有一定的障碍。正好在最近的一个项目中碰到了这样一个任务,于是研究了一下Oracle的数据导入导出,在这里跟大家分享一下。......_oracle 迁移方法 对比
文章浏览阅读553次。开头还是介绍一下群,如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题,有需求都可以加群群内有各大数据库行业大咖,CTO,可以解决你的问题。加群请联系 liuaustin3 ,在新加的朋友会分到2群(共700多人左右 1 + 2)。最近我们在使用MYSQL 8 的情况下(8.025)在数据库运行中出现一个问题 参数prefer_order_i..._mysql prefer_ordering_index
文章浏览阅读3.5k次,点赞3次,收藏7次。折腾了两个小时多才成功连上,在这分享一下我的经验,也仅仅是经验分享,有不足的地方欢迎大家在评论区补充交流。_navicat连接opengauss
文章浏览阅读2.7k次。JSON 代表 JavaScript Object Notation。它是一种开放标准格式,将数据组织成中详述的键/值对和数组。_postgresql json
文章浏览阅读2.9k次,点赞2次,收藏6次。navicat 连接postgresql 注:navicat老版本可能报错。1.在springboot中引入我们需要的依赖以及相应版本。用代码生成器生成代码后,即可进行增删改查(略)安装好postgresql 略。更改配置信息(注释中有)_mybatisplus postgresql
文章浏览阅读1.4k次。postgre进阶sql,包含分组排序、JSON解析、修改、删除、更新、强制踢出数据库所有使用用户、连表更新与删除、获取今年第一天、获取近12个月的年月、锁表处理、系统表使用(查询所有表和字段及注释、查询表占用空间)、指定数据库查找模式search_path、postgre备份及还原_pgsql分组取每组第一条
文章浏览阅读3.3k次。上一篇我们学习了日志清理,日志清理虽然解决了日志膨胀的问题,但就无法再恢复检查点之前的一致性状态。因此,我们还需要日志归档,pg的日志归档原理和Oracle类似,不过归档命令需要自己配置。以下代码在postmaster.c除了开启归档外,还需要保证wal_level不能是MINIMAL状态(因为该状态下有些操作不会记录日志)。在db启动时,会同时检查archive_mode和wal_level。以下代码也在postmaster.c(PostmasterMain函数)。......_postgresql archive_mode
文章浏览阅读3k次。系统:ubuntu22.04.3目的:利用向日葵实现windows远程控制ubuntu。_csdn局域网桌面控制ubuntu
文章浏览阅读1.6k次。表分区是解决一些因单表过大引用的性能问题的方式,比如某张表过大就会造成查询变慢,可能分区是一种解决方案。一般建议当单表大小超过内存就可以考虑表分区了。1,继承式分区,分为触发器(trigger)和规则(rule)两种方式触发器的方式1)创建表CREATE TABLE "public"."track_info_trigger_partition" ( "id" serial, "object_type" int2 NOT NULL DEFAULT 0, "object_name..._pg数据表分区的实现
文章浏览阅读3.3k次。物联网平台开源的有几个,就我晓得的有、、thingskit、JetLink、DG-iot(还有其他开源的,欢迎在评论区留言哦!),然后重点分析了下ThingsBoard、ThingsPanel和JetLink,ThingsBoard和Jetlinks是工程师思维产品,可以更多的通过配置去实现开发的目的,ThingsPanel是业务人员思路产品,或者开发或者用,避免了复杂的配置带来的较高学习门槛。ThingsBoard和Jetlinks是Java技术体系的,ThingsPanel是PHP开发的。_jetlinks和thingsboard
文章浏览阅读3.8k次。PostgreSQL 数据类型转换_pgsql数字转字符串
文章浏览阅读7k次,点赞3次,收藏14次。在做数据统计页面时,总会遇到统计某段时间内,每天、每月、每年的数据视图(柱状图、折线图等)。这些统计数据一眼看过去也简单呀,不就是按照时间周期(天、月、年)对统计数据进行分个组就完了嘛?但是会有一个问题,简单的写个sql对周期分组,获取到的统计数据是缺失的,即没有数据的那天,整条记录也都没有了。如下图需求:以当前月份(2023年2月)为起点,往后倒推一年,查询之前一年里每个月的统计数据。可见图中的数据其实是缺少的,这条sql只查询到了有数据的月份(23年的1月、2月,22年的12月)_如何用一条sql查出按年按月按天的汇总
文章浏览阅读3.8k次,点赞66次,收藏51次。PostgreSQL全球开发小组与2022年10月13日,宣布发布PostgreSQL15,这是世界上最先进的开源数据库的最新版本_mysql8 postgresql15
文章浏览阅读1.3k次。上文介绍了磁盘管理器中VFD的实现原理,本篇将从上层角度讲解磁盘管理器的工作细节。_smgrrelationdata
文章浏览阅读1.1k次。PostgreSQL设置中文语言界面和局域网访问_postgressql汉化
文章浏览阅读4.2k次。PostgreSQL 修改数据存储路径_如何设置postgresql 数据目录
文章浏览阅读4.7k次。在项目中用到了多数据源,在连接postgres数据库时,项目启动报错,说数据库连接错误,说dual不存在,网上好多教程都是说数据库查询的时候的大小写问题,而这个仅仅是连接,咋鞥却处理方法是修改application-dev.yml中的配置文件.项目中的druid参数是这样的:确实在配置文件中有个查询语句。_relation "dual" does not exist
文章浏览阅读4.9k次。PostgreSQL是一款强大的关系型数据库,但在实际使用过程中,许多用户经常会遇到慢SQL的问题。这些问题不仅会降低数据库性能,还会直接影响业务流程和用户体验。因此,本文将会深入分析PostgreSQL慢SQL的原因和优化方案,帮助用户更好地利用这个优秀的数据库系统。无论你是初学者还是专业开发者,本文都将为你提供实用的技巧和方法,让你的PostgreSQL数据库始终保持高效快速。_postgresql数据库优化
文章浏览阅读1.6k次。Linux配置postgresql开机自启_linux 启动pgsql
文章浏览阅读2k次。本篇介绍如何在centos7系统搭建一个postgresql主备集群实现最近的HA(高可用)架构。后续更高级的HA模式都是基于这个最基本的主备搭建。_postgresql主备