R環境下的大數據運算

文章推薦指數: 80 %
投票人數:10人

如同我們剛剛講的,我們透過Spark在hadoop上進行資料的計算,你所命名計算 ... 多少個數,另外我們仍能透過colnames()函數來瞭解資料的每個欄位名稱。

R環境下的大數據運算 卓雍然、曾靖驊、余曜廷 2019-09-1210:18:17 前言:商業大數據分析方法 在真實的數據中,數據往往動輒千萬或億,當為了進一步分析而將數據轉為內容相關矩陣或是將資料合併時,更使資料筆數大幅提升,此時並不能依照以往的方式處理,因為資料的龐大將會需要非常久的時間處理,每一步轉換、處理分析都要再等一次,還有可能等了好幾個小時,最後發現資料無法讀入。

那這個時候要如何分析呢?我們將介紹一套系統,不僅能夠將龐大的資料在極短時間內讀取並觀看,還能夠將數億筆資料直接性的用眾所皆知的dyplr四大要素:select、arrange、filter、group_by來進行資料過篩,猶如在面對只有幾萬的小數據。

本章主要內容 1.大數據存儲與運算系統介紹2.基本操作與函數介紹 Hadoop與Spark系統運作流程圖 現在就讓我們來介紹一下如此威猛的大數據分析系統與其元素吧!這套系統主要分成兩個部分,分別為: Hadoop(網路檔案系統) 為擁有常見分散式檔案存儲功能的其中一個系統,hadoop有許多用途,我們在此僅介紹「HDFS」(Hadoop分布式檔案系統)。

簡單而言,hadoop分布式檔案系統就是好比一個由很多電腦組成的伺服器,而我們將手上可觀數目的數據切碎分別投入到這些電腦當中,讓每台電腦都有分布部分的數據(有可能重疊),這樣能夠加高效能、使用數據的速度,方便於存儲與之後的運算。

Spark(叢集運算平台)電腦叢集的意思是由一組連接再一起,共同工作的電腦,使用叢集的原因通常是為了提高單台電腦的效能與可用性,能夠處理許多計算趨勢匯聚的結果。

[維基百科]而Spark由許多組成的要素,我們在此也只說明「Spark核心與彈性分散式資料集」Spark最主要的功能(核心)就是提供分散式任務調度而其基礎的程式抽象則稱為彈性分散式資料集(RDDs),是一個可以並列操作、有容錯機制的資料集合。

RDDs可以透過參照外部儲存系統的資料集建立(例如:共用檔案系統、HDFS、HBase或其他Hadoop資料格式的資料來源)。

或者是透過在現有RDDs的轉換而建立(比如:map、filter、reduce、join等等)。

RDD抽象化是經由一個以Scala,Java,Python的語言整合API所呈現,簡化了編程複雜性,應用程式操縱RDDs的方法類似於操縱本地端的資料集合。

,用途很廣泛,而我們這裡則使用於「分散式計算」,,因為資料非常的有規模,我們就必須 精簡來說,Hadoop是個很有效率的存儲方式,而Spark則是能夠迅速的將這些資料在未下載的情況下(仍在hadoop中),進行一系列資料轉換、篩選、排序、計算,如dyplr中select、group_by、arrange、filter,讓3億多筆資料的大數據如同僅有一萬筆資料的小數據一般,在一分鐘內完成運算。

如同我們剛剛講的,我們透過Spark在hadoop上進行資料的計算,你所命名計算之資料雖然也會出現在environment中,但是出現的名稱,僅僅只是hadoop上資料的connection,又稱「Handler connector」,在enviroment以list格式出現,負責連結在hadoop的大型資料,因此他們仍在hadoop上,必須要透過collect()此函數將數據抓取,你所命名之資料才會抓取到你的Rstudio的伺服器上,就可以直接的使用。

(搭配上圖會更清楚) 若仍不清楚的話,請直接依序操作下列CODE,手把手教學,讓你以實際動作直接學習、體會!!! ✿:導入基本設定大數據資料導入起手式,連接上sparkcontext(Spark的環境) library(dplyr) ## ##Attachingpackage:'dplyr' ##Thefollowingobjectsaremaskedfrom'package:stats': ## ##filter,lag ##Thefollowingobjectsaremaskedfrom'package:base': ## ##intersect,setdiff,setequal,union library(sparklyr) Sys.setenv(SPARK_HOME="/usr/local/spark/spark-2.1.0-bin-hadoop2.7/") config ##1349655789 colnames(TX) ##[1]"id""chain""dept" ##[4]"category""company""brand" ##[7]"date""productsize""productmeasure" ##[10]"purchasequantity""purchaseamount" ✿:進行篩選由此我們可以看到經過Spark的切割存儲之後,再大的數據都能夠使用dyplr套件的方式進行資料探勘,就猶如小資料的數據分析一般簡易操作。

我們一開始先使用Select將我們需要的欄位進行擷取,並且找出所求之各欄位不同之觀察值數量。

此時注意!!!我們將TX前兩百項觀察值從hadoop上抓取下來並且命名df,可以藉此查看有否執行collect()函數的差異。

在進行任何運算命名之後,只要沒有collect他都是屬於一種connection不會直接在R環境裡面。

select(TX,id:brand)%>%summarise_all(n_distinct) ###Source:lazyquery[??x6] ###Database:spark_connection ##idchaindeptcategorycompanybrand ## ##1311541134838363277335689 TX%>%head(200)%>%collect->df ✿:資料格式轉換我們將日期格式轉換,並且將資料依照id,date,chain進行分組,建立出id,date,chain、購買總數、特定產品購買數量與產品購買總金額之新資料框。

其中sdf_with_unique_id(id=‘order’)是將這些分組好的資料額外在設定一個「訂單編號」。

ORD=TX%>%mutate(date=as.Date(date))%>% group_by(id,date,chain)%>%summarise( items=n(), qty=sum(purchasequantity), amount=sum(purchaseamount) )%>%sdf_with_unique_id(id='order') ##Warning:MissingvaluesarealwaysremovedinSQL. ##Use`SUM(x,na.rm=TRUE)`tosilencethiswarning ##Warning:MissingvaluesarealwaysremovedinSQL. ##Use`SUM(x,na.rm=TRUE)`tosilencethiswarning #觀察數據筆數 count(ORD)#26496798order ###Source:lazyquery[??x1] ###Database:spark_connection ##n ## ##126496798 #我們想知道有多少不同的訂單,並且想找出離資料最早的訂單時間與最晚的訂單時間 ORD%>%summarise(n_distinct(order),min(date),max(date)) ##Warning:MissingvaluesarealwaysremovedinSQL. ##Use`MIN(x,na.rm=TRUE)`tosilencethiswarning ##Warning:MissingvaluesarealwaysremovedinSQL. ##Use`MAX(x,na.rm=TRUE)`tosilencethiswarning ##Warning:MissingvaluesarealwaysremovedinSQL. ##Use`MIN(x,na.rm=TRUE)`tosilencethiswarning ##Warning:MissingvaluesarealwaysremovedinSQL. ##Use`MAX(x,na.rm=TRUE)`tosilencethiswarning ###Source:lazyquery[??x3] ###Database:spark_connection ##`n_distinct(order)``min(date)``max(date)` ## ##1264967982012-03-012013-07-27 ✿:建立RFM模型分析RFM模型是衡量客戶價值和客戶創利能力的重要工具和手段。

該模型通過一個客戶的近期購買行為、購買的總體頻率以及花了多少錢三項指標來描述該客戶的價值狀況。

我們再電子商務平台上,能夠分析的資料包羅萬象,其中有三種重要的指標是能夠抓住主要獲利來源、顧客分布以及進行更深入行銷策略,此三個資料所建立的分析,又稱「RFM分析」。

R(Recency最近一次購買時間)、F(Frequence購買頻率)、M(Monetary交易金額) rfm=ORD%>%mutate(k=datediff(as.Date("2013-07-28"),date))%>% group_by(id)%>%summarise( recent=min(k), freq=n(), money=mean(amount) )%>%collect#資料筆數僅剩311541 ##Warning:MissingvaluesarealwaysremovedinSQL. ##Use`MIN(x,na.rm=TRUE)`tosilencethiswarning ##Warning:MissingvaluesarealwaysremovedinSQL. ##Use`AVG(x,na.rm=TRUE)`tosilencethiswarning #計算每位客人的訂單量26496798/311541=85.051orderspercustomer #另外建立一個除去掉離群值之後的rfm分析 rfm2=subset( rfm,money>=5&money<=250& recent>=1&recent<=200&freq<=400) 由上方rfm分析我們可以得知,在進行篩選、整理之後,原資料(raw data)所剩的資料筆數只剩下約30萬筆,相較原先3億筆資料已經大幅度的下降,而我們使用Spark所建立的資料皆放在我們「hadoop」伺服器上,因此先前使用都是以「hadoop」的方式以伺服器進行資料的篩選、運算,而不是直接地放在Rstudio的伺服器上或我們使用的電腦本機上,格式通常都會呈現List的狀態,並且無法觀看其細節資訊,再加上在rstudio上面可以進行更多視覺化、美化以及更有彈性的函數應用,因此當我們整理到一定大小之後,就要使用collect()函數,將資料從我們存放的伺服器上抓下來。

#將抓下來的資料進行視覺化的分析,我們可以看到資料的分布,可以著手進行資料的美化、觀察,篩減離群值 hist(rfm2$money) hist(rfm2$recent) hist(rfm2$freq) ✿:建立熱圖分析 #進行熱圖的外觀設定 library(d3heatmap);library(RColorBrewer) spectral=brewer.pal(11,"Spectral") #pivot下方會講解概念 mxCD=sdf_pivot(TX,chain~dept)%>%collect mxCD=as.matrix(mxCD);mxCD[is.na(mxCD)]=0;dim(mxCD) ##[1]13484 rownames(mxCD)=paste0('chain',mxCD[,1]);mxCD=mxCD[,-1] colnames(mxCD)=paste0('dept',colnames(mxCD)) dim(mxCD) ##[1]13483 #建立完美熱圖,要先看分布,若差異太大,取LOG可以改善。

range(mxCD);hist(mxCD) ##[1]03971119 range(log(1+mxCD));hist(log(1+mxCD)) ##[1]0.0000015.19456 d3heatmap(log(1+mxCD),col=rev(spectral)) 第二堂 本章主要內容 1.Pivot函數介紹2.抓取仍具規模的數據資料 ✿:樞紐分析有想過在一個超大資料分析時沒有table、tapply這兩個超爆好用的函數嗎?你猜對了!在Spark平台運算時,沒有辦法使用這兩個霹靂無敵好用加三級的函數,這時候就要使用sdf_pivot(資料,列呈現之變數觀察值~欄呈現之變數觀察值,function),這個功能與excel樞紐分析相似,會直接幫我們創建列連表,也就與table、tapply異曲同工之妙囉!但要注意的地方是,他會將"列變數觀察值"放入第一欄。

(P.S.sdf意思為SparkDataFrame) #tapply(TX$purchaseamount,list(TX$id,TX$dept),sum) mxUD=sdf_pivot(TX,id~dept,list(purchaseamount='sum')) count(mxUD) #311541 ###Source:lazyquery[??x1] ###Database:spark_connection ##n ## ##1311541 #為了之後重整欄位名稱,我們先將欄位獨立儲存起來。

nx=colnames(mxUD);nx ##[1]"id""0""1""2""3""4""5""6""7""8""9""10""11""12" ##[15]"13""14""15""16""17""18""19""20""21""22""23""24""25""26" ##[29]"27""28""29""30""31""32""33""34""35""36""37""38""39""40" ##[43]"41""42""43""44""45""46""47""49""50""51""53""54""55""56" ##[57]"57""58""59""60""62""63""64""65""67""69""70""71""72""73" ##[71]"74""75""78""79""81""82""91""92""94""95""96""97""98""99" ✿:篩選過後檔案仍太大,該如何解決呢? 這個時候就必須先將自己的資料上傳到hadoop之後,再進行抓取下來(我們以剛剛資料作為例子) step1:你先將所要存的資料寫上你的hadoop位置mxUD為要存儲數據,“hdfs:/home/m074111025/temp.csv”則為你的存儲位置,F則是將headr(表頭)設定不要儲存,因為我們資料室切碎的,如果表頭設定T的情況下,將會時不時出現表頭,導致數據變很混亂。

spark_write_csv(mxUD,"hdfs:/home/m074111032/temp.csv",F,mode='overwrite') ✿:存檔將hadoop上面的資料抓取後,設定所要存放的路徑。

(此為範例,要改為自己的路徑名稱) #建立terminal,並且找到自己工作區域的路徑,並在terminal區域打入下列Coding #hdfsdfs-getmerge/home/m074111025/temp.csvtemp.csv #hdfsdfs這個是說明你要做的動作是在hodoop上來操作 #而getmerge則是因為hadoop資料都是經過切割成碎片放上伺服器,因此你要抓取資料轉為csv檔案存儲時,必須要將這些碎片合併。

#/home/m074111025/temp.csv則是你存儲路徑 #temp.csv則為你所命名的檔案名稱 ✿:斷開連結當資料確定抓取下來後,為了避免伺服器資源持續地佔用,我們就會斷開連接。

spark_disconnect(sc) ✿:導入先前抓取並存到hadoop上的資料 pacman::p_load(readr) mx=read_csv('temp.csv',col_names=F) ##Parsedwithcolumnspecification: ##cols( ##.default=col_double(), ##X79=col_character() ##) ##Seespec(...)forfullcolumnspecifications. mx[,1:ncol(mx)]=lapply(mx[,1:ncol(mx)],as.numeric) mx[is.na(mx)]=0 colnames(mx)=c('id',paste0('dept',nx[2:84])) colSums(is.na(mx)) ##iddept0dept1dept2dept3dept4dept5dept6dept7dept8 ##0000000000 ##dept9dept10dept11dept12dept13dept14dept15dept16dept17dept18 ##0000000000 ##dept19dept20dept21dept22dept23dept24dept25dept26dept27dept28 ##0000000000 ##dept29dept30dept31dept32dept33dept34dept35dept36dept37dept38 ##0000000000 ##dept39dept40dept41dept42dept43dept44dept45dept46dept47dept49 ##0000000000 ##dept50dept51dept53dept54dept55dept56dept57dept58dept59dept60 ##0000000000 ##dept62dept63dept64dept65dept67dept69dept70dept71dept72dept73 ##0000000000 ##dept74dept75dept78dept79dept81dept82dept91dept92dept94dept95 ##0000000000 ##dept96dept97dept98dept99 ##0000 colSums(mx[,2:84])%>%sort%>%barplot q20=colSums(mx[,2:84])%>%quantile(0.2) table(colSums(mx[,2:84])>q20) ## ##FALSETRUE ##1766 x=left_join(rfm2[,"id",F],data.frame(mx)) ##Joining,by="id" identical(x$id,rfm2$id)#TRUE ##[1]TRUE x=x[,colSums(mx)>q20] x=x[,-1]%>%scale dim(x) ##[1]31038166 save(rfm,rfm2,x,file='rfm.rdata',compress=T) 💡:首先將所需要的packages套件下載下來並載入 pacman::p_load(d3heatmap,googleVis,readr,dplyr,RColorBrewer) 選擇所需要顏色11種 spectral=brewer.pal(11,"Spectral") 下載先前已儲存的rfm資料,裡頭有ID(顧客ID)、recent(最近一次購買離當前天數)、freq(購買頻率)、money(平均消費金額)四個欄位 load("rfm.rdata") dim(x) ##[1]31038166 ✿:Kmeans分群分析 使用setseed(10),將隨機數固定,接著使用kmeans分群方法,將顧客分為80群,並選擇“MacQueen”演算法 t0=Sys.time() gc();set.seed(10) ##used(Mb)gctrigger(Mb)maxused(Mb) ##Ncells93098549.8144229177.1144229177.1 ##Vcells50373784384.41371249101046.21370496131045.7 km=kmeans(x,80,algorithm="MacQueen") ##Warning:didnotconvergein10iterations Sys.time()-t0 ##Timedifferenceof2.834129mins 使用kmeans分群結果,近一步了解這80群的每一群內部的recent、freq、money的平均值,並用熱圖來呈現 km80=km$cluster table(km80)%>%sort ##km80 ##54126114556540435649 ##6975180208271295392491547614614659 ##4264960263276621708015 ##702831854891924986101210241029108811671293 ##591639486356183713233032 ##131413431403150515291533155915591575163716621690 ##5356745367794133284612 ##170818111841187819191977207322572346236523712387 ##76772944315768516987211 ##239624962670273928102827296230783109328036723787 ##242550782073177138104743 ##401540814102489249796058625869457000716771898620 ##1952587562347422 ##943110373144991569716649173992747832265 df=sapply(split(as.data.frame.matrix(x),km80),colMeans) range(df) ##[1]-0.966897731.2451342 log(1+df)%>%d3heatmap(colors=rev(spectral)) 進一步用df這個資料集統整每一群的平均購買頻率、平均消費金額、每群總收益、族群大小、平均收益、平均最近一次購買離當前天數 rfm2=cbind(rfm2,km80) df=rfm2%>%group_by(km80)%>%summarise( #avg_freq=mean(freq), '平均購買頻率'=mean(freq), '平均消費金額'=mean(money), '每群總收益'=sum(freq*money), '族群大小'=n(), '平均收益'=mean(freq*money), '平均最近一次購買離當前天數'=mean(recent), dummy=2013) GoogleMotionChartI🗿:請將flash開啟為“允許”,使用泡泡做資料視覺化並做調整與觀察族群間差異我們先將時間因素排除在外,而要建成泡泡圖時必須要設定X軸與Y軸變數,因此我們再給予一個dummy(虛擬)變數進行X軸度的缺口。

op=options(gvis.plot.tag='chart') plot(gvisMotionChart( df,"km80","dummy", options=list(width=720,height=480))) GoogleMotionChartII將時間因素考慮進去的動態泡泡圖 op=options(gvis.plot.tag='chart') df=rfm2%>% mutate(km80=sprintf("%02d",km80))%>% group_by(km80)%>%summarise( '平均購買頻率'=mean(freq), '平均購買金額'=mean(money), '集群總營收貢獻'=sum(freq*money), '集群大小'=n(), '平均營收貢獻'=mean(freq*money), '平均距今購買天數'=mean(recent), year=2013) plot(gvisMotionChart( df,"km80","year",options=list(width=800,height=600))) #plot(gvisMotionChart( #subset(df,group_size>=20&group_size<=1200), #"kg","dummy",options=list(width=800,height=600))) 🗿:問題:我們原先是依照dept(部門)作為我們的分群依據,那假若我們想改成以Brand(品牌)作為分類依據時,該如何做呢?



請為這篇文章評分?