Source code for pyserialsensors.devices.nau7802

# SPDX-FileCopyrightText: 2022 German Aerospace Center (DLR)
# SPDX-License-Identifier: MIT

"""
Sensor Class for NAU7802

"""

import time
from ..core.error import Error
from ..core.sensor import I2CSensor

# import logging
# logging.basicConfig(level=logging.DEBUG)


[docs]class NAU7802(I2CSensor): """ Qwiic Scale driver """ __name__ = "NAU7802" _SENSOR_ADDRESS = 0x2A _units = { "adc": "[-]", } _i2c_freq = 100e3 # Register __REG = { "PU_CTRL": 0x00, "CTRL1": 0x01, "CTRL2": 0x02, "OCAL1_B2": 0x03, "OCAL1_B1": 0x04, "OCAL1_B0": 0x05, "GCAL1_B3": 0x06, "GCAL1_B2": 0x07, "GCAL1_B1": 0x08, "GCAL1_B0": 0x09, "OCAL2_B2": 0x0A, "OCAL2_B1": 0x0B, "OCAL2_B0": 0x0C, "GCAL2_B3": 0x0D, "GCAL2_B2": 0x0E, "GCAL2_B1": 0x0F, "GCAL2_B0": 0x10, "I2C_CONTROL": 0x11, "ADCO_B2": 0x12, "ADCO_B1": 0x13, "ADCO_B0": 0x14, "ADC": 0x15, # Shared ADC and OTP 32: 24 "OTP_B1": 0x16, # OTP 23:16 or 7:0? "OTP_B0": 0x17, # OTP 15: 8 "PGA": 0x1B, "PGA_PWR": 0x1C, "DEVICE_REV": 0x1F, } __BIT = { # Bits within the PU_CTRL register "PU_CTRL": { "RR": 0, "PUD": 1, "PUA": 2, "PUR": 3, "CS": 4, "CR": 5, "OSCS": 6, "AVDDS": 7, }, # Bits within the CTRL2 register "CTRL2": { "CHS": 7, "CALMOD": 0, "CALS": 2, "CAL_ERROR": 3, "CRS": 4, }, } # Bits within the PGA register __NAU7802_PGA_CHP_DIS = 0 __NAU7802_PGA_INV = 3 __NAU7802_PGA_BYPASS_EN = 4 __NAU7802_PGA_OUT_EN = 5 __NAU7802_PGA_LDOMODE = 6 __NAU7802_PGA_RD_OTP_SEL = 7 # Bits within the PGA PWR register __NAU7802_PGA_PWR_PGA_CURR = 0 __NAU7802_PGA_PWR_ADC_CURR = 2 __NAU7802_PGA_PWR_MSTR_BIAS_CURR = 4 __NAU7802_PGA_PWR_PGA_CAP_EN = 7 # Allowed Low drop out regulator __NAU7802_LDO_Voltage = { "2.4": 0b111, "2.7": 0b110, "3.0": 0b101, "3.3": 0b100, "3.6": 0b011, "3.9": 0b010, "4.2": 0b001, "4.5": 0b000, } # Allowed gains __NAU7802_GAIN = { "128": 0b111, "64": 0b110, "32": 0b101, "16": 0b100, "8": 0b011, "4": 0b010, "2": 0b001, "1": 0b000, } # Allowed samples per second __NAU7802_SPS = {"320": 0b111, "80": 0b011, "40": 0b010, "20": 0b001, "10": 0b000} # Select between channel values __NAU7802_CHANNEL_1 = 0 __NAU7802_CHANNEL_2 = 1 # Calibration state __CAL_STATUS = {"SUCCESS": 0, "IN PROGRESS": 1, "FAILURE": 2} def __init__(self, *args, **kwargs): """ See pyserialsensors.core.sensors initialization. """ super().__init__(*args, **kwargs) self.exists = self.check_exists() self.offset_factor = None self.gain_factor = None
[docs] def set_bit(self, address: int, value: int): """ Set register a specific bit in a register to 1 Args: address (int): Register address value (int): bit index Returns: int: New register value """ current_value = self.getU8(address) new_value = 0 if current_value is not None: set_value = current_value | (1 << value) new_value = self.getU8([address, set_value]) if new_value is not None: new_value = ((new_value << value) | 1) & 1 self._logger.debug("Set bit %s at register %s", value, address) else: self._logger.warning("Failed to set bit %s at register %s", value, address) return new_value
[docs] def clear_bit(self, address: int, value: int): """ Set register a specific bit in a register to 0 Args: address (int): Register address value (int): bit index Returns: int: New register value """ current_value = self.getU8(address) if current_value is not None: set_value = int(current_value) & ~(1 << value) res = self.txrx([address, set_value], readlen=1) if res is not None: return self.getU8(address) return None
[docs] def get_bit(self, address: int, bit_number: int): """ Read single bit from register defined by address Args: address (int): Register address value (int): bit index Returns: int: bit value """ data = self.getU8(address) res = None if data: res = (data >> bit_number) & 1 return res
[docs] def get_serial_number(self): """ A serial number is not supported by bme280 hence a pseudo identifier is calculated by adding the individual calibration values together Returns: str: Pseudo-serial number """ serial_number = str(self.mux_port) serial_number += str(self.mux.address) serial_number += str(self.bus.ftdi._usb_dev.serial_number) self._logger.info("Assigned pseudo-serial number %s", serial_number) return serial_number
[docs] def check_exists(self): """ Assert if the sensor exists and is responsive Returns: bool: True if sensor answered, false otherwise. """ init = self.initialise() if init: self.serial_number = self.get_serial_number() return True return False
[docs] def get_data_rdy(self): """ Get data ready state Returns: bool: True if data is available, false otherwise. """ return bool( self.get_bit(self.__REG["PU_CTRL"], self.__BIT["PU_CTRL"]["CR"]) == 1 )
[docs] def reset(self): """ Reset sensor Returns: bool: True if sensor has been reset correctly, None otherwise. """ self._logger.info("Reset...") rst = self.set_bit(self.__REG["PU_CTRL"], self.__BIT["PU_CTRL"]["RR"]) if rst is not None: stop_rst = self.clear_bit( self.__REG["PU_CTRL"], self.__BIT["PU_CTRL"]["RR"] ) if stop_rst is not None: return True else: return None else: return None
[docs] def power_up(self): """ Power up analog and digital circuit Returns: bool: True if power up was successful, false otherwise. """ pua_status = True self.set_bit(self.__REG["PU_CTRL"], self.__BIT["PU_CTRL"]["PUD"]) pud_status = self.get_bit(self.__REG["PU_CTRL"], self.__BIT["PU_CTRL"]["PUD"]) if pud_status is not None: self._logger.info("Power up digital successfull.") else: self._logger.warning("Digital power up failed. State:{pud_status}") pua_status = False self.set_bit(self.__REG["PU_CTRL"], self.__BIT["PU_CTRL"]["PUA"]) pua_status = self.get_bit(self.__REG["PU_CTRL"], self.__BIT["PU_CTRL"]["PUA"]) if pua_status: self._logger.info("Power up analog successfull.") else: self._logger.warning("Analog power up failed. State:{pud_status}") pua_status = False return pua_status
def set_ldo_voltage(self, voltage): voltage = str(voltage) if voltage in self.__NAU7802_LDO_Voltage: entry = self.getU8(self.__REG["CTRL1"]) if entry is not None: value = entry & 0b11000111 value += self.__NAU7802_LDO_Voltage[voltage] << 3 self.txrx([self.__REG["CTRL1"], value], readlen=0) current_voltage = self.get_ldo_voltage() self._logger.debug( "LDO Voltage: Wrote %s (%s) Read %s", voltage, self.__NAU7802_LDO_Voltage[voltage], current_voltage, ) if current_voltage == self.__NAU7802_LDO_Voltage[voltage]: self._logger.debug("Successfully set LDO Voltage: %s", value) return True else: self._logger.warning( "Failed to set LDO Voltage: Set: %s \ Current: %s", value, current_voltage, ) return False else: return False else: return False def get_ldo_voltage(self): entry = self.getU8(self.__REG["CTRL1"]) if entry is not None: value = (entry >> 3) & 0b111 self._logger.debug("Current LDO Voltage %s", value) return value def set_gain(self, gain): gain = str(gain) if gain in self.__NAU7802_GAIN: entry = self.getU8(self.__REG["CTRL1"]) if entry is not None: value = (entry & 0b11111000) + self.__NAU7802_GAIN[gain] self.txrx([self.__REG["CTRL1"], value], readlen=0) current_gain = self.get_gain() if current_gain == gain: return True else: self._logger.warning( "Failed to set gain: Set: %s \ Current: %s", value, current_gain, ) return False else: return False else: return False def get_gain(self): entry = self.getU8(self.__REG["CTRL1"]) gain = None if entry is not None: value = entry & 0b111 self._logger.debug("Gain: %s", value) for k in self.__NAU7802_GAIN: if self.__NAU7802_GAIN[k] == value: gain = k else: self._logger.warning("Failed to get gain") return gain def set_sample_rate(self, rate): rate = str(rate) if rate in self.__NAU7802_SPS: entry = self.getU8(self.__REG["CTRL2"]) if entry is not None: self._logger.debug("CTRL2 before SPS manipulation: %d)", entry) value = (entry & 0b10001111) + (self.__NAU7802_SPS[rate] << 4) self._logger.debug("Set SPS to: %s (CTRL2: %d)", rate, value) self.txrx([self.__REG["CTRL2"], value], readlen=0) if self.get_sample_rate() == rate: return True else: self._logger.debug("Incorrect rate") return False else: self._logger.debug("No response when calling CTRL2 %s", entry) return False else: self._logger.debug("Invalid rate: %s", rate) return False def get_sample_rate(self): entry = self.getU8(self.__REG["CTRL1"]) if entry is not None: value = (entry | 0b0111000) >> 3 self._logger.debug("SPS value: %s", value) for k in self.__NAU7802_SPS: if self.__NAU7802_SPS[k] == value: return k return None
[docs] def calibrate(self): """ Initiate calibration of analog front end and wait for calibration to finish Returns: bool: True if successfull, false otherwise. """ calibration_state = False if self.begin_calibrate_afe(): self._logger.info("Waiting for calibration to finish.") state = self.wait_for_calibration(10) if state: calibration_state = True self._logger.info("Finished calibration successfully.") else: self._logger.info("Calibration failed.") else: self._logger.error("Error initializing calibration.") calibration_state = False return calibration_state
[docs] def begin_calibrate_afe(self): """ Start calibration of analog interface. Returns: int: Calibration state """ self._logger.info("Begin calibration of analog interface.") return self.set_bit(self.__REG["CTRL2"], self.__BIT["CTRL2"]["CALS"]) == 1
[docs] def check_calibration_status(self): """ Read calibration status Returns: int: - 0 internal calibration - 2 Offset calibration - 3 Gain calibration """ return self.getU8(self.__REG["CTRL2"]) & 0b11
def wait_for_calibration(self, timeout_ms): calibration_begin = time.time() status = self.check_calibration_status() while status == self.__CAL_STATUS["IN PROGRESS"]: delta_time = (calibration_begin - time.time()) * 1000 self._logger.debug("Calib status: \t %s \t %s", status, delta_time) if delta_time > timeout_ms: self._logger.error("Calibration timed out.") break status = self.check_calibration_status() if status == self.__CAL_STATUS["SUCCESS"]: return True else: return False def get_offset_calibration(self): data = [self.txrx(self.__REG["OCAL1_B2"], readlen=1)[0]] data += self.txrx(self.__REG["OCAL1_B1"], readlen=1) data += self.txrx(self.__REG["OCAL1_B0"], readlen=1) res = None if None not in data: res = (((data[0] << 8) + data[1]) << 8) + data[2] return res def get_gain_calibration(self): data = [self.txrx(self.__REG["GCAL1_B3"], readlen=1)[0]] data += self.txrx(self.__REG["GCAL1_B2"], readlen=1) data += self.txrx(self.__REG["GCAL1_B1"], readlen=1) data += self.txrx(self.__REG["GCAL1_B0"], readlen=1) res = None if None not in data: res = ((((data[0] << 8) + data[1]) << 8) + data[2]) << 8 res += data[3] return res def enable_AVDDS(self): res = self.set_bit(self.__REG["PU_CTRL"], self.__BIT["PU_CTRL"]["AVDDS"]) state = True if res == 0: self._logger.error("Failed to enable ACDDS") state = False return state def wait_for_conversion_cycle(self, timeout_ms): cycle_begin = time.time() status = self.get_data_rdy() while not status: delta_time = (time.time() - cycle_begin) * 1000 self._logger.debug("Wating for conversion %s", delta_time) if delta_time > timeout_ms: time.sleep(0.01) self._logger.error("Conversion timed out.") return False status = self.get_data_rdy() return True def start_conversion(self): return self.set_bit(self.__REG["CTRL2"], self.__BIT["PU_CTRL"]["CS"])
[docs] def prepare_measurement(self): self.reset() self.initialise()
[docs] def initialise(self, voltage=4.5, sample_rate=320, gain=128): """ Initialization procedure when NAU7802 is started # Power up Digital and Analog power supplies # First calibration run # Disable clock # Enable 330pF decoupling capacity # Set sample rate """ init_ok = True # Reset all registers self.reset() # Power on analog and digital sections of the scale init_ok &= self.power_up() self._logger.info("Power up %s", init_ok) # Calibrate analog front end self.calibrate() self._logger.debug("Re-Calibration %s", init_ok) # Turn off CLK_CHP. see manual 9.1 power on sequencing init_ok &= self.txrx([self.__REG["ADC"], 0x30]) is not None self._logger.debug("Set clock frequency %s", init_ok) # Enable 330pF decoupling cap on chan 2. # see manual 9.14 application circuit note init_ok &= ( self.set_bit(self.__REG["PGA_PWR"], self.__NAU7802_PGA_PWR_PGA_CAP_EN) == 1 ) self._logger.debug("Enable 330pF decoupling %s", init_ok) # Set samples per second init_ok &= self.set_sample_rate(sample_rate) self._logger.debug("Set sample rate %s", init_ok) self._logger.debug("PU_CTRL %s", self.getU8(self.__REG["PU_CTRL"])) self._logger.debug("CTRL1 %s", self.getU8(self.__REG["CTRL1"])) self._logger.debug("CTRL2 %s", self.getU8(self.__REG["CTRL2"])) # Set gain init_ok &= self.set_gain(gain) self._logger.debug("Set gain %s", init_ok) # Set LDO Voltage init_ok &= self.set_ldo_voltage(voltage) self._logger.debug("Set internal LDO voltage %s", init_ok) # Re-calibrate analog front end in case of chaning gain, # sample rate or channel init_ok &= self.calibrate() self._logger.debug("Re-Calibration %s", init_ok) # Set internal reference voltage init_ok &= self.enable_AVDDS() self._logger.debug("Enable AVDDS %s", init_ok) # State after initialization self._logger.debug("PU_CTRL %s", self.getU8(self.__REG["PU_CTRL"])) self._logger.debug("CTRL1 %s", self.getU8(self.__REG["CTRL1"])) self._logger.debug("CTRL2 %s", self.getU8(self.__REG["CTRL2"])) # Start conversion cycle self.start_conversion() self.offset_factor = self.get_offset_calibration() self.gain_factor = self.get_gain_calibration() return init_ok
[docs] def get_adc(self): """ Fetch data after conversion cycle has finished :returns: Anlog-Digital-Converter read """ self.wait_for_conversion_cycle(1000) data = self.txrx(self.__REG["ADCO_B2"], readlen=3) adc = None if data is not None and len(data) == 3: adc = ((data[0] << 16) + (data[1] << 8) + data[2]) << 8 adc = self.int32(adc) return adc
[docs] def get_data(self): """ Get current measurement values :return: data dictionary """ # Wait until data is ready self.data = self.default_data() adc = None try: if self.error is None: adc = self.get_adc() except TypeError: # Throw an error if not able to fetch data self.error = Error().read(self) detect = self.mux.poll(self._SENSOR_ADDRESS) if detect: self._logger.warning("Sensor detected") self.reset() else: self._logger.error("Sensor not detected. %s", detect) self.data["object"] = "ERROR" return self.data if adc is None: # Throw an error if not able to fetch data self._logger.error("Error during reading.") self.error = Error().read(self) self.mux.open_single_port(self.mux_port) detect = self.mux.poll(self._SENSOR_ADDRESS) if detect: self._logger.warning("Sensor detected") self.reset() init = self.initialise() if init: self.error = None else: self._logger.error("Sensor not detected. %s", detect) self.data["object"] = "ERROR" return self.data if self.error is None: self.data["error"] = False self.data["values"] = {} self.data["values"]["adc"] = {"value": adc, "unit": self._units["adc"]} return self.data