at USMB on Summer 2024
Takayuki FUJITA, Univ. of Hyogo, JAPAN
Download and install Thonny from here.
Look at the screen for an explanation.
First, connect your ESP32 MCU to your PC with a USB cable.
Tools > Options... > [Interpreter] tab > Install or update MicroPython (esptool)
Then press [Install] and wait for [Done]
Copy and paste the following Python code into Thonny's "Editor area", then press the Run current script button.
from machine import Pin
import time

# Configure GPIO 2, which drives the LED, as an output.
led = Pin(2, Pin.OUT)

# Blink forever: 1 s on, 1 s off.
# Change the two sleep values to alter the blink interval.
while True:
    led.on()        # Turn the LED on
    time.sleep(1)   # Wait for 1 second
    led.off()       # Turn the LED off
    time.sleep(1)   # Wait for 1 second
Check the blinking LED and change the time interval.
Copy and paste the following Python code into Thonny's "Editor area", then press the Run current script button.
import network
import time

# Initialize the WiFi interface in station mode
wlan = network.WLAN(network.STA_IF)

# Activate WiFi
wlan.active(True)

print("Nearby WiFi networks:")
for _ in range(5):  # Repeat the scan 5 times
    networks = wlan.scan()
    # Each scan entry is a 6-tuple; only the SSID and RSSI are printed.
    for ssid, bssid, channel, rssi, authmode, hidden in networks:
        print(f"SSID: {ssid.decode()}, RSSI: {rssi} dBm")
    time.sleep(1)  # Wait for 1 second between scan passes
When you check in the Shell window, you will see the SSID and RSSI values around you, and among them you will find the base station “JFWM-WS” for the hands-on training.
Remove the ESP32 from the USB cable.
Carefully connect the 4 wires between the BME280 and the ESP32: VIN (3.3V), GND, SCL, SDA.
Copy and paste the following Python code into Thonny's "Editor area", then press the Run current script button.
from machine import Pin, SoftI2C
from time import sleep
import BME280

# BME280 setup: software I2C on GPIO 22 (clock) and GPIO 21 (data).
SCL = 22
SDA = 21
i2c = SoftI2C(scl=Pin(SCL), sda=Pin(SDA))
bme = BME280.BME280(i2c=i2c)

# Print one set of readings per second.
# Temperature is read first: the driver's humidity and pressure
# calculations reuse the value stored by the temperature read.
while True:
    temp = bme.temperature
    hum = bme.humidity
    press = bme.pressure
    print(f"Temperature: {temp}°C, Humidity: {hum}%, Pressure: {press}hPa")
    sleep(1)
Because the BME280.py library is missing, several errors appear in the shell window.
Open a new Editor window, copy-paste the following code and save it to ESP32 memory under the name BME280.py.
or Download BME280.txt
from machine import I2C
import time
# BME280 default address.
# Used as the default `address` argument of BME280.__init__.
BME280_I2CADDR = 0x76
# Operating Modes
# These are the only values BME280.__init__ accepts for `mode`.
# read_raw_temp() writes the chosen value into the CONTROL_HUM register and
# into two bit fields of the CONTROL register.
# NOTE(review): the names suggest oversampling factors x1..x16 - confirm
# against the sensor datasheet.
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5
# BME280 Registers
# DIG_T*, DIG_P* and DIG_H* hold the calibration (trimming) values that
# BME280._load_calibration() reads into the dig_* attributes.
BME280_REGISTER_DIG_T1 = 0x88 # Trimming parameter registers
BME280_REGISTER_DIG_T2 = 0x8A
BME280_REGISTER_DIG_T3 = 0x8C
BME280_REGISTER_DIG_P1 = 0x8E
BME280_REGISTER_DIG_P2 = 0x90
BME280_REGISTER_DIG_P3 = 0x92
BME280_REGISTER_DIG_P4 = 0x94
BME280_REGISTER_DIG_P5 = 0x96
BME280_REGISTER_DIG_P6 = 0x98
BME280_REGISTER_DIG_P7 = 0x9A
BME280_REGISTER_DIG_P8 = 0x9C
BME280_REGISTER_DIG_P9 = 0x9E
BME280_REGISTER_DIG_H1 = 0xA1
BME280_REGISTER_DIG_H2 = 0xE1
BME280_REGISTER_DIG_H3 = 0xE3
# The H4..H7 register names do not map one-to-one onto the coefficient
# names: _load_calibration() builds dig_H4 from the DIG_H4 and DIG_H5
# registers, dig_H5 from the DIG_H6 and DIG_H5 registers, and reads dig_H6
# from the DIG_H7 register.
BME280_REGISTER_DIG_H4 = 0xE4
BME280_REGISTER_DIG_H5 = 0xE5
BME280_REGISTER_DIG_H6 = 0xE6
BME280_REGISTER_DIG_H7 = 0xE7
# CHIPID, VERSION, SOFTRESET and CONFIG are not referenced by the code
# visible in this listing.
BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_VERSION = 0xD1
BME280_REGISTER_SOFTRESET = 0xE0
# Control registers written by BME280.__init__ and BME280.read_raw_temp().
BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_CONTROL = 0xF4
BME280_REGISTER_CONFIG = 0xF5
# First register of each raw result. The read_raw_* methods read consecutive
# registers starting here: three for pressure and temperature, two for
# humidity.
BME280_REGISTER_PRESSURE_DATA = 0xF7
BME280_REGISTER_TEMP_DATA = 0xFA
BME280_REGISTER_HUMIDITY_DATA = 0xFD
class Device:
    """Class for communicating with an I2C device.

    Allows reading and writing 8-bit, 16-bit, and byte array values to
    registers on the device. All bus traffic goes through the I2C object
    passed to the constructor, which must provide ``writeto``,
    ``writeto_mem``, ``readfrom`` and ``readfrom_mem``.
    """

    def __init__(self, address, i2c):
        """Create an instance of the I2C device at the specified address using
        the specified I2C interface object."""
        self._address = address
        self._i2c = i2c

    def writeRaw8(self, value):
        """Write an 8-bit value on the bus (without register)."""
        # writeto() takes a buffer, not an int, so wrap the masked byte.
        self._i2c.writeto(self._address, bytearray([value & 0xFF]))

    def write8(self, register, value):
        """Write an 8-bit value to the specified register."""
        b = bytearray(1)
        b[0] = value & 0xFF
        self._i2c.writeto_mem(self._address, register, b)

    def write16(self, register, value):
        """Write a 16-bit value to the specified register, least significant
        byte first."""
        value = value & 0xFFFF
        b = bytearray(2)
        b[0] = value & 0xFF
        b[1] = (value >> 8) & 0xFF
        # Send the two-byte buffer through the stored bus object.
        self._i2c.writeto_mem(self._address, register, b)

    def readRaw8(self):
        """Read an 8-bit value on the bus (without register)."""
        data = self._i2c.readfrom(self._address, 1)
        return int.from_bytes(data, 'little') & 0xFF

    def readU8(self, register):
        """Read an unsigned byte from the specified register."""
        data = self._i2c.readfrom_mem(self._address, register, 1)
        return int.from_bytes(data, 'little') & 0xFF

    def readS8(self, register):
        """Read a signed byte from the specified register."""
        result = self.readU8(register)
        if result > 127:
            # Two's-complement: 128..255 map to -128..-1.
            result -= 256
        return result

    def readU16(self, register, little_endian=True):
        """Read an unsigned 16-bit value from the specified register, with the
        specified endianness (default little endian, or least significant byte
        first)."""
        data = self._i2c.readfrom_mem(self._address, register, 2)
        result = int.from_bytes(data, 'little') & 0xFFFF
        if not little_endian:
            # Swap the two bytes to get the big-endian interpretation.
            result = ((result << 8) & 0xFF00) + (result >> 8)
        return result

    def readS16(self, register, little_endian=True):
        """Read a signed 16-bit value from the specified register, with the
        specified endianness (default little endian, or least significant byte
        first)."""
        result = self.readU16(register, little_endian)
        if result > 32767:
            # Two's-complement: 32768..65535 map to -32768..-1.
            result -= 65536
        return result

    def readU16LE(self, register):
        """Read an unsigned 16-bit value from the specified register, in little
        endian byte order."""
        return self.readU16(register, little_endian=True)

    def readU16BE(self, register):
        """Read an unsigned 16-bit value from the specified register, in big
        endian byte order."""
        return self.readU16(register, little_endian=False)

    def readS16LE(self, register):
        """Read a signed 16-bit value from the specified register, in little
        endian byte order."""
        return self.readS16(register, little_endian=True)

    def readS16BE(self, register):
        """Read a signed 16-bit value from the specified register, in big
        endian byte order."""
        return self.readS16(register, little_endian=False)
class BME280:
    """Driver for a BME280 temperature / humidity / pressure sensor on I2C.

    The ``temperature``, ``humidity`` and ``pressure`` properties return
    formatted strings. Only a temperature read triggers a new measurement
    and updates ``t_fine``; the humidity and pressure calculations reuse
    that measurement and that ``t_fine`` value, so read ``temperature``
    first in every cycle.
    """

    def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,
                 **kwargs):
        """Validate the arguments, load calibration data and set up the sensor.

        mode    -- one of the BME280_OSAMPLE_* constants.
        address -- I2C address of the sensor.
        i2c     -- I2C bus object (required).
        Extra keyword arguments are accepted and ignored.

        Raises ValueError if ``mode`` is not a BME280_OSAMPLE_* value or if
        ``i2c`` is None.
        """
        # Check that mode is valid.
        if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
                        BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
            raise ValueError(
                'Unexpected mode value {0}. Set mode to one of '
                'BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4, '
                'BME280_OSAMPLE_8 or BME280_OSAMPLE_16'.format(mode))
        self._mode = mode
        # Create I2C device.
        if i2c is None:
            raise ValueError('An I2C object is required.')
        self._device = Device(address, i2c)
        # Load calibration values.
        self._load_calibration()
        self._device.write8(BME280_REGISTER_CONTROL, 0x3F)
        # Fine-resolution temperature term shared by all three compensation
        # formulas; set by read_temperature().
        self.t_fine = 0

    def _load_calibration(self):
        """Read the calibration coefficients into the dig_* attributes."""
        self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)
        self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)
        self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)
        self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)
        self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)
        self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)
        self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)
        self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)
        self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)
        self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)
        self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)
        self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)
        self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)
        self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)
        self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)
        # The register constants are named after consecutive addresses, so
        # coefficient dig_H6 lives in the register called DIG_H7.
        self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)
        # dig_H4 and dig_H5 are 12-bit values that share the DIG_H5 register:
        # its low nibble completes dig_H4, its high nibble completes dig_H5.
        h4 = self._device.readS8(BME280_REGISTER_DIG_H4)
        h4 = (h4 << 24) >> 20
        self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)
        h5 = self._device.readS8(BME280_REGISTER_DIG_H6)
        h5 = (h5 << 24) >> 20
        self.dig_H5 = h5 | (
            self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)

    def read_raw_temp(self):
        """Reads the raw (uncompensated) temperature from the sensor.

        Writes the control registers to start a measurement, waits for it to
        complete, then reads the three temperature data registers.
        """
        meas = self._mode
        self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)
        meas = self._mode << 5 | self._mode << 2 | 1
        self._device.write8(BME280_REGISTER_CONTROL, meas)
        # Wait time in microseconds: a fixed part plus a mode-dependent part
        # for each of the three quantities.
        sleep_time = 1250 + 2300 * (1 << self._mode)
        sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
        sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
        time.sleep_us(sleep_time)  # Wait the required time
        msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)
        lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)
        xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)
        # 20-bit result: drop the four unused low bits of the last byte.
        raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
        return raw

    def read_raw_pressure(self):
        """Reads the raw (uncompensated) pressure level from the sensor.

        Assumes that the temperature has already been read, i.e. that enough
        delay has been provided.
        """
        msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)
        lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)
        xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)
        raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
        return raw

    def read_raw_humidity(self):
        """Reads the raw (uncompensated) humidity from the sensor.

        Assumes that the temperature has already been read, i.e. that enough
        delay has been provided.
        """
        msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)
        lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)
        raw = (msb << 8) | lsb
        return raw

    def read_temperature(self):
        """Get the compensated temperature in 0.01 of a degree celsius.

        Also stores the intermediate ``t_fine`` value used by
        read_pressure() and read_humidity().
        """
        adc = self.read_raw_temp()
        # Multiply by dig_T2 first and shift afterwards, as in the datasheet
        # formula; shifting dig_T2 alone throws away its low 11 bits.
        var1 = (((adc >> 3) - (self.dig_T1 << 1)) * self.dig_T2) >> 11
        var2 = ((
            (((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *
            self.dig_T3) >> 14
        self.t_fine = var1 + var2
        return (self.t_fine * 5 + 128) >> 8

    def read_pressure(self):
        """Gets the compensated pressure in units of 1/256 Pascal.

        Uses ``t_fine`` from the most recent read_temperature() call.
        Returns 0 if the intermediate divisor is zero.
        """
        adc = self.read_raw_pressure()
        var1 = self.t_fine - 128000
        var2 = var1 * var1 * self.dig_P6
        var2 = var2 + ((var1 * self.dig_P5) << 17)
        var2 = var2 + (self.dig_P4 << 35)
        # The dig_P2 term is shifted LEFT by 12 in the datasheet formula.
        var1 = (((var1 * var1 * self.dig_P3) >> 8) +
                ((var1 * self.dig_P2) << 12))
        var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
        if var1 == 0:
            # Avoid a division by zero below.
            return 0
        p = 1048576 - adc
        p = (((p << 31) - var2) * 3125) // var1
        var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
        var2 = (self.dig_P8 * p) >> 19
        return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)

    def read_humidity(self):
        """Gets the compensated relative humidity in units of 1/1024 percent.

        Uses ``t_fine`` from the most recent read_temperature() call.
        """
        adc = self.read_raw_humidity()
        h = self.t_fine - 76800
        h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +
              16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *
              self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *
              self.dig_H2 + 8192) >> 14))
        h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
        # Clamp so that the result stays within 0..100 %.
        h = 0 if h < 0 else h
        h = 419430400 if h > 419430400 else h
        return h >> 12

    @property
    def temperature(self):
        "Return the temperature in degrees Celsius as a string, e.g. '23.45'."
        t = self.read_temperature()
        # Format the magnitude and add the sign separately; floor division
        # on a negative value would print -1.50 degrees as "-2.50".
        sign = "-" if t < 0 else ""
        ti, td = divmod(abs(t), 100)
        return "{}{}.{:02d}".format(sign, ti, td)

    @property
    def pressure(self):
        "Return the pressure in hPa as a string, e.g. '1013.25'."
        p = self.read_pressure() // 256
        pi = p // 100
        pd = p - pi * 100
        return "{}.{:02d}".format(pi, pd)

    @property
    def humidity(self):
        "Return the humidity in percent as a string, e.g. '45.67'."
        h = self.read_humidity()
        hi = h // 1024
        hd = h * 100 // 1024 - hi * 100
        return "{}.{:02d}".format(hi, hd)
Then return to the original editor window and rerun the script.
Several sensor values are displayed. Touch the sensor with your finger to see the values change.
The following script displays the BME280 data and sends it via UDP protocol to the InfluxDB database on the cloud server (161.34.0.113).
from machine import Pin, SoftI2C, unique_id
from time import sleep
import BME280
import socket
import ubinascii
import network
import time
# BME280 setup: software I2C on GPIO 22 (clock) and GPIO 21 (data).
SCL = 22
SDA = 21
i2c = SoftI2C(scl=Pin(SCL), sda=Pin(SDA))
bme = BME280.BME280(i2c=i2c)

# Wi-Fi credentials
SSID = "JFWM-WS"
PASSWORD = "goodlife"

print(f"Connecting to Wi-Fi network: {SSID}")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)
# Poll once per second until the station reports a connection.
while not wlan.isconnected():
    print("Waiting for Wi-Fi connection...")
    time.sleep(1)
print(f"Connected to Wi-Fi. IP: {wlan.ifconfig()[0]}")

# InfluxDB server details
INFLUXDB_IP = "161.34.0.113"
INFLUXDB_PORT = 8089

# Get the ESP32 unique ID as an upper-case hex string; it is used to tell
# the boards apart in the database.
mac_address = ubinascii.hexlify(unique_id()).decode().upper()

# UDP socket setup
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def send_to_influxdb(temp, hum, press):
    """Send one set of readings to the InfluxDB server as a UDP datagram.

    The payload has the form ``<measurement> temp=..,hum=..,press=..``,
    where the measurement name contains this board's ID so that each ESP32
    can be told apart in the database.

    A failed send is reported on the console and otherwise ignored, so a
    transient network error does not stop the measurement loop.
    """
    measurement = f"ble_rssi_{mac_address}"
    data = f"{measurement} temp={temp},hum={hum},press={press}"
    try:
        sock.sendto(data.encode(), (INFLUXDB_IP, INFLUXDB_PORT))
    except OSError as e:
        print(f"Error sending to InfluxDB: {e}")
# Main loop: read, print and upload one set of values per second.
# Temperature is read first because the driver's humidity and pressure
# calculations reuse the value stored by the temperature read.
while True:
    temp = bme.temperature
    hum = bme.humidity
    press = bme.pressure
    print(f"Temperature: {temp}°C, Humidity: {hum}%, Pressure: {press}hPa")
    send_to_influxdb(temp, hum, press)
    sleep(1)
After running the script you can see Temperature, Humidity and Pressure values.
Since it would be difficult to distinguish data if each group accessed the database individually, the script automatically tags the data with each ESP32's unique MAC address when writing it to the database, as listed below. The number is labeled on the ESP32.
Access the Grafana server at http://161.34.0.113:3000/
Log in with username = iomt, password = iomt2023
More details will be demonstrated on the screen.
BLE beacons transmit a unique ID periodically, and the radio signal strength (RSSI) between the beacon and receiver can be used to determine the approximate distance.
The following script uses the BLE function of the ESP32 to display the RSSI information from the disk-shaped beacon, which is recorded in the InfluxDB database along with the MAC address of the ESP32 and the link strength to the wireless base station, as before.
To monitor the beacons for each group, read the QR code on the back of the beacon with a smartphone, replace the MAC address highlighted in yellow in the script with your beacon's address, and run the script. (The sample shows Fujita's private beacon.)
import network
import socket
import time
from ubluetooth import BLE
import ubinascii
# Wi-Fi credentials
SSID = "JFWM-WS"
PASSWORD = "goodlife"

# InfluxDB server details
INFLUXDB_IP = "161.34.0.113"
INFLUXDB_PORT = 8089

# Target MAC addresses (add or remove as needed)
TARGET_MACS = [
    "ce3f413259a9",  # Fujita's Mi-band
]

print("Processing target MAC addresses...")
# Normalize to lower-case hex without colons so that entries compare equal
# to the address strings built in the scan callback.
target_macs = [mac.replace(':', '').lower() for mac in TARGET_MACS]
print(f"Loaded {len(target_macs)} target MAC addresses")

print("Initializing BLE...")
ble = BLE()
ble.active(True)
print("BLE initialized")

print(f"Connecting to Wi-Fi network: {SSID}")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)
# Poll once per second until the station reports a connection.
while not wlan.isconnected():
    print("Waiting for Wi-Fi connection...")
    time.sleep(1)
print(f"Connected to Wi-Fi. IP: {wlan.ifconfig()[0]}")

# Get ESP32's MAC address
esp32_mac = ubinascii.hexlify(wlan.config('mac')).decode().upper()
print(f"ESP32 MAC address: {esp32_mac}")

print("Creating UDP socket...")
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("UDP socket created")
def ble_irq(event, data):
    """BLE event callback: report scan results that come from a target MAC.

    event -- numeric BLE event code; only code 5 (scan result) is handled.
    data  -- event payload; for a scan result it unpacks into
             (addr_type, addr, adv_type, rssi, adv_data).

    All other events, and scan results from other devices, are ignored.
    """
    if event == 5:  # Scan result
        addr_type, addr, adv_type, rssi, adv_data = data
        # Same lower-case hex form as the entries in target_macs.
        addr_str = ubinascii.hexlify(addr).decode().lower()
        if addr_str in target_macs:
            print(f"Target device found. MAC: {addr_str}, RSSI: {rssi}")
            send_to_influxdb(addr_str, rssi)
def send_to_influxdb(mac, rssi):
    """Send one beacon RSSI sample to the InfluxDB server as a UDP datagram.

    mac  -- beacon address as a hex string; used as the field name.
    rssi -- signal strength reported for that beacon.

    The payload also carries this board's Wi-Fi link strength as
    ``Uplink_RSSI``, and the measurement name contains this ESP32's MAC
    address. Any error is printed and otherwise ignored so that scanning
    keeps running.
    """
    # Full payload: measurement name, a space, then the two fields.
    line = f"ble_rssi_{esp32_mac} Uplink_RSSI={wlan.status('rssi')},{mac}={rssi}"
    try:
        udp_socket.sendto(line.encode(), (INFLUXDB_IP, INFLUXDB_PORT))
        print(f"Sent to InfluxDB: {line}")
    except Exception as e:
        print(f"Error sending to InfluxDB: {e}")
print("Setting up BLE scanning...")
# Register the callback before starting the scan so no result is missed.
ble.irq(ble_irq)
# Duration 0 = scan until stopped; the interval and window are both
# 30000 us.
ble.gap_scan(0, 30000, 30000)
print("BLE scanning started")

print("Main loop started")
# All work happens in ble_irq(); this loop only keeps the script alive.
while True:
    time.sleep(1)
Visualization with Grafana will be demonstrated on screen.