# SPDX-FileCopyrightText: 2022 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: MIT
"""
Sensor Class for SDP8
"""
# import logging
# logging.basicConfig(level=logging.DEBUG)
from ..core.error import Error
from ..core.sensor import I2CSensor
# https://www.sensirion.com/fileadmin/user_upload/customers/sensirion/Dokumente/0_Datasheets/Differential_Pressure/Sensirion_Differential_Pressure_Sensors_SDP8xx_Digital_Datasheet.pdf
[docs]class SDP8(I2CSensor):
"""
Differential Pressure sensor SDP8 125Pa and 8500
"""
__name__ = "SDP8"
_units = {"diff_pressure": "Pa", "temperature": "C"}
_i2c_freq = 10e3
_SENSOR_ADDRESS = 0x25
_crc_check_init = 0xFF
_temperature_scale_factor = 200 # 1/°C
_temperature_unit = "deg C"
_cmd_cont_meas_diff_pressure_avg = [0x36, 0x03]
_cmd_cont_meas_diff_pressure = [0x36, 0x08]
_cmd_cont_meas_diff_mass_avg = [0x36, 0x15]
_cmd_cont_meas_diff_mass = [0x36, 0x1E]
_cmd_stop_cont_meas = [0x3F, 0xF9]
_cmd_single_meas_mass = [0x36, 0x24]
_cmd_single_meas_pressure = [0x36, 0x2F]
_cmd_soft_rst = [0x00, 0x06]
_cmd_product_identifier = [0x36, 0x7C, 0xE1, 0x02]
__supported_sensors = {
"030201": {"id": "SDP800-500Pa", "max_pressure": 500}, # noqa: E501
"03020A": {"id": "SDP810-500Pa", "max_pressure": 500}, # noqa: E501
"030204": {"id": "SDP801-500Pa", "max_pressure": 500}, # noqa: E501
"03020D": {"id": "SDP811-500Pa", "max_pressure": 500}, # noqa: E501
"030202": {"id": "SDP800-125Pa", "max_pressure": 125}, # noqa: E501
"03020B": {"id": "SDP810-125Pa", "max_pressure": 125},
} # noqa: E501
def __init__(self, *args, **kwargs):
""""""
super().__init__(*args, **kwargs)
self.product_number = None
self.exists = self.sensor_exists()
[docs] def sensor_exists(self):
"""Test if sensor is plugged in and works proper
:return: [BOOLEAN] True if test was successful otherwise False
"""
self.reset()
sn = self.get_serial_number()
self._logger.debug("Got serial sn: %s", sn)
if isinstance(sn, int) and sn != 0:
return True
else:
self.error = Error().crc(self)
return False
[docs] def reset(self):
"""Reseting the sensor
:return: True for successful reset else False
"""
try:
self.txrx(self._cmd_soft_rst, readlen=0)
self.txrx(self._cmd_stop_cont_meas, readlen=0)
return True
except IOError:
return False
[docs] def crc_check(self, ba: bytearray):
"""
:param ba: byte array with n*3 entries. Two data bytes are followed by one crc check.
:return: list of booleans with len n indicating crc results
"""
crc = [False] * int(len(ba) / 3)
for i in range(0, len(ba), 3):
crc[int(i / 3)] = Error().checksum(
byte_values=[ba[i], ba[i + 1]], crc_value=ba[i + 2], crc_init=0xFF
)[1]
return crc
def check_if_supported(self, ba):
if ba not in self.__supported_sensors.keys():
return False
return True
def get_raw_serial(self):
self.txrx(self._cmd_product_identifier[:2], readlen=0)
self.txrx(self._cmd_product_identifier[2:], readlen=0)
ba = self.mux.receive(self._SENSOR_ADDRESS, self.mux_port, readlength=18)
return ba
[docs] def get_serial_number(self):
"""Reads unique serial number of sensor
:return: [STRING] serial number of the sensor
"""
self.serial_number = None
self.error = None
ba = self.get_raw_serial()
if ba is not None and len(ba) == 18:
crc = self.crc_check(ba)
# Product number
if crc[0] and crc[1]:
hex_str = ""
for i in [0, 1, 3, 4]:
hex_str += f"{ba[i]:02X}"
if not self.check_if_supported(hex_str[:-2]):
self.error = Error().unsupported_sensor(self)
return self.error
self.product_number = hex_str[:-2]
self.sensor_type = self.__name__
else:
self.error = Error().crc(self)
return self.error
# Serial number
if crc[2] and crc[3] and crc[4] and crc[5]:
binary_str = ""
for i in [6, 7, 9, 10, 12, 13, 15, 16]:
bybi = str(bin(ba[i])[2:])
binary_str += bybi.zfill(8)
self.serial_number = int(binary_str, 2)
return self.serial_number
else:
self.error = Error().crc(self)
return self.error
else:
self.error = Error().read(self)
return self.error
[docs] def prepare_measurement(self):
self.start_continuous_measurement()
ba = self.get_raw_data()
if ba is not None and len(ba) == 9:
self.crc_check(ba)
self.scale_factor = ba[6] << 8 | ba[7]
return True
else:
self.mux.close_all_ports()
self.reset()
self.error = Error().read(self)
return self.error
def start_continuous_measurement(self):
self.txrx(self._cmd_cont_meas_diff_pressure, readlen=0)
def get_raw_data(self):
return self.txrx([], readlen=9)
[docs] def get_data(self):
"""
:return: data dictionary
:rtype: dict
"""
data = self.default_data()
ba = self.get_raw_data()
if ba is not None and len(ba) == 9:
# Perform crc check.
crc = self.crc_check(ba)
if crc[0] and crc[1] and crc[2]:
k_dp = self.bytes_to_s16(ba[6], ba[7]) # 1 / Pa
if k_dp == 0:
self._logger.warning("Differential pressure scale == 0")
return Error().read(self)
# Calculate physical quantities
dp = self.bytes_to_s16(ba[0], ba[1]) / k_dp # Pa
T = (
self.bytes_to_s16(ba[3], ba[4]) / self._temperature_scale_factor
) # °C
# Setup return values
data["values"] = {}
data["values"]["temperature"] = {
"value": round(T, 2),
"unit": self._units["temperature"],
}
data["values"]["diff_pressure"] = {
"value": round(dp, 3),
"unit": self._units["diff_pressure"],
}
# Mark values as valid
data["error"] = False
else:
self._logger.debug("CRC Error [%s, %s, %s]", crc)
self.error = Error().crc(self)
data = self.error
self.reset()
self.prepare_measurement()
else:
data = Error().read(self)
self.reset()
self.prepare_measurement()
return data
class SDP8XX(SDP8):
pass