Simple test
Ensure your device works with this simple test.
examples/aws_iot_simpletest.py
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT

import json
import time
from os import getenv

import adafruit_connection_manager
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import board
import busio
import neopixel
from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
from digitalio import DigitalInOut

from adafruit_aws_iot import MQTT_CLIENT

# Get WiFi details and AWS keys; ensure these are set up in settings.toml.
# getenv() returns None for any key that is missing.
ssid = getenv("CIRCUITPY_WIFI_SSID")
password = getenv("CIRCUITPY_WIFI_PASSWORD")
broker = getenv("broker")
client_id = getenv("client_id")

# Get device certificate
# open() reports a missing or unreadable file with OSError (never ImportError),
# so that is the exception to catch for the hint below to be printed.
try:
    with open("aws_cert.pem.crt", "rb") as f:
        DEVICE_CERT = f.read()
except OSError:
    print("Certificate (aws_cert.pem.crt) not found on CIRCUITPY filesystem.")
    raise

# Get device private key
# open() reports a missing or unreadable file with OSError (never ImportError),
# so that is the exception to catch for the hint below to be printed.
try:
    with open("private.pem.key", "rb") as f:
        DEVICE_KEY = f.read()
except OSError:
    print("Private key (private.pem.key) not found on CIRCUITPY filesystem.")
    raise

### WiFi ###

# If you are using a board with pre-defined ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

# If you have an externally connected ESP32:
# esp32_cs = DigitalInOut(board.D9)
# esp32_ready = DigitalInOut(board.D10)
# esp32_reset = DigitalInOut(board.D5)

# SPI bus shared with the ESP32 co-processor, and the ESP32 control object
# built on top of it.
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

# Verify nina-fw version >= 1.4.0
# NOTE(review): firmware_version appears to be either a str or a NUL-terminated
# bytes-like value depending on the adafruit_esp32spi release -- both are
# normalised to text here; confirm against the installed library.
fw_version = esp.firmware_version
if not isinstance(fw_version, str):
    fw_version = bytes(fw_version).decode("utf-8")
fw_version = fw_version.strip("\x00")
# Compare (major, minor) numerically. Looking only at the single character at
# index 2 wrongly rejects newer releases such as 2.0.0 or 1.10.0.
fw_major, fw_minor = (int(part) for part in fw_version.split(".")[:2])
if (fw_major, fw_minor) < (1, 4):
    raise RuntimeError("Please update nina-fw to >=1.4.0.")

# Status LED handed to the WiFi manager. The NeoPixel line suits most boards.
status_pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
# Uncomment below for ItsyBitsy M4
# NOTE(review): `dotstar` is not imported in this file; presumably
# `import adafruit_dotstar as dotstar` is needed as well -- confirm.
# status_pixel = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
# Uncomment below for an externally defined RGB LED
# import adafruit_rgbled
# from adafruit_esp32spi import PWMOut
# RED_LED = PWMOut.PWMOut(esp, 26)
# GREEN_LED = PWMOut.PWMOut(esp, 27)
# BLUE_LED = PWMOut.PWMOut(esp, 25)
# status_pixel = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
wifi = adafruit_esp32spi_wifimanager.WiFiManager(esp, ssid, password, status_pixel=status_pixel)

### Code ###

# Topic this example subscribes to and publishes on.
topic = "circuitpython/aws"


# Define callback methods which are called when events occur
def connect(client, userdata, flags, rc):
    """Handle a successful connection to the broker.

    Prints the connection flags and return code, then subscribes to the
    module-level ``topic`` through the module-level ``aws_iot`` client.

    :param client: Client passed in by the caller (unused here).
    :param userdata: User data passed in by the caller (unused here).
    :param flags: Connection flags, printed as-is.
    :param rc: Connection return code, printed as-is.
    """
    print("Connected to MQTT Broker!")
    print(f"Flags: {flags}\n RC: {rc}")

    # Subscribe to topic circuitpython/aws
    print(f"Subscribing to topic {topic}")
    aws_iot.subscribe(topic)


def disconnect(client, userdata, rc):
    """Handle a disconnection from the broker by printing a notice.

    :param client: Client passed in by the caller (unused here).
    :param userdata: User data passed in by the caller (unused here).
    :param rc: Return code passed in by the caller (unused here).
    """
    print("Disconnected from MQTT Broker!")


def subscribe(client, userdata, topic, granted_qos):
    """Handle a new subscription, then publish a greeting to that topic.

    Publishing goes through the module-level ``aws_iot`` client.

    :param client: Client passed in by the caller (unused here).
    :param userdata: User data passed in by the caller (unused here).
    :param topic: Topic that was subscribed to; the greeting is published to it.
    :param granted_qos: QOS level reported for the subscription, printed as-is.
    """
    print(f"Subscribed to {topic} with QOS level {granted_qos}")

    # Create a json-formatted message. The local is named `payload` so it does
    # not shadow the module-level `message` callback.
    payload = {"message": "Hello from AWS IoT CircuitPython"}
    # Publish message to topic
    aws_iot.publish(topic, json.dumps(payload))


def unsubscribe(client, userdata, topic, pid):
    """Handle an unsubscription by printing the topic and packet id.

    :param client: Client passed in by the caller (unused here).
    :param userdata: User data passed in by the caller (unused here).
    :param topic: Topic that was unsubscribed from.
    :param pid: PID reported for the unsubscribe, printed as-is.
    """
    print(f"Unsubscribed from {topic} with PID {pid}")


def publish(client, userdata, topic, pid):
    """Handle a completed publish by printing the topic and packet id.

    :param client: Client passed in by the caller (unused here).
    :param userdata: User data passed in by the caller (unused here).
    :param topic: Topic the data was published to.
    :param pid: PID reported for the publish, printed as-is.
    """
    print(f"Published to {topic} with PID {pid}")


def message(client, topic, msg):
    """Handle an incoming message by printing its topic and payload.

    :param client: Client passed in by the caller (unused here).
    :param topic: Topic the message arrived on.
    :param msg: Message payload, printed as-is.
    """
    print(f"Message from {topic}: {msg}")


# Hand the AWS credentials to the ESP32 before the WiFi connection is made.
# Set AWS Device Certificate
esp.set_certificate(DEVICE_CERT)

# Set AWS RSA Private Key
esp.set_private_key(DEVICE_KEY)

# Connect to WiFi
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

# Socket pool and SSL context for the ESP32 radio, used by the MQTT client.
pool = adafruit_connection_manager.get_radio_socketpool(esp)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)

# Set up a new MiniMQTT Client
client = MQTT.MQTT(
    broker=broker,
    client_id=client_id,
    is_ssl=True,
    socket_pool=pool,
    ssl_context=ssl_context,
)

# Initialize AWS IoT MQTT API Client
aws_iot = MQTT_CLIENT(client)

# Connect callback handlers to AWS IoT MQTT Client
aws_iot.on_connect = connect
aws_iot.on_disconnect = disconnect
aws_iot.on_subscribe = subscribe
aws_iot.on_unsubscribe = unsubscribe
aws_iot.on_publish = publish
aws_iot.on_message = message

print(f"Attempting to connect to {client.broker}")
aws_iot.connect()

# Pump the message loop forever, all events
# are handled in their callback handlers
# while True:
#     aws_iot.loop(10)

# Start a blocking message loop...
# NOTE: NO code below this loop will execute
# NOTE: Network reconnection is handled within this loop
while True:
    try:
        aws_iot.loop(10)
    except (ValueError, RuntimeError) as e:
        print("Failed to get data, retrying\n", e)
        # Reset the WiFi co-processor and re-establish the MQTT session, then
        # go straight back to polling (the sleep below is skipped).
        wifi.reset()
        aws_iot.reconnect()
        continue
    time.sleep(1)