# blflashcommand/libs/bflb_interface_openocd.py (426 lines, 17 KiB, Python)

# decompyle3 version 3.9.0
# Python bytecode version base 3.7.0 (3394)
# Decompiled from: Python 3.7.16 (default, Mar 30 2023, 01:25:49)
# [GCC 12.2.1 20220924]
# Embedded file name: libs/bflb_interface_openocd.py
import os, time, binascii, subprocess, threading, telnetlib, serial
from libs import bflb_utils
from libs.bflb_utils import app_path
# Location of the OpenOCD executable launched by ThreadOpenocdServer, resolved
# under the application directory's utils/openocd folder.
# NOTE(review): the binary name is hard-coded as 'openocd.exe' -- confirm how
# non-Windows hosts are expected to be handled.
openocd_path = os.path.join(app_path, 'utils/openocd', 'openocd.exe')
class ThreadOpenocdServer(threading.Thread):
    """Thread that runs the OpenOCD executable as a child process.

    The thread builds the OpenOCD command line from the debug adapter name,
    the chip type and an optional FTDI serial string, starts the process and
    waits for it to exit, then prints the captured standard output.

    Calling :meth:`stop` asks the thread to terminate the child process if it
    is still running.
    """

    # Seconds between checks of the stop request while OpenOCD is running.
    _POLL_INTERVAL = 0.5

    def __init__(self, chiptype='bl602', device='rv_dbg_plus', serial=None):
        """Store the launch settings; the process starts in :meth:`run`.

        Args:
            chiptype: ``'bl602'`` selects ``tgt_602.cfg``; any other value
                selects ``tgt_702.cfg``.
            device: ``'rv_dbg_plus'`` or ``'ft2232hl'`` select the matching
                interface config; any other value selects
                ``openocd-usb-sipeed.cfg``.
            serial: Optional string passed to OpenOCD through an
                ``ftdi_serial`` command; falsy means no serial filter.
        """
        threading.Thread.__init__(self)
        self.timeToQuit = threading.Event()
        self.timeToQuit.clear()
        self._chiptype = chiptype
        self._device = device
        self._serial = serial
        bflb_utils.printf('SN is ' + str(self._serial))

    def stop(self):
        """Request termination of the OpenOCD child process."""
        self.timeToQuit.set()

    @staticmethod
    def _build_args(exe, cfg_dir, device, chiptype, serial):
        """Return the OpenOCD argument list for the given settings.

        Arguments are returned as a list (not a shell string) so that paths
        or serial numbers containing spaces or shell metacharacters are
        passed through unchanged.

        Args:
            exe: Path of the OpenOCD executable.
            cfg_dir: Directory holding the ``.cfg`` files (no trailing slash).
            device: Debug adapter name, see :meth:`__init__`.
            chiptype: Chip type, see :meth:`__init__`.
            serial: FTDI serial string, or a falsy value for none.
        """
        if device == 'rv_dbg_plus':
            interface_cfg = 'if_rv_dbg_plus.cfg'
        elif device == 'ft2232hl':
            interface_cfg = 'if_bflb_dbg.cfg'
        else:
            # Any other adapter uses a single combined config, no target file.
            interface_cfg = None

        args = [exe, '-f']
        if interface_cfg is None:
            args.append(cfg_dir + '/openocd-usb-sipeed.cfg')
        else:
            args.append(cfg_dir + '/' + interface_cfg)

        if serial:
            # The serial command sits between the interface and target files.
            args += ['-c', 'ftdi_serial "' + str(serial) + '"']

        if interface_cfg is not None:
            target_cfg = 'tgt_602.cfg' if chiptype == 'bl602' else 'tgt_702.cfg'
            args += ['-f', cfg_dir + '/' + target_cfg]
        return args

    def run(self):
        """Launch OpenOCD, wait for it to exit and print its standard output."""
        args = self._build_args(openocd_path, app_path + '/utils/openocd',
                                self._device, self._chiptype, self._serial)
        bflb_utils.printf(' '.join(args))
        try:
            p = subprocess.Popen(args, stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as exc:
            # Without a shell a missing executable raises here instead of
            # producing a shell error message; report it and end the thread.
            bflb_utils.printf('Failed to start openocd: ' + str(exc))
            return
        while True:
            try:
                # communicate() keeps draining both pipes, so the child can
                # never block on a full pipe; retrying after a timeout does
                # not lose output.
                out, _ = p.communicate(timeout=self._POLL_INTERVAL)
                break
            except subprocess.TimeoutExpired:
                if self.timeToQuit.is_set():
                    p.terminate()
        bflb_utils.printf(out)
class BflbOpenocdPort(object):
    """Loader transport that talks to the chip through an OpenOCD telnet session.

    Data is exchanged through two target memory locations: a data buffer
    (``_openocd_data_addr``) and a handshake word (``_openocd_shake_hand_addr``).
    The host writes the bytes ``48 52 44 59`` to the handshake word after
    filling the buffer, and waits for the handshake word to read ``SACK``
    before reading a reply from the buffer.

    Most operations exist under two names (``if_xxx`` and ``xxx``); the short
    names delegate to the ``if_`` implementations.
    """

    def __init__(self):
        self._speed = 5000
        self._rx_timeout = 10000  # milliseconds
        self._openocd_shake_hand_addr = '20000000'
        self._openocd_data_addr = '20000004'
        self._inited = False
        self._chiptype = 'bl60x'
        self._chipname = 'bl60x'
        self._openocd_run_addr = '22010000'
        # Created in if_init(); defined here so that if_close() is safe to
        # call on a port that was never initialised.
        self._openocd_th = None
        self.tn = telnetlib.Telnet()

    # ------------------------------------------------------------------
    # Session set-up / tear-down
    # ------------------------------------------------------------------
    def if_init(self, device, sn, rate, chiptype='bl60x', chipname='bl60x'):
        """Start the OpenOCD server thread and open the telnet session.

        Does nothing when the port is already initialised.

        Args:
            device: Debug adapter name forwarded to ``ThreadOpenocdServer``.
            sn: Adapter serial number; when truthy the server is given the
                serial string ``'FactoryAIOT Prog <sn>'``.
            rate: Value sent with the ``adapter speed`` command and reported
                by :meth:`if_get_rate`.
            chiptype: Name of the ``libs.base.<chiptype>`` module whose
                ``openocd_load_cfg`` supplies the handshake, data and run
                addresses.
            chipname: Stored on the instance.

        Returns:
            ``False`` on every path. On a connection failure the error code
            ``'0009'`` is set and the port is closed again.
            NOTE(review): the source's indentation was lost in decompilation,
            so the ``return False`` may instead belong to the except branch --
            confirm against callers.
        """
        if self._inited is False:
            sub_module = __import__('libs.base.' + chiptype, fromlist=[chiptype])
            load_cfg = sub_module.openocd_load_cfg
            self._openocd_shake_hand_addr = load_cfg.openocd_shake_hand_addr
            self._openocd_data_addr = load_cfg.openocd_data_addr
            self._openocd_run_addr = load_cfg.openocd_run_addr
            self._speed = rate
            self._inited = True
            self._chiptype = chiptype
            self._chipname = chipname
            serial = 'FactoryAIOT Prog ' + str(sn) if sn else None
            self._openocd_th = ThreadOpenocdServer(chiptype, device, serial)
            self._openocd_th.daemon = True
            self._openocd_th.start()
            try:
                self.tn.open('127.0.0.1', port=4444, timeout=10)
                self.tn.write(('adapter speed ' + str(rate)).encode('ascii') + b'\n')
                self._sync_with_server()
            except Exception:
                bflb_utils.printf('Failed to connect openocd server')
                bflb_utils.set_error_code('0009')
                self.if_close()
        return False

    def repeat_init(self, device, sn, rate, chiptype='bl60x', chipname='bl60x'):
        """Alias of :meth:`if_init`."""
        return self.if_init(device, sn, rate, chiptype, chipname)

    def if_close(self):
        """Shut down OpenOCD, close the telnet session and mark the port closed."""
        if self.tn.get_socket():
            self.tn.write('shutdown\n'.encode('ascii'))
            time.sleep(0.05)
            self.tn.close()
        if self._openocd_th:
            self._openocd_th.stop()
        self._inited = False

    def close(self):
        """Alias of :meth:`if_close`."""
        return self.if_close()

    # ------------------------------------------------------------------
    # Small accessors
    # ------------------------------------------------------------------
    def if_clear_buf(self):
        """No-op for this interface."""
        pass

    def clear_buf(self):
        """Alias of :meth:`if_clear_buf`."""
        return self.if_clear_buf()

    def if_set_rx_timeout(self, val):
        """Set the receive timeout; *val* is multiplied by 1000 and compared
        against elapsed milliseconds in :meth:`if_read`."""
        self._rx_timeout = val * 1000

    def set_timeout(self, val):
        """Alias of :meth:`if_set_rx_timeout`."""
        return self.if_set_rx_timeout(val)

    def if_get_rate(self):
        """Return the rate stored by :meth:`if_init` (5000 before init)."""
        return self._speed

    # ------------------------------------------------------------------
    # CPU control
    # ------------------------------------------------------------------
    def halt_cpu(self):
        """Send the ``halt`` command; always returns ``True``."""
        self.tn.write('halt'.encode('ascii') + b'\n')
        return True

    def reset_cpu(self, ms=0, halt=True):
        """Send ``reset``, preceded by ``halt`` when *halt* is true.

        *ms* is accepted for interface compatibility and is not used.
        """
        if halt:
            self.halt_cpu()
        self.tn.write('reset'.encode('ascii') + b'\n')

    def set_pc_msp(self, pc, msp):
        """Halt the CPU and, for bl602/bl702/bl702l, resume at the run address.

        *pc* and *msp* are accepted for interface compatibility and are not
        used; the program counter is set from the configured run address.
        """
        self.halt_cpu()
        if self._chiptype in ('bl602', 'bl702', 'bl702l'):
            self.tn.write(('reg pc 0x' + self._openocd_run_addr).encode('ascii') + b'\n')
            self.tn.write('resume'.encode('ascii') + b'\n')

    # ------------------------------------------------------------------
    # Raw memory access
    # ------------------------------------------------------------------
    def if_raw_write(self, addr, data_send):
        """Write *data_send* to target memory at hex-string address *addr*.

        More than 32 bytes are written to ``openocd_load_data.bin`` in the
        current directory and loaded with ``load_image``; shorter payloads are
        written one byte at a time with ``mwb``.
        """
        addr_int = int(addr, 16)
        if len(data_send) > 32:
            # The context manager closes the file even if the write fails.
            with open('openocd_load_data.bin', 'wb+') as fp:
                fp.write(data_send)
            self.tn.write(('load_image openocd_load_data.bin ' + hex(addr_int)).encode('ascii') + b'\n')
        else:
            for offset, value in enumerate(data_send):
                self.tn.write(('mwb ' + hex(addr_int + offset) + ' ' + hex(value)).encode('ascii') + b'\n')

    def if_write(self, data_send):
        """Place *data_send* in the data buffer, then write the handshake word."""
        self.if_raw_write(self._openocd_data_addr, data_send)
        self.if_raw_write(self._openocd_shake_hand_addr, bytearray.fromhex('48524459'))

    def write(self, data_send):
        """Alias of :meth:`if_write`."""
        return self.if_write(data_send)

    def _sync_with_server(self):
        """Send ``WaitCmd`` and read until the quoted name comes back.

        Returns everything received up to and including ``"WaitCmd"``.
        NOTE(review): presumably OpenOCD answers the unknown command with an
        error message that quotes its name, which serves as an end-of-output
        marker -- confirm.
        """
        self.tn.write('WaitCmd\n'.encode('ascii'))
        return self.tn.read_until('"WaitCmd"'.encode('ascii'), timeout=10)

    def read_data_parse(self, buff, aligned):
        """Extract memory bytes from the telnet text in *buff*.

        Args:
            buff: Raw bytes returned by the telnet session for a dump command.
            aligned: ``True`` for word dumps: each 8-character token is
                byte-reversed before conversion. Otherwise the text between
                the first ``': '`` and the marker is converted with spaces
                removed.

        Returns:
            A ``bytearray`` with the decoded data.
        """
        text = buff.decode().strip()
        parsed = bytearray(0)
        start = text.find(': ') + 2
        # The payload ends 6 characters before the first 'WaitCmd'.
        payload = text[start:text.find('WaitCmd') - 6]
        if aligned is True:
            for chunk in payload.split('0x'):
                # Chunks from continuation lines still carry their address
                # prefix; skip its first 9 characters.
                if chunk.find(': ') != -1:
                    tokens = chunk[9:].split(' ')
                else:
                    tokens = chunk.split(' ')
                for word in tokens:
                    if len(word) != 8:
                        continue
                    # Reverse the byte order of the printed word.
                    hexstr = word[6:8] + word[4:6] + word[2:4] + word[0:2]
                    parsed += bflb_utils.hexstr_to_bytearray(hexstr)
        else:
            parsed += bflb_utils.hexstr_to_bytearray(payload.replace(' ', ''))
        return parsed

    def if_addr_unaligned_read(self, addr, data_len):
        """Read *data_len* bytes at hex-string *addr* using a byte dump (``mdb``)."""
        addr_int = int(addr, 16)
        # Discard pending telnet output so it is not parsed as data.
        self.tn.read_very_eager()
        self.tn.write(('mdb ' + hex(addr_int) + ' ' + hex(data_len) + '\n').encode('ascii'))
        reply = self._sync_with_server()
        return bytearray(0) + self.read_data_parse(reply, False)

    def if_addr_aligned_read(self, addr, data_len):
        """Read *data_len* bytes at hex-string *addr* using word dumps (``mdw``).

        Whole 4-byte words are read with ``mdw``; a trailing remainder of one
        to three bytes is read with :meth:`if_addr_unaligned_read`.
        """
        addr_int = int(addr, 16)
        word_count = data_len // 4
        data = bytearray(0)
        if word_count:
            # Discard pending telnet output so it is not parsed as data.
            self.tn.read_very_eager()
            self.tn.write(('mdw ' + hex(addr_int) + ' ' + hex(word_count) + '\n').encode('ascii'))
            reply = self._sync_with_server()
            data += self.read_data_parse(reply, True)
        tail_len = data_len - word_count * 4
        if tail_len != 0:
            tail_addr = addr_int + word_count * 4
            data += self.if_addr_unaligned_read(hex(tail_addr)[2:], tail_len)
        return data

    @staticmethod
    def _pre_read_len(addr_int):
        """Return how many bytes precede the next 4-byte boundary (0..3)."""
        return (4 - addr_int % 4) % 4

    def if_raw_read(self, addr, data_len):
        """Read *data_len* bytes of target memory at hex-string address *addr*.

        Reads of up to four bytes use a single byte dump. Longer reads fetch
        the bytes up to the next 4-byte boundary with a byte dump (skipped
        when *addr* is already aligned) and the rest with word dumps.
        """
        if data_len <= 4:
            return self.if_addr_unaligned_read(addr, data_len)
        data = bytearray(0)
        addr_int = int(addr, 16)
        pre_read_len = self._pre_read_len(addr_int)
        if pre_read_len != 0:
            data += self.if_addr_unaligned_read(addr, pre_read_len)
        data += self.if_addr_aligned_read(hex(addr_int + pre_read_len), data_len - pre_read_len)
        return data[:data_len]

    # ------------------------------------------------------------------
    # Handshaked protocol
    # ------------------------------------------------------------------
    def if_read(self, data_len):
        """Wait for the target's acknowledge, then read *data_len* reply bytes.

        Polls the handshake location until its first four bytes equal
        ``SACK`` or the receive timeout elapses.

        Returns:
            ``(1, data)`` on success; ``(0, data)`` when fewer bytes than
            requested were parsed; ``(0, b'waiting response time out')`` on
            timeout.
        """
        start_time = time.time() * 1000
        while True:
            ready = self.if_raw_read(self._openocd_shake_hand_addr, 16)
            if ready[0:4] == bytearray(b'SACK'):
                break
            elapsed = time.time() * 1000 - start_time
            if elapsed >= self._rx_timeout:
                return (0, 'waiting response time out'.encode('utf-8'))
            time.sleep(0.001)
        data = self.if_raw_read(self._openocd_data_addr, data_len)
        if len(data) != data_len:
            return (0, data)
        return (1, data)

    def read(self, data_len):
        """Alias of :meth:`if_read`."""
        return self.if_read(data_len)

    def if_clear_buff(self):
        """Consume pending telnet output up to a fresh ``WaitCmd`` marker."""
        self._sync_with_server()

    def clear_buff(self):
        """Alias of :meth:`if_clear_buff`."""
        return self.if_clear_buff()

    def if_shakehand(self, do_reset=False, reset_hold_time=100, shake_hand_delay=100, reset_revert=True, cutoff_time=0, shake_hand_retry=2, isp_timeout=0, boot_load=False):
        """Send a single zero byte and report whether the target answered OK.

        All parameters are accepted for interface compatibility and are not
        used by this implementation.

        Returns:
            ``'OK'`` when the two reply bytes contain ``O`` or ``K``,
            otherwise ``'FL'``.
        """
        self.if_clear_buff()
        self.if_write(bytearray(1))
        success, ack = self.if_read(2)
        bflb_utils.printf(binascii.hexlify(ack))
        if ack.find(b'O') != -1 or ack.find(b'K') != -1:
            time.sleep(0.03)
            return 'OK'
        return 'FL'

    def if_deal_ack(self):
        """Read and classify the two-byte acknowledge.

        Returns:
            ``'OK'`` or ``'PD'`` when the reply contains the corresponding
            letters; the reply decoded as text when the read failed;
            ``'FL'`` when the follow-up error code could not be read;
            otherwise ``'FL'`` followed by the hex error code and its
            description (or ``' unknown'``).
        """
        success, ack = self.if_read(2)
        if success == 0:
            bflb_utils.printf('ack:' + str(binascii.hexlify(ack)))
            return ack.decode('utf-8')
        if ack.find(b'O') != -1 or ack.find(b'K') != -1:
            return 'OK'
        if ack.find(b'P') != -1 or ack.find(b'D') != -1:
            return 'PD'
        success, err_code = self.if_read(4)
        if success == 0:
            bflb_utils.printf('err_code:' + str(binascii.hexlify(err_code)))
            return 'FL'
        # The error code is bytes 2..3 of the reply, printed high byte first.
        err_code_str = str(binascii.hexlify(err_code[3:4] + err_code[2:3]).decode('utf-8'))
        ack = 'FL'
        try:
            ret = ack + err_code_str + '(' + bflb_utils.get_bflb_error_code(err_code_str) + ')'
        except Exception:
            ret = ack + err_code_str + ' unknown'
        bflb_utils.printf(ret)
        return ret

    def deal_ack(self):
        """Alias of :meth:`if_deal_ack`."""
        return self.if_deal_ack()

    def if_deal_response(self):
        """Read an acknowledge followed by a length-prefixed payload.

        Returns:
            ``('OK', payload)`` on success; ``(ack, None)`` when the
            acknowledge was not ``'OK'``; otherwise a tuple of an error
            description and the bytes read so far.
        """
        ack = self.if_deal_ack()
        if ack != 'OK':
            bflb_utils.printf('Not ack OK')
            bflb_utils.printf(ack)
            return (ack, None)
        success, len_bytes = self.if_read(4)
        if success == 0:
            bflb_utils.printf('Get length error')
            bflb_utils.printf(binascii.hexlify(len_bytes))
            return ('Get length error', len_bytes)
        # The payload length is taken from bytes 2..3, byte-reversed.
        tmp = bflb_utils.bytearray_reverse(len_bytes[2:4])
        data_len = bflb_utils.bytearray_to_int(tmp)
        success, data_bytes = self.if_read(data_len + 4)
        if success == 0:
            bflb_utils.printf('Read data error')
            return ('Read data error', data_bytes)
        # Drop the 4-byte header that precedes the payload.
        data_bytes = data_bytes[4:]
        if len(data_bytes) != data_len:
            bflb_utils.printf('Not get excepted length')
            return ('Not get excepted length', data_bytes)
        return (ack, data_bytes)

    def deal_response(self):
        """Alias of :meth:`if_deal_response`."""
        return self.if_deal_response()
if __name__ == '__main__':
    # Manual self-test: needs a connected debug adapter and target.
    eflash_loader_t = BflbOpenocdPort()
    # if_init takes (device, sn, rate, chiptype); the earlier three-argument
    # call bound 100 to sn and 'bl60x' to rate. Keywords keep each value on
    # its intended parameter.
    eflash_loader_t.if_init(device='', sn=None, rate=100, chiptype='bl60x')
    bflb_utils.printf('read test')
    # Exercise aligned and unaligned start addresses with short and long reads.
    for test_addr in ('21000000', '21000001', '21000002'):
        for test_len in (2, 4, 10, 16):
            bflb_utils.printf(eflash_loader_t.if_raw_read(test_addr, test_len))
    bflb_utils.printf('write test')
    data = bytearray([
        1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,
        4])
    eflash_loader_t.if_raw_write('42020000', data)
    bflb_utils.printf(eflash_loader_t.if_raw_read('42020000', 62))
# okay decompiling ./libs/bflb_interface_openocd.pyc