"""Receive-side helpers for the radio: RSSI conversion, RX FIFO flush, packet read."""

import time

from util import sleep, get_addr, digital_read, GDO0_PIN, GDO2_PIN, debug, delay
from common import strobe, read_register

# Command strobe / register addresses, resolved once at import time.
SIDLE = get_addr('SIDLE')
SFRX = get_addr('SFRX')
SRX = get_addr('SRX')
RXFIFO = get_addr('RXFIFO')
RSSI = get_addr('RSSI')


def print_packet(spi, packet_length):
    """Read ``packet_length`` bytes from the RX FIFO and print each one.

    Every byte is fetched with a separate ``read_register(spi, RXFIFO)`` call
    and printed with its index and its value in hexadecimal.

    Args:
        spi: SPI handle forwarded unchanged to ``read_register``.
        packet_length: Number of bytes to read and print.
    """
    for i in range(packet_length):
        value = read_register(spi, RXFIFO)
        # The previous format string used ``{0}``/``{1}`` inside an f-string,
        # which printed the literals 0 and 1 instead of the index and byte.
        print(f"byte {i}: 0x{value:02X}")


# TODO: not implemented yet; currently only prints an empty line.
def save_packet():
    """Placeholder for persisting a received packet."""
    print()


def get_rssi_from_pkt(packet):
    """Return the second-to-last byte of ``packet`` as the raw RSSI value.

    Args:
        packet: Sequence of received bytes.

    Returns:
        ``packet[-2]``, unconverted.

    Raises:
        ValueError: If ``packet`` holds fewer than two bytes.
    """
    if len(packet) < 2:
        raise ValueError("Packet too short to contain RSSI and status bytes.")
    return packet[-2]  # Second-to-last byte is RSSI


def get_rssi_from_reg(spi):
    """Return the current value of the RSSI register, unconverted."""
    return read_register(spi, RSSI)


def get_signal_strength_rssi_raw(rssi_raw, rssi_offset=74):
    """Convert a raw RSSI byte to a signal strength in dBm.

    The raw byte is interpreted as a signed 8-bit value (values >= 128 are
    negative), halved, and reduced by ``rssi_offset``.

    Args:
        rssi_raw: Raw RSSI byte (0-255).
        rssi_offset: Offset in dB subtracted after halving. Defaults to 74,
            the value this module has always used.
            NOTE(review): the CC2500 datasheet tabulates typical offsets per
            data rate -- confirm 74 matches the configured data rate.

    Returns:
        Signal strength in dBm as a float.
    """
    rssi_dec = rssi_raw - 256 if rssi_raw >= 128 else rssi_raw
    return rssi_dec / 2.0 - rssi_offset  # According to CC2500 datasheet


def flush_rx(spi):
    """Put the radio in IDLE and flush the RX FIFO, pausing around each strobe.

    Args:
        spi: SPI handle forwarded unchanged to ``strobe``.
    """
    # Make sure that the radio is in IDLE state before flushing the FIFO
    # (Unless RXOFF_MODE has been changed, the radio should be in IDLE state at this point)
    delay(10)
    strobe(spi, SIDLE)
    delay(10)
    # Flush RX FIFO
    strobe(spi, SFRX)
    delay(10)


def rx_data_rf(spi):
    """Enter RX, wait for one packet on GDO2, print its length and signal strength.

    Sequence:
      1. Strobe SRX.
      2. Poll GDO2 until it reads high; after more than 1000 polls the RX FIFO
         is flushed via ``flush_rx`` and the function returns without data.
      3. Poll GDO2 until it reads low again.
         (presumably GDO2 is asserted for the duration of a packet -- confirm
         against the radio's GDO2 output configuration)
      4. Read the length byte and then that many bytes from the RX FIFO,
         derive the signal strength and print a summary line.
      5. Strobe SIDLE and SFRX. This also happens when step 4 raises, so the
         radio is not left with a stale FIFO.

    Args:
        spi: SPI handle forwarded unchanged to ``strobe``/``read_register``.

    Returns:
        None, both on timeout and after a packet has been handled.

    Raises:
        ValueError: Propagated from ``get_rssi_from_pkt`` when fewer than two
            bytes were read.
    """
    strobe(spi, SRX)

    gdo2_state = False
    count = 0

    # Wait for GDO2 to go high, giving up after more than 1000 polls.
    while not gdo2_state:
        gdo2_state = digital_read(GDO2_PIN)
        delay(1)
        count += 1
        if count > 1000:
            flush_rx(spi)
            # print("ERR NO DATA")
            return

    # Wait for GDO2 to drop again.
    while gdo2_state:
        gdo2_state = digital_read(GDO2_PIN)
        delay(100)

    try:
        # The first FIFO byte is taken as the number of bytes that follow.
        packet_length: int = read_register(spi, RXFIFO)
        # print("Packet Length {0}".format(packet_length))
        packet: list = [read_register(spi, RXFIFO) for _ in range(packet_length)]

        # NOTE(review): only ``packet_length`` bytes are read. If the radio
        # appends its RSSI/status bytes *after* those bytes, ``packet[-2]`` is
        # payload rather than RSSI -- confirm against the packet configuration.
        rssi_raw = get_rssi_from_pkt(packet)
        strength = get_signal_strength_rssi_raw(rssi_raw)
        print(f"Length: {packet_length} bytes\t Signal: {strength} dBm")
    finally:
        # Make sure that the radio is in IDLE state before flushing the FIFO
        # (Unless RXOFF_MODE has been changed, the radio should be in IDLE state at this point)
        strobe(spi, SIDLE)
        # Flush RX FIFO
        strobe(spi, SFRX)


# --- Disabled alternative implementation (burst reads), kept for reference ---

# def burst_read(spi, addr, length):
#     """Read multiple bytes"""
#     READ_SINGLE = get_addr("READ_SINGLE")
#     READ_BURST = get_addr("READ_BURST")
#     return spi.xfer2([addr | READ_SINGLE | READ_BURST] + [0x00] * length)

# def read_fifo(spi):
#     # Burst read RX FIFO
#     READ_BURST = get_addr("READ_BURST")
#     RXFIFO = get_addr("RXFIFO")
#     fifo = spi.xfer2([RXFIFO | READ_BURST] + [0x00]*64)  # Max 64 bytes
#     return fifo[1:]

# def receive_packet(spi):
#     SFRX = get_addr("SFRX")
#     RXBYTES = get_addr("RXBYTES")
#     SRX = get_addr("SRX")
#     # Flush RX FIFO
#     strobe(spi, SFRX)
#     time.sleep(0.5)  # 1 ms delay to allow flush
#     # Go into RX mode
#     strobe(spi, SRX)
#     # Wait for data (use GDO0 in real app)
#     sleep(0.5)
#     # Check RXBYTES
#     timeout = time.time() + 2  # 2-second timeout
#     while time.time() < timeout:
#         rx_bytes = read_register(spi, RXBYTES) & 0x7F
#         if rx_bytes > 0:
#             data = read_fifo(spi)
#             print(f"Received: {data}")
#             return data
#         time.sleep(0.01)
#     print("Timeout: no data received.")