如何将值传递回此字符串?

如何解决如何将值传递回此字符串?

摘要:我有一个微型位连接到rpi-zero。我对微比特进行了编码,当按下A button时,它将通过uart.write将数据发送到rpi-zero。

在此测试中,微比特将uart.write("Test")并向rpi-zero写入一个“测试”字。

我的最终目标是使用rpi-zero的BLE功能来充当控制设备,并通过微比特按钮发送指令。

我发现这个GATT Server Code用python为rpi编写。完全没有问题。

下面的代码将用于侦听微比特uart服务,并检查接收到的数据是否为"Test"

import serial

serialPort = serial.Serial(port = "/dev/ttyACM0",baudrate=115200,bytesize=8,timeout=0.5,stopbits=serial.STOPBITS_ONE)

serialString = " "

(serialPort.in_waiting > 0)

while True:

        serialString = serialPort.readline()

        if serialString == b'Test':
            print("Yes")
        else:
            print("F")

但是真正的问题是当我尝试将此循环代码实现为GATT服务器代码时。

我似乎无法理解如何将此值传递给self.send_tx

此外,GATT服务器代码中似乎已经存在一个全局循环。因此,我尝试使用线程来同时运行两个函数,但是当我添加self.send_tx("Test")时,它只会抛出错误Self is not defined

对不起,我对编码一无所知,有人知道对此可能的解决方法吗?谢谢

这是完整的代码:

import sys
import threading
import dbus,dbus.mainloop.glib
import serial
from gi.repository import GLib
from example_advertisement import Advertisement
from example_advertisement import register_ad_cb,register_ad_error_cb
from example_gatt_server import Service,Characteristic
from example_gatt_server import register_app_cb,register_app_error_cb

BLUEZ_SERVICE_NAME =           'org.bluez'
DBUS_OM_IFACE =                'org.freedesktop.DBus.ObjectManager'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
GATT_MANAGER_IFACE =           'org.bluez.GattManager1'
GATT_CHRC_IFACE =              'org.bluez.GattCharacteristic1'
UART_SERVICE_UUID =            '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
UART_RX_CHARACTERISTIC_UUID =  '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
UART_TX_CHARACTERISTIC_UUID =  '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
LOCAL_NAME =                   'rpi-gatt-server'
mainloop = None

serialPort = serial.Serial(port = "/dev/ttyACM0",timeout=0.8,stopbits=serial.STOPBITS_ONE)

serialString = " "

(serialPort.in_waiting > 0)

class TxCharacteristic(Characteristic):
    def __init__(self,bus,index,service):
        Characteristic.__init__(self,UART_TX_CHARACTERISTIC_UUID,['notify'],service)
        self.notifying = False
        GLib.io_add_watch(sys.stdin,GLib.IO_IN,self.on_console_input)

    def on_console_input(self,fd,condition):
        s = fd.readline()
        if s.isspace():
            pass
        else:
            self.send_tx(s)
        return True

    def send_tx(self,s):
        if not self.notifying:
            return
        value = []
        for c in s:
            value.append(dbus.Byte(c.encode()))
        self.PropertiesChanged(GATT_CHRC_IFACE,{'Value': value},[])

    def StartNotify(self):
        if self.notifying:
            print("yes")
            return
        self.notifying = True

    def StopNotify(self):
        if not self.notifying:
            print("no")
            return
        self.notifying = False

class RxCharacteristic(Characteristic):
    def __init__(self,UART_RX_CHARACTERISTIC_UUID,['write'],service)

    def WriteValue(self,value,options):
        print('remote: {}'.format(bytearray(value).decode()))

class UartService(Service):
    def __init__(self,index):
        Service.__init__(self,UART_SERVICE_UUID,True)
        self.add_characteristic(TxCharacteristic(bus,self))
        self.add_characteristic(RxCharacteristic(bus,1,self))

class Application(dbus.service.Object):
    def __init__(self,bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self,self.path)

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service(self,service):
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE,out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        response = {}
        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
        return response

class UartApplication(Application):
    def __init__(self,bus):
        Application.__init__(self,bus)
        self.add_service(UartService(bus,0))

class UartAdvertisement(Advertisement):
    def __init__(self,index):
        Advertisement.__init__(self,'peripheral')
        self.add_service_uuid(UART_SERVICE_UUID)
        self.add_local_name(LOCAL_NAME)
        self.include_tx_power = True

def find_adapter(bus):
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME,'/'),DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()
    for o,props in objects.items():
        if LE_ADVERTISING_MANAGER_IFACE in props and GATT_MANAGER_IFACE in props:
            return o
        print('Skip adapter:',o)
    return None

def check():
    while True:
        serialString = serialPort.readline()
        if serialString == b'Test':
            print("Okay,Test")
            self.send_tx("Test")
        else:
            print("No")

def main():
    global mainloop
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    adapter = find_adapter(bus)
    if not adapter:
        print('BLE adapter not found')
        return

    service_manager = dbus.Interface(
                                bus.get_object(BLUEZ_SERVICE_NAME,adapter),GATT_MANAGER_IFACE)
    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME,LE_ADVERTISING_MANAGER_IFACE)

    app = UartApplication(bus)
    adv = UartAdvertisement(bus,0)

    mainloop = GLib.MainLoop()

    service_manager.RegisterApplication(app.get_path(),{},reply_handler=register_app_cb,error_handler=register_app_error_cb)
    ad_manager.RegisterAdvertisement(adv.get_path(),reply_handler=register_ad_cb,error_handler=register_ad_error_cb)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        adv.Release()

if __name__ == '__main__':
    p1 = threading.Thread(target=main)
    p2 = threading.Thread(target=check)
    p1.start()
    p2.start()

解决方法

我认为这可能是XY problem。我了解到的是,您想通过蓝牙低功耗(BLE)从micro:bit向RPi发送按钮按下。如果是这样,那么有一种更有效的方法可以做到这一点。

在micro:bit的Bluetooth Profile中,可以使用按钮服务和按钮A状态特征。他们的UUID是:

BTN_SRV = 'E95D9882-251D-470A-A062-FA1922DFA9A8'
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'

您需要在micro:bit上设置GATT服务器。最有效的方法是使用https://makecode.microbit.org/#editor创建以下内容: enter image description here

如果左侧菜单上没有蓝牙,请单击屏幕右上方的齿轮,选择Extensions,然后选择Bluetooth来替换{{1 }}扩展名。

通过使用pydbus库进行D-Bus绑定,可以简化RPi上的GATT客户端代码。

使用蓝牙,而不是具有Radio循环并不断轮询micro:bit,让事件循环从Button A特性(在这种情况下)预订通知更为有效。每次按下或释放micro:bit按钮A时,都会调用我命名为while的功能。

下面的代码不进行micro:bit和RPi之间的初始配对。由于配对是一次性配置步骤,因此我手动执行此操作。

这是RPi的示例python代码,该代码响应micro:bit上被按下的Button A ...

btn_handler

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-