dev #1

Merged
david merged 25 commits from dev into main 2025-04-28 14:20:45 -07:00
4 changed files with 25 additions and 22 deletions
Showing only changes of commit c89f936c07 - Show all commits

View File

@ -1,4 +1,4 @@
import spidev
from spidev import SpiDev
import RPi.GPIO as GPIO
from .util import sleep, get_addr
from .init_regs_value import init_regs_value
@ -15,26 +15,26 @@ VERSION = STATUS["VERSION"]
MARCSTATE = STATUS["MARCSTATE"]
def write_reg(spi, addr, value):
def write_reg(spi: SpiDev, addr: int, value: int):
"""Write single byte to a register"""
spi.xfer2([addr, value])
sleep(0.1)
def read_register(spi, addr):
def read_register(spi: SpiDev, addr: int):
READ_SINGLE = get_addr("READ_SINGLE")
# Send address | 0x80 (read), then 0x00 dummy to clock in response
response = spi.xfer2([READ_SINGLE | addr, 0x00])
# sleep(0.1)
return response[1]
def write_burst(spi, addr, data):
def write_burst(spi: SpiDev, addr: int, data):
"""Write multiple bytes using burst write"""
WRITE_BURST = get_addr("WRITE_BURST")
tbuf = [addr | WRITE_BURST] + data
spi.xfer2(tbuf)
sleep(0.1)
def read_burst(spi, addr, length):
def read_burst(spi: SpiDev, addr: int, length):
"""Read multiple bytes using burst read"""
READ_BURST = get_addr("READ_BURST")
rbuf = [addr | READ_BURST] + [0x00] * length
@ -42,18 +42,18 @@ def read_burst(spi, addr, length):
sleep(0.1)
return response[1:] # Skip status byte
def strobe(spi, command):
def strobe(spi: SpiDev, command: int):
"""Send a command strobe to CC2500"""
spi.xfer2([command])
sleep(0.1)
def init_cc_2500(spi):
def init_cc_2500(spi: SpiDev):
for reg_name, value in init_regs_value.items():
addr = get_addr(reg_name)
write_reg(spi, addr, value)
def setup_spi():
spi = spidev.SpiDev()
spi = SpiDev()
spi.open(0, 0) # Bus 0, CE0 (Pin 24)
spi.max_speed_hz = 100_000 # Safe start speed
spi.mode = 0b00 # SPI mode 0
@ -66,7 +66,7 @@ def setup_gpio(GDO0_PIN=17, GDO2_PIN=27):
GPIO.setup(GDO2_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
sleep(0.1)
def reset(spi):
def reset(spi: SpiDev):
# print("Sending SRES (reset)...")
spi.xfer2([SRES])
sleep(0.5)
@ -86,7 +86,7 @@ def reset(spi):
# raise Exception("Expected Version was 0x2F!! Quitting")
def test_read_write_reg(spi, dbg=False):
def test_read_write_reg(spi: SpiDev, dbg=False):
FIFOTHR = get_addr("FIFOTHR")
initial_val = read_register(spi, FIFOTHR)
test_value = initial_val + 1

View File

@ -1,5 +1,7 @@
from datetime import datetime
from pathlib import Path
from spidev import SpiDev
from .util import sleep, get_addr, digital_read, GDO0_PIN, GDO2_PIN, debug, delay
import time
from .common import strobe, read_register
@ -10,12 +12,12 @@ SRX = get_addr('SRX')
RXFIFO = get_addr('RXFIFO')
RSSI = get_addr('RSSI')
def print_packet(spi, packet_length):
def print_packet(spi: SpiDev, packet_length):
for i in range(packet_length):
print(f", byte: {0}: 0x{1}", i, read_register(spi, RXFIFO))
# Save packet to a timestamped binary file
def save_packet(packet: list):
def save_packet(packet: list[int]):
# Create directory to store packets if it doesn't exist
packet_dir = Path("saved_packets")
packet_dir.mkdir(parents=True, exist_ok=True)
@ -31,14 +33,14 @@ def save_packet(packet: list):
print(f"Packet saved to {filename}")
def get_rssi_from_pkt(packet):
def get_rssi_from_pkt(packet: list[int]):
if len(packet) < 2:
raise ValueError("Packet too short to contain RSSI and status bytes.")
rssi_raw = packet[-2] # Second-to-last byte is RSSI
return rssi_raw
def get_rssi_from_reg(spi):
def get_rssi_from_reg(spi: SpiDev):
return read_register(spi, RSSI)
def get_signal_strength_rssi_raw(rssi_raw):
@ -48,7 +50,7 @@ def get_signal_strength_rssi_raw(rssi_raw):
rssi_dec = rssi_raw
return rssi_dec / 2.0 - 74 # According to CC2500 datasheet
def flush_rx(spi):
def flush_rx(spi: SpiDev):
# Make sure that the radio is in IDLE state before flushing the FIFO
# (Unless RXOFF_MODE has been changed, the radio should be in IDLE state at this point)
delay(10)
@ -58,7 +60,7 @@ def flush_rx(spi):
strobe(spi, SFRX)
delay(10)
def rx_data_rf(spi):
def rx_data_rf(spi: SpiDev):
strobe(spi, SRX)
gdo2_state = False
count = 0
@ -75,7 +77,7 @@ def rx_data_rf(spi):
gdo2_state = digital_read(GDO2_PIN)
delay(100)
packet_length: int = read_register(spi, RXFIFO)
packet: list = [read_register(spi, RXFIFO) for _ in range(packet_length)]
packet: list[int] = [read_register(spi, RXFIFO) for _ in range(packet_length)]
rssi_raw = get_rssi_from_pkt(packet)
strength = get_signal_strength_rssi_raw(rssi_raw)
print("Length: {0} bytes\t Signal: {1} dBm".format(packet_length, strength) )

View File

@ -1,3 +1,4 @@
from spidev import SpiDev
from .regs_addr import regs_addr
import RPi.GPIO as GPIO
import time
@ -5,7 +6,7 @@ debug = True
GDO0_PIN=17
GDO2_PIN=27
def rr(spi, addr):
def rr(spi: SpiDev, addr):
READ_SINGLE = get_addr("READ_SINGLE")
# Send address | 0x80 (read), then 0x00 dummy to clock in response
response = spi.xfer2([READ_SINGLE | addr, 0x00])
@ -21,19 +22,19 @@ def print_gdo_state(GDO0_PIN=17, GDO2_PIN=27):
def digital_read(GDO_PIN: int):
return GPIO.input(GDO_PIN)
def get_addr(name):
addr = ""
def get_addr(name: str):
addr = 0x00
stop = False
for reg_type, reg_data in regs_addr.items():
for reg_name, reg_addr in reg_data.items():
if reg_name == name:
stop = True
addr = reg_addr
addr = int(reg_addr)
if stop == False:
raise Exception("Failed to find address for "+name)
return addr
def dump_regs(spi, cfgonly = False):
def dump_regs(spi: SpiDev, cfgonly = False):
if cfgonly:
for reg_name, reg_addr in regs_addr["CONFIG_REGS"].items():
name :str = reg_name

0
tx_packets/activate.bin Normal file
View File