在 New Relic,我們每分鐘都會收集到 13.7 億個數(shù)據(jù)點(diǎn)。我們?yōu)槲覀兊目蛻羰占⒎治龊驼故镜暮艽笠徊糠謹(jǐn)?shù)據(jù)都是時間序列數(shù)據(jù)。為了創(chuàng)建應(yīng)用與其它實(shí)體(比如服務(wù)器和容器)之間的關(guān)系,以便打造 New Relic Radar 這樣的新型智能產(chǎn)品,我們正在不斷探索更快更有效的對時間序列數(shù)據(jù)分組的方法。鑒于我們所收集的數(shù)據(jù)的量是如此巨大,更快的聚類時間至關(guān)重要。
加速 k-均值聚類
k-均值聚類是一種流行的分組數(shù)據(jù)的方法。k-均值方法的基本原理涉及到確定每個數(shù)據(jù)點(diǎn)之間的距離并將它們分組成有意義的聚類。我們通常使用平面上的二維數(shù)據(jù)來演示這個過程。以超過二維的方式聚類當(dāng)然是可行的,但可視化這種數(shù)據(jù)的過程會變得更為復(fù)雜。比如,下圖給出了 k-均值聚類在兩個任意維度上經(jīng)過幾次迭代的收斂情況:
不幸的是,這種方法并不能很好地用于時間序列數(shù)據(jù),因?yàn)樗鼈兺ǔJ请S時間變化的一維數(shù)據(jù)。但是,我們?nèi)匀豢梢允褂靡恍┎煌暮瘮?shù)來計算兩個時間序列數(shù)據(jù)之間的距離因子(distance factor)。在這些案例中,我們可以使用均方誤差(MSE)來探索不同的 k-均值實(shí)現(xiàn)。在測試這些實(shí)現(xiàn)的過程中,我們注意到很多實(shí)現(xiàn)的表現(xiàn)水平都有嚴(yán)重的問題,但我們?nèi)匀豢梢匝菔炯铀?k-均值聚類的可能方法,在某些案例中甚至能實(shí)現(xiàn)一個數(shù)量級的速度提升。
這里我們將使用 Python 的 NumPy 軟件包。如果你決定上手跟著練習(xí),你可以直接將這些代碼復(fù)制和粘貼到 Jupyter Notebook 中。讓我們從導(dǎo)入軟件包開始吧,這是我們一直要用到的東西:
import timeimport numpy as npimport matplotlib.pyplot as plt?%matplotlib inline
在接下來的測試中,我們首先生成 10000 個隨機(jī)時間序列數(shù)據(jù),每個數(shù)據(jù)的樣本長度為 500。然后我們向隨機(jī)長度的正弦波添加噪聲。盡管這一類數(shù)據(jù)對 k-均值聚類方法而言并不理想,但它足以完成未優(yōu)化的實(shí)現(xiàn)。
n = 10000ts_len = 500?phases = np.array(np.random.randint(0, 50, [n, 2]))pure = np.sin([np.linspace(-np.pi * x[0], -np.pi * x[1], ts_len) for x in phases])noise = np.array([np.random.normal(0, 1, ts_len) for x in range(n)])?signals = pure * noise?# Normalize everything between 0 and 1signals += np.abs(np.min(signals))signals /= np.max(signals)?plt.plot(signals[0])
第一個實(shí)現(xiàn)
讓我們從最基本和最直接的實(shí)現(xiàn)開始吧。euclid_dist 可以為距離函數(shù)實(shí)現(xiàn)一個簡單的 MSE 估計器,k_means 可以實(shí)現(xiàn)基本的 k-均值算法。我們從我們的初始數(shù)據(jù)集中選擇了 num_clust 隨機(jī)時間序列數(shù)據(jù)作為質(zhì)心(代表每個聚類的中心)。在 num_iter 次迭代的過程中,我們會持續(xù)不斷地移動質(zhì)心,同時最小化這些質(zhì)心與其它時間序列數(shù)據(jù)之間的距離。
def euclid_dist(t1, t2):? ? ? ? return np.sqrt(((t1-t2)**2).sum())def k_means(data, num_clust, num_iter):? ?centroids = signals[np.random.randint(0, signals.shape[0], num_clust)]?? ?for n in range(num_iter):? ? ? ?assignments={}? ? ? ?for ind, i in enumerate(data):? ? ? ? ? ?min_dist = float('inf')? ? ? ? ? ?closest_clust = None? ? ? ? ? ?for c_ind, j in enumerate(centroids):? ? ? ? ? ? ? ?dist = euclid_dist(i, j)? ? ? ? ? ? ? ?if dist < min_dist:? ? ? ? ? ? ? ? ? min_dist = dist? ? ? ? ? ? ? ? ? closest_clust = c_ind?? ? ? ? ? ?if closest_clust in assignments:? ? ? ? ? ? ? ?assignments[closest_clust].append(ind)? ? ? ? ? ?else:? ? ? ? ? ? ? ?assignments[closest_clust]=[]? ? ? ? ? ? ? ?assignments[closest_clust].append(ind)?? ? ? ?for key in assignments:? ? ? ? ? ?clust_sum = 0? ? ? ? ? ?for k in assignments[key]:? ? ? ? ? ? ? ?clust_sum = clust_sum + data[k]? ? ? ? ? ?centroids[key] = [m / len(assignments[key]) for m in clust_sum]?? ?return centroidst1 = time.time()centroids = k_means(signals, 100, 100)t2 = time.time()print("Took {} seconds".format(t2 - t1))Took 1138.8745470046997 seconds
聚類這些數(shù)據(jù)用去了接近 20 分鐘。這不是很糟糕,但肯定算不上好。為了在下一個實(shí)現(xiàn)中達(dá)到更快的速度,我們決定去掉盡可能多的 for 循環(huán)。
向量化的實(shí)現(xiàn)
使用 NumPy 的一大優(yōu)勢是向量化運(yùn)算。(如果你不太了解向量化運(yùn)算,請參考這個鏈接:http://www.scipy-lectures.org/intro/numpy/operations.html)
k-均值算法要求每個質(zhì)心和數(shù)據(jù)點(diǎn)都成對地進(jìn)行比較。這意味著在我們之前的迭代中,我們要將 100 個質(zhì)心和 10000 個時間序列數(shù)據(jù)分別進(jìn)行比較,也就是每次迭代都要進(jìn)行 100 萬次比較。請記住每次比較都涉及到兩個包含 500 個樣本的集合。因?yàn)槲覀兊?100 次,那就是說我們總共比較了 1 億次——對于單個 CPU 而言算是相當(dāng)大的工作量了。盡管 Python 是一種還算高效的語言,但效率還趕不上用 C 語言寫的指令。正是由于這個原因,NumPy 的大部分核心運(yùn)算都是用 C 語言寫的,并且還進(jìn)行了向量化以最小化由循環(huán)帶來的計算開銷。
我們來探索一下我們可以如何向量化我們的代碼,從而去掉盡可能多的循環(huán)。
首先,我們將代碼分成不同的功能模塊。這能讓我們更好地理解每個部分所負(fù)責(zé)的工作。接下來,我們修改 calc_centroids 步驟以便僅在質(zhì)心上迭代(而不是在每個時間序列數(shù)據(jù)上)。這樣,我們將所有時間序列數(shù)據(jù)和一個質(zhì)心傳遞給 euclid_dist。我們還可以預(yù)先分配 dist 矩陣,而不是將其當(dāng)成一個詞典進(jìn)行處理并隨時間擴(kuò)展它。NumPy 的 argmin 可以一次性比較每個向量對。
在 move_centroids 中,我們使用向量運(yùn)算去掉了另一個 for 循環(huán),而且我們只在獨(dú)特的質(zhì)心集上迭代。如果我們丟失了一個質(zhì)心,我們就通過從我們的時間序列數(shù)據(jù)集中進(jìn)行隨機(jī)選擇來加入合適的數(shù)字(這在實(shí)際應(yīng)用的實(shí)踐中很罕見)。
最后,我們添加一個提前停止(early stopping)來檢查 k_means——如果質(zhì)心不再更新,就停止迭代。
來看看代碼:
def euclid_dist(t1, t2):? ?return np.sqrt(((t1-t2)**2).sum(axis = 1))?def calc_centroids(data, centroids):? ?dist = np.zeros([data.shape[0], centroids.shape[0]])?? ?for idx, centroid in enumerate(centroids):? ? ? ?dist[:, idx] = euclid_dist(centroid, data)?? ?return np.array(dist)?def closest_centroids(data, centroids):? ?dist = calc_centroids(data, centroids)? ?return np.argmin(dist, axis = 1)?def move_centroids(data, closest, centroids):? ?k = centroids.shape[0]? ?new_centroids = np.array([data[closest == c].mean(axis = 0) for c in np.unique(closest)])?? ?if k - new_centroids.shape[0] > 0:? ? ? print("adding {} centroid(s)".format(k - new_centroids.shape[0]))? ? ? additional_centroids = data[np.random.randint(0, data.shape[0], k - new_centroids.shape[0])]? ? ? new_centroids = np.append(new_centroids, additional_centroids, axis = 0)?? ?return new_centroids?def k_means(data, num_clust, num_iter):? ?centroids = signals[np.random.randint(0, signals.shape[0], num_clust)]? ?last_centroids = centroids?? ?for n in range(num_iter):? ? ? ?closest = closest_centroids(data, centroids)? ? ? ?centroids = move_centroids(data, closest, centroids)? ? ? ?if not np.any(last_centroids != centroids):? ? ? ? ? print("early finish!")? ? ? ? ? break? ? ? ?last_centroids = centroids?? ?return centroids
t1 = time.time()centroids = k_means(signals, 100, 100)t2 = time.time()print("Took {} seconds".format(t2 - t1))adding 1 centroid(s)early finish!took 206.72993397712708 seconds
耗時 3.5 分鐘多一點(diǎn)。很不錯!但我們還想完成得更快。
k-means++ 實(shí)現(xiàn)
我們的下一個實(shí)現(xiàn)使用了 k-means++ 算法。這個算法的目的是選擇更優(yōu)的初始質(zhì)心。讓我們看看這種優(yōu)化方法有沒有用……
def init_centroids(data, num_clust):? ?centroids = np.zeros([num_clust, data.shape[1]])? ?centroids[0,:] = data[np.random.randint(0, data.shape[0], 1)]?? ?for i in range(1, num_clust):? ? ? ?D2 = np.min([np.linalg.norm(data - c, axis = 1)**2 for c in centroids[0:i, :]], axis = 0)? ? ? ?probs = D2/D2.sum()? ? ? ?cumprobs = probs.cumsum()? ? ? ?ind = np.where(cumprobs >= np.random.random())[0][0]? ? ? ?centroids[i, :] = np.expand_dims(data[ind], axis = 0)?? ?return centroids?def k_means(data, num_clust, num_iter):? ?centroids = init_centroids(data, num_clust)? ?last_centroids = centroids?? ?for n in range(num_iter):? ? ? ?closest = closest_centroids(data, centroids)? ? ? ?centroids = move_centroids(data, closest, centroids)? ? ? ?if not np.any(last_centroids != centroids):? ? ? ? ? ?print("Early finish!")? ? ? ? ? ?break? ? ? ?last_centroids = centroids?? ?return centroids
t1 = time.time()centroids = k_means(signals, 100, 100)t2 = time.time()print("Took {} seconds".format(t2 - t1))early finish!took 180.91435194015503 seconds
相比于我們之前的迭代,加入 k-means++ 算法能得到稍微好一點(diǎn)的性能。但是,當(dāng)我們將其并行化之后,這種優(yōu)化方法才真正開始帶來顯著回報。
?并行實(shí)現(xiàn)
到目前為止,我們所有的實(shí)現(xiàn)都是單線程的,所以我們決定探索 k-means++ 算法的并行化部分。因?yàn)槲覀冊谑褂?Jupyter Notebook,所以我們選擇使用用于并行計算的 ipyparallel 來管理并行性(ipyparallel 地址:https://github.com/ipython/ipyparallel)。使用 ipyparallel,我們不必?fù)?dān)心整個服務(wù)器分叉,但我們需要解決一些特殊問題。比如說,我們必須指示我們的工作器節(jié)點(diǎn)加載 NumPy。
import ipyparallel as ippc = ipp.Client()v = c[:]v.use_cloudpickle()?with v.sync_imports():? ?import numpy as np
關(guān)于加載工作器更多詳情,請參閱 ipyparallel 上手指南:https://ipyparallel.readthedocs.io/en/latest/
在這一個實(shí)現(xiàn)中,我們的重點(diǎn)放在并行化的兩個方面。首先,calc_centroids 有一個在每個質(zhì)心上迭代并將其與我們的時間序列數(shù)據(jù)進(jìn)行比較的循環(huán)。我們使用了 map_sync 來將這些迭代中的每一個發(fā)送到我們的工作器。
接下來,我們并行化 k-means++ 質(zhì)心搜索中一個相似的循環(huán)。注意其中對 v.push 的調(diào)用:因?yàn)槲覀兊?lambda 引用的數(shù)據(jù),我們需要確保它在工作器節(jié)點(diǎn)上是可用的。我們通過調(diào)用 ipyparallel 的 push 方法來將該變量復(fù)制到工作器的全局范圍中,從而實(shí)現(xiàn)了這一目標(biāo)。
看看代碼:
def calc_centroids(data, centroids):? ? ? ? return np.array(v.map_sync(lambda x: np.sqrt(((x - data)**2).sum(axis = 1)), centroids))def closest_centroids(points, centroids):? ?dist = calc_centroids(points, centroids)? ?return np.argmin(dist, axis=0)?def init_centroids(data, num_clust):? ?v.push(dict(data=data))? ?centroids = np.zeros([num_clust, data.shape[1]])? ?centroids[0,:] = data[np.random.randint(0, data.shape[0], 1)]? ?for i in range(1, num_clust):? ? ? ?D2 = np.min(v.map_sync(lambda c: np.linalg.norm(data - c, axis = 1)**2, centroids[0:i,:]), axis = 0)? ? ? ?probs = D2/D2.sum()? ? ? ?cumprobs = probs.cumsum()? ? ? ?ind = np.where(cumprobs >= np.random.random())[0][0]? ? ? ?centroids[i, :] = np.expand_dims(data[ind], axis = 0)?? ?return centroids
t1 = time.time()centroids = k_means(signals, 100, 100)t2 = time.time()print("Took {} seconds".format(t2 - t1))adding 2 centroid(s)early finish!took 143.49819207191467 seconds
結(jié)果只有兩分鐘多一點(diǎn),這是我們目前實(shí)現(xiàn)的最快速度!
接下來:更快!
在這些測試中,我們都只使用了中央處理器(CPU)。CPU 能提供方便的并行化,但我們認(rèn)為再多花點(diǎn)功夫,我們就可以使用圖形處理器(GPU)來實(shí)現(xiàn)聚類,且速度將得到一個數(shù)量級的提升。我們也許可以使用 TensorFlow 來實(shí)現(xiàn),這是一個用于數(shù)值計算和機(jī)器學(xué)習(xí)的開源軟件。實(shí)際上,TensorFlow 已經(jīng)包含了 k-均值實(shí)現(xiàn),但我們基本上肯定還是需要對其進(jìn)行調(diào)整才能將其用于時間序列聚類。不管怎樣,我們都不會停下尋找更快更高效的聚類算法的步伐,以幫助管理我們的用戶的數(shù)據(jù)。
- 特朗普宣布200億美元投資計劃,在美國多地建設(shè)數(shù)據(jù)中心
- 工信部:“點(diǎn)、鏈、網(wǎng)、面”體系化推進(jìn)算力網(wǎng)絡(luò)工作 持續(xù)提升算網(wǎng)綜合供給能力
- 2025年超融合基礎(chǔ)設(shè)施的4大趨勢
- 2025年將影響數(shù)據(jù)中心的5個云計算趨勢
- 80萬輛大眾汽車因AWS云配置錯誤導(dǎo)致數(shù)據(jù)泄露,包含“高精度”位置記錄
- 名創(chuàng)優(yōu)品超4000家門店接入“碰一下”支付,引爆年輕消費(fèi)熱潮
- 免稅店也能用“碰一下”支付了!中免海南免稅店:碰一下就優(yōu)惠
- 報告:人工智能推動數(shù)據(jù)中心系統(tǒng)支出激增25%
- 密態(tài)計算技術(shù)助力農(nóng)村普惠金融 螞蟻密算、網(wǎng)商銀行項(xiàng)目入選大數(shù)據(jù)“星河”案例
- 專利糾紛升級!Netflix就虛擬機(jī)專利侵權(quán)起訴博通及VMware
免責(zé)聲明:本網(wǎng)站內(nèi)容主要來自原創(chuàng)、合作伙伴供稿和第三方自媒體作者投稿,凡在本網(wǎng)站出現(xiàn)的信息,均僅供參考。本網(wǎng)站將盡力確保所提供信息的準(zhǔn)確性及可靠性,但不保證有關(guān)資料的準(zhǔn)確性及可靠性,讀者在使用前請進(jìn)一步核實(shí),并對任何自主決定的行為負(fù)責(zé)。本網(wǎng)站對有關(guān)資料所引致的錯誤、不確或遺漏,概不負(fù)任何法律責(zé)任。任何單位或個人認(rèn)為本網(wǎng)站中的網(wǎng)頁或鏈接內(nèi)容可能涉嫌侵犯其知識產(chǎn)權(quán)或存在不實(shí)內(nèi)容時,應(yīng)及時向本網(wǎng)站提出書面權(quán)利通知或不實(shí)情況說明,并提供身份證明、權(quán)屬證明及詳細(xì)侵權(quán)或不實(shí)情況證明。本網(wǎng)站在收到上述法律文件后,將會依法盡快聯(lián)系相關(guān)文章源頭核實(shí),溝通刪除相關(guān)內(nèi)容或斷開相關(guān)鏈接。