




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
第python多進程及通信實現(xiàn)異步任務(wù)的方法目錄一、python多進程及通信基本用法1、多進程的基本實現(xiàn)a、Process重寫run方法b、使用Process和target方法c、直接使用Process類2、多進程的通信a、Queueb、Pipe二、python多進程實戰(zhàn)1、使用進程池快速抽取數(shù)據(jù)2、多進程及通信完成數(shù)據(jù)清洗和保存3、多進程及通信實現(xiàn)異步任務(wù)需求寫在最前面,說實話python多進程這塊兒知識對于很少使用python多進程或者沒有實際使用過多python進程解決問題的人來說,還是有一定難度的。本人也是很少接觸多進程的場景,對于python多進程的使用也是比較陌生的。在接觸了一些多進程的業(yè)務(wù)場景下,對python多進程的使用進行了學習,覺得很有必要進行一個梳理總結(jié)。
一、python多進程及通信基本用法
python中多進程及其通信,是比較重要的一塊兒內(nèi)容,作為python程序員,這塊兒內(nèi)容要基本掌握。
1、多進程的基本實現(xiàn)
python多進程的使用一般是調(diào)用multiprocessing包中的Process和Pool(進程池),其中Process的用法又有多種,基本函數(shù)
p.start()啟動一個已經(jīng)初始化的進程
p.join()讓進程運行完了以后,主進程再執(zhí)行
a、Process重寫run方法
MultiOneProcess類繼承了multiprocessing的Process類,然后重寫它的run方法,實現(xiàn)具體業(yè)務(wù)邏輯功能;主程序啟動10個進程。
frommultiprocessingimportProcess
count=0
classMultiOneProcess(Process):
def__init__(self,name):
super().__init__()
=name
defrun(self)-None:
globalcount
count+=1
print('processname%sisrunning----count:%d'%(,count))
if__name__=='__main__':
p_list=[]
foriinrange(10):
name='process_%d'%i
p=MultiOneProcess(name=name)
p.start()
p_list.append(p)
forpinp_list:
p.join()
print('thismainprocess')
b、使用Process和target方法
定義一個進程類繼承Process類,同時在super()初始化中傳入target函數(shù)
frommultiprocessingimportProcess
count=0
classMultiTwoProcess(Process):
def__init__(self,name):
super().__init__(target=self.do_fun)
=name
defdo_fun(self):
globalcount
count+=1
print('processname%sisrunning----count:%d'%(name,count))
if__name__=='__main__':
p_list=[]
foriinrange(10):
name='process_%d'%i
p=MultiTwoProcess(name)
p.start()
p_list.append(p)
forpinp_list:
p.join()
print('thismainprocess')
代碼中定義了一個類MultiTwoProcess類,類中定義了do_fun函數(shù),把它作為參數(shù)傳入到target中。
c、直接使用Process類
傳入target函數(shù),同時傳入args參數(shù),注意args參數(shù)是一個元組,切不能省略最后一個逗號
frommultiprocessingimportProcess
count=0
defdo_fun(name):
globalcount
count+=1
print('processname%sisrunning----count:%d'%(name,count))
if__name__=='__main__':
p_list=[]
foriinrange(10):
name='process_%d'%i
p=Process(target=do_fun,args=(name,))
p.start()
p_list.append(p)
forpinp_list:
p.join()
print('thismainprocess')
以上三者運行的結(jié)果,是一樣的,如下:
2、多進程的通信
進程之間的通信一般都采用Queue和pipe,區(qū)別是:pipe只能在兩個進程之間調(diào)用,而Queue是可以多個進程間調(diào)用的;效率上pipe效率更高,Queue是基于pipe實現(xiàn)的,效率比pipe要低一點。
a、Queue
常用API,
存放數(shù)據(jù)
queue.put(obj,block=True,timeout=None)
當block=False的時候,如果Queue已經(jīng)滿了,那么就會跑出Queue.Full異常;
當block=True且timeout有正值的時候,Queue已經(jīng)滿了,Queue會阻塞timeout時間,超出時間就會拋出同樣的異常
獲取數(shù)據(jù)
queue.get(block=True,timeout=None)
當block=False的時候,如果Queue為空,那么就會跑出Queue.Empty異常;
當block=True且timeout有正值的時候,Queue已經(jīng)為空,Queue會阻塞timeout時間,超出時間就會拋出同樣的異常
以上2個API是阻塞;還有兩個非堵塞的API
queue.put(obj,block=False)和queue.put_nowait(obj)等效
queue.get(block=False)和queue.get_nowait()等效
簡單的實現(xiàn),一個進程發(fā)送數(shù)據(jù),另外2個進程接收數(shù)據(jù),就可以使用queue通信
frommultiprocessingimportProcess,Queue
defsend(q):
whileTrue:
q.put('發(fā)送一個數(shù)據(jù)')
defreceive1(q):
whileTrue:
s=q.get()
print('receive1:',s)
defreceive2(q):
whileTrue:
s=q.get()
print('receive2:',s)
if__name__=='__main__':
q=Queue()
p1=Process(target=send,args=(q,))
p2=Process(target=receive1,args=(q,))
p3=Process(target=receive2,args=(q,))
p1.start()
p2.start()
p3.start()
p1進程不斷的往q中存放數(shù)據(jù);p2和p3不停的從q中取數(shù)據(jù)(有競爭的再取),所以打印結(jié)果是無序的
b、Pipe
Pipe(duplex=True)返回2個連通端(p1,p2);當duplex=True時,雙向通信,p1發(fā)送,p2接收;p2發(fā)送,p1接收。
當duplex=True時,單向通信,p1只能發(fā)送,p2只能接收。
常用API,pipe.send()pipe.recv()
frommultiprocessingimportProcess,Pipe
deffun2(p):
whileTrue:
s=p.recv()
print('接收一個數(shù)據(jù):',s)
deffun1(p):
whileTrue:
print('發(fā)送一個數(shù)據(jù):pipe')
p.send('pipe')
if__name__=='__main__':
pi1,pi2=Pipe(duplex=True)
p1=Process(target=fun1,args=(pi1,))
p2=Process(target=fun2,args=(pi2,))
p1.start()
p2.start()
結(jié)果如下:
二、python多進程實戰(zhàn)
不同的業(yè)務(wù)場景使用多進程的方式和復雜度也不相同,就我遇見過的一些場景進行演示和說明。
1、使用進程池快速抽取數(shù)據(jù)
場景描述:有1000個Excel文件的數(shù)據(jù)需要進行抽取和清洗,要把不符合我們需求的數(shù)據(jù)過濾掉,保留質(zhì)量很高的數(shù)據(jù);每個Excel都有幾十萬或者上百萬的數(shù)據(jù),那么怎么快速的完成這個任務(wù)呢?
首先整體上而言,可以把單個Excel的處理并行起來;那么可以使用多進程,其次這個需要返回結(jié)果,要保留合格的數(shù)據(jù),比較簡單的就是采用進程池了,它能夠很方便的把進程處理的結(jié)果進行返回,并且返回的還是一個生成器;如果還需要更快,那么可以把單個Excel中的每條數(shù)據(jù)的處理并行起來。代碼層面上,采用pool進程池來完成這個任務(wù)(本文沒有對進程池的使用和API做說明),具體的實現(xiàn)方式采取pool.imap()
if__name__=='__main__':
#所有Excel的路徑
all_paths=glob('../data/original_data/*')
sysInfo_list=['我通過了好友請求,現(xiàn)在你倆可以開始聊天了','我通過了你的朋友驗證請求,現(xiàn)在我們可以開始聊天了','已通過你的朋友驗證請求,現(xiàn)在可以開始聊天了','不支持此消息,請在手機上查看',
'微信紅包']
interval=25
iflen(all_paths)//interval*intervallen(all_paths):
k=len(all_paths)//interval+1
else:
k=len(all_paths)//interval
#分段處理,每段25個Excel
foriinrange(k):
paths=all_paths[i*interval:(i+1)*interval]
ifi*interval=100andi*interval200:
params=[]
forpathintqdm(paths):
params.append((path,sysInfo_list))
#多進程處理——進程池、以及進度顯示
withPool(20)asp:
res=list(tqdm(p.imap(extract_data,params),total=len(params),desc='extract_data'))
all_df=[]
fordfsinres:
iflen(dfs)0:
all_df.extend(dfs)
df=pd.concat(all_df,axis=0)
save_path='../data/weikong_clean_data_'+str(i*interval)+'_'+str(i*interval+len(paths)-1)+'.xlsx'
writer=pd.ExcelWriter(save_path)
df.to_excel(writer,index=False)
writer.save()
writer.close()
2、多進程及通信完成數(shù)據(jù)清洗和保存
場景描述:從Excel中讀取數(shù)據(jù),數(shù)據(jù)格式是整通整通的對話,每通對話有一定的輪數(shù);保存數(shù)據(jù)到2個txt中,一個是順序保留,一個是倒序保留;整體對話順序不變,每通對話內(nèi)部順序倒序。
正序:
倒序:
要想實現(xiàn)這樣的任務(wù),粗暴的做法是,用兩個list,一個保留正序的,一個保留倒序的,然后分別對這兩個list進行文件寫入操作。但是如果數(shù)據(jù)量很多在內(nèi)存有限的時候,只能滿足不了兩個list的情況下怎么實現(xiàn)呢?
我的實現(xiàn)方式就是開啟兩個進程,一個進程保留一個正序list,寫入文件的同時對每個元素(每通)對話進行倒序,然后把倒序后的數(shù)據(jù)通過Queue或者Pipe傳入到另外一個進程,讓另外的進程進行寫文件操作。
defsave_mmi_train_data(queue):
withopen('../data/finetune_mmi_data/train.txt','w',encoding='utf-8')asf:
whileTrue:
save_list=queue.get()
iflen(save_list)==0:
break
forlineinsave_list:
f.write(line)
defsave_mmi_val_data(queue):
withopen('../data/finetune_mmi_data/val.txt','w',encoding='utf-8')asf:
whileTrue:
save_list=queue.get()
iflen(save_list)==0:
break
forlineinsave_list:
f.write(line)
defget_funtine_data(paths):
all_groups=[]
forpathintqdm(paths,desc='loaddatafromexcle'):
df=pd.read_excel(path)
df.dropna(inplace=True)
df.drop_duplicates(inplace=True,keep='first')
groups=list(df.groupby(by=['坐席id','客戶微信id']))
all_groups.extend(groups)
print('len(all_groups)',len(all_groups))
train,val=train_test_split(all_groups,test_size=10000/len(all_groups),random_state=1)
print('len(train)',len(train))
print('len(val)',len(val))
train_std_path='../data/finetune_std_data/train.txt'
val_std_path='../data/finetune_std_data/val.txt'
train_mmi_queue=Queue()
save_funtine_data(train,train_std_path,train_mmi_queue,save_mmi_train_data)
val_mmi_queue=Queue()
save_funtine_data(val,val_std_path,val_mmi_queue,save_mmi_val_data)
defsave_funtine_data(groups,save_std_path,queue,fun):
p=Process(target=fun,args=(queue,))
p.start()
withopen(save_std_path,'w',encoding='utf-8')asf:
forgroupintqdm(groups,desc='findandsavefuntinedialoguedatas'):
new_df=group[1]
df_roles=new_df['是否客服'].values.tolist()
df_contents=new_df['消息內(nèi)容'].values.tolist()
roles=[]
contents=[]
forrole,contentinzip(df_roles,df_contents):
content=content.replace('\n','')
content=emoji.replace_emoji(content,'')
iflen(content)0andcontent!="":
roles.append(role)
contents.append(content)
save_list=[]
save_str=""
forindex,roleinenumerate(roles):
content=contents[index].replace('\n','')
content=emoji.replace_emoji(content,'')
ifcontent[-1]notinpunctuations:
content+=';'
ifindex==0:
ifrole=="是":
save_str+="坐席:"+content
else:
save_str+="客戶:"+content
else:
ifrole!=roles[index-1]:
f.write(save_str[0:-1]+'\n')
save_list.append(save_str[0:-1]+'\n')
ifrole=="是":
save_str="坐席:"+content
else:
save_str="客戶:"+content
else:
save_str+=content
iflen(save_str)1:
save_list.append(save_str[0:-1]+'\n')
f.write(save_str[0:-1]+'\n')
f.write('\n')
#切片反轉(zhuǎn)
save_list=save_list[::-1]
save_list.append('\n')
iflen(save_list)0:
queue.put(save_list)
#注意傳入一個空值,讓倒序進程結(jié)束
queue.put([])
p.join()
要注意的是,倒序進程中使用whileTrue無限循環(huán),需要傳入一個空值,能夠讓它在正序進程結(jié)束的同時知道數(shù)據(jù)寫完了,跳出循環(huán)。以上代碼比較簡單就不一一說明了。
3、多進程及通信實現(xiàn)異步任務(wù)需求
場景描述:假定一個模型推理系統(tǒng),網(wǎng)絡(luò)模塊負責接受請求傳輸?shù)臄?shù)據(jù),把數(shù)據(jù)傳輸給數(shù)據(jù)處理模塊;數(shù)據(jù)處理模塊負責處理數(shù)據(jù)(比如說語音流或者視頻流等,這些數(shù)據(jù)處理對CPU的消耗很大),處理完后把數(shù)據(jù)傳輸給模型推理模塊;模型推理模塊負責對數(shù)據(jù)進行推理并把結(jié)果返回給網(wǎng)絡(luò)模塊。要求就是網(wǎng)絡(luò)模塊、數(shù)據(jù)處理模塊和模型推理模塊是獨立的,可以并行的完成自己的任務(wù),3個模塊是異步的,其實可以把這個系統(tǒng)簡化的使用多進程來實現(xiàn)。
每個模塊可以用一個進程來表示,內(nèi)部的邏輯可以開啟子進程來實現(xiàn),然后模塊直接的數(shù)據(jù)傳輸就可以使用多進程的通信來實現(xiàn),同時也創(chuàng)建一個全局的Queue變量,讓每個模塊的進程按需使用。
畫了一個簡單的結(jié)構(gòu)和流程圖,如下:
注意的是模塊之間的數(shù)據(jù)傳輸,使用queue傳輸?shù)臅r候,數(shù)據(jù)量越小,效率越高,所以可以在網(wǎng)絡(luò)模塊這端提前把數(shù)據(jù)進行處理。
函數(shù)入口文件
importa
importb
importc
fromwhole_queueimportWholeQueue
importos
if__name__=='__main__':
print("mainprocess:",os.getpid())
whole_queue=WholeQueue()
b_pool_size=2
c_pool_size=6
Module_list=[
a.A(whole_queue,b_pool_size),
b.B(whole_queue,b_pool_size,c_pool_size),
c.C(whole_queue,c_pool_size)
forpinModule_list:
p.start()
公共隊列類
classWholeQueue():
def__init__(self):
self.queues=dict()
defregister(self,queuename,queue):
self.queues[queuename]=queue
各個子模塊類
a
frommultiprocessingimportProcess,Queue
importtime
importrandom
importos
classA(Process):
def__init__(self,whole_queue,b_pool_size):
super().__init__(target=self.do_run)
self.whole_queue=whole_queue
self.b_pool_size=b_pool_size
self.queue_list=[]
queue=Queue()
self.whole_queue.register('A',queue)
self.queue_list.append(queue)
self.count=0
defdo_run(self):
print("A.do_runprocess:",os.getpid())
a_send_pro=Process(target=self.send)
a_send_pro.start()
a_receive_pro=Process(target=self.receive)
a_receive_pro.start()
defsend(self):
print("A.sendprocess:",os.getpid())
whileTrue:
time.sleep(0.001)
self.whole_queue.queues['B_%d'%(self.count%self.b_pool_size)].put_nowait(self.count)
self.count+=1
defreceive(self):
print("A.receiveprocess:",os.getpid())
whileTrue:
rece=self.whole_queue.queues['A'].get()
print(rece)
b
frommultiprocessingimportProcess,Queue
importtime
importrandom
importos
classB(Process):
def__init__(self,whole_queue,b_pool_size,c_pool_size):
super().__init__(target=self.do_run)
self.whole_queue=whole_queue
self.b_pool_size=b_pool_size
self.c_pool_size=c_pool_size
self.queue_list=[]
foriinrange(self.b_pool_size):
queue=Queue()
self.whole_queue.register('B_%d'%i,queue)
self.queue_list.append(queue)
self.count=0
defdo_run(self):
print("B.do_runprocess:",os.getpid())
foriinrange(self.b_pool_size):
p=Process(target=ponent,args=(self.queue_list[i],))
p.start()
defcomponent(self,queue):
print("B.componentprocess:",os.getpid())
whileTrue:
time.sleep(0.01)
info=queue.get()
componext_info='component_'+str(info)
self.whole_queue.queues['C_%d'%(info%self.c_pool_size)].put(componext_info)
c
frommultiprocessingimportProcess,Queue
frommodelimportModel
importtime
importrandom
importos
classC(Process):
def__init__(self,whole_queue,c_pool_size):
super().__init__(target=self.do_run)
self.whole_queue=whole_queue
self.c_pool_size=c_pool_size
self.queue_list=[]
foriinrange(self.c_pool_size):
queue=Queue()
self.whole_queue.register('C_%d'%i,queue)
self.queue_list.append(queue)
#self.cache_queue=None
#self.result_queue=None
#self.infer_queue=None
defdo_run(self):
cache_queue=Queue()
result_queue=Queue()
infer_queue=Queue()
print("C.do_runprocess:",os.getpid())
foriinra
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 從信息安全到透明化看區(qū)塊鏈在金融領(lǐng)域的應(yīng)用
- 醫(yī)療行業(yè)電子病歷系統(tǒng)升級的商業(yè)模式探討
- 2025年中學消防應(yīng)急疏散總結(jié)模版
- 新生兒低血鈣的臨床護理
- 利用大數(shù)據(jù)分析提升公共衛(wèi)生中的疾病預防效率
- 公司車輛轉(zhuǎn)讓協(xié)議合同范例
- 醫(yī)療設(shè)備的成本控制與經(jīng)濟效益分析
- 會員入股協(xié)議合同范例
- 財務(wù)部半度總結(jié)模版
- 債權(quán)傭金合同范例
- 附屬房拆除方案模板
- JB-T 4088.1-2022 日用管狀電熱元件 第1部分:通用要求
- RLC串聯(lián)電路暫態(tài)研究
- 《實數(shù)》單元作業(yè)設(shè)計
- 圍手術(shù)期血糖的管理專家講座
- 干濕法脫硫運行經(jīng)濟成本對比(自動計算)
- 運輸與配送管理選擇題復習題庫
- 線性代數(shù)矩陣
- S22天天高速安慶至潛山段(涼亭至月山)環(huán)境影響報告書
- 某廠蒸汽管道安裝吹掃及試運行方案
- 清華大學出版社機械制圖習題集參考答案(課堂PPT)
評論
0/150
提交評論