# SPDX-FileCopyrightText: 2022 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: MIT
"""
Sensor Class for BME280
based on
https://github.com/adafruit/Adafruit_Python_BME280
Copyright (c) 2014 Adafruit Industries
Author: Tony DiCola
Based on the BMP280 driver with BME280 changes provided by
David J Taylor, Edinburgh (www.satsignal.eu). Additional functions added
by Tom Nardi (www.ifail.com)
"""
__author__ = "Konstantin Niehaus"
__copyright__ = "German Aerospace Center"
__credits__ = ["Konstantin Niehaus"]
__license__ = "MIT"
__version__ = "0.1.0"
__email__ = "konstantin.niehaus@dlr.de"
import time
from ..core.error import Error
from ..core.sensor import I2CSensor
import logging
logging.basicConfig(level=logging.DEBUG)
[docs]class BME280(I2CSensor):
"""
Reads absolute pressure [Pa], humidity [pct] and temperature [°C]
"""
__name__ = "BME280"
_units = {
"abs_pressure": "Pa",
"humidity": "pct",
"temperature": "C",
}
# BME280 default address.
_SENSOR_ADDRESS = 0x77
_MODE_SLEEP = 0x00
_MODE_FORCE = 0x01
_MODE_NORMAL = 0x03
# Maximum clock frequency [Hz]
_i2c_freq = 4e5
_delay = 2e-2
# Operating Modes
__OSAMPLE_1 = 1
__OSAMPLE_2 = 2
__OSAMPLE_4 = 3
__OSAMPLE_8 = 4
__OSAMPLE_16 = 5
# Standby Settings
__STANDBY_0p5 = 0
__STANDBY_62p5 = 1
__STANDBY_125 = 2
__STANDBY_250 = 3
__STANDBY_500 = 4
__STANDBY_1000 = 5
__STANDBY_10 = 6
__STANDBY_20 = 7
# Filter Settings
__FILTER_off = 0
__FILTER_2 = 1
__FILTER_4 = 2
__FILTER_8 = 3
__FILTER_16 = 4
# Trimming parameter registers
_REG_T1_LSB = 0x88
_REG_T1_MSB = 0x89
_REG_T2_LSB = 0x8A
_REG_T2_MSB = 0x8B
_REG_T3_LSB = 0x8C
_REG_T3_MSB = 0x8D
_REG_P1_LSB = 0x8E
_REG_P1_MSB = 0x8F
_REG_P2_LSB = 0x90
_REG_P2_MSB = 0x91
_REG_P3_LSB = 0x92
_REG_P3_MSB = 0x93
_REG_P4_LSB = 0x94
_REG_P4_MSB = 0x95
_REG_P5_LSB = 0x96
_REG_P5_MSB = 0x97
_REG_P6_LSB = 0x98
_REG_P6_MSB = 0x99
_REG_P7_LSB = 0x9A
_REG_P7_MSB = 0x9B
_REG_P8_LSB = 0x9C
_REG_P8_MSB = 0x9D
_REG_P9_LSB = 0x9E
_REG_P9_MSB = 0x9F
_REG_H1 = 0xA1
_REG_H2_LSB = 0xE1
_REG_H2_MSB = 0xE2
_REG_H3 = 0xE3
_REG_H4_LSB = 0xE5
_REG_H4_MSB = 0xE4
_REG_H5_LSB = 0xE5
_REG_H5_MSB = 0xE6
_REG_H6 = 0xE7
_REG_CHIPID = 0xD0
_REG_VERSION = 0xD1
_REG_SOFTRESET = 0xE0
_REG_STATUS = 0xF3
_REG_CONTROL_HUM = 0xF2
_REG_CONTROL = 0xF4
_REG_CONFIG = 0xF5
_REG_DATA = 0xF7
t1, t2, t3 = [None] * 3
p1, p2, p3 = [None] * 3
p4, p5, p6 = [None] * 3
p7, p8, p9 = [None] * 3
h1, h2, h3 = [None] * 3
h4, h5, h6 = [None] * 3
t_fine = 0.0
def __init__(
self,
*args,
t_mode: int = __OSAMPLE_1,
p_mode: int = __OSAMPLE_16,
h_mode: int = __OSAMPLE_1,
standby: int = __STANDBY_250,
_filter: int = __FILTER_off,
**kwargs
) -> None:
"""
Initialize mode configration
Args:
t_mode (int): Temperature mode (defaults to 1)
p_mode (int): Pressure mode (defaults to 5)
h_mode (int): Humidity mode (defaults to 1)
standby (int): Standby duration (defaults to 3 (250ms))
filter (int): IIR filter (defaults to off - 0)
"""
super().__init__(*args, **kwargs)
# Check arguments
## Check that p_mode is valid.
if p_mode not in [
self.__OSAMPLE_1,
self.__OSAMPLE_2,
self.__OSAMPLE_4,
self.__OSAMPLE_8,
self.__OSAMPLE_16,
]:
raise ValueError("Unexpected p_mode value {0}.".format(p_mode))
## Check that h_mode is valid.
if h_mode not in [
self.__OSAMPLE_1,
self.__OSAMPLE_2,
self.__OSAMPLE_4,
self.__OSAMPLE_8,
self.__OSAMPLE_16,
]:
raise ValueError("Unexpected h_mode value {0}.".format(h_mode))
## Check that standby is valid.
if standby not in [
self.__STANDBY_0p5,
self.__STANDBY_62p5,
self.__STANDBY_125,
self.__STANDBY_250,
self.__STANDBY_500,
self.__STANDBY_1000,
self.__STANDBY_10,
self.__STANDBY_20,
]:
raise ValueError("Unexpected standby value {0}.".format(standby))
## Check that filter is valid.
if _filter not in [
self.__FILTER_off,
self.__FILTER_2,
self.__FILTER_4,
self.__FILTER_8,
self.__FILTER_16,
]:
raise ValueError("Unexpected filter value {0}.".format(_filter))
self._p_mode = p_mode
self._t_mode = t_mode
self._h_mode = h_mode
self._standby = standby
self._filter = _filter
self.param_t1 = None
self.param_t2 = None
self.param_t3 = None
self.param_p1 = None
self.param_p2 = None
self.param_p3 = None
self.param_p2 = None
self.param_p3 = None
self.param_p4 = None
self.param_p5 = None
self.param_p6 = None
self.param_p7 = None
self.param_p8 = None
self.param_p9 = None
self.param_h1 = None
self.param_h2 = None
self.param_h3 = None
self.param_h4 = None
self.param_h5 = None
self.param_h6 = None
self._logger.debug("Init")
self.reset()
self.exists = self.sensor_exists()
[docs] def get_serial_number(self) -> str:
"""
A serial number is not supported by bme280
hence a pseudo identifier is calculated by
adding the individual calibration values together
Returns:
str: Serial number information or None
"""
serial_number = 0
for i in range(self._REG_T1_MSB, self._REG_H6)[:10]:
calib_parameter = self.getU8(i)
if calib_parameter is not None:
serial_number += calib_parameter
else:
serial_number = None
break
if serial_number is not None:
self.serial_number = serial_number
self._logger.info("Assigned pseudo-serial number %s", serial_number)
return serial_number
[docs] def reset(self) -> None:
"""
Reset sensor to default.
"""
self._logger.debug("Resetting...")
self.txrx(self._REG_SOFTRESET, readlen=2)
self._logger.debug("Done resetting...")
[docs] def init(self) -> None:
"""
Initialize sensor communication
* Activate sleep mode
* Write filter and standby calibration
* Write humidty oversampling rate
* Write temperature oversampling rate
* Write pressure oversampling rate
"""
# Sleep mode
self.txrx([self._REG_CONTROL, 0x24], readlen=0)
time.sleep(self._delay)
# Setup filter and standby
cmd = (self._standby << 5) | (self._filter << 2)
self.txrx([self._REG_CONFIG, cmd], readlen=0)
time.sleep(self._delay)
# Set humidity oversampling
self.txrx([self._REG_CONTROL_HUM, self._h_mode], readlen=0)
# Set Temp/Pressure Oversample and enter Normal mode
cmd = (self._t_mode << 5) + (self._p_mode << 2) + self._MODE_NORMAL
self.txrx([self._REG_CONTROL, cmd], readlen=0)
[docs] def sensor_exists(self) -> bool:
"""
Test if sensor is plugged in and works proper
Returns:
bool: True if test was successful, otherwise False.
"""
# Check if sensor is responsive
serial_number = self.get_serial_number()
self._logger.info("Pseudo serial number %s", serial_number)
if isinstance(serial_number, int):
# Prepare measurement
self.prepare_measurement()
self.exists = True
else:
self.error = Error().read(self)
self.exists = False
return self.exists
[docs] def load_calibration(self) -> bool:
"""
Load all calibration data needed for the calculation of
Temperature, Humidtiy and absolute pressure.
Returns:
bool: True if read has been completed successfully, False otherwise.
"""
self.param_t1 = self.getU16(self._REG_T1_MSB, self._REG_T1_LSB)
if self.param_t1 is None:
return False
self.param_t2 = self.getS16(self._REG_T2_MSB, self._REG_T2_LSB)
self.param_t3 = self.getS16(self._REG_T3_MSB, self._REG_T3_LSB)
self.param_p1 = self.getU16(self._REG_P1_MSB, self._REG_P1_LSB)
self.param_p2 = self.getS16(self._REG_P2_MSB, self._REG_P2_LSB)
self.param_p3 = self.getS16(self._REG_P3_MSB, self._REG_P3_LSB)
self.param_p4 = self.getS16(self._REG_P4_MSB, self._REG_P4_LSB)
self.param_p5 = self.getS16(self._REG_P5_MSB, self._REG_P5_LSB)
self.param_p6 = self.getS16(self._REG_P6_MSB, self._REG_P6_LSB)
self.param_p7 = self.getS16(self._REG_P7_MSB, self._REG_P7_LSB)
self.param_p8 = self.getS16(self._REG_P8_MSB, self._REG_P8_LSB)
self.param_p9 = self.getS16(self._REG_P9_MSB, self._REG_P9_LSB)
self.param_h1 = self.getU8(self._REG_H1)
self.param_h2 = self.getS16(self._REG_H2_MSB, self._REG_H2_LSB)
self.param_h3 = self.getU8(self._REG_H3)
self.param_h4 = (self.getU8(self._REG_H4_MSB) << 4) + (
self.getU8(self._REG_H4_LSB) & 0x0F
)
self.param_h5 = (self.getU8(self._REG_H5_MSB) << 4) + (
(self.getU8(self._REG_H5_LSB) >> 4) & 0x0F
)
self.param_h6 = self.getS8(self._REG_H6)
self._logger.debug("t1 = %s", self.param_t1)
self._logger.debug("t2 = %s", self.param_t2)
self._logger.debug("t3 = %s", self.param_t3)
self._logger.debug("p1 = %s", self.param_p1)
self._logger.debug("p2 = %s", self.param_p2)
self._logger.debug("p3 = %s", self.param_p3)
self._logger.debug("p4 = %s", self.param_p4)
self._logger.debug("p5 = %s", self.param_p5)
self._logger.debug("p6 = %s", self.param_p6)
self._logger.debug("p7 = %s", self.param_p7)
self._logger.debug("p8 = %s", self.param_p8)
self._logger.debug("p9 = %s", self.param_p9)
self._logger.debug("h1 = %s", self.param_h1)
self._logger.debug("h2 = %s", self.param_h2)
self._logger.debug("h3 = %s", self.param_h3)
self._logger.debug("h4 = %s", self.param_h4)
self._logger.debug("h5 = %s", self.param_h5)
self._logger.debug("h6 = %s", self.param_h6)
return True
[docs] def prepare_measurement(self) -> bool:
"""
Placeholder function to be compatible with other sensor classes
Returns:
bool: True if test was successful, otherwise False.
"""
if self.load_calibration():
self.init()
return True
else:
return False
[docs] def get_data(self) -> dict:
"""
Get current measurement values
Returns:
dict: Data dictionary
"""
# Wait until data is ready
if self.error is not None:
res = self.prepare_measurement()
if res:
self.error = None
else:
self.error = Error().read(self)
self.reset()
self.prepare_measurement()
self.data["object"] = "ERROR"
return self.data
self.data = self.default_data()
raw_data = None
try:
raw_data = self.get_raw_data()
except TypeError:
# Throw an error if not able to fetch data
self.error = Error().read(self)
self.reset()
self.prepare_measurement()
self.data["object"] = "ERROR"
return self.data
if raw_data is None:
# Throw an error if not able to fetch data
self.error = Error().read(self)
self.data["object"] = "ERROR"
return self.data
if self.error is None:
temperature = self.read_temperature(raw_data)
if temperature is not None:
temperature = round(temperature, 3)
else:
# Throw an error if not able to fetch data
self.error = Error().read(self)
self.data["object"] = "ERROR"
return self.data
pressure = int(self.read_pressure(raw_data))
humidity = round(self.read_humidity(raw_data), 3)
self.data["error"] = False
self.data["values"] = {}
self.data["values"]["abs_pressure"] = {
"value": pressure,
"unit": self._units["abs_pressure"],
}
self.data["values"]["temperature"] = {
"value": temperature,
"unit": self._units["temperature"],
}
self.data["values"]["humidity"] = {
"value": humidity,
"unit": self._units["humidity"],
}
return self.data
[docs] def get_raw_data(self) -> bytearray:
"""
Read all values from devices needed to calculate
humidity, pressure and temperature values
Returns:
bytearray: Raw data array
"""
i = 0
while i < self._max_attempts:
state = self.getU8([self._REG_STATUS])
if state is not None and state & 0x08:
time.sleep(self._delay)
elif state is None:
return None
i += 1
return self.txrx([self._REG_DATA], 8)
[docs] @classmethod
def read_raw_temp(cls, raw_data: bytearray) -> int:
"""
Calculate uncompensated raw temperature from sensor.
Returns:
int: Raw adc read
"""
raw = (raw_data[3] << 16) | (raw_data[4] << 8) | (raw_data[5])
raw >>= 4
return raw
[docs] @classmethod
def read_raw_pressure(cls, raw_data: bytearray) -> int:
"""
Calculate raw (uncompensated) pressure level from the sensor.
Returns:
int: Raw adc read
"""
raw = (raw_data[0] << 12) | (raw_data[1] << 4) | (raw_data[2] >> 4 & 0x0F)
return raw
[docs] @classmethod
def read_raw_humidity(cls, raw_data: bytearray) -> int:
"""
Calculate raw (uncompensated) humidity value from the sensor.
Args:
raw_data (bytearray): Full raw data array (temperature,pressure and humidity)
Returns:
int: Raw adc read
"""
raw = (raw_data[6] << 8) | raw_data[7]
if raw is not None and raw is int:
if raw > 32767:
raw -= 65536
return raw
[docs] def read_temperature(self, raw_data: bytearray) -> float:
"""
Convert raw temperature to compensated temperature in °C
Args:
raw_data (bytearray): Full raw data array (temperature,pressure and humidity)
Returns:
float: Temperature in °C or None if conversion failed
"""
adc = self.read_raw_temp(raw_data)
self._logger.debug("Temperature raw data %s", adc)
if adc is None:
self._logger.error("Temperature data not valid.")
elif None not in [self.param_t1, self.param_t2, self.param_t3]:
var1 = (adc / 16384.0 - self.param_t1 / 1024.0) * self.param_t2
var2 = ((adc / 131072.0 - self.param_t1 / 8192.0) ** 2) * self.param_t3
self.t_fine = var1 + var2
return self.t_fine / 5120.0
else:
self._logger.error("Calibration data not available.")
return None
[docs] def read_pressure(self, raw_data: bytearray) -> float:
"""
Convert raw pressure to compensated pressure in Pa
Args:
raw_data (bytearray): Full raw data array (temperature,pressure and humidity)
Returns:
float: Pressure in Pa
"""
adc = self.read_raw_pressure(raw_data)
self._logger.debug("Pressure raw data %s", adc)
var1 = float(self.t_fine) / 2.0 - 64000.0
var2 = var1 * var1 * float(self.param_p6) / 32768.0
var2 = var2 + var1 * float(self.param_p5) * 2.0
var2 = var2 / 4.0 + float(self.param_p4) * 65536.0
var1 = (
float(self.param_p3) * var1 * var1 / 524288.0 + float(self.param_p2) * var1
) / 524288.0
var1 = (1.0 + var1 / 32768.0) * float(self.param_p1)
if var1 == 0:
return 0
abs_p = 1048576.0 - adc
abs_p = ((abs_p - var2 / 4096.0) * 6250.0) / var1
var1 = float(self.param_p9) * abs_p * abs_p / 2147483648.0
var2 = abs_p * float(self.param_p8) / 32768.0
abs_p = abs_p + (var1 + var2 + float(self.param_p7)) / 16.0
return abs_p
[docs] def read_humidity(self, raw_data: bytearray) -> float:
"""
Convert raw rel. humidity to compensated relative humidity in %
Args:
raw_data (bytearray): Full raw data array (temperature,pressure and humidity)
Returns:
float: Relative humidity in %
"""
adc = self.read_raw_humidity(raw_data)
self._logger.debug("Humidity raw data %s", adc)
# Algorithm from the BME280 driver
# https://github.com/BoschSensortec/BME280_driver/blob/master/bme280.c
var1 = float(self.t_fine) - 76800.0
var2 = self.param_h4 * 64.0 + (self.param_h5 / 16384.0) * var1
var3 = adc - var2
var4 = self.param_h2 / 65536.0
var5 = 1.0 + (self.param_h3 / 67108864.0) * var1
var6 = 1.0 + (self.param_h6 / 67108864.0) * var1 * var5
var6 = var3 * var4 * var5 * var6
res = var6 * (1.0 - self.param_h1 * var6 / 524288.0)
humidity = max(0.0, min(res, 100.0))
return humidity
[docs] def read_dewpoint(self, raw_data: bytearray) -> float:
"""
Return calculated dewpoint in °C, only accurate at > 50% RH
"""
celsius = self.read_temperature(raw_data)
humidity = self.read_humidity(raw_data)
dewpoint = celsius - ((100 - humidity) / 5)
return dewpoint
[docs] def read_dewpoint_f(self, raw_data: bytearray) -> float:
"""
Calculated dewpoint in F, only accurate at > 50% RH
"""
dewpoint_c = self.read_dewpoint(raw_data)
dewpoint_f = dewpoint_c * 1.8 + 32
return dewpoint_f