"""Periodically sample a PTQS1005 air-quality sensor and upload to InfluxDB.

Every DELAY_PER_MEASURE seconds the script takes NUM_READS readings from the
sensor, averages them per attribute, formats the averages as one InfluxDB
line-protocol record and writes it to the configured bucket.  SIGINT requests
a graceful shutdown of the monitoring loop.
"""

import logging
import signal
import sys
import time
from warnings import catch_warnings

import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS

import libptqs1005

# Number of raw sensor reads averaged into one measurement.
NUM_READS = 10
# Pause after each raw read, in seconds.
DELAY_PER_READ = 1
# Interval between two averaged measurements, in seconds.
DELAY_PER_MEASURE = 300

# InfluxDB connection settings.
# NOTE(review): the token is hard-coded in source; consider loading it from
# the environment or a config file instead.
DB_URL = "http://nas:8086"
DB_TOKEN = 'sensor1234sensor'
DB_ORG = 'sensor'
DB_BUCKET = 'sensor'

# Set by signal_handler(); polled by the main loop to stop monitoring.
g_exit = False

# Maps a reading attribute name to the field name used in the database.
# Judging by the existing labels, "10"/"25"/"100" in the attribute names
# stand for 1.0 / 2.5 / 10 (um) — confirm against the libptqs1005 driver.
COLUMN_MAP = {
    "temp": "Temperature",
    "hum": "Humidity",
    "pm10": "PM1 (std)",
    "pm25": "PM2.5 (std)",
    "pm100": "PM10 (std)",
    # Was "PM10 (atmosphere)", colliding with pm100_atm below and producing
    # a duplicate field key in every record.
    "pm10_atm": "PM1 (atmosphere)",
    "pm25_atm": "PM2.5 (atmosphere)",
    "pm100_atm": "PM10 (atmosphere)",
    "tvoc": "TVOC",
    "tvoc_quan": "TVOC quantity",
    "hcho": "HCHO",
    "hcho_quan": "HCHO quantity",
    "co2": "CO2",
    "part3": "0.3um particles",
    "part5": "0.5um particles",
    "part10": "1um particles",
    "part25": "2.5um particles",
    "part50": "5um particles",
    "part100": "10um particles",
}


def signal_handler(signum=None, frame=None):
    """Request a graceful shutdown of the monitoring loop.

    Python invokes signal handlers with two positional arguments (the signal
    number and the current stack frame); both are accepted and ignored.  The
    defaults keep a plain ``signal_handler()`` call working.
    """
    global g_exit
    logging.warning("SIGINT detected, exiting gracefully...")
    g_exit = True


def current_ms():
    """Return the current wall-clock time in whole milliseconds."""
    return round(time.time() * 1000)


def measure(drv: libptqs1005.PTQS1005Driver) -> libptqs1005.PTQS1005AC:
    """Take NUM_READS readings from *drv* and return their per-attribute mean.

    Sleeps DELAY_PER_READ seconds after each read.  Only the attributes
    listed in COLUMN_MAP are averaged and set on the returned object.
    Exceptions raised by ``drv.read()`` propagate to the caller.
    """
    all_data = []
    for i in range(NUM_READS):
        all_data.append(drv.read())
        logging.debug("Measurement %s out of %s.", i, NUM_READS)
        time.sleep(DELAY_PER_READ)

    # Container for the averaged values; built without a raw response.
    ret = libptqs1005.PTQS1005AC(raw_resp=None)
    for attr in COLUMN_MAP:
        total = sum(getattr(dat, attr) for dat in all_data)
        setattr(ret, attr, total / NUM_READS)
    return ret


def data_to_line(data: libptqs1005.PTQS1005AC) -> str:
    """Format *data* as a line-protocol record for the "air" measurement.

    Each COLUMN_MAP entry becomes one ``field=value`` pair; spaces in field
    names are backslash-escaped.  No timestamp is appended.
    """
    fields = ",".join(
        desc.replace(" ", "\\ ") + "=" + str(getattr(data, attr))
        for attr, desc in COLUMN_MAP.items()
    )
    return "air " + fields


def _upload(write_api, data: libptqs1005.PTQS1005AC) -> None:
    """Write one averaged measurement to the database, logging any failure."""
    logging.info("Uploading data to database...")
    dbline = data_to_line(data)
    logging.debug("DB line: " + dbline)
    try:
        # A synchronous write reports failure by raising, not by its return
        # value, so success is "no exception".
        write_api.write(DB_BUCKET, DB_ORG, [dbline])
    except Exception as ex:
        # Best effort: a failed upload must not stop the monitoring loop.
        logging.error("Failed to update database. Error: %s", ex)


def main():
    """Run the monitoring loop until SIGINT is received."""
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

    logging.info("Setting up signal handler...")
    signal.signal(signal.SIGINT, signal_handler)

    logging.info("Setting up sensor driver...")
    drv = libptqs1005.PTQS1005Driver(tty="/dev/ttyAMA0", reset_pin=23)

    logging.info("Setting up InfluxDB connection...")
    dbconn = influxdb_client.InfluxDBClient(url=DB_URL, token=DB_TOKEN, org=DB_ORG)
    # One synchronous writer for the whole run, instead of creating a new
    # batching writer on every cycle.
    write_api = dbconn.write_api(write_options=SYNCHRONOUS)

    logging.info("Begin monitoring...")
    next_ms = current_ms()
    try:
        while not g_exit:
            cur_ms = current_ms()
            if cur_ms < next_ms:
                logging.debug("Next measurement point hasn't arrived. Sleeping for 1s...")
                time.sleep(1)
                continue

            logging.info("Reading data from sensor. Current milliseconds: " + str(cur_ms))
            # Schedule the next cycle up front so a failed read waits for the
            # regular interval instead of retrying in a tight loop.
            next_ms = cur_ms + DELAY_PER_MEASURE * 1000
            try:
                data = measure(drv)
            except Exception as ex:
                # Skip the upload: there is no fresh data for this cycle.
                logging.error("Failed to read data from sensor. Error: %s", ex.args)
            else:
                _upload(write_api, data)
            logging.debug("Next measurement point: " + str(next_ms))
    finally:
        write_api.close()
        dbconn.close()

    logging.info("Stopped monitoring...")


if __name__ == "__main__":
    main()