# 算子实现

#### 算子代码实现

1. 接口定义。

```def scatter_nd_add(var,
                   indices,
                   updates,
                   var_out,
                   use_locking=False,
                   kernel_name="scatter_nd_add"):
    scatter_nd = Scatter(var, indices, updates, var_out, True, kernel_name,
                         "vadd")
    scatter_nd.scatter_operator()```

主要包括以下关键点：

• 定义Scatter类，并在初始化函数中进行tiling参数的计算，请参考步骤2。

2. tiling参数计算。

```class Scatter():
def __init__(self, var, indices, updates, var_out, nd_flag, kernel_name,
compute_type):
# 初始化tik容器
if cce.CceProductParams().cce_product in ("1.1", "1.3"):
self.product_name = "mini"
elif cce.CceProductParams().cce_product == "1.60":
self.product_name = "cloud"
else:
raise RuntimeError(
"scatter compute only support target:cloud_v100/mini_v100")
self.tik_instance = tik.Tik(tik.Dprofile("v100", self.product_name))
self.nd_flag = nd_flag
#初始化三个输入的shape和数据类型
self.var_shape = var.get("shape")
self.var_dtype = var.get("dtype").lower()
self.indices_shape = indices.get("shape")
self.indices_dtype = indices.get("dtype").lower()
self.updates_shape = updates.get("shape")
self.updates_dtype = updates.get("dtype").lower()
# 计算三个输入的tensor的大小
self.var_ele_num = functools_reduce(lambda x, y: x * y, self.var_shape)
self.indices_num = functools_reduce(lambda x, y: x * y,
self.indices_shape)
self.updates_num = functools_reduce(lambda x, y: x * y,
                                    self.updates_shape)
self.kernel_name = kernel_name
self.check_param(var_out)
# 计算index的个数和最大值，用于遍历和分核。ND和非ND场景按不同的分支计算
if nd_flag:
if self.indices_shape[-1] == len(self.var_shape):
self.update_data_num = 1
else:
self.update_data_num = functools_reduce(
lambda x, y: x * y, self.var_shape[self.indices_shape[-1]:])
self.max_indice = functools_reduce(
lambda x, y: x * y, self.var_shape[0:self.indices_shape[-1]])
self.index_dims = self.indices_shape[-1]
else:
if len(self.var_shape) > 1:
self.update_data_num = functools_reduce(lambda x, y: x * y,
self.var_shape[1:])
else:
self.update_data_num = 1
self.max_indice = self.var_shape[0]
self.index_dims = 1
self.compute_type = compute_type
# 获取UB buffer空间大小，并计算一个block可以存储多少相应数据类型的数据
self.ub_size_bytes = (
cce.CceProductParams().getParams("Unified_Buffer") - 8192)
self.var_dtype_bytes_size = cce.cce_intrin.get_bit_len(
self.var_dtype) // 8
self.indices_dtype_bytes_size = cce.cce_intrin.get_bit_len(
self.indices_dtype) // 8
self.var_data_each_block = 32 // self.var_dtype_bytes_size
self.indices_data_each_block = 32 // self.indices_dtype_bytes_size
self.indices_ub_number = 0

self.index_loop_num = 0

self.max_num_one_repeat = 128
if self.var_dtype in ("float32", "int32"):
self.max_num_one_repeat = 64
if self.update_data_num < self.var_data_each_block:
self.block_num = 1
else:
ai_core_num = tik.Dprofile("v100",
self.product_name).get_aicore_num()
self.indice_step = math.ceil(self.max_indice / ai_core_num)
self.block_num = math.ceil(self.max_indice / self.indice_step)
# 定义输入和输出在GM中的tensor
self.var_gm = self.tik_instance.Tensor(
self.var_dtype, self.var_shape, name="var_gm", scope=tik.scope_gm)
self.indices_gm = self.tik_instance.Tensor(
self.indices_dtype,
self.indices_shape,
name="indices_gm",
scope=tik.scope_gm)
self.updates_gm = self.tik_instance.Tensor(
    self.updates_dtype,
    self.updates_shape,
    name="updates_gm",
    scope=tik.scope_gm)
self.out_gm = self.tik_instance.Tensor(
self.var_dtype, self.var_shape, name="out_gm", scope=tik.scope_gm)

self.vconv_dst_dtype = "float16"

self.init_ub_tensor_para()
self.var_vconv_ub = None
self.var_tile_vconv_ub = None

self.var_ub = None
self.indices_ub = None
self.var_tile_ub = None

self.indices_loop_index = None
self.indices_tmp = None

# 计算UB大小的划分，根据输入的shape大小和数据类型计算
def init_ub_tensor_para(self):
updates_size_bytes = self.var_dtype_bytes_size * self.update_data_num
indices_size_bytes = self.indices_dtype_bytes_size * self.indices_num
need_vconv_dtype = ("int8", "uint8")
# update数据类型为int8或者uint8时的计算方法
if self.var_dtype in need_vconv_dtype:
vconv_dtype_bytes_size = cce.cce_intrin.get_bit_len(
    self.vconv_dst_dtype) // 8
vconv_data_each_block = 32 // vconv_dtype_bytes_size
vconv_size_bytes = (
    self.update_data_num * vconv_dtype_bytes_size)
# 当update和var分片能在UB上放下时优先存储这两个数据
if (updates_size_bytes + vconv_size_bytes) * 2 < (
        self.ub_size_bytes * 0.9):
    self.updates_ub_number = math.ceil(
        self.update_data_num /
        self.var_data_each_block) * self.var_data_each_block

self.vconv_ub_number = math.ceil(
self.update_data_num /
vconv_data_each_block) * vconv_data_each_block

self.indices_ub_number = (
self.ub_size_bytes - updates_size_bytes * 2 -
vconv_size_bytes * 2) // self.indices_dtype_bytes_size

self.indices_ub_number = math.ceil(
self.indices_ub_number /
self.indices_data_each_block) * self.indices_data_each_block
# 当update和var分片在UB上放不下时，如果indices能放下，优先存储indices数据
elif indices_size_bytes < (self.ub_size_bytes * 0.9):
self.indices_ub_number = math.ceil(
self.indices_num /
self.indices_data_each_block) * self.indices_data_each_block
self.updates_ub_number = (
    self.ub_size_bytes -
    indices_size_bytes) // self.var_dtype_bytes_size // 6
self.updates_ub_number = math.ceil(
    self.updates_ub_number /
    self.var_data_each_block) * self.var_data_each_block

self.vconv_ub_number = math.ceil(
    self.updates_ub_number /
    vconv_data_each_block) * vconv_data_each_block
# 都放不下时，UB内存对半分
else:
self.updates_ub_number = (self.ub_size_bytes // 2 //
(vconv_dtype_bytes_size +
self.var_dtype_bytes_size) // 2 //
self.var_data_each_block *
self.var_data_each_block)
self.indices_ub_number = (self.ub_size_bytes //
self.indices_dtype_bytes_size // 2 //
self.var_data_each_block *
self.var_data_each_block)
# update数据类型非int8或者uint8时的处理方法
else:
# 当update和var分片能在UB上放下时优先存储这两个数据
if updates_size_bytes * 2 < self.ub_size_bytes * 0.9:
    self.updates_ub_number = math.ceil(
        self.update_data_num /
        self.var_data_each_block) * self.var_data_each_block
    self.indices_ub_number = (
        self.ub_size_bytes -
        updates_size_bytes * 2) // self.indices_dtype_bytes_size
    self.indices_ub_number = math.ceil(
        self.indices_ub_number /
        self.indices_data_each_block) * self.indices_data_each_block
if self.indices_num < self.indices_ub_number:
self.indices_ub_number = math.ceil(
self.indices_num / self.indices_data_each_block
) * self.indices_data_each_block
# 当update和var分片在UB上放不下时，如果indices能放下，优先存储indices数据
elif indices_size_bytes < self.ub_size_bytes * 0.9:
self.indices_ub_number = math.ceil(
self.indices_num /
self.indices_data_each_block) * self.indices_data_each_block

self.updates_ub_number = (
    self.ub_size_bytes -
    indices_size_bytes) // 2 // self.var_dtype_bytes_size

self.updates_ub_number = math.ceil(
    self.updates_ub_number /
    self.var_data_each_block) * self.var_data_each_block
# 都放不下时，UB内存对半分
else:
self.indices_ub_number = (self.ub_size_bytes //
self.indices_dtype_bytes_size // 2 //
self.indices_data_each_block *
self.indices_data_each_block)
self.updates_ub_number = (self.indices_ub_number // 2 //
self.var_data_each_block *
self.var_data_each_block)

```

3. 计算过程实现。

根据tiling的计算结果，我们判断要不要使用多核。如果要使用多核，就需要设置多核循环。并且定义UB tensor的操作必须定义在多核循环内，防止编译时出现冲突。对于多核场景，每次循环都会遍历输入张量indices，在计算出index后判断该index是否在当前核的处理范围内再进行计算。
```    def scatter_operator(self):
# 根据tiling计算结果判断能否开多核，如果需要开多核，需要指定多核循环
if self.block_num > 1:
with self.tik_instance.for_range(
0, self.block_num,
block_num=self.block_num) as indices_loop_index:
# 初始化UB中的tensor
self.init_ub_tensor()
self.indices_loop_index.set_as(indices_loop_index)
# 遍历indices索引计算
self.traversing_indices()
else:
self.init_ub_tensor()
self.traversing_indices()

# 通过BuildCCE接口进行算子编译，最终生成算子目标文件.o与算子描述文件.json
self.tik_instance.BuildCCE(
    kernel_name=self.kernel_name,
    inputs=(self.var_gm, self.indices_gm, self.updates_gm),
    outputs=(self.out_gm),
    enable_l2=False)

return self.tik_instance```
traversing_indices函数定义如下。
```    def traversing_indices(self):
# 计算indices需要分多少次搬入UB进行遍历，根据给indices分配的UB大小来计算
max_ub_idx_num = (self.indices_ub_number // self.index_dims *
self.index_dims)
indices_loop_num = self.indices_num // max_ub_idx_num

if indices_loop_num > 0:
with self.tik_instance.for_range(
        0, indices_loop_num) as indices_loop_index:
    self.updates_the_var(indices_loop_index * max_ub_idx_num,
                         max_ub_idx_num)
# 遍历的尾巴，或者只需要一次搬入遍历的场景
indices_last_num = self.indices_num % max_ub_idx_num
if indices_last_num > 0:
    self.updates_the_var(indices_loop_num * max_ub_idx_num,
                         indices_last_num)```
```    def updates_the_var(self, indices_in_index, indice_num):
# 计算数据搬运的burst_len
indices_burst_len = math.ceil(indice_num / self.indices_data_each_block)
# 将indices搬运到UB
if self.indices_num == 1:
self.tik_instance.data_move(self.indices_ub, self.indices_gm, 0, 1,
indices_burst_len, 0, 0)
else:
self.tik_instance.data_move(self.indices_ub,
self.indices_gm[indices_in_index], 0, 1,
indices_burst_len, 0, 0)
if self.nd_flag:
indice_loop_num = indice_num // self.indices_shape[-1]
else:
indice_loop_num = indice_num
# 遍历搬运到UB的indices
with self.tik_instance.for_range(0,
indice_loop_num) as indices_ub_index:
if self.block_num > 1:
# 判断index是否在当前核的计算范围内，如果在，进行对应的计算
with self.tik_instance.if_scope(
self.indices_loop_index *
with self.tik_instance.if_scope(
(self.indices_loop_index + 1) *
if self.nd_flag:
indices_in_index = indices_in_index // \
self.indices_shape[
-1]
indices_in_index)
self.update_data_num)
# 计算update和var的函数
else:
if self.nd_flag:
indices_in_index = indices_in_index // self.indices_shape[
-1]
self.update_data_num)

if last_num > 0:
last_num)```
```     def calc_updates_small(self, read_index_offset, element_num):
# 计算一次搬运到UB的burst_len参数
# 将需要更新的var分片搬运到UB buffer
self.tik_instance.data_move(
self.tik_instance.data_move(
# 计算非32B对齐场景尾巴的数据有多少，需要两次计算和搬运防止写覆盖
tile_ele_num = element_num % self.var_data_each_block
align_offset = 0
# 非32B对齐，且大于32B的场景进行计算。并将计算结果搬出
if (tile_ele_num != 0 and
self.update_data_num > self.var_data_each_block):
align_ele_num = (
element_num // self.var_data_each_block *
self.var_data_each_block)
align_offset = (
(self.var_data_each_block - tile_ele_num))
self.tik_instance.data_move(
self.var_tile_ub,
self.var_gm[self.var_read_index + align_offset], 0, 1, 1, 0, 0)

self.tik_instance.data_move(

compute_loop = element_num // self.max_num_one_repeat // 255
if compute_loop > 0:
with self.tik_instance.for_range(0, compute_loop) as index:
index_offset = index * self.max_num_one_repeat * 255
self.calc_process(self.max_num_one_repeat, index_offset,
index_offset, index_offset, 255, False)
last_loop = element_num % (self.max_num_one_repeat *
255) // self.max_num_one_repeat

if last_loop > 0:
index_offset = compute_loop * self.max_num_one_repeat * 255
self.calc_process(self.max_num_one_repeat, index_offset,
index_offset, index_offset, last_loop, False)

index_offset = (
element_num // self.max_num_one_repeat *
self.max_num_one_repeat)
# 32B对齐场景，只需要将数据一次搬出去
if (tile_ele_num == 0 or
self.update_data_num < self.var_data_each_block):
index_offset, 1, False)

self.tik_instance.data_move(
self.var_ub, 0, 1, updates_burst_len, 0, 0)
# 非32B对齐场景，需要把对齐部分和非对齐部分分两次计算，然后搬出
else:
self.calc_process(self.var_data_each_block, 0, 0, 0, 1, True)
self.tik_instance.data_move(
self.var_tile_ub, 0, 1, 1, 0, 0)
index_offset, 1, False)
self.tik_instance.data_move(
self.var_ub, 0, 1, updates_burst_len - 1, 0, 0)
else:
self.tik_instance.data_move(
self.var_ub, 0, 1, updates_burst_len, 0, 0)```
4. calc_process函数定义。
```    def calc_process(self, mask, dest_addr, src_addr1, src_addr2, repeat_times,
is_tile):
need_vconv_dtype = ("int8", "uint8")
# 对于int8和uint8数据类型，需要进行转换后再进行计算
if self.var_dtype in need_vconv_dtype:
if is_tile:
repeat_times, 8, 4)
repeat_times, 8, 4)
compute_repeat_strid = 8
src1_ub = self.var_tile_vconv_ub
dst_ub = self.var_tile_vconv_ub
else:
repeat_times, 8, 4)
compute_repeat_strid = 8

else:
if is_tile:
compute_repeat_strid = (
self.max_num_one_repeat // self.var_data_each_block)
src1_ub = self.var_tile_ub
dst_ub = self.var_tile_ub
else:
compute_repeat_strid = (
self.max_num_one_repeat // self.var_data_each_block)

compute_repeat_strid,
compute_repeat_strid, compute_repeat_strid)
elif self.compute_type == "vsub":
compute_repeat_strid,
compute_repeat_strid, compute_repeat_strid)
else:
raise RuntimeError("the operater [%s] is not supported" %
self.compute_type)
if self.var_dtype in need_vconv_dtype:
if is_tile:
self.var_tile_vconv_ub, repeat_times, 4, 8)
else:
repeat_times, 4, 8)```

#### 算子原型定义

```#ifndef GE_OP_SCATTER_ND_ADD_H
#define GE_OP_SCATTER_ND_ADD_H
#include "graph/operator_reg.h"

namespace ge {
REG_OP(ScatterNdAdd)
    .INPUT(var, TensorType({DT_FLOAT16, DT_FLOAT,DT_INT32,DT_INT8,DT_UINT8}))
    .INPUT(indices, TensorType::IndexNumberType())
    .INPUT(updates, TensorType({DT_FLOAT16, DT_FLOAT,DT_INT32,DT_INT8,DT_UINT8}))
    .OUTPUT(var, TensorType({DT_FLOAT16, DT_FLOAT,DT_INT32,DT_INT8,DT_UINT8}))
    .ATTR(use_locking, Bool, false)
    .OP_END_FACTORY_REG(ScatterNdAdd)
}  // namespace ge

#endif  // GE_OP_SCATTER_ND_ADD_H```

IndexNumberType()的数据类型定义请参见“inc/graph/types.h”文件，此文件中定义了所有GE使用的数据类型。

```IMPLEMT_VERIFIER(ScatterNdAdd, ScatterNdAddVerify) {
  if (op.GetInputDesc("var").GetDataType() !=
      op.GetInputDesc("updates").GetDataType()) {
    return GRAPH_FAILED;
  }
  return GRAPH_SUCCESS;
}```

```IMPLEMT_COMMON_INFERFUNC(ScatterNdAddInferShape) {
Shape var_shape = op.GetInputDesc("var").GetShape();
DataType input_dtype = op.GetInputDesc("var").GetDataType();
TensorDesc td = op.GetOutputDesc("var");
td.SetShape(ge::Shape(var_shape));
td.SetDataType(input_dtype);
(void)op.UpdateOutputDesc("var", td);
return GRAPH_SUCCESS;
}```