Simple test
Ensure your device works with this simple test.
examples/minimqtt_simpletest.py
1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
2# SPDX-License-Identifier: MIT
3
4import os
5
6import adafruit_connection_manager
7import board
8import busio
9from adafruit_esp32spi import adafruit_esp32spi
10from digitalio import DigitalInOut
11
12import adafruit_minimqtt.adafruit_minimqtt as MQTT
13
14# Add settings.toml to your filesystem CIRCUITPY_WIFI_SSID and CIRCUITPY_WIFI_PASSWORD keys
15# with your WiFi credentials. Add your Adafruit IO username and key as well.
16# DO NOT share that file or commit it into Git or other source control.
17
18aio_username = os.getenv("aio_username")
19aio_key = os.getenv("aio_key")
20
21# If you are using a board with pre-defined ESP32 Pins:
22esp32_cs = DigitalInOut(board.ESP_CS)
23esp32_ready = DigitalInOut(board.ESP_BUSY)
24esp32_reset = DigitalInOut(board.ESP_RESET)
25
26# If you have an externally connected ESP32:
27# esp32_cs = DigitalInOut(board.D9)
28# esp32_ready = DigitalInOut(board.D10)
29# esp32_reset = DigitalInOut(board.D5)
30
31spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
32esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
33
34print("Connecting to AP...")
35while not esp.is_connected:
36 try:
37 esp.connect_AP(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
38 except RuntimeError as e:
39 print("could not connect to AP, retrying: ", e)
40 continue
41print("Connected to", str(esp.ssid, "utf-8"), "\tRSSI:", esp.rssi)
42
43### Topic Setup ###
44
45# MQTT Topic
46# Use this topic if you'd like to connect to a standard MQTT broker
47# mqtt_topic = "test/topic"
48
49# Adafruit IO-style Topic
50# Use this topic if you'd like to connect to io.adafruit.com
51mqtt_topic = aio_username + "/feeds/temperature"
52
53
54### Code ###
55
56
57# Define callback methods which are called when events occur
58def connect(mqtt_client, userdata, flags, rc):
59 # This function will be called when the mqtt_client is connected
60 # successfully to the broker.
61 print("Connected to MQTT Broker!")
62 print(f"Flags: {flags}\n RC: {rc}")
63
64
65def disconnect(mqtt_client, userdata, rc):
66 # This method is called when the mqtt_client disconnects
67 # from the broker.
68 print("Disconnected from MQTT Broker!")
69
70
71def subscribe(mqtt_client, userdata, topic, granted_qos):
72 # This method is called when the mqtt_client subscribes to a new feed.
73 print(f"Subscribed to {topic} with QOS level {granted_qos}")
74
75
76def unsubscribe(mqtt_client, userdata, topic, pid):
77 # This method is called when the mqtt_client unsubscribes from a feed.
78 print(f"Unsubscribed from {topic} with PID {pid}")
79
80
81def publish(mqtt_client, userdata, topic, pid):
82 # This method is called when the mqtt_client publishes data to a feed.
83 print(f"Published to {topic} with PID {pid}")
84
85
86def message(client, topic, message):
87 print(f"New message on topic {topic}: {message}")
88
89
90pool = adafruit_connection_manager.get_radio_socketpool(esp)
91ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
92
93# Set up a MiniMQTT Client
94mqtt_client = MQTT.MQTT(
95 broker="io.adafruit.com",
96 username=aio_username,
97 password=aio_key,
98 socket_pool=pool,
99 ssl_context=ssl_context,
100)
101
102# Connect callback handlers to mqtt_client
103mqtt_client.on_connect = connect
104mqtt_client.on_disconnect = disconnect
105mqtt_client.on_subscribe = subscribe
106mqtt_client.on_unsubscribe = unsubscribe
107mqtt_client.on_publish = publish
108mqtt_client.on_message = message
109
110print("Attempting to connect to %s" % mqtt_client.broker)
111mqtt_client.connect()
112
113print("Subscribing to %s" % mqtt_topic)
114mqtt_client.subscribe(mqtt_topic)
115
116print("Publishing to %s" % mqtt_topic)
117mqtt_client.publish(mqtt_topic, "Hello Broker!")
118
119print("Unsubscribing from %s" % mqtt_topic)
120mqtt_client.unsubscribe(mqtt_topic)
121
122print("Disconnecting from %s" % mqtt_client.broker)
123mqtt_client.disconnect()