跳转至

向量化

向量化简介

向量化是实现SIMD的一种方法,它通过使用特殊的CPU指令集来处理数据向量。这些指令集能够一次性对多个数据元素执行相同的操作,从而提高数据处理的效率。

SIMD

在现代计算中,SIMD(单指令多数据)是一种并行计算技术,它允许单个指令同时对多个数据执行操作。这种技术通过在单个指令周期内处理多个数据来提高性能,特别适合于图像处理、音频处理、科学计算等领域,其中有许多可以并行处理的数据。

常见的SIMD指令集

  • AVX-512:支持512位宽的SIMD操作,常见于Intel与AMD服务器级处理器中。
  • ARM NEON:ARM架构的SIMD指令集,支持128位宽的操作。

向量化使用方法

向量化的主要使用方法有两种

  • 通过内联汇编写入代码中
  • 让编译器自动优化

内联汇编

下面是一个使用AVX-512指令集的例子,我们将使用C++和内联汇编来演示如何向量化一个简单的数组加法操作。

#include <immintrin.h>
#include <iostream>

void avx512_add(float* a, float* b, float* result, int n) {
    // 处理数组的前n个元素
    for (int i = 0; i < n; i += 16) {
        // 加载数据到ZMM寄存器
        __m512 va = _mm512_loadu_ps(a + i);
        __m512 vb = _mm512_loadu_ps(b + i);
        // 执行AVX-512加法
        __m512 vresult = _mm512_add_ps(va, vb);
        // 将结果存回内存
        _mm512_storeu_ps(result + i, vresult);
    }
}

int main() {
    const int n = 1024; 
    float a[n], b[n], result[n];

    for (int i = 0; i < n; i++) {
        a[i] = i;
        b[i] = i;
    }

    avx512_add(a, b, result, n);

    for (int i = 0; i < n; i++) {
        std::cout << "Result[" << i << "] = " << result[i] << std::endl;
    }

    return 0;
}

编译器优化

如果你尝试运行了上述的代码并和普通版本的比较,你可能会发现两者速度并没有较大区别。这是因为现代编译器往往集成了自动向量化的能力,能够自动检测可以向量化的部分并在编译时完成向量化。

但是编译器只能完成在简单的场景下的自动向量化,如果代码略微复杂,则不能自动向量化。此时我们可以通过修改代码引导编译器完成优化。

下面的例子改编自第一届PKU HPCGame,该例子很好的向我们展示了如何引导编译器自动向量化优化代码:

base.cpp中实现了对输入的double类型的数组中的每个数进行 logistic 映射迭代itn轮的功能,opt.cpp是优化过后的版本,conf.data为对应的可执行程序所需的输入文件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <iostream>
#include <chrono>
#include <random>
#include <fstream>

double it(double r, double x, int64_t itn) {
    for (int64_t i = 0; i < itn; i++) {
        x = r * x * (1.0 - x);
    }
    return x;
}

void itv(double r, double* x, int64_t n, int64_t itn) {
    for (int64_t i = 0; i < n; i++) {
        x[i] = it(r, x[i], itn);
    }
}

int main()
{
    int64_t itn;
    double r;
    int64_t n;
    double *x;

    std::ifstream fi("conf.data");
    fi >> itn >> r >> n;
    fi.close();

    x = (double *)malloc(n * 8);
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_real_distribution<> dis(0.0, 4.0);
    for(int i=0; i<n ;i++){
        x[i] = dis(gen);
    }

    auto t1 = std::chrono::steady_clock::now();
    itv(r, x, n, itn);
    auto t2 = std::chrono::steady_clock::now();
    int d1 = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
    printf("%d\n", d1);


    return 0;
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <iostream>
#include <chrono>
#include <random>
#include <fstream>

template <int n>
void it(double r, double *x, int64_t itn)
{
    for (int64_t i = 0; i < itn; i++)
    {
        for (int64_t j = 0; j < n; j++)
        {
            x[j] = r * x[j] * (1.0 - x[j]);
        }
    }
}

template <int gn>
void itvg(double r, double *x, int64_t n, int64_t itn)
{
    for (int64_t i = 0; i < n; i += gn)
    {
        it<gn>(r, x + i, itn);
    }
}

int main()
{
    int64_t itn;
    double r;
    int64_t n;
    double *x;

    std::ifstream fi("conf.data");
    fi >> itn >> r >> n;
    fi.close();

    x = (double *)malloc(n * 8);
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_real_distribution<> dis(0.0, 4.0);
    for(int i=0; i<n ;i++){
        x[i] = dis(gen);
    }

    auto t1 = std::chrono::steady_clock::now();
    itvg<128>(r, x, n, itn);
    auto t2 = std::chrono::steady_clock::now();
    int d1 = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
    printf("%d\n", d1);


    return 0;
}
CXX = clang++
CXXFLAGS = -O3

all: base opt
clean : 
    rm  base opt
base: base.cpp
    $(CXX) $(CXXFLAGS) base.cpp -o base
opt: opt.cpp
    $(CXX) $(CXXFLAGS) opt.cpp -o opt

.PHONY: all clean
32768
0.1896
32768

base.cpp通过两步优化即可得到opt.cpp

  1. 数据并行化

    • base.cpp中,函数对数组中的每个数依次进行迭代。对于单个数来说,迭代的过程显然是必须严格串行的,这样的代码让编译器无法完成自动的向量化。
    • 注意到同一个数组中各个数的迭代是可以并行处理的,且其所在内存连续,每轮迭代对应的操作相同,这非常契合向量化指令集。
    • 从而我们可以交换循环次序,让编译器进行自动向量化。
  2. 访存优化

    • 在交换循环次序后,我们每轮迭代先将整个数组完成更新,再迭代下一轮。在数组较大的情况下,显然寄存器与高速缓存无法存放所有的内存数据,这将导致频繁的从内存中读写数据。
    • 为了降低访存开销,我们可以将整个数组分块。每块的大小即为opt.cpp中的gn,对于每个gn大小的小块,我们采用数据并行化的方法进行计算,每算完一块再计算下一块,最大程度降低了从内存中读写文件的开销。其中gn的最佳大小与具体的硬件有关,我们采用模版参数方法定义,测试多组找到最佳的参数即可。