Raspberry Pi Pico WをAWS IoT Coreに接続する

Raspberry Pi Pico WはWi-Fiモジュールを搭載しており、インターネットに簡単に接続できます。

そのため、インターネット経由でAWS IoT Coreにデータを送信することも簡単にできそうですが、IoT Coreと接続するにはPico WにTLSのクライアント証明書をインストールしたり、MQTTで通信するためのライブラリをインストールしたりと複数の手順をこなさなければなりません。

ウェブ上の記事をいくつか見て参考にしましたが、MQTTライブラリのアップデートで使い方が変わっていたりと上手くいかなかったので、この記事にて最新のAWS IoT Coreへの接続方法をまとめます。

目次

最初にPico WをWi-Fiへ接続する

AWS IoT Coreに接続するためには当然インターネットへの接続性が必要です。

したがって、最初にPico WをWi-Fiに接続する。

Pico WのWi-Fiへの接続方法は、以前の記事で紹介していますので、やり方が分からない場合は参考にしてください。

umqtt.simple のインストール

次に、AWS IoT Coreにメッセージを送信するためにMQTTのライブラリである、umqtt.simpleをPico Wにインストールします。

インストールはmipコマンドで行います。mipはPythonでパッケージインストールに使われるpipのMicroPython版です。

ターミナルで下記コマンドを実行するとlib/配下にumqtt.simpleのパッケージが取得されます。

mip.install("umqtt.simple")

AWS IoT Coreで証明書と秘密鍵をダウンロードする

AWS IoT Core上で「モノ」を作成すると証明書をダウンロードする画面に移ります。

この証明書はクライアント証明書であり、Pico WにインストールすることでIoT Coreが認証をすることができます。

必要なファイルは下記の3つです。

  • [証明書ID]-certificate.pem.crt
  • [証明書ID]-private.pem.key
  • AmazonRootCA1.pem

証明書のフォーマットをDER変換する

先ほど証明書と秘密鍵をダウンロードしましたが、MicroPythonではpemファイルを読み取る事ができません。

そのため、DER形式に変換を行います。

ファイル名の証明書ID部分は不要なのでリネームした後に下記のコマンドを実行し、証明書と秘密鍵のフォーマットをDERに変換します。

openssl pkey -inform PEM -in private.pem.key -outform DER -out private.key.der

openssl x509 -outform DER -in certificate.pem.crt -out certificate.crt.der           

openssl x509 -outform DER -in AmazonRootCA1.pem -out AmazonRootCA1.der

証明書が作成できたら、Pico Wのcert/ディレクトリに3つのファイルを保存します。

MicroPythonのコード

それでは実際に使用するMicroPythonのコードを見ていきましょう。

ntptime.py

最初に実行するのはNTPによる時刻合わせです。

Raspberry Pi Picoに内蔵されているRP2040マイクロコントローラーは簡易的なRTC(Real-Time Clock)を搭載していますが、電源が切れると時刻情報を失います。

そのため、AWS IoT Coreと接続する前にNTPで時刻を合わせる必要があります。

そのためのコードは下記のとおりです。

import time

import machine  # type: ignore
import ntptime


def set_time_with_ntp():
    rtc = machine.RTC()
    rtc.datetime((2024, 1, 1, 0, 0, 0, 00, 0))
    ntptime.settime()
    time.sleep(2)

aws_iot_core.py

AwsIoTCoreというクラスを作成します。インスタンス化した後にpublish_message()でAWS IoT Coreに送信したいメッセージを送信できます。

下記の部分が今回のキモです。

context = tls.SSLContext(tls.PROTOCOL_TLS_CLIENT)
context.load_cert_chain(
open(CERT_FILE, "rb").read(), open(KEY_FILE, "rb").read()
        )
context.load_verify_locations(open(CA_FILE, "rb").read())
client = MQTTClient(
            self.client_id, self.endpoint, port=8883, keepalive=3600, ssl=context
        )

umqttの古いバージョンとはTLSの設定方法が変わっているため、ウェブ上の記事の通りでは上手くいかず試行錯誤しました。

また、DERの変換の仕方によっては、証明書のロードも失敗していました。

上記で紹介した変換方法であれば読み取り可能なDERファイルが作成されます。

コード全文は下記です。

import json

import tls  # type: ignore
from umqtt.simple import MQTTClient  # type: ignore

# AWS IoT Core
CERT_FILE = "./cert/certificate.crt.der"
KEY_FILE = "./cert/private.key.der"
CA_FILE = "./cert/AmazonRootCA1.der"


class AwsIoTCore:
    def __init__(self, endpoint: str, topic: str, client_id: str):
        self.endpoint = endpoint
        self.topic = topic
        self.client_id = client_id

        print("Load Certificate and Key...")
        context = tls.SSLContext(tls.PROTOCOL_TLS_CLIENT)
        context.load_cert_chain(
            open(CERT_FILE, "rb").read(), open(KEY_FILE, "rb").read()
        )
        context.load_verify_locations(open(CA_FILE, "rb").read())

        print("Setting up MQTT Client...")
        client = MQTTClient(
            self.client_id, self.endpoint, port=8883, keepalive=3600, ssl=context
        )

        print("Connect to AWS IoT Core...")
        client.connect()
        print("Connected to AWS IoT")

        self._client = client

    def publish_message(self, message: dict) -> None:
        try:
            self._client.publish(self.topic, json.dumps(message))
            print(f"Message published: {message}")
        except Exception as e:
            print(f"Failed to publish message: {e}")

main.py

最後にAWS IoT CoreにMQTTでメッセージを送信するためのmain.pyです。

これまで紹介したモジュールを、core_modules/ に入れ、下記コマンドを実行するとAWS IoT Coreでメッセージが受信できます。

BME680とTGS8100は別の記事で紹介したセンサーから値を読み取るためのモジュールです。

これらのセンサーから値を読み出して、IoT Coreにデータを送信しています。

Pico W側では特にデータの加工はせず、クラウド側でデータのクリーニングは行う前提です。

コード全文は下記の通りです。

import gc
import time

from machine import Pin  # type: ignore

from core_modules import BME680, TGS8100, AwsIoTCore, connect_wifi, set_time_with_ntp

time.sleep(10)

# Wi-Fi
SSID = "XXX"
PASSWORD = "YYY"
# AWS IoT Core
IOT_CORE_CENDPOINT = "ZZZ.iot.ap-northeast-1.amazonaws.com"
TOPIC = "pico/data"
CLIENT_ID = "raspberry_pi_pico_w"

LOG_FILE = "./log.txt"
LED = Pin("LED", Pin.OUT)


def main():
    connect_wifi(SSID, PASSWORD)
    set_time_with_ntp()
    iot_core = AwsIoTCore(endpoint=IOT_CORE_CENDPOINT, topic=TOPIC, client_id=CLIENT_ID)
    bme680 = BME680(pin_v=4, pin_sda=2, pin_scl=3, i2c_id=1)
    tgs8100 = TGS8100(pin_v=14, pin_pulse=15, pin_out=26)

    while True:
        try:
            temperature, pressure, humidity, gas, heater = bme680.read_sensor()
            sensor_r = tgs8100.read_sensor()

            iot_core.publish_message(
                {
                    "temperature": temperature,
                    "pressure": pressure,
                    "humidity": humidity,
                    "gas": gas,
                    "status": heater,
                    "sensor_r": sensor_r,
                }
            )
        except Exception as e:
            print(f"Error: {e}")
        finally:
            gc.collect()

        time.sleep(1)


while True:
    try:
        main()
    except Exception as e:
        LED.on()
        print(f"Error: {e}")
        with open(LOG_FILE, "a") as f:
            print(f"{time.time()} Error: {e}", file=f)
        time.sleep(10)
        LED.off()

まとめ

この記事では、AWS IoT CoreにRaspberry Pi Pico WからMQTTでメッセージを送信する方法を紹介しました。

Raspberry Pi Pico Wだけでセンサーからのデータ収集と送信が行えるため、少ない消費電力で必要なデータを集められるでしょう。

AWS IoT Coreとの接続は、クライアント証明書による認証があるため、やや実装が難しいですが、本記事を参考にトライ頂けると嬉しく思います。

よかったらシェアしてね!

CD

目次