Skip to content

Supabase:Queues

Durable Message Queues with Guaranteed Delivery in Postgres

Supabase Queues로 Redis MQ 대체하기

핵심: pop()이 이미 존재함

pgmq 확장에는 ReadAndRemove에 해당하는 pop()이 내장되어 있다.

-- 메시지 1개를 읽고 즉시 삭제 (at-most-once)
SELECT * FROM pgmq.pop('my_queue');

Supabase Queues의 public API에서도 사용 가능:

SELECT * FROM pgmq_public.pop('my_queue');

WARNING

pop()at-most-once 시맨틱스이다. 메시지를 읽는 즉시 삭제하므로, consumer가 처리 중 실패하면 메시지가 유실된다.

패턴별 비교

Redis 패턴

pgmq 대응

설명

BRPOP (blocking pop)

pgmq.read_with_poll()

큐가 비어있으면 최대 N초 대기

RPOPLPUSH (pop)

pgmq.pop()

읽고 즉시 삭제

LPUSH

pgmq.send()

메시지 전송

visibility timeout

pgmq.read(vt)

읽은 후 VT 동안 다른 consumer에게 안 보임

Timeout이 있는 RPC 만들기

Redis BRPOP처럼 "메시지가 올 때까지 대기"하려면 read_with_poll을 사용한다.

SELECT * FROM pgmq.read_with_poll(
  'my_queue',        -- queue_name
  30,                -- vt: visibility timeout (초)
  1,                 -- qty: 가져올 메시지 수
  10,                -- max_poll_seconds: 최대 대기 시간 (기본 5초)
  250                -- poll_interval_ms: 폴링 간격 (기본 100ms)
);

RPC 함수 예시

CREATE OR REPLACE FUNCTION public.dequeue_task(
  p_queue text,
  p_timeout_seconds int DEFAULT 10
)
RETURNS jsonb
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = ''
AS $$
DECLARE
  v_msg RECORD;
BEGIN
  -- read_with_poll: 메시지가 올 때까지 최대 p_timeout_seconds 대기
  SELECT *
    INTO v_msg
    FROM pgmq.read_with_poll(
      p_queue,
      30,                  -- visibility timeout
      1,                   -- 1개만
      p_timeout_seconds,   -- long-poll 대기 시간
      200                  -- poll interval ms
    );

  IF v_msg IS NULL THEN
    RETURN NULL;
  END IF;

  -- pop 시맨틱스: 읽은 즉시 삭제
  PERFORM pgmq.delete(p_queue, v_msg.msg_id);

  RETURN v_msg.message;
END;
$$;

클라이언트 호출

const { data, error } = await supabase.rpc('dequeue_task', {
  p_queue: 'tasks',
  p_timeout_seconds: 10
});

안전한 패턴 (at-least-once)

메시지 유실이 걱정된다면 pop() 대신 read → process → delete/archive 패턴을 권장한다.

-- 1. 읽기 (VT=60초 동안 다른 consumer에게 안 보임)
SELECT * FROM pgmq.read('my_queue', 60, 1);

-- 2. 처리 성공 후 삭제 또는 아카이브
SELECT pgmq.archive('my_queue', 123);

-- 처리 실패 시? → 60초 후 자동으로 다시 visible → 재처리

요약

  • pgmq.pop() — ReadAndRemove가 이미 있음 (at-most-once)
  • pgmq.read_with_poll() — Redis BRPOP 같은 blocking 대기 지원
  • RPC로 감싸기read_with_poll + delete를 조합하면 timeout 있는 pop RPC 구현 가능
  • pgmq_public.* 스키마를 통해 Supabase client에서도 직접 호출 가능

See also

Favorite site