XActivity
from fastapi import FastAPI, WebSocket,WebSocketDisconnect
from starlette.middleware.cors import CORSMiddleware
# Origins handed to CORSMiddleware as ``allow_origins`` in XActivity.__init__.
# NOTE(review): the "*" wildcard is used together with allow_credentials=True
# there - confirm this is intended outside of development.
origins = ["*", ]
class XActivity:
    """Application shell that owns the FastAPI instance.

    Construction creates the app, installs the CORS middleware and then
    invokes the three ``init_*`` hooks in a fixed order (base, utils,
    fragment). The hooks do nothing here; subclasses override them.
    """

    def __init__(self):
        self.app = FastAPI()

        # CORS configuration gathered in one place before being applied.
        cors_settings = {
            "allow_origins": origins,
            "allow_credentials": True,
            "allow_methods": ["*"],
            "allow_headers": ["*"],
        }
        self.app.add_middleware(CORSMiddleware, **cors_settings)

        # Hooks are looked up one at a time, right before each call.
        for hook_name in ("init_base", "init_utils", "init_fragment"):
            getattr(self, hook_name)()

    def init_fragment(self):
        """Hook called last during construction; no-op by default."""

    def init_base(self):
        """Hook called first during construction; no-op by default."""

    def init_utils(self):
        """Hook called second during construction; no-op by default."""
XFragment
class XFragment:
    """Base class for a named group of routes attached to an application.

    Constructing an instance stores ``name`` and immediately calls
    :meth:`run` with ``app``, so subclasses must override :meth:`run`.
    """

    def __init__(self, app, name):
        """Remember ``name`` and register this fragment on ``app`` via run()."""
        self.name = name
        self.run(app)

    def get_name(self, tag):
        """Return ``"<name>/<tag>"``, prefixed with "/" when it lacks one.

        Args:
            tag: Last segment of the path.

        Returns:
            The joined path, always starting with "/".
        """
        path = f"{self.name}/{tag}"
        # The joined string always contains at least the "/" separator, so
        # it is never empty; only the leading slash may be missing.
        if not path.startswith("/"):
            path = "/" + path
        return path

    def run(self, app):
        """Attach this fragment to ``app``; must be overridden.

        Raises:
            NotImplementedError: Always, in this base class. It is still an
                ``Exception`` subclass, so broad handlers keep working.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement run()"
        )
XMaster
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from redis import Redis
import asyncio, time, gc, sys, gc
from .XFragment import XFragment
from pydantic import BaseModel
from dataclasses import dataclass
from ..XUtils.XUtils import XUta
class LoginItem(BaseModel):
    """Body model of the login POST route: a user name and a password."""

    user: str
    password: str
class R:
    """Holder for a Redis client (``r``) and a pipeline (``p``) on one db.

    NOTE: the selected database is flushed every time an instance is
    created.
    """

    def __init__(self, db):
        client = Redis(db=db, decode_responses=True)
        self.r = client
        self.p = client.pipeline()
        # Start from an empty database on every construction.
        client.flushdb()
class R0(R):
    """Redis holder bound to database index 5."""

    def __init__(self):
        super().__init__(5)
class R1(R):
    """Redis holder bound to database index 4, storing short-lived hashes
    keyed by ``aka``."""

    def __init__(self):
        R.__init__(self, 4)

    def iscrea(self, aka, dt, cand):
        """Store a MasterItem hash under ``aka`` with a 60 second expiry.

        Args:
            aka: Redis key of the hash, also stored as a field.
            dt: Value stored in the ``dt`` field.
            cand: Value stored in the ``cand`` field.

        Returns:
            A dict with only the ``aka`` and ``dt`` fields (SlaveItem).
        """
        data = MasterItem(aka, dt, cand).show()
        # Write and expiry are sent together in one MULTI/EXEC transaction.
        self.p.multi()
        # hset(mapping=...) replaces the deprecated hmset().
        self.p.hset(aka, mapping=data)
        self.p.expire(aka, 60)
        self.p.execute()
        return SlaveItem(aka, dt).show()

    def iscand(self, aka):
        """Return the ``cand`` field of the hash stored under ``aka``."""
        return self.r.hget(aka, "cand")
class Mind:
    """Mixin that exposes an object's instance attributes as a dict."""

    def show(self):
        """Return a shallow copy of the instance attributes.

        A copy is returned rather than ``__dict__`` itself, so that callers
        mutating the result cannot silently modify the instance.
        """
        return dict(vars(self))
@dataclass
class SlaveItem(Mind):
    """Record holding an ``aka`` key and a ``dt`` value."""

    aka: str
    dt: str
@dataclass
class MasterItem(SlaveItem):
    """SlaveItem extended with a ``cand`` field (order: aka, dt, cand)."""

    cand: str
@dataclass
class AntItem(Mind):
    """Record holding ``cand``, ``dt`` and ``por`` values, in that order."""

    cand: str
    dt: str
    por: str
class XMaster(XFragment, XUta):
    """Fragment named "main" that wires a login route and a websocket hook.

    Keeps two Redis holders: ``r0`` (R0) for per-``cand`` hashes and ``r1``
    (R1) for per-``aka`` hashes. Subclasses must implement :meth:`login`
    and :meth:`_websocket`.
    """

    def __init__(self, app: FastAPI):
        # NOTE(review): XFragment.__init__ calls run(app) before r0/r1 are
        # assigned below, so registration code must not touch them eagerly.
        XFragment.__init__(self, app, "main")
        self.r0 = R0()
        self.r1 = R1()

    def run(self, app):
        """Register the websocket hook first, then the login route."""
        self._websocket(app)
        self._login(app)

    def _login(self, app):
        """Register the POST login route, delegating to :meth:`login`."""

        @app.post(self.get_name("login"))
        async def login(item: LoginItem):
            return self.login(item.user, item.password)

    def add(self, func, id, seconds=1):
        """Add ``func`` as an interval job running every ``seconds`` seconds.

        NOTE(review): ``self.s0`` is not assigned anywhere in this class;
        presumably a scheduler provided by a subclass - verify.
        """
        self.s0.add_job(func, 'interval', seconds=seconds, id=id)

    def login(self, user, password):
        """Handle a login request; must be overridden.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement login()"
        )

    def _websocket(self, app: FastAPI):
        """Register websocket handling on ``app``; must be overridden.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement _websocket()"
        )

    def keeplive(self, cand, por):
        """Return True when the ``por`` stored for ``cand`` in r0 equals ``por``."""
        return self.r0.r.hget(cand, "por") == por

    def crea(self, cand, aka):
        """Create (or replace) the r0 hash for ``cand`` and return its new id.

        Args:
            cand: Key of the hash in r0.
            aka: Key removed from r1.

        Returns:
            The freshly generated ``por`` id stored in the hash.
        """
        if self.r0.r.exists(cand):
            # NOTE(review): kill() is not defined in this class; presumably
            # supplied by a subclass - verify.
            self.kill(cand)
        # NOTE(review): indentation was lost in the source listing; the
        # delete is assumed to run unconditionally - confirm.
        self.r1.r.delete(aka)
        por = self.get_uid()
        data = AntItem(cand, self.get_dt(), por).show()
        # Write and expiry are sent together in one MULTI/EXEC transaction.
        self.r0.p.multi()
        # hset(mapping=...) replaces the deprecated hmset().
        self.r0.p.hset(cand, mapping=data)
        self.r0.p.expire(cand, 60)
        self.r0.p.execute()
        return por
XSlave
from fastapi import WebSocket,WebSocketDisconnect
import asyncio,os,sys
from ..XUtils.XUtils import XUta
class XSlave(XUta):
    """Per-connection state: the websocket plus its aka/cand/por values.

    Subclasses must implement :meth:`keeplive`.
    """

    def __init__(self, websocket: WebSocket, aka, cand, por):
        XUta.__init__(self)
        self.websocket = websocket
        self.cand = cand
        self.aka = aka
        self.por = por
        # Starts as True; nothing in this class changes it.
        self.keep = True

    def startlight(self):
        """Hook with no behaviour in this base class."""

    def shark(self, msg):
        """Print ``msg`` to standard output.

        The class previously defined ``shark`` twice; the earlier,
        raising definition was shadowed by this one and has been removed.
        """
        print(msg)

    def keeplive(self, cand, por):
        """Check whether the connection is still valid; must be overridden.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement keeplive()"
        )
XUtils
import time, datetime, os, shutil
from uuid import uuid4
class XUta:
    """Helpers for timestamps and unique identifiers."""

    def get_dt(self, t=True):
        """Return the current local time truncated to whole seconds.

        Args:
            t: When truthy the value is returned as a string, otherwise
                as a ``datetime`` object.
        """
        now = datetime.datetime.now().replace(microsecond=0)
        return str(now) if t else now

    def get_uid(self):
        """Return a random UUID4 as a 32-character hex string."""
        token = uuid4()
        return token.hex

    def make_dt(self, date):
        """Parse a ``"YYYY-MM-DD HH:MM:SS"`` string into a ``datetime``."""
        layout = "%Y-%m-%d %H:%M:%S"
        return datetime.datetime.strptime(date, layout)
class XUtb:
    """Small filesystem helpers."""

    def new_bag(self, path):
        """Create directory ``path`` (with parents) if nothing exists there.

        Returns:
            ``path`` unchanged.
        """
        if not os.path.exists(path):
            # exist_ok tolerates the directory being created by someone
            # else between the check above and this call.
            os.makedirs(path, exist_ok=True)
        return path

    def new_file(self, old_path, new_path, cur_path):
        """Remove ``old_path`` if it exists, then copy ``cur_path`` to ``new_path``.

        NOTE(review): indentation was lost in the source listing; the copy
        is assumed to run whether or not ``old_path`` existed - confirm.
        """
        if os.path.exists(old_path):
            os.remove(old_path)
        shutil.copy(cur_path, new_path)

    def __ispath(self, path, na):
        """Return ``path`` if it exists; otherwise raise or return None.

        Args:
            path: Filesystem path to check.
            na: When truthy a missing path raises, otherwise None is returned.

        Raises:
            FileNotFoundError: ``path`` is missing and ``na`` is truthy.
        """
        if os.path.exists(path):
            return path
        if na:
            raise FileNotFoundError(f"path does not exist: {path}")
        return None

    def hard_path(self, path):
        """Return ``path`` if it exists, raising FileNotFoundError otherwise.

        Previously the checked path was dropped and None was returned,
        unlike the sibling :meth:`simp_path`.
        """
        return self.__ispath(path, True)

    def simp_path(self, path):
        """Return ``path`` if it exists, otherwise None."""
        return self.__ispath(path, False)
class XUtils(XUta, XUtb):
    """Combined helper exposing both the XUta and XUtb methods."""

    def __init__(self):
        # Explicitly initialise each base in declaration order.
        for base in (XUta, XUtb):
            base.__init__(self)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。