You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
237 lines
8.0 KiB
237 lines
8.0 KiB
import serial
|
|
import time
|
|
import threading
|
|
import serial.tools.list_ports
|
|
|
|
# Latest values parsed from the distance module.  The dict is shared module
# state: the background reader thread overwrites the entries and the
# TelloModule getters read them.  All entries start at 0 until a frame arrives.
info_distance = {
    'count': 0,
    'forward_dist': 0,
    'right_dist': 0,
    'back_dist': 0,
    'left_dist': 0,
    'light': 0,
}

# Latest values parsed from the sensor module (same sharing scheme as above).
# The light entry is keyed 'light' (lower case) because that is the key the
# reader thread writes and TelloModule.get_light() reads; the former 'LIGHT'
# placeholder was never updated and left 'light' missing until the first frame.
info_sensor = {
    'count': 0,
    'temp': 0,
    'pressure': 0,
    'humidity': 0,
    'TVOC': 0,
    'CO2': 0,
    'light': 0,
}
|
|
|
|
|
|
class SerialTread (threading.Thread):
    """Background thread that copies serial readings into the shared dicts.

    While running, the thread reads lines from the module-level ``ser`` port
    and stores the parsed values in ``info_distance`` or ``info_sensor``,
    depending on the module name given to the constructor.
    """

    # Stop flag: run() keeps looping while this is 0; join() sets it to 1 on
    # the instance.
    c = 0

    def __init__(self, name, counter, module):
        """Create the reader thread (it is not started here).

        Args:
            name: Thread name, stored as ``self.name``.
            counter: Stored as both ``self.threadID`` and ``self.counter``.
            module: ``'distance'`` or ``'sensor'``; selects how frames are
                parsed and which dict is updated.
        """
        global class_module
        threading.Thread.__init__(self)
        self.threadID = counter
        self.name = name
        self.counter = counter
        # Keep the module on the instance so that constructing another
        # SerialTread cannot change what an already running thread parses.
        self.module = module
        # Still published as a global for any code that reads it.
        class_module = module

    @staticmethod
    def _split_frame(raw):
        """Turn the bytes returned by readline() into whitespace-split tokens.

        The frame is sliced through its ``str()`` form, i.e. the first four
        and last three characters of the bytes *repr* are dropped.
        NOTE(review): this relies on the device's exact framing bytes -
        confirm against the firmware before changing the offsets.
        """
        return str(raw)[4:-3].split()

    @staticmethod
    def _parse_distance(line):
        """Build the info_distance update from one tokenised frame."""
        return {
            'count': int(line[0][2:]),
            'forward_dist': int(line[1][6:]),
            'right_dist': int(line[2][6:]),
            'back_dist': int(line[3][6:]),
            'left_dist': int(line[4][6:]),
            'light': int(line[5][6:]),
        }

    @staticmethod
    def _parse_sensor(line):
        """Build the info_sensor update from one tokenised frame."""
        return {
            'count': int(line[0][2:]),
            'temp': float(line[1][2:]),
            'pressure': float(line[2][2:]),
            'humidity': float(line[3][2:]),
            'TVOC': int(line[4][5:]),
            'CO2': int(line[5][4:]),
            'light': int(line[6][6:]),
        }

    def run(self):
        """Read frames until the stop flag is set, updating the shared dict.

        A frame that cannot be parsed (too few tokens or a non-numeric field)
        is skipped instead of terminating the thread, and the shared dict is
        only updated once the whole frame has parsed, so readers never see a
        half-updated reading.
        """
        if self.module == 'distance':
            parse, target = self._parse_distance, info_distance
        elif self.module == 'sensor':
            parse, target = self._parse_sensor, info_sensor
        else:
            # Nothing to read for an unknown module; returning avoids
            # spinning in an empty loop until join() is called.
            return

        while self.c == 0:
            line = self._split_frame(ser.readline())
            try:
                reading = parse(line)
            except (IndexError, ValueError):
                # Truncated or garbled frame: keep the previous values.
                continue
            target.update(reading)

    def join(self, timeout=None):
        """Ask the reader loop to stop.

        Called without arguments this only sets the stop flag and returns
        immediately; the loop ends after the read in progress completes.

        Args:
            timeout: Optional number of seconds to additionally wait for the
                thread to finish, as with ``threading.Thread.join``.  Ignored
                when the thread is not running or when called from the reader
                thread itself.
        """
        self.c = 1
        if (timeout is not None and self.is_alive()
                and self is not threading.current_thread()):
            threading.Thread.join(self, timeout)
|
|
|
|
|
|
class TelloModule():
    """Access to a SMART BRICKS 'distance' or 'sensor' module over serial.

    Two ways of reading are offered:

    * blocking: ``get_block_dist()`` / ``get_block_sensor()`` open the port,
      read one frame and return it;
    * non-blocking: ``module_init()`` starts a background reader that keeps
      the module-level ``info_distance`` / ``info_sensor`` dicts current, and
      the ``get_*`` methods return the latest stored values.

    The selected port, module type, reader thread and open port are kept in
    module-level globals (``serial_port``, ``type_module``, ``t``, ``ser``),
    so they are shared by all instances.
    """

    def __init__(self, module=''):
        """Locate the receiver and check that the module is transmitting.

        Args:
            module: ``'distance'`` or ``'sensor'``.

        Raises:
            SystemExit: with status 1, after printing an error, when the
                module name is invalid, no port whose manufacturer is
                ``'SBRICKS'`` is found, or nothing is received within 2 s.
        """
        global serial_port
        global type_module

        if module != 'distance' and module != 'sensor':
            print("Error: Please select \'distance\' or \'sensor\' module.")
            # SystemExit is raised directly: the exit() helper is installed by
            # the site module for interactive use and exits with status 0.
            raise SystemExit(1)

        serial_port = ""
        type_module = module

        # No break on purpose: as before, the last matching port wins.
        for p in serial.tools.list_ports.comports():
            if p.manufacturer == 'SBRICKS':
                serial_port = p.device
        if serial_port == "":
            print(
                "Error: SMART BRICKS receiver not found, please insert it into your computer's USB port.")
            raise SystemExit(1)

        # Probe read; the with-block closes the port even if readline() raises.
        with serial.Serial(serial_port, 115200, timeout=2) as probe:
            line = probe.readline()
        if line == b'':
            print("Error: module \'" + module +
                  "\' not found, please enable it.")
            raise SystemExit(1)

    def module_init(self):
        """Open the port and start the background reader thread.

        After starting the thread the call sleeps for one second before
        returning.

        Raises:
            SystemExit: with status 1 when the stored module type is neither
                ``'distance'`` nor ``'sensor'``.
        """
        global t
        global ser

        # Validate before opening anything so the error path leaves no port
        # open behind it.
        if type_module != 'distance' and type_module != 'sensor':
            print("Error: There is NO module with this name, please select \"distance\" or \"sensor\".")
            raise SystemExit(1)

        t = SerialTread("Thread", 1, type_module)
        ser = serial.Serial(serial_port, 115200)
        # The first line read after opening is discarded (presumably because
        # it may be partial - confirm with the device protocol).
        ser.readline()

        t.start()
        time.sleep(1)

    def module_deinit(self):
        """Stop the background reader and release the serial port.

        Prints an error and returns if ``module_init()`` has not been called
        (or the module was already de-initialised).  Otherwise the reader is
        asked to stop and is waited for up to 2 seconds; the port is closed
        only once the reader has actually stopped, so a read in progress is
        never interrupted.  ``type_module`` is left unchanged so the getters
        keep working and ``module_init()`` can be called again.
        """
        global t
        global ser

        reader = globals().get('t')
        if reader is None:
            print("Error: First you need to initialize the module.")
            return

        reader.join()  # SerialTread.join() sets the stop flag
        if reader.is_alive():
            # Call the base-class join explicitly to really wait for the loop
            # to finish its current read.
            threading.Thread.join(reader, 2)

        port = globals().get('ser')
        if port is not None and not reader.is_alive():
            port.close()
            ser = None
        t = None

    # function for distance:

    def get_forward_dist(self):
        """Return the latest stored forward distance."""
        return info_distance['forward_dist']

    def get_left_dist(self):
        """Return the latest stored left distance."""
        return info_distance['left_dist']

    def get_right_dist(self):
        """Return the latest stored right distance."""
        return info_distance['right_dist']

    def get_back_dist(self):
        """Return the latest stored back distance."""
        return info_distance['back_dist']

    def get_light(self):
        """Return the latest stored light value for the selected module.

        Returns None when the stored module type is neither 'distance' nor
        'sensor'.
        """
        if type_module == 'distance':
            return info_distance['light']
        if type_module == 'sensor':
            # .get() so that a missing 'light' entry (no frame parsed yet)
            # yields the same 0 placeholder as the other fields.
            return info_sensor.get('light', 0)

    def get_count(self):
        """Return the latest stored frame counter for the selected module.

        Returns None when the stored module type is neither 'distance' nor
        'sensor'.
        """
        if type_module == 'distance':
            return info_distance['count']
        if type_module == 'sensor':
            return info_sensor['count']

    def get_dist(self):
        """Return the stored distances as [forward, right, left, back].

        Note that this order differs from ``get_block_dist()``, which returns
        forward, right, back, left.
        """
        return [info_distance['forward_dist'], info_distance['right_dist'],
                info_distance['left_dist'], info_distance['back_dist']]

    # function for sensor:

    def get_temp(self):
        """Return the latest stored temperature."""
        return info_sensor['temp']

    def get_pressure(self):
        """Return the latest stored pressure."""
        return info_sensor['pressure']

    def get_humidity(self):
        """Return the latest stored humidity."""
        return info_sensor['humidity']

    def get_TVOC(self):
        """Return the latest stored TVOC value."""
        return info_sensor['TVOC']

    def get_CO2(self):
        """Return the latest stored CO2 value."""
        return info_sensor['CO2']

    def _read_frame_tokens(self):
        """Open the port, read one line and return its whitespace-split tokens.

        The port is closed again before parsing, also when readline() raises.
        The slice works on the ``str()`` form of the bytes and drops its first
        four and last three characters.
        NOTE(review): offsets depend on the device framing - confirm.
        """
        with serial.Serial(serial_port, 115200) as port:
            raw = port.readline()
        return str(raw)[4:-3].split()

    @staticmethod
    def _block_count(token):
        """Parse the counter token of a blocking read.

        When the character at index 3 is ``"r"`` the number starts at index 6;
        otherwise the whole token is the number.  A one-character slice is
        used for the check so a token shorter than four characters is parsed
        as a plain number instead of raising IndexError.
        """
        if token[3:4] == "r":
            return int(token[6:])
        return int(token)

    def get_block_dist(self):  # count + forward + right + back + left + light
        """Blocking read of one distance frame.

        Returns:
            ``[count, forward, right, back, left, light]`` as ints.

        Raises:
            IndexError, ValueError: if the frame does not have the expected
                layout.
        """
        line = self._read_frame_tokens()
        return [self._block_count(line[0])] + [int(line[i][6:]) for i in range(1, 6)]

    def get_block_sensor(self):  # count + temp + pressure + humidity + TVOC + CO2 + LIGHT
        """Blocking read of one sensor frame.

        Returns:
            ``[count, temp, pressure, humidity, TVOC, CO2, light]``; temp,
            pressure and humidity are floats, the rest ints.

        Raises:
            IndexError, ValueError: if the frame does not have the expected
                layout.
        """
        line = self._read_frame_tokens()
        return [self._block_count(line[0]),
                float(line[1][2:]), float(line[2][2:]), float(line[3][2:]),
                int(line[4][5:]), int(line[5][4:]), int(line[6][6:])]
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: ten blocking reads followed by ten reads of the
    # values maintained by the background reader, for one module type.

    # Select the module under test: distance OR sensor
    module = 'sensor'

    mod = TelloModule(module)

    # Per-module test plan: the label printed before each value, the blocking
    # read to call, and the getters polled in non-blocking mode (same order
    # as the labels).
    if module == 'distance':
        labels = (" count = ", " forward = ", " right = ",
                  " back = ", " left = ", " light = ")
        read_block = mod.get_block_dist
        getters = (mod.get_count, mod.get_forward_dist, mod.get_right_dist,
                   mod.get_back_dist, mod.get_left_dist, mod.get_light)
    elif module == 'sensor':
        labels = (" count = ", " temp = ", " pressure = ", " humidity = ",
                  " TVOC = ", " CO2 = ", " light = ")
        read_block = mod.get_block_sensor
        getters = (mod.get_count, mod.get_temp, mod.get_pressure,
                   mod.get_humidity, mod.get_TVOC, mod.get_CO2, mod.get_light)
    else:
        labels = None

    if labels is None:
        print("ERROR: Select the module under test: distance OR sensor.")
    else:
        print("\n -------- Test blocking mode --------- \n")
        for i in range(10):
            values = read_block()
            print("Test " + str(i) +
                  "".join(label + str(value)
                          for label, value in zip(labels, values)))

        print("\n ------ Test NON Blocking mode ------- \n")
        mod.module_init()
        for i in range(10):
            values = [getter() for getter in getters]
            print("Test " + str(i) +
                  "".join(label + str(value)
                          for label, value in zip(labels, values)))
            time.sleep(0.5)
        mod.module_deinit()

        print("\n ------------ Finish Test ------------ \n")
|
|
|