如何使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作_第1頁
如何使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作_第2頁
如何使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作_第3頁
如何使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作_第4頁
如何使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作_第5頁
已閱讀5頁,還剩12頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

第如何使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作模擬學(xué)生成績信息寫入es數(shù)據(jù)庫,包括姓名、性別、科目、成績。

示例代碼1:【一次性寫入10000*1000條數(shù)據(jù)】【本人親測耗時5100秒】

fromelasticsearchimportElasticsearch

fromelasticsearchimporthelpers

importrandom

importtime

es=Elasticsearch(hosts=':9200')

#print(es)

names=['劉一','陳二','張三','李四','王五','趙六','孫七','周八','吳九','鄭十']

sexs=['男','女']

subjects=['語文','數(shù)學(xué)','英語','生物','地理']

grades=[85,77,96,74,85,69,84,59,67,69,86,96,77,78,79,80,81,82,83,84,85,86]

datas=[]

start=time.time()

#開始批量寫入es數(shù)據(jù)庫

#批量寫入數(shù)據(jù)

forjinrange(1000):

print(j)

action=[

"_index":"grade",

"_type":"doc",

"_id":i,

"_source":{

"id":i,

"name":random.choice(names),

"sex":random.choice(sexs),

"subject":random.choice(subjects),

"grade":random.choice(grades)

}foriinrange(10000*j,10000*j+10000)

helpers.bulk(es,action)

end=time.time()

print('花費(fèi)時間:',end-start)

elasticsearch-head中顯示:

示例代碼2:【一次性寫入10000*5000條數(shù)據(jù)】【本人親測耗時23000秒】

fromelasticsearchimportElasticsearch

fromelasticsearchimporthelpers

importrandom

importtime

es=Elasticsearch(hosts=':9200')

#print(es)

names=['劉一','陳二','張三','李四','王五','趙六','孫七','周八','吳九','鄭十']

sexs=['男','女']

subjects=['語文','數(shù)學(xué)','英語','生物','地理']

grades=[85,77,96,74,85,69,84,59,67,69,86,96,77,78,79,80,81,82,83,84,85,86]

datas=[]

start=time.time()

#開始批量寫入es數(shù)據(jù)庫

#批量寫入數(shù)據(jù)

forjinrange(5000):

print(j)

action=[

"_index":"grade3",

"_type":"doc",

"_id":i,

"_source":{

"id":i,

"name":random.choice(names),

"sex":random.choice(sexs),

"subject":random.choice(subjects),

"grade":random.choice(grades)

}foriinrange(10000*j,10000*j+10000)

helpers.bulk(es,action)

end=time.time()

print('花費(fèi)時間:',end-start)

示例代碼3:【一次性寫入10000*9205條數(shù)據(jù)】【耗時過長】

fromelasticsearchimportElasticsearch

fromelasticsearchimporthelpers

importrandom

importtime

es=Elasticsearch(hosts=':9200')

names=['劉一','陳二','張三','李四','王五','趙六','孫七','周八','吳九','鄭十']

sexs=['男','女']

subjects=['語文','數(shù)學(xué)','英語','生物','地理']

grades=[85,77,96,74,85,69,84,59,67,69,86,96,77,78,79,80,81,82,83,84,85,86]

datas=[]

start=time.time()

#開始批量寫入es數(shù)據(jù)庫

#批量寫入數(shù)據(jù)

forjinrange(9205):

print(j)

action=[

"_index":"grade2",

"_type":"doc",

"_id":i,

"_source":{

"id":i,

"name":random.choice(names),

"sex":random.choice(sexs),

"subject":random.choice(subjects),

"grade":random.choice(grades)

}foriinrange(10000*j,10000*j+10000)

helpers.bulk(es,action)

end=time.time()

print('花費(fèi)時間:',end-start)

查詢數(shù)據(jù)并計算各種方式的成績總分。

示例代碼4:【一次性獲取所有的數(shù)據(jù),在程序中分別計算所耗的時間】

fromelasticsearchimportElasticsearch

importtime

defsearch_data(es,size=10):

query={

"query":{

"match_all":{}

res=es.search(index='grade',body=query,size=size)

#print(res)

returnres

if__name__=='__main__':

start=time.time()

es=Elasticsearch(hosts=':9200')

#print(es)

size=10000

res=search_data(es,size)

#print(type(res))

#total=res['hits']['total']['value']

#print(total)

all_source=[]

foriinrange(size):

source=res['hits']['hits'][i]['_source']

all_source.append(source)

#print(source)

#統(tǒng)計查詢出來的所有學(xué)生的所有課程的所有成績的總成績

start1=time.time()

all_grade=0

fordatainall_source:

all_grade+=int(data['grade'])

print('所有學(xué)生總成績之和:',all_grade)

end1=time.time()

print("耗時:",end1-start1)

#統(tǒng)計查詢出來的每個學(xué)生的所有課程的所有成績的總成績

start2=time.time()

names1=[]

all_name_grade={}

fordatainall_source:

ifdata['name']innames1:

all_name_grade[data['name']]+=data['grade']

else:

names1.append(data['name'])

all_name_grade[data['name']]=data['grade']

print(all_name_grade)

end2=time.time()

print("耗時:",end2-start2)

#統(tǒng)計查詢出來的每個學(xué)生的每門課程的所有成績的總成績

start3=time.time()

names2=[]

subjects=[]

all_name_all_subject_grade={}

fordatainall_source:

ifdata['name']innames2:

ifall_name_all_subject_grade[data['name']].get(data['subject']):

all_name_all_subject_grade[data['name']][data['subject']]+=data['grade']

else:

all_name_all_subject_grade[data['name']][data['subject']]=data['grade']

else:

names2.append(data['name'])

all_name_all_subject_grade[data['name']]={}

all_name_all_subject_grade[data['name']][data['subject']]=data['grade']

print(all_name_all_subject_grade)

end3=time.time()

print("耗時:",end3-start3)

end=time.time()

print('總耗時:',end-start)

運(yùn)行結(jié)果:

在示例代碼4中當(dāng)把size由10000改為2000000時,運(yùn)行效果如下所示:

在項(xiàng)目中一般不用上述代碼4中所統(tǒng)計成績的方法,面對大量的數(shù)據(jù)是比較耗時的,要使用es中的聚合查詢。計算數(shù)據(jù)中所有成績之和。

示例代碼5:【使用普通計算方法和聚類方法做對比驗(yàn)證】

fromelasticsearchimportElasticsearch

importtime

defsearch_data(es,size=10):

query={

"query":{

"match_all":{}

res=es.search(index='grade',body=query,size=size)

#print(res)

returnres

defsearch_data2(es,size=10):

query={

"aggs":{

"all_grade":{

"terms":{

"field":"grade",

"size":1000

res=es.search(index='grade',body=query,size=size)

#print(res)

returnres

if__name__=='__main__':

start=time.time()

es=Elasticsearch(hosts=':9200')

size=2000000

res=search_data(es,size)

all_source=[]

foriinrange(size):

source=res['hits']['hits'][i]['_source']

all_source.append(source)

#print(source)

#統(tǒng)計查詢出來的所有學(xué)生的所有課程的所有成績的總成績

start1=time.time()

all_grade=0

fordatainall_source:

all_grade+=int(data['grade'])

print('200萬數(shù)據(jù)所有學(xué)生總成績之和:',all_grade)

end1=time.time()

print("耗時:",end1-start1)

end=time.time()

print('200萬數(shù)據(jù)總耗時:',end-start)

#聚合操作

start_aggs=time.time()

es=Elasticsearch(hosts=':9200')

#size=2000000

size=0

res=search_data2(es,size)

#print(res)

aggs=res['aggregations']['all_grade']['buckets']

print(aggs)

sum=0

foragginaggs:

sum+=(agg['key']*agg['doc_count'])

print('1000萬數(shù)據(jù)總成績之和:',sum)

end_aggs=time.time()

print('1000萬數(shù)據(jù)總耗時:',end_aggs-start_aggs)

運(yùn)行結(jié)果:

計算數(shù)據(jù)中每個同學(xué)的各科總成績之和。

示例代碼6:【子聚合】【先分組,再計算】

fromelasticsearchimportElasticsearch

importtime

defsearch_data(es,size=10):

query={

"query":{

"match_all":{}

res=es.search(index='grade',body=query,size=size)

#print(res)

returnres

defsearch_data2(es):

query={

"size":0,

"aggs":{

"all_names":{

"terms":{

"field":"name.keyword",

"size":10

"aggs":{

"total_grade":{

"sum":{

"field":"grade"

res=es.search(index='grade',body=query)

#print(res)

returnres

if__name__=='__main__':

start=time.time()

es=Elasticsearch(hosts=':9200')

size=2000000

res=search_data(es,size)

all_source=[]

foriinrange(size):

source=res['hits']['hits'][i]['_source']

all_source.append(source)

#print(source)

#統(tǒng)計查詢出來的每個學(xué)生的所有課程的所有成績的總成績

start2=time.time()

names1=[]

all_name_grade={}

fordatainall_source:

ifdata['name']innames1:

all_name_grade[data['name']]+=data['grade']

else:

names1.append(data['name'])

all_name_grade[data['name']]=data['grade']

print(all_name_grade)

end2=time.time()

print("200萬數(shù)據(jù)耗時:",end2-start2)

end=time.time()

print('200萬數(shù)據(jù)總耗時:',end-start)

#聚合操作

start_aggs=time.time()

es=Elasticsearch(hosts=':9200')

res=search_data2(es)

#print(res)

aggs=res['aggregations']['all_names']['buckets']

#print(aggs)

dic={}

foragginaggs:

dic[agg['key']]=agg['total_grade']['value']

print('1000萬數(shù)據(jù):',dic)

end_aggs=time.time()

print('1000萬數(shù)據(jù)總耗時:',end_aggs-start_aggs)

運(yùn)行結(jié)果:

計算數(shù)據(jù)中每個同學(xué)的每科成績之和。

示例代碼7:

fromelasticsearchimportElasticsearch

importtime

defsearch_data(es,size=10):

query={

"query":{

"match_all":{}

res=es.search(index='grade',body=query,size=size)

#print(res)

returnres

defsearch_data2(es):

query={

"size":0,

"aggs":{

"all_names":{

"terms":{

"field":"name.keyword",

"size":10

"aggs":{

"all_subjects":{

"terms":{

"field":"subject.keyword",

"size":5

"aggs":{

"total_grade":{

"sum":{

"field":"grade"

res=es.search(index='grade',body=query)

#print(res)

returnres

if__name__=='__main__':

start=time.time()

es=Elasticsearch(hosts=':9200')

size=2000000

res=search_data(es,size)

all_source=[]

foriinrange(size):

source=res['hits']['hits'][i]['_source']

all_source.append(source)

#print(source)

#統(tǒng)計查詢出來的每個學(xué)生的每門課程的所有成績的總成績

start3=time.time()

names2=[]

subjects=[]

all_name_all_subject_grade={}

fordatainall_source:

ifdata['name']innames2:

ifall_name_all_subject_grade[data['name']].get(data['subject']):

all_name_all_subject_grade[data['name']][data['subject']]+=data['grade']

else:

all_name_all_subject_grade[data['name']][data['subject']]=data['grade']

else:

names2.append(data['name'])

all_name_all_subject_grade[data['name']]={}

all_name_all_subject_grade[data['name']][data['subject']]=data['grade']

print('200萬數(shù)據(jù):',all_name_all_subject_grade)

end3=time.time()

print("耗時:",end3-start3)

end=time.time()

print('200萬數(shù)據(jù)總耗時:',end-start)

#聚合操作

start_aggs=time.time()

es=Elasticsearch(hosts=':9200')

res=search_data2(es)

#print(res)

aggs=res['aggregations']['all_names']['buckets']

#print(aggs)

dic={}

foragginaggs:

dic[agg['key']]={}

forsubinagg['all_subjects']['buckets']:

dic[agg['key']][sub['key']]=sub['total_grade']['value']

print('1000萬數(shù)據(jù):',dic)

end_aggs=time.time()

print('1000萬數(shù)據(jù)總耗時:',end_aggs-start_aggs)

運(yùn)行結(jié)果:

在上面查詢計算示例代碼中,

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論