123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- import logging
- from pylogix import PLC
- from threading import Thread
- import time
- from datetime import datetime
- from structures.plant import *
- from structures.measurement import *
- from inputs.common import Input
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# tzinfo of the host's local timezone, captured once at import time and used
# to create timezone-aware timestamps for every reading.
localtz = datetime.now().astimezone().tzinfo
class AllenBradleyCPU(Input):
    """Read machine state bits and power measurements from an Allen-Bradley PLC.

    ``read_handler`` is handed to the ``Input`` base class as its read
    callback. Every successful read queues one ``CompactLogixState`` on
    ``self._q`` and, when enough values were returned, one ``Measurement24v``
    and one ``Measurement480v`` as well. ``self._q`` is not created here; it is
    presumably provided by ``Input`` (confirm against ``inputs.common``).
    """

    # Layout of the 24 V block: 16 per-channel currents, five flag words
    # (status, overload, short circuit, limit, pushbutton) and one supply
    # voltage value, i.e. 16 + 5 + 1 = 22 values.
    _CHANNELS = 16
    _FLAG_WORDS = 5
    _IFM_VALUE_COUNT = 22
    # Layout of the 480 V block: three voltages, three currents, three phases.
    _MAINS_VALUE_COUNT = 9

    def __init__(self, host):
        """Create the PLC connection object and the tag tables.

        Args:
            host: Address of the PLC; assigned to the pylogix ``IPAddress``.
        """
        super().__init__(self.read_handler)
        self.comm = PLC()
        self.comm.IPAddress = host
        # NOTE(review): presumably the polling interval in seconds consumed by
        # the Input base class -- confirm.
        self.interval = 0.02
        # Maps state name -> PLC tag. The names are passed verbatim as keyword
        # arguments to CompactLogixState, so their spelling must not change.
        self.cpu_state_tags = {
            "ejector_move_down": "B14[31]",
            "ejector_move_up": "B14[32]",
            "carriage_move_out": "B14[34]",
            "carriage_move_in": "B14[35]",
            "side_clamps_open": "B14[37]",
            "side_clamps_close": "B14[38]",
            "table_move_down": "B14[42]",
            "table_move_up": "B14[43]",
            "gassing_platemove_out": "B14[45]",
            "gassing_plate_move_in": "B14[46]",
            "cope_eject_plate_move_out": "B14[48]",
            "cope_eject_plate_move_in": "B14[49]",
            "top_part_move_up": "B14[51]",
            "top_part_move_down": "B14[52]",
            "front_door_open": "B14[54]",
            "front_door_close": "B14[55]",
            "pneumatic_loose_part_1_move_out": "B14[57]",
            "pneumatic_loose_part_1_move_in": "B14[58]",
            "hydraulic_loose_part_2_move_out": "B14[60]",
            "hydraulic_loose_part_2_move_in": "B14[61]",
            "hydraulic_loose_part_3_move_out": "B14[63]",
            "hydraulic_loose_part_3_move_in": "B14[64]",
            "clamping_device_side_clamp_left_clamp": "B14[69]",
            "clamping_device_side_clamp_left_loose": "B14[70]",
            "clamping_device_side_clamp_right_clamp": "B14[72]",
            "clamping_device_side_clamp_right_loose": "B14[73]",
            "clamping_device_shoot_plate_clamp": "B14[81]",
            "clamping_device_shoot_plate_loose": "B14[82]",
            "sand_refill": "B14[83]",
            "sand_gate_close": "B14[84]",
            "sand_gate_open": "B14[85]",
            "shoot": "B14[93]",
            "clamping_device_gassing_plate_clamp": "B14[103]",
            "central_amine_supply_refill": "B16[12]",
            "gassing": "B16[13]",
            "gas_generator_process_coldbox_betaset": "B16[15]",
            "mixer_lid_move_up": "B18[31]",
            "mixer_lid_move_down": "B18[32]",
            "mixer_wing_motor_on": "B18[35]",
            "mixer_move_up": "B18[37]",
            "mixer_move_down": "B18[38]",
            "sand_dosing_unit_inlet": "B18[40]",
            "sand_dosing_unit_outlet": "B18[42]",
            "binder_1_sucking": "B18[44]",
            "binder_1_blowing": "B18[45]",
            "binder_2_suction": "B18[47]",
            "binder_2_blowing": "B18[48]",
            "binder_3_sucking": "B18[50]",
            "binder_3_blowing": "B18[51]",
            "binder_4_sucking": "B18[53]",
            "binder_4_blowing": "B18[54]",
            "additive_1_dosing": "B18[59]",
            "additive_2_dosing": "B18[60]",
            "mixer_bowl_direction_eject_1_machine": "B18[62]",
            "mixer_bowl_direction_eject_2_scrap": "B18[63]",
            "cleaning_cylinder_move_up": "B18[65]",
            "cleaning_cylinder_move_down": "B18[66]",
            "mixer_sand_slide_move_to_machine": "B18[68]",
            "mixer_sand_slide_move_to_scrap": "B18[69]",
            "vertical_mixersand_slide_gateclose": "B18[71]",
            "vertical_mixer_sand_slide_gate_open": "B18[72]",
            "sand_sender": "B18[73]",
        }
        # NOTE(review): read_handler expects 22 + 9 measurement values after
        # the state bits, but only two measurement tags are requested here.
        # Confirm how the values are meant to be read from the PLC.
        self.measurement_tags = {
            "24V": "B200",
            "480V": "B201",
        }
        # State tags first, measurement tags last; read_handler relies on
        # this order to split the response.
        self.tags = (
            list(self.cpu_state_tags.values())
            + list(self.measurement_tags.values())
        )

    @staticmethod
    def _unpack_flags(word, count):
        """Return the lowest ``count`` bits of ``word`` as a tuple of bools."""
        return tuple(bool(word & (1 << bit)) for bit in range(count))

    def read_handler(self):
        """Read all tags once and queue the resulting state and measurements.

        The read is judged by the status of the first result only. On failure
        the status is logged and nothing is queued. On success the machine
        state is always queued; the two measurements are queued only when the
        response holds enough values, otherwise an error is logged.
        """
        timestamp = datetime.now(localtz)
        ret = self.comm.Read(self.tags)
        if ret[0].Status != "Success":
            logger.error("CPU read: %s", ret[0].Status)
            return

        # The first results line up with cpu_state_tags in insertion order.
        cpu_values = {
            name: result.Value
            for name, result in zip(self.cpu_state_tags, ret)
        }
        self._q.put(CompactLogixState(timestamp, "AB", **cpu_values))

        # Everything after the state bits is measurement data. The offset is
        # the NUMBER of state tags; the dict view itself cannot be used as a
        # slice bound.
        offset = len(self.cpu_state_tags)
        values = [result.Value for result in ret[offset:]]
        expected = self._IFM_VALUE_COUNT + self._MAINS_VALUE_COUNT
        if len(values) < expected:
            # Report a short response instead of failing with an IndexError
            # halfway through building the measurements.
            logger.error(
                "CPU read: expected %d measurement values, got %d",
                expected,
                len(values),
            )
            return

        channels = self._CHANNELS
        ifm = values[:self._IFM_VALUE_COUNT]
        status, overload, short_circuit, limit, pushbutton = (
            self._unpack_flags(word, channels)
            for word in ifm[channels:channels + self._FLAG_WORDS]
        )
        self._q.put(Measurement24v(
            timestamp, "AB",
            current=tuple(raw / 10 for raw in ifm[:channels]),
            status=status,
            overload=overload,
            short_circuit=short_circuit,
            limit=limit,
            pushbutton=pushbutton,
            # The voltage is the last of the 22 values, i.e. index 21.
            voltage=ifm[self._IFM_VALUE_COUNT - 1] / 100,
        ))

        mains = values[self._IFM_VALUE_COUNT:expected]
        self._q.put(Measurement480v(
            timestamp, "AB",
            voltage=tuple(mains[0:3]),
            current=tuple(mains[3:6]),
            phase=tuple(mains[6:9]),
        ))
|