ci: Adds target tests and move to gitlab components

Ths adopts gitlab components for better maintanance of CI.
Adds target tests
This commit is contained in:
Euripedes Rocha Filho
2025-10-01 10:21:02 +02:00
parent 110613a8a6
commit 71534b1c43
37 changed files with 685 additions and 480 deletions
-1
View File
@@ -63,7 +63,6 @@ examples/ssl_ds:
- if: IDF_TARGET == IDF_TARGET
reason: Advanced feature demonstration, only build
# Custom outbox implementation example
examples/custom_outbox:
<<: *default_rules
disable_test:
+1
View File
@@ -13,6 +13,7 @@ variables:
EXAMPLE_MQTT_BROKER_CERTIFICATE: "${EXAMPLE_MQTT_BROKER_CERTIFICATE}"
include:
- local: ".gitlab/ci/pre-check.yml"
- local: ".gitlab/ci/build.yml"
- local: ".gitlab/ci/docs.yml"
- local: ".gitlab/ci/test.yml"
+33 -50
View File
@@ -1,52 +1,35 @@
# Note: No need to run build and test on master branch since we use FastForward merge strategy and so each merge request
# is tested and then merged onto top of master branch.
.build_template:
stage: build
tags:
- build
- internet
timeout: 1h
# rules:
# # Run build jobs only when source or build configuration changes
# - changes:
# - examples/**/*
# - test/apps/**/*
# - include/**/*
# - lib/**/*
# - mqtt_client.c
# - mqtt5_client.c
# - Kconfig
# - idf_component.yml
# - .build-test-rules.yml
# - .idf_build_apps.toml
# - .idf_ci.toml
# - when: never
parallel:
matrix:
- PATHS:
- "-p examples/tcp -p examples/mqtt5"
- "-p examples/ws -p examples/wss"
- "-p examples/ssl -p examples/ssl_mutual_auth"
- "-p examples/ssl_psk -p examples/ssl_ds -p examples/custom_outbox"
- "-p test/apps"
script:
- pip install -U 'idf-ci<1'
- echo "Running idf-ci build run ${PATHS}"
- idf-ci build run ${PATHS}
build_idf_v5.3:
extends: .build_template
image: espressif/idf:release-v5.3
build_idf_v5.4:
extends: .build_template
image: espressif/idf:release-v5.4
build_idf_v5.5:
extends: .build_template
image: espressif/idf:release-v5.5
build_idf_latest:
extends: .build_template
image: espressif/idf:latest
include:
- component: $CI_SERVER_FQDN/ci/actions/idf-dynamic-pipeline/ci-component@main
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
inputs:
build-job-image: "espressif/idf:latest"
test-job-image: "$CI_DOCKER_REGISTRY/target-test-env-v6.0:2"
build-job-tags: ["brew", "build"]
job-suffix: ":idf-v6.0"
- component: $CI_SERVER_FQDN/ci/actions/idf-dynamic-pipeline/ci-component@main
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
inputs:
build-job-image: "espressif/idf:v5.5"
test-job-image: "$CI_DOCKER_REGISTRY/target-test-env-v5.5:2"
build-job-tags: ["brew", "build"]
job-suffix: ":idf-v5.5"
- component: $CI_SERVER_FQDN/ci/actions/idf-dynamic-pipeline/ci-component@main
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
inputs:
build-job-image: "espressif/idf:v5.4"
test-job-image: "$CI_DOCKER_REGISTRY/target-test-env-v5.4:2"
build-job-tags: ["brew", "build"]
job-suffix: ":idf-v5.4"
- component: $CI_SERVER_FQDN/ci/actions/idf-dynamic-pipeline/ci-component@main
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
inputs:
build-job-image: "espressif/idf:v5.3"
test-job-image: "$CI_DOCKER_REGISTRY/target-test-env-v5.3:1"
build-job-tags: ["brew", "build"]
job-suffix: ":idf-v5.3"
+3
View File
@@ -24,6 +24,9 @@ docs_build:
script:
- cd docs
- build-docs -t esp32 -l en
rules:
- if: $CI_COMMIT_REF_NAME == "master"
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
.deploy_docs_template:
image: $ESP_DOCS_ENV_IMAGE
+1
View File
@@ -3,3 +3,4 @@ Warning: Deprecated: Option '--flash_mode' is deprecated. Use '--flash-mode' ins
Warning: Deprecated: Option '--flash_freq' is deprecated. Use '--flash-freq' instead.
Warning: Deprecated: Command 'sign_data' is deprecated. Use 'sign-data' instead.
Warning: Deprecated: Command 'extract_public_key' is deprecated. Use 'extract-public-key' instead.
CryptographyDeprecationWarning
+2
View File
@@ -0,0 +1,2 @@
examples/tcp/pytest_mqtt_tcp.py::test_examples_protocol_mqtt_qos1[esp32] # Test is failing to receive the final QoS1 message but QoS1 and 2 are already tested in publish connect
test_mqtt_publish_local* # Local broker case failing with disconnection in the test
+39 -4
View File
@@ -10,12 +10,47 @@ build_job_filepatterns = [
"**/build*/*.bin",
"**/build*/*.elf",
"**/build*/flasher_args.json",
"**/build.log",
]
test_job_filepatterns = [
"**/test_logs",
"**/XUNIT_RESULT_*.xml",
]
test_job_filepatterns = ["**/test_logs", "**/XUNIT_RESULT_*.xml"]
[gitlab.build_pipeline]
job_template_jinja = """
{{ settings.gitlab.build_pipeline.job_template_name }}:
stage: "{{ settings.gitlab.build_pipeline.job_stage }}"
tags: {{ settings.gitlab.build_pipeline.job_tags }}
image: "{{ settings.gitlab.build_pipeline.job_image }}"
timeout: "1h"
artifacts:
paths:
{%- for path in settings.gitlab.artifacts.build_job_filepatterns %}
- "{{ path }}"
{%- endfor %}
expire_in: "1 week"
when: "always"
before_script:
- pip install -U 'idf-ci<1'
script: |
set -euo pipefail
ORIG_DIR="${CI_PROJECT_DIR:-$(pwd)}"
cd "${ORIG_DIR}"
echo "Original dir: $(pwd)"
rm -rf mqtt
git worktree add --force mqtt HEAD || (echo "Worktree failed; using local clone" && git clone --local --no-hardlinks . mqtt)
git worktree list || true
cd mqtt
echo "Build dir: $(pwd)"
idf-ci build run --parallel-count ${CI_NODE_TOTAL:-1} --parallel-index ${CI_NODE_INDEX:-1}
cd "${ORIG_DIR}"
echo "Moving build* dirs back to repo root..."
find mqtt -type d -name 'build_esp32_*' -print0 | while IFS= read -r -d '' d; do \
rel="${d#mqtt/}"; \
dest="${rel}"; \
mkdir -p "$(dirname "${dest}")"; \
rm -rf "${dest}"; \
mv "$d" "${dest}"; \
done
git worktree remove -f mqtt || rm -rf mqtt
"""
runs_per_job = 15
+1 -1
View File
@@ -21,7 +21,7 @@ repos:
- id: check-executables-have-shebangs # Checks executables have a proper shebang
- id: mixed-line-ending # Detects mixed line endings (CRLF/LF)
args: ['-f=lf'] # Forces files to use LF line endings
- id: double-quote-string-fixer # Converts single quotes to double quotes in strings
# Removed double-quote-string-fixer in favor of Ruff enforcing quotes
- repo: https://github.com/espressif/check-copyright/
rev: v1.1.1
+48
View File
@@ -0,0 +1,48 @@
# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import socket
import pytest
import logging
import typing as t
def get_host_ip4_by_dest_ip(dest_ip: str = "") -> str:
"""
Get the local IP address that would be used to reach a destination IP.
Args:
dest_ip: Destination IP address. Defaults to 8.8.8.8 if not provided.
Returns:
The local IP address as a string.
"""
if not dest_ip:
dest_ip = "8.8.8.8"
s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s1.connect((dest_ip, 80))
host_ip = s1.getsockname()[0]
s1.close()
assert isinstance(host_ip, str)
print(f"Using host ip: {host_ip}")
return host_ip
@pytest.fixture
def log_performance(
record_property: t.Callable[[str, object], None],
) -> t.Callable[[str, str], None]:
"""
log performance item with pre-defined format to the console
and record it under the ``properties`` tag in the junit report if available.
"""
def real_func(item: str, value: str) -> None:
"""
:param item: performance item name
:param value: performance value
"""
logging.info("[Performance][%s]: %s", item, value)
record_property(item, value)
return real_func
+10 -10
View File
@@ -18,22 +18,22 @@ from esp_docs.conf_docs import * # noqa: F403,F401
# Only required when using ESP-IDF extensions that depend on IDF environment
extensions += ['sphinx_copybutton', # noqa: F405
extensions += ["sphinx_copybutton", # noqa: F405
# Needed as a trigger for running doxygen
'esp_docs.esp_extensions.dummy_build_system',
'esp_docs.esp_extensions.run_doxygen'
"esp_docs.esp_extensions.dummy_build_system",
"esp_docs.esp_extensions.run_doxygen"
]
# link roles config
github_repo = 'espressif/esp-mqtt'
github_repo = "espressif/esp-mqtt"
# context used by sphinx_idf_theme
html_context['github_user'] = 'espressif' # noqa: F405
html_context['github_repo'] = 'esp-mqtt' # noqa: F405
html_context["github_user"] = "espressif" # noqa: F405
html_context["github_repo"] = "esp-mqtt" # noqa: F405
# Extra options required by sphinx_idf_theme
project_slug = 'esp-mqtt'
versions_url = './_static/mqtt_docs_versions.js'
project_slug = "esp-mqtt"
versions_url = "./_static/mqtt_docs_versions.js"
idf_targets = [ 'esp32' ]
languages = ['en']
idf_targets = [ "esp32" ]
languages = ["en"]
+4 -4
View File
@@ -13,7 +13,7 @@ try:
except ImportError:
import os
import sys
sys.path.insert(0, os.path.abspath('../'))
sys.path.insert(0, os.path.abspath("../"))
from conf_common import * # noqa: F403,F401
import datetime
@@ -21,9 +21,9 @@ import datetime
current_year = datetime.datetime.now().year
# General information about the project.
project = u'ESP-MQTT Programming Guide'
copyright = u'2019 - {}, Espressif Systems (Shanghai) Co., Ltd'.format(current_year)
project = u"ESP-MQTT Programming Guide"
copyright = u"2019 - {}, Espressif Systems (Shanghai) Co., Ltd".format(current_year)
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
language = 'en'
language = "en"
+4 -4
View File
@@ -13,7 +13,7 @@ try:
except ImportError:
import os
import sys
sys.path.insert(0, os.path.abspath('../'))
sys.path.insert(0, os.path.abspath("../"))
from conf_common import * # noqa: F403,F401
import datetime
@@ -21,9 +21,9 @@ import datetime
current_year = datetime.datetime.now().year
# General information about the project.
project = u'ESP-MQTT Programming Guide'
copyright = u'2019 - {}, Espressif Systems (Shanghai) Co., Ltd'.format(current_year)
project = u"ESP-MQTT Programming Guide"
copyright = u"2019 - {}, Espressif Systems (Shanghai) Co., Ltd".format(current_year)
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
language = 'zh_CN'
language = "zh_CN"
+34 -34
View File
@@ -10,8 +10,8 @@ from pytest_embedded import Dut
from pytest_embedded_idf.utils import idf_parametrize
@pytest.mark.ethernet
@idf_parametrize('target', ['esp32'], indirect=['target'])
@pytest.mark.eth_ip101
@idf_parametrize("target", ["esp32"], indirect=["target"])
def test_examples_protocol_mqtt5(dut: Dut) -> None:
"""
steps: |
@@ -20,46 +20,46 @@ def test_examples_protocol_mqtt5(dut: Dut) -> None:
3. check connection success
"""
# check and log bin size
binary_file = os.path.join(dut.app.binary_path, 'mqtt5.bin')
binary_file = os.path.join(dut.app.binary_path, "mqtt5.bin")
bin_size = os.path.getsize(binary_file)
logging.info('mqtt5_bin_size : {}KB'.format(bin_size // 1024))
logging.info("mqtt5_bin_size : {}KB".format(bin_size // 1024))
# check if connected or not
dut.expect_exact('MQTT_EVENT_CONNECTED', timeout=30)
dut.expect_exact("MQTT_EVENT_CONNECTED", timeout=30)
# check log
res = dut.expect(r'sent publish successful, msg_id=(\d+)[^\d]')
msgid_pub1 = res.group(1).decode('utf8')
res = dut.expect(r'sent subscribe successful, msg_id=(\d+)[^\d]')
msgid_sub1 = res.group(1).decode('utf8')
res = dut.expect(r'sent subscribe successful, msg_id=(\d+)[^\d]')
msgid_sub2 = res.group(1).decode('utf8')
res = dut.expect(r'sent unsubscribe successful, msg_id=(\d+)[^\d]')
msgid_unsub = res.group(1).decode('utf8')
res = dut.expect(r'MQTT_EVENT_PUBLISHED, msg_id=(\d+)[^\d]')
msgid_pubd = res.group(1).decode('utf8')
res = dut.expect(r"sent publish successful, msg_id=(\d+)[^\d]")
msgid_pub1 = res.group(1).decode("utf8")
res = dut.expect(r"sent subscribe successful, msg_id=(\d+)[^\d]")
msgid_sub1 = res.group(1).decode("utf8")
res = dut.expect(r"sent subscribe successful, msg_id=(\d+)[^\d]")
msgid_sub2 = res.group(1).decode("utf8")
res = dut.expect(r"sent unsubscribe successful, msg_id=(\d+)[^\d]")
msgid_unsub = res.group(1).decode("utf8")
res = dut.expect(r"MQTT_EVENT_PUBLISHED, msg_id=(\d+)[^\d]")
msgid_pubd = res.group(1).decode("utf8")
assert msgid_pubd == msgid_pub1
res = dut.expect(r'MQTT_EVENT_SUBSCRIBED, msg_id=(\d+)[^\d]')
msgid_subd = res.group(1).decode('utf8')
res = dut.expect(r"MQTT_EVENT_SUBSCRIBED, msg_id=(\d+)[^\d]")
msgid_subd = res.group(1).decode("utf8")
assert msgid_subd == msgid_sub1
dut.expect_exact('sent publish successful, msg_id=0')
res = dut.expect(r'MQTT_EVENT_SUBSCRIBED, msg_id=(\d+)[^\d]')
msgid_subd = res.group(1).decode('utf8')
dut.expect_exact("sent publish successful, msg_id=0")
res = dut.expect(r"MQTT_EVENT_SUBSCRIBED, msg_id=(\d+)[^\d]")
msgid_subd = res.group(1).decode("utf8")
assert msgid_subd == msgid_sub2
dut.expect_exact('sent publish successful, msg_id=0')
dut.expect_exact('MQTT_EVENT_DATA')
dut.expect_exact('key is board, value is esp32')
dut.expect_exact('key is u, value is user')
dut.expect_exact('key is p, value is password')
dut.expect_exact('payload_format_indicator is 1')
dut.expect_exact('response_topic is /topic/test/response')
dut.expect_exact('correlation_data is 123456')
dut.expect_exact('TOPIC=/topic/qos1')
dut.expect_exact('DATA=data_3')
res = dut.expect(r'MQTT_EVENT_UNSUBSCRIBED, msg_id=(\d+)[^\d]')
msgid_unsubd = res.group(1).decode('utf8')
dut.expect_exact("sent publish successful, msg_id=0")
dut.expect_exact("MQTT_EVENT_DATA")
dut.expect_exact("key is board, value is esp32")
dut.expect_exact("key is u, value is user")
dut.expect_exact("key is p, value is password")
dut.expect_exact("payload_format_indicator is 1")
dut.expect_exact("response_topic is /topic/test/response")
dut.expect_exact("correlation_data is 123456")
dut.expect_exact("TOPIC=/topic/qos1")
dut.expect_exact("DATA=data_3")
res = dut.expect(r"MQTT_EVENT_UNSUBSCRIBED, msg_id=(\d+)[^\d]")
msgid_unsubd = res.group(1).decode("utf8")
assert msgid_unsubd == msgid_unsub
dut.expect_exact('MQTT_EVENT_DISCONNECTED')
logging.info('MQTT5 pytest pass')
dut.expect_exact("MQTT_EVENT_DISCONNECTED")
logging.info("MQTT5 pytest pass")
+2 -1
View File
@@ -7,4 +7,5 @@ CONFIG_EXAMPLE_ETH_MDIO_GPIO=18
CONFIG_EXAMPLE_ETH_PHY_RST_GPIO=5
CONFIG_EXAMPLE_ETH_PHY_ADDR=1
CONFIG_MQTT_PROTOCOL_5=y
CONFIG_BROKER_URL="mqtt://${EXAMPLE_MQTTV5_BROKER_TCP}"
CONFIG_BROKER_URL="mqtt://${TEST_BROKER_BRNO_TCP}"
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y
+1 -1
View File
@@ -140,7 +140,7 @@ static void mqtt_app_start(void)
const esp_mqtt_client_config_t mqtt_cfg = {
.broker = {
.address.uri = CONFIG_BROKER_URI,
.verification.certificate = (const char *)mqtt_eclipseprojects_io_pem_start
.verification.certificate = (const char *)mqtt_eclipseprojects_io_pem_start,
},
};
ESP_LOGI(TAG, "[APP] Free memory: %" PRIu32 " bytes", esp_get_free_heap_size());
+36 -36
View File
@@ -18,15 +18,15 @@ event_client_connected = Event()
event_stop_client = Event()
event_client_received_correct = Event()
event_client_received_binary = Event()
message_log = ''
message_log = ""
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc): # type: (mqtt.Client, str, bool, str) -> None
_ = (userdata, flags)
print('Connected with result code ' + str(rc))
print("Connected with result code " + str(rc))
event_client_connected.set()
client.subscribe('/topic/qos0')
client.subscribe("/topic/qos0")
def mqtt_client_task(client): # type: (mqtt.Client) -> None
@@ -39,35 +39,35 @@ def on_message(client, userdata, msg): # type: (mqtt.Client, tuple, mqtt.client
global message_log
global event_client_received_correct
global event_client_received_binary
if msg.topic == '/topic/binary':
if msg.topic == "/topic/binary":
binary, bin_size = userdata
print('Receiving binary from esp and comparing with {}, size {}...'.format(binary, bin_size))
with open(binary, 'rb') as f:
print("Receiving binary from esp and comparing with {}, size {}...".format(binary, bin_size))
with open(binary, "rb") as f:
bin = f.read()
if bin[:bin_size] == msg.payload[:bin_size]:
print('...matches!')
print("...matches!")
event_client_received_binary.set()
return
recv_binary = binary + '.received'
with open(recv_binary, 'w', encoding='utf-8') as fw:
recv_binary = binary + ".received"
with open(recv_binary, "w", encoding="utf-8") as fw:
fw.write(msg.payload)
raise ValueError(
'Received binary (saved as: {}) does not match the original file: {}'.format(recv_binary, binary)
"Received binary (saved as: {}) does not match the original file: {}".format(recv_binary, binary)
)
payload = msg.payload.decode()
if not event_client_received_correct.is_set() and payload == 'data':
client.subscribe('/topic/binary')
client.publish('/topic/qos0', 'send binary please')
if msg.topic == '/topic/qos0' and payload == 'data':
if not event_client_received_correct.is_set() and payload == "data":
client.subscribe("/topic/binary")
client.publish("/topic/qos0", "send binary please")
if msg.topic == "/topic/qos0" and payload == "data":
event_client_received_correct.set()
message_log += 'Received data:' + msg.topic + ' ' + payload + '\n'
message_log += "Received data:" + msg.topic + " " + payload + "\n"
@pytest.mark.ethernet
@idf_parametrize('target', ['esp32'], indirect=['target'])
@pytest.mark.eth_ip101
@idf_parametrize("target", ["esp32"], indirect=["target"])
def test_examples_protocol_mqtt_ssl(dut): # type: (Dut) -> None
broker_url = ''
broker_url = ""
broker_port = 0
"""
steps:
@@ -77,19 +77,19 @@ def test_examples_protocol_mqtt_ssl(dut): # type: (Dut) -> None
4. Test ESP32 client received correct qos0 message
5. Test python client receives binary data from running partition and compares it with the binary
"""
binary_file = os.path.join(dut.app.binary_path, 'mqtt_ssl.bin')
binary_file = os.path.join(dut.app.binary_path, "mqtt_ssl.bin")
bin_size = os.path.getsize(binary_file)
logging.info('[Performance][mqtt_ssl_bin_size]: %s KB', bin_size // 1024)
logging.info("[Performance][mqtt_ssl_bin_size]: %s KB", bin_size // 1024)
# Look for host:port in sdkconfig
try:
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut.app.sdkconfig.get('BROKER_URI'))
value = re.search(r"\:\/\/([^:]+)\:([0-9]+)", dut.app.sdkconfig.get("BROKER_URI"))
assert value is not None
broker_url = value.group(1)
broker_port = int(value.group(2))
bin_size = min(int(dut.app.sdkconfig.get('BROKER_BIN_SIZE_TO_SEND')), bin_size)
bin_size = min(int(dut.app.sdkconfig.get("BROKER_BIN_SIZE_TO_SEND")), bin_size)
except Exception:
print('ENV_TEST_FAILURE: Cannot find broker url in sdkconfig')
print("ENV_TEST_FAILURE: Cannot find broker url in sdkconfig")
raise
client = None
# 1. Test connects to a broker
@@ -100,11 +100,11 @@ def test_examples_protocol_mqtt_ssl(dut): # type: (Dut) -> None
client.user_data_set((binary_file, bin_size))
client.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
client.tls_insecure_set(True)
print('Connecting...')
print("Connecting...")
client.connect(broker_url, broker_port, 60)
except Exception:
print(
'ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:'.format(
"ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(
broker_url, sys.exc_info()[0]
)
)
@@ -113,23 +113,23 @@ def test_examples_protocol_mqtt_ssl(dut): # type: (Dut) -> None
thread1 = Thread(target=mqtt_client_task, args=(client,))
thread1.start()
try:
print('Connecting py-client to broker {}:{}...'.format(broker_url, broker_port))
print("Connecting py-client to broker {}:{}...".format(broker_url, broker_port))
if not event_client_connected.wait(timeout=30):
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_url))
try:
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[0]
print('Connected to AP with IP: {}'.format(ip_address))
ip_address = dut.expect(r"IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]", timeout=30)[0]
print("Connected to AP with IP: {}".format(ip_address))
except pexpect.TIMEOUT:
print('ENV_TEST_FAILURE: Cannot connect to AP')
print("ENV_TEST_FAILURE: Cannot connect to AP")
raise
print('Checking py-client received msg published from esp...')
print("Checking py-client received msg published from esp...")
if not event_client_received_correct.wait(timeout=30):
raise ValueError('Wrong data received, msg log: {}'.format(message_log))
print('Checking esp-client received msg published from py-client...')
dut.expect(r'DATA=send binary please', timeout=30)
print('Receiving binary data from running partition...')
raise ValueError("Wrong data received, msg log: {}".format(message_log))
print("Checking esp-client received msg published from py-client...")
dut.expect(r"DATA=send binary please", timeout=30)
print("Receiving binary data from running partition...")
if not event_client_received_binary.wait(timeout=30):
raise ValueError('Binary not received within timeout')
raise ValueError("Binary not received within timeout")
finally:
event_stop_client.set()
thread1.join()
+2 -1
View File
@@ -1,4 +1,4 @@
CONFIG_BROKER_URI="mqtts://${EXAMPLE_MQTT_BROKER_SSL}"
CONFIG_BROKER_URI="mqtts://${TEST_BROKER_BRNO_SSL}"
CONFIG_BROKER_CERTIFICATE_OVERRIDE="${EXAMPLE_MQTT_BROKER_CERTIFICATE}"
CONFIG_MQTT_USE_CUSTOM_CONFIG=y
CONFIG_MQTT_TCP_DEFAULT_PORT=1883
@@ -20,3 +20,4 @@ CONFIG_EXAMPLE_ETH_PHY_RST_GPIO=5
CONFIG_EXAMPLE_ETH_PHY_ADDR=1
CONFIG_EXAMPLE_CONNECT_IPV6=y
CONFIG_LWIP_CHECK_THREAD_SAFETY=y
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y
-4
View File
@@ -8,7 +8,3 @@ dependencies:
version: ">=0.10,<2.0"
rules:
- if: target in [esp32p4, esp32h2]
espressif/esp_hosted:
version: "2.5.1"
rules:
- if: target in [esp32p4, esp32h2]
+28 -27
View File
@@ -10,8 +10,9 @@ from threading import Thread
import pexpect
import pytest
from common_test_methods import get_host_ip4_by_dest_ip
from pytest_embedded import Dut
from conftest import get_host_ip4_by_dest_ip
from pytest_embedded_idf.utils import idf_parametrize
msgid = -1
@@ -19,7 +20,7 @@ msgid = -1
def mqqt_server_sketch(my_ip, port): # type: (str, str) -> None
global msgid
print('Starting the server on {}'.format(my_ip))
logging.info("Starting the server on {}".format(my_ip))
s = None
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -28,32 +29,32 @@ def mqqt_server_sketch(my_ip, port): # type: (str, str) -> None
s.listen(1)
q, addr = s.accept()
q.settimeout(30)
print('connection accepted')
logging.info("connection accepted")
except Exception:
print(
'Local server on {}:{} listening/accepting failure: {}'
'Possibly check permissions or firewall settings'
'to accept connections on this address'.format(my_ip, port, sys.exc_info()[0])
logging.error(
"Local server on {}:{} listening/accepting failure: {}"
"Possibly check permissions or firewall settings"
"to accept connections on this address".format(my_ip, port, sys.exc_info()[0])
)
raise
data = q.recv(1024)
# check if received initial empty message
print('received from client {!r}'.format(data))
logging.info("received from client {!r}".format(data))
data = bytearray([0x20, 0x02, 0x00, 0x00])
q.send(data)
# try to receive qos1
data = q.recv(1024)
msgid = struct.unpack('>H', data[15:17])[0]
print('received from client {!r}, msgid: {}'.format(data, msgid))
msgid = struct.unpack(">H", data[15:17])[0]
logging.info("received from client {!r}, msgid: {}".format(data, msgid))
data = bytearray([0x40, 0x02, data[15], data[16]])
q.send(data)
time.sleep(5)
s.close()
print('server closed')
logging.info("server closed")
@pytest.mark.ethernet
@idf_parametrize('target', ['esp32'], indirect=['target'])
@pytest.mark.eth_ip101
@idf_parametrize("target", ["esp32"], indirect=["target"])
def test_examples_protocol_mqtt_qos1(dut: Dut) -> None:
global msgid
"""
@@ -64,34 +65,34 @@ def test_examples_protocol_mqtt_qos1(dut: Dut) -> None:
4. Test the broker received the same message id evaluated in step 3
"""
# check and log bin size
binary_file = os.path.join(dut.app.binary_path, 'mqtt_tcp.bin')
binary_file = os.path.join(dut.app.binary_path, "mqtt_tcp.bin")
bin_size = os.path.getsize(binary_file)
logging.info('[Performance][mqtt_tcp_bin_size]: %s KB', bin_size // 1024)
logging.info("[Performance][mqtt_tcp_bin_size]: %s KB", bin_size // 1024)
# waiting for getting the IP address
try:
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30).group(1).decode()
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
ip_address = dut.expect(r"IPv4 address: (\d+\.\d+\.\d+\.\d+)", timeout=30).group(1).decode()
logging.info("Connected to AP/Ethernet with IP: {}".format(ip_address))
except pexpect.TIMEOUT:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
raise ValueError("ENV_TEST_FAILURE: Cannot connect to AP/Ethernet")
# 2. start mqtt broker sketch
host_ip = get_host_ip4_by_dest_ip(ip_address)
thread1 = Thread(target=mqqt_server_sketch, args=(host_ip, 1883))
thread1.start()
data_write = 'mqtt://' + host_ip
print('writing to device: {}'.format(data_write))
data_write = "mqtt://" + host_ip
logging.info("writing to device: {}".format(data_write))
dut.write(data_write)
thread1.join()
print('Message id received from server: {}'.format(msgid))
logging.info("Message id received from server: {}".format(msgid))
# 3. check the message id was enqueued and then deleted
msgid_enqueued = dut.expect(b'outbox: ENQUEUE msgid=([0-9]+)', timeout=30).group(1).decode()
msgid_deleted = dut.expect(b'outbox: DELETED msgid=([0-9]+)', timeout=30).group(1).decode()
msgid_enqueued = dut.expect(b"outbox: ENQUEUE msgid=([0-9]+)", timeout=30).group(1).decode()
msgid_deleted = dut.expect(b"outbox: DELETED msgid=([0-9]+)", timeout=30).group(1).decode()
# 4. check the msgid of received data are the same as that of enqueued and deleted from outbox
if msgid_enqueued == str(msgid) and msgid_deleted == str(msgid):
print('PASS: Received correct msg id')
logging.info("PASS: Received correct msg id")
else:
print('Failure!')
logging.error("Failure!")
raise ValueError(
'Mismatch of msgid: received: {}, enqueued {}, deleted {}'.format(msgid, msgid_enqueued, msgid_deleted)
"Mismatch of msgid: received: {}, enqueued {}, deleted {}".format(msgid, msgid_enqueued, msgid_deleted)
)
thread1.join()
+1
View File
@@ -11,3 +11,4 @@ CONFIG_EXAMPLE_ETH_PHY_ADDR=1
CONFIG_EXAMPLE_CONNECT_IPV6=y
CONFIG_LWIP_TCPIP_CORE_LOCKING=y
CONFIG_LWIP_CHECK_THREAD_SAFETY=y
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y
+1
View File
@@ -3,3 +3,4 @@ CONFIG_EXAMPLE_CONNECT_WIFI=y
CONFIG_ESP_WIFI_REMOTE_LIBRARY_EPPP=y
CONFIG_ESP_WIFI_REMOTE_EPPP_UART_TX_PIN=17
CONFIG_ESP_WIFI_REMOTE_EPPP_UART_RX_PIN=16
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y
+1
View File
@@ -1,3 +1,4 @@
CONFIG_IDF_TARGET="esp32p4"
CONFIG_EXAMPLE_CONNECT_WIFI=y
CONFIG_ESP_WIFI_REMOTE_LIBRARY_HOSTED=y
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y
+2
View File
@@ -1,3 +1,5 @@
CONFIG_EXAMPLE_CONNECT_WIFI=n
CONFIG_EXAMPLE_CONNECT_PPP=y
CONFIG_EXAMPLE_CONNECT_PPP_DEVICE_UART=y
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y
CONFIG_PARTITION_TABLE_SINGLE_APP_LARGE=y
+26 -26
View File
@@ -17,15 +17,15 @@ from pytest_embedded_idf.utils import idf_parametrize
event_client_connected = Event()
event_stop_client = Event()
event_client_received_correct = Event()
message_log = ''
message_log = ""
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc): # type: (mqtt.Client, tuple, bool, str) -> None
_ = (userdata, flags)
print('Connected with result code ' + str(rc))
print("Connected with result code " + str(rc))
event_client_connected.set()
client.subscribe('/topic/qos0')
client.subscribe("/topic/qos0")
def mqtt_client_task(client): # type: (mqtt.Client) -> None
@@ -38,17 +38,17 @@ def on_message(client, userdata, msg): # type: (mqtt.Client, tuple, mqtt.client
_ = userdata
global message_log
payload = msg.payload.decode()
if not event_client_received_correct.is_set() and payload == 'data':
client.publish('/topic/qos0', 'data_to_esp32')
if msg.topic == '/topic/qos0' and payload == 'data':
if not event_client_received_correct.is_set() and payload == "data":
client.publish("/topic/qos0", "data_to_esp32")
if msg.topic == "/topic/qos0" and payload == "data":
event_client_received_correct.set()
message_log += 'Received data:' + msg.topic + ' ' + payload + '\n'
message_log += "Received data:" + msg.topic + " " + payload + "\n"
@pytest.mark.ethernet
@idf_parametrize('target', ['esp32'], indirect=['target'])
@pytest.mark.eth_ip101
@idf_parametrize("target", ["esp32"], indirect=["target"])
def test_examples_protocol_mqtt_ws(dut): # type: (Dut) -> None
broker_url = ''
broker_url = ""
broker_port = 0
"""
steps: |
@@ -58,29 +58,29 @@ def test_examples_protocol_mqtt_ws(dut): # type: (Dut) -> None
4. Test ESP32 client received correct qos0 message
"""
# check and log bin size
binary_file = os.path.join(dut.app.binary_path, 'mqtt_websocket.bin')
binary_file = os.path.join(dut.app.binary_path, "mqtt_websocket.bin")
bin_size = os.path.getsize(binary_file)
logging.info('[Performance][mqtt_websocket_bin_size]: %s KB', bin_size // 1024)
logging.info("[Performance][mqtt_websocket_bin_size]: %s KB", bin_size // 1024)
# Look for host:port in sdkconfig
try:
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut.app.sdkconfig.get('BROKER_URI'))
value = re.search(r"\:\/\/([^:]+)\:([0-9]+)", dut.app.sdkconfig.get("BROKER_URI"))
assert value is not None
broker_url = value.group(1)
broker_port = int(value.group(2))
except Exception:
print('ENV_TEST_FAILURE: Cannot find broker url in sdkconfig')
print("ENV_TEST_FAILURE: Cannot find broker url in sdkconfig")
raise
client = None
# 1. Test connects to a broker
try:
client = mqtt.Client(transport='websockets')
client = mqtt.Client(transport="websockets")
client.on_connect = on_connect
client.on_message = on_message
print('Connecting...')
print("Connecting...")
client.connect(broker_url, broker_port, 60)
except Exception:
print(
'ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:'.format(
"ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(
broker_url, sys.exc_info()[0]
)
)
@@ -89,20 +89,20 @@ def test_examples_protocol_mqtt_ws(dut): # type: (Dut) -> None
thread1 = Thread(target=mqtt_client_task, args=(client,))
thread1.start()
try:
print('Connecting py-client to broker {}:{}...'.format(broker_url, broker_port))
print("Connecting py-client to broker {}:{}...".format(broker_url, broker_port))
if not event_client_connected.wait(timeout=30):
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_url))
try:
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[0]
print('Connected to AP with IP: {}'.format(ip_address))
ip_address = dut.expect(r"IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]", timeout=30)[0]
print("Connected to AP with IP: {}".format(ip_address))
except Dut.ExpectTimeout:
print('ENV_TEST_FAILURE: Cannot connect to AP')
print("ENV_TEST_FAILURE: Cannot connect to AP")
raise
print('Checking py-client received msg published from esp...')
print("Checking py-client received msg published from esp...")
if not event_client_received_correct.wait(timeout=30):
raise ValueError('Wrong data received, msg log: {}'.format(message_log))
print('Checking esp-client received msg published from py-client...')
dut.expect(r'DATA=data_to_esp32', timeout=30)
raise ValueError("Wrong data received, msg log: {}".format(message_log))
print("Checking esp-client received msg published from py-client...")
dut.expect(r"DATA=data_to_esp32", timeout=30)
finally:
event_stop_client.set()
thread1.join()
+2 -1
View File
@@ -1,4 +1,4 @@
CONFIG_BROKER_URI="ws://${EXAMPLE_MQTT_BROKER_WS}/ws"
CONFIG_BROKER_URI="ws://${TEST_BROKER_BRNO_WS}/ws"
CONFIG_EXAMPLE_CONNECT_ETHERNET=y
CONFIG_EXAMPLE_CONNECT_WIFI=n
CONFIG_EXAMPLE_USE_INTERNAL_ETHERNET=y
@@ -9,3 +9,4 @@ CONFIG_EXAMPLE_ETH_PHY_RST_GPIO=5
CONFIG_EXAMPLE_ETH_PHY_ADDR=1
CONFIG_EXAMPLE_CONNECT_IPV6=y
CONFIG_LWIP_CHECK_THREAD_SAFETY=y
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y
+26 -26
View File
@@ -19,15 +19,15 @@ from pytest_embedded_idf.utils import idf_parametrize
event_client_connected = Event()
event_stop_client = Event()
event_client_received_correct = Event()
message_log = ''
message_log = ""
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc): # type: (mqtt.Client, tuple, bool, str) -> None
_ = (userdata, flags)
print('Connected with result code ' + str(rc))
print("Connected with result code " + str(rc))
event_client_connected.set()
client.subscribe('/topic/qos0')
client.subscribe("/topic/qos0")
def mqtt_client_task(client): # type: (mqtt.Client) -> None
@@ -40,17 +40,17 @@ def on_message(client, userdata, msg): # type: (mqtt.Client, tuple, mqtt.client
_ = userdata
global message_log
payload = msg.payload.decode()
if not event_client_received_correct.is_set() and payload == 'data':
client.publish('/topic/qos0', 'data_to_esp32')
if msg.topic == '/topic/qos0' and payload == 'data':
if not event_client_received_correct.is_set() and payload == "data":
client.publish("/topic/qos0", "data_to_esp32")
if msg.topic == "/topic/qos0" and payload == "data":
event_client_received_correct.set()
message_log += 'Received data:' + msg.topic + ' ' + payload + '\n'
message_log += "Received data:" + msg.topic + " " + payload + "\n"
@pytest.mark.ethernet
@idf_parametrize('target', ['esp32'], indirect=['target'])
@pytest.mark.eth_ip101
@idf_parametrize("target", ["esp32"], indirect=["target"])
def test_examples_protocol_mqtt_wss(dut): # type: (Dut) -> None # type: ignore
broker_url = ''
broker_url = ""
broker_port = 0
"""
steps: |
@@ -60,30 +60,30 @@ def test_examples_protocol_mqtt_wss(dut): # type: (Dut) -> None # type: ignore
4. Test ESP32 client received correct qos0 message
"""
# check and log bin size
binary_file = os.path.join(dut.app.binary_path, 'mqtt_websocket_secure.bin')
binary_file = os.path.join(dut.app.binary_path, "mqtt_websocket_secure.bin")
bin_size = os.path.getsize(binary_file)
logging.info('[Performance][mqtt_websocket_secure_bin_size]: %s KB', bin_size // 1024)
logging.info("[Performance][mqtt_websocket_secure_bin_size]: %s KB", bin_size // 1024)
# Look for host:port in sdkconfig
try:
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut.app.sdkconfig.get('BROKER_URI'))
value = re.search(r"\:\/\/([^:]+)\:([0-9]+)", dut.app.sdkconfig.get("BROKER_URI"))
assert value is not None
broker_url = value.group(1)
broker_port = int(value.group(2))
except Exception:
print('ENV_TEST_FAILURE: Cannot find broker url in sdkconfig')
print("ENV_TEST_FAILURE: Cannot find broker url in sdkconfig")
raise
client = None
# 1. Test connects to a broker
try:
client = mqtt.Client(transport='websockets')
client = mqtt.Client(transport="websockets")
client.on_connect = on_connect
client.on_message = on_message
client.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
print('Connecting...')
print("Connecting...")
client.connect(broker_url, broker_port, 60)
except Exception:
print(
'ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:'.format(
"ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(
broker_url, sys.exc_info()[0]
)
)
@@ -92,20 +92,20 @@ def test_examples_protocol_mqtt_wss(dut): # type: (Dut) -> None # type: ignore
thread1 = Thread(target=mqtt_client_task, args=(client,))
thread1.start()
try:
print('Connecting py-client to broker {}:{}...'.format(broker_url, broker_port))
print("Connecting py-client to broker {}:{}...".format(broker_url, broker_port))
if not event_client_connected.wait(timeout=30):
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_url))
try:
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[0]
print('Connected to AP with IP: {}'.format(ip_address))
ip_address = dut.expect(r"IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]", timeout=30)[0]
print("Connected to AP with IP: {}".format(ip_address))
except pexpect.TIMEOUT:
print('ENV_TEST_FAILURE: Cannot connect to AP')
print("ENV_TEST_FAILURE: Cannot connect to AP")
raise
print('Checking py-client received msg published from esp...')
print("Checking py-client received msg published from esp...")
if not event_client_received_correct.wait(timeout=30):
raise ValueError('Wrong data received, msg log: {}'.format(message_log))
print('Checking esp-client received msg published from py-client...')
dut.expect(r'DATA=data_to_esp32', timeout=30)
raise ValueError("Wrong data received, msg log: {}".format(message_log))
print("Checking esp-client received msg published from py-client...")
dut.expect(r"DATA=data_to_esp32", timeout=30)
finally:
event_stop_client.set()
thread1.join()
+2 -1
View File
@@ -1,4 +1,4 @@
CONFIG_BROKER_URI="wss://${EXAMPLE_MQTT_BROKER_WSS}/ws"
CONFIG_BROKER_URI="wss://${TEST_BROKER_BRNO_WSS}/ws"
CONFIG_BROKER_CERTIFICATE_OVERRIDE="${EXAMPLE_MQTT_BROKER_CERTIFICATE}"
CONFIG_EXAMPLE_CONNECT_ETHERNET=y
CONFIG_EXAMPLE_CONNECT_WIFI=n
@@ -11,3 +11,4 @@ CONFIG_EXAMPLE_ETH_PHY_ADDR=1
CONFIG_EXAMPLE_CONNECT_IPV6=y
CONFIG_LWIP_TCPIP_CORE_LOCKING=y
CONFIG_LWIP_CHECK_THREAD_SAFETY=y
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y
+12
View File
@@ -0,0 +1,12 @@
[tool.ruff]
# Keep default settings; configure lint rules below.
line-length = 120
[tool.ruff.lint]
select = ["E", "F", "Q"]
[tool.ruff.lint."flake8-quotes"]
inline-quotes = "double"
multiline-quotes = "double"
docstring-quotes = "double"
avoid-escape = true
+24 -11
View File
@@ -3,13 +3,15 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
#include "esp_eth.h"
#include "esp_event.h"
#include "esp_log.h"
#include "esp_netif.h"
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
#include "unity.h"
#include "esp_event.h"
#include "esp_netif.h"
#include "esp_eth.h"
#include "esp_log.h"
#include "esp_idf_version.h"
#if SOC_EMAC_SUPPORTED
#define ETH_START_BIT BIT(0)
@@ -74,7 +76,8 @@ static void got_ip_event_handler(void *arg, esp_event_base_t event_base,
xEventGroupSetBits(eth_event_group, ETH_GOT_IP_BIT);
}
static esp_err_t test_uninstall_driver(esp_eth_handle_t eth_hdl, uint32_t ms_to_wait)
static esp_err_t test_uninstall_driver(esp_eth_handle_t eth_hdl,
uint32_t ms_to_wait)
{
int i = 0;
ms_to_wait += 100;
@@ -107,7 +110,11 @@ void connect_test_fixture_setup(void)
eth_esp32_emac_config_t esp32_emac_config = ETH_ESP32_EMAC_DEFAULT_CONFIG();
s_mac = esp_eth_mac_new_esp32(&esp32_emac_config, &mac_config);
eth_phy_config_t phy_config = ETH_PHY_DEFAULT_CONFIG();
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(6, 0, 0)
s_phy = esp_eth_phy_new_generic(&phy_config);
#else
s_phy = esp_eth_phy_new_ip101(&phy_config);
#endif
esp_eth_config_t eth_config = ETH_DEFAULT_CONFIG(s_mac, s_phy);
// install Ethernet driver
TEST_ESP_OK(esp_eth_driver_install(&eth_config, &s_eth_handle));
@@ -115,12 +122,15 @@ void connect_test_fixture_setup(void)
s_eth_glue = esp_eth_new_netif_glue(s_eth_handle);
TEST_ESP_OK(esp_netif_attach(s_eth_netif, s_eth_glue));
// register user defined event handlers
TEST_ESP_OK(esp_event_handler_register(ETH_EVENT, ESP_EVENT_ANY_ID, &eth_event_handler, s_eth_event_group));
TEST_ESP_OK(esp_event_handler_register(IP_EVENT, IP_EVENT_ETH_GOT_IP, &got_ip_event_handler, s_eth_event_group));
TEST_ESP_OK(esp_event_handler_register(
ETH_EVENT, ESP_EVENT_ANY_ID, &eth_event_handler, s_eth_event_group));
TEST_ESP_OK(esp_event_handler_register(
IP_EVENT, IP_EVENT_ETH_GOT_IP, &got_ip_event_handler, s_eth_event_group));
// start Ethernet driver
TEST_ESP_OK(esp_eth_start(s_eth_handle));
/* wait for IP lease */
bits = xEventGroupWaitBits(s_eth_event_group, ETH_GOT_IP_BIT, true, true, pdMS_TO_TICKS(ETH_GET_IP_TIMEOUT_MS));
bits = xEventGroupWaitBits(s_eth_event_group, ETH_GOT_IP_BIT, true, true,
pdMS_TO_TICKS(ETH_GET_IP_TIMEOUT_MS));
TEST_ASSERT((bits & ETH_GOT_IP_BIT) == ETH_GOT_IP_BIT);
}
@@ -130,15 +140,18 @@ void connect_test_fixture_teardown(void)
// stop Ethernet driver
TEST_ESP_OK(esp_eth_stop(s_eth_handle));
/* wait for connection stop */
bits = xEventGroupWaitBits(s_eth_event_group, ETH_STOP_BIT, true, true, pdMS_TO_TICKS(ETH_STOP_TIMEOUT_MS));
bits = xEventGroupWaitBits(s_eth_event_group, ETH_STOP_BIT, true, true,
pdMS_TO_TICKS(ETH_STOP_TIMEOUT_MS));
TEST_ASSERT((bits & ETH_STOP_BIT) == ETH_STOP_BIT);
TEST_ESP_OK(esp_eth_del_netif_glue(s_eth_glue));
/* driver should be uninstalled within 2 seconds */
TEST_ESP_OK(test_uninstall_driver(s_eth_handle, 2000));
TEST_ESP_OK(s_phy->del(s_phy));
TEST_ESP_OK(s_mac->del(s_mac));
TEST_ESP_OK(esp_event_handler_unregister(IP_EVENT, IP_EVENT_ETH_GOT_IP, got_ip_event_handler));
TEST_ESP_OK(esp_event_handler_unregister(ETH_EVENT, ESP_EVENT_ANY_ID, eth_event_handler));
TEST_ESP_OK(esp_event_handler_unregister(IP_EVENT, IP_EVENT_ETH_GOT_IP,
got_ip_event_handler));
TEST_ESP_OK(esp_event_handler_unregister(ETH_EVENT, ESP_EVENT_ANY_ID,
eth_event_handler));
esp_netif_destroy(s_eth_netif);
TEST_ESP_OK(esp_event_loop_delete_default());
vEventGroupDelete(s_eth_event_group);
+2 -2
View File
@@ -5,7 +5,7 @@ from pytest_embedded import Dut
from pytest_embedded_idf.utils import idf_parametrize
@pytest.mark.ethernet
@idf_parametrize('target', ['esp32'], indirect=['target'])
@pytest.mark.eth_ip101
@idf_parametrize("target", ["esp32"], indirect=["target"])
def test_mqtt_client(dut: Dut) -> None:
dut.expect_unity_test_output()
+3 -2
View File
@@ -1,4 +1,5 @@
CONFIG_MQTT_TEST_BROKER_URI="mqtt://${EXAMPLE_MQTT_BROKER_TCP}"
CONFIG_MQTT5_TEST_BROKER_URI="mqtt://${EXAMPLE_MQTT_BROKER_TCP}"
CONFIG_MQTT_TEST_BROKER_URI="mqtt://${TEST_BROKER_BRNO_TCP}"
CONFIG_MQTT5_TEST_BROKER_URI="mqtt://${TEST_BROKER_BRNO_TCP}"
CONFIG_ESP_TASK_WDT_EN=n
CONFIG_UNITY_ENABLE_FIXTURE=y
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y
+2 -2
View File
@@ -5,7 +5,7 @@ from pytest_embedded import Dut
from pytest_embedded_idf.utils import idf_parametrize
@pytest.mark.ethernet
@idf_parametrize('target', ['esp32'], indirect=['target'])
@pytest.mark.eth_ip101
@idf_parametrize("target", ["esp32"], indirect=["target"])
def test_mqtt5_client(dut: Dut) -> None:
dut.expect_unity_test_output()
+2 -2
View File
@@ -1,4 +1,4 @@
CONFIG_MQTT_TEST_BROKER_URI="mqtt://${EXAMPLE_MQTT_BROKER_TCP}"
CONFIG_MQTT5_TEST_BROKER_URI="mqtt://${EXAMPLE_MQTT_BROKER_TCP}"
CONFIG_MQTT_TEST_BROKER_URI="mqtt://${TEST_BROKER_BRNO_TCP}"
CONFIG_MQTT5_TEST_BROKER_URI="mqtt://${TEST_BROKER_BRNO_TCP}"
CONFIG_ESP_TASK_WDT_EN=n
CONFIG_UNITY_ENABLE_FIXTURE=y
+101 -93
View File
@@ -14,8 +14,9 @@ from typing import Dict
from typing import Optional
import pytest
from common_test_methods import get_host_ip4_by_dest_ip
from pytest_embedded import Dut
from conftest import get_host_ip4_by_dest_ip
from pytest_embedded_idf.utils import idf_parametrize
SERVER_PORT = 2222
@@ -27,44 +28,44 @@ def _path(f): # type: (str) -> str
def set_server_cert_cn(ip): # type: (str) -> None
arg_list = [
['openssl', 'req', '-out', _path('srv.csr'), '-key', _path('server.key'), '-subj', '/CN={}'.format(ip), '-new'],
["openssl", "req", "-out", _path("srv.csr"), "-key", _path("server.key"), "-subj", "/CN={}".format(ip), "-new"],
[
'openssl',
'x509',
'-req',
'-in',
_path('srv.csr'),
'-CA',
_path('ca.crt'),
'-CAkey',
_path('ca.key'),
'-CAcreateserial',
'-out',
_path('srv.crt'),
'-days',
'360',
"openssl",
"x509",
"-req",
"-in",
_path("srv.csr"),
"-CA",
_path("ca.crt"),
"-CAkey",
_path("ca.key"),
"-CAcreateserial",
"-out",
_path("srv.crt"),
"-days",
"360",
],
]
for args in arg_list:
if subprocess.check_call(args) != 0:
raise RuntimeError('openssl command {} failed'.format(args))
raise RuntimeError("openssl command {} failed".format(args))
class MQTTHandler(socketserver.StreamRequestHandler):
def handle(self) -> None:
logging.info(' - connection from: {}'.format(self.client_address))
logging.info(" - connection from: {}".format(self.client_address))
data = bytearray(self.request.recv(1024))
message = ''.join(format(x, '02x') for x in data)
if message[0:16] == '101800044d515454':
message = "".join(format(x, "02x") for x in data)
if message[0:16] == "101800044d515454":
if self.server.refuse_connection is False: # type: ignore
logging.info(' - received mqtt connect, sending ACK')
self.request.send(bytearray.fromhex('20020000'))
logging.info(" - received mqtt connect, sending ACK")
self.request.send(bytearray.fromhex("20020000"))
else:
# injecting connection not authorized error
logging.info(' - received mqtt connect, sending NAK')
self.request.send(bytearray.fromhex('20020005'))
logging.info(" - received mqtt connect, sending NAK")
self.request.send(bytearray.fromhex("20020005"))
else:
raise Exception(' - error process_mqtt_connect unexpected connect received: {}'.format(message))
raise Exception(" - error process_mqtt_connect unexpected connect received: {}".format(message))
# Simple server for mqtt over TLS connection
@@ -83,16 +84,16 @@ class TlsServer(socketserver.TCPServer):
):
self.refuse_connection = refuse_connection
self.context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
self.ssl_error = ''
self.ssl_error = ""
self.alpn_protocol: Optional[str] = None
if client_cert:
self.context.verify_mode = ssl.CERT_REQUIRED
self.context.load_verify_locations(cafile=_path('ca.crt'))
self.context.load_cert_chain(certfile=_path('srv.crt'), keyfile=_path('server.key'))
self.context.load_verify_locations(cafile=_path("ca.crt"))
self.context.load_cert_chain(certfile=_path("srv.crt"), keyfile=_path("server.key"))
if use_alpn:
self.context.set_alpn_protocols(['mymqtt', 'http/1.1'])
self.context.set_alpn_protocols(["mymqtt", "http/1.1"])
self.server_thread = Thread(target=self.serve_forever)
super().__init__(('', port), ServerHandler)
super().__init__(("", port), ServerHandler)
def server_activate(self) -> None:
self.socket = self.context.wrap_socket(self.socket, server_side=True)
@@ -145,112 +146,119 @@ def get_test_cases(dut: Dut) -> Any:
try:
# Get connection test cases configuration: symbolic names for test cases
for case in [
'EXAMPLE_CONNECT_CASE_NO_CERT',
'EXAMPLE_CONNECT_CASE_SERVER_CERT',
'EXAMPLE_CONNECT_CASE_MUTUAL_AUTH',
'EXAMPLE_CONNECT_CASE_INVALID_SERVER_CERT',
'EXAMPLE_CONNECT_CASE_SERVER_DER_CERT',
'EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_KEY_PWD',
'EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT',
'EXAMPLE_CONNECT_CASE_NO_CERT_ALPN',
"EXAMPLE_CONNECT_CASE_NO_CERT",
"EXAMPLE_CONNECT_CASE_SERVER_CERT",
"EXAMPLE_CONNECT_CASE_MUTUAL_AUTH",
"EXAMPLE_CONNECT_CASE_INVALID_SERVER_CERT",
"EXAMPLE_CONNECT_CASE_SERVER_DER_CERT",
"EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_KEY_PWD",
"EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT",
"EXAMPLE_CONNECT_CASE_NO_CERT_ALPN",
]:
cases[case] = dut.app.sdkconfig.get(case)
except Exception:
logging.error('ENV_TEST_FAILURE: Some mandatory CONNECTION test case not found in sdkconfig')
logging.error("ENV_TEST_FAILURE: Some mandatory CONNECTION test case not found in sdkconfig")
raise
return cases
def get_dut_ip(dut: Dut) -> Any:
dut_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30).group(1).decode()
logging.info('Got IP={}'.format(dut_ip))
dut_ip = dut.expect(r"IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]", timeout=30).group(1).decode()
logging.info("Got IP={}".format(dut_ip))
return get_host_ip4_by_dest_ip(dut_ip)
@contextlib.contextmanager
def connect_dut(dut: Dut, uri: str, case_id: int) -> Any:
dut.write('connection_setup')
dut.write(f'connect {uri} {case_id}')
dut.expect(f'Test case:{case_id} started')
dut.write('reconnect')
dut.write("connection_setup")
dut.write(f"connect {uri} {case_id}")
dut.expect(f"Test case:{case_id} started")
dut.write("reconnect")
yield
dut.write('connection_teardown')
dut.write('disconnect')
dut.write("connection_teardown")
dut.write("disconnect")
def run_cases(dut: Dut, uri: str, cases: Dict[str, int]) -> None:
try:
dut.write('init')
dut.write('start')
dut.write('disconnect')
dut.write("init")
dut.write("start")
dut.write("disconnect")
for case in [
'EXAMPLE_CONNECT_CASE_NO_CERT',
'EXAMPLE_CONNECT_CASE_SERVER_CERT',
'EXAMPLE_CONNECT_CASE_SERVER_DER_CERT',
"EXAMPLE_CONNECT_CASE_NO_CERT",
"EXAMPLE_CONNECT_CASE_SERVER_CERT",
"EXAMPLE_CONNECT_CASE_SERVER_DER_CERT",
]:
# All these cases connect to the server with no server verification or with server only verification
with TlsServer(), connect_dut(dut, uri, cases[case]):
logging.info(f'Running {case}: default server - expect to connect normally')
dut.expect(f'MQTT_EVENT_CONNECTED: Test={cases[case]}', timeout=30)
logging.info(f"Running {case}: default server - expect to connect normally")
dut.expect(f"MQTT_EVENT_CONNECTED: Test={cases[case]}", timeout=30)
with TlsServer(refuse_connection=True), connect_dut(dut, uri, cases[case]):
logging.info(f'Running {case}: ssl shall connect, but mqtt sends connect refusal')
dut.expect(f'MQTT_EVENT_ERROR: Test={cases[case]}', timeout=30)
dut.expect('MQTT ERROR: 0x5') # expecting 0x5 ... connection not authorized error
logging.info(f"Running {case}: ssl shall connect, but mqtt sends connect refusal")
dut.expect(f"MQTT_EVENT_ERROR: Test={cases[case]}", timeout=30)
dut.expect("MQTT ERROR: 0x5") # expecting 0x5 ... connection not authorized error
with TlsServer(client_cert=True) as server, connect_dut(dut, uri, cases[case]):
logging.info(
f'Running {case}: server with client verification - handshake error since client presents no client certificate'
(
f"Running {case}: server with client verification - handshake error since client presents no "
"client certificate"
)
)
dut.expect(f'MQTT_EVENT_ERROR: Test={cases[case]}', timeout=30)
dut.expect(f"MQTT_EVENT_ERROR: Test={cases[case]}", timeout=30)
dut.expect(
'ESP-TLS ERROR: ESP_ERR_MBEDTLS_SSL_HANDSHAKE_FAILED'
"ESP-TLS ERROR: ESP_ERR_MBEDTLS_SSL_HANDSHAKE_FAILED"
) # expect ... handshake error (PEER_DID_NOT_RETURN_A_CERTIFICATE)
assert 'PEER_DID_NOT_RETURN_A_CERTIFICATE' in server.last_ssl_error()
assert "PEER_DID_NOT_RETURN_A_CERTIFICATE" in server.last_ssl_error()
for case in ['EXAMPLE_CONNECT_CASE_MUTUAL_AUTH', 'EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_KEY_PWD']:
# These cases connect to server with both server and client verification (client key might be password protected)
for case in ["EXAMPLE_CONNECT_CASE_MUTUAL_AUTH", "EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_KEY_PWD"]:
# These cases connect to server with both server and client verification
# (client key might be password protected)
with TlsServer(client_cert=True), connect_dut(dut, uri, cases[case]):
logging.info(f'Running {case}: server with client verification - expect to connect normally')
dut.expect(f'MQTT_EVENT_CONNECTED: Test={cases[case]}', timeout=30)
logging.info(f"Running {case}: server with client verification - expect to connect normally")
dut.expect(f"MQTT_EVENT_CONNECTED: Test={cases[case]}", timeout=30)
case = 'EXAMPLE_CONNECT_CASE_INVALID_SERVER_CERT'
case = "EXAMPLE_CONNECT_CASE_INVALID_SERVER_CERT"
with TlsServer() as s, connect_dut(dut, uri, cases[case]):
logging.info(f'Running {case}: invalid server certificate on default server - expect ssl handshake error')
dut.expect(f'MQTT_EVENT_ERROR: Test={cases[case]}', timeout=30)
logging.info(f"Running {case}: invalid server certificate on default server - expect ssl handshake error")
dut.expect(f"MQTT_EVENT_ERROR: Test={cases[case]}", timeout=30)
dut.expect(
'ESP-TLS ERROR: ESP_ERR_MBEDTLS_SSL_HANDSHAKE_FAILED'
"ESP-TLS ERROR: ESP_ERR_MBEDTLS_SSL_HANDSHAKE_FAILED"
) # expect ... handshake error (TLSV1_ALERT_UNKNOWN_CA)
if re.match('.*alert.*unknown.*ca', s.last_ssl_error(), flags=re.I) is None:
raise Exception(f'Unexpected ssl error from the server: {s.last_ssl_error()}')
if re.match(".*alert.*unknown.*ca", s.last_ssl_error(), flags=re.I) is None:
raise Exception(f"Unexpected ssl error from the server: {s.last_ssl_error()}")
case = 'EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT'
case = "EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT"
with TlsServer(client_cert=True) as s, connect_dut(dut, uri, cases[case]):
logging.info(
f'Running {case}: Invalid client certificate on server with client verification - expect ssl handshake error'
(
f"Running {case}: Invalid client certificate on server with client verification - expect ssl "
"handshake error"
)
)
dut.expect(f'MQTT_EVENT_ERROR: Test={cases[case]}', timeout=30)
dut.expect(f"MQTT_EVENT_ERROR: Test={cases[case]}", timeout=30)
dut.expect(
'ESP-TLS ERROR: ESP_ERR_MBEDTLS_SSL_HANDSHAKE_FAILED'
"ESP-TLS ERROR: ESP_ERR_MBEDTLS_SSL_HANDSHAKE_FAILED"
) # expect ... handshake error (CERTIFICATE_VERIFY_FAILED)
if 'CERTIFICATE_VERIFY_FAILED' not in s.last_ssl_error():
raise Exception('Unexpected ssl error from the server {}'.format(s.last_ssl_error()))
if "CERTIFICATE_VERIFY_FAILED" not in s.last_ssl_error():
raise Exception("Unexpected ssl error from the server {}".format(s.last_ssl_error()))
for case in ['EXAMPLE_CONNECT_CASE_NO_CERT', 'EXAMPLE_CONNECT_CASE_NO_CERT_ALPN']:
for case in ["EXAMPLE_CONNECT_CASE_NO_CERT", "EXAMPLE_CONNECT_CASE_NO_CERT_ALPN"]:
with TlsServer(use_alpn=True) as s, connect_dut(dut, uri, cases[case]):
logging.info(f'Running {case}: server with alpn - expect connect, check resolved protocol')
dut.expect(f'MQTT_EVENT_CONNECTED: Test={cases[case]}', timeout=30)
if case == 'EXAMPLE_CONNECT_CASE_NO_CERT':
logging.info(f"Running {case}: server with alpn - expect connect, check resolved protocol")
dut.expect(f"MQTT_EVENT_CONNECTED: Test={cases[case]}", timeout=30)
if case == "EXAMPLE_CONNECT_CASE_NO_CERT":
assert s.get_negotiated_protocol() is None
elif case == 'EXAMPLE_CONNECT_CASE_NO_CERT_ALPN':
assert s.get_negotiated_protocol() == 'mymqtt'
elif case == "EXAMPLE_CONNECT_CASE_NO_CERT_ALPN":
assert s.get_negotiated_protocol() == "mymqtt"
else:
assert False, f'Unexpected negotiated protocol {s.get_negotiated_protocol()}'
assert False, f"Unexpected negotiated protocol {s.get_negotiated_protocol()}"
finally:
dut.write('stop')
dut.write('destroy')
dut.write("stop")
dut.write("destroy")
@pytest.mark.ethernet
@idf_parametrize('target', ['esp32'], indirect=['target'])
@pytest.mark.eth_ip101
@idf_parametrize("target", ["esp32"], indirect=["target"])
def test_mqtt_connect(
dut: Dut,
log_performance: Callable[[str, object], None],
@@ -262,15 +270,15 @@ def test_mqtt_connect(
3. send and receive data
"""
# check and log bin size
binary_file = os.path.join(dut.app.binary_path, 'mqtt_publish_connect_test.bin')
binary_file = os.path.join(dut.app.binary_path, "mqtt_publish_connect_test.bin")
bin_size = os.path.getsize(binary_file)
log_performance('mqtt_publish_connect_test_bin_size', f'{bin_size // 1024} KB')
log_performance("mqtt_publish_connect_test_bin_size", f"{bin_size // 1024} KB")
ip = get_dut_ip(dut)
set_server_cert_cn(ip)
uri = f'mqtts://{ip}:{SERVER_PORT}'
uri = f"mqtts://{ip}:{SERVER_PORT}"
# Look for test case symbolic names and publish configs
cases = get_test_cases(dut)
dut.expect_exact('mqtt>', timeout=30)
dut.expect_exact("mqtt>", timeout=30)
run_cases(dut, uri, cases)
@@ -33,16 +33,17 @@ class MqttPublisher(mqtt.Client):
def __init__(self, config, log_details=False): # type: (MqttPublisher, dict, bool) -> None
self.log_details = log_details
self.config = config
self.expected_data = f'{config["pattern"] * config["scenario"]["msg_len"]}'
self.expected_data = f"{config['pattern'] * config['scenario']['msg_len']}"
self.received = 0
self.subscribe_mid = 0
self.lock = Lock()
self.event_client_connected = Event()
self.event_client_subscribed = Event()
self.event_client_got_all = Event()
transport = 'websockets' if self.config['transport'] in ['ws', 'wss'] else 'tcp'
client_id = 'MqttTestRunner' + ''.join(
random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(5)
transport = "websockets" if self.config["transport"] in ["ws", "wss"] else "tcp"
client_id = "MqttTestRunner" + "".join(
random.choice(string.ascii_uppercase + string.ascii_lowercase)
for _ in range(5)
)
super().__init__(client_id, userdata=0, transport=transport)
@@ -50,65 +51,87 @@ class MqttPublisher(mqtt.Client):
if self.log_details:
logging.info(text)
def on_subscribe(self, client: Any, userdata: Any, mid: Any, granted_qos: Any) -> None:
def on_subscribe(
self, client: Any, userdata: Any, mid: Any, granted_qos: Any
) -> None:
"""Verify successful subscription."""
if mid == self.subscribe_mid:
logging.info(f'Subscribed to {self.config["subscribe_topic"]} successfully with QoS: {granted_qos}')
logging.info(
f"Subscribed to {self.config['subscribe_topic']} successfully with QoS: {granted_qos}"
)
self.event_client_subscribed.set()
def on_connect(self, mqttc: Any, obj: Any, flags: Any, rc: int) -> None:
self.event_client_connected.set()
def on_connect_fail(self, mqttc: Any, obj: Any) -> None:
logging.error('Connect failed')
logging.error("Connect failed")
def on_message(self, mqttc: mqtt.Client, obj: Any, msg: mqtt.MQTTMessage) -> None:
payload = msg.payload.decode('utf-8')
payload = msg.payload.decode("utf-8")
if payload == self.expected_data:
self.received += 1
if self.received == self.config['scenario']['nr_of_msgs']:
if self.received == self.config["scenario"]["nr_of_msgs"]:
self.event_client_got_all.set()
else:
differences = len(list(filter(lambda data: data[0] != data[1], zip(payload, self.expected_data))))
differences = len(
list(
filter(
lambda data: data[0] != data[1],
zip(payload, self.expected_data),
)
)
)
logging.error(
f'Payload on topic "{msg.topic}" (QoS {msg.qos}) differs in {differences} positions '
'from expected data. '
f'Received size: {len(payload)}, expected size: {len(self.expected_data)}.'
"from expected data. "
f"Received size: {len(payload)}, expected size: {len(self.expected_data)}."
)
logging.info(f'Repetitions: {payload.count(self.config["pattern"])}')
logging.info(f'Pattern: {self.config["pattern"]}')
logging.info(f'First: {payload[:DEFAULT_MSG_SIZE]}')
logging.info(f'Last: {payload[-DEFAULT_MSG_SIZE:]}')
logging.info(f"Repetitions: {payload.count(self.config['pattern'])}")
logging.info(f"Pattern: {self.config['pattern']}")
logging.info(f"First: {payload[:DEFAULT_MSG_SIZE]}")
logging.info(f"Last: {payload[-DEFAULT_MSG_SIZE:]}")
matcher = difflib.SequenceMatcher(a=payload, b=self.expected_data)
for match in matcher.get_matching_blocks():
logging.info(f'Match: {match}')
logging.info(f"Match: {match}")
def __enter__(self) -> Any:
qos = self.config['qos']
broker_host = self.config['broker_host_' + self.config['transport']]
broker_port = self.config['broker_port_' + self.config['transport']]
connect_timeout_seconds = self.config.get('client_connect_timeout', 30)
qos = self.config["qos"]
broker_host = self.config["broker_host_" + self.config["transport"]]
broker_port = self.config["broker_port_" + self.config["transport"]]
connect_timeout_seconds = self.config.get("client_connect_timeout", 30)
try:
self.print_details('Connecting...')
if self.config['transport'] in ['ssl', 'wss']:
self.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
self.print_details("Connecting...")
if self.config["transport"] in ["ssl", "wss"]:
self.tls_set(
None,
None,
None,
cert_reqs=ssl.CERT_NONE,
tls_version=ssl.PROTOCOL_TLSv1_2,
ciphers=None,
)
self.tls_insecure_set(True)
self.event_client_connected.clear()
self.loop_start()
self.connect(broker_host, broker_port, 60) # paho's keepalive
except Exception:
self.print_details(f'ENV_TEST_FAILURE: Unexpected error while connecting to broker {broker_host}')
self.print_details(
f"ENV_TEST_FAILURE: Unexpected error while connecting to broker {broker_host}"
)
raise
self.print_details(f'Connecting py-client to broker {broker_host}:{broker_port}...')
self.print_details(
f"Connecting py-client to broker {broker_host}:{broker_port}..."
)
if not self.event_client_connected.wait(timeout=connect_timeout_seconds):
raise ValueError(
f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host} '
f'within {connect_timeout_seconds}s'
f"ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host} "
f"within {connect_timeout_seconds}s"
)
self.event_client_got_all.clear()
result, self.subscribe_mid = self.subscribe(self.config['subscribe_topic'], qos)
result, self.subscribe_mid = self.subscribe(self.config["subscribe_topic"], qos)
assert result == 0
return self
@@ -123,58 +146,66 @@ def get_configurations(dut: Dut, test_case: Any) -> Dict[str, Any]:
@no_type_check
def get_config_from_dut(dut, config_option):
# logging.info('Option:', config_option, dut.app.sdkconfig.get(config_option))
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut.app.sdkconfig.get(config_option))
# logging.info("Option:", config_option, dut.app.sdkconfig.get(config_option))
value = re.search(
r"\:\/\/([^:]+)\:([0-9]+)", dut.app.sdkconfig.get(config_option)
)
if value is None:
return None, None
return value.group(1), int(value.group(2))
# Get publish test configuration
publish_cfg['broker_host_ssl'], publish_cfg['broker_port_ssl'] = get_config_from_dut(
dut, 'EXAMPLE_BROKER_SSL_URI'
publish_cfg["broker_host_ssl"], publish_cfg["broker_port_ssl"] = (
get_config_from_dut(dut, "EXAMPLE_BROKER_SSL_URI")
)
publish_cfg['broker_host_tcp'], publish_cfg['broker_port_tcp'] = get_config_from_dut(
dut, 'EXAMPLE_BROKER_TCP_URI'
publish_cfg["broker_host_tcp"], publish_cfg["broker_port_tcp"] = (
get_config_from_dut(dut, "EXAMPLE_BROKER_TCP_URI")
)
publish_cfg['broker_host_ws'], publish_cfg['broker_port_ws'] = get_config_from_dut(dut, 'EXAMPLE_BROKER_WS_URI')
publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_config_from_dut(
dut, 'EXAMPLE_BROKER_WSS_URI'
publish_cfg["broker_host_ws"], publish_cfg["broker_port_ws"] = (
get_config_from_dut(dut, "EXAMPLE_BROKER_WS_URI")
)
publish_cfg["broker_host_wss"], publish_cfg["broker_port_wss"] = (
get_config_from_dut(dut, "EXAMPLE_BROKER_WSS_URI")
)
except Exception:
logging.info('ENV_TEST_FAILURE: Some mandatory PUBLISH test case not found in sdkconfig')
logging.info(
"ENV_TEST_FAILURE: Some mandatory PUBLISH test case not found in sdkconfig"
)
raise
transport, qos, enqueue, scenario = test_case
if publish_cfg['broker_host_' + transport] is None:
pytest.skip(f'Skipping transport: {transport}...')
publish_cfg['scenario'] = scenario
publish_cfg['qos'] = qos
publish_cfg['enqueue'] = enqueue
publish_cfg['transport'] = transport
publish_cfg['pattern'] = ''.join(
random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE)
if publish_cfg["broker_host_" + transport] is None:
pytest.skip(f"Skipping transport: {transport}...")
publish_cfg["scenario"] = scenario
publish_cfg["qos"] = qos
publish_cfg["enqueue"] = enqueue
publish_cfg["transport"] = transport
publish_cfg["pattern"] = "".join(
random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits)
for _ in range(DEFAULT_MSG_SIZE)
)
publish_cfg['client_connect_timeout'] = 30
publish_cfg['dut_subscribe_timeout'] = 60
publish_cfg['publish_ack_timeout'] = 60
publish_cfg['test_timeout'] = get_timeout(test_case)
publish_cfg["client_connect_timeout"] = 30
publish_cfg["dut_subscribe_timeout"] = 60
publish_cfg["publish_ack_timeout"] = 60
publish_cfg["test_timeout"] = get_timeout(test_case)
unique_topic = ''.join(
random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(DEFAULT_MSG_SIZE)
unique_topic = "".join(
random.choice(string.ascii_uppercase + string.ascii_lowercase)
for _ in range(DEFAULT_MSG_SIZE)
)
publish_cfg['subscribe_topic'] = 'test/subscribe_to/' + unique_topic
publish_cfg['publish_topic'] = 'test/subscribe_to/' + unique_topic
logging.info(f'configuration: {publish_cfg}')
publish_cfg["subscribe_topic"] = "test/subscribe_to/" + unique_topic
publish_cfg["publish_topic"] = "test/subscribe_to/" + unique_topic
logging.info(f"configuration: {publish_cfg}")
return publish_cfg
@contextlib.contextmanager
def connected_and_subscribed(dut: Dut, config: Dict[str, Any]) -> Any:
dut.write('start')
dut_subscribe_timeout = config.get('dut_subscribe_timeout', 60)
dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=dut_subscribe_timeout)
dut.write("start")
dut_subscribe_timeout = config.get("dut_subscribe_timeout", 60)
dut.expect(re.compile(rb"MQTT_EVENT_SUBSCRIBED"), timeout=dut_subscribe_timeout)
yield
dut.write('stop')
dut.write("stop")
def get_scenarios() -> List[Dict[str, int]]:
@@ -182,154 +213,214 @@ def get_scenarios() -> List[Dict[str, int]]:
# Initialize message sizes and repeat counts (if defined in the environment)
for i in count(0):
# Check env variable: MQTT_PUBLISH_MSG_{len|repeat}_{x}
env_dict = {var: 'MQTT_PUBLISH_MSG_' + var + '_' + str(i) for var in ['len', 'repeat']}
if os.getenv(env_dict['len']) and os.getenv(env_dict['repeat']):
scenarios.append({var: int(os.getenv(env_dict[var])) for var in ['len', 'repeat']}) # type: ignore
env_dict = {
var: "MQTT_PUBLISH_MSG_" + var + "_" + str(i) for var in ["len", "repeat"]
}
len_val = os.getenv(env_dict["len"])
repeat_val = os.getenv(env_dict["repeat"])
if len_val is not None and repeat_val is not None:
scenarios.append({"msg_len": int(len_val), "nr_of_msgs": int(repeat_val)})
continue
break
if not scenarios: # No message sizes present in the env - set defaults
logging.info('Using predefined cases')
logging.info("Using predefined cases")
scenarios = [
{'msg_len': 0, 'nr_of_msgs': 5}, # zero-sized messages
{'msg_len': 2, 'nr_of_msgs': 5}, # short messages
{'msg_len': 200, 'nr_of_msgs': 3}, # long messages
{"msg_len": 0, "nr_of_msgs": 5}, # zero-sized messages
{"msg_len": 2, "nr_of_msgs": 5}, # short messages
{"msg_len": 200, "nr_of_msgs": 3}, # long messages
]
return scenarios
def get_timeout(test_case: Any) -> int:
transport, qos, enqueue, scenario = test_case
timeout = int(scenario['nr_of_msgs'] * 20)
timeout = int(scenario["nr_of_msgs"] * 20)
if qos == 1:
timeout += 30
if qos == 2:
timeout += 45
if transport in ['ws', 'wss']:
if transport in ["ws", "wss"]:
timeout += 30
return timeout
def run_publish_test_case(dut: Dut, config: Any) -> None:
logging.info(
f'Starting Publish test: transport:{config["transport"]}, qos:{config["qos"]},'
f'nr_of_msgs:{config["scenario"]["nr_of_msgs"]},'
f' msg_size:{config["scenario"]["msg_len"]}, enqueue:{config["enqueue"]}'
f"Starting Publish test: transport:{config['transport']}, qos:{config['qos']},"
f"nr_of_msgs:{config['scenario']['nr_of_msgs']},"
f" msg_size:{config['scenario']['msg_len']}, enqueue:{config['enqueue']}"
)
dut.write(
f'publish_setup {config["transport"]} {config["publish_topic"]}'
f' {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}'
f"publish_setup {config['transport']} {config['publish_topic']}"
f" {config['subscribe_topic']} {config['pattern']} {config['scenario']['msg_len']}"
)
with MqttPublisher(config) as publisher, connected_and_subscribed(dut, config):
py_client_subscribe_timeout = config.get('py_client_subscribe_timeout', config['test_timeout'])
assert publisher.event_client_subscribed.wait(timeout=py_client_subscribe_timeout), 'Runner failed to subscribe'
msgs_published: List[mqtt.MQTTMessageInfo] = []
dut.write(f'publish {config["scenario"]["nr_of_msgs"]} {config["qos"]} {config["enqueue"]}')
assert publisher.event_client_got_all.wait(timeout=config['test_timeout']), (
f'Not all data received from ESP32: {config["transport"]} '
f'qos={config["qos"]} received: {publisher.received} '
f'expected: {config["scenario"]["nr_of_msgs"]}'
py_client_subscribe_timeout = config.get(
"py_client_subscribe_timeout", config["test_timeout"]
)
logging.info(' - all data received from ESP32')
payload = config['pattern'] * config['scenario']['msg_len']
for _ in range(config['scenario']['nr_of_msgs']):
assert publisher.event_client_subscribed.wait(
timeout=py_client_subscribe_timeout
), "Runner failed to subscribe"
msgs_published: List[mqtt.MQTTMessageInfo] = []
dut.write(
f"publish {config['scenario']['nr_of_msgs']} {config['qos']} {config['enqueue']}"
)
assert publisher.event_client_got_all.wait(timeout=config["test_timeout"]), (
f"Not all data received from ESP32: {config['transport']} "
f"qos={config['qos']} received: {publisher.received} "
f"expected: {config['scenario']['nr_of_msgs']}"
)
logging.info(" - all data received from ESP32")
payload = config["pattern"] * config["scenario"]["msg_len"]
for _ in range(config["scenario"]["nr_of_msgs"]):
with publisher.lock:
msg = publisher.publish(topic=config['publish_topic'], payload=payload, qos=config['qos'])
if config['qos'] > 0:
msg = publisher.publish(
topic=config["publish_topic"], payload=payload, qos=config["qos"]
)
if config["qos"] > 0:
msgs_published.append(msg)
logging.info(f'Published: {len(msgs_published)} messages from script with QoS > 0 needing ACK.')
logging.info(
f"Published: {len(msgs_published)} messages from script with QoS > 0 needing ACK."
)
if msgs_published:
publish_ack_timeout_seconds = config.get('publish_ack_timeout', 60) # Default 60s, make configurable
publish_ack_timeout_seconds = config.get(
"publish_ack_timeout", 60
) # Default 60s, make configurable
ack_wait_start_time = time.time()
initial_unacked_count = len(msgs_published)
logging.info(f'Waiting {initial_unacked_count} publish ack with timeout {publish_ack_timeout_seconds}s...')
logging.info(
f"Waiting {initial_unacked_count} publish ack with timeout {publish_ack_timeout_seconds}s..."
)
while msgs_published:
if time.time() - ack_wait_start_time > publish_ack_timeout_seconds:
unacked_mids = [msg.mid for msg in msgs_published if msg.mid is not None and not msg.is_published()]
unacked_mids = [
msg.mid
for msg in msgs_published
if msg.mid is not None and not msg.is_published()
]
logging.error(
f'Timeout waiting for publish acknowledgements. '
f'{len(unacked_mids)} of {initial_unacked_count} messages remain unacknowledged. '
f'Unacked MIDs: {unacked_mids}'
f"Timeout waiting for publish acknowledgements. "
f"{len(unacked_mids)} of {initial_unacked_count} messages remain unacknowledged. "
f"Unacked MIDs: {unacked_mids}"
)
# This will likely cause the test to fail at a later assertion,
# or you could raise an explicit error here.
# e.g. raise Exception('Timeout waiting for publish acknowledgements')
break
msgs_published = [msg for msg in msgs_published if not msg.is_published()]
msgs_published = [
msg for msg in msgs_published if not msg.is_published()
]
if msgs_published: # Avoid busy-looping if list is not empty
time.sleep(0.1) # Brief pause
if not msgs_published:
logging.info('All script-published QoS > 0 messages acknowledged by broker.')
logging.info(
"All script-published QoS > 0 messages acknowledged by broker."
)
logging.info('All messages from runner published (or timed out waiting for ACK).')
logging.info(
"All messages from runner published (or timed out waiting for ACK)."
)
try:
dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=config['test_timeout'])
dut.expect(
re.compile(rb"Correct pattern received exactly x times"),
timeout=config["test_timeout"],
)
except pexpect.exceptions.ExceptionPexpect:
dut.write('publish_report')
dut.expect(re.compile(rb'Test Report'), timeout=30)
dut.write("publish_report")
dut.expect(re.compile(rb"Test Report"), timeout=30)
raise
logging.info('ESP32 received all data from runner')
logging.info("ESP32 received all data from runner")
stress_scenarios = [{'msg_len': 20, 'nr_of_msgs': 30}] # many medium sized
transport_cases = ['tcp', 'ws', 'wss', 'ssl']
stress_scenarios = [{"msg_len": 20, "nr_of_msgs": 30}] # many medium sized
transport_cases = ["tcp", "ws", "wss", "ssl"]
qos_cases = [0, 1, 2]
enqueue_cases = [0, 1]
local_broker_supported_transports = ['tcp']
local_broker_supported_transports = ["tcp"]
local_broker_scenarios = [
{'msg_len': 0, 'nr_of_msgs': 5}, # zero-sized messages
{'msg_len': 5, 'nr_of_msgs': 20}, # short messages
{'msg_len': 500, 'nr_of_msgs': 10}, # long messages
{'msg_len': 20, 'nr_of_msgs': 20},
{"msg_len": 0, "nr_of_msgs": 5}, # zero-sized messages
{"msg_len": 5, "nr_of_msgs": 20}, # short messages
{"msg_len": 500, "nr_of_msgs": 10}, # long messages
{"msg_len": 20, "nr_of_msgs": 20},
] # many medium sized
def make_cases(transport: Any, scenarios: List[Dict[str, int]]) -> List[Tuple[str, int, int, Dict[str, int]]]:
return [test_case for test_case in product(transport, qos_cases, enqueue_cases, scenarios)]
def make_cases(
transport: Any, scenarios: List[Dict[str, int]]
) -> List[Tuple[str, int, int, Dict[str, int]]]:
return [
test_case
for test_case in product(transport, qos_cases, enqueue_cases, scenarios)
]
def generate_test_id(test_case: Tuple[str, int, int, Dict[str, int]]) -> str:
"""Generate a descriptive test ID from test case parameters."""
transport, qos, enqueue, scenario = test_case
msg_len = scenario["msg_len"]
nr_of_msgs = scenario["nr_of_msgs"]
enqueue_str = "enq_" if enqueue else ""
test_id = f"{transport}_qos{qos}_{enqueue_str}{msg_len}sz_{nr_of_msgs}msg"
return test_id
test_cases = make_cases(transport_cases, get_scenarios())
stress_test_cases = make_cases(transport_cases, stress_scenarios)
@pytest.mark.ethernet
@pytest.mark.parametrize('test_case', test_cases)
@pytest.mark.parametrize('config', ['default'], indirect=True)
@idf_parametrize('target', ['esp32'], indirect=['target'])
@pytest.mark.eth_ip101
@pytest.mark.parametrize(
"test_case", test_cases, ids=[generate_test_id(case) for case in test_cases]
)
@pytest.mark.parametrize("config", ["default"], indirect=True)
@idf_parametrize("target", ["esp32"], indirect=["target"])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
publish_cfg = get_configurations(dut, test_case)
dut.expect(re.compile(rb'mqtt>'), timeout=30)
dut.confirm_write('init', expect_pattern='init', timeout=30)
dut.expect(re.compile(rb"mqtt>"), timeout=30)
dut.confirm_write("init", expect_pattern="init", timeout=30)
run_publish_test_case(dut, publish_cfg)
@pytest.mark.ethernet_stress
@pytest.mark.eth_ip101_stress
@pytest.mark.nightly_run
@pytest.mark.parametrize('test_case', stress_test_cases)
@pytest.mark.parametrize('config', ['default'], indirect=True)
@pytest.mark.parametrize(
"test_case",
stress_test_cases,
ids=[generate_test_id(case) for case in stress_test_cases],
)
@pytest.mark.parametrize("config", ["default"], indirect=True)
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@idf_parametrize('target', ['esp32'], indirect=['target'])
@idf_parametrize("target", ["esp32"], indirect=["target"])
def test_mqtt_publish_stress(dut: Dut, test_case: Any) -> None:
publish_cfg = get_configurations(dut, test_case)
dut.expect(re.compile(rb'mqtt>'), timeout=30)
dut.write('init')
dut.expect(re.compile(rb"mqtt>"), timeout=30)
dut.write("init")
run_publish_test_case(dut, publish_cfg)
@pytest.mark.ethernet
@pytest.mark.parametrize('test_case', make_cases(local_broker_supported_transports, local_broker_scenarios))
@pytest.mark.parametrize('config', ['local_broker'], indirect=True)
@idf_parametrize('target', ['esp32'], indirect=['target'])
local_test_cases = make_cases(local_broker_supported_transports, local_broker_scenarios)
@pytest.mark.eth_ip101
@pytest.mark.parametrize(
"test_case",
local_test_cases,
ids=[generate_test_id(case) for case in local_test_cases],
)
@pytest.mark.parametrize("config", ["local_broker"], indirect=True)
@idf_parametrize("target", ["esp32"], indirect=["target"])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
def test_mqtt_publish_local(dut: Dut, test_case: Any) -> None:
if test_case[0] not in local_broker_supported_transports:
pytest.skip(f'Skipping transport: {test_case[0]}...')
dut_ip = dut.expect(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),').group(1)
pytest.skip(f"Skipping transport: {test_case[0]}...")
dut_ip = dut.expect(r"esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),").group(1)
publish_cfg = get_configurations(dut, test_case)
publish_cfg['broker_host_tcp'] = dut_ip
publish_cfg['broker_port_tcp'] = 1234
dut.expect(re.compile(rb'mqtt>'), timeout=30)
dut.confirm_write('init', expect_pattern='init', timeout=30)
publish_cfg["broker_host_tcp"] = dut_ip
publish_cfg["broker_port_tcp"] = 1234
dut.expect(re.compile(rb"mqtt>"), timeout=30)
dut.confirm_write("init", expect_pattern="init", timeout=30)
run_publish_test_case(dut, publish_cfg)
@@ -1,7 +1,7 @@
CONFIG_EXAMPLE_BROKER_SSL_URI="mqtts://${EXAMPLE_MQTT_BROKER_SSL}"
CONFIG_EXAMPLE_BROKER_TCP_URI="mqtt://${EXAMPLE_MQTT_BROKER_TCP}"
CONFIG_EXAMPLE_BROKER_WS_URI="ws://${EXAMPLE_MQTT_BROKER_WS}/ws"
CONFIG_EXAMPLE_BROKER_WSS_URI="wss://${EXAMPLE_MQTT_BROKER_WSS}/ws"
CONFIG_EXAMPLE_BROKER_SSL_URI="mqtts://${TEST_BROKER_BRNO_SSL}"
CONFIG_EXAMPLE_BROKER_TCP_URI="mqtt://${TEST_BROKER_BRNO_TCP}"
CONFIG_EXAMPLE_BROKER_WS_URI="ws://${TEST_BROKER_BRNO_WS}/ws"
CONFIG_EXAMPLE_BROKER_WSS_URI="wss://${TEST_BROKER_BRNO_WSS}/ws"
CONFIG_EXAMPLE_BROKER_CERTIFICATE_OVERRIDE="${EXAMPLE_MQTT_BROKER_CERTIFICATE}"
CONFIG_MBEDTLS_ASYMMETRIC_CONTENT_LEN=y
CONFIG_MBEDTLS_SSL_IN_CONTENT_LEN=16384
@@ -18,3 +18,4 @@ CONFIG_EXAMPLE_ETH_MDIO_GPIO=18
CONFIG_EXAMPLE_ETH_PHY_RST_GPIO=5
CONFIG_EXAMPLE_ETH_PHY_ADDR=1
CONFIG_EXAMPLE_CONNECT_IPV6=y
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y
@@ -11,3 +11,4 @@ CONFIG_MQTT_USE_CUSTOM_CONFIG=y
CONFIG_MQTT_POLL_READ_TIMEOUT_MS=50
CONFIG_PARTITION_TABLE_SINGLE_APP_LARGE=y
CONFIG_EXAMPLE_RUN_LOCAL_BROKER=y
CONFIG_LOG_MAXIMUM_LEVEL_DEBUG=y