响应分包和丢包处理说明

TcaplusDB 接入层单个响应包上限为 256 KB。响应数据超过 256 KB 时 proxy 会拆分成多个响应包陆续下发,但 保证不会切分单条记录:单条记录大小不论多大,都会出现在某个分包里完整返回。例如 3 条记录分别为 10 KB / 251 KB / 1 MB,可能拆为 (10 KB + 251 KB)(1 MB) 两个响应包。

1. 不分包是历史包袱,务必开启分包

⚠️ §2.2 的"默认不分包"是 TcaplusDB 早期协议的妥协,不是合理设计:响应数据超过 256 KB 时,proxy 只把第一包发回业务,剩余部分被静默丢弃,业务没有任何错误码可感知。

红线(请把这条作为统一规则):

对任何分包类命令字都务必显式开启分包,不要去记"哪个命令字默认是否分包"。 各 SDK 的设置入口:

SDK 设置入口 是否需要业务手动设置
C++ TDR SDK pstRequest->SetMultiResponseFlag(1) 必须
C++ PB SDK(async / coroutine) SDK 内部对批量类命令字已自动设置 业务无需设置
Go SDK 底层用法 req.SetMultiResponseFlag(1) 必须
Go SDK 高级用法 option.TDROpt{ MultiFlag: 1 } / option.PBOpt{ MultiFlag: 1 } 必须(高级用法仍只把 MultiFlag 透传到底层 request)

2. 触发分包的命令字(全集)

按"业务是否必须手动开启分包"分两类。

2.1 必然分包的命令字(无须手动设置)

proxy 路径上会主动开启分包,业务即使没设 SetMultiResponseFlag(1) 也会自动分包,但 SDK 侧仍需正确循环收包。

命令字 含义
BATCH_GET_REQ Generic 表批量查询
GET_BY_PARTKEY_REQ Generic 表部分 Key 查询(本地索引)
UPDATE_BY_PARTKEY_REQ / DELETE_BY_PARTKEY_REQ 部分 Key 更新 / 删除,响应携带变更 key
INDEX_QUERY_REQ 全局索引查询,SQL 返回多条匹配记录

2.2 默认不分包,必须手动开启的命令字(历史包袱)

这类命令字 multi_flag 默认 0proxy 只发第一个响应包,剩余部分直接丢弃。业务必须显式调用 SetMultiResponseFlag(1) / option.MultiFlag = 1

命令字 何时会超 256 KB
BATCH_INSERT_REQ / BATCH_UPDATE_REQ / BATCH_REPLACE_REQ / BATCH_DELETE_REQ 设置了 ResultFlag 回带新值 / 旧值,需要返回成批记录数据
LIST_GET_ALL_REQ List 表一次读所有元素,元素较多即可能超 256 KB
LIST_GET_BATCH_REQ List 表按 index 批量读多条元素
LIST_ADDAFTER_BATCH_REQ / LIST_REPLACE_BATCH_REQ / LIST_DELETE_BATCH_REQ 设置了 ResultFlag 回带新值 / 旧值的批量元素操作

3. 分包丢包

3.1 什么是"分包丢包"

正确开启分包后,proxy 会把响应拆成多个包陆续发回 SDK;但单个分包仍是普通 TCP 报文,在以下场景可能丢失:

链路 触发原因 业务侧表现
Proxy ↔ SDK 丢包 业务收包不及时(回调线程被同步逻辑阻塞、SDK 处理慢),或网络问题(链路抖动 / 重置),导致 proxy 端 TCP 发送缓冲区写满 → 后续分包发不出去 业务收到部分分包却等不到后续分包,只能靠 SDK 请求超时识别
Proxy ↔ Tcapsvr 丢包 proxy 到 svr 的内部链路网络问题、Tcapsvr 处理慢 / 主从切换 / 重启等 按命令字范围分两类
Generic Batch 类BatchGet / BatchInsert / BatchUpdate / BatchReplace / BatchDelete):proxy 按 key 拆子请求下发Tcapsvr。
整批透传类(List Batch 命令字 ListAddAfterBatch / ListReplaceBatch / ListDeleteBatch / ListGetBatch;流式中转命令字 GetByPartKey / UpdateByPartKey / DeleteByPartKey 等):proxy 整体转发后由 Tcapsvr 流式回包,proxy 没有"子请求维度",无法按 key 兜底,业务只能靠 SDK 请求超时 / 总错误码识别丢包场景

3.2 如何处理分包和识别丢包

无论哪类命令字,两条通用原则先收住:

  • 及时收包:C++ TDR OnResponse / C++ PB UpdateNetWork 调用线程不要做重活,避免收包不及时;收包优先级高于发包,避免一直发包而不收包导致缓冲区满,进一步恶化。
  • 请求超时兜底不论哪种业务都要编写请求超时兜底代码——Proxy ↔ SDK 链路丢包(缓冲区满 / 网络抖动)TCP 协议层不会主动告知,只能靠 SDK 请求超时识别

按命令字类分四组讨论具体的"开启分包 + 识别丢包"路径。

3.2.1 Generic Batch 类

适用命令字BatchGet / BatchInsert / BatchUpdate / BatchReplace / BatchDelete

关键处理

  • proxy 按 key 拆子请求下发到 svr,Proxy ↔ Tcapsvr 链路丢包会被 proxy 兜底成"该 key 的超时分错误码,但前提是sdk业务的超时时间比Proxy的长,否则sdk可能先超时"
  • 用户收到响应后,需要确认哪些key成功,哪些key失败;收尾时,需要确认哪些key丢包了没收到

C++ TDR SDK(以 BatchGet 为例):

分包丢包识别

  • SDK 是单线程异步模型,业务需要自己维护超时队列,通过异步ID关联请求和响应
  • 每次收到包,业务通过分错误码,确认哪些key成功,哪些key失败
  • 每次收到包,业务通过HaveMoreResPkgs是否为1,判断还有下一个分包
  • 收尾时,业务确认哪些key丢包了没收到
  • 业务定时遍历超时队列,做好超时处理
// TDR SDK 是单线程异步模型:业务侧在同一个线程里循环

static const int      kUnconfirmed    = -1;       // 哨兵:该 key 的分错误码尚未确认(响应未到达)
static const uint64_t kBatchTimeoutMs = 5 * 1000;  // 按预计分包数 × 单包预算放大

struct BatchWait {
    uint64_t expire_ms;             // 截止时间(绝对时间),到点判超时
    // 「本批所有 key → 分错误码」;初始全部 kUnconfirmed;
    // 收到响应里某 key 就把 result[key] 覆写为 sub_ret;
    // Finalize 时仍为 kUnconfirmed 的 key 即「中途丢包/超时未确认」
    std::map<KeyT, int> result;
};
std::map<uint64_t /*asyncId*/, BatchWait> g_wait_queue;

// ---- 1) 发送 BatchGet:成功即把所有 key 以 kUnconfirmed 初始化塞入等待队列 ----
int SendBatchGet(const std::vector<KeyT>& keys, uint64_t asyncId) {
    TcaplusServiceRequest* req = tcaplus_server.GetRequest(TABLE_NAME);
    if (req == NULL) return -1;
    req->Init(TCAPLUS_API_BATCH_GET_REQ, NULL, 0, 0, 0, 0);
    req->SetAsyncID(asyncId);
    req->SetMultiResponseFlag(1); // 必须手动开启分包

    BatchWait w;
    w.expire_ms = NowMs() + kBatchTimeoutMs;
    for (auto& k : keys) {
        TcaplusServiceRecord* rec = req->AddRecord(-1);
        // rec->SetKeyXxx(...) 按表定义把 k 填进 record
        w.result[k] = kUnconfirmed;
    }

    int ret = tcaplus_server.SendRequest(req);
    if (ret != 0) return ret;

    g_wait_queue[asyncId] = w;
    return 0;
}

// ---- 2) 主循环:单线程顺序跑「收响应 + 超时扫描」----
void MainLoop() {
    while (running) {
        // 2.1 收响应:RecvResponse 第二个参数 sync_timeout(ms),
        //     若无包等待该时长,避免空转 CPU;返回 0 且 rsp != NULL 才有响应
        TcaplusServiceResponse* rsp = NULL;
        int ret = tcaplus_server.RecvResponse(rsp, 10 /*ms*/);
        if (ret == 0 && rsp != NULL) {
            HandleResponse(rsp);
        }

        // 2.2 超时扫描:业务可按 tick 间隔(如每 50ms)做一次
        ScanTimeout();
    }
}

void HandleResponse(TcaplusServiceResponse* rsp) {
    uint64_t asyncId = rsp->GetAsynID();
    auto it = g_wait_queue.find(asyncId);
    if (it == g_wait_queue.end()) {
        // 找不到 = 已被 ScanTimeout 剔除,本包是超时后才到达的残包,丢弃
        return;
    }
    BatchWait& wait = it->second;

    if (rsp->GetResult() != 0) {
        // 表示本分包应全是错误记录;仍要读分错误码逐条判断
    }

    for (int i = 0; i < rsp->GetRecordCount(); ++i) {
        const TcaplusServiceRecord* rec = NULL;
        int sub_ret = rsp->FetchRecord(rec); // 分错误码:0 命中 / 261 不存在 / -7953(Proxy↔Tcapsvr 兜底) 等
        if (rec == NULL) continue;
        KeyT key = ExtractKey(rec); // 业务从 rec->GetKeyXxx(...) 拼成 KeyT
        wait.result[key] = sub_ret; // 精确匹配 key 覆写分错误码(哨兵被改写)
    }

    if (rsp->HaveMoreResPkgs() == 1) {
        // 还有后续分包:续期超时时间,继续等下一个响应
        wait.expire_ms = NowMs() + kBatchTimeoutMs;
    } else {
        // 收到尾包:本 asyncId 任务结束,统一收口
        Finalize(wait.result);
        g_wait_queue.erase(it);
    }
}

// ---- 3) 超时扫描:把仍未确认的 key 改写为超时错误码,统一收口 ----
void ScanTimeout() {
    uint64_t now = NowMs();
    for (auto it = g_wait_queue.begin(); it != g_wait_queue.end(); ) {
        if (it->second.expire_ms > now) { ++it; continue; }
        // 超时未收齐 ⇒ Proxy↔SDK 段丢包;把 kUnconfirmed 的 key 改写为 API_ERR_WAIT_RSP_TIMEOUT
        for (auto& kv : it->second.result) {
            if (kv.second == kUnconfirmed) kv.second = TcapErrCode::API_ERR_WAIT_RSP_TIMEOUT;
        }
        Finalize(it->second.result);
        it = g_wait_queue.erase(it);
    }
}

// ---- 4) 唯一收口点:遍历 result,按每个 key 的最终分错误码统一处理 ----
void Finalize(const std::map<KeyT, int>& result) {
    for (auto& kv : result) {
        int sub_ret = kv.second;
        if (sub_ret == 0) {
            // 命中:业务读 value
        } else if (sub_ret == TcapErrCode::TXHDB_ERR_RECORD_NOT_EXIST) {
            // 261:记录不存在
        } else if (sub_ret == kUnconfirmed) {
            // 尾包到达仍是 kUnconfirmed ⇒ 该 key 在所有分包里都没出现,即「中途丢包」
        } else {
            // 其它非 0:含
            //   • 服务端语义失败 / Proxy↔Tcapsvr 被 proxy 兜底回来的超时(如 -7953)
            //   • ScanTimeout 改写的 API_ERR_WAIT_RSP_TIMEOUT
            // 业务按 sub_ret 决定是否重试
        }
    }
}

C++ PB SDK(async)

分包丢包识别

  • PB SDK 是单线程回调模型,SDK自己维护了超时队列,每次收到分包都会触发OnRecv回调
  • 每次收到包,业务通过分错误码,确认哪些key成功,哪些key失败
  • sdk自己会遍历超时队列,超时触发OnTimeout超时回调,业务确认哪些key超时,做超时处理
  • 收尾时,确认哪些key丢包了没收到
// PB async SDK 对批量类命令字内部已自动 SetMultiResponseFlag(1),业务无需手动开启。
//   1) 每收到一个分包 → SDK 用响应里 record 的 key 字节串 —— 严格 —— 反查业务原始 msg 指针,
//      把 value 反序列化回去后 mapMsg[业务原始指针] = sub_ret,再调 OnRecv(mapMsg)
//   2) HaveMoreResPkgs() == 1:SDK 自动续期超时 m_tmExpire = now + session_timeout;继续等下一个分包
//      HaveMoreResPkgs() != 1(尾包):SDK 用 OnError(vecMsgs, API_ERR_NO_MORE_RECORD) 作为「结束哨兵」
//   3) 中间分包久未到达触发超时:SDK 调 OnTimeout(vecMsgs)
//   4) 上述任一终止路径都会在 MsgStatus 析构里调 OnFinish(param) 作为本次操作的统一收尾
//
// 关键事实:OnRecv 的 mapMsg 里的 Message* 就是业务发请求时传入 BatchGet 的那一份原始指针
// (SDK 按 key 字节串严格匹配回来,不会新建也不会随便挑),因此业务可以直接用「指针集合」做「待回包→已回包」的差集

class BatchGetCallback : public TcaplusPbCallback {
public:
    // 哨兵值:表示该 key 的分错误码尚未确认(响应未到达);正常 sub_ret 取值不会用 -1
    enum : int { kUnconfirmed = -1 };

    // 「所有 msg 指针 → 分错误码」;发请求前业务把全部 key 以 kUnconfirmed 初始化塞入;
    // OnRecv 每收到一个分包就把对应指针的 value 覆写为 SDK 回填的 sub_ret;
    std::map< ::google::protobuf::Message*, int> m_result;

    // 每收到一个分包就回调一次;mapMsg 里是这一包里 SDK 反查回的原始 msg 指针 + 该 key 的分错误码
    // 每收到一个分包就回调一次;只做「把哨兵覆写为真实分错误码」,业务统一在 OnFinish 收口处理
    int OnRecv(const std::map< ::google::protobuf::Message*, int>& mapMsg) override {
        for (auto& kv : mapMsg) {
            m_result[kv.first] = kv.second;
        }
        return 0;
    }

    // OnError 的 API_ERR_NO_MORE_RECORD 只是「尾包已收齐」的结束哨兵,业务忽略——
    // 真正收口走 OnFinish;其它 errorcode 才是协议层错误(API_ERR_UNPACK_MESSAGE / SVR_ERR_FAIL_SYSTEM_ERROR 等),
    // 把仍是 kUnconfirmed 的 key 改写成该 errorcode,让 OnFinish 能用一套分支处理
    int OnError(const std::vector< ::google::protobuf::Message*>& /*msgs*/, int errorcode) override {
        if (errorcode == TcapErrCode::API_ERR_NO_MORE_RECORD) return 0;
        for (auto& kv : m_result) {
            if (kv.second == kUnconfirmed) kv.second = errorcode;
        }
        return 0;
    }

    // SDK 等待超时(典型 Proxy↔SDK 段丢包):把未确认的 key 改写为 API_ERR_WAIT_RSP_TIMEOUT,同样让 OnFinish 收口
    int OnTimeout(const std::vector< ::google::protobuf::Message*>& /*msgs*/) override {
        for (auto& kv : m_result) {
            if (kv.second == kUnconfirmed) kv.second = TcapErrCode::API_ERR_WAIT_RSP_TIMEOUT;
        }
        return 0;
    }

    // 唯一收口点:任一终止路径(正常收齐 / 协议错误 / 超时)最终都会调到 OnFinish;
    // 业务在这里遍历 m_result,按每个 key 的最终分错误码统一处理
    int OnFinish(const NS_TCAPLUS_PROTOBUF_API::MsgParam& /*param*/) override {
        for (auto& kv : m_result) {
            int sub_ret = kv.second;
            if (sub_ret == TcapErrCode::GEN_ERR_SUC) {
                // 命中:业务读 value 字段
                // auto* t = dynamic_cast<tb_online*>(kv.first); /* 读 t->xxx() */
            } else if (sub_ret == TcapErrCode::TXHDB_ERR_RECORD_NOT_EXIST) {
                // 261:记录不存在
            } else if (sub_ret == kUnconfirmed) {
                // 走到 OnFinish 仍是 kUnconfirmed ⇒ 正常收齐路径下该 key 在所有分包里都没出现
                // 即「中途丢包」
            } else {
                // 其它非 0:含
                //   • 服务端语义失败 / Proxy↔Tcapsvr 被 proxy 兜底回来的超时(如 -7953)
                //   • OnTimeout 改写的 API_ERR_WAIT_RSP_TIMEOUT
                //   • OnError 改写的协议层错误(API_ERR_UNPACK_MESSAGE / SVR_ERR_FAIL_SYSTEM_ERROR 等)
                // 业务按 sub_ret 决定是否重试
            }
        }
        m_result.clear();
        return 0;
    }
};

// ---- 发起 BatchGet:业务把 key 字段填进 msg 集合后交给 SDK,并同步把指针以 kUnconfirmed 初始化塞入 cb.m_result ----
void SendBatchGetRequest(struct schedule* /*S*/, void* /*arg*/) {
    static std::vector<tb_online>                      keys;
    static std::vector< ::google::protobuf::Message*> msgs;
    static BatchGetCallback                            cb;
    keys.clear(); msgs.clear(); cb.m_result.clear();

    keys.resize(/*batch_size=*/2);
    keys[0].set_openid(1); keys[0].set_tconndid(1); keys[0].set_timekey("test_1");
    keys[1].set_openid(2); keys[1].set_tconndid(2); keys[1].set_timekey("test_2");
    for (auto& k : keys) {
        msgs.push_back(&k);
        cb.m_result[&k] = BatchGetCallback::kUnconfirmed; // 初始全部 -1 = 未确认
    }

    int ret = g_stAsyncApi.BatchGet(&msgs, &cb);
    if (ret != TcapErrCode::GEN_ERR_SUC) {
        // 发起失败:路由 / 序列化 / 配置错误,未进入网络阶段;本批不会触发 OnRecv/OnError/OnTimeout/OnFinish
        cb.m_result.clear();
    }
}

Go SDK 高级用法(推荐):

分包丢包识别

  • Go SDK是同步接口,通过多协程并发
  • 用户直接调用DoBatchXXX接口,SDK会聚合分包,通过option.BatchResult知道哪些key成功,哪些key失败或者丢包
opt := &option.TDROpt{
    MultiFlag: 1,                  // 必须手动开启分包
    Timeout:   10 * time.Second,
}

// DoBatchGet 是 inout 风格:业务构造 dataSlice(每个元素是具体表 struct,只填 key 字段)传入,
// 成功后 SDK 把每条记录的 value 字段反写回 dataSlice[i],分错误码填到 opt.BatchResult[i]
var dataSlice []record.TdrTableSt
for _, k := range keys {
    rec := service_info.NewService_Info()
    rec.Openid = k // 只填 key 字段
    dataSlice = append(dataSlice, rec)
}

err := client.DoBatchGet(TableName, dataSlice, opt)
if err != nil {
    // Proxy↔SDK 整体超时:doBatch 直接 return err,opt.BatchResult 不会被填,
    // 遍历 BatchResult 无意义 ⇒ 业务直接整体兜底重试,提前返回
    if tcErr, ok := err.(*terror.ErrorCode); ok && tcErr.Code == terror.API_ERR_WAIT_RSP_TIMEOUT {
        return err
    }
    // 其它情况:err 表示「任意一个 key 失败」,继续按下面 BatchResult 逐 key 判断
}

// 业务遍历 opt.BatchResult,按每个 key 的最终分错误码处理;BatchResult 顺序大小和 dataSlice 一致
for i, subErr := range opt.BatchResult {
    if subErr == nil {
        // 正确返回:dataSlice[i] 的 value 字段已被 SDK 反写
        _ = dataSlice[i]
    } else if tcErr, ok := subErr.(*terror.ErrorCode); ok && tcErr.Code == terror.TXHDB_ERR_RECORD_NOT_EXIST {
        // 261:记录不存在,业务语义而非异常
    } else {
        // dataSlice[i]失败,含:服务端失败 / Proxy↔Tcapsvr超时(-7953 等) / 没回包被 doBatch 用 globalErr 兜底
        // 业务按 subErr 决定是否幂等重试
    }
}

3.2.2 List Batch 类

适用命令字ListAddAfterBatch / ListReplaceBatch / ListDeleteBatch / ListGetBatch

关键特性

  • List Batch 类的操作目标是同一主 key 下的多个 list 元素
  • proxy 不按子元素拆请求,整批 (主 key + indexes) 透传给 Tcapsvr
  • 用户收到响应后,需要确认哪些index成功,哪些index失败;收尾时,需要确认哪些丢掉了没收到

C++ TDR SDK(以 ListGetBatch 为例):

分包丢包识别

  • SDK 是单线程异步模型,业务需要自己维护超时队列,通过异步ID关联请求和响应
  • 每次收到包,业务通过分错误码,确认哪些index成功,哪些index失败
  • 每次收到包,业务通过HaveMoreResPkgs是否为1,判断还有下一个分包
  • 收尾时,业务确认哪些index丢包了没收到
  • 业务定时遍历超时队列,做好超时处理
// TDR SDK 单线程异步模型;List Batch 响应按 element_index 维度组织 sub_ret,
// 业务侧的 BatchWait 沿用 §3.2.1 Generic Batch 的结构,只是 key 类型从「业务 KeyT」换成 element_index

static const int      kUnconfirmed        = -1;       // 哨兵:该 index 的分错误码尚未确认
static const uint64_t kListBatchTimeoutMs = 5 * 1000;

struct ListBatchWait {
    uint64_t expire_ms;                                // 截止时间(绝对时间),到点判超时
    // 「请求里所有 element_index → 分错误码」;初始全部 kUnconfirmed;
    // 收到响应里某 index 就把 result[idx] 覆写为 sub_ret;
    // 收齐尾包 / 超时之后,仍为 kUnconfirmed 的 index 即「该批未确认 → 整批丢包」
    std::map<int32_t /*element_index*/, int /*sub_ret*/> result;
};
std::map<uint64_t /*asyncId*/, ListBatchWait> g_wait_queue;

// ---- 1) 发送 ListGetBatch:成功即把所有 index 以 kUnconfirmed 初始化塞入等待队列 ----
int SendListGetBatch(const MainKey& mkey, const std::vector<int32_t>& element_indexes, uint64_t asyncId) {
    TcaplusServiceRequest* req = tcaplus_server.GetRequest(TABLE_NAME);
    if (req == NULL) return -1;
    req->Init(TCAPLUS_API_LIST_GET_BATCH_REQ, NULL, 0, 0, 0, 0);
    req->SetAsyncID(asyncId);
    req->SetMultiResponseFlag(1);                      // 同一主 key 下 element 多 / value 大时仍可能分包,统一开启

    TcaplusServiceRecord* rec = req->AddRecord(0);
    // rec->SetKeyXxx(...) 按表定义把主 key mkey 填进 record

    ListBatchWait w;
    w.expire_ms = NowMs() + kListBatchTimeoutMs;
    for (auto idx : element_indexes) {
        req->AddElementIndex(idx);                     // 批量添加 element index(单批 ≤ 1024 个)
        w.result[idx] = kUnconfirmed;                  // 同步占位
    }

    int ret = tcaplus_server.SendRequest(req);
    if (ret != 0) return ret;

    g_wait_queue[asyncId] = w;
    return 0;
}

// ---- 2) 主循环:单线程顺序跑「收响应 + 超时扫描」----
void MainLoop() {
    while (running) {
        TcaplusServiceResponse* rsp = NULL;
        int ret = tcaplus_server.RecvResponse(rsp, 10 /*ms*/);
        if (ret == 0 && rsp != NULL) HandleResponse(rsp);
        ScanTimeout();
    }
}

void HandleResponse(TcaplusServiceResponse* rsp) {
    uint64_t asyncId = rsp->GetAsynID();
    auto it = g_wait_queue.find(asyncId);
    if (it == g_wait_queue.end()) return;              // 残包,丢弃
    ListBatchWait& wait = it->second;

    if (rsp->GetResult() != 0) {
        // 总错误码非 0:表示本分包全是错误记录
        // 仍要继续读分错误码逐条确认
    }

    for (int i = 0; i < rsp->GetRecordCount(); ++i) {
        const TcaplusServiceRecord* rec = NULL;
        int sub_ret = rsp->FetchRecord(rec);           // 分错误码:0 命中 / 261 该 index 不存在 / 其它服务端错误
        if (rec == NULL) continue;
        int32_t element_idx = rec->GetIndex();         // 响应里每条 record 自带 element_index
        wait.result[element_idx] = sub_ret;            // 精确匹配 index 覆写哨兵
        // sub_ret == 0 时业务可立即读 rec->GetData(...) 保存到业务容器
    }

    if (rsp->HaveMoreResPkgs() == 1) {
        wait.expire_ms = NowMs() + kListBatchTimeoutMs; // 续期超时,等下一个分包
    } else {
        Finalize(wait.result);                          // 尾包到达:本 asyncId 任务结束,统一收口
        g_wait_queue.erase(it);
    }
}

// ---- 3) 超时扫描:仍未确认的 index 改写为超时错误码,统一收口 ----
void ScanTimeout() {
    uint64_t now = NowMs();
    for (auto it = g_wait_queue.begin(); it != g_wait_queue.end(); ) {
        if (it->second.expire_ms > now) { ++it; continue; }
        // 超时未收齐,把 kUnconfirmed 改写为 API_ERR_WAIT_RSP_TIMEOUT
        for (auto& kv : it->second.result) {
            if (kv.second == kUnconfirmed) kv.second = TcapErrCode::API_ERR_WAIT_RSP_TIMEOUT;
        }
        Finalize(it->second.result);
        it = g_wait_queue.erase(it);
    }
}

// ---- 4) 唯一收口点:遍历 result,按每个 element_index 的最终分错误码统一处理 ----
void Finalize(const std::map<int32_t, int>& result) {
    for (auto& kv : result) {
        int sub_ret = kv.second;
        if (sub_ret == 0) {
            // 命中:value 已在 HandleResponse 中读到(业务自行保存)
        } else if (sub_ret == TcapErrCode::TXHDB_ERR_RECORD_NOT_EXIST) {
            // 261:该 element index 在 svr 端不存在(含被淘汰 / 过期)
        } else if (sub_ret == kUnconfirmed) {
            // 走到这里仍是 kUnconfirmed —— 常规收齐路径不会出现(svr 必为每个请求 index 写一条 res);
            // 仅作为 defensive,超时路径已被 ScanTimeout 改写为 API_ERR_WAIT_RSP_TIMEOUT
        } else {
            // 其它非 0:含 API_ERR_WAIT_RSP_TIMEOUT(ScanTimeout 改写)/ 服务端语义失败等
            // 业务按 sub_ret 决定是否幂等重试
        }
    }
}

C++ PB SDK(async)

分包丢包识别

  • PB SDK 是单线程回调模型,SDK自己维护了超时队列,每次收到分包都会触发OnRecv回调
  • 每次收到包,业务通过分错误码,确认哪些index成功,哪些index失败
  • sdk自己会遍历超时队列,超时触发OnTimeout超时回调,业务确认哪些index超时,做超时处理
  • 收尾时,确认哪些Index丢包了没收到
// ListBatchGet 用专用回调签名 OnRecv(const ListBatchGetRequest& req, ListBatchGetResponse* res)
// res 里:
//   • m_mapMsg<int element_index, Message*>:SDK new 出来并 ParseFromArray 的 msg,所有权在 SDK,业务必须在 OnRecv 内读完字段(OnFinish 后 res 会被销毁)
//   • m_mapErr<int element_index, int sub_ret>:对应 sub_ret(0 命中 / 261 不存在 等)
// 分包到达时 SDK 真实处理流程:
//   1) 每收到一个分包 → SDK 按响应里的 elementIndexArray 把 (index, msg, sub_ret) 累积写进 res,
//      再调一次 OnRecv(*req, res);res->m_mapMsg / m_mapErr 跨分包累积
//   2) HaveMoreResPkgs() == 1:SDK 自动续期 m_tmExpire,继续等下一个分包
//      HaveMoreResPkgs() != 1(尾包):SDK 用 OnError(vecMsgs, API_ERR_NO_MORE_RECORD) 作为「结束哨兵」
//   3) 中间分包久未到达触发超时:SDK 调 OnTimeout(vecMsgs)
//   4) 上述任一终止路径都会在 MsgStatus 析构里调 OnFinish 作为本次操作的统一收尾

class ListGetBatchCallback : public TcaplusPbCallback {
public:
    enum : int { kUnconfirmed = -1 };

    // 「请求里所有 element_index → 分错误码」;发请求前业务把全部 index 以 kUnconfirmed 初始化塞入;
    // OnRecv 每收到一个分包就把对应 index 的 sub_ret 覆写
    std::map<int /*element_index*/, int /*sub_ret*/> m_result;

    int OnRecv(const NS_TCAPLUS_PROTOBUF_API::ListBatchGetRequest& /*req*/,
               NS_TCAPLUS_PROTOBUF_API::ListBatchGetResponse* res) override {
        // res->m_mapErr 是累积型;遍历全量覆写哨兵是幂等的,无需区分"本次新增的 index"
        for (auto& kv : res->m_mapErr) {
            m_result[kv.first] = kv.second;
            // sub_ret == 0 命中:业务此时可读 res->m_mapMsg[kv.first] 的字段并保存到业务容器
            // (m_mapMsg 的 Message* 所有权在 SDK,OnFinish 后会被销毁)
        }
        return 0;
    }

    // OnError 的 API_ERR_NO_MORE_RECORD 是「尾包已收齐」的结束哨兵,业务忽略——
    // 真正收口走 OnFinish;其它 errorcode 是协议层错误(API_ERR_UNPACK_MESSAGE / SVR_ERR_FAIL_SYSTEM_ERROR 等),
    // 把仍是 kUnconfirmed 的 index 改写成该 errorcode,让 OnFinish 用一套分支处理
    int OnError(const std::vector< ::google::protobuf::Message*>& /*msgs*/, int errorcode) override {
        if (errorcode == TcapErrCode::API_ERR_NO_MORE_RECORD) return 0;
        for (auto& kv : m_result) {
            if (kv.second == kUnconfirmed) kv.second = errorcode;
        }
        return 0;
    }

    // SDK 等待超时(典型 Proxy↔SDK 段丢包 / Proxy↔Tcapsvr 段整批丢失):
    // 把未确认的 index 改写为 API_ERR_WAIT_RSP_TIMEOUT,让 OnFinish 收口
    int OnTimeout(const std::vector< ::google::protobuf::Message*>& /*msgs*/) override {
        for (auto& kv : m_result) {
            if (kv.second == kUnconfirmed) kv.second = TcapErrCode::API_ERR_WAIT_RSP_TIMEOUT;
        }
        return 0;
    }

    // 唯一收口点:任一终止路径(正常收齐 / 协议错误 / 超时)最终都会调到 OnFinish;
    // 业务在这里遍历 m_result,按每个 element_index 的最终分错误码统一处理
    int OnFinish(const NS_TCAPLUS_PROTOBUF_API::MsgParam& /*param*/) override {
        for (auto& kv : m_result) {
            int element_index = kv.first;
            int sub_ret       = kv.second;
            (void)element_index;
            if (sub_ret == TcapErrCode::GEN_ERR_SUC) {
                // 命中:value 已在 OnRecv 中读到
            } else if (sub_ret == TcapErrCode::TXHDB_ERR_RECORD_NOT_EXIST) {
                // 261:该 element_index 在 svr 端不存在
            } else if (sub_ret == kUnconfirmed) {
                // OnFinish 仍是 kUnconfirmed ⇒ defensive;正常路径已被 OnError/OnTimeout 改写
            } else {
                // 其它非 0:含 API_ERR_WAIT_RSP_TIMEOUT / API_ERR_UNPACK_MESSAGE 等
                // 业务按 sub_ret 决定是否幂等重试
            }
        }
        m_result.clear();
        return 0;
    }
};

// ---- 发起 ListBatchGet:业务填好主 key + indexes 后,同步把 m_result 以 kUnconfirmed 初始化 ----
void SendListBatchGetRequest(struct schedule* /*S*/, void* /*arg*/) {
    static tb_online_list t;
    t.set_openid(1); t.set_tconndid(2); t.set_timekey("key");

    static NS_TCAPLUS_PROTOBUF_API::ListBatchGetRequest req;
    req.m_pMsg = &t;
    req.m_setElemIndexes.clear();

    static ListGetBatchCallback cb;
    cb.m_result.clear();
    for (int idx = 0; idx < 10; ++idx) {
        req.m_setElemIndexes.insert(idx);
        cb.m_result[idx] = ListGetBatchCallback::kUnconfirmed; // 初始全部 -1 = 未确认
    }

    int ret = g_stAsyncApi.ListBatchGet(req, &cb);
    if (ret != TcapErrCode::GEN_ERR_SUC) {
        // 发起失败:路由 / 序列化 / 配置错误,未进入网络阶段;本批不会触发 OnRecv/OnError/OnTimeout/OnFinish
        cb.m_result.clear();
    }
}

Go SDK 高级用法(推荐):

分包丢包识别

  • Go SDK是同步接口,通过多协程并发
  • 用户直接调用DoListXXXBatch接口,SDK会聚合分包,通过option.BatchResult知道哪些Index成功,哪些Index失败或者丢包
opt := &option.TDROpt{
    MultiFlag: 1,                    // 同一主 key 下 element 多 / value 大时仍可能分包,统一开启
    Timeout:   10 * time.Second,
}

// DoListGetBatch 的语义:业务构造一个主 key 的 record + 要操作的 element index 列表
data := tcaplus_tb.NewTable_Traverser_List()
data.Key  = mainKey                  // 只填主 key
data.Name = mainName
var indexs []int32
for i := 0; i < 10; i++ {
    indexs = append(indexs, int32(i))
}

recs, err := client.DoListGetBatch(TABLE_NAME, data, indexs, opt)
if err != nil {
    // Proxy↔SDK 整体超时:DoMore 失败、opt.BatchResult 不会被填,
    // 遍历 BatchResult 无意义 ⇒ 业务直接整批兜底重试,提前返回
    if tcErr, ok := err.(*terror.ErrorCode); ok && tcErr.Code == terror.API_ERR_WAIT_RSP_TIMEOUT {
        return err
    }
    // 其它情况:err 表示「某 res 包总错误码非 0 / 任一 idx sub_ret 非 0」,继续按下面 BatchResult 逐 idx 判断
}

// 1) recs 里都是命中(FetchRecord 返回 nil 的 record),业务直接遍历读 value
for _, rec := range recs {
    elementIdx := rec.GetIndex()
    _ = elementIdx
    _ = rec.GetData(data)            // 业务读 data 字段
}

// 2) 失败的 idx 看 opt.BatchResult:sub_ret 非 0 的 record SDK 不会 append 到 recs
for i, subErr := range opt.BatchResult {
    if subErr == nil {
        continue                     // 命中,已在 recs 处理
    }
    idx := indexs[i]
    _ = idx
    if tcErr, ok := subErr.(*terror.ErrorCode); ok && tcErr.Code == terror.TXHDB_ERR_RECORD_NOT_EXIST {
        // 261:该 element index 在 svr 端不存在(含被淘汰 / 过期),业务语义而非异常
    } else {
        // 其它失败丢包等
    }
}

3.2.3 索引查询类

适用命令字

  • GetByPartKey / ListGetAll

关键特性

  • proxy 整体转发请求给 svr,由 svr 流式返回多条 record,在 svr 端对应的是索引查询
  • 如果索引下记录数太多,一旦堵包,非常容易出现丢包,记录数太多推荐用户使用 offset 和 Limit 方式进行查询
  • 流式响应的丢包识别,关键在于分包的编号:
    • 旧版本:SDK 与 tcapsvr 都没有分包编号,业务只能通过响应里offset 是否连续来判断是否丢包,但在条件过滤的场景下会失效
    • 新版本复用了subcmd字段为分包编号,编号从1开始,SDK会自动识别丢包,返回丢包错误码给业务层,不用业务关注分包编号。
    • 新版本要求:tcapsvr > 3.73.0 2024-11-22;Go SDK > 3.55.0 2026-01-16; C++ TDR/PB SDK > 3.55.0 2024-12-10

C++ TDR SDK

分包丢包识别

  • 新版本自动识别丢包,会返回丢包错误码
  • GetByPartKey和ListGetAll的响应处理丢包错误码
void HandleResponse(TcaplusServiceResponse* rsp) {
    if (rsp->GetResult() != 0) {
        // 总错误码非 0
        if (rsp->GetResult() == TcapErrCode::API_ERR_DROP_SUB_PKG){
            // 出现丢包, 业务可重试,或者offset limit降低包量
        }
        return;
    }
    for (int i = 0; i < rsp->GetRecordCount(); ++i) {
        const TcaplusServiceRecord* rec = NULL;
        rsp->FetchRecord(rec);
        // 业务累计本批 record
    }
    if (rsp->HaveMoreResPkgs() == 1) {
        // 继续等下一个分包
    } else {
        // 任务结束
    }
}

C++ PB SDK(async)

分包丢包识别

  • 新版本自动识别丢包,会返回丢包错误码
  • GetByPartKey和ListGetAll的OnError回调处理丢包
int OnError(const std::vector<google::protobuf::Message*>& msgs, int errorcode) override {
    if (errorcode == TcapErrCode::API_ERR_DROP_SUB_PKG) {
        // 出现丢包, 业务可重试,或者失败处理,或者offset limit降低包量
    }
}

Go SDK 高级用法

分包丢包识别

  • 新版本自动识别丢包,会返回丢包错误码
  • 如果索引下记录数太多,GoSDK同步聚合响应记录会占据较多内存,推荐使用offset和limit控制,或者使用异步,一边收包,一边处理,及时释放内存
opt := &option.TDROpt{
    MultiFlag: 1,
    Timeout:   10 * time.Second,
}

records, err := client.DoGetByPartKey(TableName, partKeyData, indexName, opt)
if err != nil {
    // 流式查询:超时(API_ERR_WAIT_RSP_TIMEOUT)或丢包错误码都会让 err != nil,
    // 此时 records 可能只有部分字段,也可能没有,用户按需要读取
    return err
}
for _, rec := range records {
    // 收到的记录
    _ = rec
}

3.2.4 遍历类

适用命令字

  • Traverse(Generic 表全表扫)
  • ListTraverse(List 表全表扫)

分包丢包识别: SDK会自己维护游标,自动做丢包重试,业务不用关注

4. 与其它文档的关系

results matching ""

    No results matching ""