import logging from pylogix import PLC from threading import Thread import time from datetime import datetime from structures.plant import * from structures.measurement import * from inputs.common import Input localtz = datetime.now().astimezone().tzinfo logger = logging.getLogger(__name__) class AllenBradleyCPU(Input): def __init__(self, host): super().__init__(self.read_handler) self.comm = PLC() self.comm.IPAddress = host self.interval = 0.02 self.cpu_state_tags = { "ejector_move_down": "B14[31]", "ejector_move_up": "B14[32]", "carriage_move_out": "B14[34]", "carriage_move_in": "B14[35]", "side_clamps_open": "B14[37]", "side_clamps_close": "B14[38]", "table_move_down": "B14[42]", "table_move_up": "B14[43]", "gassing_platemove_out": "B14[45]", "gassing_plate_move_in": "B14[46]", "cope_eject_plate_move_out": "B14[48]", "cope_eject_plate_move_in": "B14[49]", "top_part_move_up": "B14[51]", "top_part_move_down": "B14[52]", "front_door_open": "B14[54]", "front_door_close": "B14[55]", "pneumatic_loose_part_1_move_out": "B14[57]", "pneumatic_loose_part_1_move_in": "B14[58]", "hydraulic_loose_part_2_move_out": "B14[60]", "hydraulic_loose_part_2_move_in": "B14[61]", "hydraulic_loose_part_3_move_out": "B14[63]", "hydraulic_loose_part_3_move_in": "B14[64]", "clamping_device_side_clamp_left_clamp": "B14[69]", "clamping_device_side_clamp_left_loose": "B14[70]", "clamping_device_side_clamp_right_clamp": "B14[72]", "clamping_device_side_clamp_right_loose": "B14[73]", "clamping_device_shoot_plate_clamp": "B14[81]", "clamping_device_shoot_plate_loose": "B14[82]", "sand_refill": "B14[83]", "sand_gate_close": "B14[84]", "sand_gate_open": "B14[85]", "shoot": "B14[93]", "clamping_device_gassing_plate_clamp": "B14[103]", "central_amine_supply_refill": "B16[12]", "gassing": "B16[13]", "gas_generator_process_coldbox_betaset": "B16[15]", "mixer_lid_move_up": "B18[31]", "mixer_lid_move_down": "B18[32]", "mixer_wing_motor_on": "B18[35]", "mixer_move_up": "B18[37]", "mixer_move_down": "B18[38]", "sand_dosing_unit_inlet": "B18[40]", "sand_dosing_unit_outlet": "B18[42]", "binder_1_sucking": "B18[44]", "binder_1_blowing": "B18[45]", "binder_2_suction": "B18[47]", "binder_2_blowing": "B18[48]", "binder_3_sucking": "B18[50]", "binder_3_blowing": "B18[51]", "binder_4_sucking": "B18[53]", "binder_4_blowing": "B18[54]", "additive_1_dosing": "B18[59]", "additive_2_dosing": "B18[60]", "mixer_bowl_direction_eject_1_machine": "B18[62]", "mixer_bowl_direction_eject_2_scrap": "B18[63]", "cleaning_cylinder_move_up": "B18[65]", "cleaning_cylinder_move_down": "B18[66]", "mixer_sand_slide_move_to_machine": "B18[68]", "mixer_sand_slide_move_to_scrap": "B18[69]", "vertical_mixersand_slide_gateclose": "B18[71]", "vertical_mixer_sand_slide_gate_open": "B18[72]", "sand_sender": "B18[73]", } self.measurement_tags = { "24V": "B200", "480V": "B201" } self.tags = list(self.cpu_state_tags.values()) + list(self.measurement_tags.values()) def read_handler(self): timestamp = datetime.now(localtz) ret = self.comm.Read(self.tags) if ret[0].Status == "Success": cpu_values = {t: r.Value for t, r in zip(self.cpu_state_tags, ret)} self._q.put(CompactLogixState(timestamp, "AB", **cpu_values)) offset = self.cpu_state_tags.values() ifm_values_count = 22 data = [r.Value for r in ret[offset:offset+ifm_values_count]] channels = 16 self._q.put(Measurement24v(timestamp, "AB", current = tuple([x / 10 for x in data[0:channels]]), status = tuple([data[16] & (1 << i) > 0 for i in range(channels)]), overload = tuple([data[17] & (1 << i) > 0 for i in range(channels)]), short_circuit = tuple([data[18] & (1 << i) > 0 for i in range(channels)]), limit = tuple([data[19] & (1 << i) > 0 for i in range(channels)]), pushbutton = tuple([data[20] & (1 << i) > 0 for i in range(channels)]), voltage = data[22] / 100 )) offset += ifm_values_count data = [r.Value for r in ret[offset:offset+9]] self._q.put(Measurement480v(timestamp, "AB", voltage = tuple(data[0:3]), current = tuple(data[3:6]), phase = tuple(data[6:9]) )) else: logger.error("CPU read: " + ret[0].Status)