Simple test

Ensure your device works with this simple test.

examples/aws_iot_simpletest.py
  1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
  2# SPDX-License-Identifier: MIT
  3
  4import json
  5import time
  6from os import getenv
  7
  8import adafruit_connection_manager
  9import adafruit_minimqtt.adafruit_minimqtt as MQTT
 10import board
 11import busio
 12import neopixel
 13from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
 14from digitalio import DigitalInOut
 15
 16from adafruit_aws_iot import MQTT_CLIENT
 17
 18# Get WiFi details and AWS keys, ensure these are setup in settings.toml
 19ssid = getenv("CIRCUITPY_WIFI_SSID")
 20password = getenv("CIRCUITPY_WIFI_PASSWORD")
 21broker = getenv("broker")
 22client_id = getenv("client_id")
 23
 24# Get device certificate
 25try:
 26    with open("aws_cert.pem.crt", "rb") as f:
 27        DEVICE_CERT = f.read()
 28except ImportError:
 29    print("Certificate (aws_cert.pem.crt) not found on CIRCUITPY filesystem.")
 30    raise
 31
 32# Get device private key
 33try:
 34    with open("private.pem.key", "rb") as f:
 35        DEVICE_KEY = f.read()
 36except ImportError:
 37    print("Certificate (private.pem.key) not found on CIRCUITPY filesystem.")
 38    raise
 39
 40### WiFi ###
 41
 42# If you are using a board with pre-defined ESP32 Pins:
 43esp32_cs = DigitalInOut(board.ESP_CS)
 44esp32_ready = DigitalInOut(board.ESP_BUSY)
 45esp32_reset = DigitalInOut(board.ESP_RESET)
 46
 47# If you have an externally connected ESP32:
 48# esp32_cs = DigitalInOut(board.D9)
 49# esp32_ready = DigitalInOut(board.D10)
 50# esp32_reset = DigitalInOut(board.D5)
 51
 52spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
 53esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
 54
 55# Verify nina-fw version >= 1.4.0
 56assert int(bytes(esp.firmware_version).decode("utf-8")[2]) >= 4, "Please update nina-fw to >=1.4.0."
 57
 58# Use below for Most Boards
 59status_pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)  # Uncomment for Most Boards
 60# Uncomment below for ItsyBitsy M4
 61# status_pixel = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
 62# Uncomment below for an externally defined RGB LED
 63# import adafruit_rgbled
 64# from adafruit_esp32spi import PWMOut
 65# RED_LED = PWMOut.PWMOut(esp, 26)
 66# GREEN_LED = PWMOut.PWMOut(esp, 27)
 67# BLUE_LED = PWMOut.PWMOut(esp, 25)
 68# status_pixel = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
 69wifi = adafruit_esp32spi_wifimanager.WiFiManager(esp, ssid, password, status_pixel=status_pixel)
 70
 71### Code ###
 72
 73topic = "circuitpython/aws"
 74
 75
 76# Define callback methods which are called when events occur
 77def connect(client, userdata, flags, rc):
 78    # This function will be called when the client is connected
 79    # successfully to the broker.
 80    print("Connected to MQTT Broker!")
 81    print(f"Flags: {flags}\n RC: {rc}")
 82
 83    # Subscribe to topic circuitpython/aws
 84    print(f"Subscribing to topic {topic}")
 85    aws_iot.subscribe(topic)
 86
 87
 88def disconnect(client, userdata, rc):
 89    # This method is called when the client disconnects
 90    # from the broker.
 91    print("Disconnected from MQTT Broker!")
 92
 93
 94def subscribe(client, userdata, topic, granted_qos):
 95    # This method is called when the client subscribes to a new topic.
 96    print(f"Subscribed to {topic} with QOS level {granted_qos}")
 97
 98    # Create a json-formatted message
 99    message = {"message": "Hello from AWS IoT CircuitPython"}
100    # Publish message to topic
101    aws_iot.publish(topic, json.dumps(message))
102
103
def unsubscribe(client, userdata, topic, pid):
    """Announce that the client has unsubscribed from ``topic``.

    ``topic`` and ``pid`` are printed; ``client`` and ``userdata`` are unused.
    """
    notice = f"Unsubscribed from {topic} with PID {pid}"
    print(notice)
107
108
def publish(client, userdata, topic, pid):
    """Announce that the client has published data to ``topic``.

    ``topic`` and ``pid`` are printed; ``client`` and ``userdata`` are unused.
    """
    notice = f"Published to {topic} with PID {pid}"
    print(notice)
112
113
def message(client, topic, msg):
    """Print data received on a topic.

    ``topic`` and ``msg`` are printed; ``client`` is accepted but unused.
    """
    notice = f"Message from {topic}: {msg}"
    print(notice)
117
118
# Hand the AWS device certificate and its RSA private key to the ESP32.
# NOTE(review): presumably both must be set before the secure MQTT connection
# is opened - keep these calls ahead of the connection steps.
esp.set_certificate(DEVICE_CERT)
esp.set_private_key(DEVICE_KEY)

# Bring up the WiFi connection
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

# Socket pool and SSL context for the ESP32 radio, consumed by the MQTT client
pool = adafruit_connection_manager.get_radio_socketpool(esp)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
132
# Build the MiniMQTT client for the configured broker, with SSL enabled
client = MQTT.MQTT(
    broker=broker,
    client_id=client_id,
    socket_pool=pool,
    ssl_context=ssl_context,
    is_ssl=True,
)

# Wrap the MiniMQTT client in the AWS IoT MQTT API client
aws_iot = MQTT_CLIENT(client)

# Attach each callback handler to its event attribute on the AWS IoT client
for _event_name, _handler in (
    ("on_connect", connect),
    ("on_disconnect", disconnect),
    ("on_subscribe", subscribe),
    ("on_unsubscribe", unsubscribe),
    ("on_publish", publish),
    ("on_message", message),
):
    setattr(aws_iot, _event_name, _handler)

print(f"Attempting to connect to {client.broker}")
aws_iot.connect()
155
# Alternative: pump the message loop forever and let the callback
# handlers deal with every event
# while True:
#   aws_iot.loop(10)

# Blocking message loop.
# NOTE: NO code below this loop will execute
# NOTE: Network reconnection is handled within this loop
while True:
    try:
        aws_iot.loop(10)
    except (ValueError, RuntimeError) as err:
        # Reset the WiFi manager and reconnect, then retry immediately
        print("Failed to get data, retrying\n", err)
        wifi.reset()
        aws_iot.reconnect()
    else:
        # Pause between polls only when the last one succeeded
        time.sleep(1)