pyptqs1005/libptqs1005/__init__.py

115 lines
3.8 KiB
Python
Raw Permalink Normal View History

2022-01-21 21:03:29 +00:00
import serial
import time
import periphery
class PTQS1005AC:
RESP_LEN = 42
def __make_16bit_int(self, high, low) -> int:
return int(high) << 8 | int(low)
2022-01-21 21:50:48 +00:00
def __clear_val(self):
2022-01-21 21:03:29 +00:00
self.pm10 = 0
self.pm25 = 0
self.pm100 = 0
self.pm10_atm = 0
self.pm25_atm = 0
self.pm100_atm = 0
self.part3 = 0
self.part5 = 0
self.part10 = 0
self.part25 = 0
self.part50 = 0
self.part100 = 0
self.tvoc = 0
self.tvoc_quan = 0
self.hcho = 0
2022-01-21 21:52:20 +00:00
self.hcho_quan = 0
2022-01-21 21:03:29 +00:00
self.co2 = 0
self.temp = 0
self.hum = 0
def __init__(self, raw_resp : bytes):
2022-01-21 21:50:48 +00:00
if raw_resp == None:
self.__clear_val()
return
2022-01-21 21:03:29 +00:00
if len(raw_resp) != 42:
raise Exception("Invalid response length: " + str(len(raw_resp)))
if (raw_resp[0] != 0x42 or raw_resp[1] != 0x4d or raw_resp[2] != 0x00 or raw_resp[3] != 0x26):
raise Exception("Invalid magic header")
checksum = self.__make_16bit_int(raw_resp[40], raw_resp[41])
sum = 0
for i in range(0, 40):
sum = sum + int(raw_resp[i])
if sum != checksum:
raise Exception("Invalid checksum")
self.pm10 = self.__make_16bit_int(raw_resp[4], raw_resp[5])
self.pm25 = self.__make_16bit_int(raw_resp[6], raw_resp[7])
self.pm100 = self.__make_16bit_int(raw_resp[8], raw_resp[9])
self.pm10_atm = self.__make_16bit_int(raw_resp[10], raw_resp[11])
self.pm25_atm = self.__make_16bit_int(raw_resp[12], raw_resp[13])
self.pm100_atm = self.__make_16bit_int(raw_resp[14], raw_resp[15])
self.part3 = self.__make_16bit_int(raw_resp[16], raw_resp[17])
self.part5 = self.__make_16bit_int(raw_resp[18], raw_resp[19])
self.part10 = self.__make_16bit_int(raw_resp[20], raw_resp[21])
self.part25 = self.__make_16bit_int(raw_resp[22], raw_resp[23])
self.part50 = self.__make_16bit_int(raw_resp[24], raw_resp[25])
self.part100 = self.__make_16bit_int(raw_resp[26], raw_resp[27])
self.tvoc = self.__make_16bit_int(raw_resp[28], raw_resp[29]) / 100.0
self.tvoc_quan = int(raw_resp[30])
self.hcho = self.__make_16bit_int(raw_resp[31], raw_resp[32]) / 100.0
2022-01-21 21:52:20 +00:00
self.hcho_quan = int(raw_resp[33])
2022-01-21 21:03:29 +00:00
self.co2 = self.__make_16bit_int(raw_resp[34], raw_resp[35])
self.temp = self.__make_16bit_int(raw_resp[36], raw_resp[37]) / 10.0
self.hum = self.__make_16bit_int(raw_resp[38], raw_resp[39]) / 10.0
class PTQS1005Driver:
def __init__(self, tty : str, reset_pin : int = None):
# init serial port
self.ser = serial.Serial(tty, 9600, timeout = 1)
self.ser.flushInput()
self.ser.flushOutput()
# time.sleep(10)
if reset_pin != None:
2022-01-21 21:21:36 +00:00
self.reset_gpio = None
#self.reset_gpio = periphery.GPIO(reset_pin)
2022-01-21 21:03:29 +00:00
else:
self.reset_gpio = None
def __make_cmd(self, cmd : int, data : int) -> bytes :
arr = bytearray(7)
arr[0] = 0x42
arr[1] = 0x4d
arr[2] = cmd & 0xFF
arr[3] = (data & 0xFF00) >> 8
arr[4] = data & 0xFF
checksum = 0
for i in range(0,5):
checksum = checksum + int(arr[i])
arr[5] = (checksum & 0xFF00) >> 8
arr[6] = (checksum & 0xFF)
return bytes(arr)
def read(self) -> PTQS1005AC:
cmd = self.__make_cmd(0xac, 0)
self.ser.write(cmd)
self.ser.flush()
time.sleep(1e-3)
2022-01-21 21:43:34 +00:00
resp = self.ser.read(PTQS1005AC.RESP_LEN)
2022-01-21 21:03:29 +00:00
if(len(resp) != PTQS1005AC.RESP_LEN):
raise Exception("invalid response length")
return PTQS1005AC(resp)
def reset(self):
raise Exception("unimplemented yet!")