python多進程及通信實現(xiàn)異步任務(wù)的方法_第1頁
python多進程及通信實現(xiàn)異步任務(wù)的方法_第2頁
python多進程及通信實現(xiàn)異步任務(wù)的方法_第3頁
python多進程及通信實現(xiàn)異步任務(wù)的方法_第4頁
python多進程及通信實現(xiàn)異步任務(wù)的方法_第5頁
已閱讀5頁,還剩15頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

第python多進程及通信實現(xiàn)異步任務(wù)的方法目錄一、python多進程及通信基本用法1、多進程的基本實現(xiàn)a、Process重寫run方法b、使用Process和target方法c、直接使用Process類2、多進程的通信a、Queueb、Pipe二、python多進程實戰(zhàn)1、使用進程池快速抽取數(shù)據(jù)2、多進程及通信完成數(shù)據(jù)清洗和保存3、多進程及通信實現(xiàn)異步任務(wù)需求寫在最前面,說實話python多進程這塊兒知識對于很少使用python多進程或者沒有實際使用過多python進程解決問題的人來說,還是有一定難度的。本人也是很少接觸多進程的場景,對于python多進程的使用也是比較陌生的。在接觸了一些多進程的業(yè)務(wù)場景下,對python多進程的使用進行了學習,覺得很有必要進行一個梳理總結(jié)。

一、python多進程及通信基本用法

python中多進程及其通信,是比較重要的一塊兒內(nèi)容,作為python程序員,這塊兒內(nèi)容要基本掌握。

1、多進程的基本實現(xiàn)

python多進程的使用一般是調(diào)用multiprocessing包中的Process和Pool(進程池),其中Process的用法又有多種,基本函數(shù)

p.start()啟動一個已經(jīng)初始化的進程

p.join()讓進程運行完了以后,主進程再執(zhí)行

a、Process重寫run方法

MultiOneProcess類繼承了multiprocessing的Process類,然后重寫它的run方法,實現(xiàn)具體業(yè)務(wù)邏輯功能;主程序啟動10個進程。

frommultiprocessingimportProcess

count=0

classMultiOneProcess(Process):

def__init__(self,name):

super().__init__()

=name

defrun(self)-None:

globalcount

count+=1

print('processname%sisrunning----count:%d'%(,count))

if__name__=='__main__':

p_list=[]

foriinrange(10):

name='process_%d'%i

p=MultiOneProcess(name=name)

p.start()

p_list.append(p)

forpinp_list:

p.join()

print('thismainprocess')

b、使用Process和target方法

定義一個進程類繼承Process類,同時在super()初始化中傳入target函數(shù)

frommultiprocessingimportProcess

count=0

classMultiTwoProcess(Process):

def__init__(self,name):

super().__init__(target=self.do_fun)

=name

defdo_fun(self):

globalcount

count+=1

print('processname%sisrunning----count:%d'%(name,count))

if__name__=='__main__':

p_list=[]

foriinrange(10):

name='process_%d'%i

p=MultiTwoProcess(name)

p.start()

p_list.append(p)

forpinp_list:

p.join()

print('thismainprocess')

代碼中定義了一個類MultiTwoProcess類,類中定義了do_fun函數(shù),把它作為參數(shù)傳入到target中。

c、直接使用Process類

傳入target函數(shù),同時傳入args參數(shù),注意args參數(shù)是一個元組,切不能省略最后一個逗號

frommultiprocessingimportProcess

count=0

defdo_fun(name):

globalcount

count+=1

print('processname%sisrunning----count:%d'%(name,count))

if__name__=='__main__':

p_list=[]

foriinrange(10):

name='process_%d'%i

p=Process(target=do_fun,args=(name,))

p.start()

p_list.append(p)

forpinp_list:

p.join()

print('thismainprocess')

以上三者運行的結(jié)果,是一樣的,如下:

2、多進程的通信

進程之間的通信一般都采用Queue和pipe,區(qū)別是:pipe只能在兩個進程之間調(diào)用,而Queue是可以多個進程間調(diào)用的;效率上pipe效率更高,Queue是基于pipe實現(xiàn)的,效率比pipe要低一點。

a、Queue

常用API,

存放數(shù)據(jù)

queue.put(obj,block=True,timeout=None)

當block=False的時候,如果Queue已經(jīng)滿了,那么就會跑出Queue.Full異常;

當block=True且timeout有正值的時候,Queue已經(jīng)滿了,Queue會阻塞timeout時間,超出時間就會拋出同樣的異常

獲取數(shù)據(jù)

queue.get(block=True,timeout=None)

當block=False的時候,如果Queue為空,那么就會跑出Queue.Empty異常;

當block=True且timeout有正值的時候,Queue已經(jīng)為空,Queue會阻塞timeout時間,超出時間就會拋出同樣的異常

以上2個API是阻塞;還有兩個非堵塞的API

queue.put(obj,block=False)和queue.put_nowait(obj)等效

queue.get(block=False)和queue.get_nowait()等效

簡單的實現(xiàn),一個進程發(fā)送數(shù)據(jù),另外2個進程接收數(shù)據(jù),就可以使用queue通信

frommultiprocessingimportProcess,Queue

defsend(q):

whileTrue:

q.put('發(fā)送一個數(shù)據(jù)')

defreceive1(q):

whileTrue:

s=q.get()

print('receive1:',s)

defreceive2(q):

whileTrue:

s=q.get()

print('receive2:',s)

if__name__=='__main__':

q=Queue()

p1=Process(target=send,args=(q,))

p2=Process(target=receive1,args=(q,))

p3=Process(target=receive2,args=(q,))

p1.start()

p2.start()

p3.start()

p1進程不斷的往q中存放數(shù)據(jù);p2和p3不停的從q中取數(shù)據(jù)(有競爭的再取),所以打印結(jié)果是無序的

b、Pipe

Pipe(duplex=True)返回2個連通端(p1,p2);當duplex=True時,雙向通信,p1發(fā)送,p2接收;p2發(fā)送,p1接收。

當duplex=True時,單向通信,p1只能發(fā)送,p2只能接收。

常用API,pipe.send()pipe.recv()

frommultiprocessingimportProcess,Pipe

deffun2(p):

whileTrue:

s=p.recv()

print('接收一個數(shù)據(jù):',s)

deffun1(p):

whileTrue:

print('發(fā)送一個數(shù)據(jù):pipe')

p.send('pipe')

if__name__=='__main__':

pi1,pi2=Pipe(duplex=True)

p1=Process(target=fun1,args=(pi1,))

p2=Process(target=fun2,args=(pi2,))

p1.start()

p2.start()

結(jié)果如下:

二、python多進程實戰(zhàn)

不同的業(yè)務(wù)場景使用多進程的方式和復雜度也不相同,就我遇見過的一些場景進行演示和說明。

1、使用進程池快速抽取數(shù)據(jù)

場景描述:有1000個Excel文件的數(shù)據(jù)需要進行抽取和清洗,要把不符合我們需求的數(shù)據(jù)過濾掉,保留質(zhì)量很高的數(shù)據(jù);每個Excel都有幾十萬或者上百萬的數(shù)據(jù),那么怎么快速的完成這個任務(wù)呢?

首先整體上而言,可以把單個Excel的處理并行起來;那么可以使用多進程,其次這個需要返回結(jié)果,要保留合格的數(shù)據(jù),比較簡單的就是采用進程池了,它能夠很方便的把進程處理的結(jié)果進行返回,并且返回的還是一個生成器;如果還需要更快,那么可以把單個Excel中的每條數(shù)據(jù)的處理并行起來。代碼層面上,采用pool進程池來完成這個任務(wù)(本文沒有對進程池的使用和API做說明),具體的實現(xiàn)方式采取pool.imap()

if__name__=='__main__':

#所有Excel的路徑

all_paths=glob('../data/original_data/*')

sysInfo_list=['我通過了好友請求,現(xiàn)在你倆可以開始聊天了','我通過了你的朋友驗證請求,現(xiàn)在我們可以開始聊天了','已通過你的朋友驗證請求,現(xiàn)在可以開始聊天了','不支持此消息,請在手機上查看',

'微信紅包']

interval=25

iflen(all_paths)//interval*intervallen(all_paths):

k=len(all_paths)//interval+1

else:

k=len(all_paths)//interval

#分段處理,每段25個Excel

foriinrange(k):

paths=all_paths[i*interval:(i+1)*interval]

ifi*interval=100andi*interval200:

params=[]

forpathintqdm(paths):

params.append((path,sysInfo_list))

#多進程處理——進程池、以及進度顯示

withPool(20)asp:

res=list(tqdm(p.imap(extract_data,params),total=len(params),desc='extract_data'))

all_df=[]

fordfsinres:

iflen(dfs)0:

all_df.extend(dfs)

df=pd.concat(all_df,axis=0)

save_path='../data/weikong_clean_data_'+str(i*interval)+'_'+str(i*interval+len(paths)-1)+'.xlsx'

writer=pd.ExcelWriter(save_path)

df.to_excel(writer,index=False)

writer.save()

writer.close()

2、多進程及通信完成數(shù)據(jù)清洗和保存

場景描述:從Excel中讀取數(shù)據(jù),數(shù)據(jù)格式是整通整通的對話,每通對話有一定的輪數(shù);保存數(shù)據(jù)到2個txt中,一個是順序保留,一個是倒序保留;整體對話順序不變,每通對話內(nèi)部順序倒序。

正序:

倒序:

要想實現(xiàn)這樣的任務(wù),粗暴的做法是,用兩個list,一個保留正序的,一個保留倒序的,然后分別對這兩個list進行文件寫入操作。但是如果數(shù)據(jù)量很多在內(nèi)存有限的時候,只能滿足不了兩個list的情況下怎么實現(xiàn)呢?

我的實現(xiàn)方式就是開啟兩個進程,一個進程保留一個正序list,寫入文件的同時對每個元素(每通)對話進行倒序,然后把倒序后的數(shù)據(jù)通過Queue或者Pipe傳入到另外一個進程,讓另外的進程進行寫文件操作。

defsave_mmi_train_data(queue):

withopen('../data/finetune_mmi_data/train.txt','w',encoding='utf-8')asf:

whileTrue:

save_list=queue.get()

iflen(save_list)==0:

break

forlineinsave_list:

f.write(line)

defsave_mmi_val_data(queue):

withopen('../data/finetune_mmi_data/val.txt','w',encoding='utf-8')asf:

whileTrue:

save_list=queue.get()

iflen(save_list)==0:

break

forlineinsave_list:

f.write(line)

defget_funtine_data(paths):

all_groups=[]

forpathintqdm(paths,desc='loaddatafromexcle'):

df=pd.read_excel(path)

df.dropna(inplace=True)

df.drop_duplicates(inplace=True,keep='first')

groups=list(df.groupby(by=['坐席id','客戶微信id']))

all_groups.extend(groups)

print('len(all_groups)',len(all_groups))

train,val=train_test_split(all_groups,test_size=10000/len(all_groups),random_state=1)

print('len(train)',len(train))

print('len(val)',len(val))

train_std_path='../data/finetune_std_data/train.txt'

val_std_path='../data/finetune_std_data/val.txt'

train_mmi_queue=Queue()

save_funtine_data(train,train_std_path,train_mmi_queue,save_mmi_train_data)

val_mmi_queue=Queue()

save_funtine_data(val,val_std_path,val_mmi_queue,save_mmi_val_data)

defsave_funtine_data(groups,save_std_path,queue,fun):

p=Process(target=fun,args=(queue,))

p.start()

withopen(save_std_path,'w',encoding='utf-8')asf:

forgroupintqdm(groups,desc='findandsavefuntinedialoguedatas'):

new_df=group[1]

df_roles=new_df['是否客服'].values.tolist()

df_contents=new_df['消息內(nèi)容'].values.tolist()

roles=[]

contents=[]

forrole,contentinzip(df_roles,df_contents):

content=content.replace('\n','')

content=emoji.replace_emoji(content,'')

iflen(content)0andcontent!="":

roles.append(role)

contents.append(content)

save_list=[]

save_str=""

forindex,roleinenumerate(roles):

content=contents[index].replace('\n','')

content=emoji.replace_emoji(content,'')

ifcontent[-1]notinpunctuations:

content+=';'

ifindex==0:

ifrole=="是":

save_str+="坐席:"+content

else:

save_str+="客戶:"+content

else:

ifrole!=roles[index-1]:

f.write(save_str[0:-1]+'\n')

save_list.append(save_str[0:-1]+'\n')

ifrole=="是":

save_str="坐席:"+content

else:

save_str="客戶:"+content

else:

save_str+=content

iflen(save_str)1:

save_list.append(save_str[0:-1]+'\n')

f.write(save_str[0:-1]+'\n')

f.write('\n')

#切片反轉(zhuǎn)

save_list=save_list[::-1]

save_list.append('\n')

iflen(save_list)0:

queue.put(save_list)

#注意傳入一個空值,讓倒序進程結(jié)束

queue.put([])

p.join()

要注意的是,倒序進程中使用whileTrue無限循環(huán),需要傳入一個空值,能夠讓它在正序進程結(jié)束的同時知道數(shù)據(jù)寫完了,跳出循環(huán)。以上代碼比較簡單就不一一說明了。

3、多進程及通信實現(xiàn)異步任務(wù)需求

場景描述:假定一個模型推理系統(tǒng),網(wǎng)絡(luò)模塊負責接受請求傳輸?shù)臄?shù)據(jù),把數(shù)據(jù)傳輸給數(shù)據(jù)處理模塊;數(shù)據(jù)處理模塊負責處理數(shù)據(jù)(比如說語音流或者視頻流等,這些數(shù)據(jù)處理對CPU的消耗很大),處理完后把數(shù)據(jù)傳輸給模型推理模塊;模型推理模塊負責對數(shù)據(jù)進行推理并把結(jié)果返回給網(wǎng)絡(luò)模塊。要求就是網(wǎng)絡(luò)模塊、數(shù)據(jù)處理模塊和模型推理模塊是獨立的,可以并行的完成自己的任務(wù),3個模塊是異步的,其實可以把這個系統(tǒng)簡化的使用多進程來實現(xiàn)。

每個模塊可以用一個進程來表示,內(nèi)部的邏輯可以開啟子進程來實現(xiàn),然后模塊直接的數(shù)據(jù)傳輸就可以使用多進程的通信來實現(xiàn),同時也創(chuàng)建一個全局的Queue變量,讓每個模塊的進程按需使用。

畫了一個簡單的結(jié)構(gòu)和流程圖,如下:

注意的是模塊之間的數(shù)據(jù)傳輸,使用queue傳輸?shù)臅r候,數(shù)據(jù)量越小,效率越高,所以可以在網(wǎng)絡(luò)模塊這端提前把數(shù)據(jù)進行處理。

函數(shù)入口文件

importa

importb

importc

fromwhole_queueimportWholeQueue

importos

if__name__=='__main__':

print("mainprocess:",os.getpid())

whole_queue=WholeQueue()

b_pool_size=2

c_pool_size=6

Module_list=[

a.A(whole_queue,b_pool_size),

b.B(whole_queue,b_pool_size,c_pool_size),

c.C(whole_queue,c_pool_size)

forpinModule_list:

p.start()

公共隊列類

classWholeQueue():

def__init__(self):

self.queues=dict()

defregister(self,queuename,queue):

self.queues[queuename]=queue

各個子模塊類

a

frommultiprocessingimportProcess,Queue

importtime

importrandom

importos

classA(Process):

def__init__(self,whole_queue,b_pool_size):

super().__init__(target=self.do_run)

self.whole_queue=whole_queue

self.b_pool_size=b_pool_size

self.queue_list=[]

queue=Queue()

self.whole_queue.register('A',queue)

self.queue_list.append(queue)

self.count=0

defdo_run(self):

print("A.do_runprocess:",os.getpid())

a_send_pro=Process(target=self.send)

a_send_pro.start()

a_receive_pro=Process(target=self.receive)

a_receive_pro.start()

defsend(self):

print("A.sendprocess:",os.getpid())

whileTrue:

time.sleep(0.001)

self.whole_queue.queues['B_%d'%(self.count%self.b_pool_size)].put_nowait(self.count)

self.count+=1

defreceive(self):

print("A.receiveprocess:",os.getpid())

whileTrue:

rece=self.whole_queue.queues['A'].get()

print(rece)

b

frommultiprocessingimportProcess,Queue

importtime

importrandom

importos

classB(Process):

def__init__(self,whole_queue,b_pool_size,c_pool_size):

super().__init__(target=self.do_run)

self.whole_queue=whole_queue

self.b_pool_size=b_pool_size

self.c_pool_size=c_pool_size

self.queue_list=[]

foriinrange(self.b_pool_size):

queue=Queue()

self.whole_queue.register('B_%d'%i,queue)

self.queue_list.append(queue)

self.count=0

defdo_run(self):

print("B.do_runprocess:",os.getpid())

foriinrange(self.b_pool_size):

p=Process(target=ponent,args=(self.queue_list[i],))

p.start()

defcomponent(self,queue):

print("B.componentprocess:",os.getpid())

whileTrue:

time.sleep(0.01)

info=queue.get()

componext_info='component_'+str(info)

self.whole_queue.queues['C_%d'%(info%self.c_pool_size)].put(componext_info)

c

frommultiprocessingimportProcess,Queue

frommodelimportModel

importtime

importrandom

importos

classC(Process):

def__init__(self,whole_queue,c_pool_size):

super().__init__(target=self.do_run)

self.whole_queue=whole_queue

self.c_pool_size=c_pool_size

self.queue_list=[]

foriinrange(self.c_pool_size):

queue=Queue()

self.whole_queue.register('C_%d'%i,queue)

self.queue_list.append(queue)

#self.cache_queue=None

#self.result_queue=None

#self.infer_queue=None

defdo_run(self):

cache_queue=Queue()

result_queue=Queue()

infer_queue=Queue()

print("C.do_runprocess:",os.getpid())

foriinra

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論