"""HTTP bridge between a backend service and a serial-controlled model train.

The Flask app exposes two endpoints:

* ``/start``          -- open the serial port and start a background thread
                         that forwards position frames to the backend.
* ``/train/<action>`` -- send a direction command (``2l`` / ``2r``) to the train.
"""
import time
from threading import Thread

import requests
import serial
import serial.tools.list_ports
from flask import Flask, request

app = Flask(__name__)

# Shared serial connection to the train; stays None until /start opens it.
ser = None

# Backend endpoint that records the reported train position ("l" or "r").
LOCATION_URL = "http://127.0.0.1:8080/api/train/updateTrainLocal?local="

# Upper bound (seconds) for one backend call so a stalled backend cannot
# block the listener thread forever.
REPORT_TIMEOUT = 5

# Delay (seconds) between polls of the serial input buffer in recv().
POLL_INTERVAL = 0.02


def openSer(port="COM8", baudrate=9600):
    """Open the serial port to the train and return the ``Serial`` object.

    The link is configured as 8 data bits, no parity, 1 stop bit with a
    0.5 s read timeout.

    Args:
        port: Serial device name. Defaults to ``"COM8"``.
        baudrate: Line speed. Defaults to 9600.

    Raises:
        serial.SerialException: If the port cannot be opened.
    """
    # Pass the framing to the constructor so the port is opened once with the
    # final settings instead of being reconfigured after opening.
    ser = serial.Serial(
        port,
        baudrate,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        timeout=0.5,
    )
    print("已连接端口:" + str(ser.name) + "\n")
    return ser


def recv(serial):
    """Block until the port has buffered input and return it as ``bytes``.

    Args:
        serial: An open ``serial.Serial`` instance. The parameter name
            shadows the ``serial`` module inside this function; it is kept
            for compatibility with existing callers.
    """
    while True:
        data = serial.read_all()
        # read_all() returns bytes, so an empty read is b'' (never the str
        # ''); truthiness covers that case.
        if data:
            return data
        # Nothing buffered yet: wait briefly instead of spinning.
        time.sleep(POLL_INTERVAL)


# Not referenced anywhere in the visible code; kept in case other code uses it.
step = 0

# Commands that switch the train to the left / right.
TO_L = b'\xc0\xc1\xed\xa5'
TO_R = b'\xc0\xc2\xed\xa5'


def _report_location(local):
    """Send the train position (``"l"`` or ``"r"``) to the backend."""
    try:
        res = requests.get(LOCATION_URL + local, timeout=REPORT_TIMEOUT)
    except requests.RequestException as exc:
        # One failed report must not terminate the listener loop.
        print("updateTrainLocal request failed:", exc)
        return
    print(res.text)


def start(ser):
    """Forward position frames read from ``ser`` to the backend; never returns.

    Intended to run in a background thread (see ``startListing``).

    Args:
        ser: An open ``serial.Serial`` instance to read frames from.
    """
    print("火车上报监听开启")
    while True:
        data = recv(ser)
        print("火车上报位置信息为 : ", data)
        if data == b'\xc0\x1c\xed\xc0\xb1\xed' or data == b'\xc0\xb1\xed':
            print("火车在左上报")
            _report_location("l")
        if data == b'\xc0,\xed\xc0\xb2\xed' or data == b'\xc0\xb2\xed':
            print("火车在右上报")
            _report_location("r")
        if data == b'\xc0\xb1\xedZ\xa5':
            # Reset frame: reported to the backend as the left position.
            _report_location("l")
            print("火车复位上报")
        time.sleep(0.1)


# Execute a movement command.
@app.route('/train/<action>')
def moving(action):
    """Send a direction command to the train.

    Args:
        action: ``"2l"`` to switch left, ``"2r"`` to switch right. Any other
            value is ignored and still answered with ``"success"``.

    Returns:
        ``"success"``, or an error message with HTTP 503 when the serial
        port has not been opened through ``/start`` yet.
    """
    print(action)
    global ser
    if ser is None:
        # Without this guard the write would raise AttributeError on None.
        return "serial port is not open, call /start first", 503
    if action == "2l":
        ser.write(TO_L)
    if action == "2r":
        ser.write(TO_R)
    return "success"


@app.route('/start')
def startListing():
    """Open the serial port and start the background listener thread.

    Calling the endpoint again while the port is open is a no-op, so the
    port is not opened twice and only one listener runs.
    """
    global ser
    if ser is not None and ser.is_open:
        return "success"
    ser = openSer()
    # Daemon thread: the endless listener loop must not keep the process
    # alive when the server is shut down.
    t = Thread(target=start, args=(ser,), daemon=True)
    t.start()
    return "success"


if __name__ == '__main__':
    # NOTE(review): debug=True together with host 0.0.0.0 exposes the Werkzeug
    # debugger to the network; confirm this is only used on a trusted LAN.
    app.run(host='0.0.0.0', port=9000, debug=True)