Skip to content

Python:threading

Value

Return a ctypes object allocated from shared memory. By default the return value is actually a synchronized wrapper for the object.

typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.

If lock is True (the default) then a new recursive lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.

Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do

counter.value += 1

Assuming the associated lock is recursive (which it is by default) you can instead do

with counter.get_lock():
    counter.value += 1

Note that lock is a keyword-only argument.

Example

import multiprocessing
def worker1(v):
    with v.get_lock():
        v.value += 1

def worker2(v):
    with v.get_lock():
        v.value += 2

ctypes_int = multiprocessing.Value("i", 0)
print ctypes_int.value

# Output: 0

process1 = multiprocessing.Process(
    target=worker1, args=[ctypes_int])
process2 = multiprocessing.Process(
    target=worker2, args=[ctypes_int])

process1.start()
process2.start()
process1.join()
process2.join()

print ctypes_int.value

RLock

"Reentrant Lock"

간혹 lock을 거는 함수가 재귀호출을 하는 경우 쓰레드가 Block되어 Lock을 해제할 수 없게 되어버림

RLock()은 쓰레드가 lock을 취득한 상태에서 lock을 다시 취득하면 lock count를 1 올리면서 즉시 return한다.

"lock 재 획득 문제를 해결"

Condition

import threading
import time

CONSUMER_COUNT = 10
PRODUCER_COUNT = CONSUMER_COUNT // 2

queue = []
cv = threading.Condition()

item_id = 0

class Consumer(threading.Thread):
    def __init__(self, id):
        threading.Thread.__init__(self)
        self.id = id

    def run(self):
        for i in range(5):
            cv.acquire()
            while len(queue) < 1:
                print('consumer({}) waiting...'.format(self.id))
                cv.wait()
            print('consumer({}) -> item({})'.format(self.id, queue.pop(0)))
            cv.release()
            time.sleep(0.5)


class Producer(threading.Thread):
    def run(self):
        global item_id
        for i in range(10):
            cv.acquire()
            item_id += 1
            queue.append(item_id)
            cv.notify()
            cv.release()
            time.sleep(0.7)


threads = []

for i in range(CONSUMER_COUNT):
    threads.append(Consumer(i))

for i in range(PRODUCER_COUNT):
    threads.append(Producer())

for th in threads:
    th.start()

for th in threads:
    th.join()

print('<End>')

간단한 Condition 과 Lock 사용 예제

from threading import Thread, Condition, Lock


class Demo(Thread):
    def __init__(self):
        self._done = False
        self._lock = Lock()
        self._condition = Condition(self._lock)

    def quit(self) -> None:
        with self._condition:
            self._done = True
            self._condition.notify_all()

    def is_done(self) -> bool:
        with self._condition:
            result = self._done
            self._condition.notify_all()
            return result

    def run(self) -> None:
        with self._condition:
            while not self._done:
                self._condition.wait()
                # TODO: run something ...

Interrupt the Main Thread

import _thread

_thread.interrupt_main()

Daemon Thread

데몬 스레드를 생성할 경우:

thread = Thread(... daemon=True)

메인 프로그램이 종료되면 데몬 스레드도 강제로 종료됩니다. 일반 스레드와 달리 메인 프로그램이 데몬 스레드가 완료되기를 기다리지 않습니다.

구체적인 차이점

  • 일반 스레드 (daemon=False) <- 기본값
    • 메인 프로그램이 모든 일반 스레드가 완료될 때까지 대기
    • 스레드가 실행 중이면 프로그램이 종료되지 않음
  • 데몬 스레드 (daemon=True)
    • 메인 프로그램 종료 시 즉시 강제 종료
    • 작업이 완료되지 않아도 프로그램과 함께 종료

사용 예시

데몬 스레드는 다음과 같은 상황에 유용합니다:

  • 백그라운드 모니터링
  • 로그 처리
  • 주기적인 정리 작업
  • 메인 작업에 영향을 주지 않는 보조 작업

See also

Favorite site