Python:threading
Value
Return a ctypes object allocated from shared memory. By default the return value is actually a synchronized wrapper for the object.
typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.
If lock is True (the default) then a new recursive lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.
Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do
Assuming the associated lock is recursive (which it is by default) you can instead do
Note that lock is a keyword-only argument.
Example
import multiprocessing
def worker1(v):
with v.get_lock():
v.value += 1
def worker2(v):
with v.get_lock():
v.value += 2
ctypes_int = multiprocessing.Value("i", 0)
print ctypes_int.value
# Output: 0
process1 = multiprocessing.Process(
target=worker1, args=[ctypes_int])
process2 = multiprocessing.Process(
target=worker2, args=[ctypes_int])
process1.start()
process2.start()
process1.join()
process2.join()
print ctypes_int.value
RLock
"Reentrant Lock"
간혹 lock을 거는 함수가 재귀호출을 하는 경우 쓰레드가 Block되어 Lock을 해제할 수 없게 되어버림
RLock()은 쓰레드가 lock을 취득한 상태에서 lock을 다시 취득하면 lock count를 1 올리면서 즉시 return한다.
"lock 재 획득 문제를 해결"
Condition
import threading
import time
CONSUMER_COUNT = 10
PRODUCER_COUNT = CONSUMER_COUNT // 2
queue = []
cv = threading.Condition()
item_id = 0
class Consumer(threading.Thread):
def __init__(self, id):
threading.Thread.__init__(self)
self.id = id
def run(self):
for i in range(5):
cv.acquire()
while len(queue) < 1:
print('consumer({}) waiting...'.format(self.id))
cv.wait()
print('consumer({}) -> item({})'.format(self.id, queue.pop(0)))
cv.release()
time.sleep(0.5)
class Producer(threading.Thread):
def run(self):
global item_id
for i in range(10):
cv.acquire()
item_id += 1
queue.append(item_id)
cv.notify()
cv.release()
time.sleep(0.7)
threads = []
for i in range(CONSUMER_COUNT):
threads.append(Consumer(i))
for i in range(PRODUCER_COUNT):
threads.append(Producer())
for th in threads:
th.start()
for th in threads:
th.join()
print('<End>')
간단한 Condition 과 Lock 사용 예제
from threading import Thread, Condition, Lock
class Demo(Thread):
def __init__(self):
self._done = False
self._lock = Lock()
self._condition = Condition(self._lock)
def quit(self) -> None:
with self._condition:
self._done = True
self._condition.notify_all()
def is_done(self) -> bool:
with self._condition:
result = self._done
self._condition.notify_all()
return result
def run(self) -> None:
with self._condition:
while not self._done:
self._condition.wait()
# TODO: run something ...
Interrupt the Main Thread
Daemon Thread
데몬 스레드를 생성할 경우:
메인 프로그램이 종료되면 데몬 스레드도 강제로 종료됩니다. 일반 스레드와 달리 메인 프로그램이 데몬 스레드가 완료되기를 기다리지 않습니다.
구체적인 차이점
- 일반 스레드 (daemon=False) <- 기본값
- 메인 프로그램이 모든 일반 스레드가 완료될 때까지 대기
- 스레드가 실행 중이면 프로그램이 종료되지 않음
- 메인 프로그램 종료 시 즉시 강제 종료
- 작업이 완료되지 않아도 프로그램과 함께 종료
사용 예시
데몬 스레드는 다음과 같은 상황에 유용합니다:
- 백그라운드 모니터링
- 로그 처리
- 주기적인 정리 작업
- 메인 작업에 영향을 주지 않는 보조 작업