From 6c717d5bd7a82516f95f9ce917bd820e48cd20da Mon Sep 17 00:00:00 2001 From: David Westgate Date: Fri, 11 Apr 2025 19:40:00 -0700 Subject: [PATCH] black formatter --- src/common.py | 20 ++++++++++++++------ src/main.py | 38 ++++++++++++++++++++++++++++---------- src/receive.py | 26 +++++++++++++++----------- src/regs_addr.py | 30 +++++++++++++----------------- src/transmit.py | 8 ++++---- src/util.py | 28 +++++++++++++++++++--------- 6 files changed, 93 insertions(+), 57 deletions(-) diff --git a/src/common.py b/src/common.py index 834ceda..12b9c19 100644 --- a/src/common.py +++ b/src/common.py @@ -3,6 +3,7 @@ import RPi.GPIO as GPIO from .util import sleep, get_addr from .init_regs_value import init_regs_value from .regs_addr import regs_addr + CONFIG_REGS = regs_addr["CONFIG_REGS"] STROBES = regs_addr["STROBES"] STATUS = regs_addr["STATUS"] @@ -20,6 +21,7 @@ def write_reg(spi: SpiDev, addr: int, value: int): spi.xfer2([addr, value]) sleep(0.1) + def read_register(spi: SpiDev, addr: int): READ_SINGLE = get_addr("READ_SINGLE") # Send address | 0x80 (read), then 0x00 dummy to clock in response @@ -27,6 +29,7 @@ def read_register(spi: SpiDev, addr: int): # sleep(0.1) return response[1] + def write_burst(spi: SpiDev, addr: int, data): """Write multiple bytes using burst write""" WRITE_BURST = get_addr("WRITE_BURST") @@ -34,6 +37,7 @@ def write_burst(spi: SpiDev, addr: int, data): spi.xfer2(tbuf) sleep(0.1) + def read_burst(spi: SpiDev, addr: int, length): """Read multiple bytes using burst read""" READ_BURST = get_addr("READ_BURST") @@ -42,16 +46,19 @@ def read_burst(spi: SpiDev, addr: int, length): sleep(0.1) return response[1:] # Skip status byte + def strobe(spi: SpiDev, command: int): """Send a command strobe to CC2500""" spi.xfer2([command]) sleep(0.1) + def init_cc_2500(spi: SpiDev): for reg_name, value in init_regs_value.items(): addr = get_addr(reg_name) write_reg(spi, addr, value) + def setup_spi(): spi = SpiDev() spi.open(0, 0) # Bus 0, CE0 (Pin 24) @@ -66,6 +73,7 @@ def setup_gpio(GDO0_PIN=17, GDO2_PIN=27): GPIO.setup(GDO2_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) sleep(0.1) + def reset(spi: SpiDev): # print("Sending SRES (reset)...") spi.xfer2([SRES]) @@ -76,7 +84,7 @@ def reset(spi: SpiDev): print(f"Status byte: 0x{status:02X}") # print("Reading MARCSTATE...") - marc = spi.xfer2([MARCSTATE| 0x80, 0x00])[0] + marc = spi.xfer2([MARCSTATE | 0x80, 0x00])[0] print(f"Marcstate byte: 0x{marc:02X}") print("Reading VERSION register...") @@ -93,10 +101,10 @@ def test_read_write_reg(spi: SpiDev, dbg=False): write_reg(spi, FIFOTHR, test_value) check = read_register(spi, FIFOTHR) write_reg(spi, FIFOTHR, initial_val) - if((check != test_value) or dbg): + if (check != test_value) or dbg: print("initial value ", initial_val) print("test value ", test_value) - print("check ",check) - if(not dbg): - raise Exception("Test Read+Write failed") - return check == test_value \ No newline at end of file + print("check ", check) + if not dbg: + raise Exception("Test Read+Write failed") + return check == test_value diff --git a/src/main.py b/src/main.py index 52ece6c..47983ca 100644 --- a/src/main.py +++ b/src/main.py @@ -2,8 +2,19 @@ import spidev from .util import print_gdo_state, sleep, get_addr, dump_regs, debug, GDO0_PIN, GDO2_PIN from .receive import rx_data_rf from .transmit import transmit_packet -from .common import reset, setup_spi, setup_gpio, read_register, test_read_write_reg, init_cc_2500, write_reg, SRES, SNOP, MARCSTATE, VERSION - +from .common import ( + reset, + setup_spi, + setup_gpio, + read_register, + test_read_write_reg, + init_cc_2500, + write_reg, + SRES, + SNOP, + MARCSTATE, + VERSION, +) def menu(): @@ -34,30 +45,37 @@ if __name__ == "__main__": menu() cmd = int(input("$: ")) if cmd == 0: - stop=True + stop = True elif cmd == 1: reg_name = input("Register name: ") addr = get_addr(reg_name) - value = read_register(spi,addr) - print("Address: " + hex(addr)+ ", Value: " + hex(value) +" == " + str(value)) + value = read_register(spi, addr) + print( + "Address: " + + hex(addr) + + ", Value: " + + hex(value) + + " == " + + str(value) + ) elif cmd == 2: reg_name = input("Register name: ") addr = get_addr(reg_name) - value = int(input("New Register value (hex): "),16) + value = int(input("New Register value (hex): "), 16) write_reg(spi, addr, value) sleep(0.1) - value_check = read_register(spi,addr) - print("Updated Value: " + hex(value) +" == " + str(value)) + value_check = read_register(spi, addr) + print("Updated Value: " + hex(value) + " == " + str(value)) elif cmd == 3: dump_regs(spi, True) elif cmd == 4: - while(True): + while True: rx_data_rf(spi) elif cmd == 5: transmit_packet(spi) elif cmd == 6: res = test_read_write_reg(spi, True) - print("Test result : "+str(res)) + print("Test result : " + str(res)) elif cmd == 7: print_gdo_state(GDO0_PIN, GDO2_PIN) else: diff --git a/src/receive.py b/src/receive.py index 2179643..56300e5 100644 --- a/src/receive.py +++ b/src/receive.py @@ -3,19 +3,20 @@ from pathlib import Path from spidev import SpiDev from .util import sleep, get_addr, digital_read, GDO0_PIN, GDO2_PIN, debug, delay -import time from .common import strobe, read_register -SIDLE = get_addr('SIDLE') -SFRX = get_addr('SFRX') -SRX = get_addr('SRX') -RXFIFO = get_addr('RXFIFO') -RSSI = get_addr('RSSI') +SIDLE = get_addr("SIDLE") +SFRX = get_addr("SFRX") +SRX = get_addr("SRX") +RXFIFO = get_addr("RXFIFO") +RSSI = get_addr("RSSI") + def print_packet(spi: SpiDev, packet_length): for i in range(packet_length): print(f", byte: {0}: 0x{1}", i, read_register(spi, RXFIFO)) + # Save packet to a timestamped binary file def save_packet(packet: list[int]): # Create directory to store packets if it doesn't exist @@ -43,12 +44,14 @@ def get_rssi_from_pkt(packet: list[int]): def get_rssi_from_reg(spi: SpiDev): return read_register(spi, RSSI) + def get_signal_strength_rssi_raw(rssi_raw: int) -> float: if rssi_raw >= 128: rssi_dec = rssi_raw - 256 else: rssi_dec = rssi_raw - return rssi_dec / 2.0 - 74 # According to CC2500 datasheet + return rssi_dec / 2.0 - 74 # According to CC2500 datasheet + def flush_rx(spi: SpiDev): # Make sure that the radio is in IDLE state before flushing the FIFO @@ -60,27 +63,28 @@ def flush_rx(spi: SpiDev): strobe(spi, SFRX) delay(10) + def rx_data_rf(spi: SpiDev): strobe(spi, SRX) gdo2_state = False count = 0 strength = 0 - while(gdo2_state == False): + while gdo2_state == False: gdo2_state = digital_read(GDO2_PIN) delay(1) - count = count+1 + count = count + 1 if count > 1000: flush_rx(spi) # print("ERR NO DATA") return - while(gdo2_state == True): + while gdo2_state == True: gdo2_state = digital_read(GDO2_PIN) delay(100) packet_length: int = read_register(spi, RXFIFO) packet: list[int] = [read_register(spi, RXFIFO) for _ in range(packet_length)] rssi_raw = get_rssi_from_pkt(packet) strength = get_signal_strength_rssi_raw(rssi_raw) - print("Length: {0} bytes\t Signal: {1} dBm".format(packet_length, strength) ) + print("Length: {0} bytes\t Signal: {1} dBm".format(packet_length, strength)) save_packet(packet) # Make sure that the radio is in IDLE state before flushing the FIFO # (Unless RXOFF_MODE has been changed, the radio should be in IDLE state at this point) diff --git a/src/regs_addr.py b/src/regs_addr.py index 4a25151..64f371e 100644 --- a/src/regs_addr.py +++ b/src/regs_addr.py @@ -1,5 +1,6 @@ from typing import TypedDict + class ConfigRegs(TypedDict): IOCFG2: int IOCFG1: int @@ -49,6 +50,7 @@ class ConfigRegs(TypedDict): TEST1: int TEST0: int + class Strobes(TypedDict): SRES: int SFSTXON: int @@ -65,6 +67,7 @@ class Strobes(TypedDict): SWORRST: int SNOP: int + class Status(TypedDict): PARTNUM: int VERSION: int @@ -80,20 +83,24 @@ class Status(TypedDict): RXBYTES: int NUM_RXBYTES: int + class Memory(TypedDict): PATABLE: int TXFIFO: int RXFIFO: int + class Masks(TypedDict): LQI_RX: int CRC_OK: int + class Access(TypedDict): WRITE_BURST: int READ_SINGLE: int READ_BURST: int + class RegsAddr(TypedDict): CONFIG_REGS: ConfigRegs STROBES: Strobes @@ -151,7 +158,7 @@ regs_addr: RegsAddr = { "AGCTEST": 0x2B, "TEST2": 0x2C, "TEST1": 0x2D, - "TEST0": 0x2E + "TEST0": 0x2E, }, "STROBES": { "SRES": 0x30, @@ -167,7 +174,7 @@ regs_addr: RegsAddr = { "SFRX": 0x3A, "SFTX": 0x3B, "SWORRST": 0x3C, - "SNOP": 0x3D + "SNOP": 0x3D, }, "STATUS": { "PARTNUM": 0x30, @@ -182,20 +189,9 @@ regs_addr: RegsAddr = { "VCO_VC_DAC": 0x39, "TXBYTES": 0x3A, "RXBYTES": 0x3B, - "NUM_RXBYTES": 0x7F + "NUM_RXBYTES": 0x7F, }, - "MEMORY": { - "PATABLE": 0x3E, - "TXFIFO": 0x3F, - "RXFIFO": 0x3F - }, - "MASKS": { - "LQI_RX": 0x01, - "CRC_OK": 0x80 - }, - "ACCESS": { - "WRITE_BURST": 0x40, - "READ_SINGLE": 0x80, - "READ_BURST": 0xC0 - } + "MEMORY": {"PATABLE": 0x3E, "TXFIFO": 0x3F, "RXFIFO": 0x3F}, + "MASKS": {"LQI_RX": 0x01, "CRC_OK": 0x80}, + "ACCESS": {"WRITE_BURST": 0x40, "READ_SINGLE": 0x80, "READ_BURST": 0xC0}, } diff --git a/src/transmit.py b/src/transmit.py index 086cd1e..8748476 100644 --- a/src/transmit.py +++ b/src/transmit.py @@ -2,12 +2,13 @@ from .common import strobe, write_burst from .util import get_addr -SIDLE = get_addr('SIDLE') -SFTX = get_addr('SFTX') -STX = get_addr('STX') +SIDLE = get_addr("SIDLE") +SFTX = get_addr("SFTX") +STX = get_addr("STX") WRITE_BURST = get_addr("WRITE_BURST") TXFIFO_BURST = 0x7F + def transmit_packet(spi): strobe(spi, SIDLE) strobe(spi, SFTX) @@ -18,4 +19,3 @@ def transmit_packet(spi): data[3] = 3 data[4] = 4 write_burst(spi, TXFIFO_BURST, data) - diff --git a/src/util.py b/src/util.py index 9e3240e..cb95bc2 100644 --- a/src/util.py +++ b/src/util.py @@ -2,9 +2,11 @@ from spidev import SpiDev from .regs_addr import regs_addr import RPi.GPIO as GPIO import time + debug = True -GDO0_PIN=17 -GDO2_PIN=27 +GDO0_PIN = 17 +GDO2_PIN = 27 + def rr(spi: SpiDev, addr): READ_SINGLE = get_addr("READ_SINGLE") @@ -13,15 +15,18 @@ def rr(spi: SpiDev, addr): sleep(0.1) return response[1] + def print_gdo_state(GDO0_PIN=17, GDO2_PIN=27): gdo0 = GPIO.input(GDO0_PIN) # Reads 1 or 0 gdo2 = GPIO.input(GDO2_PIN) print(f"GDO0 (GPIO{GDO0_PIN}): {gdo0}, GDO2 (GPIO{GDO2_PIN}): {gdo2}") - sleep(0.1) + sleep(0.1) + def digital_read(GDO_PIN: int): return GPIO.input(GDO_PIN) + def get_addr(name: str): addr = 0x00 stop = False @@ -31,32 +36,37 @@ def get_addr(name: str): stop = True addr = int(reg_addr) if stop == False: - raise Exception("Failed to find address for "+name) + raise Exception("Failed to find address for " + name) return addr -def dump_regs(spi: SpiDev, cfgonly = False): + +def dump_regs(spi: SpiDev, cfgonly=False): if cfgonly: for reg_name, reg_addr in regs_addr["CONFIG_REGS"].items(): - name :str = reg_name + name: str = reg_name value = rr(spi, reg_addr) - print((name+":").ljust(15) +hex(value).ljust(4)+"\t"+str(value)) + print((name + ":").ljust(15) + hex(value).ljust(4) + "\t" + str(value)) else: for reg_type, reg_data in regs_addr.items(): for reg_name, reg_addr in reg_data.items(): - name :str = reg_name + name: str = reg_name value = rr(spi, reg_addr) - print((name+":").ljust(15) +hex(value).ljust(4)+"\t"+str(value)) + print((name + ":").ljust(15) + hex(value).ljust(4) + "\t" + str(value)) + # def disable_crc(spi): # spi.xfer2([CONFIG_REGS["PKTCTRL0"], 0x01]) + def sleep(t): return time.sleep(t) + def delay(t): """Delay for t milliseconds.""" time.sleep(t / 1000.0) + def delayMicroseconds(t): """Delay for t microseconds.""" time.sleep(t / 1_000_000.0)