# SPDX-FileCopyrightText: 2022 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: MIT
"""
Sensor Class for SCD30
Datasheet
https://www.sensirion.com/fileadmin/user_upload/customers/sensirion/Dokumente/0_Datasheets/CO2/Sensirion_CO2_Sensors_SCD30_Datasheet.pdf
Interface description
https://www.sensirion.com/fileadmin/user_upload/customers/sensirion/Dokumente/9.5_CO2/Sensirion_CO2_Sensors_SCD30_Interface_Description.pdf
"""
import time
from ..core.error import Error
from ..core.sensor import I2CSensor
class SCD30(I2CSensor):
    """
    SCD30 - CO2 and RH/T Sensor Module

    Driver for the SCD30 on an I2C bus. :meth:`get_data` delivers the CO2
    concentration, temperature and relative humidity in the units listed
    in ``_units``.
    """

    __name__ = "SCD30"
    _serial_mode = "I2C"
    _units = {
        "co2": "ppm",
        "temperature": "C",
        "humidity": "pct",
    }
    _SENSOR_ADDRESS = 0x61
    _crc_check_init = 0xFF
    _clk_stretch = True
    _i2c_freq = 1e3

    # Fallback measurement interval [s]; replaced by the value read back from
    # the sensor in start_continuous_measurement(). Defined at class level so
    # that set_asc() and get_data() also work before a measurement was started.
    measurement_intervall = 2

    # idx 2 + 3: pressure bytes [default 0mBar]
    __cmd_cont_meas = [0x00, 0x10]  # noqa
    __cmd_stop_cont_meas = [0x01, 0x04]  # noqa
    # idx 2 + 3: time bytes [default 2s]
    __cmd_get_measurement_interval = [0x46, 0x00]  # noqa
    # idx 4 + 5: time bytes [default 2s]
    __cmd_set_measurement_interval = [0x46, 0x00, 0x25, 0x00]  # noqa
    __cmd_get_data_rdy = [0x02, 0x02]  # noqa
    __cmd_read_meas = [0x03, 0x00]  # noqa
    __cmd_asc_get_state = [0x53, 0x06]  # noqa
    __cmd_asc_activate = [0x53, 0x06, 0x00, 0x01, 0xB0]  # noqa
    __cmd_asc_deactivate = [0x53, 0x06, 0x00, 0x00, 0x81]  # noqa
    # idx 2 + 3: reference CO2 concentration [ppm]
    __cmd_forced_calibration = [0x52, 0x04]  # noqa
    # idx 2 + 3: temperature deg C*100
    # NOTE(review): differs from the matching get command below and is not
    # used in this class - verify against the interface description.
    __cmd_set_temperature_offset = [0x00, 0x3B]  # noqa
    __cmd_get_temperature_offset = [0x54, 0x03]  # noqa
    # idx 2 + 3: altitude [m]
    # NOTE(review): differs from the matching get command below and is not
    # used in this class - verify against the interface description.
    __cmd_set_altitude_compensation = [0x00, 0x38]
    __cmd_get_altitude_compensation = [0x51, 0x02]
    __cmd_get_serial_number = [0xD0, 0x33]  # noqa
    __cmd_get_firmware_version = [0xD1, 0x00]  # noqa
    __cmd_soft_rst = [0xD3, 0x04]  # noqa
    __supported_sensors = ["SCD30"]

    def __init__(self, *args, **kwargs):
        """
        Initialise the parent sensor and probe for the device.

        All arguments are forwarded unchanged to the parent constructor.
        Afterwards ``self.exists`` holds the result of :meth:`sensor_exists`.
        """
        # The parent is initialised exactly once.
        super().__init__(*args, **kwargs)
        # Identifiers
        self.serial_number = None
        self.exists = self.sensor_exists()

    def txrx(self, *args, crc=True, **kwargs):
        """
        Standard txrx module is extended by a crc check

        Args:
            *args: forwarded unchanged to the parent ``txrx``
            crc (bool): verify the check sums of the response (default True)
            **kwargs: forwarded unchanged to the parent ``txrx``
        Returns:
            The response of the parent ``txrx``; None if the parent returned
            None or if the crc check failed.
        """
        data = super().txrx(*args, **kwargs)
        if data is not None and crc and False in self.crc_check(data):
            self._logger.warning("CRC check failed.")
            data = None
        return data

    def sensor_exists(self):
        """
        Test if sensor is plugged in and works proper

        The test passes when a non-empty serial number can be read. On
        failure ``self.error`` is set to ``"No connection."``.

        Returns:
            bool: True if test was successful otherwise False
        """
        serial_number = self.get_serial_number()
        if isinstance(serial_number, str) and serial_number != "":
            self.serial_number = serial_number
            return True
        self.error = "No connection."
        return False

    def get_firmware(self):
        """
        Read firmware version.

        Returns:
            str: The first two response bytes joined by a dot, or False if
            no valid response was received.
        """
        data = self.txrx(self.__cmd_get_firmware_version, readlen=3)
        if data is None:
            return False
        return f"{data[0]}.{data[1]}"

    def get_asc_status(self):
        """
        Get automatic self-calibration (asc) status.

        Returns:
            bool: True if asc active, False if inactive. None if no valid
            response was received or the status byte is neither 0 nor 1.
        """
        data = self.txrx(self.__cmd_asc_get_state, readlen=3)
        if data is None:
            return None
        if data[1] == 1:
            return True
        if data[1] == 0:
            return False
        return None

    def get_measurement_intervall(self):
        """
        Read current measurement interval.

        The read is repeated up to ``self._max_attempts`` times.

        Returns:
            int: Seconds per measurement, or None if every attempt failed.
        """
        for _ in range(self._max_attempts):
            data = self.txrx(self.__cmd_get_measurement_interval, readlen=3)
            if data is not None:
                t_int = self.bytes_to_u16(data[0], data[1])
                self._logger.debug("Intervall time: %d", t_int)
                return t_int
        return None

    def stop_measurement(self):
        """Stop measurement run."""
        self.txrx(self.__cmd_stop_cont_meas, readlen=0)

    def reset(self):
        """
        Resetting the sensor

        Sends the soft reset command and waits 0.5 s afterwards.
        """
        self.txrx(self.__cmd_soft_rst, readlen=0)
        time.sleep(0.5)

    def crc_check(self, ba):
        """
        Assert if received data is valid

        Args:
            ba (bytearray): Byte array with a crc check sum at every third entry
        Returns:
            list: One boolean per 3-byte group indicating a correct crc check
            sum. If the length of ``ba`` is not a multiple of three the list
            contains only False and at least one entry.
        """
        n_groups = len(ba) // 3
        if len(ba) % 3 != 0:
            # A truncated response must never pass: with fewer than three
            # bytes an empty list would make `False in crc` evaluate to False
            # and the caller would treat the data as valid.
            return [False] * max(n_groups, 1)
        return [
            Error.checksum(
                byte_values=[ba[pos], ba[pos + 1]],
                crc_value=ba[pos + 2],
                crc_init=self._crc_check_init,
            )[1]
            for pos in range(0, len(ba), 3)
        ]

    def get_serial_number(self):
        """
        Get device serial number

        The read is repeated up to ``self._max_attempts`` times. The crc
        check of the response is skipped; bytes 0, 1, 3, 4, 6 and 7 of the
        9 byte response are converted to characters.

        Returns:
            str: serial number, or False if no number has been read
        """
        for attempt in range(self._max_attempts):
            data = self.txrx(self.__cmd_get_serial_number, crc=False, readlen=9)
            if data is not None:
                self.serial_number = "".join(
                    chr(data[pos]) for pos in (0, 1, 3, 4, 6, 7)
                )
                return self.serial_number
            self.error = Error().crc(self)
            self._logger.debug(
                "Failed to fetch serial number. (attempt %d/%d)",
                attempt + 1,
                self._max_attempts,
            )
        return False

    def set_asc(self, state):
        """
        Setup automatic self calibration

        The command is only sent if the requested state differs from the
        state reported by :meth:`get_asc_status`; afterwards the method
        waits one measurement interval plus one second.

        Args:
            state (bool): activate or deactivate automatic self-calibration
        """
        current_state = self.get_asc_status()
        if current_state != state:
            cmd = self.__cmd_asc_activate if state else self.__cmd_asc_deactivate
            self.txrx(cmd, readlen=0)
            time.sleep(self.measurement_intervall + 1)

    def prepare_measurement(self, amb_pressure=0):
        """
        Start the continuous measurement and log how long that took.

        Args:
            amb_pressure (int): forwarded to
                :meth:`start_continuous_measurement`
        """
        t0 = time.time()
        self.start_continuous_measurement(amb_pressure=amb_pressure)
        self._logger.debug("Measurement prep. took %d seconds.", int(time.time() - t0))

    def start_continuous_measurement(self, amb_pressure: int = 0):
        """
        Initializes a continuous measurement of CO2 temperature and humidity

        Reads back the measurement interval (falls back to 2 s), activates
        the automatic self-calibration, sends the start command and waits one
        measurement interval plus one second.

        Args:
            amb_pressure (int): ambient pressure (default==off) ranges from 700-1400mBar
        Raises:
            ValueError: if amb_pressure is neither 0 nor within 700-1400
        """
        # Validate before talking to the sensor. Explicit raises are used
        # because assert statements are removed when Python runs with -O.
        if amb_pressure != 0:
            if amb_pressure < 700:
                raise ValueError("amb_pressure too low")
            if amb_pressure > 1400:
                raise ValueError("amb_pressure too high")

        intervall = self.get_measurement_intervall()
        self.measurement_intervall = intervall if isinstance(intervall, int) else 2
        self.set_asc(True)

        if amb_pressure == 0:
            # defaults to 1013.25mBar
            p_msb = 0x00
            p_lsb = 0x00
            p_crc = 0x81
        else:
            p_msb, p_lsb = divmod(amb_pressure, 256)
            p_crc = Error.checksum(
                byte_values=[p_msb, p_lsb],
                crc_value=0x00,
                crc_init=self._crc_check_init,
            )[0]
        cmd = self.__cmd_cont_meas + [p_msb, p_lsb, p_crc]
        # NOTE(review): the other write-only commands pass readlen=0; confirm
        # that the parent's default readlen is suitable here.
        self.txrx(cmd)
        time.sleep(self.measurement_intervall + 1)

    def get_data_rdy(self):
        """
        Check if measurement data is available

        Returns:
            bool: True if data is present, false otherwise.
        """
        data = self.txrx(self.__cmd_get_data_rdy, readlen=3)
        if data is not None and data[1] == 0x01:
            return True
        self._logger.debug("Data is not ready.")
        return False

    def get_data(self):
        """
        Read CO2 concentration, temperature and humidity

        Polls :meth:`get_data_rdy` until data is available. If the sensor
        does not become ready in time it is reset and the measurement is
        restarted.

        Returns:
            dict: data dictionary (keys: error, values) on success, otherwise
            the value stored in ``self.error``.
        """
        self.data = self.default_data()

        # Pause between two polls; the divisor is clamped so that a single
        # allowed attempt does not end in a division by zero.
        poll_pause = 2 * self.measurement_intervall / max(self._max_attempts - 1, 1)

        # Wait until data is ready
        polls = 0
        while not self.get_data_rdy():
            polls += 1
            time.sleep(poll_pause)
            if polls > self._max_attempts:
                self._logger.warning("Reset.")
                self.error = Error().read(self)
                self.reset()
                self.prepare_measurement()
                return self.error

        ba = self.txrx(self.__cmd_read_meas, readlen=18)
        if ba is None:
            self.error = Error().read(self)
            self.prepare_measurement()
            return self.error

        # Each value spans two 3-byte groups; every third byte is a check sum
        # and is skipped here.
        co2 = int(self.byte_to_float([ba[0], ba[1], ba[3], ba[4]]))
        T = self.byte_to_float([ba[6], ba[7], ba[9], ba[10]])
        RH = self.byte_to_float([ba[12], ba[13], ba[15], ba[16]])
        self.data["error"] = False
        self.data["values"] = {
            "co2": {"value": co2, "unit": self._units["co2"]},
            "temperature": {"value": T, "unit": self._units["temperature"]},
            "humidity": {"value": RH, "unit": self._units["humidity"]},
        }
        return self.data