# csvFile.py
import csv
import dataclasses
import logging
import os
import zipfile
from datetime import datetime
  7. class CSVStorage:
  8. files = {}
  9. def __init__(self, path) -> None:
  10. self.path = path
  11. self.zipname = os.path.join(self.path, F"logs_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.zip")
  12. def write(self, values: list):
  13. try:
  14. for meas in values:
  15. if not meas.series in self.files:
  16. self.files[meas.series] = CSVFile(self.path, meas.series, self.zipname)
  17. self.files[meas.series].write(meas)
  18. except Exception as ex:
  19. logging.exception("CSV write failed")
  20. class CSVFile:
  21. file = None
  22. filename = None
  23. row_count = 0
  24. def __init__(self, path, series, zipname) -> None:
  25. self.path = path
  26. self.series = series
  27. if not os.path.exists(self.path):
  28. os.mkdir(self.path)
  29. self.zipname = zipname
  30. self.new_file()
  31. def new_file(self):
  32. if self.file:
  33. self.file.close()
  34. with zipfile.ZipFile(self.zipname, 'a', compression=zipfile.ZIP_BZIP2, compresslevel=9) as zf:
  35. zf.write(self.filename, os.path.basename(self.filename))
  36. os.remove(self.filename)
  37. self.filename = os.path.join(self.path, F"{self.series}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
  38. self.file = open(self.filename, "w", newline='')
  39. self.writer = csv.writer(self.file, delimiter=',')
  40. def write(self, meas):
  41. row = dataclass_to_dict(meas)
  42. if self.row_count == 0:
  43. self.writer.writerow(row)
  44. self.writer.writerow(row.values())
  45. self.row_count += 1
  46. if self.row_count % 1000 == 0:
  47. self.file.flush()
  48. if self.row_count > 50000:
  49. self.new_file()
  50. self.row_count = 0
  51. def dataclass_to_dict(dc):
  52. ret = {}
  53. for field in dataclasses.fields(dc):
  54. value = getattr(dc, field.name)
  55. if not type(value) is tuple:
  56. ret[field.name] = value
  57. else:
  58. for i, v in enumerate(value):
  59. ret[F"{field.name}_{i}"] = v
  60. return ret