🏠

DMA Sink Kernel (farrow_dma_snk) 子模块文档

概述

farrow_dma_snk 是 Farrow 滤波器系统的数据出口引擎,负责将 AI Engine 处理后的结果从 AXI4-Stream 接口接收,并写回 LPDDR4 内存。如果说 farrow_dma_src 是上料机器人,那么 farrow_dma_snk 就是成品收集和入库系统——它必须可靠地捕获每一帧输出,并按需存储到指定位置。

与 Source 端相比,Sink 端增加了一个独特功能:选择性捕获机制 (loop_sel),这让它在调试和验证场景中特别有用。


核心职责

  1. AXI4-Stream → 片上 BRAM 的流式捕获:以固定节拍接收 AIE 输出
  2. BRAM → DDR 的批量写入:利用 burst 传输最大化内存带宽
  3. 多轮迭代的选择性保存:通过 loop_sel 参数只保存指定迭代的结果
  4. 流量控制:作为数据流的终点,自然形成反压机制

架构设计

双阶段流水线 (DATAFLOW)

flowchart LR subgraph "Stage 1: Capture" AIE[AIE Kernel] -->|PLIO| AXIS[sig_i
AXI4-Stream] AXIS -->|hls::stream| CAPTURE[capture_streams] CAPTURE -->|条件写入| BRAM[buff_DEPTHP
片上BRAM] end subgraph "Stage 2: Write Back" BRAM -->|burst write| DDR[(LPDDR4)] end style AIE fill:#fff3e0 style AXIS fill:#e8f5e9 style BRAM fill:#fff8e1 style DDR fill:#f3e5f5

选择性捕获逻辑

这是 Sink 端最独特的设计——它不会保存所有接收到的数据,而是根据 loop_sel 参数只保留特定迭代:

CAPTURE: for (int ll=0; ll < loop_cnt; ll++) {
    SAMPLE_IN: for (int dd=0; dd < DEPTH; dd++) {
        (val[3], val[2], val[1], val[0]) = sig_i.read();
        if (ll == loop_sel) {           // 关键:条件写入
            buff[dd] = (val[3], val[2], val[1], val[0]);
        }
    }
}

为什么需要这个设计?

想象你在测试一个运行 1000 次迭代的系统,但你只想验证第 500 次的输出是否正确。如果没有选择性捕获,你需要:

  • 要么在 Host 端接收全部 1000 帧数据(内存和时间开销巨大)
  • 要么修改 AIE kernel 代码(侵入式设计变更)

loop_sel 提供了一个优雅的解决方案:硬件自动过滤,只把感兴趣的那一轮数据写回 DDR。


代码详解

顶层函数接口

void farrow_dma_snk_wrapper(
    farrow_dma_snk::TT_DATA mem[farrow_dma_snk::DEPTH],  // DDR 内存指针
    int loop_sel,                                         // 选择保存哪一轮
    int loop_cnt,                                         // 总循环次数
    farrow_dma_snk::TT_STREAM& sig_i                      // AXI4-Stream 输入
);

HLS 接口配置

#pragma HLS interface m_axi      port=mem         bundle=gmem    offset=slave   depth=DEPTH
#pragma HLS interface axis       port=sig_i
#pragma HLS interface s_axilite  port=loop_sel    bundle=control
#pragma HLS interface s_axilite  port=loop_cnt    bundle=control
#pragma HLS interface s_axilite  port=mem         bundle=control
#pragma HLS interface s_axilite  port=return      bundle=control
#pragma HLS DATAFLOW

注意到比 Source 多了一个 s_axilite port=loop_sel,这是运行时选择捕获目标的关键。

capture_streams() 函数

void capture_streams(TT_DATA (&buff)[DEPTH], TT_STREAM& sig_i,
                     const int& loop_sel, const int& loop_cnt) {
CAPTURE: for (int ll=0; ll < loop_cnt; ll++) {
#pragma HLS LOOP_TRIPCOUNT min=1 max=4
    SAMPLE_IN: for (int dd=0; dd < DEPTH; dd++) {
#pragma HLS pipeline II=1
        TT_SAMPLE val[4];
#pragma HLS array_partition variable=val dim=1
        (val[3], val[2], val[1], val[0]) = sig_i.read();
        if (ll == loop_sel) {
            buff[dd] = (val[3], val[2], val[1], val[0]);
        }
    }
}
}

关键设计点

  1. 无条件读取,条件写入:无论 loop_sel 是什么,每个样本都必须从 stream 中读出(否则会造成背压),但只有匹配的轮次才会写入 BRAM。

  2. II=1 约束:每周期处理一个 128-bit 字,匹配 AIE 的输出速率。

  3. Array Partitionval[4] 完全分割,支持并行位操作。

read_buffer() 函数

void read_buffer(TT_DATA mem[DEPTH], TT_DATA (&buff)[DEPTH]) {
    ap_uint<10> dd = 0;
READ_BUFF: for (int mm=0; mm < DEPTH; mm++) {
#pragma HLS PIPELINE II=1
    mem[mm] = buff[dd];
    dd = (dd == ap_uint<10>(DEPTH-1)) ? ap_uint<10>(0) : ap_uint<10>(dd + 1);
}
}

与 Source 端的 load_buffer 对称,但方向相反:从 BRAM 读取,写入 DDR。


Loop Select 机制的工程价值

使用场景分析

场景 loop_cnt loop_sel 效果
全量捕获 4 - 不支持(只能选一轮)
首轮验证 4 0 只保存第 0 轮
稳态测量 4 3 跳过预热,保存最后一轮
中间检查 4 2 保存第 2 轮用于调试

在 Host 中的使用

// host.cpp
static constexpr int32_t LOOP_SEL = 0;  // 捕获第 0 轮

dma_snk_run.set_arg(1, LOOP_SEL);
dma_snk_run.set_arg(2, LOOP_CNT_O);  // 期望接收 4 轮

注意 LOOP_CNT_O 必须与 AIE graph 的实际运行次数匹配,否则 dma_snk 会提前终止或无限等待。


数据类型与常量

与 Source 端保持一致:

namespace farrow_dma_snk {
    static constexpr unsigned DEPTH = 1024;
    static constexpr unsigned NBITS = 128;
    typedef ap_uint<NBITS>                 TT_DATA;
    typedef ap_uint<NBITS/4>               TT_SAMPLE;
    typedef hls::stream<TT_DATA>           TT_STREAM;
    typedef ap_uint<10>                    TT_ADDR;
};

这种一致性确保了 Source 和 Sink 之间的数据格式完全兼容。


测试平台 (tb_wrapper.cpp)

Sink 的测试比 Source 更复杂,因为它需要验证选择性捕获逻辑:

// 生成随机测试数据,模拟 AIE 输出
std::minstd_rand gen;
for (int ll=0; ll < loop_cnt; ll++) {
    for (int mm=0; mm < DEPTH; mm++) {
        TT_DATA data = TT_DATA(gen());
        if (ll == loop_sel) {
            ddr4_g[mm] = data;  // 记录期望保存的数据
        }
        sig_i.write(data);       // 写入输入流
    }
}

// 运行 DUT
farrow_dma_snk_wrapper(ddr4_o, loop_sel, loop_cnt, sig_i);

// 验证:只有 loop_sel 对应的数据被保存
for (int mm=0; mm < DEPTH; mm++) {
    assert(ddr4_o[mm] == ddr4_g[mm]);
}

测试覆盖

  • 正常捕获流程
  • loop_sel 边界值(0 和 loop_cnt-1)
  • 随机数据正确性

常见陷阱与调试技巧

陷阱 1: Stream 阻塞导致死锁

如果 loop_cnt 设置过大,而 AIE graph 实际产生的数据不足,capture_streams 会在 sig_i.read() 处永远阻塞。

症状:硬件仿真挂起,无输出。

解决:确保 Host、DMA 和 AIE graph 三者的迭代计数一致。

陷阱 2: Loop Sel 越界

如果 loop_sel >= loop_cnt,没有任何数据会被保存,但函数仍会正常返回。

症状:DDR 中的输出全是未初始化的垃圾值。

解决:Host 端添加范围检查:

if (LOOP_SEL >= LOOP_CNT_O) {
    std::cerr << "ERROR: LOOP_SEL out of range" << std::endl;
    return 1;
}

陷阱 3: 数据竞争

虽然 DATAFLOW 启用了两阶段并行,但 buff 是单缓冲。如果 Stage 2 还没写完,Stage 1 不能开始下一轮捕获。

当前实现:实际上是顺序执行(先 capture 后 read),所以安全。

优化方向:要实现真正的乒乓缓冲,需要:

TT_DATA buff_ping[DEPTH];
TT_DATA buff_pong[DEPTH];
// 用标志位管理读写状态

与其他组件的关系

flowchart TB subgraph "上游生产者" K2[farrow_kernel2] PLIO[ai_engine_0.PLIO_o_0] end subgraph "本模块内部" SNK[farrow_dma_snk_wrapper] end subgraph "上层调用者" HOST[host.cpp
XRT API] CFG[system.cfg
内核实例化] end subgraph "最终存储" DDR[(LPDDR4)] end K2 --> PLIO PLIO -->|sig_i| SNK HOST -->|配置 loop_sel/loop_cnt| SNK CFG -->|实例化为 dma_snk| SNK SNK -->|m_axi| DDR
  • 上游:接收来自 farrow_kernel2 的最终输出
  • 控制:Host 通过 XRT API 配置捕获参数
  • 下游:将选定数据写回 DDR,供 Host 验证

设计权衡总结

决策 选择 理由
单轮捕获 vs 全量捕获 单轮 减少 DDR 带宽占用,便于调试
条件写入 vs 后处理过滤 条件写入 节省 BRAM 和 DDR 空间
顺序执行 vs 乒乓并行 顺序 简化设计,满足当前需求

这种务实的工程取舍让这个简单的 DMA Sink 核在实际项目中非常实用。


本文档详细说明了 DMA Sink Kernel 的设计原理和实现细节。它与 Source 核共同构成了完整的 AIE-PL-DDR 数据通路。

On this page