在平时的运维过程中,备份数据库这一项是我们必不可少的一个工作,这个事情好比是一把双刃剑,运用好了,会给工作和项目上带来很大帮助;反之,如果没有一个好的备份方案,那这将会是一个........(提桶跑路)
需求由来:公司内,备份数据库的方式可谓是多种多样,每一种备份方式都带有每一个人对它的理解。如果这些备份方案需要给各个项目中的同事去使用,难免会发生使用上的问题,通用性不强。领导后来选择了一位同事的方案(shell脚本+crontab+mailx)来集成了一个定时备份后并发送邮件通知。不久,因为好多同事对Linux的服务配置和shell语言不太精通,导致很多问题的出现。为了使方案更简单、更人性化,我用了一个python脚本集成了这三个动作,使用起来,也都是人性化的配置,请大家使用前务必仔细读配置文件的注释!
一:配置文件(config.py)
#!encoding=utf-8
"""连接数据库信息"""
DB_information ={
'db_ip':'192.168.x.x','db_user':'postgres','db_password':'xxxx','db_database':'postgres','db_port':5432
}
#数据库容器ID
container_id = 'a74ee76ee144'
"""备份目录路径,结尾加 / (必须是已存在的目录)"""
Backup_path = "/home/backup_pgsql/"
"""定时执行的时间"""
#week:星期(0-6;0为周一,6为周日) hour:小时(24小时制) minute:分(如果是05分执行,则写5;00分执行,则写0)
#举个栗子:week:3 hour:20 minute:5代表:每周四晚上的8点05分执行任务
cron_time = {
'week':2,'hour':11,'minute':41
}
"""选择备份的方式:1:单/多模式备份 2:单库备份 3:单/多表备份 4:整库备份 5:多库备份"""
way = '5'
"""1:单/多模式备份"""
#单个模式写一个,多个递增即可
db_schema = 'shilidb'
schemas = [
'kfpt_ht','xh_yw'
]
"""3:单/多表备份"""
#单个表写一个,多个递增即可(格式:模式名.表名)
db_table = 'shilidb'
table_schemas = 'xh_yw'
tables = [
'qxz_tb','qxz_py'
]
"""2:单库备份,填库名"""
backup_one = 'shilidb'
"""5:多库备份,填库名"""
backup_double = [
'data','shilidb','postgres'
]
"""执行备份时的缓存时间(单位秒)"""
#根据库的大小进行调整,库越大,应当将缓存时间调高;最小值建议不低于10;
time_buffer = 10
"""定义删除多少天之前的备份目录"""
del_dirtime = 30
"""发送邮件"""
#username_send:发送人的用户名 password:发送人邮箱的授权码 username_recv:收件人用户名 port:邮箱端口
email_information = {
'mailserver':'smtp.163.com','username_send':'xxxxxxxxxx@163.com','password':'xxxxxxxxxx','port':25
}
#username_to:收件人邮箱,多个收件人按照格式递增添加
username_to = [
'xxxxxxxxx@qq.com','xxxxxxxxx@qq.com','xxxxxxxxx@163.com'
]
"""邮件内容"""
#mail_head:邮件标题 mail_body:邮件内容
email_text = {
'mail_head':'数据库服务器备份通知','mail_body':'数据库已备份完成'
}
二、主程序代码(Postgres_backup.py)
本次版本采用的是针对启动在docker中的postgres数据库进行备份,后续我还会出一个mysql版本!
运行方式:nohup python3 Postgres_backup.py &
#!/usr/bin/python3
#!encoding=utf-8
"""作者:陈浩
更新时间:2022.5.25
名称:Python定时备份数据库(整库、模式、表多功能备份)
"""
import os
import os.path
import logging
import time
import psycopg2 as pgsql
import traceback
import smtplib
from config import *
from email.mime.text import MIMEText
from apscheduler.schedulers.blocking import BlockingScheduler
class DatabaseBR(object):
"""备份、恢复、定时执行的类"""
Time = time.strftime('%Y-%m-%d %H:%M:%S')
Dirtime = time.strftime('%Y%m%d%H')
Dirbackup = 'pg_back_' + Dirtime
sql = "select datname from pg_database;"
logging.basicConfig(level=logging.DEBUG,filename=os.path.abspath(os.path.dirname(__file__)) + '/pgbackup.log',format='%(asctime)s - %(funcName)s{} -%(levelname)s : %(message)s'.format("方法"))
logging.info("检测备份目录是否存在……")
if os.path.exists(Backup_path):
if not os.path.exists(Backup_path + Dirbackup):
logging.warning("正在创建备份目录")
os.mkdir(Backup_path + Dirbackup)
else:
logging.info("备份目录 %s 已存在"%Dirbackup)
else:
logging.error("目录 %s 不存在"%Backup_path)
def conn_pgsql(self):
'''连接数据库'''
logging.info("进入连接数据库的方法")
conn = None
try:
conn = pgsql.connect(database=DB_information['db_database'],user=DB_information['db_user'],password=DB_information['db_password'],host=DB_information['db_ip'],port=DB_information['db_port'])
logging.info("连接Postgres:{}".format(conn))
except Exception as e:
logging.error("pgsql连接报错\n%s"%traceback.format_exc())
if conn != None:
return conn
def execute_time(func):
"""装饰器:计算函数的执行时间"""
from time import time
def wrapper(*args,**kwargs):
start = time()
func_return = func(*args,**kwargs)
end = time()
logging.warning(f'{func.__name__}方法执行的时间为: {end - start}s')
return func_return
return wrapper
def execute_rmdir(func):
"""装饰器:删除指定日期前的备份目录"""
import os,datetime,time,shutil
def rm_daydir(*args,**kwargs):
all_dir = []
for f in list(os.listdir(Backup_path)):
dirs = "{0}{1}".format(Backup_path,f)
if os.path.isdir(dirs):
all_dir.append(dirs)
for i in range(len(all_dir)):
dir_createtime = time.strftime("%Y%m%d",time.localtime(os.path.getctime(all_dir[i])))
del_time = datetime.date.today() - datetime.timedelta(days=del_dirtime)
del_time_str = del_time.strftime("%Y%m%d")
if int(dir_createtime) < int(del_time_str):
logging.warning("正在删除备份目录……")
shutil.rmtree(all_dir[i])
logging.info("已删除{}目录".format(all_dir[i]))
else:
logging.info("没有检测到符合删除日期的备份目录")
func(*args,**kwargs)
return rm_daydir
@execute_rmdir
@execute_time
def backing(self):
"""备份所有库的方法"""
con_pg = None
logging.info("进入执行备份所有库的方法")
logging.info("正在连接数据库……")
try:
con_pg = self.conn_pgsql()
curs = con_pg.cursor()
curs.execute(self.sql)
results = curs.fetchall()
databases = []
for result in results:
result = list(result)
databases.append(result)
logging.info("当前的可备份的数据库有\n{}".format(databases))
for database in databases:
logging.info("开始备份 %s 库"%database[0])
dumps = "docker exec -i -d {0} /bin/bash -c 'pg_dump -U {1} {2} > /{3}.sql'".format(
container_id,DB_information['db_user'],database[0],database[0])
os.popen(dumps)
time.sleep(time_buffer)
for cpsql in databases:
copy_sql = "docker cp {0}:/{1}.sql '{2}'".format(
container_id,cpsql[0],Backup_path + self.Dirbackup + '/')
logging.warning("开始从容器%s中备份到宿主机"%container_id)
os.popen(copy_sql)
time.sleep(time_buffer)
curs.close()
logging.warning("提交事务")
con_pg.commit()
except Exception as e:
logging.error("执行备份所有库的方法报错%s"% traceback.format_exc())
finally:
con_pg.close()
logging.info("关闭数据库连接")
self.emails()
@execute_rmdir
@execute_time
def backup_onedb(self):
"""备份单库的方法"""
logging.info("进入备份单库的方法")
con_pg = None
logging.info("获取到要备份的数据库名称%s"%backup_one)
logging.info("正在连接数据库……")
try:
con_pg = self.conn_pgsql()
curs = con_pg.cursor()
curs.execute(self.sql)
results = curs.fetchall()
databases = []
for result in results:
result = list(result)
databases.append(result[0])
if backup_one in databases:
logging.info("开始备份 %s 库"%backup_one)
dumps = "docker exec -i -d {0} /bin/bash -c 'pg_dump -U {1} {2} > /{3}.sql'".format(
container_id,backup_one,backup_one)
os.popen(dumps)
time.sleep(time_buffer)
copy_sql = "docker cp {0}:/{1}.sql {2}".format(
container_id,Backup_path + self.Dirbackup + '/')
logging.warning("开始从容器%s中备份到宿主机" % container_id)
os.popen(copy_sql)
else:
logging.error("数据库 %s 不存在,请检查后重新运行!"%backup_one)
except Exception as e:
logging.error("执行备份单库的方法报错%s"%traceback.format_exc())
finally:
con_pg.close()
logging.info("关闭数据库连接")
self.emails()
@execute_rmdir
@execute_time
def backup_doubledb(self):
"""备份多个库的方法"""
con_pg = None
logging.info("进入备份多个库的方法")
logging.info("正在连接数据库……")
try:
con_pg = self.conn_pgsql()
curs = con_pg.cursor()
curs.execute(self.sql)
results = curs.fetchall()
database = []
for result in results:
result = list(result)
database.append(result[0])
for backup_doubles in backup_double:
if backup_doubles in database:
logging.info("开始备份 %s 库"%backup_doubles)
dumps = "docker exec -i -d {0} /bin/bash -c 'pg_dump -U {1} {2} > /{3}.sql'".format(
container_id,backup_doubles,backup_doubles)
os.popen(dumps)
time.sleep(time_buffer)
copy_sql = "docker cp {0}:/{1}.sql {2}".format(
container_id,Backup_path + self.Dirbackup + '/')
logging.warning("开始从容器%s中备份到宿主机" % container_id)
os.popen(copy_sql)
time.sleep(time_buffer)
else:
logging.error("数据库 %s 不存在,请检查后重新运行!"%backup_doubles)
except Exception as e:
logging.error("执行备份多库方法报错%s"%traceback.format_exc())
finally:
con_pg.close()
logging.info("关闭数据库连接")
self.emails()
@execute_rmdir
@execute_time
def schemas(self):
"""备份单/多模式的方法"""
logging.info("进入执行备份单/多模式的方法")
con_pg = None
try:
logging.info("正在连接数据库……")
con_pg = self.conn_pgsql()
curs = con_pg.cursor()
curs.execute(self.sql)
results = curs.fetchall()
databases = []
for result in results:
result = list(result)
databases.append(result[0])
if db_schema in databases:
logging.info("开始备份 %s 模式"%schemas)
for schema in schemas:
dumps = "docker exec -i -d {0} /bin/bash -c 'pg_dump -U postgres {1} --schema={2} > /{3}.sql'".format(
container_id,db_schema,schema,schema)
os.popen(dumps)
time.sleep(time_buffer)
copy_sql = "docker cp {0}:/{1}.sql {2}".format(
container_id,Backup_path + self.Dirbackup + '/')
logging.warning("开始从容器%s中备份到宿主机"% container_id)
os.popen(copy_sql)
time.sleep(time_buffer)
else:
logging.error("数据库 %s 不存在,请检查后重新运行!"%db_schema)
except Exception as e:
logging.error("执行备份单/多模式的方法报错\n%s"%traceback.format_exc())
finally:
con_pg.close()
logging.info("关闭数据库连接")
self.emails()
@execute_rmdir
@execute_time
def tables(self):
"""备份单/多表的方法"""
logging.info("进入执行备份单/多表的方法")
con_pg = None
try:
logging.info("正在连接数据库……")
con_pg = self.conn_pgsql()
curs = con_pg.cursor()
curs.execute(self.sql)
results = curs.fetchall()
databases = []
for result in results:
result = list(result)
databases.append(result[0])
if db_table in databases:
logging.info("开始备份 %s 表"%tables)
for table in tables:
dumps = "docker exec -i -d {0} /bin/bash -c 'pg_dump -U postgres {1} -t {2}.{3} > /{4}.sql'".format(
container_id,db_table,table_schemas,table,table)
os.popen(dumps)
time.sleep(time_buffer)
copy_sql = "docker cp {0}:/{1}.sql {2}".format(
container_id,Backup_path + self.Dirbackup + '/')
logging.warning("开始从容器%s中备份到宿主机" % container_id)
os.popen(copy_sql)
time.sleep(time_buffer)
else:
logging.error("数据库 %s 不存在,请检查后重新运行!" % db_table)
except Exception as e:
logging.error("执行备份单/多表的方法报错\n%s"%traceback.format_exc())
finally:
con_pg.close()
logging.info("关闭数据库连接")
self.emails()
@execute_rmdir
@execute_time
def emails(self):
"""发送邮件的方法"""
logging.info("进入执行发送邮件的方法")
try:
mailserver = email_information['mailserver']
username_send = email_information['username_send']
password = email_information['password']
for user in username_to:
username_recv = user
mail = MIMEText(email_text['mail_body'])
mail['Subject'] = email_text['mail_head']
mail['From'] = username_send
mail['To'] = username_recv
smtp = smtplib.SMTP(mailserver,port=email_information['port'])
smtp.login(username_send,password)
smtp.sendmail(username_send,username_recv,mail.as_string())
smtp.quit()
except Exception as e:
logging.error("执行发送邮件的方法报错%s"%traceback.format_exc())
else:
logging.info("执行发送邮件的方法成功,请到邮箱查收!")
@execute_time
def con_task(self,func):
"""执行定时任务的方法"""
logging.info("进入执行定时任务的方法")
try:
blocking = BlockingScheduler() #实例化父类
blocking.add_job(func,'cron',day_of_week=cron_time['week'],hour=cron_time['hour'],minute=cron_time['minute'])
blocking.start()
except (Exception,SystemExit,KeyboardInterrupt) as e:
logging.error("定时任务执行发生错误%s"%traceback.format_exc())
def rmdir(self,day):
"""执行删除指定日期之前备份的方法"""
if __name__ == '__main__':
"""主程序"""
databasebr = DatabaseBR()
if way == '1':
'''单/多模式备份'''
logging.info("===选择了'单/多模式备份',等待定时器执行===")
databasebr.con_task(databasebr.schemas)
elif way == '2':
'''单库'''
logging.info("===选择了'单库备份',等待定时器执行===")
databasebr.con_task(databasebr.backup_onedb)
elif way == '3':
'''单/多表备份'''
logging.info("===选择了'单/多表备份',等待定时器执行===")
databasebr.con_task(databasebr.tables)
elif way == '4':
'''整库备份'''
logging.info("===选择了'整库备份',等待定时器执行===")
databasebr.con_task(databasebr.backing)
elif way == '5':
'''多库备份'''
logging.info("===选择了'多库备份',等待定时器执行===")
databasebr.con_task(databasebr.backup_doubledb)
else:
logging.error("config配置中的'way'选择的备份模式错误!已结束")
三、日志信息
2022-05-19 13:43:01,436 - DatabaseBR方法 -INFO : 检测备份目录是否存在……
2022-05-19 13:43:01,437 - DatabaseBR方法 -INFO : 备份目录 pg_back_2022051913 已存在
2022-05-19 13:43:01,437 - backing方法 -INFO : 进入执行备份数据库的方法
2022-05-19 13:43:01,437 - backing方法 -INFO : 正在连接数据库……
2022-05-19 13:43:01,437 - conn_pgsql方法 -INFO : 进入连接数据库的方法
2022-05-19 13:43:01,449 - conn_pgsql方法 -INFO : 连接Postgres:<connection object at 0x00000133C1E50160; dsn: 'user=postgres password=xxx dbname=postgres host=192.168.1.198 port=5432',closed: 0>
2022-05-19 13:43:01,454 - backing方法 -WARNING : 正在执行sql……
2022-05-19 13:43:01,454 - backing方法 -INFO : 当前的可备份的数据库有
[['postgres'],['template1'],['template0'],['template_postgis'],['postgres001'],['data'],['for study'],['shilidb']]
2022-05-19 13:43:01,454 - backing方法 -INFO : ['postgres']
2022-05-19 13:43:01,454 - backing方法 -INFO : ['template1']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['template0']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['template_postgis']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['postgres001']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['data']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['for study']
2022-05-19 13:43:01,455 - backing方法 -INFO : ['shilidb']
2022-05-19 13:43:01,455 - backing方法 -WARNING : 提交事务
2022-05-19 13:43:01,457 - backing方法 -INFO : 关闭数据库连接
本次内容就这多,使用起来是不是更简单、更方便呢,值得注意的是:我这里是正对pgsql进行备份的,而且是运行在docker容器当中,如果读者需要使用内容,还需把代码读懂后再进行修改,感谢阅读
原文地址:https://blog.csdn.net/weixin_49291148/article/details/124882598
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。