背景
前回はCAN定期送受信までできるようになったので、今度はOBD-Ⅱ通信を実装してみる。前回の記事はこちら。
UDSコマンド(CANID固定、SID=0x22)に対して応答するプログラム
まずはお試し。CANID=0x740に対してCANID=0x748を固定で応答するプログラム。SIDも0x22固定。
import tkinter as tk
from tkinter import ttk
import can
import threading
import time
class CANTransmitterApp:
    """Tkinter GUI that sends three periodic CAN frames and answers UDS requests.

    While sending is active, a worker thread logs every received frame and
    replies on ``UDS_RESPONSE_ID`` to frames received on ``UDS_REQUEST_ID``.
    Only service 0x22 gets a positive reply in this first iteration.
    """

    UDS_REQUEST_ID = 0x740   # frames with this ID are treated as UDS requests
    UDS_RESPONSE_ID = 0x748  # ID used for every UDS reply
    # Bounded wait so the receive worker re-checks the stop flag regularly.
    RECV_TIMEOUT = 0.5

    def __init__(self, master):
        """Build all widgets inside ``master`` and initialise the idle state.

        Args:
            master: Top-level Tk window; its ``title`` and ``protocol``
                methods are called, so a plain frame is not sufficient.
        """
        self.master = master
        self.master.title("CAN Transmitter")
        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")
        # CAN ID, CAN Data, and Period Inputs
        # Widgets are created column by column (not row by row) so the
        # creation order of the original layout is preserved.
        self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(3)]
        self.data_labels = [ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):") for i in range(3)]
        self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(3)]
        self.can_id_entries = [ttk.Entry(master) for _ in range(3)]
        self.data_entries = [ttk.Entry(master) for _ in range(3)]
        self.period_entries = [ttk.Entry(master) for _ in range(3)]
        for i in range(3):
            grid_row = 1 + i
            # CAN ID
            self.can_id_labels[i].grid(row=grid_row, column=0, padx=10, pady=10, sticky=tk.W)
            self.can_id_entries[i].grid(row=grid_row, column=1, padx=10, pady=10)
            self.can_id_entries[i].insert(tk.END, f"{123 + i:03X}")
            # CAN Data
            self.data_labels[i].grid(row=grid_row, column=2, padx=10, pady=10, sticky=tk.W)
            self.data_entries[i].grid(row=grid_row, column=3, padx=10, pady=10)
            self.data_entries[i].insert(tk.END, "11,22,33,44,55,66,77,88")
            # Period
            self.period_labels[i].grid(row=grid_row, column=4, padx=10, pady=10, sticky=tk.W)
            self.period_entries[i].grid(row=grid_row, column=5, padx=10, pady=10)
            self.period_entries[i].insert(tk.END, "1.0")
        # Control Buttons
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, columnspan=3, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=3, columnspan=3, padx=10, pady=10)
        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=6, padx=10, pady=10)
        # Log Text
        self.log_text = tk.Text(master, height=10, width=80)
        self.log_text.grid(row=6, columnspan=6, padx=10, pady=10)
        self.is_sending = False
        self.bus = None
        self.send_timers = [None, None, None]
        # Closing the window must also stop the timers and release the bus.
        self.master.protocol("WM_DELETE_WINDOW", self._on_close)

    @staticmethod
    def _parse_row(id_text, data_text, period_text):
        """Convert one row of entry texts into ``(can_id, data, period)``.

        Args:
            id_text: CAN identifier as a hexadecimal string.
            data_text: Exactly eight comma-separated hexadecimal byte values.
            period_text: Transmission period in seconds.

        Returns:
            Tuple of the integer ID, the list of eight byte values and the
            period as a float.

        Raises:
            ValueError: If a field cannot be parsed or is out of range; the
                message is suitable for direct display to the user.
        """
        try:
            can_id = int(id_text.strip(), 16)
        except ValueError:
            raise ValueError("Invalid CAN ID.") from None
        if not 0 <= can_id <= 0x1FFFFFFF:  # largest 29-bit identifier
            raise ValueError("Invalid CAN ID.")
        try:
            data = [int(byte, 16) for byte in data_text.strip().split(",")]
        except ValueError:
            raise ValueError("Invalid CAN data format.") from None
        if len(data) != 8 or not all(0 <= byte <= 0xFF for byte in data):
            raise ValueError("Invalid CAN data format.")
        try:
            period = float(period_text.strip())
        except ValueError:
            raise ValueError("Invalid period.") from None
        # Rejects zero, negative, NaN and infinite periods.
        if not 0 < period < float("inf"):
            raise ValueError("Invalid period.")
        return can_id, data, period

    def start_sending(self):
        """Validate the form, open the bus and start all background activity.

        Invalid input or a bus that cannot be opened is reported in the
        status label and leaves the application idle.
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return
        interface = self.interface_entry.get().strip()
        try:
            rows = [
                self._parse_row(id_entry.get(), data_entry.get(), period_entry.get())
                for id_entry, data_entry, period_entry in zip(
                    self.can_id_entries, self.data_entries, self.period_entries
                )
            ]
        except ValueError as e:
            self.status_label.config(text=str(e))
            return
        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return
        self.log_message("info", f"CAN interface {interface} connected.")
        self.is_sending = True
        self.receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
        self.receive_thread.start()
        for index, (can_id, data, period) in enumerate(rows):
            self.send_messages(can_id, data, period, index)

    def send_messages(self, can_id, data, period, index):
        """Send one frame and schedule the next transmission of the same row.

        Args:
            can_id: CAN identifier of the frame.
            data: Payload bytes.
            period: Delay in seconds before the next transmission.
            index: Slot in ``send_timers`` that tracks this row's timer.
        """
        if not self.is_sending:
            return
        # python-can defaults to extended frames; use the 11-bit format
        # whenever the identifier fits into it.
        message = can.Message(arbitration_id=can_id, data=data, is_extended_id=can_id > 0x7FF)
        try:
            self.bus.send(message)
            self.log_message("send", message)
        except can.CanError as e:
            self.log_message("error", f"Failed to send CAN message - {str(e)}")
            self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")
        # Sending may have been stopped while this call was in progress.
        if self.is_sending:
            timer = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
            self.send_timers[index] = timer
            timer.start()

    def receive_messages(self):
        """Worker loop: log received frames and answer UDS requests.

        Runs until ``is_sending`` becomes false.
        """
        while self.is_sending:
            try:
                message = self.bus.recv(timeout=self.RECV_TIMEOUT)
            except Exception as e:
                if not self.is_sending:
                    break  # the bus was shut down by stop_sending()
                self.log_message("error", f"Failed to receive CAN message - {str(e)}")
                continue
            if message is None:  # timeout, nothing received
                continue
            self.log_message("recv", message)
            # Check for UDS command with CANID 0x740
            if message.arbitration_id != self.UDS_REQUEST_ID:
                continue
            response_data = self.create_ud_response(message.data)
            if response_data is None:
                self.log_message("error", "Ignored malformed UDS request (no service ID).")
                continue
            # Reply in the same frame format (11/29-bit) as the request.
            response_message = can.Message(
                arbitration_id=self.UDS_RESPONSE_ID,
                data=response_data,
                is_extended_id=message.is_extended_id,
            )
            try:
                self.bus.send(response_message)
            except Exception as e:
                self.log_message("error", f"Failed to send UDS response - {str(e)}")
            else:
                self.log_message("send", response_message)

    def create_ud_response(self, request_data):
        """Build the reply payload for one received request frame.

        Byte 1 of the frame is interpreted as the service ID (SID).

        NOTE(review): the payloads of this first iteration are not valid UDS
        single frames (no leading length byte; the negative response echoes
        byte 0 rather than the SID). They are kept unchanged here because the
        surrounding article presents this listing as the "wrong format"
        starting point.

        Args:
            request_data: Raw data bytes of the received frame.

        Returns:
            List of payload bytes, or ``None`` when the frame is too short
            to contain a service ID.
        """
        request = list(request_data)
        print("Received request data:", ", ".join(f"0x{byte:02X}" for byte in request))
        if len(request) < 2:
            return None
        if request[1] == 0x22:
            # Positive response code 0x62 (= 0x22 + 0x40) followed by
            # everything from the SID onward.
            return [0x62] + request[1:]
        return [0x7F, request[0], 0x31]  # negative response, code 0x31

    def stop_sending(self):
        """Stop periodic transmission, cancel timers and shut the bus down."""
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return
        self.is_sending = False
        for index, timer in enumerate(self.send_timers):
            if timer is not None:
                timer.cancel()
                self.send_timers[index] = None
        if self.bus is not None:
            self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")

    def _on_close(self):
        """Stop all bus activity, then destroy the window."""
        if self.is_sending:
            self.stop_sending()
        self.master.destroy()

    def log_message(self, msg_type, message):
        """Append one line to the log widget and scroll to the end.

        NOTE(review): this is also called from the receive and timer threads;
        confirm that touching Tk widgets off the main thread is acceptable
        on the target platform.

        Args:
            msg_type: ``"send"`` or ``"recv"`` for CAN frames, ``"error"``
                for error text; any other value logs ``message`` verbatim.
            message: A CAN message for ``"send"``/``"recv"``, otherwise text.
        """
        if msg_type in ("send", "recv"):
            direction = "Send" if msg_type == "send" else "Recv"
            payload = ','.join(f'0x{byte:02X}' for byte in message.data)
            log_entry = f"{direction} : ID={hex(message.arbitration_id)} Data={payload}\n"
        elif msg_type == "error":
            log_entry = f"Error : {message}\n"
        else:
            log_entry = f"{message}\n"
        self.log_text.insert(tk.END, log_entry)
        self.log_text.see(tk.END)
def main():
    """Create the Tk root window, attach the application and run the event loop."""
    window = tk.Tk()
    # Keep a named reference to the application while the loop is running.
    transmitter = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()
応答を確認(フォーマットが間違っているが後で修正しよう)。
SID=0x22に対しDID(Data Identifier)を設定してみる
SID=0x22はRead data by Identifierと呼ばれているUDSコマンド。
Data Identifierとはデータの種別を識別するアドレスのようなもの。
例)SID=0x22, DID=0x1010
DID=0x1010を指定して要求した場合、そのDIDに紐づくデータを応答する仕組み。どのようなデータを紐づけるかはメーカーで定義できる。
まずはお試しで0xFFを固定応答させてみる。ついでにフォーマットも間違っているので修正。
修正後のコードは以下。
import tkinter as tk
from tkinter import ttk
import can
import threading
import time
class CANTransmitterApp:
    """Tkinter GUI that sends three periodic CAN frames and answers UDS requests.

    While sending is active, a worker thread logs every received frame and
    replies on ``UDS_RESPONSE_ID`` to frames received on ``UDS_REQUEST_ID``.
    Service 0x22 (read by identifier) is answered for DID 0x1010 with the
    fixed value 0xFF; everything else gets a negative response.
    """

    UDS_REQUEST_ID = 0x740   # frames with this ID are treated as UDS requests
    UDS_RESPONSE_ID = 0x748  # ID used for every UDS reply
    # Negative response codes used by create_ud_response().
    NRC_SERVICE_NOT_SUPPORTED = 0x11
    NRC_INCORRECT_LENGTH = 0x13
    NRC_REQUEST_OUT_OF_RANGE = 0x31
    # Bounded wait so the receive worker re-checks the stop flag regularly.
    RECV_TIMEOUT = 0.5

    def __init__(self, master):
        """Build all widgets inside ``master`` and initialise the idle state.

        Args:
            master: Top-level Tk window; its ``title`` and ``protocol``
                methods are called, so a plain frame is not sufficient.
        """
        self.master = master
        self.master.title("CAN Transmitter")
        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")
        # CAN ID, CAN Data, and Period Inputs
        # Widgets are created column by column (not row by row) so the
        # creation order of the original layout is preserved.
        self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(3)]
        self.data_labels = [ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):") for i in range(3)]
        self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(3)]
        self.can_id_entries = [ttk.Entry(master) for _ in range(3)]
        self.data_entries = [ttk.Entry(master) for _ in range(3)]
        self.period_entries = [ttk.Entry(master) for _ in range(3)]
        for i in range(3):
            grid_row = 1 + i
            # CAN ID
            self.can_id_labels[i].grid(row=grid_row, column=0, padx=10, pady=10, sticky=tk.W)
            self.can_id_entries[i].grid(row=grid_row, column=1, padx=10, pady=10)
            self.can_id_entries[i].insert(tk.END, f"{123 + i:03X}")
            # CAN Data
            self.data_labels[i].grid(row=grid_row, column=2, padx=10, pady=10, sticky=tk.W)
            self.data_entries[i].grid(row=grid_row, column=3, padx=10, pady=10)
            self.data_entries[i].insert(tk.END, "11,22,33,44,55,66,77,88")
            # Period
            self.period_labels[i].grid(row=grid_row, column=4, padx=10, pady=10, sticky=tk.W)
            self.period_entries[i].grid(row=grid_row, column=5, padx=10, pady=10)
            self.period_entries[i].insert(tk.END, "1.0")
        # Control Buttons
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, columnspan=3, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=3, columnspan=3, padx=10, pady=10)
        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=6, padx=10, pady=10)
        # Log Text
        self.log_text = tk.Text(master, height=10, width=80)
        self.log_text.grid(row=6, columnspan=6, padx=10, pady=10)
        self.is_sending = False
        self.bus = None
        self.send_timers = [None, None, None]
        # Closing the window must also stop the timers and release the bus.
        self.master.protocol("WM_DELETE_WINDOW", self._on_close)

    @staticmethod
    def _parse_row(id_text, data_text, period_text):
        """Convert one row of entry texts into ``(can_id, data, period)``.

        Args:
            id_text: CAN identifier as a hexadecimal string.
            data_text: Exactly eight comma-separated hexadecimal byte values.
            period_text: Transmission period in seconds.

        Returns:
            Tuple of the integer ID, the list of eight byte values and the
            period as a float.

        Raises:
            ValueError: If a field cannot be parsed or is out of range; the
                message is suitable for direct display to the user.
        """
        try:
            can_id = int(id_text.strip(), 16)
        except ValueError:
            raise ValueError("Invalid CAN ID.") from None
        if not 0 <= can_id <= 0x1FFFFFFF:  # largest 29-bit identifier
            raise ValueError("Invalid CAN ID.")
        try:
            data = [int(byte, 16) for byte in data_text.strip().split(",")]
        except ValueError:
            raise ValueError("Invalid CAN data format.") from None
        if len(data) != 8 or not all(0 <= byte <= 0xFF for byte in data):
            raise ValueError("Invalid CAN data format.")
        try:
            period = float(period_text.strip())
        except ValueError:
            raise ValueError("Invalid period.") from None
        # Rejects zero, negative, NaN and infinite periods.
        if not 0 < period < float("inf"):
            raise ValueError("Invalid period.")
        return can_id, data, period

    @staticmethod
    def _negative_response(sid, nrc):
        """Return a padded single-frame negative response for ``sid``.

        Layout: length byte 0x03, marker 0x7F, rejected SID, NRC, padding.
        """
        return [0x03, 0x7F, sid, nrc] + [0x00] * 4

    def start_sending(self):
        """Validate the form, open the bus and start all background activity.

        Invalid input or a bus that cannot be opened is reported in the
        status label and leaves the application idle.
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return
        interface = self.interface_entry.get().strip()
        try:
            rows = [
                self._parse_row(id_entry.get(), data_entry.get(), period_entry.get())
                for id_entry, data_entry, period_entry in zip(
                    self.can_id_entries, self.data_entries, self.period_entries
                )
            ]
        except ValueError as e:
            self.status_label.config(text=str(e))
            return
        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return
        self.log_message("info", f"CAN interface {interface} connected.")
        self.is_sending = True
        self.receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
        self.receive_thread.start()
        for index, (can_id, data, period) in enumerate(rows):
            self.send_messages(can_id, data, period, index)

    def send_messages(self, can_id, data, period, index):
        """Send one frame and schedule the next transmission of the same row.

        Args:
            can_id: CAN identifier of the frame.
            data: Payload bytes.
            period: Delay in seconds before the next transmission.
            index: Slot in ``send_timers`` that tracks this row's timer.
        """
        if not self.is_sending:
            return
        # python-can defaults to extended frames; use the 11-bit format
        # whenever the identifier fits into it.
        message = can.Message(arbitration_id=can_id, data=data, is_extended_id=can_id > 0x7FF)
        try:
            self.bus.send(message)
            self.log_message("send", message)
        except can.CanError as e:
            self.log_message("error", f"Failed to send CAN message - {str(e)}")
            self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")
        # Sending may have been stopped while this call was in progress.
        if self.is_sending:
            timer = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
            self.send_timers[index] = timer
            timer.start()

    def receive_messages(self):
        """Worker loop: log received frames and answer UDS requests.

        Runs until ``is_sending`` becomes false.
        """
        while self.is_sending:
            try:
                message = self.bus.recv(timeout=self.RECV_TIMEOUT)
            except Exception as e:
                if not self.is_sending:
                    break  # the bus was shut down by stop_sending()
                self.log_message("error", f"Failed to receive CAN message - {str(e)}")
                continue
            if message is None:  # timeout, nothing received
                continue
            self.log_message("recv", message)
            # Check for UDS command with CANID 0x740
            if message.arbitration_id != self.UDS_REQUEST_ID:
                continue
            response_data = self.create_ud_response(message.data)
            if response_data is None:
                self.log_message("error", "Ignored malformed UDS request (no service ID).")
                continue
            # Reply in the same frame format (11/29-bit) as the request.
            response_message = can.Message(
                arbitration_id=self.UDS_RESPONSE_ID,
                data=response_data,
                is_extended_id=message.is_extended_id,
            )
            try:
                self.bus.send(response_message)
            except Exception as e:
                self.log_message("error", f"Failed to send UDS response - {str(e)}")
            else:
                self.log_message("send", response_message)

    def create_ud_response(self, request_data):
        """Build the 8-byte reply payload for one single-frame UDS request.

        Frame layout assumed by the indexing: byte 0 is the length byte,
        byte 1 the service ID (SID), bytes 2-3 the data identifier (DID).

        Args:
            request_data: Raw data bytes of the received frame.

        Returns:
            List of eight payload bytes: the positive response
            ``04 62 10 10 FF`` for SID 0x22 / DID 0x1010, otherwise a
            negative response. ``None`` when the frame is too short to
            contain a service ID, in which case nothing should be sent.
        """
        request = list(request_data)
        print("Received request data:", ", ".join(f"0x{byte:02X}" for byte in request))
        if len(request) < 2:
            return None
        sid = request[1]
        if sid != 0x22:
            return self._negative_response(sid, self.NRC_SERVICE_NOT_SUPPORTED)
        if len(request) < 4:
            # The request ends before the two DID bytes.
            return self._negative_response(sid, self.NRC_INCORRECT_LENGTH)
        if request[2:4] != [0x10, 0x10]:
            return self._negative_response(sid, self.NRC_REQUEST_OUT_OF_RANGE)
        # length byte, 0x62 (= 0x22 + 0x40), echoed DID, fixed value, padding
        return [0x04, 0x62, 0x10, 0x10, 0xFF] + [0x00] * 3

    def stop_sending(self):
        """Stop periodic transmission, cancel timers and shut the bus down."""
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return
        self.is_sending = False
        for index, timer in enumerate(self.send_timers):
            if timer is not None:
                timer.cancel()
                self.send_timers[index] = None
        if self.bus is not None:
            self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")

    def _on_close(self):
        """Stop all bus activity, then destroy the window."""
        if self.is_sending:
            self.stop_sending()
        self.master.destroy()

    def log_message(self, msg_type, message):
        """Append one line to the log widget and scroll to the end.

        NOTE(review): this is also called from the receive and timer threads;
        confirm that touching Tk widgets off the main thread is acceptable
        on the target platform.

        Args:
            msg_type: ``"send"`` or ``"recv"`` for CAN frames, ``"error"``
                for error text; any other value logs ``message`` verbatim.
            message: A CAN message for ``"send"``/``"recv"``, otherwise text.
        """
        if msg_type in ("send", "recv"):
            direction = "Send" if msg_type == "send" else "Recv"
            payload = ','.join(f'0x{byte:02X}' for byte in message.data)
            log_entry = f"{direction} : ID={hex(message.arbitration_id)} Data={payload}\n"
        elif msg_type == "error":
            log_entry = f"Error : {message}\n"
        else:
            log_entry = f"{message}\n"
        self.log_text.insert(tk.END, log_entry)
        self.log_text.see(tk.END)
def main():
    """Create the Tk root window, attach the application and run the event loop."""
    window = tk.Tk()
    # Keep a named reference to the application while the loop is running.
    transmitter = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()
UDSのプロトコルに準拠しつつ0xFFを固定応答。
SID=0x2E(Write data by Identifier)に対応
固定値を読み出すだけでは面白くないので、読み出す値をSID=0x2Eで書き換えるプログラムにする。
DIDをキーにした辞書を用意する。
修正後のコードは以下。
import tkinter as tk
from tkinter import ttk
import can
import threading
import time
class CANTransmitterApp:
    """Tkinter GUI that sends three periodic CAN frames and emulates a small UDS server.

    While sending is active, a worker thread logs every received frame and
    replies on ``UDS_RESPONSE_ID`` to frames received on ``UDS_REQUEST_ID``.
    Services 0x22 (read by identifier) and 0x2E (write by identifier) are
    supported for DID 0x1010; written values are kept in ``memory``.
    """

    UDS_REQUEST_ID = 0x740   # frames with this ID are treated as UDS requests
    UDS_RESPONSE_ID = 0x748  # ID used for every UDS reply
    SUPPORTED_DID = (0x10, 0x10)  # the only DID that can be read or written
    DEFAULT_DID_VALUE = 0xFF      # value reported before the first write
    # Negative response codes used by create_ud_response().
    NRC_SERVICE_NOT_SUPPORTED = 0x11
    NRC_INCORRECT_LENGTH = 0x13
    NRC_REQUEST_OUT_OF_RANGE = 0x31
    # Bounded wait so the receive worker re-checks the stop flag regularly.
    RECV_TIMEOUT = 0.5

    def __init__(self, master):
        """Build all widgets inside ``master`` and initialise the idle state.

        Args:
            master: Top-level Tk window; its ``title`` and ``protocol``
                methods are called, so a plain frame is not sufficient.
        """
        self.master = master
        self.master.title("CAN Transmitter")
        # DID (as a tuple of its two bytes) -> last written one-byte value.
        self.memory = {}
        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")
        # CAN ID, CAN Data, and Period Inputs
        # Widgets are created column by column (not row by row) so the
        # creation order of the original layout is preserved.
        self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(3)]
        self.data_labels = [ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):") for i in range(3)]
        self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(3)]
        self.can_id_entries = [ttk.Entry(master) for _ in range(3)]
        self.data_entries = [ttk.Entry(master) for _ in range(3)]
        self.period_entries = [ttk.Entry(master) for _ in range(3)]
        for i in range(3):
            grid_row = 1 + i
            # CAN ID
            self.can_id_labels[i].grid(row=grid_row, column=0, padx=10, pady=10, sticky=tk.W)
            self.can_id_entries[i].grid(row=grid_row, column=1, padx=10, pady=10)
            self.can_id_entries[i].insert(tk.END, f"{123 + i:03X}")
            # CAN Data
            self.data_labels[i].grid(row=grid_row, column=2, padx=10, pady=10, sticky=tk.W)
            self.data_entries[i].grid(row=grid_row, column=3, padx=10, pady=10)
            self.data_entries[i].insert(tk.END, "11,22,33,44,55,66,77,88")
            # Period
            self.period_labels[i].grid(row=grid_row, column=4, padx=10, pady=10, sticky=tk.W)
            self.period_entries[i].grid(row=grid_row, column=5, padx=10, pady=10)
            self.period_entries[i].insert(tk.END, "1.0")
        # Control Buttons
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, columnspan=3, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=3, columnspan=3, padx=10, pady=10)
        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=6, padx=10, pady=10)
        # Log Text
        self.log_text = tk.Text(master, height=10, width=80)
        self.log_text.grid(row=6, columnspan=6, padx=10, pady=10)
        self.is_sending = False
        self.bus = None
        self.send_timers = [None, None, None]
        # Closing the window must also stop the timers and release the bus.
        self.master.protocol("WM_DELETE_WINDOW", self._on_close)

    @staticmethod
    def _parse_row(id_text, data_text, period_text):
        """Convert one row of entry texts into ``(can_id, data, period)``.

        Args:
            id_text: CAN identifier as a hexadecimal string.
            data_text: Exactly eight comma-separated hexadecimal byte values.
            period_text: Transmission period in seconds.

        Returns:
            Tuple of the integer ID, the list of eight byte values and the
            period as a float.

        Raises:
            ValueError: If a field cannot be parsed or is out of range; the
                message is suitable for direct display to the user.
        """
        try:
            can_id = int(id_text.strip(), 16)
        except ValueError:
            raise ValueError("Invalid CAN ID.") from None
        if not 0 <= can_id <= 0x1FFFFFFF:  # largest 29-bit identifier
            raise ValueError("Invalid CAN ID.")
        try:
            data = [int(byte, 16) for byte in data_text.strip().split(",")]
        except ValueError:
            raise ValueError("Invalid CAN data format.") from None
        if len(data) != 8 or not all(0 <= byte <= 0xFF for byte in data):
            raise ValueError("Invalid CAN data format.")
        try:
            period = float(period_text.strip())
        except ValueError:
            raise ValueError("Invalid period.") from None
        # Rejects zero, negative, NaN and infinite periods.
        if not 0 < period < float("inf"):
            raise ValueError("Invalid period.")
        return can_id, data, period

    @staticmethod
    def _negative_response(sid, nrc):
        """Return a padded single-frame negative response for ``sid``.

        Layout: length byte 0x03, marker 0x7F, rejected SID, NRC, padding.
        """
        return [0x03, 0x7F, sid, nrc] + [0x00] * 4

    def start_sending(self):
        """Validate the form, open the bus and start all background activity.

        Invalid input or a bus that cannot be opened is reported in the
        status label and leaves the application idle.
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return
        interface = self.interface_entry.get().strip()
        try:
            rows = [
                self._parse_row(id_entry.get(), data_entry.get(), period_entry.get())
                for id_entry, data_entry, period_entry in zip(
                    self.can_id_entries, self.data_entries, self.period_entries
                )
            ]
        except ValueError as e:
            self.status_label.config(text=str(e))
            return
        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return
        self.log_message("info", f"CAN interface {interface} connected.")
        self.is_sending = True
        self.receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
        self.receive_thread.start()
        for index, (can_id, data, period) in enumerate(rows):
            self.send_messages(can_id, data, period, index)

    def send_messages(self, can_id, data, period, index):
        """Send one frame and schedule the next transmission of the same row.

        Args:
            can_id: CAN identifier of the frame.
            data: Payload bytes.
            period: Delay in seconds before the next transmission.
            index: Slot in ``send_timers`` that tracks this row's timer.
        """
        if not self.is_sending:
            return
        # python-can defaults to extended frames; use the 11-bit format
        # whenever the identifier fits into it.
        message = can.Message(arbitration_id=can_id, data=data, is_extended_id=can_id > 0x7FF)
        try:
            self.bus.send(message)
            self.log_message("send", message)
        except can.CanError as e:
            self.log_message("error", f"Failed to send CAN message - {str(e)}")
            self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")
        # Sending may have been stopped while this call was in progress.
        if self.is_sending:
            timer = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
            self.send_timers[index] = timer
            timer.start()

    def receive_messages(self):
        """Worker loop: log received frames and answer UDS requests.

        Runs until ``is_sending`` becomes false.
        """
        while self.is_sending:
            try:
                message = self.bus.recv(timeout=self.RECV_TIMEOUT)
            except Exception as e:
                if not self.is_sending:
                    break  # the bus was shut down by stop_sending()
                self.log_message("error", f"Failed to receive CAN message - {str(e)}")
                continue
            if message is None:  # timeout, nothing received
                continue
            self.log_message("recv", message)
            # Check for UDS command with CANID 0x740
            if message.arbitration_id != self.UDS_REQUEST_ID:
                continue
            response_data = self.create_ud_response(message.data)
            if response_data is None:
                self.log_message("error", "Ignored malformed UDS request (no service ID).")
                continue
            # Reply in the same frame format (11/29-bit) as the request.
            response_message = can.Message(
                arbitration_id=self.UDS_RESPONSE_ID,
                data=response_data,
                is_extended_id=message.is_extended_id,
            )
            try:
                self.bus.send(response_message)
            except Exception as e:
                self.log_message("error", f"Failed to send UDS response - {str(e)}")
            else:
                self.log_message("send", response_message)

    def create_ud_response(self, request_data):
        """Build the 8-byte reply payload for one single-frame UDS request.

        Frame layout assumed by the indexing: byte 0 is the length byte,
        byte 1 the service ID (SID), bytes 2-3 the data identifier (DID)
        and, for a write, byte 4 the value to store.

        Args:
            request_data: Raw data bytes of the received frame.

        Returns:
            List of eight payload bytes: ``04 62 10 10 <value>`` for a read,
            ``03 6E 10 10`` for a write, otherwise a negative response.
            ``None`` when the frame is too short to contain a service ID,
            in which case nothing should be sent.
        """
        request = list(request_data)
        print("Received request data:", ", ".join(f"0x{byte:02X}" for byte in request))
        if len(request) < 2:
            return None
        sid = request[1]
        if sid not in (0x22, 0x2E):
            return self._negative_response(sid, self.NRC_SERVICE_NOT_SUPPORTED)
        if len(request) < 4:
            # The request ends before the two DID bytes.
            return self._negative_response(sid, self.NRC_INCORRECT_LENGTH)
        did = (request[2], request[3])
        if did != self.SUPPORTED_DID:
            return self._negative_response(sid, self.NRC_REQUEST_OUT_OF_RANGE)
        if sid == 0x22:
            # Read: report the stored value, or the default if never written.
            response_value = self.memory.get(did, self.DEFAULT_DID_VALUE)
            return [0x04, 0x62, did[0], did[1], response_value] + [0x00] * 3
        # Write (SID 0x2E): reject a request without a data byte instead of
        # silently storing a made-up value.
        if len(request) < 5:
            return self._negative_response(sid, self.NRC_INCORRECT_LENGTH)
        self.memory[did] = request[4]
        return [0x03, 0x6E, did[0], did[1]] + [0x00] * 4

    def stop_sending(self):
        """Stop periodic transmission, cancel timers and shut the bus down."""
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return
        self.is_sending = False
        for index, timer in enumerate(self.send_timers):
            if timer is not None:
                timer.cancel()
                self.send_timers[index] = None
        if self.bus is not None:
            self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")

    def _on_close(self):
        """Stop all bus activity, then destroy the window."""
        if self.is_sending:
            self.stop_sending()
        self.master.destroy()

    def log_message(self, msg_type, message):
        """Append one line to the log widget and scroll to the end.

        NOTE(review): this is also called from the receive and timer threads;
        confirm that touching Tk widgets off the main thread is acceptable
        on the target platform.

        Args:
            msg_type: ``"send"`` or ``"recv"`` for CAN frames, ``"error"``
                for error text; any other value logs ``message`` verbatim.
            message: A CAN message for ``"send"``/``"recv"``, otherwise text.
        """
        if msg_type in ("send", "recv"):
            direction = "Send" if msg_type == "send" else "Recv"
            payload = ','.join(f'0x{byte:02X}' for byte in message.data)
            log_entry = f"{direction} : ID={hex(message.arbitration_id)} Data={payload}\n"
        elif msg_type == "error":
            log_entry = f"Error : {message}\n"
        else:
            log_entry = f"{message}\n"
        self.log_text.insert(tk.END, log_entry)
        self.log_text.see(tk.END)
def main():
    """Create the Tk root window, attach the application and run the event loop."""
    window = tk.Tk()
    # Keep a named reference to the application while the loop is running.
    transmitter = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()
まずはSID=0x22で読み出し。結果は0xFF(初期値)。
そしてSID=0x2EでDID=0x1010の値を0xFF⇒0x99に書き換える。
そして、再度SID=0x22でDID=0x1010の値を読み出してみると……
0x99に書き換わっている!
次はNegative Response Code(NRC)の作り込みをしよう。
[…] Raspberry PI 4 Model B(ラズパイ)でCAN通信~OBD-Ⅱその①~ […]