Simple test¶
Ensure your device works with this simple test.
examples/minimqtt_simpletest.py¶
1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
2# SPDX-License-Identifier: MIT
3
4import os
5import board
6import busio
7from digitalio import DigitalInOut
8from adafruit_esp32spi import adafruit_esp32spi
9import adafruit_esp32spi.adafruit_esp32spi_socket as socket
10import adafruit_minimqtt.adafruit_minimqtt as MQTT
11
12# Add settings.toml to your filesystem CIRCUITPY_WIFI_SSID and CIRCUITPY_WIFI_PASSWORD keys
13# with your WiFi credentials. Add your Adafruit IO username and key as well.
14# DO NOT share that file or commit it into Git or other source control.
15
16aio_username = os.getenv("aio_username")
17aio_key = os.getenv("aio_key")
18
19# If you are using a board with pre-defined ESP32 Pins:
20esp32_cs = DigitalInOut(board.ESP_CS)
21esp32_ready = DigitalInOut(board.ESP_BUSY)
22esp32_reset = DigitalInOut(board.ESP_RESET)
23
24# If you have an externally connected ESP32:
25# esp32_cs = DigitalInOut(board.D9)
26# esp32_ready = DigitalInOut(board.D10)
27# esp32_reset = DigitalInOut(board.D5)
28
29spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
30esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
31
32print("Connecting to AP...")
33while not esp.is_connected:
34 try:
35 esp.connect_AP(
36 os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD")
37 )
38 except RuntimeError as e:
39 print("could not connect to AP, retrying: ", e)
40 continue
41print("Connected to", str(esp.ssid, "utf-8"), "\tRSSI:", esp.rssi)
42
43### Topic Setup ###
44
45# MQTT Topic
46# Use this topic if you'd like to connect to a standard MQTT broker
47# mqtt_topic = "test/topic"
48
49# Adafruit IO-style Topic
50# Use this topic if you'd like to connect to io.adafruit.com
51mqtt_topic = aio_username + "/feeds/temperature"
52
53
54### Code ###
55
56
57# Define callback methods which are called when events occur
58# pylint: disable=unused-argument, redefined-outer-name
59def connect(mqtt_client, userdata, flags, rc):
60 # This function will be called when the mqtt_client is connected
61 # successfully to the broker.
62 print("Connected to MQTT Broker!")
63 print("Flags: {0}\n RC: {1}".format(flags, rc))
64
65
66def disconnect(mqtt_client, userdata, rc):
67 # This method is called when the mqtt_client disconnects
68 # from the broker.
69 print("Disconnected from MQTT Broker!")
70
71
72def subscribe(mqtt_client, userdata, topic, granted_qos):
73 # This method is called when the mqtt_client subscribes to a new feed.
74 print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
75
76
77def unsubscribe(mqtt_client, userdata, topic, pid):
78 # This method is called when the mqtt_client unsubscribes from a feed.
79 print("Unsubscribed from {0} with PID {1}".format(topic, pid))
80
81
82def publish(mqtt_client, userdata, topic, pid):
83 # This method is called when the mqtt_client publishes data to a feed.
84 print("Published to {0} with PID {1}".format(topic, pid))
85
86
87def message(client, topic, message):
88 print("New message on topic {0}: {1}".format(topic, message))
89
90
91socket.set_interface(esp)
92MQTT.set_socket(socket, esp)
93
94# Set up a MiniMQTT Client
95mqtt_client = MQTT.MQTT(
96 broker="io.adafruit.com",
97 username=aio_username,
98 password=aio_key,
99)
100
101# Connect callback handlers to mqtt_client
102mqtt_client.on_connect = connect
103mqtt_client.on_disconnect = disconnect
104mqtt_client.on_subscribe = subscribe
105mqtt_client.on_unsubscribe = unsubscribe
106mqtt_client.on_publish = publish
107mqtt_client.on_message = message
108
109print("Attempting to connect to %s" % mqtt_client.broker)
110mqtt_client.connect()
111
112print("Subscribing to %s" % mqtt_topic)
113mqtt_client.subscribe(mqtt_topic)
114
115print("Publishing to %s" % mqtt_topic)
116mqtt_client.publish(mqtt_topic, "Hello Broker!")
117
118print("Unsubscribing from %s" % mqtt_topic)
119mqtt_client.unsubscribe(mqtt_topic)
120
121print("Disconnecting from %s" % mqtt_client.broker)
122mqtt_client.disconnect()