dev #1

Merged
david merged 25 commits from dev into main 2025-04-28 14:20:45 -07:00
4 changed files with 109 additions and 47 deletions
Showing only changes of commit 242a41fb91 - Show all commits

View File

@ -24,17 +24,17 @@ def read_register(spi, addr):
READ_SINGLE = get_addr("READ_SINGLE") READ_SINGLE = get_addr("READ_SINGLE")
# Send address | 0x80 (read), then 0x00 dummy to clock in response # Send address | 0x80 (read), then 0x00 dummy to clock in response
response = spi.xfer2([READ_SINGLE | addr, 0x00]) response = spi.xfer2([READ_SINGLE | addr, 0x00])
sleep(0.1) # sleep(0.1)
return response[1] return response[1]
def spi_write_burst(spi, addr, data): def write_burst(spi, addr, data):
"""Write multiple bytes using burst write""" """Write multiple bytes using burst write"""
WRITE_BURST = get_addr("WRITE_BURST") WRITE_BURST = get_addr("WRITE_BURST")
tbuf = [addr | WRITE_BURST] + data tbuf = [addr | WRITE_BURST] + data
spi.xfer2(tbuf) spi.xfer2(tbuf)
sleep(0.1) sleep(0.1)
def spi_read_burst(spi, addr, length): def read_burst(spi, addr, length):
"""Read multiple bytes using burst read""" """Read multiple bytes using burst read"""
READ_BURST = get_addr("READ_BURST") READ_BURST = get_addr("READ_BURST")
rbuf = [addr | READ_BURST] + [0x00] * length rbuf = [addr | READ_BURST] + [0x00] * length

13
main.py
View File

@ -1,18 +1,16 @@
import spidev import spidev
from util import print_gdo_state, sleep, get_addr, dump_regs from util import print_gdo_state, sleep, get_addr, dump_regs, debug, GDO0_PIN, GDO2_PIN
from receive import receive_packet from receive import rx_data_rf
from common import reset, setup_spi, setup_gpio, read_register, test_read_write_reg, init_cc_2500, write_reg, SRES, SNOP, MARCSTATE, VERSION from common import reset, setup_spi, setup_gpio, read_register, test_read_write_reg, init_cc_2500, write_reg, SRES, SNOP, MARCSTATE, VERSION
debug = True
GDO0_PIN=17
GDO2_PIN=27
def menu(): def menu():
print("\nMenu") print("\nMenu")
print("1: Read Reg by name") print("1: Read Reg by name")
print("2: Write reg hex value by name") print("2: Write reg hex value by name")
print("3: Dump registers") print("3: Dump registers")
print("4: Poll for packets") print("4: rx_data_rf")
print("5: Run Read+Write test") print("5: Run Read+Write test")
print("6: Print GDO state") print("6: Print GDO state")
print("0: Quit") print("0: Quit")
@ -51,7 +49,8 @@ if __name__ == "__main__":
elif cmd == 3: elif cmd == 3:
dump_regs(spi, True) dump_regs(spi, True)
elif cmd == 4: elif cmd == 4:
receive_packet(spi) while(True):
rx_data_rf(spi)
elif cmd == 5: elif cmd == 5:
res = test_read_write_reg(spi, True) res = test_read_write_reg(spi, True)
print("Test result : "+str(res)) print("Test result : "+str(res))

View File

@ -1,45 +1,87 @@
from util import * from util import sleep, get_addr, digital_read, GDO0_PIN, GDO2_PIN, debug, delay
import time import time
from common import strobe, read_register
def burst_read(spi, addr, length): SIDLE = get_addr('SIDLE')
"""Read multiple bytes""" SFRX = get_addr('SFRX')
READ_SINGLE = get_addr("READ_SINGLE") SRX = get_addr('SRX')
READ_BURST = get_addr("READ_BURST") RXFIFO = get_addr('RXFIFO')
return spi.xfer2([addr | READ_SINGLE | READ_BURST] + [0x00] * length)
def flush_rx(spi):
# Make sure that the radio is in IDLE state before flushing the FIFO
def read_fifo(spi): # (Unless RXOFF_MODE has been changed, the radio should be in IDLE state at this point)
# Burst read RX FIFO delay(10)
READ_BURST = get_addr("READ_BURST") strobe(spi, SIDLE)
RXFIFO = get_addr("RXFIFO") delay(10)
fifo = spi.xfer2([RXFIFO | READ_BURST] + [0x00]*64) # Max 64 bytes
return fifo[1:]
def receive_packet(spi):
SFRX = get_addr("SFRX")
RXBYTES = get_addr("RXBYTES")
SRX = get_addr("SRX")
# Flush RX FIFO # Flush RX FIFO
strobe(spi, SFRX) strobe(spi, SFRX)
time.sleep(0.5) # 1 ms delay to allow flush delay(10)
# Go into RX mode
strobe(spi, SRX)
# Wait for data (use GDO0 in real app) def rx_data_rf(spi):
sleep(0.5) strobe(spi, SRX)
gdo2_state = False
count = 0
while(gdo2_state == False):
gdo2_state = digital_read(GDO2_PIN)
delay(1)
count = count+1
if count > 1000:
flush_rx(spi)
print("ERR NO DATA")
return
while(gdo2_state == True):
gdo2_state = digital_read(GDO2_PIN)
delay(100)
packet_length: int = read_register(spi, RXFIFO)
print("Packet Length {0}".format(packet_length))
for i in range(packet_length):
print(f", byte: {0}: 0x{1}", i, read_register(spi, RXFIFO))
# Check RXBYTES # Make sure that the radio is in IDLE state before flushing the FIFO
# (Unless RXOFF_MODE has been changed, the radio should be in IDLE state at this point)
strobe(spi, SIDLE)
# Flush RX FIFO
strobe(spi, SFRX)
# def burst_read(spi, addr, length):
# """Read multiple bytes"""
# READ_SINGLE = get_addr("READ_SINGLE")
# READ_BURST = get_addr("READ_BURST")
# return spi.xfer2([addr | READ_SINGLE | READ_BURST] + [0x00] * length)
# def read_fifo(spi):
# # Burst read RX FIFO
# READ_BURST = get_addr("READ_BURST")
# RXFIFO = get_addr("RXFIFO")
# fifo = spi.xfer2([RXFIFO | READ_BURST] + [0x00]*64) # Max 64 bytes
# return fifo[1:]
# def receive_packet(spi):
# SFRX = get_addr("SFRX")
# RXBYTES = get_addr("RXBYTES")
# SRX = get_addr("SRX")
timeout = time.time() + 2 # 2-second timeout # # Flush RX FIFO
while time.time() < timeout: # strobe(spi, SFRX)
rx_bytes = read_register(spi, RXBYTES) & 0x7F # time.sleep(0.5) # 1 ms delay to allow flush
if rx_bytes > 0: # # Go into RX mode
data = read_fifo(spi) # strobe(spi, SRX)
print(f"Received: {data}")
return data
time.sleep(0.01)
print("Timeout: no data received.") # # Wait for data (use GDO0 in real app)
# sleep(0.5)
# # Check RXBYTES
# timeout = time.time() + 2 # 2-second timeout
# while time.time() < timeout:
# rx_bytes = read_register(spi, RXBYTES) & 0x7F
# if rx_bytes > 0:
# data = read_fifo(spi)
# print(f"Received: {data}")
# return data
# time.sleep(0.01)
# print("Timeout: no data received.")

23
util.py
View File

@ -1,6 +1,9 @@
from regs_addr import regs_addr from regs_addr import regs_addr
import RPi.GPIO as GPIO import RPi.GPIO as GPIO
import time import time
debug = True
GDO0_PIN=17
GDO2_PIN=27
def rr(spi, addr): def rr(spi, addr):
READ_SINGLE = get_addr("READ_SINGLE") READ_SINGLE = get_addr("READ_SINGLE")
@ -15,6 +18,16 @@ def print_gdo_state(GDO0_PIN=17, GDO2_PIN=27):
print(f"GDO0 (GPIO{GDO0_PIN}): {gdo0}, GDO2 (GPIO{GDO2_PIN}): {gdo2}") print(f"GDO0 (GPIO{GDO0_PIN}): {gdo0}, GDO2 (GPIO{GDO2_PIN}): {gdo2}")
sleep(0.1) sleep(0.1)
def digital_read(GDO_PIN: int):
bit = GPIO.input(GDO_PIN)
# print("bit " + str(bit))
if bit == 1:
return True
elif bit == 0:
return False
else:
raise Exception("Unexpected digital read GDO_PIN{GDO_PIN} {bit}")
def get_addr(name): def get_addr(name):
addr = "" addr = ""
stop = False stop = False
@ -44,4 +57,12 @@ def dump_regs(spi, cfgonly = False):
# spi.xfer2([CONFIG_REGS["PKTCTRL0"], 0x01]) # spi.xfer2([CONFIG_REGS["PKTCTRL0"], 0x01])
def sleep(t): def sleep(t):
return time.sleep(t) return time.sleep(t)
def delay(t):
"""Delay for t milliseconds."""
time.sleep(t / 1000.0)
def delayMicroseconds(t):
"""Delay for t microseconds."""
time.sleep(t / 1_000_000.0)