Simple test
Ensure your device works with this simple test.
1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
2# SPDX-License-Identifier: MIT
3
4import time
5import json
6import board
7import busio
8from digitalio import DigitalInOut
9import neopixel
10import adafruit_connection_manager
11from adafruit_esp32spi import adafruit_esp32spi
12from adafruit_esp32spi import adafruit_esp32spi_wifimanager
13import adafruit_minimqtt.adafruit_minimqtt as MQTT
14from adafruit_aws_iot import MQTT_CLIENT
15
16### WiFi ###
17
# Get wifi details and more from a secrets.py file
try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

# Get device certificate
# open() reports a missing or unreadable file with OSError (not ImportError),
# so that is the exception that must be caught for the hint to be printed.
try:
    with open("aws_cert.pem.crt", "rb") as f:
        DEVICE_CERT = f.read()
except OSError:
    print("Certificate (aws_cert.pem.crt) not found on CIRCUITPY filesystem.")
    raise

# Get device private key
try:
    with open("private.pem.key", "rb") as f:
        DEVICE_KEY = f.read()
except OSError:
    print("Private key (private.pem.key) not found on CIRCUITPY filesystem.")
    raise
40
# If you are using a board with pre-defined ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

# If you have an externally connected ESP32:
# esp32_cs = DigitalInOut(board.D9)
# esp32_ready = DigitalInOut(board.D10)
# esp32_reset = DigitalInOut(board.D5)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

# Verify nina-fw version >= 1.4.0
# Compare (major, minor) numerically instead of looking at one character of
# the version string, so releases such as 2.0.0 or 1.10.0 are accepted.
fw_version = esp.firmware_version
if not isinstance(fw_version, str):
    # NOTE(review): the property appears to be bytes-like in some library
    # releases and str in others - confirm against the installed version.
    fw_version = bytes(fw_version).decode("utf-8")
fw_major, fw_minor = (int(part) for part in fw_version.split(".")[:2])
if (fw_major, fw_minor) < (1, 4):
    raise RuntimeError("Please update nina-fw to >=1.4.0.")
58
# Status light handed to the WiFi manager below.
# Use the line below for most boards:
status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
# For an ItsyBitsy M4, uncomment the line below instead:
# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
# For an externally defined RGB LED, uncomment the lines below instead:
# import adafruit_rgbled
# from adafruit_esp32spi import PWMOut
# RED_LED = PWMOut.PWMOut(esp, 26)
# GREEN_LED = PWMOut.PWMOut(esp, 27)
# BLUE_LED = PWMOut.PWMOut(esp, 25)
# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)

# WiFi manager built from the ESP32 interface, the secrets dict and the light
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
73
74### Code ###
75
# MQTT topic that the connect() callback subscribes to
topic = "circuitpython/aws"
77
78
79# Define callback methods which are called when events occur
80# pylint: disable=unused-argument, redefined-outer-name
def connect(client, userdata, flags, rc):
    # Called when the client has connected successfully to the broker.
    # Reports the connection flags and return code, then subscribes
    # aws_iot to the module-level topic.
    print("Connected to MQTT Broker!")
    connection_info = "Flags: {0}\n RC: {1}".format(flags, rc)
    print(connection_info)

    # Announce the subscription before requesting it
    announcement = "Subscribing to topic {}".format(topic)
    print(announcement)
    aws_iot.subscribe(topic)
90
91
def disconnect(client, userdata, rc):
    # Called when the client disconnects from the broker.
    # The arguments are accepted but not used; only a notice is printed.
    notice = "Disconnected from MQTT Broker!"
    print(notice)
96
97
def subscribe(client, userdata, topic, granted_qos):
    # Called when the client subscribes to a new topic: reports the
    # subscription, then publishes a JSON greeting to that same topic.
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))

    # Serialize the greeting to JSON, then publish it via aws_iot
    payload = json.dumps({"message": "Hello from AWS IoT CircuitPython"})
    aws_iot.publish(topic, payload)
106
107
def unsubscribe(client, userdata, topic, pid):
    # Called when the client unsubscribes from a topic.
    # Prints the topic together with the PID passed to the callback.
    report = "Unsubscribed from {0} with PID {1}".format(topic, pid)
    print(report)
111
112
def publish(client, userdata, topic, pid):
    # Called when the client publishes data to a topic.
    # Prints the topic together with the PID passed to the callback.
    report = "Published to {0} with PID {1}".format(topic, pid)
    print(report)
116
117
def message(client, topic, msg):
    # Called when the client receives data from a topic.
    # Prints the topic followed by the received message.
    report = "Message from {}: {}".format(topic, msg)
    print(report)
121
122
# Give the ESP32 the AWS device certificate and RSA private key
esp.set_certificate(DEVICE_CERT)
esp.set_private_key(DEVICE_KEY)

# Bring up the WiFi connection
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

# Socket pool and SSL context obtained from the connection manager for esp
pool = adafruit_connection_manager.get_radio_socketpool(esp)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)

# MiniMQTT client; broker address and client id come from secrets.py
client = MQTT.MQTT(
    broker=secrets["broker"],
    client_id=secrets["client_id"],
    socket_pool=pool,
    ssl_context=ssl_context,
)

# Wrap the MiniMQTT client in the AWS IoT MQTT API client
aws_iot = MQTT_CLIENT(client)

# Register the callback handlers defined in this file
aws_iot.on_connect = connect
aws_iot.on_disconnect = disconnect
aws_iot.on_subscribe = subscribe
aws_iot.on_unsubscribe = unsubscribe
aws_iot.on_publish = publish
aws_iot.on_message = message

# Open the connection to the broker
print("Attempting to connect to %s" % client.broker)
aws_iot.connect()
158
159# Pump the message loop forever, all events
160# are handled in their callback handlers
161# while True:
162# aws_iot.loop()
163
# Start a blocking message loop...
# NOTE: NO code below this loop will execute
# NOTE: Network reconnection is handled within this loop
while True:
    try:
        aws_iot.loop()
    except (ValueError, RuntimeError) as error:
        print("Failed to get data, retrying\n", error)
        # Reset WiFi and reconnect to the broker, then poll again immediately
        wifi.reset()
        aws_iot.reconnect()
    else:
        # Pause for one second only after a loop() call that did not raise
        time.sleep(1)