118 lines
3.3 KiB
Python
118 lines
3.3 KiB
Python
from .util import sleep, get_addr, digital_read, GDO0_PIN, GDO2_PIN, debug, delay
|
|
import time
|
|
from .common import strobe, read_register
|
|
|
|
SIDLE = get_addr('SIDLE')
|
|
SFRX = get_addr('SFRX')
|
|
SRX = get_addr('SRX')
|
|
RXFIFO = get_addr('RXFIFO')
|
|
RSSI = get_addr('RSSI')
|
|
|
|
def print_packet(spi, packet_length):
|
|
for i in range(packet_length):
|
|
print(f", byte: {0}: 0x{1}", i, read_register(spi, RXFIFO))
|
|
|
|
#TODO
|
|
def save_packet():
|
|
print()
|
|
|
|
|
|
def get_rssi_from_pkt(packet):
|
|
if len(packet) < 2:
|
|
raise ValueError("Packet too short to contain RSSI and status bytes.")
|
|
rssi_raw = packet[-2] # Second-to-last byte is RSSI
|
|
return rssi_raw
|
|
|
|
|
|
def get_rssi_from_reg(spi):
|
|
return read_register(spi, RSSI)
|
|
|
|
def get_signal_strength_rssi_raw(rssi_raw):
|
|
if rssi_raw >= 128:
|
|
rssi_dec = rssi_raw - 256
|
|
else:
|
|
rssi_dec = rssi_raw
|
|
return rssi_dec / 2.0 - 74 # According to CC2500 datasheet
|
|
|
|
def flush_rx(spi):
|
|
# Make sure that the radio is in IDLE state before flushing the FIFO
|
|
# (Unless RXOFF_MODE has been changed, the radio should be in IDLE state at this point)
|
|
delay(10)
|
|
strobe(spi, SIDLE)
|
|
delay(10)
|
|
# Flush RX FIFO
|
|
strobe(spi, SFRX)
|
|
delay(10)
|
|
|
|
def rx_data_rf(spi):
|
|
strobe(spi, SRX)
|
|
gdo2_state = False
|
|
count = 0
|
|
strength = 0
|
|
while(gdo2_state == False):
|
|
gdo2_state = digital_read(GDO2_PIN)
|
|
delay(1)
|
|
count = count+1
|
|
if count > 1000:
|
|
flush_rx(spi)
|
|
# print("ERR NO DATA")
|
|
return
|
|
while(gdo2_state == True):
|
|
gdo2_state = digital_read(GDO2_PIN)
|
|
delay(100)
|
|
packet_length: int = read_register(spi, RXFIFO)
|
|
# print("Packet Length {0}".format(packet_length))
|
|
packet: list = [read_register(spi, RXFIFO) for _ in range(packet_length)]
|
|
rssi_raw = get_rssi_from_pkt(packet)
|
|
strength = get_signal_strength_rssi_raw(rssi_raw)
|
|
print("Length: {0} bytes\t Signal: {1} dBm".format(packet_length, strength) )
|
|
|
|
# Make sure that the radio is in IDLE state before flushing the FIFO
|
|
# (Unless RXOFF_MODE has been changed, the radio should be in IDLE state at this point)
|
|
strobe(spi, SIDLE)
|
|
# Flush RX FIFO
|
|
strobe(spi, SFRX)
|
|
|
|
# def burst_read(spi, addr, length):
|
|
# """Read multiple bytes"""
|
|
# READ_SINGLE = get_addr("READ_SINGLE")
|
|
# READ_BURST = get_addr("READ_BURST")
|
|
# return spi.xfer2([addr | READ_SINGLE | READ_BURST] + [0x00] * length)
|
|
|
|
|
|
|
|
# def read_fifo(spi):
|
|
# # Burst read RX FIFO
|
|
# READ_BURST = get_addr("READ_BURST")
|
|
# RXFIFO = get_addr("RXFIFO")
|
|
# fifo = spi.xfer2([RXFIFO | READ_BURST] + [0x00]*64) # Max 64 bytes
|
|
# return fifo[1:]
|
|
|
|
|
|
# def receive_packet(spi):
|
|
# SFRX = get_addr("SFRX")
|
|
# RXBYTES = get_addr("RXBYTES")
|
|
# SRX = get_addr("SRX")
|
|
|
|
# # Flush RX FIFO
|
|
# strobe(spi, SFRX)
|
|
# time.sleep(0.5) # 1 ms delay to allow flush
|
|
# # Go into RX mode
|
|
# strobe(spi, SRX)
|
|
|
|
# # Wait for data (use GDO0 in real app)
|
|
# sleep(0.5)
|
|
|
|
# # Check RXBYTES
|
|
|
|
# timeout = time.time() + 2 # 2-second timeout
|
|
# while time.time() < timeout:
|
|
# rx_bytes = read_register(spi, RXBYTES) & 0x7F
|
|
# if rx_bytes > 0:
|
|
# data = read_fifo(spi)
|
|
# print(f"Received: {data}")
|
|
# return data
|
|
# time.sleep(0.01)
|
|
|
|
# print("Timeout: no data received.")
|