# transaction_test.py (7.1 KB)

  1. import unittest
  2. import sys
  3. sys.path.append("..")
  4. from src.protocol import LumiaxClient
  class TestTransaction(unittest.TestCase):
      """Frame-level tests for ``LumiaxClient``.

      Each test compares the request frame built by the client against a
      fixed byte sequence and then feeds a fixed response frame to
      ``LumiaxClient.parse``.
      """

      def setUp(self):
          """Create a fresh client for every test."""
          self.client = LumiaxClient()

      def _read_and_parse(self, start_address, end_address, byte_count,
                          request, response, device_id=0x01):
          """Run the checks shared by all read tests and return the parse result.

          Asserts, in order, that:

          * the address span ``start_address..end_address`` holds
            ``byte_count // 2`` registers,
          * ``get_read_command`` produces exactly ``request``,
          * ``response`` carries ``byte_count`` bytes besides 5 framing bytes,
          * every parsed value is not ``None``.

          Returns the list of ``(variable, value)`` pairs yielded by ``parse``.
          """
          count = byte_count // 2
          self.assertEqual(end_address - start_address + 1, count)
          self.assertEqual(
              request,
              self.client.get_read_command(device_id, start_address, count),
          )
          self.assertEqual(len(response) - 5, byte_count)
          results = self.client.parse(start_address, response)
          for variable, value in results:
              self.assertIsNotNone(
                  value, f"{variable.name} ({hex(variable.address)})"
              )
          return results

      def test_1(self):
          """Read 0x3011-0x302C (56 payload bytes) and parse the reply."""
          byte_count = 56
          start_address = 0x3011
          end_address = 0x302C
          results = self._read_and_parse(
              start_address,
              end_address,
              byte_count,
              request=bytes([0x01, 0x04, 0x30, 0x11, 0x00, 0x1C, 0xAE, 0xC6]),
              response=bytes([
                  # 3 leading framing bytes
                  0x01, 0x04, 0x38,
                  # 56 payload bytes
                  0x41, 0x01, 0x13, 0xF7, 0x00, 0x0F, 0x00, 0x00,
                  0x04, 0x38, 0x04, 0xB0, 0x04, 0x60, 0x04, 0x74,
                  0x05, 0x00, 0x04, 0xB0, 0x00, 0x00, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                  0x01, 0x2C, 0x03, 0x20, 0x03, 0x20, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x3C, 0x00, 0x00,
                  # 2 trailing framing bytes (presumably the checksum)
                  0xB1, 0xB7,
              ]),
          )
          self.assertGreaterEqual(len(results), byte_count // 2)
          self.assertEqual(hex(results[0][0].address), hex(start_address))
          self.assertEqual(hex(results[-1][0].address), hex(end_address))

      def test_2(self):
          """Read 40 registers from 0x3030; the last variable is 32-bit."""
          byte_count = 80
          start_address = 0x3030
          end_address = start_address + byte_count // 2 - 1
          results = self._read_and_parse(
              start_address,
              end_address,
              byte_count,
              request=bytes([0x01, 0x04, 0x30, 0x30, 0x00, 0x28, 0xFF, 0x1B]),
              response=bytes([
                  # 3 leading framing bytes
                  0x01, 0x04, 0x50,
                  # 80 payload bytes
                  0x00, 0x01, 0x00, 0x00, 0x09, 0x60, 0x00, 0x00,
                  0x00, 0x20, 0x00, 0x01, 0x09, 0xC4, 0x0B, 0x54,
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x1F, 0x09, 0x24, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x00, 0x09, 0x24, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x00, 0x02, 0x44, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                  # 2 trailing framing bytes (presumably the checksum)
                  0x70, 0x04,
              ]),
          )
          self.assertGreaterEqual(len(results), byte_count // 2)
          self.assertEqual(hex(results[0][0].address), hex(start_address))
          # The final variable occupies two registers, so it starts one
          # address before the end of the requested range.
          self.assertTrue(results[-1][0].is_32_bit)
          self.assertEqual(hex(results[-1][0].address), hex(end_address - 1))

      def test_3(self):
          """Read the single register 0x3000 and check its decoded value."""
          byte_count = 2
          start_address = 0x3000
          end_address = start_address + byte_count // 2 - 1
          results = self._read_and_parse(
              start_address,
              end_address,
              byte_count,
              request=bytes([0x01, 0x04, 0x30, 0x00, 0x00, 0x01, 0x3E, 0xCA]),
              response=bytes([0x01, 0x04, 0x02, 0x17, 0x70, 0xB7, 0x24]),
          )
          self.assertGreaterEqual(len(results), byte_count // 2)
          self.assertEqual(hex(results[0][0].address), hex(start_address))
          self.assertEqual(hex(results[-1][0].address), hex(end_address))
          # Raw payload 0x1770 is expected to decode to 60.0.
          self.assertEqual(results[0][1], 60.0)

      def test_4(self):
          """Read 29 registers from 0x8FF0 and parse the reply."""
          byte_count = 58
          start_address = 0x8FF0
          end_address = start_address + byte_count // 2 - 1
          results = self._read_and_parse(
              start_address,
              end_address,
              byte_count,
              request=bytes([0x01, 0x03, 0x8F, 0xF0, 0x00, 0x1D, 0xAF, 0x24]),
              response=bytes([
                  # 3 leading framing bytes
                  0x01, 0x03, 0x3A,
                  # 58 payload bytes
                  0x41, 0x01, 0x13, 0xF7, 0x00, 0x0F, 0x00, 0x00,
                  0x04, 0x38, 0x04, 0xB0, 0x04, 0x60, 0x04, 0x74,
                  0x05, 0x00, 0x04, 0xB0, 0x00, 0x00, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                  0x01, 0x2C, 0x03, 0x20, 0x03, 0x20, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                  0x09, 0x60, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3C,
                  0x00, 0x00,
                  # 2 trailing framing bytes (presumably the checksum)
                  0x7B, 0xB4,
              ]),
          )
          # NOTE(review): unlike the other read tests, no lower bound on
          # len(results) is asserted for this range - confirm that is intended.
          self.assertEqual(hex(results[0][0].address), hex(start_address))
          self.assertEqual(hex(results[-1][0].address), hex(end_address))

      def test_5(self):
          """Read 10 registers from 0x9017 and parse the reply."""
          byte_count = 20
          start_address = 0x9017
          end_address = start_address + byte_count // 2 - 1
          results = self._read_and_parse(
              start_address,
              end_address,
              byte_count,
              request=bytes([0x01, 0x03, 0x90, 0x17, 0x00, 0x0A, 0x58, 0xC9]),
              response=bytes([
                  # 3 leading framing bytes
                  0x01, 0x03, 0x14,
                  # 20 payload bytes
                  0x00, 0x34, 0x00, 0x24, 0x00, 0x00, 0x00, 0x01,
                  0x00, 0x01, 0x00, 0x12, 0x00, 0x01, 0x00, 0x00,
                  0x00, 0x00, 0x00, 0x01,
                  # 2 trailing framing bytes (presumably the checksum)
                  0x45, 0x85,
              ]),
          )
          self.assertGreaterEqual(len(results), byte_count // 2)
          self.assertEqual(hex(results[0][0].address), hex(start_address))
          self.assertEqual(hex(results[-1][0].address), hex(end_address))

      def test_6(self):
          """Write 10 registers from 0x9021; the reply parses to an empty list."""
          device_id = 0x01
          byte_count = 20
          count = byte_count // 2
          start_address = 0x9021
          end_address = start_address + count - 1
          self.assertEqual(end_address - start_address + 1, count)

          # Pair every client variable in the address range with the value
          # to be written, in the order the client lists them.
          variables = [
              v for v in self.client.variables
              if start_address <= v.address <= end_address
          ]
          values = list(zip(
              variables,
              [
                  "Lithium", 10.6, 11.8, 14.4, 14.7,
                  13.6, "Auto", 14.4, 14.0, "Normal charging",
              ],
          ))

          send_buf = bytes([
              # 7 leading framing bytes
              0x01, 0x10, 0x90, 0x21, 0x00, 0x0A, 0x14,
              # 20 payload bytes
              0x00, 0x00, 0x04, 0x24, 0x04, 0x9C, 0x05, 0xA0,
              0x05, 0xBE, 0x05, 0x50, 0x00, 0x00, 0x05, 0xA0,
              0x05, 0x78, 0x00, 0x00,
              # 2 trailing framing bytes (presumably the checksum)
              0xCC, 0xE7,
          ])
          self.assertEqual(
              send_buf, self.client.get_write_command(device_id, values)
          )

          recv_buf = bytes([0x01, 0x10, 0x90, 0x21, 0x00, 0x0A, 0x3D, 0x04])
          self.assertEqual(len(recv_buf) - 4, 4)
          results = self.client.parse(start_address, recv_buf)
          self.assertListEqual(results, [])
  # Run the test suite when this file is executed directly as a script.
  if __name__ == "__main__":
      unittest.main()