fastapi结合Manticore Search、elasticsearch、mysql实现全文搜索

1、将数据写入到mysql中

  • 创建测试表
CREATE TABLE `student` (
  `sno` varchar(10) COLLATE utf8mb4_unicode_ci NOT NULL,`sname` varchar(20) COLLATE utf8mb4_unicode_ci DEFAULT NULL,`sage` int(2) DEFAULT NULL,`ssex` varchar(5) COLLATE utf8mb4_unicode_ci DEFAULT NULL,`description` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL,PRIMARY KEY (`sno`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
  • 测试表插入数据
insert into student values ('s001','张三',23,'男','张三是个好学生'); 
insert into student values ('s002','李四','张三是个好学生');  
insert into student values ('s003','吴鹏',25,'张三是个好学生');  
insert into student values ('s004','琴沁',20,'女','张三是个好学生');  
insert into student values ('s005','王丽','张三是个好学生');  
insert into student values ('s006','李波',21,'张三是个好学生');  
insert into student values ('s007','刘玉','张三是个好学生');  
insert into student values ('s008','萧蓉','张三是个好学生');  
insert into student values ('s009','陈萧晓','张三是个好学生');  
insert into student values ('s010','陈美',22,'张三是个好学生');
  • 表字段描述

  • 字段意义
sno 学号
sname	学生姓名
sage	学生年龄
ssex	学生性别
description	学生描述

2、python安装fastapi、elasticsearch框架、Manticore Search框架和mysql客户端

pip install elasticsearch==8.8.2
pip install pymysql
pip install manticoresearch

# Uvicorn是一个ASGI服务器,用于运行FastAPI应用。
pip install uvicorn
pip install fastapi

3、将mysql的数据写入到elasticsearch中

  • mysql数据同步到es
# mysql-to-es
# 本地es版本 8.8.2
# python es版本  8.8.2
import pymysql
from elasticsearch import Elasticsearch

def get_data():
    conn=pymysql.connect(host="localhost",port=3306,user="root",password="root",database="mydb")
    cursor=conn.cursor()
    sql="select * from student"
    t_header_sql = "desc student "
    cursor.execute(sql)
    results=cursor.fetchall()
    cursor.execute(t_header_sql)
    header_results = cursor.fetchall()
    conn.close()
    return results,header_results

def create_es_data():
    # es = Elasticsearch()
    es = Elasticsearch(
        [
            "https://192.168.10.1:9200",],ca_certs="./path/to/http_ca.crt",# es认证证书,8.0版本后开始使用
        basic_auth=("elastic","U4mRQUoVeQ+YMzcCFD1t"),request_timeout=3600
    )
    try:
        data_results,cloumns_results = get_data()
        for row in data_results:
            message = {}
            for i in range(len(row)):
                # print(cloumns_results[i][0],row[i])
                message[cloumns_results[i][0]] = row[i]

            print(message)
            es.index(index="student",document=message)
    except Exception as e:
        print("Error:" + str(e))

if __name__=="__main__":
    create_es_data()
  • es查看数据(Elasticvue插件)

 

4、将mysql的数据写入到Manticore中

  • mysql数据同步到Manticore

注:Manticore 和 Mysql 使用pymysql即mysql客户端

import pymysql

def get_data():
    conn = pymysql.connect(host="localhost",database="mydb")
    cursor = conn.cursor()
    sql = "select * from student"
    t_header_sql = "desc student "
    cursor.execute(sql)
    results = cursor.fetchall()
    cursor.execute(t_header_sql)
    header_results = cursor.fetchall()
    header_tuple = tuple(x[0] for x in header_results)
    conn.close()
    return results,header_tuple

def inster_data_to_manticore():
    try:
        db = pymysql.connect(host='localhost',port=9306)
        print('manticoredb 连接成功!')
    except:
        print('something wrong!')

    if db:
        cursor = db.cursor()
        rows,header_tuple = get_data()
        header_str = str(header_tuple).replace("\'","")
        sql = 'drop table if exists students'
        cursor.execute(sql)
        db.commit()

        for row in rows:
            sql = f'INSERT INTO students{header_str} VALUES {row}'
            print(sql)
            cursor.execute(sql)
            db.commit()

if __name__ == '__main__':
    inster_data_to_manticore()
  • Manticore 数据查询(工具Webyog SQLyog)

 

5、elasticsearch查找类的封装

  • es安全认证连接(参考官网)
  • 按fields查询方法封装,输入参数fields 筛选器,query查询字符串
# elasticsearch_query_class.py
from elasticsearch import Elasticsearch
class elasticsearchself():
    def __init__(self,index_name,index_type = '_doc'):
        self.es = Elasticsearch(
        [
            "https://192.168.10.1:9200",basic_auth=("elastic",request_timeout=3600
        )
        self.index_name=index_name
        self.index_type=index_type

    def search_by_fields(self,query,fields,count:int=30):
        ds = {"multi_match": {"query": query,"fields": fields}}
        fields = ['sname','description']
        match_data=self.es.search(index=self.index_name,query=ds,size=count)
        return match_data

es=elasticsearchself(index_name="student")
query = "张三"
fields= ['sname','description']
match_data = es.search_by_fields(query,fields)
print(match_data)

6、Manticoresearch查找类的封装

# manticoreself.py

import manticoresearch
from manticoresearch.api import search_api
from manticoresearch.model.search_request import SearchRequest

class manticoresearchself():
    def __init__(self,index_name):
        self.configuration = manticoresearch.Configuration(
        host = "http://127.0.0.1:9308"
        )
        self.index_name = index_name

    def search_all_text(self,query):
        with manticoresearch.ApiClient(self.configuration) as api_client:
            # Create an instance of the API class
            api_instance = search_api.SearchApi(api_client)

            # # Create SearchRequest
            # search_request = SearchRequest()
            # search_request.index='students'
            # # search_request.fulltext_filter=QueryFilter(23)
            # search_request.fulltext_filter =

            search_request = SearchRequest(
                index='students',query={
                    "match":
                        {
                            "*" : query
                        }
                },)

            # example passing only required values which don't have defaults set
            try:
                # Performs a search
                api_response = api_instance.search(search_request)
                # pprint(api_response)
                return api_response
            except manticoresearch.ApiException as e:
                print("Exception when calling SearchApi->search: %s\n" % e)


mc=manticoresearchself(index_name="student")
query = "s004"
match_data = mc.search_all_text(query)
match_data_dict = match_data.__dict__
print(match_data._hits._hits[0]["_source"])

7、fastapi实现elasticseach的全文检索模糊查询

# main.py
from fastapi import FastAPI
from elasticsearch_query_class import elasticsearchself
import json
import time

app = FastAPI()

@app.get("/get_es/{query}")
async def get_es(query):
    fields = ['*']
    es=elasticsearchself(index_name="student")
    time_start = time.time()  # 记录开始时间
    data=es.search_by_fields(query,fields)
    time_end = time.time()  # 记录结束时间
    address_data=data["hits"]["hits"]
    address_list=[]
    for item in address_data:
        address_list.append(item["_source"])
    time_sum = time_end - time_start  # 计算的时间差为程序的执行时间,单位为秒/s
    address_list.append({"time_sum":time_sum})
    new_json=json.dumps(address_list,ensure_ascii=False)
    return json.loads(new_json)

思路: es创建筛选器列表fields,[*]表示所有字段,查询体query

8、fastapi实现Manticoresearch的全文检索筛选查询

# main.py
from fastapi import FastAPI
from manticoreself import manticoresearchself
import json
import time

app = FastAPI()

@app.get('/get_mc/<query>')
async def get_mc(query):
    mc = manticoresearchself(index_name="student")
    time_start = time.time()  # 记录开始时间
    data = mc.search_all_text(query)
    time_end = time.time()  # 记录结束时间
    address_data = data._hits._hits
    address_list = []
    for item in address_data:
        address_list.append(item["_source"])

    time_sum = time_end - time_start  # 计算的时间差为程序的执行时间,单位为秒/s
    address_list.append({"time_sum": time_sum})
    new_json = json.dumps(address_list,ensure_ascii=False)
    return json.loads(new_json)

思路: Manticoresearch 支持openapi查询接口,使用search_all_text api接口查询结果,查询体query

9、fastapi实现的Mysql列表字段全文查询

# main.py
from fastapi import FastAPI
import json
import time
import pymysql

app = FastAPI()

@app.get('/get_mysql/<query>')
async def get_mysql(query):
    conn = pymysql.connect(host="localhost",database="mydb")
    cursor = conn.cursor()
    fields = "(sname,description,sno,ssex)"

    try:
        create_full_index_sql = f"create fulltext index full_idx_to_table on student{fields} "
        cursor.execute(create_full_index_sql)
    except:
        pass

    query_sql = f"select * from student where match{fields} against('{query}')"
    time_start = time.time()  # 记录开始时间
    cursor.execute(query_sql)
    time_end = time.time()  # 记录结束时间
    results = cursor.fetchall()
    address_list = []
    for row in results:
        address_list.append(row)
    time_sum = time_end - time_start  # 计算的时间差为程序的执行时间,单位为秒/s
    address_list.append({"time_sum": time_sum})
    new_json = json.dumps(address_list,ensure_ascii=False)
    return json.loads(new_json)

思路: 先创建需要查找的字段即筛选器,利用筛选器列表创建全文检索index(full_idx_to_table ) 后,使用fts(Full-Text Search)查询

10、fastapi实现接口源码即验证

  • fastapi源码
# main.py

from fastapi import FastAPI
from elasticsearch_query_class import elasticsearchself
import json
from manticoreself import manticoresearchself
import time
import pymysql

app = FastAPI()

@app.get("/get_es/{query}")
async def get_es(query):
    fields = ['*']
    es=elasticsearchself(index_name="student")
    time_start = time.time()  # 记录开始时间
    data=es.search_by_fields(query,ensure_ascii=False)
    return json.loads(new_json)

@app.get('/get_mc/<query>')
async def get_mc(query):
    mc = manticoresearchself(index_name="student")
    time_start = time.time()  # 记录开始时间
    data = mc.search_all_text(query)
    time_end = time.time()  # 记录结束时间
    address_data = data._hits._hits
    address_list = []
    for item in address_data:
        address_list.append(item["_source"])

    time_sum = time_end - time_start  # 计算的时间差为程序的执行时间,单位为秒/s
    address_list.append({"time_sum": time_sum})
    new_json = json.dumps(address_list,ensure_ascii=False)
    return json.loads(new_json)

@app.get('/get_mysql/<query>')
async def get_mysql(query):
    conn = pymysql.connect(host="localhost",ensure_ascii=False)
    return json.loads(new_json)

 

 

 

10、总结

  1. elasticsearch 全文搜索
  • 支持中、英全文搜索
  • 但速度没有mysql和Manticore Search快
  1. Manticore Search全文搜索
  • 暂时只支持英文全文搜索
  • 搜索速度快,相比es少量数据快,没有mysql少量数据快,但据官方显示大量数据时Manticore Search快于mysql
  1. Mysql全文搜索
  • 支持中、英搜索,中文时sql  against中加IN BOOLEAN MODE
  • 少量数据时搜索极快,但全文搜索时需要创建搜索数据的全文索引,有些麻烦

原文地址:https://blog.csdn.net/wisdom_lp/article/details/131713321

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读774次,点赞24次,收藏16次。typescript项目中我们使用typings-for-css-modules-loader来替代css-loader实现css modules。1、typings-for-css-modules-loader加载器介绍 Webpack加载器,用作css-loader的替代产品,可动态生成CSS模块的TypeScript类型这句话是什么意思呢?就是编译时处理css文件...
文章浏览阅读784次。react router redux antd eslint prettier less axios_react+antd+redux+less
文章浏览阅读3.9k次,点赞5次,收藏11次。需要删除.security-7索引文件。把在第1步中的被注释的配置打开。之后就是按照提示输入密码。执行bin目录下的文件。_failed to authenticate user 'elastic' against
文章浏览阅读1.2k次,点赞23次,收藏24次。Centos 8 安装es_centos8 yum elasticsearch
文章浏览阅读3.2k次。设置完之后,数据会⾃动同步到其他节点。修改密码时,将第⼀步配置删除,然后重启。单独使⽤⼀个节点⽣成证书;执⾏设置⽤户名和密码的命令。执⾏完上⾯命令以后就可以在。⽂件,在⾥⾯添加如下内容。这个⽂件复制到其他节点下。其中⼀个节点设置密码即可。依次对每个账户设置密码。全部节点都要重启⼀遍。需要在配置⽂件中开启。个⽤户分别设置密码,⽬录下,证书⽂件名为。功能,并指定证书位置。_es设置账号和密码
文章浏览阅读1.9k次,点赞2次,收藏7次。针对多数据源写入的场景,可以借助MQ实现异步的多源写入,这种情况下各个源的写入逻辑互不干扰,不会由于单个数据源写入异常或缓慢影响其他数据源的写入,虽然整体写入的吞吐量增大了,但是由于MQ消费是异步消费,所以不适合实时业务场景。不易出现数据丢失问题,主要基于MQ消息的消费保障机制,比如ES宕机或者写入失败,还能重新消费MQ消息。针对这种情况,有数据强一致性要求的,就必须双写放到事务中来处理,而一旦用上事物,则性能下降更加明显。可能出现延时问题:MQ是异步消费模型,用户写入的数据不一定可以马上看到,造成延时。_mysql同步es
文章浏览阅读3.6w次,点赞48次,收藏44次。【程序员洲洲送书福利-第十九期】《C++ Core Guidelines解析》
文章浏览阅读1.3k次。当我们在开发Vue应用时,经常需要对表单进行校验,以确保用户输入的数据符合预期。Vue提供了一个强大的校验规则机制,通过定义rules规则,可以方便地对表单进行验证,并给出相应的错误提示。_vue ruler校验
文章浏览阅读2k次,点赞16次,收藏12次。Linux内核源码下载地址及方式_linux源码下载
文章浏览阅读1k次。这样在每天自动生成的索引skywalking_log_xxx就会使用上述模版来生成,timestamp会被设置成date类型。然后此时在–>索引管理–>kibana–>索引模式添加skywalking_log*索引时就会有时间字段了。在通过skywalking将日志收集到es后,由于skywalking收集的日志(skywalking_log索引)没有date类型的字段导致在es上再索引模式中没有时间范围的查询。skywalking收集的日志有时间戳字段timestamp,只是默认为long类型。_skywalking timestamp
文章浏览阅读937次,点赞18次,收藏21次。1.初始化git仓库,使用git int命令。2.添加文件到git仓库,两步走:2.1 使用命令,注意,可反复多次使用,添加多个文件;2.2 使用命令,完成。此笔记是我个人学习记录笔记,通过廖雪峰的笔记进行学习,用自己能理解的笔记记录下来,如果侵权,联系删。不存在任何盈利性质,单纯发布后,用于自己学习回顾。
文章浏览阅读786次,点赞8次,收藏7次。上述示例中的 origin 是远程仓库的名称,https://github.com/example/repository.git 是远程仓库的 URL,(fetch) 表示该远程仓库用于获取更新,(push) 表示该远程仓库用于推送更新。你可以选择在本地仓库创建与远程仓库分支对应的本地分支,也可以直接将本地仓库的分支推送到远程仓库的对应分支。将 替换为远程仓库的名称(例如 origin), 替换为要推送的本地分支的名称, 替换为要推送到的远程分支的名称。_git remote 智能切换仓库
文章浏览阅读1.5k次。配置eslint校验代码工具_eslint 实时校验
文章浏览阅读1.2k次,点赞28次,收藏26次。Git入门基础介绍,什么是Git,如何使用Git,以及Git的工作的基本原理
文章浏览阅读2.7k次。基于官方给出的几种不同环境不同的安装方式,本文将会选择在使用.zip文件在Windows上安装Elasticsearch在Linux或macOS上从存档文件安装ElasticsearchInstall Elasticsearch with Docker (此种方式待定)使用Docker安装Elasticsearch。_elasticsearch安装部署windows
文章浏览阅读3.3k次,点赞5次,收藏11次。【Linux驱动】内核模块编译 —— make modules 的使用(单模块编译、多模块编译)_make modules
文章浏览阅读1k次。docker启动es报错_max virtual memory areas vm.max_map_count [65530] is too low, increase to at
文章浏览阅读4.2k次,点赞2次,收藏6次。使用docker单机安装elasticsearch后再安装kibana时找不到es。_unable to retrieve version information from elasticsearch nodes. security_ex
文章浏览阅读1.1k次。日志处理对于任何现代IT系统都是关键部分,本教程专为新手设计,通过详细解释Logstash的三大核心组件,为您展示如何从零开始搭建强大的日志处理系统。您还将学习如何同步MySQL数据到Elasticsearch,并通过一个"Hello World"示例快速入门。无论您是完全的新手还是有一些基础,本教程都将引导您顺利掌握Logstash的基本操作和高级应用。_logstash mysql
文章浏览阅读1.1w次,点赞5次,收藏25次。执行这条指令之后,你的本地项目就与远程Git仓库建立了连接,你就可以开始对你的代码进行版本追踪和协作开发了。使用“git remote add origin”指令,可以轻松地将本地项目连接到远程Git仓库。git remote set-url origin 执行这条指令之后,Git就会将已经添加的名为“origin”的仓库删除。git remote add origin 其中,是你的远程Git仓库的网址。_git remote add origin