# SPDX-FileCopyrightText: 2022 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: MIT
from .MUX import TCA9548 as MUX
import time
from ..core.error import Error
from ..core.sensor import I2CSensor
from typing import Union
# import logging
# logging.basicConfig(level=logging.DEBUG)
try:
import struct
except ImportError:
import ustruct as struct
[docs]class SFM(I2CSensor):
__name__ = "SFM"
_serial_mode = "I2C"
_SENSOR_ADDRESS = 0x40
_units = {
"volumeflow": "slm",
}
_ADDR_SN = [0x31, 0xAE]
_i2c_freq = 4e5
_offset = None
_scale = None
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.reset()
self.exists = self.sensor_exists()
[docs] def sensor_exists(self) -> bool:
"""
Test if sensor is plugged on and works proper
Returns:
bbol: True if test was successful, false otherwise.
"""
serial_number = self.get_serial_number()
attempt = 0
while attempt < self._max_attempts:
if isinstance(serial_number, int) and serial_number != 0:
self._logger.info("Serial number: %s", serial_number)
self.prepare_measurement()
return True
else:
self._logger.warning("Reading serial number failed. Resetting...")
rst = self.reset()
self._logger.debug("Reset status: %s", rst)
attempt += 1
serial_number = self.get_serial_number()
self.error = Error().crc(self)
return False
[docs] def reset(self) -> bool:
"""
Reseting the sensor
Returns:
bool: True for successful reset else False
"""
try:
self.txrx([0x20, 0x00], readlen=0)
time.sleep(0.1)
return True
except IOError:
return False
[docs] def get_serial_number(self) -> Union[str, None]:
"""
Reads unique serial number of sensor.
Returns:
str: Serial number of the sensor
"""
serial_number = 0
self.error = None
data = self.txrx(self._ADDR_SN, readlen=6)
if data is not None:
check = [
Error.checksum([data[0], data[1]], data[2], crc_init=0x00)[1],
Error.checksum([data[3], data[4]], data[5], crc_init=0x00)[1],
]
if False not in check:
serial_number = data[0]
for i in [1, 3, 4]:
serial_number <<= 8
serial_number += data[i]
self.serial_number = serial_number
else:
self._logger.warning("CRC failed while serial number is read")
self.error = Error().crc(self)
else:
self.error = Error().read(self)
self._logger.debug("Got serial number: %s", serial_number)
return serial_number
[docs] def prepare_measurement(self) -> bool:
"""
Initializes a continuous measurement of the mass flow.
Returns:
bool: Continuous measurement established (True = successful | False = failed)
"""
self._scale = self.get_scale_factor()
if self._scale is None:
return self.error
self._offset = self.get_offset()
if self._offset is None:
return self.error
data = self.txrx([0x10, 0x00], readlen=3)
attempts = 0
while data is None:
attempts += 1
data = self.txrx([0x10, 0x00], readlen=3)
self._logger.warning(
"Failed to prepare measurement (%s/%s).", attempts, self._max_attempts
)
if attempts > self._max_attempts:
self.mux.close_all_ports()
self.error = Error().read(self)
break
if data is not None and 0xFF not in data[:2]:
self._logger.info("Started measurement cycle.")
return True
else:
self.mux.close_all_ports()
self.error = Error().read(self)
self._logger.warning("Failed to initialize measurement cycle.")
return self.error
[docs] def get_scale_factor(self) -> Union[int, None]:
"""
Read conversion factor from sensor.
Returns:
(int, None): Scale factor or in case of an error None
"""
data = self.txrx([0x30, 0xDE], readlen=3)
if Error.checksum([data[0], data[1]], data[2], crc_init=0x00)[1]:
scale_factor = self.bytes_to_u16(data[0], data[1])
return scale_factor
else:
self.error = Error().crc
return None
[docs] def get_offset(self) -> Union[int, None]:
"""
Read offset value from sensor.
Returns:
(int, None): Scale factor or in case of an error None
"""
data = self.txrx([0x30, 0xDF], readlen=3)
if Error.checksum([data[0], data[1]], data[2], crc_init=0x00)[1]:
offset = self.bytes_to_u16(data[0], data[1])
return offset
else:
self.error = Error().crc
return None
[docs] def get_data(self) -> dict:
"""
Read data array.
Returns:
dict: Data dictionary
"""
res = self.txrx([], readlen=3)
self._logger.debug("Result bytes: %s", res)
self.data = self.default_data()
if res is not None:
if Error.checksum([res[0], res[1]], res[2], crc_init=0x00)[1]:
value, crc = struct.unpack(">HB", res)
flow = round((value - self._offset) / self._scale, 3)
if flow <= -218.807:
self.reset()
self.prepare_measurement()
self.error = Error().read(self)
self._logger.warning("Resetting sensor due to overflow.")
return self.error
self.data["values"]["volumeflow"] = {
"value": flow,
"unit": self._units["volumeflow"],
}
self.data["error"] = False
self._logger.debug("Reading successful: %s", flow)
return self.data
else:
self.mux.close_all_ports()
self.error = Error().crc(self)
return self.error
else:
self.mux.close_all_ports()
self.error = Error().read(self)
return self.error
[docs]class SFMXXX(SFM):
"""Deprecated class identifier. Use SFM instead."""
_SENSOR_ADDRESS = 0x01
_ADDR_SN = [0x02, 0xF8]
[docs]class SFM3XXX(SFM):
"""Deprecated class identifier. Use SFM instead."""
pass