# umqttsimple.py (6.3 KB)
  1. try:
  2. import usocket as socket
  3. except:
  4. import socket
  5. import ustruct as struct
  6. from ubinascii import hexlify
  7. class MQTTException(Exception):
  8. pass
  9. class MQTTClient:
  10. def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
  11. ssl=False, ssl_params={}):
  12. if port == 0:
  13. port = 8883 if ssl else 1883
  14. self.client_id = client_id
  15. self.sock = None
  16. self.server = server
  17. self.port = port
  18. self.ssl = ssl
  19. self.ssl_params = ssl_params
  20. self.pid = 0
  21. self.cb = None
  22. self.user = user
  23. self.pswd = password
  24. self.keepalive = keepalive
  25. self.lw_topic = None
  26. self.lw_msg = None
  27. self.lw_qos = 0
  28. self.lw_retain = False
  29. def _send_str(self, s):
  30. self.sock.write(struct.pack("!H", len(s)))
  31. self.sock.write(s)
  32. def _recv_len(self):
  33. n = 0
  34. sh = 0
  35. while 1:
  36. b = self.sock.read(1)[0]
  37. n |= (b & 0x7f) << sh
  38. if not b & 0x80:
  39. return n
  40. sh += 7
  41. def set_callback(self, f):
  42. self.cb = f
  43. def set_last_will(self, topic, msg, retain=False, qos=0):
  44. assert 0 <= qos <= 2
  45. assert topic
  46. self.lw_topic = topic
  47. self.lw_msg = msg
  48. self.lw_qos = qos
  49. self.lw_retain = retain
  50. def connect(self, clean_session=True):
  51. self.sock = socket.socket()
  52. addr = socket.getaddrinfo(self.server, self.port)[0][-1]
  53. self.sock.connect(addr)
  54. if self.ssl:
  55. import ussl
  56. self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
  57. premsg = bytearray(b"\x10\0\0\0\0\0")
  58. msg = bytearray(b"\x04MQTT\x04\x02\0\0")
  59. sz = 10 + 2 + len(self.client_id)
  60. msg[6] = clean_session << 1
  61. if self.user is not None:
  62. sz += 2 + len(self.user) + 2 + len(self.pswd)
  63. msg[6] |= 0xC0
  64. if self.keepalive:
  65. assert self.keepalive < 65536
  66. msg[7] |= self.keepalive >> 8
  67. msg[8] |= self.keepalive & 0x00FF
  68. if self.lw_topic:
  69. sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
  70. msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
  71. msg[6] |= self.lw_retain << 5
  72. i = 1
  73. while sz > 0x7f:
  74. premsg[i] = (sz & 0x7f) | 0x80
  75. sz >>= 7
  76. i += 1
  77. premsg[i] = sz
  78. self.sock.write(premsg, i + 2)
  79. self.sock.write(msg)
  80. #print(hex(len(msg)), hexlify(msg, ":"))
  81. self._send_str(self.client_id)
  82. if self.lw_topic:
  83. self._send_str(self.lw_topic)
  84. self._send_str(self.lw_msg)
  85. if self.user is not None:
  86. self._send_str(self.user)
  87. self._send_str(self.pswd)
  88. resp = self.sock.read(4)
  89. assert resp[0] == 0x20 and resp[1] == 0x02
  90. if resp[3] != 0:
  91. raise MQTTException(resp[3])
  92. return resp[2] & 1
  93. def disconnect(self):
  94. self.sock.write(b"\xe0\0")
  95. self.sock.close()
  96. def ping(self):
  97. self.sock.write(b"\xc0\0")
  98. def publish(self, topic, msg, retain=False, qos=0):
  99. pkt = bytearray(b"\x30\0\0\0")
  100. pkt[0] |= qos << 1 | retain
  101. sz = 2 + len(topic) + len(msg)
  102. if qos > 0:
  103. sz += 2
  104. assert sz < 2097152
  105. i = 1
  106. while sz > 0x7f:
  107. pkt[i] = (sz & 0x7f) | 0x80
  108. sz >>= 7
  109. i += 1
  110. pkt[i] = sz
  111. #print(hex(len(pkt)), hexlify(pkt, ":"))
  112. self.sock.write(pkt, i + 1)
  113. self._send_str(topic)
  114. if qos > 0:
  115. self.pid += 1
  116. pid = self.pid
  117. struct.pack_into("!H", pkt, 0, pid)
  118. self.sock.write(pkt, 2)
  119. self.sock.write(msg)
  120. if qos == 1:
  121. while 1:
  122. op = self.wait_msg()
  123. if op == 0x40:
  124. sz = self.sock.read(1)
  125. assert sz == b"\x02"
  126. rcv_pid = self.sock.read(2)
  127. rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
  128. if pid == rcv_pid:
  129. return
  130. elif qos == 2:
  131. assert 0
  132. def subscribe(self, topic, qos=0):
  133. assert self.cb is not None, "Subscribe callback is not set"
  134. pkt = bytearray(b"\x82\0\0\0")
  135. self.pid += 1
  136. struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
  137. #print(hex(len(pkt)), hexlify(pkt, ":"))
  138. self.sock.write(pkt)
  139. self._send_str(topic)
  140. self.sock.write(qos.to_bytes(1, "little"))
  141. while 1:
  142. op = self.wait_msg()
  143. if op == 0x90:
  144. resp = self.sock.read(4)
  145. #print(resp)
  146. assert resp[1] == pkt[2] and resp[2] == pkt[3]
  147. if resp[3] == 0x80:
  148. raise MQTTException(resp[3])
  149. return
  150. # Wait for a single incoming MQTT message and process it.
  151. # Subscribed messages are delivered to a callback previously
  152. # set by .set_callback() method. Other (internal) MQTT
  153. # messages processed internally.
  154. def wait_msg(self):
  155. res = self.sock.read(1)
  156. #self.sock.setblocking(True)
  157. if res is None:
  158. return None
  159. if res == b"":
  160. raise OSError(-1)
  161. if res == b"\xd0": # PINGRESP
  162. sz = self.sock.read(1)[0]
  163. assert sz == 0
  164. return None
  165. op = res[0]
  166. if op & 0xf0 != 0x30:
  167. return op
  168. sz = self._recv_len()
  169. topic_len = self.sock.read(2)
  170. topic_len = (topic_len[0] << 8) | topic_len[1]
  171. topic = self.sock.read(topic_len)
  172. sz -= topic_len + 2
  173. if op & 6:
  174. pid = self.sock.read(2)
  175. pid = pid[0] << 8 | pid[1]
  176. sz -= 2
  177. msg = self.sock.read(sz)
  178. self.cb(topic, msg)
  179. if op & 6 == 2:
  180. pkt = bytearray(b"\x40\x02\0\0")
  181. struct.pack_into("!H", pkt, 2, pid)
  182. self.sock.write(pkt)
  183. elif op & 6 == 4:
  184. assert 0
  185. # Checks whether a pending message from server is available.
  186. # If not, returns immediately with None. Otherwise, does
  187. # the same processing as wait_msg.
  188. def check_msg(self):
  189. self.sock.setblocking(False)
  190. return self.wait_msg()