Thiết kế Bộ đệm MQTT chống mất kết nối với SQLite

06 tháng 4, 2026·7 phút đọc

Bài viết chia sẻ cách xây dựng một dịch vụ MQTT nhẹ, chịu lỗi tốt bằng cách dùng SQLite làm bộ đệm bền vững. Giải pháp cho phép thu nhận dữ liệu liên tục dù dịch vụ hạ tầng phía sau gặp sự cố, đảm bảo dữ liệu không bị mất và tái truyền khi cần.

Thiết kế Bộ đệm MQTT chống mất kết nối với SQLite

Thiết kế Bộ đệm MQTT chống mất kết nối với SQLite

Trong hệ thống thu thập dữ liệu sử dụng giao thức MQTT (Message Queuing Telemetry Transport), câu hỏi quan trọng nhất là:

"Điều gì xảy ra khi dịch vụ phía sau biến mất trong 20 phút?"

Đó cũng chính là thách thức khi xây dựng một client nhẹ để nhận dữ liệu từ các thiết bị MQTT. Thay vì gửi thẳng từng tin nhắn MQTT đến API HTTP, giải pháp cần tính đến tính không ổn định của mạng, phổ biến trong các hệ thống phân tán.

Nếu API phía sau bị chậm, timeout hay lỗi xác thực, toàn bộ luồng xử lý sẽ sụp đổ theo. Do đó, mục tiêu là thiết kế một hệ thống có thể tiếp tục nhận dữ liệu kể cả khi các thành phần còn lại gặp sự cố.

Kiến trúc dịch vụ

Dịch vụ này nằm giữa MQTT broker và endpoint HTTP nhận dữ liệu sau cùng. Dòng dữ liệu chạy như sau:

MQTT Broker
   ↓
MQTT Client (on_message)
   ↓
latest_by_topic (bộ lọc trùng bộ nhớ)
   ↓
Flush Worker (ghi định kỳ)
   ↓
SQLite — mqtt_buffer
   ↓
Sender Worker (gửi theo lô, xác thực)
   ↓
Downstream HTTP

Dịch vụ đăng ký các topic như sites, inverters, strings, weather, grid với cấu trúc:

  • Một process Python
  • Vòng lặp client MQTT duy nhất
  • Hai thread làm việc nền (Flush Worker và Sender Worker)
  • Một file SQLite làm hàng đợi bền vững

Phân tách ba bước nhận, lưu, chuyển tiếp là chủ ý để từng bước có thể thất bại độc lập mà không ảnh hưởng toàn hệ thống.

Tại sao không gửi trực tiếp

Phương pháp dễ nhất là nhận tin trên callback on_message rồi gửi HTTP luôn. Ý tưởng này thoạt nhìn ổn trong demo nhưng không hiệu quả trong môi trường thực tế, bởi vì hàm callback giờ đây phải xử lý:

  • Độ trễ API phía sau
  • Tính sẵn sàng của API
  • Lỗi xác thực hoặc token
  • Cơ chế retry
  • Độ ổn định mạng
  • Xử lý gửi một phần thành công

Tập trung tất cả vào một hàm callback khiến hệ thống bị phụ thuộc chặt chẽ, dễ bị vỡ khi một phần có vấn đề. Nhận dữ liệu và gửi dữ liệu là hai giai đoạn riêng biệt.

Quyết định thiết kế chính

1. SQLite làm biên giới bền vững

Buộc phải phân biệt rạch ròi giữa "đã nhận tin" và "đã lưu bền vững". Tin chỉ thực sự an toàn khi đã ghi vào SQLite.

Bảng mqtt_buffer lưu các trường:

CộtÝ nghĩa
idĐánh số, xử lý trùng lặp
topicNguồn dữ liệu
tsThời gian
payloadNội dung thô
qosMức độ chất lượng giao hàng
retainCờ giữ lại MQTT
attemptsSố lần thử gửi
last_errorLý do thất bại gần nhất

Bản ghi chỉ bị xóa khi dịch vụ phía dưới xác nhận thành công, nghĩa là nếu gửi thất bại thì bản ghi vẫn nằm đó để thử lại. SQLite được lựa chọn vì gọn nhẹ, nhúng được và dễ quản lý qua SSH, rất phù hợp cho dịch vụ chạy tại thiết bị biên (edge).

2. Giữ callback gọn nhẹ

Hàm on_message chỉ làm bốn việc:

  • Phân tích tin nhắn
  • Lưu metadata và timestamp
  • Cập nhật trạng thái bộ nhớ
  • Thoát ngay, không ghi đĩa, không gửi HTTP, không xử lý xác thực hay retry

Điều này giúp desvices MQTT vẫn nhận tin nhanh, không bị phụ thuộc đến tình trạng dịch vụ gửi phía sau.

3. Hai worker tách biệt nhiệm vụ

  • Flush worker: định kỳ lấy dữ liệu trong bộ nhớ đệm rồi ghi vào SQLite theo lô.
  • Sender worker: kiểm tra tình trạng dịch vụ dưới, lấy token JWT, gửi dữ liệu theo batch, xóa bản ghi được xác nhận, cập nhật lỗi và số lần thử.

Cách tách này giúp:

  • Đường truyền phía dưới offline thì dịch vụ vẫn nhận được dữ liệu.
  • Lỗi xác thực không ảnh hưởng đến bộ đệm nhận dữ liệu.
  • Tốc độ gửi chậm hơn tốc độ nhận thì SQLite bù đắp kịp thời.

4. SQLite là hàng đợi bền vững

Mặc dù SQLite không phải trung gian tin nhắn (message broker) chuyên dụng, nhưng nó đáp ứng tốt các yêu cầu:

  • Thêm bản ghi mới
  • Đọc bản ghi theo thứ tự cũ nhất
  • Thử lại gửi khi gặp lỗi
  • Xóa dữ liệu chỉ khi được xác nhận

Các thiết lập quan trọng:

  • Chế độ WAL cho phép đọc ghi đồng thời
  • synchronous=FULL đảm bảo không mất dữ liệu khi crash hệ điều hành
  • busy_timeout xử lý tranh chấp khoá

Điều này đảm bảo hệ thống ổn định khi hoạt động lâu dài dưới tải.

5. Gửi theo lô

Dữ liệu không được gửi từng bản ghi một ngay lập tức mà gửi theo lô cấu hình bởi:

  • SEND_BATCH_SIZE
  • SEND_INTERVAL_SECONDS

Giúp kiểm soát tải dịch vụ dưới, giảm số lần retry, dễ tinh chỉnh tăng/giảm throughput.

6. Xác thực (Auth) tách biệt với nhận dữ liệu

Xác thực JWT chỉ do sender worker xử lý:

  • Lấy token từ AUTH_URL khi khởi động
  • Lưu cache trong bộ nhớ
  • Đính token vào các request
  • Nếu nhận lỗi 401 thì refresh token

Nếu dịch vụ xác thực không sẵn sàng, nhận dữ liệu vẫn hoạt động, chỉ có gửi bị tạm dừng.

7. Kiểm tra truy cập trước khi gửi

Trước mỗi lần gửi, sender worker kiểm tra nhanh trạng thái endpoint HTTP hoặc dò cổng TCP. Điều này tránh việc gửi dữ liệu khi không cần thiết, giúp retry ổn định hơn.

8. Delivery tối thiểu một lần (At-least-once)

Dịch vụ đảm bảo dữ liệu được gửi xuống dưới ít nhất một lần, không đảm bảo chính xác một lần. Do đó, phía nhận dữ liệu cần xử lý idempotent để tránh trùng lặp ảnh hưởng.

9. Đặc tính vận hành

  • Log được ghi ra stdout, dễ theo dõi
  • Các lỗi như gửi thất bại, lỗi xác thực, mất kết nối MQTT được báo rõ ràng
  • Xử lý tín hiệu SIGINT/SIGTERM để tắt dần an toàn, không mất dữ liệu chưa gửi
  • Khi khởi động lại, retry các bản ghi chưa xác nhận

Thực tế triển khai và bài học

  • Cần giám sát dung lượng file SQLite vì nó tăng theo thời gian khi dịch vụ phía dưới mất kết nối.
  • Cần lưu file SQLite trên ổ nhớ bền vững.
  • Cần định nghĩa rõ ràng giới hạn an toàn là khi ghi vào SQLite.
  • Xây dựng đồng bộ, khóa truy cập shared state giữa các thread.
  • Nhận ra rằng độ tin cậy chủ yếu là vấn đề đặt trạng thái (state) ở đâu.
  • Nên ưu tiên hệ thống "hoạt động ổn định khi bị lỗi" hơn là chỉ "hoạt động khi khỏe".

Kết luận

Dự án không đơn thuần là xây MQTT subscriber mà là tạo ra một biên giới truyền tải chịu lỗi, giúp hệ thống "nhận hết, ghi lại hết, gửi theo lô rồi retry", giữ cho dịch vụ đơn giản, an toàn và dễ debug.

Đối với những ai từng triển khai giải pháp tương tự, câu hỏi sẽ là:

"An toàn thực sự bắt đầu từ đâu? Bộ đệm local, gửi trực tiếp hay dùng hệ thống trung gian khác như Redis, Kafka?"

Đây luôn là điểm thiết kế quan trọng quyết định sự thành công của giải pháp.


Nếu bạn quan tâm đến các giải pháp kết nối IoT bền vững, đây là bài học hữu ích về cách kết hợp MQTT với SQLite để đạt độ tin cậy cao trong môi trường mạng không ổn định.

Bài viết được tổng hợp và biên soạn bằng AI từ các nguồn tin tức công nghệ. Nội dung mang tính tham khảo. Xem bài gốc ↗