a.thread & process

Python中的進程與線程

學習知識,我們不但要知其然,還是知其所以然。你做到了你就比別人NB。我們先了解一下什麼是進程和線程。

進程與線程的歷史

我們都知道計算機是由硬件和軟件組成的。硬件中的CPU是計算機的核心,它承擔計算機的所有任務。操作系統是運行在硬件之上的軟件,是計算機的管理者,它負責資源的管理和分配、任務的調度。程序是運行在系統上的具有某種功能的軟件,比如說瀏覽器,音樂播放器等。每次執行程序的時候,都會完成一定的功能,比如說瀏覽器幫我們打開網頁,為了保證其獨立性,就需要一個專門的管理和控制執行程序的數據結構——進程控制塊。 進程就是一個程序在一個數據集上的一次動態執行過程。 進程一般由程序、數據集、進程控制塊三部分組成。我們編寫的程序用來描述進程要完成哪些功能以及如何完成;數據集則是程序在執行過程中所需要使用的資源;進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統可以利用它來控制和管理進程,它是系統感知進程存在的唯一標誌。

在早期的操作系統裡,計算機只有一個核心,進程執行程序的最小單位,任務調度採用時間片輪轉的搶占式方式進行進程調度。每個進程都有各自的一塊獨立的內存,保證進程彼此間的內存地址空間的隔離。隨著計算機技術的發展,進程出現了很多弊端,一是進程的創建、撤銷和切換的開銷比較大,二是由於對稱多處理機(對稱多處理機(SymmetricalMulti-Processing)又叫SMP,是指在一個計算機上匯集了一組處理器(多CPU),各CPU之間共享內存子系統以及總線結構)的出現,可以滿足多個運行單位,而多進程並行開銷過大。這個時候就引入了線程的概念。線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程中的最小單元,由線程ID、程序計數器、寄存器集合和堆棧共同組成。線程的引入減小了程序並發執行時的開銷,提高了操作系統的並發性能。線程沒有自己的系統資源,只擁有在運行時必不可少的資源。但線程可以與同屬與同一進程的其他線程共享進程所擁有的其他資源。

進程與線程之間的關係

線程是屬於進程的,線程運行在進程空間內,同一進程所產生的線程共享同一內存空間,當進程退出時該進程所產生的線程都會被強制退出並清除。線程可與屬於同一進程的其它線程共享進程所擁有的全部資源,但是其本身基本上不擁有系統資源,只擁有一點在運行中必不可少的信息(如程序計數器、一組寄存器和棧)。

threading模塊

threading 模塊建立在_thread 模塊之上。thread 模塊以低級、原始的方式來處理和控制線程,而threading 模塊通過對thread 進行二次封裝,提供了更方便的api 來處理線程。

import threading import time def worker(num): """ thread worker function :return: """ time.sleep( 1 ) print ( " Thread %d " % num) return for i in range(20 ): t = threading.Thread(target=worker,args=(i,),name=“t.%d” % i) t.start()

thread方法說明

t.start() : 激活線程,

t.getName() : 獲取線程的名稱

t.setName() : 設置線程的名稱

t.name : 獲取或設置線程的名稱

t.is_alive() : 判斷線程是否為激活狀態

t.isAlive() :判斷線程是否為激活狀態

t.setDaemon() 設置為後台線程或前台線程(默認:False);通過一個布爾值設置線程是否為守護線程,必須在執行start()方法之後才可以使用。如果是後台線程,主線程執行過程中,後台線程也在進行,主線程執行完畢後,後台線程不論成功與否,均停止;如果是前台線程,主線程執行過程中,前台線程也在進行,主線程執行完畢後,等待前台線程也執行完成後,程序停止

t.isDaemon() : 判斷是否為守護線程

t.ident :獲取線程的標識符。線程標識符是一個非零整數,只有在調用了start()方法之後該屬性才有效,否則它只返回None。

t.join() :逐個執行每個線程,執行完畢後繼續往下執行,該方法使得多線程變得無意義

t.run() :線程被cpu調度後自動執行線程對象的run方法

線程鎖threading.RLock和threading.Lock

我們使用線程對數據進行操作的時候,如果多個線程同時修改某個數據,可能會出現不可預料的結果,為了保證數據的準確性,引入了鎖的概念。

例:假設列表A的所有元素就為0,當一個線程從前向後打印列表的所有元素,另外一個線程則從後向前修改列表的元素為1,那麼輸出的時候,列表的元素就會一部分為0,一部分為1,這就導致了數據的不一致。鎖的出現解決了這個問題。

import threading import time globals_num = 0 lock = threading.RLock() def Func(): lock.acquire() # 獲得鎖 global globals_num globals_num += 1 time.sleep( 1 ) print (globals_num) lock.release() # 釋放鎖 for i in range(10 ): t = threading.Thread(target= Func) t.start()

threading.RLock和threading.Lock 的區別

RLock允許在同一線程中被多次acquire。而Lock卻不允許這種情況。如果使用RLock,那麼acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所佔用的瑣。

import threading lock = threading.Lock() # Lock對象lock.acquire() lock.acquire() # 產生了死瑣。lock.release() lock.release()

import threading rLock = threading.RLock() # RLock對象rLock.acquire() rLock.acquire() # 在同一線程內,程序不會堵塞。rLock.release() rLock.release()

threading.Event

Event是線程間通信最間的機制之一:一個線程發送一個event信號,其他的線程則等待這個信號。用於主線程控制其他線程的執行。Events 管理一個flag,這個flag可以使用set()設置成True或者使用clear()重置為False,wait()則用於阻塞,在flag為True之前。flag默認為False。

  • Event.wait([timeout]) : 堵塞線程,直到Event對象內部標識位被設為True或超時(如果提供了參數timeout)。

  • Event.set() :將標識位設為Ture

  • Event.clear() : 將標識伴設為False。

  • Event.isSet() :判斷標識位是否為Ture。

import threading def do(event): print ( ' start ' ) event.wait() print ( ' execute ' ) event_obj = threading.Event() for i in range(10 ): t = threading.Thread(target=do, args= (event_obj,)) t.start() event_obj.clear() inp = input( ' input: ' ) if inp == ' true ' : event_obj.set()

當線程執行的時候,如果flag為False,則線程會阻塞,當flag為True的時候,線程不會阻塞。它提供了本地和遠程的並發性。

threading.Condition:

一個condition變量總是與某些類型的鎖相聯繫,這個可以使用默認的情況或創建一個,當幾個condition變量必須共享和同一個鎖的時候,是很有用的。鎖是conditon對象的一部分:沒有必要分別跟踪。

condition變量服從上下文管理協議:with語句塊封閉之前可以獲取與鎖的聯繫。acquire() 和release() 會調用與鎖相關聯的相應的方法。

其他和鎖關聯的方法必須被調用,wait()方法會釋放鎖,當另外一個線程使用notify() or notify_all()喚醒它之前會一直阻塞。一旦被喚醒,wait()會重新獲得鎖並返回,

Condition(lock=None)

Condition類實現了一個conditon變量。這個conditiaon變量允許一個或多個線程等待,直到他們被另一個線程通知。如果lock參數,被給定一個非空的值,,那麼他必須是一個lock或者Rlock對象,它用來做底層鎖。否則,會創建一個新的Rlock對象,用來做底層鎖。

  • wait(timeout=None) : 等待通知,或者等到設定的超時時間。當調用這wait()方法時,如果調用它的線程沒有得到鎖,那麼會拋出一個RuntimeError 異常。wati()釋放鎖以後,在被調用相同條件的另一個進程用notify() or notify_all() 叫醒之前會一直阻塞。wait() 還可以指定一個超時時間。

如果有等待的線程,notify()方法會喚醒一個在等待conditon變量的線程。notify_all() 則會喚醒所有在等待conditon變量的線程。

注意: notify()和notify_all()不會釋放鎖,也就是說,線程被喚醒後不會立刻返回他們的wait() 調用。除非線程調用notify()和notify_all()之後放棄了鎖的所有權。

在典型的設計風格里,利用condition變量用鎖去通許訪問一些共享狀態,線程在獲取到它想得到的狀態前,會反複調用wait()。修改狀態的線程在他們狀態改變時調用notify() or notify_all(),用這種方式,線程會盡可能的獲取到想要的一個等待者狀態。例子: 生產者-消費者模型,

import threading import time def consumer(cond): with cond: print ( " consumer before wait " ) cond.wait() print ( " consumer after wait " ) def producer(cond): with cond: print ( " producer before notifyAll " ) cond.notifyAll() print ( " producer after notifyAll " ) condition = threading.Condition() c1 = threading.Thread(name= " c1 " , target=consumer, args= (condition,)) c2 = threading.Thread(name= " c2 " , target=consumer, args= (condition,)) p = threading.Thread(name= " p " , target=producer, args= (condition,)) c1.start() time.sleep( 2 ) c2.start() time.sleep( 2 ) p.start()

consumer()線程要等待producer()設置了Condition之後才能繼續。

queue模塊

Queue 就是對隊列,它是線程安全的

舉例來說,我們去肯德基吃飯。廚房是給我們做飯的地方,前台負責把廚房做好的飯賣給顧客,顧客則去前台領取做好的飯。這裡的前台就相當於我們的隊列。

這個模型也叫生產者-消費者模型。

import queue q = queue.Queue( maxsize=0 ) #構造一個先進顯出隊列,maxsize指定隊列長度,為0時,表示隊列長度無限制。 q.join() # 等到隊列為kong的時候,在執行別的操作 q.qsize() # 返回隊列的大小(不可靠) q.empty() # 當隊列為空的時候,返回True 否則返回False (不可靠) q.full() # 當隊列滿的時候,返回True,否則返回False (不可靠) q.put( item, block=True, timeout=None ) #將item放入Queue尾部,item必須存在,可以參數block默認為True,表示當隊列滿時,會等待隊列給出可用位置,

                         為False時為非阻塞,此時如果隊列已滿,會引發q.get(queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,

                         如果队列无法给出放入item的位置,则引发 queue.Full 异常

block=True, timeout=None ) #移除並返回隊列頭部的一個值,可選參數block默認為True,表示獲取值的時候,如果隊列為空,則阻塞,為False時,不阻塞,

                      若此時隊列為空,則引發queue.Empty異常。可選參數timeout,表示會阻塞設置的時候,過後,如果隊列為空,則引發Empty異常。

q.put_nowait(item) #等效於put(item,block=False)

q.get_nowait() #等效於get(item,block=False)

生產者--消費者:

#!/usr/bin/env python import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()

multiprocessing模塊

multiprocessing是python的多進程管理包,和threading.Thread類似。直接從側面用subprocesses替換線程使用GIL的方式,由於這一點,multiprocessing模塊可以讓程序員在給定的機器上充分的利用CPU。

在multiprocessing中,通過創建Process對像生成進程,然後調用它的start()方法,

from multiprocessing import Process def f(name): print ( ' hello ' , name) if __name__ == ' __main__ ' : p = Process(target=f, args=( ' bob ' ,)) p.start() p.join()

進程間的數據共享

在使用並發設計的時候最好盡可能的避免共享數據,尤其是在使用多進程的時候。如果你真有需要要共享數據, multiprocessing提供了兩種方式。

Shared memory

數據可以用Value或Array存儲在一個共享內存地圖裡,如下:

from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = - a[i] if __name__ == ' __main__ ' : num = Value( ' d ' , 0.0 ) arr = Array( ' i ' , range(10 )) p = Process(target=f, args= (num, arr)) p.start() p.join() print (num.value) print (arr[:])

輸出:

3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

創建num和arr時,“d”和“i”參數由Array模塊使用的typecodes創建:“d”表示一個雙精度的浮點數,“i”表示一個有符號的整數,這些共享對象將被線程安全的處理。

Array('i', range(10))中的'i'參數:

'c': ctypes.c_char 'u': ctypes.c_wchar 'b': ctypes.c_byte 'B': ctypes.c_ubyte 'h': ctypes.c_short 'H': ctypes.c_ushort 'i': ctypes.c_int 'I': ctypes.c_uint 'l': ctypes.c_long, 'L': ctypes.c_ulong 'f': ctypes.c_float 'd': ctypes.c_double

Server process

由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array類型的支持。

from multiprocessing import Process, Manager def f(d, l): d[ 1] = ' 1 ' d[ ' 2 ' ] = 2 d[ 0.25] = None l.reverse() if __name__ == ' __main__ ' : with Manager() as manager: d = manager.dict() l = manager.list(range(10 )) p = Process(target=f, args= (d, l)) p.start() p.join() print (d) print (l)

輸出:

{0.25: None, 1: ' 1 ' , ' 2 ' : 2 } [ 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Server process manager比shared memory 更靈活,因為它可以支持任意的對像類型。另外,一個單獨的manager可以通過進程在網絡上不同的計算機之間共享,不過他比shared memory要慢。

使用工作池(Using a pool of workers)

Pool類描述了一個工作進程池,他有幾種不同的方法讓任務卸載工作進程。

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程為止。

我們可以用Pool類創建一個進程池, 展開提交的任務給進程池。例:

from multiprocessing import Pool import time def myFun(i): time.sleep( 2 ) return i+100def end_call(arg): print ( " end_call " ,arg) p = Pool(5 )# print(p.map(myFun,range(10)))for i in range(10 ): p.apply_async(func =myFun,args=(i,),callback= end_call)print ( " end " ) p.close() p.join()

官方示例

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一個進程池對象可以控制工作進程池的哪些工作可以被提交,它支持超時和回調的異步結果,有一個類似map的實現。

  • processes :使用的工作進程的數量,如果processes是None那麼使用os.cpu_count()返回的數量。

  • initializer: 如果initializer是None,那麼每一個工作進程在開始的時候會調用initializer(*initargs)。

  • maxtasksperchild:工作進程退出之前可以完成的任務數,完成後用一個心的工作進程來替代原進程,來讓閒置的資源被釋放。maxtasksperchild默認是None,意味著只要Pool存在工作進程就會一直存活。

  • context: 用在製定工作進程啟動時的上下文,一般使用multiprocessing.Pool() 或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context

注意:Pool對象的方法只可以被創建pool的進程所調用。

New in version 3.2: maxtasksperchild

New in version 3.4: context

進程池的方法

    • apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,由於這個原因,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。

    • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果對象。如果callback被指定,那麼callback可以接收一個參數然後被調用,當結果準備好回調時會調用callback,調用失敗時,則用error_callback替換callback。Callbacks應被立即完成,否則處理結果的線程會被阻塞。

    • close() : 阻止更多的任務提交到pool,待任務完成後,工作進程會退出。

    • terminate() : 不管任務是否完成,立即停止工作進程。在對pool對象進程垃圾回收的時候,會立即調用terminate()。

    • join() : wait工作線程的退出,在調用join()前,必須調用close() or terminate()。這樣是因為被終止的進程需要被父進程調用wait(join等價與wait),否則進程會成為殭屍進程。

    • map(func, iterable[, chunksize])¶

    • map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶

    • imap(func, iterable[, chunksize])¶

    • imap_unordered(func, iterable[, chunksize])

    • starmap(func, iterable[, chunksize])¶

    • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

協程

協程又叫微線程,從技術的角度來說,“協程就是你可以暫停執行的函數”。如果你把它理解成“就像生成器一樣”,那麼你就想對了。線程和進程的操作是由程序觸發系統接口,最後的執行者是系統;協程的操作則是程序員。

協程存在的意義:對於多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。

協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),適用於協程;

Python3.5的async/await 新特性

async/await 是對3.3版本引入yeild from 的擴展.

定義協程:

@types.coroutine # 和asyncio.coroutine一樣def ping_server(ip): # ping code here...

async def ping_server(ip): # ping code here...

注意:使用async def 定義的協程函數,不能包含任何形式的yield 語句,只有return 和await 可以從協程中返回值。await只能用於async def 中,await 接受的對象必須是awaitable 對象, awaitable 對像要麼是一個協程;要么是一個定義了__await__()方法的對象,且__await__()必須返回一個不是協程的迭代器。

Event Loop

Event Loop是一種等待程序分配時間或消息的編程架構。簡單的說就是當事件A發生的時候,我們就去執行事件B。最簡單的例子就是:當我們瀏覽網頁的時候,我們點擊頁面的某個元素,這個點擊事件會被JavaScript 捕捉到,然後JavaScript 就會檢查這個事件是否綁定了onclick()回調函數來處理這個事件,只要綁定了,onclick()回調函數就會被執行。

event loop是協程執行的控制點, 如果你希望執行協程, 就需要用到它們。

event loop提供瞭如下的特性:

  • 註冊、執行、取消延時調用(異步函數)

  • 創建用於通信的client和server協議(工具)

  • 創建和別的程序通信的子進程和協議(工具)

  • 把函數調用送入線程池中

協程示例:

import asyncio async def cor1(): print ( " COR1 start " ) await cor2() print ( " COR1 end " ) async def cor2(): print ( " COR2 " ) loop = asyncio.get_event_loop() loop.run_until_complete(cor1()) loop.close()

最後三行是重點。

  • asyncio.get_event_loop() : asyncio啟動默認的event loop

  • run_until_complete() : 這個函數是阻塞執行的,知道所有的異步函數執行完成,

  • close() : 關閉event loop。

python 2.7 中的實現:

greenlet

gevent

gevent 遇到IO操作自動切換

subprocess模塊

通過使用subprocess模塊可以創建新的進程,連接到他們的輸入/輸出/錯誤管道,並獲取他們的返回值。該模塊計劃替代及一個舊的模塊的方法:

1

2

os.system

os.spawn*

使用subprocess模塊

在所有用例調用subprocess時推薦使用run()方法,更高級的用例,可以直接使用subprocess.Popen接口。

run()方法

在Python3.5增加的。

1

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, timeout=None, check=False)

run()默認不會捕捉到標準輸出和標準錯誤輸出,要捕捉的話,可以為標準輸出和標準錯誤輸出指定subprocess.PIPE(一個特殊值,可被用於Popen的stdin, stdout或stderr參數,表示一個標準流的管道應該被打開, Popen.communicate()用的最多)。

  • args :args應該是一個字符串或者一個序列。

  • timeout:設置超時時間,會傳遞給subprocess.Popen.communicate()。如果超時,子進程會被殺死並等待。子進程被終止後會報告一個TimeoutExpired異常。

  • input參數會傳遞給subprocess.Popen.communicate(),從而作為subprocess的標準輸入。當我們使用的時候,內部的Popen對象會自動創建stdin=PIPE,stdin參數可能不會被使用。

  • check:如果check參數為True,且進程退出的時候得到退出碼一個非0的值,會報告一個CalledProcessError異常

  • shell:shell參數默認為False,此時arg參數應該是一個列表。subprocess.run(["ls","-l"]) ; 當shell=True時,args可以是一個字符串。subprocess.run("ls -l",shell=True)

>>> ret = subprocess.run(["ls", "-l"]) # doesn't capture output CompletedProcess(args=['ls', '-l'], returncode= 0) >>> print (ret.stdout) None >>> subprocess.run("exit 1", shell=True, check= True) Traceback (most recent call last): ... subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1 >>> ret1 = subprocess.run(["ls", "-l", "/dev/null"], stdout= subprocess.PIPE) CompletedProcess(args=['ls', '-l', '/dev/null'], returncode= 0, stdout=b'crw-rw-rw- 1 root root 1, 3 Jan 23 16:23 /dev/null\n' ) >>> print (ret.stdout) b'crw-rw-rw- 1 root root 1, 3 6\xe6\x9c\x88 8 06:50 /dev/null\n'

call方法

subprocess.call ( args, *, stdin = None , stdout = None , stderr = None , shell = False , timeout = None )¶

call()方法等價於:run(..., check=True)和run()方法類所以,只是不支持input參數和check參數;

注意: 不要在這個方法裡使用stdout=PIPE 或stderr=PIPE,當到一個管道的輸出填滿系統的管道緩存時,子進程會被阻塞。

check_call方法

subprocess.check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False, timeout=None)

check_call()方法等價於: run(..., check=True, stdout=PIPE).stdout

3.1新增,3.3時增加了timeout參數,3.4時增加了對關鍵字參數的支持

check_output()方法

內部調用的是run()方法,但是會捕捉到stdout

>>> ret = subprocess.check_output(["ls", "-l", "/dev/null" ]) >>> print (ret) b'crw-rw-rw- 1 root root 1, 3 6\xe6\x9c\x88 8 06:50 /dev/null\n'

Popen類

上面的四個方法本質上調用的都是subprocess中的Popen類。

Popen對像都有以下方法:

poll() : 檢查子進程是否已經終止,返回一個returncode,相當於exit code。

wait() : 等待子進程終止,返回一個returncode

communicate(input=None) :和進程進行交互:發送數據到stdin;從stdout和stderr讀取數據,直到讀取完。等待進程終止。可選參數input會傳遞數據給子進程,當input=None事,沒有數據傳遞給子進程。communicate() 返回一個元組(stdout, stderr). 注意: 讀取的數據在內存的buffer中,所以當數據大小大於buffer或者沒有限制時,不要使用這個方法。