transaction_test.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. import unittest
  2. import sys
  3. sys.path.append("..")
  4. from src.variables import variables
  5. from src.protocol import LumiaxClient, Result
  6. class TestTransaction(unittest.TestCase):
  7. def setUp(self):
  8. self.client = LumiaxClient()
  9. def test_1(self):
  10. device_id = 0x01
  11. byte_count = 56
  12. count = int(byte_count / 2)
  13. start_address = 0x3011
  14. end_address = 0x302C
  15. self.assertEqual(end_address - start_address + 1, count)
  16. send_buf = bytes([0x01, 0x04, 0x30, 0x11, 0x00, 0x1C, 0xAE, 0xC6])
  17. self.assertEqual(send_buf, self.client.get_read_command(device_id, start_address, count))
  18. recv_buf = bytes([0x01, 0x04, 0x38, 0x41, 0x01, 0x13, 0xF7, 0x00, 0x0F, 0x00, 0x00, 0x04, 0x38, 0x04, 0xB0, 0x04, 0x60, 0x04, 0x74, 0x05, 0x00, 0x04,0xB0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x2C, 0x03, 0x20, 0x03, 0x20, 0x00, 0x00, 0x00,0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3C, 0x00, 0x00, 0xB1, 0xB7])
  19. self.assertEqual(len(recv_buf) - 5, byte_count)
  20. results = self.client.parse(start_address, recv_buf)
  21. for result in results:
  22. self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
  23. self.assertGreaterEqual(len(results), byte_count / 2)
  24. self.assertEqual(hex(results[0].address), hex(start_address))
  25. self.assertEqual(hex(results[-1].address), hex(end_address))
  26. def test_2(self):
  27. device_id = 0x01
  28. byte_count = 80
  29. count = int(byte_count / 2)
  30. start_address = 0x3030
  31. end_address = start_address + count - 1
  32. self.assertEqual(end_address - start_address + 1, count)
  33. send_buf = bytes([0x01, 0x04, 0x30, 0x30, 0x00, 0x28, 0xFF, 0x1B])
  34. self.assertEqual(send_buf, self.client.get_read_command(device_id, start_address, count))
  35. recv_buf = bytes([0x01, 0x04, 0x50, 0x00, 0x01, 0x00, 0x00, 0x09, 0x60, 0x00, 0x00, 0x00, 0x20, 0x00, 0x01, 0x09, 0xC4, 0x0B, 0x54, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1F, 0x09, 0x24, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x24, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x44, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x70, 0x04])
  36. self.assertEqual(len(recv_buf) - 5, byte_count)
  37. results = self.client.parse(start_address, recv_buf)
  38. for result in results:
  39. self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
  40. self.assertGreaterEqual(len(results), byte_count / 2)
  41. self.assertEqual(hex(results[0].address), hex(start_address))
  42. self.assertEqual(results[-1].is_32_bit, True)
  43. self.assertEqual(hex(results[-1].address), hex(end_address-1))
  44. def test_3(self):
  45. device_id = 0x01
  46. byte_count = 2
  47. count = int(byte_count / 2)
  48. start_address = 0X3000
  49. end_address = start_address + count - 1
  50. self.assertEqual(end_address - start_address + 1, count)
  51. send_buf = bytes([0x01, 0x04, 0x30, 0x00, 0x00, 0x01, 0x3E, 0xCA])
  52. self.assertEqual(send_buf, self.client.get_read_command(device_id, start_address, count))
  53. recv_buf = bytes([0x01, 0x04, 0x02, 0x17, 0x70, 0xB7, 0x24])
  54. self.assertEqual(len(recv_buf) - 5, byte_count)
  55. results = self.client.parse(start_address, recv_buf)
  56. for result in results:
  57. self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
  58. self.assertGreaterEqual(len(results), byte_count / 2)
  59. self.assertEqual(hex(results[0].address), hex(start_address))
  60. self.assertEqual(hex(results[-1].address), hex(end_address))
  61. self.assertEqual(results[0].value, 60.0)
  62. def test_4(self):
  63. device_id = 0x01
  64. byte_count = 58
  65. count = int(byte_count / 2)
  66. start_address = 0X8FF0
  67. end_address = start_address + count - 1
  68. self.assertEqual(end_address - start_address + 1, count)
  69. send_buf = bytes([0x01, 0x03, 0x8F, 0xF0, 0x00, 0x1D, 0xAF, 0x24])
  70. self.assertEqual(send_buf, self.client.get_read_command(device_id, start_address, count))
  71. recv_buf = bytes([0x01, 0x03, 0x3A, 0x41, 0x01, 0x13, 0xF7, 0x00, 0x0F, 0x00, 0x00, 0x04, 0x38, 0x04, 0xB0, 0x04, 0x60, 0x04, 0x74, 0x05, 0x00, 0x04, 0xB0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x2C, 0x03, 0x20, 0x03, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x60, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3C, 0x00, 0x00, 0x7B, 0xB4])
  72. self.assertEqual(len(recv_buf) - 5, byte_count)
  73. results = self.client.parse(start_address, recv_buf)
  74. for result in results:
  75. self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
  76. self.assertEqual(hex(results[0].address), hex(start_address))
  77. self.assertEqual(hex(results[-1].address), hex(end_address))
  78. def test_5(self):
  79. device_id = 0x01
  80. byte_count = 20
  81. count = int(byte_count / 2)
  82. start_address = 0x9017
  83. end_address = start_address + count - 1
  84. self.assertEqual(end_address - start_address + 1, count)
  85. send_buf = bytes([0x01, 0x03, 0x90, 0x17, 0x00, 0x0A, 0x58, 0xC9])
  86. self.assertEqual(send_buf, self.client.get_read_command(device_id, start_address, count))
  87. recv_buf = bytes([0x01, 0x03, 0x14, 0x00, 0x34, 0x00, 0x24, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x12, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x45, 0x85])
  88. self.assertEqual(len(recv_buf) - 5, byte_count)
  89. results = self.client.parse(start_address, recv_buf)
  90. for result in results:
  91. self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
  92. self.assertGreaterEqual(len(results), byte_count / 2)
  93. self.assertEqual(hex(results[0].address), hex(start_address))
  94. self.assertEqual(hex(results[-1].address), hex(end_address))
  95. def test_6(self):
  96. device_id = 0x01
  97. byte_count = 20
  98. count = int(byte_count / 2)
  99. start_address = 0x9021
  100. end_address = start_address + count - 1
  101. self.assertEqual(end_address - start_address + 1, count)
  102. items = [v for v in variables if v.address >= start_address and v.address <= end_address]
  103. values = ["Lithium", 10.6, 11.8, 14.4, 14.7, 13.6, "Auto", 14.4, 14.0, "Normal charging"]
  104. results = [Result(**vars(variable), value=value) for variable, value in (zip(items, values))]
  105. send_buf = bytes([0x01, 0x10, 0x90, 0x21, 0x00, 0x0A, 0x14, 0x00, 0x00, 0x04, 0x24, 0x04, 0x9C, 0x05, 0xA0, 0x05, 0xBE, 0x05, 0x50, 0x00, 0x00, 0x05, 0xA0, 0x05, 0x78, 0x00, 0x00, 0xCC, 0xE7])
  106. start_address, data = self.client.get_write_command(device_id, results)
  107. self.assertEqual(send_buf, data)
  108. recv_buf = bytes([0x01, 0x10, 0x90, 0x21, 0x00, 0x0A, 0x3D, 0x04])
  109. self.assertEqual(len(recv_buf) - 4, 4)
  110. results = self.client.parse(start_address, recv_buf)
  111. self.assertListEqual(list(results), [])
  112. if __name__ == "__main__":
  113. unittest.main()