| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- import time
- import serial
- import serial.tools.list_ports
- from flask import Flask, request
- import requests
- from threading import Thread
- app = Flask(__name__)
- # 火车控制变量
- ser = None
- #串口打开
- def openSer():
- ser = serial.Serial("COM8", 9600, timeout=0.5)
- # 字节大小
- ser.bytesize = 8
- # 无校验
- ser.parity = serial.PARITY_NONE
- # 停止位
- ser.stopbits = 1
- print("已连接端口:" + str(ser.name) + "\n")
- return ser
- # 读取串口消息
- def recv(serial):
- while True:
- data = serial.read_all()
- if data == '':
- continue
- else:
- break
- sleep(0.02)
- return data
- step = 0
- # 左右转向的指令
- TO_L = b'\xc0\xc1\xed\xa5'
- TO_R = b'\xc0\xc2\xed\xa5'
- def start(ser):
- print("火车上报监听开启")
- while True:
- data = recv(ser)
- print("火车上报位置信息为 : ", data)
- if data == b'\xc0\x1c\xed\xc0\xb1\xed' or data == b'\xc0\xb1\xed':
- print("火车在左上报")
- res = requests.get("http://127.0.0.1:8080/api/train/updateTrainLocal?local=l")
- print(res.text)
- if data == b'\xc0,\xed\xc0\xb2\xed' or data == b'\xc0\xb2\xed':
- print("火车在右上报")
- res = requests.get("http://127.0.0.1:8080/api/train/updateTrainLocal?local=r")
- print(res.text)
- if data == b'\xc0\xb1\xedZ\xa5':
- res = requests.get("http://127.0.0.1:8080/api/train/updateTrainLocal?local=l")
- print(res.text)
- print("火车复位上报")
- time.sleep(0.1)
- #执行运动
- @app.route('/train/<action>')
- def moving(action):
- print(action)
- global ser
- if action == "2l":
- # ser = openSer()
- ser.write(TO_L)
- if action == "2r":
- # ser = openSer()
- ser.write(TO_R)
- return "success"
- @app.route('/start')
- def startListing():
- global ser
- ser = openSer()
- t = Thread(target=start, args=(ser,))
- t.start()
- return "success"
- if __name__ == '__main__':
- app.run(host='0.0.0.0', port=9000, debug=True)
|