Supabase:Queues
Durable Message Queues with Guaranteed Delivery in Postgres
Supabase Queues로 Redis MQ 대체하기
핵심: pop()이 이미 존재함
pgmq 확장에는 ReadAndRemove에 해당하는 pop()이 내장되어 있다.
Supabase Queues의 public API에서도 사용 가능:
| WARNING |
| |
패턴별 비교
| Redis 패턴 | pgmq 대응 | 설명 |
| | | 큐가 비어있으면 최대 N초 대기 |
| | | 읽고 즉시 삭제 |
| | | 메시지 전송 |
| visibility timeout | | 읽은 후 VT 동안 다른 consumer에게 안 보임 |
Timeout이 있는 RPC 만들기
Redis BRPOP처럼 "메시지가 올 때까지 대기"하려면 read_with_poll을 사용한다.
SELECT * FROM pgmq.read_with_poll(
'my_queue', -- queue_name
30, -- vt: visibility timeout (초)
1, -- qty: 가져올 메시지 수
10, -- max_poll_seconds: 최대 대기 시간 (기본 5초)
250 -- poll_interval_ms: 폴링 간격 (기본 100ms)
);
RPC 함수 예시
CREATE OR REPLACE FUNCTION public.dequeue_task(
p_queue text,
p_timeout_seconds int DEFAULT 10
)
RETURNS jsonb
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = ''
AS $$
DECLARE
v_msg RECORD;
BEGIN
-- read_with_poll: 메시지가 올 때까지 최대 p_timeout_seconds 대기
SELECT *
INTO v_msg
FROM pgmq.read_with_poll(
p_queue,
30, -- visibility timeout
1, -- 1개만
p_timeout_seconds, -- long-poll 대기 시간
200 -- poll interval ms
);
IF v_msg IS NULL THEN
RETURN NULL;
END IF;
-- pop 시맨틱스: 읽은 즉시 삭제
PERFORM pgmq.delete(p_queue, v_msg.msg_id);
RETURN v_msg.message;
END;
$$;
클라이언트 호출
const { data, error } = await supabase.rpc('dequeue_task', {
p_queue: 'tasks',
p_timeout_seconds: 10
});
안전한 패턴 (at-least-once)
메시지 유실이 걱정된다면 pop() 대신 read → process → delete/archive 패턴을 권장한다.
-- 1. 읽기 (VT=60초 동안 다른 consumer에게 안 보임)
SELECT * FROM pgmq.read('my_queue', 60, 1);
-- 2. 처리 성공 후 삭제 또는 아카이브
SELECT pgmq.archive('my_queue', 123);
-- 처리 실패 시? → 60초 후 자동으로 다시 visible → 재처리
요약
-
pgmq.pop()— ReadAndRemove가 이미 있음 (at-most-once) -
pgmq.read_with_poll()— RedisBRPOP같은 blocking 대기 지원 - RPC로 감싸기 —
read_with_poll+delete를 조합하면 timeout 있는 pop RPC 구현 가능 -
pgmq_public.*스키마를 통해 Supabase client에서도 직접 호출 가능