"""Read 'distance' and 'sensor' module frames from a SMART BRICKS USB receiver.

Two access modes are offered by :class:`TelloModule`:

* blocking  -- ``get_block_dist()`` / ``get_block_sensor()`` open the serial
  port, read exactly one line, close the port and return the parsed values;
* non-blocking -- ``module_init()`` starts a background :class:`SerialTread`
  that keeps the module-level ``info_distance`` / ``info_sensor`` dictionaries
  up to date; the ``get_*`` accessors then return the latest stored value.

NOTE(review): the character offsets used to parse a frame are taken verbatim
from the original implementation; the wire format itself is not documented
here -- confirm against the receiver firmware before changing them.
"""

import threading
import time

import serial
import serial.tools.list_ports

_BAUD_RATE = 115200
# Read timeout (seconds) used when probing the receiver and by the background
# reader, so that a silent device can never block a shutdown forever.
_READ_TIMEOUT = 2
_MODULE_TYPES = ('distance', 'sensor')

# Latest values published by the background reader thread.
info_distance = {'count': 0, 'forward_dist': 0, 'right_dist': 0,
                 'back_dist': 0, 'left_dist': 0, 'light': 0}
# 'LIGHT' is the historical key; 'light' is the key the reader and get_light()
# actually use.  Both are present (and kept in sync) so neither lookup fails.
info_sensor = {'count': 0, 'temp': 0, 'pressure': 0, 'humidity': 0,
               'TVOC': 0, 'CO2': 0, 'LIGHT': 0, 'light': 0}

# Shared module state (kept as globals for backward compatibility).
serial_port = ""    # device name of the detected receiver
type_module = ""    # module type selected in TelloModule.__init__
class_module = ""   # module type handed to the most recent SerialTread
ser = None          # serial port owned by the background reader
t = None            # the running SerialTread, or None when not initialised


def _fail(message):
    """Print *message* and terminate with a non-zero exit status."""
    print(message)
    raise SystemExit(1)


def _split_frame(raw):
    """Split one raw serial line into whitespace-separated fields.

    The line is converted with ``str()`` (giving its ``b'...'`` repr), the
    first four and last three characters of that text are dropped, and the
    remainder is split on whitespace.
    """
    return str(raw)[4:-3].split()


def _parse_distance(fields):
    """Convert the fields of a distance frame into ``info_distance`` items."""
    return {
        'count': int(fields[0][2:]),
        'forward_dist': int(fields[1][6:]),
        'right_dist': int(fields[2][6:]),
        'back_dist': int(fields[3][6:]),
        'left_dist': int(fields[4][6:]),
        'light': int(fields[5][6:]),
    }


def _parse_sensor(fields):
    """Convert the fields of a sensor frame into ``info_sensor`` items."""
    light = int(fields[6][6:])
    return {
        'count': int(fields[0][2:]),
        'temp': float(fields[1][2:]),
        'pressure': float(fields[2][2:]),
        'humidity': float(fields[3][2:]),
        'TVOC': int(fields[4][5:]),
        'CO2': int(fields[5][4:]),
        'light': light,
        'LIGHT': light,
    }


# module type -> (frame parser, dictionary that receives the parsed values)
_FRAME_HANDLERS = {
    'distance': (_parse_distance, info_distance),
    'sensor': (_parse_sensor, info_sensor),
}


def _block_count(token):
    """Parse the counter field of a frame read in blocking mode.

    When the fourth character of the token is ``"r"`` the first six
    characters are skipped; otherwise the whole token must be an integer.
    """
    if token[3] == "r":
        return int(token[6:])
    return int(token)


def _read_block_frame():
    """Open the receiver port, read one line, and always close the port."""
    port = serial.Serial(serial_port, _BAUD_RATE)
    try:
        raw = port.readline()
    finally:
        port.close()
    return _split_frame(raw)


def _stop_reader():
    """Stop the background reader (if any) and release its serial port."""
    global t
    global ser
    if t is not None:
        t.join()
        t = None
    if ser is not None:
        ser.close()
        ser = None


class SerialTread (threading.Thread):
    """Background thread that parses frames from the global ``ser`` port.

    Each complete, well-formed frame replaces the matching entries of
    ``info_distance`` or ``info_sensor`` in a single ``dict.update`` call.
    """

    # Stop flag: the run loop exits once this becomes non-zero.
    c = 0

    def __init__(self, name, counter, module):
        """Create the reader.

        Args:
            name: thread name.
            counter: stored as ``threadID`` and ``counter``.
            module: ``'distance'`` or ``'sensor'``; selects the frame parser.
        """
        global class_module
        threading.Thread.__init__(self)
        self.threadID = counter
        self.name = name
        self.counter = counter
        self.module = module
        class_module = module

    def run(self):
        """Read frames until :meth:`join` is called."""
        handler = _FRAME_HANDLERS.get(self.module)
        if handler is None:
            # Nothing to parse for an unknown module type; do not busy-spin.
            return
        parse, readings = handler
        while self.c == 0:
            raw = ser.readline()
            if not raw.endswith(b'\n'):
                # Read timed out before a full line arrived; drop the
                # (possibly truncated) data instead of parsing it.
                continue
            try:
                values = parse(_split_frame(raw))
            except (ValueError, IndexError):
                # A malformed frame must not kill the reader thread.
                continue
            readings.update(values)

    def join(self, timeout=None):
        """Request the run loop to stop, then wait for the thread to finish.

        Args:
            timeout: optional maximum wait in seconds, as for
                ``threading.Thread.join``.
        """
        self.c = 1
        # Thread.join raises for a never-started thread or when a thread
        # joins itself; the original method never raised, so guard both.
        if self.is_alive() and threading.current_thread() is not self:
            threading.Thread.join(self, timeout)


class TelloModule():
    """Access to a 'distance' or 'sensor' module through the USB receiver."""

    def __init__(self, module=''):
        """Locate the receiver and check that the module is transmitting.

        Args:
            module: ``'distance'`` or ``'sensor'``.

        Raises:
            SystemExit: if *module* is invalid, no port whose manufacturer is
                ``'SBRICKS'`` is found, or nothing is received within the
                probe timeout.
        """
        global serial_port
        global type_module
        if module not in _MODULE_TYPES:
            _fail("Error: Please select \'distance\' or \'sensor\' module.")
        serial_port = ""
        type_module = module
        for port in serial.tools.list_ports.comports():
            if port.manufacturer == 'SBRICKS':
                serial_port = port.device
        if serial_port == "":
            _fail("Error: SMART BRICKS receiver not found, please insert it into your computer's USB port.")
        probe = serial.Serial(serial_port, _BAUD_RATE, timeout=_READ_TIMEOUT)
        try:
            line = probe.readline()
        finally:
            probe.close()
        if line == b'':
            _fail("Error: module \'" + module + "\' not found, please enable it.")

    def module_init(self):
        """Start the background reader (non-blocking mode).

        Any reader left over from a previous call is stopped first.  The
        method returns about one second after the thread has been started.

        Raises:
            SystemExit: if the selected module type is not valid.
        """
        global t
        global ser
        if type_module not in _MODULE_TYPES:
            _fail("Error: There is NO module with this name, please select \"distance\" or \"sensor\".")
        _stop_reader()
        t = SerialTread("Thread", 1, type_module)
        ser = serial.Serial(serial_port, _BAUD_RATE, timeout=_READ_TIMEOUT)
        # Discard the first line: the port was most likely opened mid-frame.
        ser.readline()
        t.start()
        time.sleep(1)

    def module_deinit(self):
        """Stop the background reader and close its serial port."""
        if t is None:
            print("Error: First you need to initialize the module.")
            return
        _stop_reader()

    # function for distance:
    def get_forward_dist(self):
        """Return the latest forward distance stored by the reader."""
        return info_distance['forward_dist']

    def get_left_dist(self):
        """Return the latest left distance stored by the reader."""
        return info_distance['left_dist']

    def get_right_dist(self):
        """Return the latest right distance stored by the reader."""
        return info_distance['right_dist']

    def get_back_dist(self):
        """Return the latest back distance stored by the reader."""
        return info_distance['back_dist']

    def get_light(self):
        """Return the latest light value of the selected module type."""
        if type_module == 'distance':
            return info_distance['light']
        if type_module == 'sensor':
            return info_sensor['light']

    def get_count(self):
        """Return the latest frame counter of the selected module type."""
        if type_module == 'distance':
            return info_distance['count']
        if type_module == 'sensor':
            return info_sensor['count']

    def get_dist(self):
        """Return ``[forward, right, left, back]`` distances.

        Note the order differs from ``get_block_dist`` (which returns back
        before left).
        """
        return [info_distance['forward_dist'], info_distance['right_dist'],
                info_distance['left_dist'], info_distance['back_dist']]

    # function for sensor:
    def get_temp(self):
        """Return the latest ``temp`` value stored by the reader."""
        return info_sensor['temp']

    def get_pressure(self):
        """Return the latest ``pressure`` value stored by the reader."""
        return info_sensor['pressure']

    def get_humidity(self):
        """Return the latest ``humidity`` value stored by the reader."""
        return info_sensor['humidity']

    def get_TVOC(self):
        """Return the latest ``TVOC`` value stored by the reader."""
        return info_sensor['TVOC']

    def get_CO2(self):
        """Return the latest ``CO2`` value stored by the reader."""
        return info_sensor['CO2']

    def get_block_dist(self):
        """Read one distance frame synchronously.

        Returns:
            ``[count, forward, right, back, left, light]`` as integers.

        Raises:
            IndexError, ValueError: if the received line cannot be parsed.
        """
        fields = _read_block_frame()
        return [_block_count(fields[0]),
                int(fields[1][6:]), int(fields[2][6:]), int(fields[3][6:]),
                int(fields[4][6:]), int(fields[5][6:])]

    def get_block_sensor(self):
        """Read one sensor frame synchronously.

        Returns:
            ``[count, temp, pressure, humidity, TVOC, CO2, light]``; temp,
            pressure and humidity are floats, the others integers.

        Raises:
            IndexError, ValueError: if the received line cannot be parsed.
        """
        fields = _read_block_frame()
        return [_block_count(fields[0]),
                float(fields[1][2:]), float(fields[2][2:]),
                float(fields[3][2:]),
                int(fields[4][5:]), int(fields[5][4:]), int(fields[6][6:])]


def _format_test_line(index, labels, values):
    """Build one ``Test <i> <label> = <value> ...`` report line."""
    pairs = "".join(" {} = {}".format(label, value)
                    for label, value in zip(labels, values))
    return "Test " + str(index) + pairs


def _self_test(module):
    """Print ten blocking and ten non-blocking readings for *module*."""
    mod = TelloModule(module)
    if module == 'distance':
        labels = ("count", "forward", "right", "back", "left", "light")
        read_block = mod.get_block_dist
        getters = (mod.get_count, mod.get_forward_dist, mod.get_right_dist,
                   mod.get_back_dist, mod.get_left_dist, mod.get_light)
    elif module == 'sensor':
        labels = ("count", "temp", "pressure", "humidity", "TVOC", "CO2",
                  "light")
        read_block = mod.get_block_sensor
        getters = (mod.get_count, mod.get_temp, mod.get_pressure,
                   mod.get_humidity, mod.get_TVOC, mod.get_CO2, mod.get_light)
    else:
        print("ERROR: Select the module under test: distance OR sensor.")
        return

    print("\n -------- Test blocking mode --------- \n")
    for i in range(10):
        print(_format_test_line(i, labels, read_block()))

    print("\n ------ Test NON Blocking mode ------- \n")
    mod.module_init()
    try:
        for i in range(10):
            print(_format_test_line(i, labels, [get() for get in getters]))
            time.sleep(0.5)
    finally:
        # Always stop the reader: it is a non-daemon thread and would keep
        # the interpreter alive after an error or Ctrl-C.
        mod.module_deinit()
    print("\n ------------ Finish Test ------------ \n")


if __name__ == "__main__":
    # Select the module under test: distance OR sensor
    _self_test('sensor')