231 lines
8.4 KiB
Python
231 lines
8.4 KiB
Python
#please use python 3.11
|
|
import socket
|
|
import struct
|
|
import tkinter.ttk
|
|
import serial
|
|
import time
|
|
import platform
|
|
import sys
|
|
import threading
|
|
from enum import Enum
|
|
import tkinter
|
|
from GAME_METHODS import *
|
|
import functools #used for passing variables into callbacks
|
|
|
|
##check if python 3.11 is running code and abort if not
# The guard is deliberately strict (exactly 3.11): per the message below, the
# network input struct has to be adapted for each Python version.
if sys.version_info[:2] != (3, 11):
    raise RuntimeError(
        "code must be run in python version 3.11.\n"
        "input struct for network needs to be changed per python version."
    )
|
|
|
|
|
|
|
|
##system init
# Name of the host OS as reported by platform.system().
runningOs = platform.system()

# --- network settings ---
UDP_IP = "0.0.0.0"
BEAMNG_UDP_PORT = 4444
FORZA_UDP_PORT = 4843

# --- serial settings ---
SERIAL_PORT = ""

# --- run-time flags ---
firstRun = True  # used to do the headers for csv files
pushingToArduino = False
stopThread = False
|
|
|
|
class outputValue(Enum):
    """Telemetry values the user can choose to output.

    Each member's value is the string stored in the data-type radio button
    variable and used as the lookup key into the unpacked car data.
    """

    SPEED = "Speed"
    GEAR = "Gear"
|
|
|
|
class GameType(Enum):
    """Games whose telemetry can be received.

    The string values are what the game-selection radio buttons store; NONE
    keeps the integer value 0.
    """

    NONE = 0
    BEAMNG = "BEAMNG"
    FORZA = "FORZA"
|
|
|
|
class ProgramState(Enum):
    """High-level state of the application, toggled by the start/stop button."""

    PUSHING_DATA = 1
    WAITNG = 2  # original (misspelled) name, kept because existing code refers to it
    WAITING = 2  # correctly spelled alias; same value, so it is the same member as WAITNG
    ERROR = 3
|
|
|
|
# Current application state; switched by the start/stop button callback.
appState = ProgramState.WAITNG

# Boolean status flags, all initially off.
connectedArduino = connectedWebSocket = gameSelected = csvOut = False

# Numeric placeholders, both initially zero.
portToConnect = carData = 0
|
|
|
|
#Functions <-------------------------------------------------------------------------------------------
|
|
def checkBoxChange():
    """Refresh the log-file path entry after the logging checkbox or game selection changes.

    When logging is enabled, the entry is made editable and filled with a
    path of the form ``./<game>_<unix time>.csv``. When logging is disabled,
    the path is cleared and the entry is disabled.
    """
    if tkLoggingEnabled.get():
        loggingLocationEntry.config(state='normal')
        loggingLocationText.set(value="./" + tkGametype.get() + "_" + str(int(time.time())) + ".csv")
        # This callback also fires when the game radio buttons change, so the
        # entry may already hold an older path; clear it first, otherwise the
        # new path is prepended to the old text.
        loggingLocationEntry.delete(0, tkinter.END)
        loggingLocationEntry.insert(0, loggingLocationText.get())
    else:
        loggingLocationText.set(value="")
        # The entry has to be emptied before it is disabled.
        loggingLocationEntry.delete(0, tkinter.END)
        loggingLocationEntry.config(state='disabled')
|
|
|
|
def csvWriteOut(firstRun, carData, csvFile):
    """Write one sample of car data to an open CSV file.

    Args:
        firstRun: when true, a header row made of the keys of ``carData`` is
            written before the data row.
        carData: mapping of column name to value for this sample.
        csvFile: object with a ``write(str)`` method (e.g. an open text file).

    Every field, including the last, is followed by a comma, so each row ends
    in ``",\\n"``. That trailing comma is kept so the output matches files
    produced earlier. Values are not quoted or escaped.
    """
    if firstRun:
        # str() on the keys mirrors what is done for the values below, so
        # non-string keys do not raise.
        csvFile.write("".join(str(name) + "," for name in carData) + "\n")
    csvFile.write("".join(str(value) + "," for value in carData.values()) + "\n")
|
|
|
|
def runningThread():
    """Worker-thread body: forward one telemetry value per UDP packet to the serial port.

    Steps:
      1. Read the selected game from ``tkGametype`` and pick its UDP port,
         struct format and unpack function.
      2. Bind a UDP socket on ``UDP_IP`` and open the serial port at 115200 baud.
      3. If logging is enabled, open the log file in append mode.
      4. Until the module-level ``stopThread`` flag becomes true: receive a
         packet, unpack it, look up the value selected in ``tkDataTypeOut``,
         optionally log the whole sample, and write ``"<value>:"`` to serial.

    Setup failures are reported with ``print`` and end the thread. Everything
    that was opened is closed when the function returns, so the UDP port and
    the serial port can be opened again on the next start.

    NOTE(review): Tk variables/widgets are read from this worker thread, as
    the original code already did -- confirm this is acceptable for the Tk
    build in use.
    """
    global stopThread

    # How long recvfrom() may block before the stop flag is checked again.
    RECV_TIMEOUT_S = 0.5

    gameType = tkGametype.get()

    # Pick the UDP port and packet decoder for the selected game.
    if gameType == GameType.BEAMNG.value:
        portToConnect = BEAMNG_UDP_PORT
        dataFormat = BEAMNG_METHODS.BEAMNG_DATA_FORMAT
        unpackData = BEAMNG_METHODS.unpackData
    elif gameType == GameType.FORZA.value:
        portToConnect = FORZA_UDP_PORT
        dataFormat = FORZA_METHODS.FORZA_DATA_FORMAT
        unpackData = FORZA_METHODS.unpackData
    else:
        print("no supported game selected: " + str(gameType))
        return

    sock = None
    toPi = None
    csvFile = None
    try:
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.bind((UDP_IP, portToConnect))
            # Without a timeout recvfrom() blocks forever when the game sends
            # nothing, and the stop flag would never be looked at.
            sock.settimeout(RECV_TIMEOUT_S)
        except OSError:
            print("please check you are able to open the socket on this system\ntried to open port: " + str(portToConnect))
            return

        # Prefer the port typed into the GUI entry; fall back to the OS default.
        serialPort = serialEntry.get().strip() or SERIAL_PORT
        try:
            toPi = serial.Serial(serialPort, 115200, timeout=2)  # connect to arduino
        except (serial.SerialException, OSError, ValueError):
            print("please check connection to arduino and verify the correct serial port")
            return

        writeHeader = False
        if tkLoggingEnabled.get():
            try:
                csvFile = open(loggingLocationText.get(), "a")
            except OSError:
                print("could not open log file: " + loggingLocationText.get())
                return
            # Append mode starts at the end of the file, so position 0 means
            # the file is new/empty and still needs its header row.
            writeHeader = csvFile.tell() == 0

        while not stopThread:
            try:
                data, _ = sock.recvfrom(1024)
            except socket.timeout:
                continue  # nothing received; re-check the stop flag

            try:
                unpackedData = struct.unpack(dataFormat, data)
            except struct.error:
                continue  # packet size does not match the expected format; skip it

            carData = unpackData(unpackedData)
            dataTypeOut = tkDataTypeOut.get()
            kmh = carData[dataTypeOut]

            # Only the BeamNG gear value is shifted down by one before sending.
            if gameType == GameType.BEAMNG.value and dataTypeOut == outputValue.GEAR.value:
                kmh = kmh - 1

            if csvFile is not None:
                csvWriteOut(writeHeader, carData, csvFile)
                writeHeader = False

            if dataTypeOut == outputValue.SPEED.value:
                kmh = kmh * 3.6  # presumably m/s -> km/h; confirm against the game docs

            try:
                toPi.write((str(kmh) + ":").encode())
            except (serial.SerialException, OSError):
                # Best effort: keep running so sending resumes if writes start working again.
                print("arduino disconnected please check connection\n")
    finally:
        # Release whatever was opened, even after an error or early return.
        for resource in (csvFile, toPi, sock):
            if resource is None:
                continue
            try:
                resource.close()
            except OSError:
                pass  # cleanup is best effort; nothing useful can be done here
|
|
|
|
|
|
|
|
|
|
#code runs from here<-----------------------------------------------------------------------------------------------------------
|
|
|
|
# Choose the default serial port from the detected OS. Each message is built
# from the value actually assigned, so the printed port cannot drift from the
# one in use.
if runningOs == 'Windows':
    SERIAL_PORT = 'COM5'
    print(f"Windows detected setting serial port to '{SERIAL_PORT}'\n")
elif runningOs == 'Linux':
    SERIAL_PORT = '/dev/ttyACM0'
    print(f"Linux detected setting serial port to '{SERIAL_PORT}'\n")
else:
    # Any other platform.system() result falls back to the Windows default.
    SERIAL_PORT = 'COM5'
    print(f"OS detection failed setting serial port to '{SERIAL_PORT}'\n")
|
|
|
|
def startStopButtonFunc():
    """Start/stop button callback: toggle between waiting and pushing data.

    From the waiting state it clears the stop flag, starts ``runningThread``
    in a new thread and relabels the button "stop". From the pushing state
    it sets the stop flag so the worker loop ends and relabels the button
    "start". In any other state it does nothing.
    """
    # stopThread must be declared global: a plain assignment would only create
    # a local variable and the worker thread would never see the request.
    global appState, stopThread

    if appState == ProgramState.WAITNG:
        startStopButton.config(text="stop")
        appState = ProgramState.PUSHING_DATA
        stopThread = False
        # Daemon thread: a worker still waiting for packets must not keep the
        # process alive after the window has been closed.
        workThread = threading.Thread(target=runningThread, daemon=True)
        workThread.start()
        statusTextVar.set("Running...")
    elif appState == ProgramState.PUSHING_DATA:
        startStopButton.config(text="start")
        appState = ProgramState.WAITNG
        statusTextVar.set("Waiting...")
        stopThread = True
|
|
|
|
|
|
##GUI init
# Root window.
rootFrameGui = tkinter.Tk()
rootFrameGui.title("Car speed to arduino")
##rootFrameGui.geometry("400x200")

# Bar along the bottom edge; packed first so it spans the full window width.
bottomBarFrameGui = tkinter.ttk.Frame(
    rootFrameGui,
    padding=0,
    relief="groove",
    borderwidth=2,
)
bottomBarFrameGui.pack(anchor="se", side="bottom", fill="x")

# Left-hand column.
firstFrameGui = tkinter.ttk.Frame(
    rootFrameGui,
    padding=2,
    relief="groove",
    borderwidth=2,
)
firstFrameGui.pack(anchor="ne", side="left", expand=True)

# Right-hand column.
secondFrameGui = tkinter.ttk.Frame(
    rootFrameGui,
    padding=2,
    relief="groove",
    borderwidth=2,
)
secondFrameGui.pack(anchor="nw", side="right", expand=True)
|
|
# Serial port section (left column): a caption above a text entry that is
# pre-filled with the default port chosen from the OS.
serialFrameGui = tkinter.ttk.Frame(
    firstFrameGui,
    padding=5,
    relief="groove",
    borderwidth=2,
)
serialFrameGui.pack(anchor="nw")

serialLableText = tkinter.StringVar(value="Serial Port:")
serialLable = tkinter.Label(serialFrameGui, textvariable=serialLableText)
serialLable.pack(side="top")

serialEntry = tkinter.Entry(serialFrameGui)
serialEntry.insert(0, SERIAL_PORT)
serialEntry.pack(side="left")
|
|
# Game selection section (right column). tkGametype holds the value of the
# selected GameType member; BeamNG is selected initially.
tkGametype = tkinter.StringVar()
tkGametype.set(GameType.BEAMNG.value)

gameSelectFrame = tkinter.ttk.Frame(secondFrameGui, padding=5, relief="groove", borderwidth=2)
gameSelectFrame.pack(anchor="se")

gameSelectText = tkinter.StringVar()
gameSelectText.set("Select Game:")
# A second, identical Label used to be constructed here without ever being
# packed; it was never visible and has been dropped.
gameSelectLable = tkinter.Label(gameSelectFrame, textvariable=gameSelectText)
gameSelectLable.pack(side="top")

gameSelectLOptions = [GameType.BEAMNG, GameType.FORZA]

##gameSelectRadioButtons
##space after text is so the buttons are the same size
# Changing the game re-runs checkBoxChange so the suggested log file name
# follows the selected game.
tkinter.Radiobutton(
    gameSelectFrame,
    text="BeamNG ",
    variable=tkGametype,
    value=GameType.BEAMNG.value,
    command=checkBoxChange,
).pack(anchor="w")
tkinter.Radiobutton(
    gameSelectFrame,
    text="Forza ",
    variable=tkGametype,
    value=GameType.FORZA.value,
    command=checkBoxChange,
).pack(anchor="w")
|
|
# Data type section (right column). tkDataTypeOut holds the value of the
# selected outputValue member; Speed is selected initially.
tkDataTypeOut = tkinter.StringVar()
tkDataTypeOut.set(outputValue.SPEED.value)

dataTypeFrame = tkinter.ttk.Frame(secondFrameGui, padding=5, relief="groove", borderwidth=2)
dataTypeFrame.pack()

dataTypeLableText = tkinter.StringVar()
dataTypeLableText.set(value="Select Data Type:")
# pack() returns None, so the widget is created first and packed on its own
# line; dataTypeLable now refers to the Label instead of None.
dataTypeLable = tkinter.Label(dataTypeFrame, textvariable=dataTypeLableText)
dataTypeLable.pack(side="top")

tkinter.Radiobutton(
    dataTypeFrame,
    text="Speed",
    variable=tkDataTypeOut,
    value=outputValue.SPEED.value,
).pack(anchor="w")
tkinter.Radiobutton(
    dataTypeFrame,
    text="Gear",
    variable=tkDataTypeOut,
    value=outputValue.GEAR.value,
).pack(anchor="w")
|
|
# Logging section (left column): an "Enable Logging" checkbox above the log
# file path entry. The entry starts empty and disabled; checkBoxChange
# enables and fills it when the box is ticked.
loggingFrameGui = tkinter.ttk.Frame(
    firstFrameGui,
    padding=5,
    relief="groove",
    borderwidth=2,
)
loggingFrameGui.pack(anchor="nw")

tkLoggingEnabled = tkinter.BooleanVar(value=False)
tkinter.Checkbutton(
    loggingFrameGui,
    text="Enable Logging",
    variable=tkLoggingEnabled,
    onvalue=True,
    offvalue=False,
    command=checkBoxChange,
).pack(side="top")

loggingLocationText = tkinter.StringVar(value="")
loggingLocationEntry = tkinter.Entry(loggingFrameGui)
loggingLocationEntry.insert(0, loggingLocationText.get())
loggingLocationEntry.config(state='disabled')
loggingLocationEntry.pack(side="bottom")
|
|
# Bottom bar: start/stop button on the right, status text on the left.
startStopButton = tkinter.Button(bottomBarFrameGui, text="start", command=startStopButtonFunc)
startStopButton.pack(side="right", fill="x")

# Same capitalisation as the "Waiting..." text that startStopButtonFunc sets
# when the worker is stopped.
statusTextVar = tkinter.StringVar(value="Waiting...")
statusLable = tkinter.Label(bottomBarFrameGui, textvariable=statusTextVar)
statusLable.pack(anchor="w")
|
|
# Arduino status section (right column).
# NOTE(review): the text is fixed at "Connected OK" here and nothing in the
# visible source updates it afterwards -- confirm whether it should reflect
# the real serial connection state.
arduinoConnectedFrameGui = tkinter.ttk.Frame(
    secondFrameGui,
    padding=5,
    relief="groove",
    borderwidth=2,
)
arduinoConnectedFrameGui.pack()

arduinoConnectedStatusText = tkinter.StringVar(value="Connected OK")
arduinoConnectedLable = tkinter.Label(
    arduinoConnectedFrameGui,
    textvariable=arduinoConnectedStatusText,
)
arduinoConnectedLable.pack()

# Hand control to Tk; this call blocks until the window is closed.
rootFrameGui.mainloop()
|