Simple test

Ensure your device works with this simple test.

examples/gc_iot_core_simpletest.py
  1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
  2# SPDX-License-Identifier: MIT
  3
  4import time
  5from os import getenv
  6
  7import adafruit_connection_manager
  8import adafruit_minimqtt.adafruit_minimqtt as MQTT
  9import board
 10import busio
 11import neopixel
 12from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
 13from digitalio import DigitalInOut
 14
 15from adafruit_gc_iot_core import MQTT_API, Cloud_Core
 16
 17# Get WiFi details and Google Cloud keys; ensure these are set up in settings.toml
 18ssid = getenv("CIRCUITPY_WIFI_SSID")
 19password = getenv("CIRCUITPY_WIFI_PASSWORD")
 20cloud_region = getenv("cloud_region")
 21device_id = getenv("device_id")
 22private_key = getenv("private_key")
 23project_id = getenv("project_id")
 24registry_id = getenv("registry_id")
 25jwt = getenv("jwt")  # passed as the MQTT password when the client is created below
 26
 27### WiFi ###
 28
 29# If you are using a board with pre-defined ESP32 Pins:
 30esp32_cs = DigitalInOut(board.ESP_CS)
 31esp32_ready = DigitalInOut(board.ESP_BUSY)
 32esp32_reset = DigitalInOut(board.ESP_RESET)
 33
 34# If you have an externally connected ESP32, use these pin assignments instead:
 35# esp32_cs = DigitalInOut(board.D9)
 36# esp32_ready = DigitalInOut(board.D10)
 37# esp32_reset = DigitalInOut(board.D5)
 38
 39spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
 40esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
 41"""Use below for Most Boards"""
 42status_pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)  # Active default: one NeoPixel
 43"""Uncomment below for ItsyBitsy M4"""
 44# status_pixel = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)  # NOTE: `dotstar` is not imported here
 45# Uncomment below for an externally defined RGB LED
 46# import adafruit_rgbled
 47# from adafruit_esp32spi import PWMOut
 48# RED_LED = PWMOut.PWMOut(esp, 26)
 49# GREEN_LED = PWMOut.PWMOut(esp, 27)
 50# BLUE_LED = PWMOut.PWMOut(esp, 25)
 51# status_pixel = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
 52wifi = adafruit_esp32spi_wifimanager.WiFiManager(esp, ssid, password, status_pixel=status_pixel)
 53
 54### Code ###
 55
 56
 57# Define callback methods which are called when events occur
 58def connect(client, userdata, flags, rc):
 59    # Connect handler (assigned to google_mqtt.on_connect below). Prints the
 60    # flags and rc it receives, then uses the module-level google_mqtt client.
 61    print("Connected to MQTT Broker!")
 62    print(f"Flags: {flags}\n RC: {rc}")
 63    # Subscribes to commands/# topic
 64    google_mqtt.subscribe_to_all_commands()
 65
 66    # Publish the string "testing" to the default "events" topic with qos=1
 67    google_mqtt.publish("testing", "events", qos=1)
 68
 69
 70def disconnect(client, userdata, rc):
 71    # Disconnect handler (assigned to google_mqtt.on_disconnect below).
 72    # Only prints a notice; client, userdata and rc are not used.
 73    print("Disconnected from MQTT Broker!")
 74
 75
 76def subscribe(client, userdata, topic, granted_qos):
 77    # Subscribe handler (assigned to google_mqtt.on_subscribe below); prints topic and granted QoS.
 78    print(f"Subscribed to {topic} with QOS level {granted_qos}")
 79
 80
 81def unsubscribe(client, userdata, topic, pid):
 82    # Unsubscribe handler (assigned to google_mqtt.on_unsubscribe below); prints topic and PID.
 83    print(f"Unsubscribed from {topic} with PID {pid}")
 84
 85
 86def publish(client, userdata, topic, pid):
 87    # Publish handler (assigned to google_mqtt.on_publish below); prints topic and PID.
 88    print(f"Published to {topic} with PID {pid}")
 89
 90
 91def message(client, topic, msg):
 92    # Message handler (assigned to google_mqtt.on_message below); prints the topic and payload.
 93    print(f"Message from {topic}: {msg}")
 94
 95
 96# Connect to WiFi using the WiFiManager created above
 97print("Connecting to WiFi...")
 98wifi.connect()
 99print("Connected!")
100# Socket pool and SSL context for the ESP32 radio; both are handed to the MQTT client below
101pool = adafruit_connection_manager.get_radio_socketpool(esp)
102ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
103
104# Initialize Google Cloud IoT Core interface
105settings = {
106    cloud_region: cloud_region,
107    device_id: device_id,
108    private_key: private_key,
109    project_id: project_id,
110    registry_id: registry_id,
111}
112google_iot = Cloud_Core(esp, settings)
113
114# Optional JSON-Web-Token (JWT) Generation (otherwise the jwt read from settings.toml is used)
115# print("Generating JWT...")
116# jwt = google_iot.generate_jwt()
117# print("Your JWT is: ", jwt)
118
119# Set up a new MiniMQTT Client; broker, username and client id come from google_iot
120client = MQTT.MQTT(
121    broker=google_iot.broker,
122    username=google_iot.username,
123    password=jwt,
124    client_id=google_iot.cid,
125    socket_pool=pool,
126    ssl_context=ssl_context,
127)
128
129# Initialize Google MQTT API Client (wraps the MiniMQTT client created above)
130google_mqtt = MQTT_API(client)
131
132# Connect callback handlers to Google MQTT Client
133google_mqtt.on_connect = connect
134google_mqtt.on_disconnect = disconnect
135google_mqtt.on_subscribe = subscribe
136google_mqtt.on_unsubscribe = unsubscribe
137google_mqtt.on_publish = publish
138google_mqtt.on_message = message
139
140
141print(f"Attempting to connect to {client.broker}")
142google_mqtt.connect()
143
144# Alternative: pump the message loop forever with no error handling;
145# all events are handled in their callback handlers
146# while True:
147#    google_mqtt.loop()
148
149# Start a blocking message loop...
150# NOTE: NO code below this loop will execute
151# NOTE: Network reconnection is handled within this loop
152while True:
153    try:
154        google_mqtt.loop()
155    except (ValueError, RuntimeError) as e:
156        print("Failed to get data, retrying\n", e)
157        wifi.reset()
158        google_mqtt.reconnect()
159        continue  # retry right away, skipping the 1 second pause below
160    time.sleep(1)