2022-01-21 21:03:29 +00:00
|
|
|
import serial
|
|
|
|
import time
|
|
|
|
import periphery
|
|
|
|
|
|
|
|
class PTQS1005AC:
|
|
|
|
RESP_LEN = 42
|
|
|
|
|
|
|
|
def __make_16bit_int(self, high, low) -> int:
|
|
|
|
return int(high) << 8 | int(low)
|
|
|
|
|
2022-01-21 21:50:48 +00:00
|
|
|
def __clear_val(self):
|
2022-01-21 21:03:29 +00:00
|
|
|
self.pm10 = 0
|
|
|
|
self.pm25 = 0
|
|
|
|
self.pm100 = 0
|
|
|
|
self.pm10_atm = 0
|
|
|
|
self.pm25_atm = 0
|
|
|
|
self.pm100_atm = 0
|
|
|
|
|
|
|
|
self.part3 = 0
|
|
|
|
self.part5 = 0
|
|
|
|
self.part10 = 0
|
|
|
|
self.part25 = 0
|
|
|
|
self.part50 = 0
|
|
|
|
self.part100 = 0
|
|
|
|
|
|
|
|
self.tvoc = 0
|
|
|
|
self.tvoc_quan = 0
|
|
|
|
self.hcho = 0
|
2022-01-21 21:52:20 +00:00
|
|
|
self.hcho_quan = 0
|
2022-01-21 21:03:29 +00:00
|
|
|
self.co2 = 0
|
|
|
|
self.temp = 0
|
|
|
|
self.hum = 0
|
|
|
|
|
|
|
|
def __init__(self, raw_resp : bytes):
|
2022-01-21 21:50:48 +00:00
|
|
|
if raw_resp == None:
|
|
|
|
self.__clear_val()
|
|
|
|
return
|
|
|
|
|
2022-01-21 21:03:29 +00:00
|
|
|
if len(raw_resp) != 42:
|
|
|
|
raise Exception("Invalid response length: " + str(len(raw_resp)))
|
|
|
|
|
|
|
|
if (raw_resp[0] != 0x42 or raw_resp[1] != 0x4d or raw_resp[2] != 0x00 or raw_resp[3] != 0x26):
|
|
|
|
raise Exception("Invalid magic header")
|
|
|
|
|
|
|
|
checksum = self.__make_16bit_int(raw_resp[40], raw_resp[41])
|
|
|
|
sum = 0
|
|
|
|
for i in range(0, 40):
|
|
|
|
sum = sum + int(raw_resp[i])
|
|
|
|
if sum != checksum:
|
|
|
|
raise Exception("Invalid checksum")
|
|
|
|
|
|
|
|
self.pm10 = self.__make_16bit_int(raw_resp[4], raw_resp[5])
|
|
|
|
self.pm25 = self.__make_16bit_int(raw_resp[6], raw_resp[7])
|
|
|
|
self.pm100 = self.__make_16bit_int(raw_resp[8], raw_resp[9])
|
|
|
|
self.pm10_atm = self.__make_16bit_int(raw_resp[10], raw_resp[11])
|
|
|
|
self.pm25_atm = self.__make_16bit_int(raw_resp[12], raw_resp[13])
|
|
|
|
self.pm100_atm = self.__make_16bit_int(raw_resp[14], raw_resp[15])
|
|
|
|
|
|
|
|
self.part3 = self.__make_16bit_int(raw_resp[16], raw_resp[17])
|
|
|
|
self.part5 = self.__make_16bit_int(raw_resp[18], raw_resp[19])
|
|
|
|
self.part10 = self.__make_16bit_int(raw_resp[20], raw_resp[21])
|
|
|
|
self.part25 = self.__make_16bit_int(raw_resp[22], raw_resp[23])
|
|
|
|
self.part50 = self.__make_16bit_int(raw_resp[24], raw_resp[25])
|
|
|
|
self.part100 = self.__make_16bit_int(raw_resp[26], raw_resp[27])
|
|
|
|
|
|
|
|
self.tvoc = self.__make_16bit_int(raw_resp[28], raw_resp[29]) / 100.0
|
|
|
|
self.tvoc_quan = int(raw_resp[30])
|
|
|
|
self.hcho = self.__make_16bit_int(raw_resp[31], raw_resp[32]) / 100.0
|
2022-01-21 21:52:20 +00:00
|
|
|
self.hcho_quan = int(raw_resp[33])
|
2022-01-21 21:03:29 +00:00
|
|
|
self.co2 = self.__make_16bit_int(raw_resp[34], raw_resp[35])
|
|
|
|
self.temp = self.__make_16bit_int(raw_resp[36], raw_resp[37]) / 10.0
|
|
|
|
self.hum = self.__make_16bit_int(raw_resp[38], raw_resp[39]) / 10.0
|
|
|
|
|
|
|
|
class PTQS1005Driver:
|
|
|
|
def __init__(self, tty : str, reset_pin : int = None):
|
|
|
|
# init serial port
|
|
|
|
self.ser = serial.Serial(tty, 9600, timeout = 1)
|
|
|
|
self.ser.flushInput()
|
|
|
|
self.ser.flushOutput()
|
|
|
|
|
|
|
|
# time.sleep(10)
|
|
|
|
if reset_pin != None:
|
2022-01-21 21:21:36 +00:00
|
|
|
self.reset_gpio = None
|
|
|
|
#self.reset_gpio = periphery.GPIO(reset_pin)
|
2022-01-21 21:03:29 +00:00
|
|
|
else:
|
|
|
|
self.reset_gpio = None
|
|
|
|
|
|
|
|
def __make_cmd(self, cmd : int, data : int) -> bytes :
|
|
|
|
arr = bytearray(7)
|
|
|
|
arr[0] = 0x42
|
|
|
|
arr[1] = 0x4d
|
|
|
|
arr[2] = cmd & 0xFF
|
|
|
|
arr[3] = (data & 0xFF00) >> 8
|
|
|
|
arr[4] = data & 0xFF
|
|
|
|
checksum = 0
|
|
|
|
for i in range(0,5):
|
|
|
|
checksum = checksum + int(arr[i])
|
|
|
|
arr[5] = (checksum & 0xFF00) >> 8
|
|
|
|
arr[6] = (checksum & 0xFF)
|
|
|
|
return bytes(arr)
|
|
|
|
|
|
|
|
|
|
|
|
def read(self) -> PTQS1005AC:
|
|
|
|
cmd = self.__make_cmd(0xac, 0)
|
|
|
|
self.ser.write(cmd)
|
|
|
|
self.ser.flush()
|
|
|
|
time.sleep(1e-3)
|
2022-01-21 21:43:34 +00:00
|
|
|
resp = self.ser.read(PTQS1005AC.RESP_LEN)
|
2022-01-21 21:03:29 +00:00
|
|
|
if(len(resp) != PTQS1005AC.RESP_LEN):
|
|
|
|
raise Exception("invalid response length")
|
|
|
|
return PTQS1005AC(resp)
|
|
|
|
|
|
|
|
def reset(self):
|
|
|
|
raise Exception("unimplemented yet!")
|