Japan
サイト内の現在位置
NumPy互換数値演算ライブラリNLCPy NLCPy ndarray属性の取得、callback、そしてk-meansクラスタリング処理サンプル
no.0232022.12.2NLCPyのJITコンパイルではC/C++やFortranで記述した関数オブジェクトからNLCPy形式の配列に対する参照や、NumPy形式配列の操作を行うための手段が用意されています。
今回はこうした配列データに対する参照や処理を行う方法を説明します。また、JITコンパイルの説明の締めくくりとしてk-meansクラスタリングの一部をCコードで記載することで高速化したサンプルコードについてscikit-learnやNLCPyコードとの処理時間を比較しながら紹介します。
1. NLCPy ndarray属性の取得¶
前回(22回)使用したサンプルコードでは、関数を呼び出す際にデータの先頭を指すポインタとそのデータサイズを引数に指定しました。データサイズを関数に引き渡すことで、ループ処理の上限を設定して処理を繰り返すためです。
必要な値を引数で渡す代わりの方法に、関数からNLCPy ndarrayの属性にアクセスして配列のデータサイズや形状を読み取り、繰り返しの上限を関数の中で判断して設定する方法があります。
関数側からarray形状やサイズを取得することで、例えばreshapeやtransposeなどの配列操作によりarray形状が変化しても関数側でarrayの属性を判別して配列に対する処理が実行できるようになります。
参照できる属性として、NLCPy ndarrayの配列先頭アドレスへのポインタやストライドのサイズ、各データサイズの他に、配列次元数、配列のメモリレイアウト情報(C型/Fortran型)等がve_array C-structure構造体の形で用意されています。
2つのNLCPy形式の配列(単精度浮動小数点の2次元と倍精度浮動小数の3次元配列)を用意し、関数からそれらの属性を読み取ってPythonに結果を返す例をSample-1に示します。関数内で配列の次元数、配列サイズ、行間のストライドとデータサイズを読出した上で、Pythonスクリプト(ベクトルホスト側の処理)で表示しています。
なお、こうしたNLCPy ndarray属性に対するアクセスはC/C++から利用できます(Fortranは未対応)。
Sample-1¶
C言語ソース定義
# 配列の属性取得処理をおこなう関数のソースコード部
c_src=r'''
#include <nlcpy.h>
void ve_get_a(ve_array *x, ve_array *y,
ve_array *x_attributes,
ve_array *y_attributes) {
/* 配列へのポインタ取得 */
double *px = (double *)x->ve_adr;
double *py = (double *)y->ve_adr;
/* 配列xに対する属性取得:次元数、配列サイズ、ストライド、形状、 各データのバイト数 */
uint64_t *px_attr = (uint64_t *)x_attributes->ve_adr;
px_attr[0] = x->ndim;
px_attr[1] = x->size;
px_attr[2] = x->strides[x->ndim - 2];
px_attr[3] = x->strides[x->ndim - 1];
px_attr[4] = x->shape[x->ndim - 2];
px_attr[5] = x->shape[x->ndim - 1];
px_attr[6] = x->itemsize;
/* 配列yに対する属性取得:次元数、配列サイズ、ストライド、形状、各データのバイト数 */
uint64_t *py_attr = (uint64_t *)y_attributes->ve_adr;
py_attr[0] = y->ndim;
py_attr[1] = y->size;
py_attr[2] = y->strides[y->ndim - 3];
py_attr[3] = y->strides[y->ndim - 2];
py_attr[4] = y->strides[y->ndim - 1];
py_attr[5] = y->shape[y->ndim - 3];
py_attr[6] = y->shape[y->ndim - 2];
py_attr[7] = y->shape[y->ndim - 1];
py_attr[8] = y->itemsize;
}
'''
VEカーネル生成
import nlcpy
from nlcpy import veo
from nlcpy import ve_types
import numpy
ve_lib = nlcpy.jit.CustomVELibrary(code=c_src)
# 関数定義
ve_get_a = ve_lib.get_function(
've_get_a',
args_type=(ve_types.void_p, ve_types.void_p, ve_types.void_p, ve_types.void_p),
ret_type=ve_types.void
)
VEカーネル実行
x = nlcpy.arange(20, dtype='f4').reshape((4, 5))
y = nlcpy.arange(60, dtype='f8').reshape((3, 4, 5))
x_attr = nlcpy.empty(7, dtype='u8')
y_attr = nlcpy.empty(9, dtype='u8')
# 関数ve_get_a実行
ve_get_a(x, y, x_attr, y_attr)
配列x, yの各属性表示
"x->ndim: {}, x->size: {}, x->strides: {}, x->shape: {}, x->itemsize: {}".format(x_attr[0], x_attr[1], x_attr[2:4], x_attr[4:6], x_attr[6])
'x->ndim: 2, x->size: 20, x->strides: [20 4], x->shape: [4 5], x->itemsize: 4'
"y->ndim: {}, y->size: {}, y->strides: {}, y->shape: {}, y->itemsize: {}".format(y_attr[0], y_attr[1], y_attr[2:5], y_attr[5:8], y_attr[8])
'y->ndim: 3, y->size: 60, y->strides: [160 40 8], y->shape: [3 4 5], y->itemsize: 8'
次に、NLCPy ndarray属性を読み取り、配列間で処理する例をSample 2に示します。各配列サイズを関数への引数に指定する代わりに配列の各次元のストライドと形状を読出すことでループ処理を行います。以下の例ではカーネルの引数に渡す配列xは"T"アトリビュートを使用して転置した配列を入力しています。転置された配列は物理的なメモリ配置はそのままに、各次元の形状とストライドのみが入れ替わります。そのため、単純にデータが連続した配列として扱うことはできませんが、C言語の関数内で各次元のストライドを参照することでこのようなケースにも柔軟に対応することができます。
Sample-2¶
C言語ソース定義
c_src=r'''
#include <nlcpy.h>
void ve_add(ve_array *x, ve_array *y, ve_array *z) {
/* 配列のポインターを取得 */
double *px = (double *)x->ve_adr;
double *py = (double *)y->ve_adr;
double *pz = (double *)z->ve_adr;
/* 各配列におけるストライドを取得 */
uint64_t ix0 = x->strides[x->ndim-1] / x->itemsize;
uint64_t ix1 = x->strides[x->ndim-2] / x->itemsize;
uint64_t iy0 = y->strides[y->ndim-1] / y->itemsize;
uint64_t iy1 = y->strides[y->ndim-2] / y->itemsize;
uint64_t iz0 = z->strides[z->ndim-1] / z->itemsize;
uint64_t iz1 = z->strides[z->ndim-2] / z->itemsize;
/* 配列要素の足し合わせ */
#pragma omp parallel for
for (int i = 0; i < z->shape[z->ndim-2]; i++) {
for (int j = 0; j < z->shape[z->ndim-1]; j++) {
pz[i*iz1 + j*iz0] = px[i*ix1 + j*ix0] + py[i*iy1 + j*iy0];
}
}
}
'''
VEカーネル生成
ve_lib = nlcpy.jit.CustomVELibrary(code=c_src)
ve_add = ve_lib.get_function(
've_add',
args_type=(ve_types.void_p, ve_types.void_p, ve_types.void_p),
ret_type=ve_types.void
)
VEカーネル実行
x = nlcpy.arange(20, dtype='f8').reshape((5, 4))
y = nlcpy.arange(20, dtype='f8').reshape((4, 5))
z = nlcpy.arange(20, dtype='f8').reshape((4, 5))
ve_add(x.T, y, z)
配列x, y, zの各要素表示
x.T
array([[ 0., 4., 8., 12., 16.], [ 1., 5., 9., 13., 17.], [ 2., 6., 10., 14., 18.], [ 3., 7., 11., 15., 19.]])
x.T.strides
(8, 32)
y
array([[ 0., 1., 2., 3., 4.], [ 5., 6., 7., 8., 9.], [10., 11., 12., 13., 14.], [15., 16., 17., 18., 19.]])
y.strides
(40, 8)
z
array([[ 0., 5., 10., 15., 20.], [ 6., 11., 16., 21., 26.], [12., 17., 22., 27., 32.], [18., 23., 28., 33., 38.]])
2. on_stack命令によるNumPy ndarrayデータのベクトルエンジンへの転送¶
PythonのオブジェクトにはBuffer Protocolという仕組みが用意されています。Buffer Protocolを使用することによって、例えば異なるオブジェクト間でデータをコピーすることなく相互のオブジェクトから同じアドレスのデータにアクセスできるようになっています。
Buffer ProtocolについてAn Introduction to the Python Buffer Protocol (https://jakevdp.github.io/blog/2014/05/05/introduction-to-the-python-buffer-protocol/) に詳しい説明がありますが、巨大なデータのコピーが不要になることでメモリ消費量を抑制することができます。
オブジェクト間におけるデータ共有という概念に基づいて、NECはNumPy ndarrayとベクトルエンジンで動作するオブジェクト間でのデータ共有に拡張する枠組みを用意しています。
その一つがveo.Onstackです。PythonのBuffer Protocolとベクトルエンジン向けにNECが開発したpy-veoの組み合わにより、NumPyオブジェクトのデータをNLCPyオブジェクトに変換することなく、ベクトルエンジンで動作するプロセスから直接アクセスすることができます。
PythonのBuffer Protocolにおけるオブジェクト間のデータ共有と異なり、ベクトルエンジンで処理を行うデータは、NumPyオブジェクトが存在するベクトルホストのメモリからベクトルエンジンのメモリに転送されます。
しかし表面上はベクトルエンジンで実行された処理は自動的にNumPyオブジェクトのデータに反映されることから、あたかもベクトルホストの同じメモリアドレスを参照しているように振る舞います。
py-veoはベクトルホストで動作するPythonオブジェクトとベクトルエンジンのVeoの間でAPIを提供するものです。py-veo (https://github.com/SX-Aurora/py-veo) はgithubに公開されており、いくつかのPython Classが提供されます。
APIの一つであるveo.OnstackはPython Bufferインターフェースを使い、オブジェクトのbufferをベクトルエンジンへ転送したり、反対にベクトルエンジンからオブジェクトのbufferへの転送を容易にするものです。
Sample-3はveo.Onstackを使用したサンプルコードです。この例ではVEO_INTENT_INでbufferオブジェクトである'a'をベクトルエンジンのkernelが参照する前にベクトルエンジンのメモリにコピーします。
VEO_INTENT_OUTはベクトルエンジンのkernel処理後、ベクトルエンジンのメモリからbufferオブジェクト'b'にコピーしています。ベクトルエンジンの処理終了後、直ちにPythonオブジェクト'b'にはベクトルエンジンで処理された結果が出力されます。
Sample-3¶
src = r'''
#include <stdint.h>
void onstack_test(float *a, float *b) {
b[0] = a[0] + a[1];
}
'''
ve_lib = nlcpy.jit.CustomVELibrary(code=src)
on_stack = ve_lib.get_function(
'onstack_test',
args_type=(ve_types.void_p, ve_types.void_p),
ret_type=ve_types.void
)
a = numpy.array([1, 2], dtype='f4')
b = numpy.empty(1, dtype='f4')
print("a: {}, b: {}".format(a, b))
# OnStack命令を使用したBuffer dataのVH-VE間転送
on_stack(
veo.OnStack(a, inout=veo.INTENT_IN),
veo.OnStack(b, inout=veo.INTENT_OUT),
sync=True
)
print("a: {}, b: {}".format(a, b))
a: [1. 2.], b: [0.] a: [1. 2.], b: [3.]
3. C/C++/Fortran記述関数からcallback¶
Pythonでは、ある関数を利用する際にその関数から呼び出し元に用意した別の関数を呼び出すcallback機能が存在します。callbackを利用すると、例えば関数内の状況に応じてあらかじめ用意した別関数を呼び出して、異なる処理を実施することができるようになります。処理する内容毎に複数の関数を作らず、callbackする関数を関数の引数に指定することで異なる処理をcallback側で行います。
callbackはC/C++/Fortranで記述した関数においてもJITコンパイルすることで利用可能です。通常のPythonと同様に、callbackする関数を引数に含めることで、C/C++/Fortranの関数はその関数をcallbackします。callback関数の引数はVEカーネルの戻り値を受け取る必要があります。
Sample-4はその利用例です。callback_test()に対して処理するデータxの後でcallback=err_printとcallbackする関数を指定しています。callback_test()の中でデータxに対して属性確認した結果に応じて返値をerror codeとしてerr_print()関数をcallbackします。
この例ではベクトルエンジンにオフロードされる関数callback_test()の定義中に、ret_type=ve_types.uint64と返値に対するデータ型を指定しています。
ベクトルエンジンで実行される関数内の処理は連続した数値データに対する何らかの演算です。そのため、演算前の受け取ったデータ属性チェックや演算後の何らかの確認をcallback関数に送り処理することが、一般的な使用例になります。
Sample-4¶
callback関数とVEカーネルの定義
この例では、C言語とPython間でエラーコードを共有するために、string.Template()を使用して文字列から定数への変換を行っています。
import string
err = {
'ERR_OK': 0,
'ERR_MEMORY': 1,
'ERR_NDIM': 2,
'ERR_DTYPE': 3,
'ERR_CONTIGUOUS': 4,
}
temp = string.Template(r'''
#include <nlcpy.h>
#include <stdlib.h>
uint64_t callback_test(ve_array *x) {
double *px = (double *)x->ve_adr;
if (px == NULL) return ${ERR_MEMORY};
if (x->ndim != 1) return ${ERR_NDIM};
if (x->dtype != ve_f64) return ${ERR_DTYPE};
if (! (x->is_c_contiguous & x->is_f_contiguous)) return ${ERR_CONTIGUOUS};
/* do something here */
return ${ERR_OK};
}
''')
src = temp.substitute(err)
ve_lib = nlcpy.jit.CustomVELibrary(code=src)
callback_test = ve_lib.get_function(
'callback_test',
args_type=(ve_types.void_p,),
ret_type=ve_types.uint64
)
# callback関数を定義
def err_print(retval):
# reverse lookup
for k, v in err.items():
if retval == v:
print(k)
return
raise Exception
正常実行(ERR_OK)
x = nlcpy.arange(9, dtype='f8')
_ = callback_test(x, callback=err_print, sync=True)
ERR_OK
異なるデータ型を伴った実行(ERR_DTYPE)
x = nlcpy.arange(9, dtype='f4')
_ = callback_test(x, callback=err_print, sync=True)
ERR_DTYPE
非連続な配列を伴った実行(ERR_CONTIGUOUS)
x = nlcpy.arange(9, dtype='f8')[::2]
_ = callback_test(x, callback=err_print, sync=True)
ERR_CONTIGUOUS
4. k-meansクラスタリング処理におけるscikit-learn等との実行時間比較¶
JITコンパイル説明の最後に、k-meansクラスタリングのサンプルコードを紹介します。各クラスタの重心を求める処理をJITコンパイルで実施した他に、比較のためscikit-learnのライブラリを使用したものやNLCPyの関数で処理したコードとそれら処理時間を掲載しています。
これまでにNLCPyの新しい機能の説明を数回に分けて行ってきました。NLCPyをインポートすることで、ベクトルホストのCPUでPythonを実行しながら、数値処理をベクトルエンジンにオフロードすることが簡単に行えるようになります。
NLCPyに用意されるNumPy互換関数を使うだけでなく、独自にコンパイル言語を使用した関数を作成して利用することで、多岐にわたる処理をオフロードできるようになります。
SX-Aurora TUBASAは、データ前処理や可視化といった処理をPythonの様々なモジュールを通して処理し、同時に高速化したい数値演算部分ではNLCPyを使うといったヘテロジニアスな開発環境をご用意しています。ぜひこれをご利用いただき、開発期間の短縮につなげてください。
import numpy as np
import nlcpy as vp
from nlcpy import ve_types
from matplotlib import pyplot as plt
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans
import time
import contextlib
N_SAMPLES = 2000000 # The number of samples
N_CLUSTERS = 7 # The number of clusters
MAX_ITER = 300 # The number of maximum iterations
N_DRAW = 10000 # The number of samples for drawing
N_KMEANS_PP = 10 # The number of iterations for k-means++
src = r'''
#include <stdio.h>
#include <stdlib.h>
#include <nlcpy.h>
int64_t update_centroid(ve_array *label,
ve_array *s,
ve_array *c,
int64_t n_clusters) {
int64_t *plabel = (int64_t *)label->ve_adr;
double *ps = (double *)s->ve_adr;
double *pc = (double *)c->ve_adr;
if (plabel == NULL || ps == NULL || pc == NULL) return -1;
int32_t *pmask = (int32_t *)malloc(sizeof(int32_t) * n_clusters * label->shape[0]);
if (pmask == NULL) return -1;
double *psums = (double *)malloc(sizeof(double) * n_clusters * s->shape[0]);
if (psums == NULL) return -1;
int32_t *pcounts = (int32_t *)malloc(sizeof(int32_t) * n_clusters);
if (pcounts == NULL) return -1;
const int64_t il0 = label->strides[0] / label->itemsize;
const int64_t is0 = s->strides[0] / s->itemsize;
const int64_t is1 = s->strides[1] / s->itemsize;
const int64_t ic0 = c->strides[0] / c->itemsize;
const int64_t ic1 = c->strides[1] / c->itemsize;
// mask = (label == vp.arange(N_CLUSTERS)[:, None])
#pragma omp parallel for
for (int64_t i = 0; i < n_clusters; i++) {
for (int64_t j = 0; j < label->shape[0]; j++) {
pmask[i*label->shape[0] + j] = (plabel[j * il0] == i) ? 1 : 0;
}
}
// sums = vp.where(mask[:, None, :], s.T, 0).sum(axis=2)
#pragma omp parallel for
for (int64_t i = 0; i < n_clusters; i++) {
for (int64_t j = 0; j < s->shape[0]; j++) {
psums[i*s->shape[0] + j] = 0;
for (int64_t k = 0; k < s->shape[1]; k++) {
if (pmask[i*s->shape[1] + k] == 1) {
psums[i*s->shape[0] + j] += ps[j*is0 + k*is1];
}
}
}
}
// counts = mask.sum(axis=1).reshape((N_CLUSTERS, 1))
#pragma omp parallel for
for (int64_t i = 0; i < n_clusters; i++) {
pcounts[i] = 0;
for (int64_t j = 0; j < label->shape[0]; j++) {
pcounts[i] += pmask[i*label->shape[0] + j];
}
}
// c = sums / counts
for (int64_t i = 0; i < c->shape[0]; i++) {
#pragma _NEC novector
for (int64_t j = 0; j < c->shape[1]; j++) {
pc[i*ic0 + j*ic1] = psums[i*s->shape[0] + j] / pcounts[i];
}
}
free(pmask);
free(psums);
free(pcounts);
return 0;
}
'''
def draw(s, c, l, it):
# Plot the samples and centroids of the fitted clusters into an image file.
fig = plt.figure()
np.random.seed(777)
colors = np.random.rand(N_CLUSTERS, 3)
ind = np.random.randint(0, N_SAMPLES, N_DRAW)
s = s[ind]
l = l[ind]
plt.text(0, 0, 'number of iterations: {}'.format(it),
fontsize='large')
for i in range(N_CLUSTERS):
labels = s[l == i, :]
plt.scatter(labels[:, 0], labels[:, 1], color=colors[i, :])
plt.scatter(
c[:, 0], c[:, 1], s=120, marker='s', facecolors=colors,
edgecolors='k')
plt.show()
def kmeans_nlcpy(data, center, jit=True):
vp.request.flush()
t0 = time.time()
if jit:
ve_lib = vp.jit.CustomVELibrary(
code=src,
ftrace=True,
)
update_kernel = ve_lib.get_function(
'update_centroid',
args_type=(
ve_types.void_p,
ve_types.void_p,
ve_types.void_p,
ve_types.int64),
ret_type=ve_types.int64
)
data = vp.asarray(data)
center = vp.asarray(center)
label = vp.zeros(N_SAMPLES, dtype='i8')
for i in range(MAX_ITER):
# Estimate the distance and label for each samples
d = vp.linalg.norm(data[None, :, :] - center[:, None, :], axis=2)
label_new = vp.argmin(d, axis=0)
if vp.all(label == label_new):
break
label = label_new
if jit:
update_kernel(label, data.T, center, N_CLUSTERS)
else:
mask = (label == vp.arange(N_CLUSTERS)[:, None])
sums = vp.where(mask[:, None, :], data.T, 0).sum(axis=2)
counts = mask.sum(axis=1).reshape((N_CLUSTERS, 1))
center = sums / counts
vp.request.flush()
t1 = time.time()
name = 'NLCPy_JIT' if jit else 'NLCPy\t'
print('{}\t: {} [sec]'.format(name, t1 - t0))
draw(
data.get(),
center.get(),
label.get(),
i + 1,
)
def kmeans_sklearn(data, init_center):
k_means = KMeans(
random_state=111,
init=init_center,
n_clusters=N_CLUSTERS,
max_iter=MAX_ITER,
n_init=1,
tol=1e-9
)
t0 = time.time()
k_means.fit(data)
t1 = time.time()
print('Scikit-Learn\t: {} [sec]'.format(t1 - t0))
draw(
data,
k_means.cluster_centers_,
k_means.labels_,
k_means.n_iter_,
)
def kmeans_pp(data, n_clusters, n_samples):
center = np.empty((n_clusters, 2))
dist = np.zeros((n_clusters, n_samples))
center[0] = data[np.random.randint(0, n_samples)]
dist[0] = np.linalg.norm(data - center[0], axis=1)
ind_pool = np.arange(n_samples)
for i in range(1, n_clusters):
_dist = dist[:i].min(axis=0)
p = _dist / _dist.sum()
ind = np.random.choice(ind_pool, 1, p=p)
center[i] = data[ind]
dist[i] = np.linalg.norm(data - center[i], axis=1)
return center, dist.min(axis=0).sum()
if __name__ == '__main__':
assert N_SAMPLES >= N_CLUSTERS
assert N_SAMPLES >= N_DRAW
data, _ = make_blobs(
random_state=111,
n_samples=N_SAMPLES,
n_features=2,
cluster_std=.7,
centers=N_CLUSTERS,
)
print('k-means++', end='', flush=True)
np.random.seed(111)
init_center, min_score = kmeans_pp(data, N_CLUSTERS, N_SAMPLES)
for _ in range(N_KMEANS_PP-1):
center, score = kmeans_pp(data, N_CLUSTERS, N_SAMPLES)
if score < min_score:
init_center = center
min_score = score
print('.', end='', flush=True)
print('done')
print('initial-centroids')
draw(data, init_center, np.zeros(N_SAMPLES, dtype='i8'), 0)
kmeans_sklearn(data, init_center)
kmeans_nlcpy(data, init_center, jit=False)
kmeans_nlcpy(data, init_center, jit=True)
k-means++.........done initial-centroids
Scikit-Learn : 0.5512814521789551 [sec]
NLCPy : 0.47018861770629883 [sec]
NLCPy_JIT : 0.2472062110900879 [sec]
関連リンク
最新資料/カタログはこちらからダウンロード
ご紹介資料、各種カタログをダウンロードいただけます。
My NEC登録が必要です
My NEC登録が必要です
