csv_file.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import csv
  2. import os
  3. from datetime import datetime
  4. import dataclasses
  5. import zipfile
  6. import logging
  7. logger = logging.getLogger(__name__)
  8. class CSVStorage:
  9. files = {}
  10. def __init__(self, path) -> None:
  11. self.path = path
  12. self.zipname = os.path.join(self.path, F"logs_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.zip")
  13. def write(self, values: list):
  14. try:
  15. for meas in values:
  16. if not meas.series in self.files:
  17. self.files[meas.series] = CSVFile(self.path, meas.series, self.zipname)
  18. self.files[meas.series].write(meas)
  19. except Exception as ex:
  20. logger.exception("CSV write failed")
  21. class CSVFile:
  22. file = None
  23. filename = None
  24. row_count = 0
  25. def __init__(self, path, series, zipname) -> None:
  26. self.path = path
  27. self.series = series
  28. if not os.path.exists(self.path):
  29. os.mkdir(self.path)
  30. self.zipname = zipname
  31. self.new_file()
  32. def new_file(self):
  33. if self.file:
  34. self.file.close()
  35. with zipfile.ZipFile(self.zipname, 'a', compression=zipfile.ZIP_BZIP2, compresslevel=9) as zf:
  36. zf.write(self.filename, os.path.basename(self.filename))
  37. os.remove(self.filename)
  38. self.filename = os.path.join(self.path, F"{self.series}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
  39. self.file = open(self.filename, "w", newline='')
  40. self.writer = csv.writer(self.file, delimiter=',')
  41. def write(self, meas):
  42. row = dataclass_to_dict(meas)
  43. if self.row_count == 0:
  44. self.writer.writerow(row)
  45. self.writer.writerow(row.values())
  46. self.row_count += 1
  47. if self.row_count % 1000 == 0:
  48. self.file.flush()
  49. if self.row_count > 50000:
  50. self.new_file()
  51. self.row_count = 0
  52. def dataclass_to_dict(dc):
  53. ret = {}
  54. for field in dataclasses.fields(dc):
  55. value = getattr(dc, field.name)
  56. if not type(value) is tuple:
  57. ret[field.name] = value
  58. else:
  59. for i, v in enumerate(value):
  60. ret[F"{field.name}_{i}"] = v
  61. return ret