From c89f936c07fdf94f7c3b361e860253c7d6fc9b2d Mon Sep 17 00:00:00 2001 From: David Westgate Date: Fri, 11 Apr 2025 19:35:12 -0700 Subject: [PATCH] add strong spidev typings --- src/common.py | 20 ++++++++++---------- src/receive.py | 16 +++++++++------- src/util.py | 11 ++++++----- tx_packets/activate.bin | 0 4 files changed, 25 insertions(+), 22 deletions(-) create mode 100644 tx_packets/activate.bin diff --git a/src/common.py b/src/common.py index 1845ffb..834ceda 100644 --- a/src/common.py +++ b/src/common.py @@ -1,4 +1,4 @@ -import spidev +from spidev import SpiDev import RPi.GPIO as GPIO from .util import sleep, get_addr from .init_regs_value import init_regs_value @@ -15,26 +15,26 @@ VERSION = STATUS["VERSION"] MARCSTATE = STATUS["MARCSTATE"] -def write_reg(spi, addr, value): +def write_reg(spi: SpiDev, addr: int, value: int): """Write single byte to a register""" spi.xfer2([addr, value]) sleep(0.1) -def read_register(spi, addr): +def read_register(spi: SpiDev, addr: int): READ_SINGLE = get_addr("READ_SINGLE") # Send address | 0x80 (read), then 0x00 dummy to clock in response response = spi.xfer2([READ_SINGLE | addr, 0x00]) # sleep(0.1) return response[1] -def write_burst(spi, addr, data): +def write_burst(spi: SpiDev, addr: int, data): """Write multiple bytes using burst write""" WRITE_BURST = get_addr("WRITE_BURST") tbuf = [addr | WRITE_BURST] + data spi.xfer2(tbuf) sleep(0.1) -def read_burst(spi, addr, length): +def read_burst(spi: SpiDev, addr: int, length): """Read multiple bytes using burst read""" READ_BURST = get_addr("READ_BURST") rbuf = [addr | READ_BURST] + [0x00] * length @@ -42,18 +42,18 @@ def read_burst(spi, addr, length): sleep(0.1) return response[1:] # Skip status byte -def strobe(spi, command): +def strobe(spi: SpiDev, command: int): """Send a command strobe to CC2500""" spi.xfer2([command]) sleep(0.1) -def init_cc_2500(spi): +def init_cc_2500(spi: SpiDev): for reg_name, value in init_regs_value.items(): addr = get_addr(reg_name) write_reg(spi, addr, value) def setup_spi(): - spi = spidev.SpiDev() + spi = SpiDev() spi.open(0, 0) # Bus 0, CE0 (Pin 24) spi.max_speed_hz = 100_000 # Safe start speed spi.mode = 0b00 # SPI mode 0 @@ -66,7 +66,7 @@ def setup_gpio(GDO0_PIN=17, GDO2_PIN=27): GPIO.setup(GDO2_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) sleep(0.1) -def reset(spi): +def reset(spi: SpiDev): # print("Sending SRES (reset)...") spi.xfer2([SRES]) sleep(0.5) @@ -86,7 +86,7 @@ def reset(spi): # raise Exception("Expected Version was 0x2F!! Quitting") -def test_read_write_reg(spi, dbg=False): +def test_read_write_reg(spi: SpiDev, dbg=False): FIFOTHR = get_addr("FIFOTHR") initial_val = read_register(spi, FIFOTHR) test_value = initial_val + 1 diff --git a/src/receive.py b/src/receive.py index b18875f..1e0ac46 100644 --- a/src/receive.py +++ b/src/receive.py @@ -1,5 +1,7 @@ from datetime import datetime from pathlib import Path + +from spidev import SpiDev from .util import sleep, get_addr, digital_read, GDO0_PIN, GDO2_PIN, debug, delay import time from .common import strobe, read_register @@ -10,12 +12,12 @@ SRX = get_addr('SRX') RXFIFO = get_addr('RXFIFO') RSSI = get_addr('RSSI') -def print_packet(spi, packet_length): +def print_packet(spi: SpiDev, packet_length): for i in range(packet_length): print(f", byte: {0}: 0x{1}", i, read_register(spi, RXFIFO)) # Save packet to a timestamped binary file -def save_packet(packet: list): +def save_packet(packet: list[int]): # Create directory to store packets if it doesn't exist packet_dir = Path("saved_packets") packet_dir.mkdir(parents=True, exist_ok=True) @@ -31,14 +33,14 @@ def save_packet(packet: list): print(f"Packet saved to {filename}") -def get_rssi_from_pkt(packet): +def get_rssi_from_pkt(packet: list[int]): if len(packet) < 2: raise ValueError("Packet too short to contain RSSI and status bytes.") rssi_raw = packet[-2] # Second-to-last byte is RSSI return rssi_raw -def get_rssi_from_reg(spi): +def get_rssi_from_reg(spi: SpiDev): return read_register(spi, RSSI) def get_signal_strength_rssi_raw(rssi_raw): @@ -48,7 +50,7 @@ def get_signal_strength_rssi_raw(rssi_raw): rssi_dec = rssi_raw return rssi_dec / 2.0 - 74 # According to CC2500 datasheet -def flush_rx(spi): +def flush_rx(spi: SpiDev): # Make sure that the radio is in IDLE state before flushing the FIFO # (Unless RXOFF_MODE has been changed, the radio should be in IDLE state at this point) delay(10) @@ -58,7 +60,7 @@ def flush_rx(spi): strobe(spi, SFRX) delay(10) -def rx_data_rf(spi): +def rx_data_rf(spi: SpiDev): strobe(spi, SRX) gdo2_state = False count = 0 @@ -75,7 +77,7 @@ def rx_data_rf(spi): gdo2_state = digital_read(GDO2_PIN) delay(100) packet_length: int = read_register(spi, RXFIFO) - packet: list = [read_register(spi, RXFIFO) for _ in range(packet_length)] + packet: list[int] = [read_register(spi, RXFIFO) for _ in range(packet_length)] rssi_raw = get_rssi_from_pkt(packet) strength = get_signal_strength_rssi_raw(rssi_raw) print("Length: {0} bytes\t Signal: {1} dBm".format(packet_length, strength) ) diff --git a/src/util.py b/src/util.py index bd5baa9..9e3240e 100644 --- a/src/util.py +++ b/src/util.py @@ -1,3 +1,4 @@ +from spidev import SpiDev from .regs_addr import regs_addr import RPi.GPIO as GPIO import time @@ -5,7 +6,7 @@ debug = True GDO0_PIN=17 GDO2_PIN=27 -def rr(spi, addr): +def rr(spi: SpiDev, addr): READ_SINGLE = get_addr("READ_SINGLE") # Send address | 0x80 (read), then 0x00 dummy to clock in response response = spi.xfer2([READ_SINGLE | addr, 0x00]) @@ -21,19 +22,19 @@ def print_gdo_state(GDO0_PIN=17, GDO2_PIN=27): def digital_read(GDO_PIN: int): return GPIO.input(GDO_PIN) -def get_addr(name): - addr = "" +def get_addr(name: str): + addr = 0x00 stop = False for reg_type, reg_data in regs_addr.items(): for reg_name, reg_addr in reg_data.items(): if reg_name == name: stop = True - addr = reg_addr + addr = int(reg_addr) if stop == False: raise Exception("Failed to find address for "+name) return addr -def dump_regs(spi, cfgonly = False): +def dump_regs(spi: SpiDev, cfgonly = False): if cfgonly: for reg_name, reg_addr in regs_addr["CONFIG_REGS"].items(): name :str = reg_name diff --git a/tx_packets/activate.bin b/tx_packets/activate.bin new file mode 100644 index 0000000..e69de29