csvFile.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. import csv
  2. import os
  3. from datetime import datetime
  4. import dataclasses
  5. from zipfile import ZipFile
  6. from structures.measurement import Measurement24v
  7. class CSVFile:
  8. path = "logs"
  9. file = None
  10. filename = None
  11. row_count = 0
  12. def __init__(self) -> None:
  13. if not os.path.exists(self.path):
  14. os.mkdir(self.path)
  15. self.new_file()
  16. def new_file(self):
  17. if self.file:
  18. self.file.close()
  19. with ZipFile(self.filename + ".zip", 'w') as zf:
  20. zf.write(self.filename, os.path.basename(self.filename))
  21. os.remove(self.filename)
  22. self.filename = os.path.join(self.path, F"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
  23. self.file = open(self.filename, "w", newline='')
  24. self.writer = csv.writer(self.file, delimiter=',')
  25. def write(self, values: list):
  26. try:
  27. meas: Measurement24v
  28. for i, meas in enumerate(values):
  29. row = dataclass_to_dict(meas)
  30. if self.row_count == 0:
  31. self.writer.writerow(row)
  32. self.writer.writerow(row.values())
  33. self.row_count += 1
  34. self.file.flush()
  35. if self.row_count > 50000:
  36. self.new_file()
  37. self.row_count = 0
  38. except Exception as ex:
  39. print("CSV write failed", ex)
  40. def dataclass_to_dict(dc):
  41. ret = {}
  42. for field in dataclasses.fields(dc):
  43. value = getattr(dc, field.name)
  44. if not type(value) is tuple:
  45. ret[field.name] = value
  46. else:
  47. for i, v in enumerate(value):
  48. ret[F"{field.name}_{i}"] = v
  49. return ret