用flask实现数据库flask_sqlalchemy 后台操作

用flask实现了一个简单的后台操作,用于数据的新建、查询、修改以及删除

from flask import Flask
from flask import  render_template
from flask_sqlalchemy import SQLAlchemy
from flask import request,jsonify
import pymysql
import datetime
import json
from werkzeug.exceptions import HTTPException
from flask_cors import *

pymysql.install_as_MySQLdb()


app = Flask(__name__)
# CORS(app, supports_credentials=True)
# r'/*' 是通配符,让本服务器所有的URL 都允许跨域请求
CORS(app, resources=r'/*')

# 载入配置文件
#app.config.from_pyfile('config.ini')
#指定数据库连接还有库名
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:XXXXX/Navigat?charset=utf8'
# 指定配置,用来省略提交操作
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
#
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 建立数据库对象
db = SQLAlchemy(app)


class ApiException(HTTPException):
    code = 500
    errmsg = 'sorry ,we make a mistake'
    def __init__(self,code=None,errmsg=None,id=None,header=None):
        if code:
            self.code = code
        if errmsg:
            self.errmsg = errmsg
        super(ApiException,self).__init__(errmsg,None)

    def get_body(self, environ=None):
        body=dict(
            errmsg=self.errmsg
        )
        text = json.dumps(body)
        return text
    def get_headers(self, environ=None) :
        return [('Content-Type','application/json')]
    @staticmethod
    def get_url_no_param():
        full_url = str(request.full_path)
        main_path = full_url.split('?')
        return  main_path[0]

class ClientTypeError(ApiException):
    code = 400
    errmsg = 'client is invalid'

class ParameterException(ApiException):
    code = 400
    errmsg = 'invalid parameter'

@app.errorhandler(Exception)
def framework_error(e):
    if isinstance(e, ApiException):
        return e
    if isinstance(e,HTTPException):
        code=e.code
        errmsg=e.description
        return ApiException(code,errmsg)
    else:
        return e



# 建立数据库类,用来映射数据库表,将数据库的模型作为参数传入
class Navigat(db.Model):
    # 定义表名
    __tablename__ = 'navigat'
    # 定义字段
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    name = db.Column(db.String(64))
    sort = db.Column(db.Integer)
    content = db.Column(db.String(64))
    parentid = db.Column(db.Integer)
    update_date = db.Column(db.DATE)

def get_info_byID(id):
    navigat_list = []
    sql = 'select * from navigat where id = %d order by sort' % id
    items = db.session.execute(sql)
    for item in items:
        navigat_list.append({
            "id": item.id,
            "name": item.name,
            "sort": item.sort,
            "content": item.content,
            "parentid": item.parentid
        })

    return navigat_list

#新建
def index(name=None,sort=None,content=None,parentid=None):

    now = datetime.datetime.now()
    # 声明对象
    navigat = Navigat(name=name,sort=sort,content=content,parentid=parentid,update_date=now)
    # 调用添加方法
    db.session.add(navigat)
    db.session.flush()
    id = navigat.id
    # 提交入库
    db.session.commit()

    result = {"id": id, "errmsg": None}
    return result

#更改
def edit_navigat(id,newdata):
    # 根据某个字段做修改操作
    # 翻译为 update user set name = '张三' where id = 2

    if get_info_byID(id):
        Navigat.query.filter_by(id=id).update(newdata)
    else:
        raise ApiException(errmsg="id不存在")
    # 提交入库
    db.session.commit()
    result={"errmsg": None}
    return result

#删除
def del_navigat(id):
    # 根据某个字段做删除,filter_by可以理解为where条件限定
    # 翻译为 delete from user where id = 1

    if get_info_byID(id):
         Navigat.query.filter_by(id=id).delete()
        # 提交入库
         db.session.commit()
    else:
        raise ApiException(errmsg="id不存在")
    errmsg = None
    result = {"errmsg": errmsg}
    return result

# 数据库的查询操作(查)
def select_navigat(id=None,name=None):

    navigat_list = []
    if id==None and name==None:
        #全量获取数据
        try:
            nlist = Navigat.query.all()
        except Exception as e:
            db.session.rollback()
            result = {
                "list": "",
                "errmsg": "获取数据失败"
            }
            return result
        for item in nlist:
            navigat_list.append({
            "id": item.id,
            "name": item.name,
            "sort": item.sort,
            "content": item.content,
            "parentid": item.parentid
        })
        result = {
            "list": navigat_list,
             "errmsg": None
            }
        return result

    if id!=None and name==None:
        #根据id查询所有的子项
        try:
            sql='select * from navigat where parentid = %d order by sort' % id
            items = db.session.execute(sql)
        except TypeError as e:
            print(e)
            db.session.rollback()
            result = {
                "list": "",
                "errmsg": "数据类型错误,%s" % (e)
            }
            return result
        except Exception as e:
            db.session.rollback()
            result = {
                "list": "",
                "errmsg": "获取数据失败,%s"%(e)
            }
            return result
        for item in items:
            navigat_list.append({
                "id": item.id,
                "name": item.name,
                "sort": item.sort,
                "content": item.content,
                "parentid": item.parentid
            })

        result = {
                "list": navigat_list,
                "errmsg": None
        }

        return result

    if id==None and name!=None:
        #根据姓名模糊查询
        name="%"+name+"%"
        try:
            nlist = Navigat.query.filter(Navigat.name.ilike(name))
        except Exception as e:
            db.session.rollback()
            result = {
                "list": "",
                "errmsg": "获取数据失败,%s"%(e)
            }
            return  result

        for item in nlist:
            navigat_list.append({
            "id": item.id,
            "name": item.name,
            "sort": item.sort,
            "content": item.content,
            "parentid": item.parentid
        })
        result = {
            "list": navigat_list,
            "errmsg": None
        }
        return result






@app.route('/hello/')
def hello_world():
    return 'Hello World!'

#新建
@app.route('/setNavigat',methods=['POST'])
def setNavigat():
    #请求的数据格式 : {"name":"客户","sort":10,"content":"www.bai","parentid":1}
    #curl -H "Content-Type: application/json" -X POST -d '{"name":"hello,it is me","sort":10,"content":"www.bai","parentid":1}' http://200.200.168.48:2333/setNavigat

    data = request.get_json()
    if 'name' in data and 'sort' in data and 'content' in data and 'parentid' in data :
        name = data['name']
        sort = data['sort']
        content = data['content']
        parentid = data['parentid']
    else:
        raise ParameterException()

    # if name==None or sort==None :
    #     return
    result = index(name,sort,content,parentid)
    result = json.dumps(result)
    return result

#编辑
@app.route('/updateNavigat',methods=['POST'])
def updateNavigat():
    # 请求的数据格式 : {"id":5,"name":"客户","sort":10,"content":"www.bai",parentid:1}
    #curl -H "Content-Type: application/json" -X POST -d '{"id":5,"name":"客户","sort":10,"content":"www.bai"}' http://200.200.168.48:2333/updateNavigat

    newdata = request.get_json()
    if 'id' in newdata :
        id = newdata["id"]
    else:
        raise ParameterException()

    del newdata['id']
    result = edit_navigat(id,newdata)
    result = json.dumps(result)
    return result

#删除
'''
删除逻辑:先判断是否有子节点,如果没有子节点,直接删除
if 有子节点 & 有父节点 将子节点的parentid修改为父节点
if 有子节点 & 无父节点  不允许删除
'''
@app.route('/delNavigat',methods=['POST'])
def delNavigat():
    #curl -H "Content-Type: application/json" -X POST -d '{"id":9}' http://200.200.168.48:2333/delNavigat
    newdata = request.get_json()
    id = newdata["id"]
    navigatlist = select_navigat(id)
    lists = navigatlist['list']
    if lists:
        parentid = get_info_byID(id)[0]['parentid']
        if parentid:
            #双有
            for list  in lists:
                # {"id":33,"parentid":31}
                edit_navigat(id=list['id'],newdata={"parentid":parentid})
            navigatlist=select_navigat(id)
            if navigatlist['list']:
                raise ApiException(errmsg="处理失败,请稍后再试")
            else:
                result = del_navigat(id)
        else:
            raise ApiException(errmsg="根目录不允许删除")
    else:
        result = del_navigat(id)

    result = json.dumps(result)
    return result
#全量获取数据
@app.route('/getAllNavigat',methods=['GET'])
def getAllNavigat():
    # return render_template('Navigat.json')  #返回模板数据
    #curl -H "Content-Type: application/json" -X GET -d '{}' http://200.200.168.48:2333/getAllNavigat
    result = select_navigat()
    result = json.dumps(result)
    return result

#获取所有项
@app.route('/getNavigatbyId',methods=['POST'])
def getNavigatbyId():
    #curl -H "Content-Type: application/json" -X POST -d '{"id":"客户"}' http://200.200.168.48:2333/getNavigatbyId
    data = request.get_json()
    id = data["id"]
    result = select_navigat(id=id)
    result = json.dumps(result)
    return result

#搜索
@app.route('/searchNavigatbyName',methods=['POST'])
def searchNavigatbyName():
    #curl -H "Content-Type: application/json" -X POST -d '{"key":"客户"}' http://200.200.168.48:2333/searchNavigatbyName
    data = request.get_json()
    name = data["key"]
    result = select_navigat(name=name)
    result = json.dumps(result)
    return result

if __name__ == '__main__':
    app.run(host='0.0.0.0',port=2333,debug=True)

 

原文地址:https://www.cnblogs.com/sugoi/p/14379136.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Jinja2:是Python的Web项目中被广泛应用的模板引擎,是由Python实现的模板语言,Jinja2 的作者也是 Flask 的作者。他的设计思想来源于Django的模板引擎,并扩展了其语法和一系列强大的功能,其是Flask内置的模板语言。
Fullcalendar日历使用,包括视图选择、事件插入、编辑事件、事件状态更改、事件添加和删除、事件拖动调整,自定义头部,加入el-popover显示图片、图片预览、添加附件链接等,支持手机显示。
监听QQ消息并不需要我们写代码,因为市面上已经有很多开源QQ机器人框架,在这里我们使用go-cqhttp官方文档:go-cqhttp如果您感兴趣的话,可以阅读一下官方文档,如果不想看,直接看我的文章即可。
【Flask框架】—— 视图和URL总结
python+web+flask轻量级框架的实战小项目。登录功能,后续功能可自行丰富。
有了这个就可以配置可信IP,关键是不需要企业认证,个人信息就可以做。
本专栏是对Flask官方文档中个人博客搭建进行的归纳总结,与官方文档结合事半功倍。 本人经验,学习一门语言或框架时,请首先阅读官方文档。学习完毕后,再看其他相关文章(如本系列文章),才是正确的学习道路。
本专栏是对Flask官方文档中个人博客搭建进行的归纳总结,与官方文档结合事半功倍。基础薄弱的同学请戳Flask官方文档教程 本人经验,学习一门语言或框架时,请首先阅读官方文档。学习完毕后,再看其他相关文章(如本系列文章),才是正确的学习道路。 如果python都完全不熟悉,一定不要着急学习框架,请首先学习python官方文档,一步一个脚印。要不然从入门到放弃是大概率事件。 Python 官方文档教程
快到年末了 相信大家都在忙着处理年末数据 刚好有一个是对超市的商品库存进行分析的学员案例 真的非常简单~
一个简易的问答系统就这样完成了,当然,这个项目还可以进一步完善,比如 将数据存入Elasticsearch,通过它先进行初步的检索,然后再通过这个系统,当然我们也可以用其他的架构实现。如果你对这系统还有其他的疑问,也可以再下面进行留言!!!
#模版继承和页面之间的调用@app.route("/bl")def bl(): return render_template("file_2.html")主ht
#form表达提交@app.route("/data",methods=['GET','POST']) #methods 让当前路由支持GET 和
#form表达提交@app.route("/data",methods=['GET','POST']) #methods 让当前路由支持GET 和
#session 使用app.secret_key = "dsada12212132dsad1232113"app.config['PERMANENT_SESSION_LI
#文件上传@app.route("/file",methods=['GET','POST'])def file(): if request.meth
#跳转操作:redirect@app.route("/red")def red(): return redirect("/login")
#session 使用app.secret_key = "dsada12212132dsad1232113"app.config['PERMANENT_SESSION_LI
@app.route("/req",methods=['GET','POST'])def req(): print(request.headers)
#模版继承和页面之间的调用@app.route("/bl")def bl(): return render_template("file_2.html")主ht
#文件操作:send_file,支持图片 视频 mp3 文本等@app.route("/img")def img(): return send_file("1.jpg&q