Raspberry Pi Pico WはWi-Fiモジュールを搭載しており、インターネットに簡単に接続できます。
そのため、インターネット経由でAWS IoT Coreにデータを送信することも簡単にできそうですが、IoT Coreと接続するにはPico WにTLSのクライアント証明書をインストールしたり、MQTTで通信するためのライブラリをインストールしたりと複数の手順をこなさなければなりません。
ウェブ上の記事をいくつか見て参考にしましたが、MQTTライブラリのアップデートで使い方が変わっていたりと上手くいかなかったので、この記事にて最新のAWS IoT Coreへの接続方法をまとめます。
最初にPico WをWi-Fiへ接続する
AWS IoT Coreに接続するためには当然インターネットへの接続性が必要です。
したがって、最初にPico WをWi-Fiに接続する。
Pico WのWi-Fiへの接続方法は、以前の記事で紹介していますので、やり方が分からない場合は参考にしてください。
umqtt.simple のインストール
次に、AWS IoT Coreにメッセージを送信するためにMQTTのライブラリである、umqtt.simpleをPico Wにインストールします。
インストールはmipコマンドで行います。mipはPythonでパッケージインストールに使われるpipのMicroPython版です。
ターミナルで下記コマンドを実行するとlib/
配下にumqtt.simple
のパッケージが取得されます。
mip.install("umqtt.simple")
AWS IoT Coreで証明書と秘密鍵をダウンロードする
AWS IoT Core上で「モノ」を作成すると証明書をダウンロードする画面に移ります。
この証明書はクライアント証明書であり、Pico WにインストールすることでIoT Coreが認証をすることができます。
必要なファイルは下記の3つです。
- [証明書ID]-certificate.pem.crt
- [証明書ID]-private.pem.key
- AmazonRootCA1.pem
証明書のフォーマットをDER変換する
先ほど証明書と秘密鍵をダウンロードしましたが、MicroPythonではpemファイルを読み取る事ができません。
そのため、DER形式に変換を行います。
ファイル名の証明書ID部分は不要なのでリネームした後に下記のコマンドを実行し、証明書と秘密鍵のフォーマットをDERに変換します。
openssl pkey -inform PEM -in private.pem.key -outform DER -out private.key.der
openssl x509 -outform DER -in certificate.pem.crt -out certificate.crt.der
openssl x509 -outform DER -in AmazonRootCA1.pem -out AmazonRootCA1.der
証明書が作成できたら、Pico Wのcert/
ディレクトリに3つのファイルを保存します。
MicroPythonのコード
それでは実際に使用するMicroPythonのコードを見ていきましょう。
ntptime.py
最初に実行するのはNTPによる時刻合わせです。
Raspberry Pi Picoに内蔵されているRP2040マイクロコントローラーは簡易的なRTC(Real-Time Clock)を搭載していますが、電源が切れると時刻情報を失います。
そのため、AWS IoT Coreと接続する前にNTPで時刻を合わせる必要があります。
そのためのコードは下記のとおりです。
import time
import machine # type: ignore
import ntptime
def set_time_with_ntp():
rtc = machine.RTC()
rtc.datetime((2024, 1, 1, 0, 0, 0, 00, 0))
ntptime.settime()
time.sleep(2)
aws_iot_core.py
AwsIoTCore
というクラスを作成します。インスタンス化した後にpublish_message()
でAWS IoT Coreに送信したいメッセージを送信できます。
下記の部分が今回のキモです。
context = tls.SSLContext(tls.PROTOCOL_TLS_CLIENT)
context.load_cert_chain(
open(CERT_FILE, "rb").read(), open(KEY_FILE, "rb").read()
)
context.load_verify_locations(open(CA_FILE, "rb").read())
client = MQTTClient(
self.client_id, self.endpoint, port=8883, keepalive=3600, ssl=context
)
umqttの古いバージョンとはTLSの設定方法が変わっているため、ウェブ上の記事の通りでは上手くいかず試行錯誤しました。
また、DERの変換の仕方によっては、証明書のロードも失敗していました。
上記で紹介した変換方法であれば読み取り可能なDERファイルが作成されます。
コード全文は下記です。
import json
import tls # type: ignore
from umqtt.simple import MQTTClient # type: ignore
# AWS IoT Core
CERT_FILE = "./cert/certificate.crt.der"
KEY_FILE = "./cert/private.key.der"
CA_FILE = "./cert/AmazonRootCA1.der"
class AwsIoTCore:
def __init__(self, endpoint: str, topic: str, client_id: str):
self.endpoint = endpoint
self.topic = topic
self.client_id = client_id
print("Load Certificate and Key...")
context = tls.SSLContext(tls.PROTOCOL_TLS_CLIENT)
context.load_cert_chain(
open(CERT_FILE, "rb").read(), open(KEY_FILE, "rb").read()
)
context.load_verify_locations(open(CA_FILE, "rb").read())
print("Setting up MQTT Client...")
client = MQTTClient(
self.client_id, self.endpoint, port=8883, keepalive=3600, ssl=context
)
print("Connect to AWS IoT Core...")
client.connect()
print("Connected to AWS IoT")
self._client = client
def publish_message(self, message: dict) -> None:
try:
self._client.publish(self.topic, json.dumps(message))
print(f"Message published: {message}")
except Exception as e:
print(f"Failed to publish message: {e}")
main.py
最後にAWS IoT CoreにMQTTでメッセージを送信するためのmain.pyです。
これまで紹介したモジュールを、core_modules/
に入れ、下記コマンドを実行するとAWS IoT Coreでメッセージが受信できます。
BME680とTGS8100は別の記事で紹介したセンサーから値を読み取るためのモジュールです。
これらのセンサーから値を読み出して、IoT Coreにデータを送信しています。
Pico W側では特にデータの加工はせず、クラウド側でデータのクリーニングは行う前提です。
コード全文は下記の通りです。
import gc
import time
from machine import Pin # type: ignore
from core_modules import BME680, TGS8100, AwsIoTCore, connect_wifi, set_time_with_ntp
time.sleep(10)
# Wi-Fi
SSID = "XXX"
PASSWORD = "YYY"
# AWS IoT Core
IOT_CORE_CENDPOINT = "ZZZ.iot.ap-northeast-1.amazonaws.com"
TOPIC = "pico/data"
CLIENT_ID = "raspberry_pi_pico_w"
LOG_FILE = "./log.txt"
LED = Pin("LED", Pin.OUT)
def main():
connect_wifi(SSID, PASSWORD)
set_time_with_ntp()
iot_core = AwsIoTCore(endpoint=IOT_CORE_CENDPOINT, topic=TOPIC, client_id=CLIENT_ID)
bme680 = BME680(pin_v=4, pin_sda=2, pin_scl=3, i2c_id=1)
tgs8100 = TGS8100(pin_v=14, pin_pulse=15, pin_out=26)
while True:
try:
temperature, pressure, humidity, gas, heater = bme680.read_sensor()
sensor_r = tgs8100.read_sensor()
iot_core.publish_message(
{
"temperature": temperature,
"pressure": pressure,
"humidity": humidity,
"gas": gas,
"status": heater,
"sensor_r": sensor_r,
}
)
except Exception as e:
print(f"Error: {e}")
finally:
gc.collect()
time.sleep(1)
while True:
try:
main()
except Exception as e:
LED.on()
print(f"Error: {e}")
with open(LOG_FILE, "a") as f:
print(f"{time.time()} Error: {e}", file=f)
time.sleep(10)
LED.off()
まとめ
この記事では、AWS IoT CoreにRaspberry Pi Pico WからMQTTでメッセージを送信する方法を紹介しました。
Raspberry Pi Pico Wだけでセンサーからのデータ収集と送信が行えるため、少ない消費電力で必要なデータを集められるでしょう。
AWS IoT Coreとの接続は、クライアント証明書による認証があるため、やや実装が難しいですが、本記事を参考にトライ頂けると嬉しく思います。