python-paho-mqtt 客户端主动断开后重连方法_loop_forever() loop_start()-程序员宅基地

技术标签: MQTT  python  IoT  

在使用 python-paho-mqtt 开发客户端的时候,有时候会遇到mqtt客户端断开后无法重连的问题,如果你的客户端是使用 loop_start() 运行的,可能遇到了跟我同样的问题。

分析

paho.mqtt.client 中使用 loop_forever()阻塞式的自动处理收发数据的,所有的数据处理逻辑都在预先设定好的回调函数中进行的,如果不想阻塞主进程可以使用loop_start() 在子进程中运行loop_forever()

执行loop_start() 后,如果不主动断开与broker 的连接,客户端会在断开后以指数增长间隔的方式进行重连,间隔参数可以通过reconnect_delay_set() 方法设置。

但是如果调用了disconnect() 方法主动与broker断开连接,会导致 loop_forever() 方法退出,但是运行该方法的子进程不会销毁,依然保存在 client._thread 变量中。
只要该变量不重置为 None 是无法再执行 loop_start()loop_forever() 的。需调用 loop_stop() 终止由 loop_start() 开启的子进程,且调用loop_stop() 的代码不能写在cient的任一回调函数中,否则是无效的(回调函数运行也运行在由 loop_start() 启动的子进程中,总不能自己把自己的进程销毁吧。)

在源码中可以分析得出以上结论:

    def _thread_main(self):
        self.loop_forever(retry_first_connection=True)
        
    def loop_start(self):
    	####### 下面这一行  #######
        if self._thread is not None:
            return MQTT_ERR_INVAL

        self._thread_terminate = False
        ######## 开启子进程运行 loop_forever   ########
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.daemon = True
        self._thread.start()

    def loop_stop(self, force=False):
        if self._thread is None:
            return MQTT_ERR_INVAL

        self._thread_terminate = True
        print('stop', threading.current_thread())
        if threading.current_thread() != self._thread:
            self._thread.join()
            ######### 重置变量  ########### 
            self._thread = None

测试代码

以下代码仅供测试使用,其中关键是使用了 client.reconnect()
client.loop_start() 重新让客户端正常运行。

# -*- coding: utf-8 -*-
import time
import paho.mqtt.client as mqtt
import logging

logging.basicConfig(level='DEBUG', format='%(asctime)s [%(name)s:%(lineno)d] [%(levelname)s]- %(message)s')


def on_connect(client, obj, flags, rc):
    print("connected rc: " + str(rc))


def on_publish(client, obj, mid):
    print("mid: " + str(mid))


def on_disconnect(client, userdata, rc):
    print("disconnect")


TOPIC = 'test/TOPIC'

client = mqtt.Client(client_id='paho_pub', clean_session=True)
client.on_connect = on_connect
client.on_publish = on_publish
client.on_disconnect = on_disconnect

client.enable_logger()
client.username_pw_set('admin', 'password')
client.connect("localhost", 61613, 60)
client.loop_start()

count = 0
while True:
    data = str(time.time())
    print('state: ', client._state, 'loop进程:', client._thread, end='  ')
    if client._state != 2:
        client.publish(TOPIC, data, qos=0)
        print(client._state, '发布: ', data)
    else:
        print('\n客户端已断开,')
    if count == 4:
        print('disconnect.................')
        client.disconnect()
        # loop_stop() 不能写在on_disconnect 回调里, 否则 threading.current_thread() == client._thread,\
        # 客户端无法清除client._thread 子进程,以后再使用loop_start()就无效了
        client.loop_stop()
    if count == 8:
        print('尝试重连')
        client.reconnect()  # 必须重连将 client._state 从断开状态切换为初始化状态
        client.loop_start()
    count += 1
    time.sleep(1)
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/wanzheng_96/article/details/112315091

智能推荐

我的一个关于文件的程序 - [C语言]_fseek(fp,0l,2)-程序员宅基地

文章浏览阅读6.3k次。 2005-09-05我的一个关于文件的程序 - [C语言]#includevoid main(){char ch;FILE* fp;if((fp=fopen("test.txt","r"))==NULL){printf("error");exit(1);}fseek(fp,0L,2);while((fseek(fp,-1L,1))!=-1){ch=fgetc(fp);pu_fseek(fp,0l,2)

oracle 设置查询条数,SQL、MySQL、Oracle、 Sqlite、Informix数据库查询指定条数数据的方法...-程序员宅基地

文章浏览阅读674次。SQL查询前10条的方法为:select top X * from table_name--查询前X条记录,可以改成需要的数字,比如前10条。select top X * from table_name order by colum_name desc--按colum_name属性降序排序查询前X条记录,“order by” 后紧跟要排序的属性列名,其中desc表示降序,asc表示升序(默认也..._oracle怎么用语句设置查询结果数量

课程设计之第二次冲刺----第九天-程序员宅基地

文章浏览阅读58次。讨论成员:罗凯旋、罗林杰、吴伟锋、黎文衷讨论完善APP,调试功能。转载于:https://www.cnblogs.com/383237360q/p/5011594.html

favicon.ico 图标及时更新问题_win 软件开发 ico图标多久更新-程序员宅基地

文章浏览阅读5.4k次。首先看你 favicon.ico 图标文件引入路径是否正确然后 看ico文件能否正常打开,这两个没问题的话,在地址栏直接输入你的域名 http://xxx.com/favicon.ico 注意 此刻可能还是 之前的ico图标 不要着急 刷新一下 试试 完美解决 清除程序缓存_win 软件开发 ico图标多久更新

手工物理删除Oracle归档日志RMAN备份报错_rman 说明与资料档案库中在任何归档日志都不匹配-程序员宅基地

文章浏览阅读2.1k次。Oracle归档日志删除我们都都知道在controlfile中记录着每一个archivelog的相关信息,当然们在OS下把这些物理文件delete掉后,在我们的controlfile中仍然记录着这些archivelog的信息,在oracle的OEM管理器中有可视化的日志展现出,当我们手工清除 archive目录下的文件后,这些记录并没有被我们从controlfile中清除掉,也就是or_rman 说明与资料档案库中在任何归档日志都不匹配

命令提示符_命令提示符文件开头-程序员宅基地

文章浏览阅读706次。命令提示符:[ root@localhost桌面] #[用户名@主机名 当前所在位置] #(超级用户) KaTeX parse error: Expected 'EOF', got '#' at position 25: …用户: #̲ su 用户名 //切… su密码:[ root@cml桌面] #临时提升为root权限:# sudo 命令..._命令提示符文件开头

随便推点

android+打包+不同app,基于Gradle的Android应用打包实践-程序员宅基地

文章浏览阅读152次。0x01 基本项目结构使用Android Studio创建的Android项目会划分成三个层级:project : settings.gradle定义了构建应用时包含了哪些模块;build.gradle定义了适用于项目中所有模块的构建配置module : 可以是一个app类型的module,对应生成apk应用;也可以是一个lib类型的module,对应生成aar包. 每个module中包含的bui..._android多个应用 gradle 怎么打包指定的应用

qsort实现顺序与逆序/排整型,字符串数组,字符数组,结构体类型数组的名字排序,年龄排序等_qsort反向排序-程序员宅基地

文章浏览阅读599次,点赞12次,收藏11次。前言:通常我们排序都需要创建一个函数实现排序,但当我们排完整型数组时,想要排字符串呢?那需要重新创建一个函数,完善它的功能,进而实现排字符串,这样非常繁琐,但是有一个函数可以帮我们实现传什么,排什么;qsort的传参:(1️⃣,2️⃣,3️⃣,4️⃣) (首元素地址,排序的元素个数,每个元素的大小,指向比较两个元素的函数的指针)1️⃣2️⃣3️⃣4️⃣的传参方法,下面介绍:…整型数组:......_qsort反向排序

MVC绕过登陆界面验证时HttpContext.Current.User.Identity.Name取值为空问题解决方法_mvc 不验证登陆-程序员宅基地

文章浏览阅读355次。MVC绕过登陆界面验证时HttpContext.Current.User.Identity.Name取值为空问题解决方法_mvc 不验证登陆

Java中DO、DTO、BO、AO、VO、POJO、Query 命名规范_dto命名规范-程序员宅基地

文章浏览阅读7.6k次,点赞2次,收藏8次。1.分层领域模型规约: • DO( Data Object):与数据库表结构一一对应,通过DAO层向上传输数据源对象。 • DTO( Data Transfer Object):数据传输对象,Service或Manager向外传输的对象。 • BO( Business Object):业务对象。 由Service层输出的封装业务逻辑的对象。 • AO( Ap..._dto命名规范

1015. Reversible Primes (20) PAT甲级刷题_pat甲级1015-程序员宅基地

文章浏览阅读91次。A reversible prime in any number system is a prime whose "reverse" in that number system is also a prime. For example in the decimal system 73 is a reversible prime because its reverse 37 is also a pr..._pat甲级1015

ABAP接口之Http发送json报文_abap http 转换为json输出-程序员宅基地

文章浏览阅读1.5k次。ABAP接口之Http发送json报文abap 调用http 发送 json 测试函数SE11创建结构:zsmlscpnoticeSE37创建函数:zqb_test_http_fuc1FUNCTIONzqb_test_http_fuc1.*"----------------------------------------------------------------..._abap http 转换为json输出