715 lines
24 KiB
Python
715 lines
24 KiB
Python
# decompyle3 version 3.9.0
|
|
# Python bytecode version base 3.7.0 (3394)
|
|
# Decompiled from: Python 3.7.16 (default, Mar 30 2023, 01:25:49)
|
|
# [GCC 12.2.1 20220924]
|
|
# Embedded file name: libs/base/bflb_serial.py
|
|
__doc__ = '\nCreated on 20220830\n\n@author: Dillon\n'
|
|
import re, os, sys, time, serial, struct, pylink, cklink, subprocess
|
|
from glob import glob
|
|
from libs.bflb_utils import *
|
|
from serial.tools.list_ports import comports
|
|
import config as gol
|
|
|
|
class BLSerialUart(object):
|
|
__doc__ = '\n Bouffalolab serial package\n '
|
|
|
|
def __init__(self, rts_state=False, dtr_state=False):
|
|
self._device = 'COM1'
|
|
self._baudrate = 115200
|
|
self._isp_baudrate = 2000000
|
|
self._ser = None
|
|
self._shakehand_flag = False
|
|
self._chiptype = 'bl602'
|
|
self.rts_state = rts_state
|
|
self.dtr_state = dtr_state
|
|
|
|
def repeat_init(self, device, rate=0, _chip_type='bl602', _chip_name='bl602'):
|
|
try:
|
|
if not rate:
|
|
rate = self._isp_baudrate
|
|
if self._ser is None:
|
|
self._baudrate = rate
|
|
if ' (' in device:
|
|
dev = device[:device.find(' (')]
|
|
else:
|
|
dev = device
|
|
self._device = dev.upper()
|
|
for i in range(3):
|
|
try:
|
|
self._ser = serial.Serial(dev, rate, timeout=2.0, xonxoff=False, rtscts=False, write_timeout=None, dsrdtr=False)
|
|
except Exception as error:
|
|
try:
|
|
printf(error)
|
|
time.sleep(1)
|
|
finally:
|
|
error = None
|
|
del error
|
|
|
|
else:
|
|
break
|
|
|
|
else:
|
|
self._ser.baudrate = rate
|
|
self._baudrate = rate
|
|
self._602a0_dln_fix = False
|
|
self._chiptype = _chip_type
|
|
self._chipname = _chip_name
|
|
except Exception as e:
|
|
try:
|
|
printf('Error: %s' % e)
|
|
finally:
|
|
e = None
|
|
del e
|
|
|
|
def write(self, message):
|
|
if self._ser:
|
|
self._ser.write(message)
|
|
|
|
def read(self, length=1):
|
|
try:
|
|
data = bytearray(0)
|
|
received = 0
|
|
while received < length:
|
|
tmp = self._ser.read(length - received)
|
|
if len(tmp) == 0:
|
|
break
|
|
else:
|
|
data += tmp
|
|
received += len(tmp)
|
|
|
|
if len(data) != length:
|
|
return (0, data)
|
|
return (1, data)
|
|
except Exception as e:
|
|
try:
|
|
printf('Error: %s' % e)
|
|
finally:
|
|
e = None
|
|
del e
|
|
|
|
def raw_read(self):
|
|
return self._ser.read(self._ser.in_waiting or 1)
|
|
|
|
def clear_buf(self):
|
|
if self._ser:
|
|
self._ser.read_all()
|
|
|
|
def close(self):
|
|
if self._ser:
|
|
self._ser.close()
|
|
|
|
def _is_bouffalo_chip(self):
|
|
bl_sign = False
|
|
if sys.platform.startswith('win'):
|
|
for port, data, info in comports():
|
|
if not port:
|
|
continue
|
|
if self._device.upper() == port.upper():
|
|
if not 'VID:PID=42BF:B210' in info.upper():
|
|
if 'VID:PID=FFFF:FFFF' in info.upper():
|
|
pass
|
|
bl_sign = True
|
|
break
|
|
|
|
else:
|
|
if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
|
|
ports = []
|
|
for port, data, info in comports():
|
|
if not port:
|
|
continue
|
|
if self._device.upper() == port.upper():
|
|
if not 'PID=FFFF' in info.upper():
|
|
if 'PID=42BF:B210' in info.upper():
|
|
pass
|
|
bl_sign = True
|
|
break
|
|
|
|
return bl_sign
|
|
|
|
def bl_usb_serial_write(self, cutoff_time, reset_revert):
|
|
try:
|
|
boot_revert = 0
|
|
printf('usb serial port')
|
|
if cutoff_time != 0:
|
|
boot_revert = 0
|
|
if cutoff_time > 1000:
|
|
boot_revert = 1
|
|
data = bytearray()
|
|
specialstr = string_to_bytearray('BOUFFALOLAB5555RESET')
|
|
for b in specialstr:
|
|
data.append(b)
|
|
|
|
data.append(boot_revert)
|
|
data.append(reset_revert)
|
|
self._ser.write(data)
|
|
time.sleep(0.05)
|
|
except Exception as e:
|
|
try:
|
|
printf('Error: %s' % e)
|
|
finally:
|
|
e = None
|
|
del e
|
|
|
|
def set_dtr(self, dtr):
|
|
"""
|
|
dtr: If dtr is 1, it shows high. If dtr is 0, it shows low.
|
|
"""
|
|
try:
|
|
if sys.platform.startswith('darwin'):
|
|
self._ser.write(b'BOUFFALOLAB5555DTR' + str(dtr).encode())
|
|
self.clear_buf()
|
|
if dtr == 1:
|
|
self._ser.setDTR(0)
|
|
else:
|
|
if dtr == 0:
|
|
self._ser.setDTR(1)
|
|
else:
|
|
bl_sign = self._is_bouffalo_chip()
|
|
if bl_sign:
|
|
self._ser.write(b'BOUFFALOLAB5555DTR' + str(dtr).encode())
|
|
else:
|
|
if dtr == 1:
|
|
self._ser.setDTR(0)
|
|
else:
|
|
if dtr == 0:
|
|
self._ser.setDTR(1)
|
|
except:
|
|
pass
|
|
|
|
def setDTR(self, dtr):
|
|
self._ser.setDTR(dtr)
|
|
|
|
def set_rts(self, rts):
|
|
"""
|
|
rts: If rts is 1, it shows high. If rts is 0, it shows low.
|
|
"""
|
|
try:
|
|
if sys.platform.startswith('darwin'):
|
|
self._ser.write(b'BOUFFALOLAB5555RTS' + str(rts).encode())
|
|
self.clear_buf()
|
|
if rts == 1:
|
|
self._ser.setRTS(0)
|
|
else:
|
|
if rts == 0:
|
|
self._ser.setRTS(1)
|
|
else:
|
|
bl_sign = self._is_bouffalo_chip()
|
|
if bl_sign:
|
|
self._ser.write(b'BOUFFALOLAB5555RTS' + str(rts).encode())
|
|
else:
|
|
if rts == 1:
|
|
self._ser.setRTS(0)
|
|
else:
|
|
if rts == 0:
|
|
self._ser.setRTS(1)
|
|
except:
|
|
pass
|
|
|
|
def setRTS(self, rts):
|
|
self._ser.setRTS(rts)
|
|
|
|
def reset(self):
|
|
if sys.platform.startswith('darwin'):
|
|
self._ser.write(b'BOUFFALOLAB5555DTR0')
|
|
time.sleep(0.05)
|
|
self._ser.write(b'BOUFFALOLAB5555RTS0')
|
|
time.sleep(0.05)
|
|
self._ser.write(b'BOUFFALOLAB5555RTS1')
|
|
self.clear_buf()
|
|
self._ser.setDTR(1)
|
|
time.sleep(0.05)
|
|
self._ser.setRTS(1)
|
|
time.sleep(0.05)
|
|
self._ser.setRTS(0)
|
|
else:
|
|
bl_sign = self._is_bouffalo_chip()
|
|
if bl_sign:
|
|
self._ser.write(b'BOUFFALOLAB5555DTR0')
|
|
time.sleep(0.05)
|
|
self._ser.write(b'BOUFFALOLAB5555RTS0')
|
|
time.sleep(0.05)
|
|
self._ser.write(b'BOUFFALOLAB5555RTS1')
|
|
else:
|
|
self._ser.setDTR(1)
|
|
time.sleep(0.05)
|
|
self._ser.setRTS(1)
|
|
time.sleep(0.05)
|
|
self._ser.setRTS(0)
|
|
|
|
def set_isp_baudrate(self, baudrate):
|
|
printf('isp mode speed: ', baudrate)
|
|
self._isp_baudrate = baudrate
|
|
|
|
def set_timeout(self, timeout):
|
|
self._ser.timeout = timeout
|
|
|
|
def get_timeout(self):
|
|
return self._ser.timeout
|
|
|
|
def deal_ack(self):
|
|
try:
|
|
success, ack = self.read(2)
|
|
if success == 0:
|
|
printf('ack is ', str(binascii.hexlify(ack)))
|
|
return ack.decode('utf-8')
|
|
if ack.find(b'O') != -1 or ack.find(b'K') != -1:
|
|
return 'OK'
|
|
if ack.find(b'P') != -1 or ack.find(b'D') != -1:
|
|
return 'PD'
|
|
success, err_code = self.read(2)
|
|
if success == 0:
|
|
printf('err code is ', str(binascii.hexlify(err_code)))
|
|
return 'FL'
|
|
err_code_str = str(binascii.hexlify(err_code[1:2] + err_code[0:1]).decode('utf-8'))
|
|
ack = 'FL'
|
|
try:
|
|
ret = ack + err_code_str + '(' + get_bflb_error_code(err_code_str) + ')'
|
|
except Exception as e:
|
|
try:
|
|
printf(e)
|
|
ret = ack + err_code_str + ' unknown'
|
|
finally:
|
|
e = None
|
|
del e
|
|
|
|
printf(ret)
|
|
return ret
|
|
except Exception as e:
|
|
try:
|
|
printf('Error: %s' % e)
|
|
finally:
|
|
e = None
|
|
del e
|
|
|
|
def deal_response(self):
|
|
try:
|
|
ack = self.deal_ack()
|
|
if ack == 'OK':
|
|
success, len_bytes = self.read(2)
|
|
if success == 0:
|
|
printf('Get length error')
|
|
printf('len error is ', binascii.hexlify(len_bytes))
|
|
return (
|
|
'Get length error', len_bytes)
|
|
tmp = bytearray_reverse(len_bytes)
|
|
data_len = bytearray_to_int(tmp)
|
|
success, data_bytes = self.read(data_len)
|
|
if success == 0 or len(data_bytes) != data_len:
|
|
printf('Read data error,maybe not get excepted length')
|
|
return (
|
|
'Read data error,maybe not get excepted length', data_bytes)
|
|
return (
|
|
ack, data_bytes)
|
|
printf('Not ack OK')
|
|
printf(ack)
|
|
return (
|
|
ack, None)
|
|
except Exception as e:
|
|
try:
|
|
printf('Error: %s' % e)
|
|
finally:
|
|
e = None
|
|
del e
|
|
|
|
|
|
class BLSerialJLink(object):
|
|
|
|
def __init__(self, device, speed, jlink_shake_hand_addr, jlink_data_addr, jlink_set_tif, jlink_core_type, jlink_run_addr):
|
|
self.device = device
|
|
self.speed = speed
|
|
self.jlink_shake_hand_addr = jlink_shake_hand_addr
|
|
self.jlink_data_addr = jlink_data_addr
|
|
self.jlink_run_addr = jlink_run_addr
|
|
self._rx_timeout = 10000
|
|
self._chiptype = 'bl602'
|
|
if sys.platform == 'win32':
|
|
obj_dll = pylink.Library(dllpath=path_dll)
|
|
self._jlink = pylink.JLink(lib=obj_dll)
|
|
self.jlink_path = os.path.join(app_path, 'utils/jlink', 'JLink.exe')
|
|
else:
|
|
self._jlink = pylink.JLink()
|
|
self.jlink_path = 'JLinkExe'
|
|
match = re.search('\\d{8,10}', device, re.I)
|
|
if match is not None:
|
|
self._jlink.open(serial_no=(int(self.device)))
|
|
else:
|
|
self._jlink.open()
|
|
self._jlink.set_tif(jlink_set_tif)
|
|
self._jlink.connect(jlink_core_type, self.speed)
|
|
|
|
def set_chip_type(self, chiptype):
|
|
self._chiptype = chiptype
|
|
|
|
def write(self, message):
|
|
self.raw_write(self.jlink_data_addr, message)
|
|
data_list = []
|
|
data_list.append(int('59445248', 16))
|
|
self._jlink.memory_write((int(self.jlink_shake_hand_addr, 16)), data_list, nbits=32)
|
|
|
|
def raw_write(self, addr, data_send):
|
|
addr_int = int(addr, 16)
|
|
len2 = len(data_send) % 4
|
|
len1 = len(data_send) - len2
|
|
if len1 != 0:
|
|
data_list = []
|
|
for i in range(int(len1 / 4)):
|
|
data_list.append(data_send[4 * i] + (data_send[4 * i + 1] << 8) + (data_send[4 * i + 2] << 16) + (data_send[4 * i + 3] << 24))
|
|
|
|
self._jlink.memory_write(addr_int, data_list, nbits=32)
|
|
if len2 != 0:
|
|
data_list = []
|
|
for i in range(len2):
|
|
data_list.append(data_send[len1 + i])
|
|
|
|
self._jlink.memory_write((addr_int + len1), data_list, nbits=8)
|
|
|
|
def raw_write_8(self, addr, data_send):
|
|
data_list = []
|
|
for data in data_send:
|
|
data_list.append(data)
|
|
|
|
self._jlink.memory_write((int(addr, 16)), data_list, nbits=8)
|
|
|
|
def raw_write_16(self, addr, data_send):
|
|
data_list = []
|
|
for i in range(int(len(data_send) / 2)):
|
|
data_list.append(data_send[2 * i] + (data_send[2 * i + 1] << 8))
|
|
|
|
self._jlink.memory_write((int(addr, 16)), data_list, nbits=16)
|
|
|
|
def raw_write_32(self, addr, data_send):
|
|
data_list = []
|
|
for i in range(int(len(data_send) / 4)):
|
|
data_list.append(data_send[4 * i] + (data_send[4 * i + 1] << 8) + (data_send[4 * i + 2] << 16) + (data_send[4 * i + 3] << 24))
|
|
|
|
self._jlink.memory_write((int(addr, 16)), data_list, nbits=32)
|
|
|
|
def read(self, length):
|
|
start_time = time.time() * 1000
|
|
while True:
|
|
ready = self._jlink.memory_read((int(self.jlink_shake_hand_addr, 16)), 1, nbits=32)
|
|
if len(ready) >= 1:
|
|
if ready[0] == int('4B434153', 16):
|
|
break
|
|
elapsed = time.time() * 1000 - start_time
|
|
if elapsed >= self._rx_timeout:
|
|
return 'waiting response time out'.encode('utf-8')
|
|
else:
|
|
time.sleep(0.001)
|
|
|
|
data = self._raw_read(self.jlink_data_addr, length)
|
|
if len(data) != length:
|
|
return (0, data)
|
|
return (1, data)
|
|
|
|
def _raw_read(self, addr, data_len):
|
|
addr_int = int(addr, 16)
|
|
if addr_int % 4 == 0:
|
|
len2 = data_len % 4
|
|
len1 = data_len - len2
|
|
data1 = bytearray(0)
|
|
data2 = bytearray(0)
|
|
if len1 != 0:
|
|
data1 = self._jlink.memory_read(addr_int, (int(len1 / 4)), nbits=32)
|
|
if len2 != 0:
|
|
data2 = self._jlink.memory_read((addr_int + len1), len2, nbits=8)
|
|
data = bytearray(0)
|
|
for tmp in data1:
|
|
data += int_to_4bytearray_l(tmp)
|
|
|
|
data += bytearray(data2)
|
|
return data
|
|
return self._raw_read8(addr, data_len)
|
|
|
|
def _raw_read8(self, addr, data_len):
|
|
data = self._jlink.memory_read((int(addr, 16)), data_len, nbits=8)
|
|
return bytearray(data)
|
|
|
|
def _raw_read16(self, addr, data_len):
|
|
raw_data = self._jlink.memory_read((int(addr, 16)), (data_len / 2), nbits=16)
|
|
data = bytearray(0)
|
|
for tmp in raw_data:
|
|
data += int_to_2bytearray_l(tmp)
|
|
|
|
return bytearray(data)
|
|
|
|
def _raw_read32(self, addr, data_len):
|
|
raw_data = self._jlink.memory_read((int(addr, 16)), (data_len / 4), nbits=32)
|
|
data = bytearray(0)
|
|
for tmp in raw_data:
|
|
data += int_to_4bytearray_l(tmp)
|
|
|
|
return bytearray(data)
|
|
|
|
def set_rx_timeout(self, val):
|
|
self._rx_timeout = val * 1000
|
|
|
|
def halt_cpu(self):
|
|
if self._jlink.halted() is False:
|
|
self._jlink.halt()
|
|
if self._jlink.halted():
|
|
return True
|
|
printf("couldn't halt cpu")
|
|
return False
|
|
|
|
def reset_cpu(self, ms=0, halt=True):
|
|
if self._chiptype != 'bl60x':
|
|
self._jlink.set_reset_pin_low()
|
|
self._jlink.set_reset_pin_high()
|
|
return self._jlink.reset(ms, False)
|
|
|
|
def set_pc_msp(self, pc, msp):
|
|
if self._jlink.halted() is False:
|
|
self._jlink.halt()
|
|
if self._jlink.halted():
|
|
if self._chiptype == 'bl602' or self._chiptype == 'bl702' or self._chiptype == 'bl702l':
|
|
jlink_script = 'jlink.cmd'
|
|
fp = open(jlink_script, 'w+')
|
|
cmd = 'h\r\nSetPC ' + str(self._jlink_run_addr) + '\r\nexit'
|
|
printf(cmd)
|
|
fp.write(cmd)
|
|
fp.close()
|
|
if self._device:
|
|
jlink_cmd = self.jlink_path + ' -device RISC-V -Speed {0} -SelectEmuBySN {1} -IF JTAG -jtagconf -1,-1 -autoconnect 1 -CommanderScript jlink.cmd'.format(str(self._speed), str(self._device))
|
|
else:
|
|
jlink_cmd = self.jlink_path + ' -device RISC-V -Speed {0} -IF JTAG -jtagconf -1,-1 -autoconnect 1 -CommanderScript jlink.cmd'.format(str(self._speed))
|
|
printf(jlink_cmd)
|
|
p = subprocess.Popen(jlink_cmd, shell=True, stdin=(subprocess.PIPE), stdout=(subprocess.PIPE), stderr=(subprocess.PIPE))
|
|
out, err = p.communicate()
|
|
printf(out, err)
|
|
os.remove(jlink_script)
|
|
else:
|
|
self._jlink.register_write(15, int(pc, 16))
|
|
self._jlink.register_write(13, int(msp, 16))
|
|
self._jlink.restart()
|
|
else:
|
|
printf("couldn't halt cpu")
|
|
|
|
def shakehand(self, do_reset=False, reset_hold_time=100, shake_hand_delay=100, reset_revert=True, cutoff_time=0, shake_hand_retry=2, isp_timeout=0, boot_load=False):
|
|
self.write(bytearray(1))
|
|
success, ack = self.if_read(2)
|
|
printf(binascii.hexlify(ack))
|
|
if ack.find(b'O') != -1 or ack.find(b'K') != -1:
|
|
time.sleep(0.03)
|
|
return 'OK'
|
|
return 'FL'
|
|
|
|
def deal_ack(self):
|
|
success, ack = self.read(2)
|
|
if success == 0:
|
|
printf('ack:' + str(binascii.hexlify(ack)))
|
|
return ack.decode('utf-8')
|
|
if ack.find(b'O') != -1 or ack.find(b'K') != -1:
|
|
return 'OK'
|
|
if ack.find(b'P') != -1 or ack.find(b'D') != -1:
|
|
return 'PD'
|
|
success, err_code = self.read(4)
|
|
if success == 0:
|
|
printf('err_code:' + str(binascii.hexlify(err_code)))
|
|
return 'FL'
|
|
err_code_str = str(binascii.hexlify(err_code[3:4] + err_code[2:3]).decode('utf-8'))
|
|
ack = 'FL'
|
|
try:
|
|
ret = ack + err_code_str + '(' + get_bflb_error_code(err_code_str) + ')'
|
|
except Exception:
|
|
ret = ack + err_code_str + ' unknown'
|
|
|
|
printf(ret)
|
|
return ret
|
|
|
|
def deal_response(self):
|
|
ack = self.if_deal_ack()
|
|
if ack == 'OK':
|
|
success, len_bytes = self.read(4)
|
|
if success == 0:
|
|
printf('Get length error')
|
|
printf(binascii.hexlify(len_bytes))
|
|
return (
|
|
'Get length error', len_bytes)
|
|
tmp = bytearray_reverse(len_bytes[2:4])
|
|
data_len = bytearray_to_int(tmp)
|
|
success, data_bytes = self.read(data_len + 4)
|
|
if success == 0:
|
|
printf('Read data error')
|
|
return (
|
|
'Read data error', data_bytes)
|
|
data_bytes = data_bytes[4:]
|
|
if len(data_bytes) != data_len:
|
|
printf('Not get excepted length')
|
|
return (
|
|
'Not get excepted length', data_bytes)
|
|
return (ack, data_bytes)
|
|
printf('Not ack OK')
|
|
printf(ack)
|
|
return (
|
|
ack, None)
|
|
|
|
def close(self):
|
|
self._jlink.close()
|
|
|
|
|
|
class BLSerialCKLink(object):
|
|
|
|
def __init__(self, device, serial, speed, rx_timeout, cklink_shake_hand_addr='20000000', cklink_data_addr='20000004', cklink_run_addr='22010000'):
|
|
self.device = device
|
|
self.serial = serial
|
|
self._speed = speed
|
|
self._rx_timeout = rx_timeout
|
|
self._inited = False
|
|
self._cklink_reg_pc = 32
|
|
self._cklink_shake_hand_addr = cklink_shake_hand_addr
|
|
self._cklink_data_addr = cklink_data_addr
|
|
self._cklink_run_addr = cklink_run_addr
|
|
self._chiptype = 'bl602'
|
|
self.link = gol.obj_cklink
|
|
self.temp_init()
|
|
|
|
def temp_init(self):
|
|
if self._inited is False:
|
|
dev = self.device.split('|')
|
|
vid = int(dev[0].replace('0x', ''), 16)
|
|
pid = int(dev[1].replace('0x', ''), 16)
|
|
printf('SN is ' + str(self.serial))
|
|
self._cklink_vid = vid
|
|
self._cklink_pid = pid
|
|
self._inited = True
|
|
if not self.link:
|
|
self.link = cklink.CKLink(dlldir=cklink_dll, vid=(self._cklink_vid), pid=(self._cklink_pid), sn=serial, arch=2, cdi=0)
|
|
gol.obj_cklink = self.link
|
|
self.link.open()
|
|
if self.link.connected():
|
|
self.link.reset(1)
|
|
return False
|
|
|
|
def set_chip_type(self, chiptype):
|
|
self._chiptype = chiptype
|
|
|
|
def set_rx_timeout(self, val):
|
|
self._rx_timeout = val * 1000
|
|
|
|
def halt_cpu(self):
|
|
return self.link.halt()
|
|
|
|
def resume_cpu(self):
|
|
return self.link.resume()
|
|
|
|
def reset_cpu(self):
|
|
return self.link.reset(1)
|
|
|
|
def set_pc_msp(self):
|
|
self.halt_cpu()
|
|
if self._chiptype == 'bl602' or self._chiptype == 'bl702' or self._chiptype == 'bl702l':
|
|
addr = int(self._cklink_run_addr, 16)
|
|
self.link.write_cpu_reg(self._cklink_reg_pc, addr)
|
|
|
|
def write(self, message):
|
|
self.if_raw_write(self._cklink_data_addr, message)
|
|
self.if_raw_write(self._cklink_shake_hand_addr, binascii.unhexlify('48524459'))
|
|
|
|
def _raw_write(self, addr, data_send):
|
|
self.halt_cpu()
|
|
addr_int = int(addr, 16)
|
|
data_send = bytes(data_send)
|
|
self.link.write_memory(addr_int, data_send)
|
|
self.resume_cpu()
|
|
|
|
def read(self, length):
|
|
start_time = time.time() * 1000
|
|
while True:
|
|
self.halt_cpu()
|
|
ready = self.link.read_memory(int(self._cklink_shake_hand_addr, 16), 4)
|
|
if len(ready) >= 1:
|
|
ready = binascii.hexlify(ready).decode()
|
|
if ready == '5341434b':
|
|
self.resume_cpu()
|
|
break
|
|
elapsed = time.time() * 1000 - start_time
|
|
if elapsed >= self._rx_timeout:
|
|
return (0, 'waiting response time out'.encode('utf-8'))
|
|
else:
|
|
self.resume_cpu()
|
|
time.sleep(0.001)
|
|
|
|
data = self._raw_read(self._cklink_data_addr, length)
|
|
if len(data) != length:
|
|
return (0, data)
|
|
return (1, data)
|
|
|
|
def _raw_read(self, addr, data_len):
|
|
return self.if_raw_read8(addr, data_len)
|
|
|
|
def _raw_read8(self, addr, data_len):
|
|
self.halt_cpu()
|
|
data = self.link.read_memory(int(addr, 16), data_len)
|
|
self.resume_cpu()
|
|
return bytearray(data)
|
|
|
|
def if_shakehand(self):
|
|
self.if_write(bytearray(1))
|
|
success, ack = self.read(2)
|
|
printf(binascii.hexlify(ack))
|
|
if ack.find(b'O') != -1 or ack.find(b'K') != -1:
|
|
time.sleep(0.03)
|
|
return 'OK'
|
|
return 'FL'
|
|
|
|
def deal_ack(self):
|
|
success, ack = self.read(2)
|
|
if success == 0:
|
|
printf('ack:' + str(binascii.hexlify(ack)))
|
|
return ack.decode('utf-8')
|
|
if ack.find(b'O') != -1 or ack.find(b'K') != -1:
|
|
return 'OK'
|
|
if ack.find(b'P') != -1 or ack.find(b'D') != -1:
|
|
return 'PD'
|
|
success, err_code = self.if_read(4)
|
|
if success == 0:
|
|
printf('err_code:' + str(binascii.hexlify(err_code)))
|
|
return 'FL'
|
|
err_code_str = str(binascii.hexlify(err_code[3:4] + err_code[2:3]).decode('utf-8'))
|
|
ack = 'FL'
|
|
try:
|
|
ret = ack + err_code_str + '(' + get_bflb_error_code(err_code_str) + ')'
|
|
except Exception:
|
|
ret = ack + err_code_str + ' unknown'
|
|
|
|
printf(ret)
|
|
return ret
|
|
|
|
def deal_response(self):
|
|
ack = self.deal_ack()
|
|
if ack == 'OK':
|
|
success, len_bytes = self.read(16)
|
|
if success == 0:
|
|
printf('Get length error')
|
|
printf(binascii.hexlify(len_bytes))
|
|
return (
|
|
'Get length error', len_bytes)
|
|
tmp = bytearray_reverse(len_bytes[2:4])
|
|
data_len = bytearray_to_int(tmp)
|
|
success, data_bytes = self.if_read(data_len + 4)
|
|
if success == 0:
|
|
printf('Read data error')
|
|
return (
|
|
'Read data error', data_bytes)
|
|
data_bytes = data_bytes[4:]
|
|
if len(data_bytes) != data_len:
|
|
printf('Not get excepted length')
|
|
return (
|
|
'Not get excepted length', data_bytes)
|
|
return (ack, data_bytes)
|
|
printf('Not ack OK')
|
|
printf(ack)
|
|
return (
|
|
ack, None)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print(pylink_enumerate())
|
|
ls = BLSerialUart('COM28', 2000000, '4201BFF0', '4201C000', 0, 'RISC-V', '22010000')
|
|
ls.set_dtr(0)
|
|
time.sleep(10)
|
|
ls.set_dtr(1)
|
|
ls.close()
|
|
# okay decompiling ./libs/base/bflb_serial.pyc
|