120 lines
3.5 KiB
Python
120 lines
3.5 KiB
Python
#please use python 3.11
|
|
import socket
|
|
import struct
|
|
import serial
|
|
import time
|
|
import platform
|
|
from enum import Enum
|
|
from GAME_METHODS import *
|
|
|
|
runningOs = platform.system()
|
|
|
|
UDP_IP = "127.0.0.1"
|
|
BEAMNG_UDP_PORT = 4444
|
|
FORZA_UDP_PORT= 4843
|
|
SERIAL_PORT = ""
|
|
firstRun= True #used to do the headers for csv files
|
|
|
|
class GameType(Enum):
|
|
NONE=0,
|
|
BEAMNG = 1,
|
|
FORZA = 2
|
|
|
|
connectedArduino = False
|
|
connectedWebSocket = False
|
|
gameSelected = False
|
|
portToConnect= 0
|
|
gameType= GameType.NONE
|
|
carData = 0
|
|
csvOut = False
|
|
|
|
#Functions <-------------------------------------------------------------------------------------------
|
|
def csvWriteOut(firstRun,carData,csvFile):
|
|
if firstRun == True:
|
|
out=""
|
|
for im,nm in carData.items():
|
|
out = out + im + ","
|
|
csvFile.write(out + "\n")
|
|
outString=""
|
|
for itm,num in carData.items():
|
|
outString = outString + str(num) +","
|
|
csvFile.write(outString+"\n")
|
|
|
|
#code runs from here<-----------------------------------------------------------------------------------------------------------
|
|
|
|
if runningOs == 'Windows':
|
|
print("Windows detacted setting serial port to 'COM5'\n")
|
|
SERIAL_PORT= 'COM5'
|
|
elif runningOs == 'Linux':
|
|
print("Linux detected setting serial port to '/dev/ttyAMC0'\n")
|
|
SERIAL_PORT = '/dev/ttyACM0'
|
|
else:
|
|
print("OS detection failed setting serial port to 'COM5'\n")
|
|
SERIAL_PORT = 'COM5'
|
|
|
|
#select game
|
|
while gameSelected == False:
|
|
gameNo = input("1:BEAMNG\n2:FORZA\n\n7:Toggle CSV out ("+str(csvOut)+")\n9:SET SERIAL PORT ("+SERIAL_PORT+")\n")
|
|
if gameNo == "1":
|
|
portToConnect = BEAMNG_UDP_PORT
|
|
gameSelected = True
|
|
gameType=GameType.BEAMNG
|
|
print("BeamNG Selected")
|
|
elif gameNo == "2":
|
|
portToConnect = FORZA_UDP_PORT
|
|
gameType=GameType.FORZA
|
|
gameSelected=True
|
|
print("Forza Selected")
|
|
elif gameNo == "7":
|
|
if csvOut == True:
|
|
csvOut=False
|
|
elif csvOut ==False:
|
|
csvOut =True
|
|
elif gameNo == "9":
|
|
SERIAL_PORT = input("set serial port: ")
|
|
else:
|
|
print("please select a number from the list")
|
|
|
|
#check everything is connected
|
|
while connectedWebSocket == False:
|
|
try:
|
|
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
|
|
sock.bind((UDP_IP,portToConnect))
|
|
connectedWebSocket = True
|
|
except:
|
|
print("please check you are able to open the socket on this system\ntired to open port: "+portToConnect)
|
|
exit()
|
|
|
|
while connectedArduino == False:
|
|
try:
|
|
toPi=serial.Serial(SERIAL_PORT,115200) #connect to arduino
|
|
connectedArduino = True
|
|
except:
|
|
print("please check connection to arduino and verify the correct serial port")
|
|
time.sleep(1)
|
|
|
|
if csvOut == True:
|
|
csvFile = open(str(int(time.time())+'.csv',"a"))
|
|
|
|
|
|
|
|
print("ready:\n")
|
|
while True:
|
|
data, addr = sock.recvfrom(1024)
|
|
if gameType == GameType.BEAMNG:
|
|
unpackedData = struct.unpack(BEAMNG_METHODS.BEAMNG_DATA_FORMAT,data)
|
|
carData = BEAMNG_METHODS.unpackData(unpackedData)
|
|
kmh=carData["speed"]*3.6
|
|
if csvOut == True:
|
|
csvWriteOut(firstRun,carData,csvFile)
|
|
firstRun=False
|
|
elif gameType == GameType.FORZA:
|
|
unpackedData = struct.unpack(FORZA_METHODS.FORZA_DATA_FORMAT,data)
|
|
carData = FORZA_METHODS.unpackData(unpackedData)
|
|
kmh = carData["Speed"]*3.6
|
|
if csvOut == True:
|
|
csvWriteOut(firstRun,carData,csvFile)
|
|
firstRun=False
|
|
|
|
toPi.write(str(kmh).encode()+":".encode())
|
|
|