快轉到主要內容
  1. 後端首頁/

PGMQ(PostgreSQL Message Queue) 設定

·2 分鐘· ·
Blog Zh-Tw Backend Postgresql
Liu Zhe You
作者
Liu Zhe You
涉略全端、DevOps,目前專注在 Backend
目錄

PGMQ(PostgreSQL Message Queue)
#

tembo-io/pgmq

A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.

Rust
2600
59

https://github.com/tembo-io/pgmq
PGMQ 是一個基於 PostgreSQL 的輕量消息隊列
使用 PostgrSQL Extension 實現

因為只有基於 PostgreSQL 所以對於需要 MQ 的輕量業務需求非常適合

目前使用的情境是以原有的 API Server + PGMQ + consumer
來做一些非同步任務和第三方服務的 retry 機制

安裝
#

Tembo Docker Image
#

  • 先確認系統是否有安裝:
    • Docker
    • Docker Compose

最簡單的方式是直接使用 Tembo 官方的 Docker Image

docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest

從 Extension 安裝
#

如果想要用原本的 postgres Image 以 Extension 的方式安裝:

  1. 下載最新 PGMQ 的 SQL 和 Control 檔案
curl -o pgmq.sql https://raw.githubusercontent.com/tembo-io/pgmq/main/pgmq-extension/sql/pgmq.sql
curl -o pgmq.control https://raw.githubusercontent.com/tembo-io/pgmq/main/pgmq-extension/pgmq.control
  1. 更新 docker-compose.yml 加上 Volume 的設定
  • db.sql 是一些初始化的 SQL 檔案(看專案有沒有需要)
  • pgmq.sqlpgmq.control 是 PGMQ 的 SQL 和 Control 檔案
    • pgmq--1.3.3.sql 的版本號需要看 pgmq.control 內的版本號
  • stateful_volumes/postgresqlpostgres container 持久化 mount 的目錄
version: '3.8'

services:
  postgres:
    image: postgres:15.1
    container_name: postgres
    ports:
      - 5432:5432
    env_file:
      - postgres.env
    restart: always
    volumes:
      - ./db.sql:/docker-entrypoint-initdb.d/db.sql
      - ./pgmq.control:/usr/share/postgresql/15/extension/pgmq.control
      - ./pgmq.sql:/usr/share/postgresql/15/extension/pgmq--1.3.3.sql
      - ./stateful_volumes/postgresql:/var/lib/postgresql/data/
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "your_postgres_user", "-d", "your_postgres_db"]
      interval: 2s
      timeout: 5s
      retries: 3
  1. 啟動 postgres container

如果原本有跑過 postgres container 記得先把 volume 刪掉 !

docker compose up postgres -d

PGMQ Python Client
#

  • 安裝 pgmq Python Client
pip install tembo-pgmq-python

因為 tembo-pgmq-python 是直接使用 psycopg ( 是 psycopg3 不是 psycopg2 )
在 Mac 上如果用 poetry 安裝會有問題
所以建議直接用 pip 安裝

  • 使用 pgmq Python Client

tembo-pgmq-python 的 README 其實寫的蠻清楚的,主要就是 PGMQueueMessage 兩個 data class
vtvisibility timeout 的縮寫,是指消息在被讀取後有多久的時間內不會再被讀取

以下是一個簡單的範例:

from tembo_pgmq_python import PGMQueue, Message

queue = PGMQueue(
    host="0.0.0.0",
    port="5432",
    username="postgres",
    password="postgres",
    database="postgres"
)

msg_id: int = queue.send("my_queue", {"hello": "world"})

read_message: Message = queue.read("my_queue", vt=30)
print(read_message)

deleted: bool = queue.delete("my_queue", read_message.msg_id)

dropped: bool = queue.drop_queue("my_queue")

如果要看更多的範例可以參考 tembo-pgmq-python的 README

Reference
#

相關文章

PgBouncer: 輕量 Postgres 連接池
·2 分鐘
Blog Database Zh-Tw Postgresql
以 PgBouncer 解決 Django 後端 DB connection 過多的問題
在 SqlAlchemy 使用 Transaction
·2 分鐘
Blog Zh-Tw SqlAlchemy Backend Python
如何在 SqlAlchemy 中使用 Transaction
FastAPI: 使用 Moto 模擬 S3
·2 分鐘
Blog Zh-Tw AWS Backend Testing FastAPI
FastAPI 測試: 使用 Moto 模擬 AWS S3 Boto3
Redis 持久化設定:RDB 與 AOF
·4 分鐘
Blog Zh-Tw Database Redis
Redis 持久化設定:RDB 與 AOF
Python: 重複讀取檔案(BinaryIO)
·1 分鐘
Blog Zh-Tw Python
在 Python 中重複讀取檔案 (BinaryIO),如何解決在第二次讀取時出現空內容的問題。
Cloudflare Tunnel
·2 分鐘
Blog Zh-Tw
設定 Cloudflare Tunnel 來穿透內網 IP,Ngrok 的替代方案