Simple test

Ensure your device works with this simple test.

examples/gc_iot_core_simpletest.py
  1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
  2# SPDX-License-Identifier: MIT
  3
  4import time
  5import board
  6import busio
  7from digitalio import DigitalInOut
  8import neopixel
  9import adafruit_connection_manager
 10from adafruit_esp32spi import adafruit_esp32spi
 11from adafruit_esp32spi import adafruit_esp32spi_wifimanager
 12
 13import adafruit_minimqtt.adafruit_minimqtt as MQTT
 14from adafruit_gc_iot_core import Cloud_Core, MQTT_API
 15
 16### WiFi ###
 17
 18# Get wifi details and more from a secrets.py file
 19try:
 20    from secrets import secrets
 21except ImportError:
 22    print("WiFi secrets are kept in secrets.py, please add them there!")
 23    raise
 24
 25# If you are using a board with pre-defined ESP32 Pins:
 26esp32_cs = DigitalInOut(board.ESP_CS)
 27esp32_ready = DigitalInOut(board.ESP_BUSY)
 28esp32_reset = DigitalInOut(board.ESP_RESET)
 29
 30# If you have an externally connected ESP32:
 31# esp32_cs = DigitalInOut(board.D9)
 32# esp32_ready = DigitalInOut(board.D10)
 33# esp32_reset = DigitalInOut(board.D5)
 34
 35spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
 36esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
 37"""Use below for Most Boards"""
 38status_light = neopixel.NeoPixel(
 39    board.NEOPIXEL, 1, brightness=0.2
 40)  # Uncomment for Most Boards
 41"""Uncomment below for ItsyBitsy M4"""
 42# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
 43# Uncomment below for an externally defined RGB LED
 44# import adafruit_rgbled
 45# from adafruit_esp32spi import PWMOut
 46# RED_LED = PWMOut.PWMOut(esp, 26)
 47# GREEN_LED = PWMOut.PWMOut(esp, 27)
 48# BLUE_LED = PWMOut.PWMOut(esp, 25)
 49# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
 50wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
 51
 52### Code ###
 53
 54
 55# Define callback methods which are called when events occur
 56# pylint: disable=unused-argument, redefined-outer-name
 57def connect(client, userdata, flags, rc):
 58    # This function will be called when the client is connected
 59    # successfully to the broker.
 60    print("Connected to MQTT Broker!")
 61    print("Flags: {0}\n RC: {1}".format(flags, rc))
 62    # Subscribes to commands/# topic
 63    google_mqtt.subscribe_to_all_commands()
 64
 65    # Publish to the default "events" topic
 66    google_mqtt.publish("testing", "events", qos=1)
 67
 68
 69def disconnect(client, userdata, rc):
 70    # This method is called when the client disconnects
 71    # from the broker.
 72    print("Disconnected from MQTT Broker!")
 73
 74
 75def subscribe(client, userdata, topic, granted_qos):
 76    # This method is called when the client subscribes to a new topic.
 77    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
 78
 79
 80def unsubscribe(client, userdata, topic, pid):
 81    # This method is called when the client unsubscribes from a topic.
 82    print("Unsubscribed from {0} with PID {1}".format(topic, pid))
 83
 84
 85def publish(client, userdata, topic, pid):
 86    # This method is called when the client publishes data to a topic.
 87    print("Published to {0} with PID {1}".format(topic, pid))
 88
 89
 90def message(client, topic, msg):
 91    # This method is called when the client receives data from a topic.
 92    print("Message from {}: {}".format(topic, msg))
 93
 94
 95# Connect to WiFi
 96print("Connecting to WiFi...")
 97wifi.connect()
 98print("Connected!")
 99
100pool = adafruit_connection_manager.get_radio_socketpool(esp)
101ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
102
103# Initialize Google Cloud IoT Core interface
104google_iot = Cloud_Core(esp, secrets)
105
106# Optional JSON-Web-Token (JWT) Generation
107# print("Generating JWT...")
108# jwt = google_iot.generate_jwt()
109# print("Your JWT is: ", jwt)
110
111# Set up a new MiniMQTT Client
112client = MQTT.MQTT(
113    broker=google_iot.broker,
114    username=google_iot.username,
115    password=secrets["jwt"],
116    client_id=google_iot.cid,
117    socket_pool=pool,
118    ssl_context=ssl_context,
119)
120
121# Initialize Google MQTT API Client
122google_mqtt = MQTT_API(client)
123
124# Connect callback handlers to Google MQTT Client
125google_mqtt.on_connect = connect
126google_mqtt.on_disconnect = disconnect
127google_mqtt.on_subscribe = subscribe
128google_mqtt.on_unsubscribe = unsubscribe
129google_mqtt.on_publish = publish
130google_mqtt.on_message = message
131
132print("Attempting to connect to %s" % client.broker)
133google_mqtt.connect()
134
135# Pump the message loop forever, all events are
136# handled in their callback handlers
137# while True:
138#    google_mqtt.loop()
139
140# Start a blocking message loop...
141# NOTE: NO code below this loop will execute
142# NOTE: Network reconnection is handled within this loop
143while True:
144    try:
145        google_mqtt.loop()
146    except (ValueError, RuntimeError) as e:
147        print("Failed to get data, retrying\n", e)
148        wifi.reset()
149        google_mqtt.reconnect()
150        continue
151    time.sleep(1)