dev #1

Merged
david merged 25 commits from dev into main 2025-04-28 14:20:45 -07:00
6 changed files with 93 additions and 57 deletions
Showing only changes of commit 6c717d5bd7 - Show all commits

View File

@ -3,6 +3,7 @@ import RPi.GPIO as GPIO
from .util import sleep, get_addr
from .init_regs_value import init_regs_value
from .regs_addr import regs_addr
CONFIG_REGS = regs_addr["CONFIG_REGS"]
STROBES = regs_addr["STROBES"]
STATUS = regs_addr["STATUS"]
@ -20,6 +21,7 @@ def write_reg(spi: SpiDev, addr: int, value: int):
spi.xfer2([addr, value])
sleep(0.1)
def read_register(spi: SpiDev, addr: int):
READ_SINGLE = get_addr("READ_SINGLE")
# Send address | 0x80 (read), then 0x00 dummy to clock in response
@ -27,6 +29,7 @@ def read_register(spi: SpiDev, addr: int):
# sleep(0.1)
return response[1]
def write_burst(spi: SpiDev, addr: int, data):
"""Write multiple bytes using burst write"""
WRITE_BURST = get_addr("WRITE_BURST")
@ -34,6 +37,7 @@ def write_burst(spi: SpiDev, addr: int, data):
spi.xfer2(tbuf)
sleep(0.1)
def read_burst(spi: SpiDev, addr: int, length):
"""Read multiple bytes using burst read"""
READ_BURST = get_addr("READ_BURST")
@ -42,16 +46,19 @@ def read_burst(spi: SpiDev, addr: int, length):
sleep(0.1)
return response[1:] # Skip status byte
def strobe(spi: SpiDev, command: int):
"""Send a command strobe to CC2500"""
spi.xfer2([command])
sleep(0.1)
def init_cc_2500(spi: SpiDev):
for reg_name, value in init_regs_value.items():
addr = get_addr(reg_name)
write_reg(spi, addr, value)
def setup_spi():
spi = SpiDev()
spi.open(0, 0) # Bus 0, CE0 (Pin 24)
@ -66,6 +73,7 @@ def setup_gpio(GDO0_PIN=17, GDO2_PIN=27):
GPIO.setup(GDO2_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
sleep(0.1)
def reset(spi: SpiDev):
# print("Sending SRES (reset)...")
spi.xfer2([SRES])
@ -93,10 +101,10 @@ def test_read_write_reg(spi: SpiDev, dbg=False):
write_reg(spi, FIFOTHR, test_value)
check = read_register(spi, FIFOTHR)
write_reg(spi, FIFOTHR, initial_val)
if((check != test_value) or dbg):
if (check != test_value) or dbg:
print("initial value ", initial_val)
print("test value ", test_value)
print("check ", check)
if(not dbg):
if not dbg:
raise Exception("Test Read+Write failed")
return check == test_value

View File

@ -2,8 +2,19 @@ import spidev
from .util import print_gdo_state, sleep, get_addr, dump_regs, debug, GDO0_PIN, GDO2_PIN
from .receive import rx_data_rf
from .transmit import transmit_packet
from .common import reset, setup_spi, setup_gpio, read_register, test_read_write_reg, init_cc_2500, write_reg, SRES, SNOP, MARCSTATE, VERSION
from .common import (
reset,
setup_spi,
setup_gpio,
read_register,
test_read_write_reg,
init_cc_2500,
write_reg,
SRES,
SNOP,
MARCSTATE,
VERSION,
)
def menu():
@ -39,7 +50,14 @@ if __name__ == "__main__":
reg_name = input("Register name: ")
addr = get_addr(reg_name)
value = read_register(spi, addr)
print("Address: " + hex(addr)+ ", Value: " + hex(value) +" == " + str(value))
print(
"Address: "
+ hex(addr)
+ ", Value: "
+ hex(value)
+ " == "
+ str(value)
)
elif cmd == 2:
reg_name = input("Register name: ")
addr = get_addr(reg_name)
@ -51,7 +69,7 @@ if __name__ == "__main__":
elif cmd == 3:
dump_regs(spi, True)
elif cmd == 4:
while(True):
while True:
rx_data_rf(spi)
elif cmd == 5:
transmit_packet(spi)

View File

@ -3,19 +3,20 @@ from pathlib import Path
from spidev import SpiDev
from .util import sleep, get_addr, digital_read, GDO0_PIN, GDO2_PIN, debug, delay
import time
from .common import strobe, read_register
SIDLE = get_addr('SIDLE')
SFRX = get_addr('SFRX')
SRX = get_addr('SRX')
RXFIFO = get_addr('RXFIFO')
RSSI = get_addr('RSSI')
SIDLE = get_addr("SIDLE")
SFRX = get_addr("SFRX")
SRX = get_addr("SRX")
RXFIFO = get_addr("RXFIFO")
RSSI = get_addr("RSSI")
def print_packet(spi: SpiDev, packet_length):
for i in range(packet_length):
print(f", byte: {0}: 0x{1}", i, read_register(spi, RXFIFO))
# Save packet to a timestamped binary file
def save_packet(packet: list[int]):
# Create directory to store packets if it doesn't exist
@ -43,6 +44,7 @@ def get_rssi_from_pkt(packet: list[int]):
def get_rssi_from_reg(spi: SpiDev):
return read_register(spi, RSSI)
def get_signal_strength_rssi_raw(rssi_raw: int) -> float:
if rssi_raw >= 128:
rssi_dec = rssi_raw - 256
@ -50,6 +52,7 @@ def get_signal_strength_rssi_raw(rssi_raw: int) -> float:
rssi_dec = rssi_raw
return rssi_dec / 2.0 - 74 # According to CC2500 datasheet
def flush_rx(spi: SpiDev):
# Make sure that the radio is in IDLE state before flushing the FIFO
# (Unless RXOFF_MODE has been changed, the radio should be in IDLE state at this point)
@ -60,12 +63,13 @@ def flush_rx(spi: SpiDev):
strobe(spi, SFRX)
delay(10)
def rx_data_rf(spi: SpiDev):
strobe(spi, SRX)
gdo2_state = False
count = 0
strength = 0
while(gdo2_state == False):
while gdo2_state == False:
gdo2_state = digital_read(GDO2_PIN)
delay(1)
count = count + 1
@ -73,7 +77,7 @@ def rx_data_rf(spi: SpiDev):
flush_rx(spi)
# print("ERR NO DATA")
return
while(gdo2_state == True):
while gdo2_state == True:
gdo2_state = digital_read(GDO2_PIN)
delay(100)
packet_length: int = read_register(spi, RXFIFO)

View File

@ -1,5 +1,6 @@
from typing import TypedDict
class ConfigRegs(TypedDict):
IOCFG2: int
IOCFG1: int
@ -49,6 +50,7 @@ class ConfigRegs(TypedDict):
TEST1: int
TEST0: int
class Strobes(TypedDict):
SRES: int
SFSTXON: int
@ -65,6 +67,7 @@ class Strobes(TypedDict):
SWORRST: int
SNOP: int
class Status(TypedDict):
PARTNUM: int
VERSION: int
@ -80,20 +83,24 @@ class Status(TypedDict):
RXBYTES: int
NUM_RXBYTES: int
class Memory(TypedDict):
PATABLE: int
TXFIFO: int
RXFIFO: int
class Masks(TypedDict):
LQI_RX: int
CRC_OK: int
class Access(TypedDict):
WRITE_BURST: int
READ_SINGLE: int
READ_BURST: int
class RegsAddr(TypedDict):
CONFIG_REGS: ConfigRegs
STROBES: Strobes
@ -151,7 +158,7 @@ regs_addr: RegsAddr = {
"AGCTEST": 0x2B,
"TEST2": 0x2C,
"TEST1": 0x2D,
"TEST0": 0x2E
"TEST0": 0x2E,
},
"STROBES": {
"SRES": 0x30,
@ -167,7 +174,7 @@ regs_addr: RegsAddr = {
"SFRX": 0x3A,
"SFTX": 0x3B,
"SWORRST": 0x3C,
"SNOP": 0x3D
"SNOP": 0x3D,
},
"STATUS": {
"PARTNUM": 0x30,
@ -182,20 +189,9 @@ regs_addr: RegsAddr = {
"VCO_VC_DAC": 0x39,
"TXBYTES": 0x3A,
"RXBYTES": 0x3B,
"NUM_RXBYTES": 0x7F
"NUM_RXBYTES": 0x7F,
},
"MEMORY": {
"PATABLE": 0x3E,
"TXFIFO": 0x3F,
"RXFIFO": 0x3F
},
"MASKS": {
"LQI_RX": 0x01,
"CRC_OK": 0x80
},
"ACCESS": {
"WRITE_BURST": 0x40,
"READ_SINGLE": 0x80,
"READ_BURST": 0xC0
}
"MEMORY": {"PATABLE": 0x3E, "TXFIFO": 0x3F, "RXFIFO": 0x3F},
"MASKS": {"LQI_RX": 0x01, "CRC_OK": 0x80},
"ACCESS": {"WRITE_BURST": 0x40, "READ_SINGLE": 0x80, "READ_BURST": 0xC0},
}

View File

@ -2,12 +2,13 @@ from .common import strobe, write_burst
from .util import get_addr
SIDLE = get_addr('SIDLE')
SFTX = get_addr('SFTX')
STX = get_addr('STX')
SIDLE = get_addr("SIDLE")
SFTX = get_addr("SFTX")
STX = get_addr("STX")
WRITE_BURST = get_addr("WRITE_BURST")
TXFIFO_BURST = 0x7F
def transmit_packet(spi):
strobe(spi, SIDLE)
strobe(spi, SFTX)
@ -18,4 +19,3 @@ def transmit_packet(spi):
data[3] = 3
data[4] = 4
write_burst(spi, TXFIFO_BURST, data)

View File

@ -2,10 +2,12 @@ from spidev import SpiDev
from .regs_addr import regs_addr
import RPi.GPIO as GPIO
import time
debug = True
GDO0_PIN = 17
GDO2_PIN = 27
def rr(spi: SpiDev, addr):
READ_SINGLE = get_addr("READ_SINGLE")
# Send address | 0x80 (read), then 0x00 dummy to clock in response
@ -13,15 +15,18 @@ def rr(spi: SpiDev, addr):
sleep(0.1)
return response[1]
def print_gdo_state(GDO0_PIN=17, GDO2_PIN=27):
gdo0 = GPIO.input(GDO0_PIN) # Reads 1 or 0
gdo2 = GPIO.input(GDO2_PIN)
print(f"GDO0 (GPIO{GDO0_PIN}): {gdo0}, GDO2 (GPIO{GDO2_PIN}): {gdo2}")
sleep(0.1)
def digital_read(GDO_PIN: int):
return GPIO.input(GDO_PIN)
def get_addr(name: str):
addr = 0x00
stop = False
@ -34,6 +39,7 @@ def get_addr(name: str):
raise Exception("Failed to find address for " + name)
return addr
def dump_regs(spi: SpiDev, cfgonly=False):
if cfgonly:
for reg_name, reg_addr in regs_addr["CONFIG_REGS"].items():
@ -47,16 +53,20 @@ def dump_regs(spi: SpiDev, cfgonly = False):
value = rr(spi, reg_addr)
print((name + ":").ljust(15) + hex(value).ljust(4) + "\t" + str(value))
# def disable_crc(spi):
# spi.xfer2([CONFIG_REGS["PKTCTRL0"], 0x01])
def sleep(t):
return time.sleep(t)
def delay(t):
"""Delay for t milliseconds."""
time.sleep(t / 1000.0)
def delayMicroseconds(t):
"""Delay for t microseconds."""
time.sleep(t / 1_000_000.0)