Simple test

Ensure your device works with this simple test.

examples/minimqtt_simpletest.py
  1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
  2# SPDX-License-Identifier: MIT
  3
  4from os import getenv
  5
  6import adafruit_connection_manager
  7import board
  8import busio
  9from adafruit_esp32spi import adafruit_esp32spi
 10from digitalio import DigitalInOut
 11
 12import adafruit_minimqtt.adafruit_minimqtt as MQTT
 13
 14# Get WiFi details and Adafruit IO keys, ensure these are setup in settings.toml
 15# (visit io.adafruit.com if you need to create an account, or if you need your Adafruit IO key.)
 16ssid = getenv("CIRCUITPY_WIFI_SSID")
 17password = getenv("CIRCUITPY_WIFI_PASSWORD")
 18aio_username = getenv("ADAFRUIT_AIO_USERNAME")
 19aio_key = getenv("ADAFRUIT_AIO_KEY")
 20
 21# If you are using a board with pre-defined ESP32 Pins:
 22esp32_cs = DigitalInOut(board.ESP_CS)
 23esp32_ready = DigitalInOut(board.ESP_BUSY)
 24esp32_reset = DigitalInOut(board.ESP_RESET)
 25
 26# If you have an externally connected ESP32:
 27# esp32_cs = DigitalInOut(board.D9)
 28# esp32_ready = DigitalInOut(board.D10)
 29# esp32_reset = DigitalInOut(board.D5)
 30
 31spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
 32esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
 33
 34print("Connecting to AP...")
 35while not esp.is_connected:
 36    try:
 37        esp.connect_AP(ssid, password)
 38    except RuntimeError as e:
 39        print("could not connect to AP, retrying: ", e)
 40        continue
 41print("Connected to", esp.ap_info.ssid, "\tRSSI:", esp.ap_info.rssi)
 42
 43### Topic Setup ###
 44
 45# MQTT Topic
 46# Use this topic if you'd like to connect to a standard MQTT broker
 47# mqtt_topic = "test/topic"
 48
 49# Adafruit IO-style Topic
 50# Use this topic if you'd like to connect to io.adafruit.com
 51mqtt_topic = f"{aio_username}/feeds/temperature"
 52
 53
 54### Code ###
 55
 56
 57# Define callback methods which are called when events occur
 58def connect(mqtt_client, userdata, flags, rc):
 59    # This function will be called when the mqtt_client is connected
 60    # successfully to the broker.
 61    print("Connected to MQTT Broker!")
 62    print(f"Flags: {flags}\n RC: {rc}")
 63
 64
 65def disconnect(mqtt_client, userdata, rc):
 66    # This method is called when the mqtt_client disconnects
 67    # from the broker.
 68    print("Disconnected from MQTT Broker!")
 69
 70
 71def subscribe(mqtt_client, userdata, topic, granted_qos):
 72    # This method is called when the mqtt_client subscribes to a new feed.
 73    print(f"Subscribed to {topic} with QOS level {granted_qos}")
 74
 75
 76def unsubscribe(mqtt_client, userdata, topic, pid):
 77    # This method is called when the mqtt_client unsubscribes from a feed.
 78    print(f"Unsubscribed from {topic} with PID {pid}")
 79
 80
 81def publish(mqtt_client, userdata, topic, pid):
 82    # This method is called when the mqtt_client publishes data to a feed.
 83    print(f"Published to {topic} with PID {pid}")
 84
 85
 86def message(client, topic, message):
 87    print(f"New message on topic {topic}: {message}")
 88
 89
 90pool = adafruit_connection_manager.get_radio_socketpool(esp)
 91ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
 92
 93# Set up a MiniMQTT Client
 94mqtt_client = MQTT.MQTT(
 95    broker="io.adafruit.com",
 96    username=aio_username,
 97    password=aio_key,
 98    socket_pool=pool,
 99    ssl_context=ssl_context,
100)
101
102# Connect callback handlers to mqtt_client
103mqtt_client.on_connect = connect
104mqtt_client.on_disconnect = disconnect
105mqtt_client.on_subscribe = subscribe
106mqtt_client.on_unsubscribe = unsubscribe
107mqtt_client.on_publish = publish
108mqtt_client.on_message = message
109
110print(f"Attempting to connect to {mqtt_client.broker}")
111mqtt_client.connect()
112
113print(f"Subscribing to {mqtt_topic}")
114mqtt_client.subscribe(mqtt_topic)
115
116print(f"Publishing to {mqtt_topic}")
117mqtt_client.publish(mqtt_topic, "Hello Broker!")
118
119print(f"Unsubscribing from {mqtt_topic}")
120mqtt_client.unsubscribe(mqtt_topic)
121
122print(f"Disconnecting from {mqtt_client.broker}")
123mqtt_client.disconnect()