响应分包和丢包处理说明
TcaplusDB 接入层单个响应包上限为 256 KB。响应数据超过 256 KB 时 proxy 会拆分成多个响应包陆续下发,但 保证不会切分单条记录:单条记录大小不论多大,都会出现在某个分包里完整返回。例如 3 条记录分别为
10 KB / 251 KB / 1 MB,可能拆为(10 KB + 251 KB)和(1 MB)两个响应包。
1. 不分包是历史包袱,务必开启分包
⚠️ §2.2 的"默认不分包"是 TcaplusDB 早期协议的妥协,不是合理设计:响应数据超过 256 KB 时,proxy 只把第一包发回业务,剩余部分被静默丢弃,业务没有任何错误码可感知。
红线(请把这条作为统一规则):
对任何分包类命令字都务必显式开启分包,不要去记"哪个命令字默认是否分包"。 各 SDK 的设置入口:
| SDK | 设置入口 | 是否需要业务手动设置 |
|---|---|---|
| C++ TDR SDK | pstRequest->SetMultiResponseFlag(1) |
必须 |
| C++ PB SDK(async / coroutine) | SDK 内部对批量类命令字已自动设置 | 业务无需设置 |
| Go SDK 底层用法 | req.SetMultiResponseFlag(1) |
必须 |
| Go SDK 高级用法 | option.TDROpt{ MultiFlag: 1 } / option.PBOpt{ MultiFlag: 1 } |
必须(高级用法仍只把 MultiFlag 透传到底层 request) |
2. 触发分包的命令字(全集)
按"业务是否必须手动开启分包"分两类。
2.1 必然分包的命令字(无须手动设置)
proxy 路径上会主动开启分包,业务即使没设 SetMultiResponseFlag(1) 也会自动分包,但 SDK 侧仍需正确循环收包。
| 命令字 | 含义 |
|---|---|
BATCH_GET_REQ |
Generic 表批量查询 |
GET_BY_PARTKEY_REQ |
Generic 表部分 Key 查询(本地索引) |
UPDATE_BY_PARTKEY_REQ / DELETE_BY_PARTKEY_REQ |
部分 Key 更新 / 删除,响应携带变更 key |
INDEX_QUERY_REQ |
全局索引查询,SQL 返回多条匹配记录 |
2.2 默认不分包,必须手动开启的命令字(历史包袱)
这类命令字 multi_flag 默认 0,proxy 只发第一个响应包,剩余部分直接丢弃。业务必须显式调用 SetMultiResponseFlag(1) / option.MultiFlag = 1。
| 命令字 | 何时会超 256 KB |
|---|---|
BATCH_INSERT_REQ / BATCH_UPDATE_REQ / BATCH_REPLACE_REQ / BATCH_DELETE_REQ |
设置了 ResultFlag 回带新值 / 旧值,需要返回成批记录数据 |
LIST_GET_ALL_REQ |
List 表一次读所有元素,元素较多即可能超 256 KB |
LIST_GET_BATCH_REQ |
List 表按 index 批量读多条元素 |
LIST_ADDAFTER_BATCH_REQ / LIST_REPLACE_BATCH_REQ / LIST_DELETE_BATCH_REQ |
设置了 ResultFlag 回带新值 / 旧值的批量元素操作 |
3. 分包丢包
3.1 什么是"分包丢包"
正确开启分包后,proxy 会把响应拆成多个包陆续发回 SDK;但单个分包仍是普通 TCP 报文,在以下场景可能丢失:
| 链路 | 触发原因 | 业务侧表现 |
|---|---|---|
| Proxy ↔ SDK 丢包 | 业务收包不及时(回调线程被同步逻辑阻塞、SDK 处理慢),或网络问题(链路抖动 / 重置),导致 proxy 端 TCP 发送缓冲区写满 → 后续分包发不出去 | 业务收到部分分包却等不到后续分包,只能靠 SDK 请求超时识别 |
| Proxy ↔ Tcapsvr 丢包 | proxy 到 svr 的内部链路网络问题、Tcapsvr 处理慢 / 主从切换 / 重启等 | 按命令字范围分两类: • Generic Batch 类( BatchGet / BatchInsert / BatchUpdate / BatchReplace / BatchDelete):proxy 按 key 拆子请求下发Tcapsvr。• 整批透传类(List Batch 命令字 ListAddAfterBatch / ListReplaceBatch / ListDeleteBatch / ListGetBatch;流式中转命令字 GetByPartKey / UpdateByPartKey / DeleteByPartKey 等):proxy 整体转发后由 Tcapsvr 流式回包,proxy 没有"子请求维度",无法按 key 兜底,业务只能靠 SDK 请求超时 / 总错误码识别丢包场景 |
3.2 如何处理分包和识别丢包
无论哪类命令字,两条通用原则先收住:
- 及时收包:C++ TDR
OnResponse/ C++ PBUpdateNetWork调用线程不要做重活,避免收包不及时;收包优先级高于发包,避免一直发包而不收包导致缓冲区满,进一步恶化。 - 请求超时兜底:不论哪种业务都要编写请求超时兜底代码——Proxy ↔ SDK 链路丢包(缓冲区满 / 网络抖动)TCP 协议层不会主动告知,只能靠 SDK 请求超时识别。
按命令字类分四组讨论具体的"开启分包 + 识别丢包"路径。
3.2.1 Generic Batch 类
适用命令字:BatchGet / BatchInsert / BatchUpdate / BatchReplace / BatchDelete
关键处理:
- proxy 按 key 拆子请求下发到 svr,Proxy ↔ Tcapsvr 链路丢包会被 proxy 兜底成"该 key 的超时分错误码,但前提是sdk业务的超时时间比Proxy的长,否则sdk可能先超时"
- 用户收到响应后,需要确认哪些key成功,哪些key失败;收尾时,需要确认哪些key丢包了没收到
C++ TDR SDK(以 BatchGet 为例):
分包丢包识别:
- SDK 是单线程异步模型,业务需要自己维护超时队列,通过异步ID关联请求和响应
- 每次收到包,业务通过分错误码,确认哪些key成功,哪些key失败
- 每次收到包,业务通过HaveMoreResPkgs是否为1,判断还有下一个分包
- 收尾时,业务确认哪些key丢包了没收到
- 业务定时遍历超时队列,做好超时处理
// TDR SDK 是单线程异步模型:业务侧在同一个线程里循环
static const int kUnconfirmed = -1; // 哨兵:该 key 的分错误码尚未确认(响应未到达)
static const uint64_t kBatchTimeoutMs = 5 * 1000; // 按预计分包数 × 单包预算放大
struct BatchWait {
uint64_t expire_ms; // 截止时间(绝对时间),到点判超时
// 「本批所有 key → 分错误码」;初始全部 kUnconfirmed;
// 收到响应里某 key 就把 result[key] 覆写为 sub_ret;
// Finalize 时仍为 kUnconfirmed 的 key 即「中途丢包/超时未确认」
std::map<KeyT, int> result;
};
std::map<uint64_t /*asyncId*/, BatchWait> g_wait_queue;
// ---- 1) 发送 BatchGet:成功即把所有 key 以 kUnconfirmed 初始化塞入等待队列 ----
int SendBatchGet(const std::vector<KeyT>& keys, uint64_t asyncId) {
TcaplusServiceRequest* req = tcaplus_server.GetRequest(TABLE_NAME);
if (req == NULL) return -1;
req->Init(TCAPLUS_API_BATCH_GET_REQ, NULL, 0, 0, 0, 0);
req->SetAsyncID(asyncId);
req->SetMultiResponseFlag(1); // 必须手动开启分包
BatchWait w;
w.expire_ms = NowMs() + kBatchTimeoutMs;
for (auto& k : keys) {
TcaplusServiceRecord* rec = req->AddRecord(-1);
// rec->SetKeyXxx(...) 按表定义把 k 填进 record
w.result[k] = kUnconfirmed;
}
int ret = tcaplus_server.SendRequest(req);
if (ret != 0) return ret;
g_wait_queue[asyncId] = w;
return 0;
}
// ---- 2) 主循环:单线程顺序跑「收响应 + 超时扫描」----
void MainLoop() {
while (running) {
// 2.1 收响应:RecvResponse 第二个参数 sync_timeout(ms),
// 若无包等待该时长,避免空转 CPU;返回 0 且 rsp != NULL 才有响应
TcaplusServiceResponse* rsp = NULL;
int ret = tcaplus_server.RecvResponse(rsp, 10 /*ms*/);
if (ret == 0 && rsp != NULL) {
HandleResponse(rsp);
}
// 2.2 超时扫描:业务可按 tick 间隔(如每 50ms)做一次
ScanTimeout();
}
}
void HandleResponse(TcaplusServiceResponse* rsp) {
uint64_t asyncId = rsp->GetAsynID();
auto it = g_wait_queue.find(asyncId);
if (it == g_wait_queue.end()) {
// 找不到 = 已被 ScanTimeout 剔除,本包是超时后才到达的残包,丢弃
return;
}
BatchWait& wait = it->second;
if (rsp->GetResult() != 0) {
// 表示本分包应全是错误记录;仍要读分错误码逐条判断
}
for (int i = 0; i < rsp->GetRecordCount(); ++i) {
const TcaplusServiceRecord* rec = NULL;
int sub_ret = rsp->FetchRecord(rec); // 分错误码:0 命中 / 261 不存在 / -7953(Proxy↔Tcapsvr 兜底) 等
if (rec == NULL) continue;
KeyT key = ExtractKey(rec); // 业务从 rec->GetKeyXxx(...) 拼成 KeyT
wait.result[key] = sub_ret; // 精确匹配 key 覆写分错误码(哨兵被改写)
}
if (rsp->HaveMoreResPkgs() == 1) {
// 还有后续分包:续期超时时间,继续等下一个响应
wait.expire_ms = NowMs() + kBatchTimeoutMs;
} else {
// 收到尾包:本 asyncId 任务结束,统一收口
Finalize(wait.result);
g_wait_queue.erase(it);
}
}
// ---- 3) 超时扫描:把仍未确认的 key 改写为超时错误码,统一收口 ----
void ScanTimeout() {
uint64_t now = NowMs();
for (auto it = g_wait_queue.begin(); it != g_wait_queue.end(); ) {
if (it->second.expire_ms > now) { ++it; continue; }
// 超时未收齐 ⇒ Proxy↔SDK 段丢包;把 kUnconfirmed 的 key 改写为 API_ERR_WAIT_RSP_TIMEOUT
for (auto& kv : it->second.result) {
if (kv.second == kUnconfirmed) kv.second = TcapErrCode::API_ERR_WAIT_RSP_TIMEOUT;
}
Finalize(it->second.result);
it = g_wait_queue.erase(it);
}
}
// ---- 4) 唯一收口点:遍历 result,按每个 key 的最终分错误码统一处理 ----
void Finalize(const std::map<KeyT, int>& result) {
for (auto& kv : result) {
int sub_ret = kv.second;
if (sub_ret == 0) {
// 命中:业务读 value
} else if (sub_ret == TcapErrCode::TXHDB_ERR_RECORD_NOT_EXIST) {
// 261:记录不存在
} else if (sub_ret == kUnconfirmed) {
// 尾包到达仍是 kUnconfirmed ⇒ 该 key 在所有分包里都没出现,即「中途丢包」
} else {
// 其它非 0:含
// • 服务端语义失败 / Proxy↔Tcapsvr 被 proxy 兜底回来的超时(如 -7953)
// • ScanTimeout 改写的 API_ERR_WAIT_RSP_TIMEOUT
// 业务按 sub_ret 决定是否重试
}
}
}
C++ PB SDK(async):
分包丢包识别:
- PB SDK 是单线程回调模型,SDK自己维护了超时队列,每次收到分包都会触发OnRecv回调
- 每次收到包,业务通过分错误码,确认哪些key成功,哪些key失败
- sdk自己会遍历超时队列,超时触发OnTimeout超时回调,业务确认哪些key超时,做超时处理
- 收尾时,确认哪些key丢包了没收到
// PB async SDK 对批量类命令字内部已自动 SetMultiResponseFlag(1),业务无需手动开启。
// 1) 每收到一个分包 → SDK 用响应里 record 的 key 字节串 —— 严格 —— 反查业务原始 msg 指针,
// 把 value 反序列化回去后 mapMsg[业务原始指针] = sub_ret,再调 OnRecv(mapMsg)
// 2) HaveMoreResPkgs() == 1:SDK 自动续期超时 m_tmExpire = now + session_timeout;继续等下一个分包
// HaveMoreResPkgs() != 1(尾包):SDK 用 OnError(vecMsgs, API_ERR_NO_MORE_RECORD) 作为「结束哨兵」
// 3) 中间分包久未到达触发超时:SDK 调 OnTimeout(vecMsgs)
// 4) 上述任一终止路径都会在 MsgStatus 析构里调 OnFinish(param) 作为本次操作的统一收尾
//
// 关键事实:OnRecv 的 mapMsg 里的 Message* 就是业务发请求时传入 BatchGet 的那一份原始指针
// (SDK 按 key 字节串严格匹配回来,不会新建也不会随便挑),因此业务可以直接用「指针集合」做「待回包→已回包」的差集
class BatchGetCallback : public TcaplusPbCallback {
public:
// 哨兵值:表示该 key 的分错误码尚未确认(响应未到达);正常 sub_ret 取值不会用 -1
enum : int { kUnconfirmed = -1 };
// 「所有 msg 指针 → 分错误码」;发请求前业务把全部 key 以 kUnconfirmed 初始化塞入;
// OnRecv 每收到一个分包就把对应指针的 value 覆写为 SDK 回填的 sub_ret;
std::map< ::google::protobuf::Message*, int> m_result;
// 每收到一个分包就回调一次;mapMsg 里是这一包里 SDK 反查回的原始 msg 指针 + 该 key 的分错误码
// 每收到一个分包就回调一次;只做「把哨兵覆写为真实分错误码」,业务统一在 OnFinish 收口处理
int OnRecv(const std::map< ::google::protobuf::Message*, int>& mapMsg) override {
for (auto& kv : mapMsg) {
m_result[kv.first] = kv.second;
}
return 0;
}
// OnError 的 API_ERR_NO_MORE_RECORD 只是「尾包已收齐」的结束哨兵,业务忽略——
// 真正收口走 OnFinish;其它 errorcode 才是协议层错误(API_ERR_UNPACK_MESSAGE / SVR_ERR_FAIL_SYSTEM_ERROR 等),
// 把仍是 kUnconfirmed 的 key 改写成该 errorcode,让 OnFinish 能用一套分支处理
int OnError(const std::vector< ::google::protobuf::Message*>& /*msgs*/, int errorcode) override {
if (errorcode == TcapErrCode::API_ERR_NO_MORE_RECORD) return 0;
for (auto& kv : m_result) {
if (kv.second == kUnconfirmed) kv.second = errorcode;
}
return 0;
}
// SDK 等待超时(典型 Proxy↔SDK 段丢包):把未确认的 key 改写为 API_ERR_WAIT_RSP_TIMEOUT,同样让 OnFinish 收口
int OnTimeout(const std::vector< ::google::protobuf::Message*>& /*msgs*/) override {
for (auto& kv : m_result) {
if (kv.second == kUnconfirmed) kv.second = TcapErrCode::API_ERR_WAIT_RSP_TIMEOUT;
}
return 0;
}
// 唯一收口点:任一终止路径(正常收齐 / 协议错误 / 超时)最终都会调到 OnFinish;
// 业务在这里遍历 m_result,按每个 key 的最终分错误码统一处理
int OnFinish(const NS_TCAPLUS_PROTOBUF_API::MsgParam& /*param*/) override {
for (auto& kv : m_result) {
int sub_ret = kv.second;
if (sub_ret == TcapErrCode::GEN_ERR_SUC) {
// 命中:业务读 value 字段
// auto* t = dynamic_cast<tb_online*>(kv.first); /* 读 t->xxx() */
} else if (sub_ret == TcapErrCode::TXHDB_ERR_RECORD_NOT_EXIST) {
// 261:记录不存在
} else if (sub_ret == kUnconfirmed) {
// 走到 OnFinish 仍是 kUnconfirmed ⇒ 正常收齐路径下该 key 在所有分包里都没出现
// 即「中途丢包」
} else {
// 其它非 0:含
// • 服务端语义失败 / Proxy↔Tcapsvr 被 proxy 兜底回来的超时(如 -7953)
// • OnTimeout 改写的 API_ERR_WAIT_RSP_TIMEOUT
// • OnError 改写的协议层错误(API_ERR_UNPACK_MESSAGE / SVR_ERR_FAIL_SYSTEM_ERROR 等)
// 业务按 sub_ret 决定是否重试
}
}
m_result.clear();
return 0;
}
};
// ---- 发起 BatchGet:业务把 key 字段填进 msg 集合后交给 SDK,并同步把指针以 kUnconfirmed 初始化塞入 cb.m_result ----
void SendBatchGetRequest(struct schedule* /*S*/, void* /*arg*/) {
static std::vector<tb_online> keys;
static std::vector< ::google::protobuf::Message*> msgs;
static BatchGetCallback cb;
keys.clear(); msgs.clear(); cb.m_result.clear();
keys.resize(/*batch_size=*/2);
keys[0].set_openid(1); keys[0].set_tconndid(1); keys[0].set_timekey("test_1");
keys[1].set_openid(2); keys[1].set_tconndid(2); keys[1].set_timekey("test_2");
for (auto& k : keys) {
msgs.push_back(&k);
cb.m_result[&k] = BatchGetCallback::kUnconfirmed; // 初始全部 -1 = 未确认
}
int ret = g_stAsyncApi.BatchGet(&msgs, &cb);
if (ret != TcapErrCode::GEN_ERR_SUC) {
// 发起失败:路由 / 序列化 / 配置错误,未进入网络阶段;本批不会触发 OnRecv/OnError/OnTimeout/OnFinish
cb.m_result.clear();
}
}
Go SDK 高级用法(推荐):
分包丢包识别:
- Go SDK是同步接口,通过多协程并发
- 用户直接调用DoBatchXXX接口,SDK会聚合分包,通过option.BatchResult知道哪些key成功,哪些key失败或者丢包
opt := &option.TDROpt{
MultiFlag: 1, // 必须手动开启分包
Timeout: 10 * time.Second,
}
// DoBatchGet 是 inout 风格:业务构造 dataSlice(每个元素是具体表 struct,只填 key 字段)传入,
// 成功后 SDK 把每条记录的 value 字段反写回 dataSlice[i],分错误码填到 opt.BatchResult[i]
var dataSlice []record.TdrTableSt
for _, k := range keys {
rec := service_info.NewService_Info()
rec.Openid = k // 只填 key 字段
dataSlice = append(dataSlice, rec)
}
err := client.DoBatchGet(TableName, dataSlice, opt)
if err != nil {
// Proxy↔SDK 整体超时:doBatch 直接 return err,opt.BatchResult 不会被填,
// 遍历 BatchResult 无意义 ⇒ 业务直接整体兜底重试,提前返回
if tcErr, ok := err.(*terror.ErrorCode); ok && tcErr.Code == terror.API_ERR_WAIT_RSP_TIMEOUT {
return err
}
// 其它情况:err 表示「任意一个 key 失败」,继续按下面 BatchResult 逐 key 判断
}
// 业务遍历 opt.BatchResult,按每个 key 的最终分错误码处理;BatchResult 顺序大小和 dataSlice 一致
for i, subErr := range opt.BatchResult {
if subErr == nil {
// 正确返回:dataSlice[i] 的 value 字段已被 SDK 反写
_ = dataSlice[i]
} else if tcErr, ok := subErr.(*terror.ErrorCode); ok && tcErr.Code == terror.TXHDB_ERR_RECORD_NOT_EXIST {
// 261:记录不存在,业务语义而非异常
} else {
// dataSlice[i]失败,含:服务端失败 / Proxy↔Tcapsvr超时(-7953 等) / 没回包被 doBatch 用 globalErr 兜底
// 业务按 subErr 决定是否幂等重试
}
}
3.2.2 List Batch 类
适用命令字:ListAddAfterBatch / ListReplaceBatch / ListDeleteBatch / ListGetBatch
关键特性:
- List Batch 类的操作目标是同一主 key 下的多个 list 元素
- proxy 不按子元素拆请求,整批 (主 key + indexes) 透传给 Tcapsvr
- 用户收到响应后,需要确认哪些index成功,哪些index失败;收尾时,需要确认哪些丢掉了没收到
C++ TDR SDK(以 ListGetBatch 为例):
分包丢包识别:
- SDK 是单线程异步模型,业务需要自己维护超时队列,通过异步ID关联请求和响应
- 每次收到包,业务通过分错误码,确认哪些index成功,哪些index失败
- 每次收到包,业务通过HaveMoreResPkgs是否为1,判断还有下一个分包
- 收尾时,业务确认哪些index丢包了没收到
- 业务定时遍历超时队列,做好超时处理
// TDR SDK 单线程异步模型;List Batch 响应按 element_index 维度组织 sub_ret,
// 业务侧的 BatchWait 沿用 §3.2.1 Generic Batch 的结构,只是 key 类型从「业务 KeyT」换成 element_index
static const int kUnconfirmed = -1; // 哨兵:该 index 的分错误码尚未确认
static const uint64_t kListBatchTimeoutMs = 5 * 1000;
struct ListBatchWait {
uint64_t expire_ms; // 截止时间(绝对时间),到点判超时
// 「请求里所有 element_index → 分错误码」;初始全部 kUnconfirmed;
// 收到响应里某 index 就把 result[idx] 覆写为 sub_ret;
// 收齐尾包 / 超时之后,仍为 kUnconfirmed 的 index 即「该批未确认 → 整批丢包」
std::map<int32_t /*element_index*/, int /*sub_ret*/> result;
};
std::map<uint64_t /*asyncId*/, ListBatchWait> g_wait_queue;
// ---- 1) 发送 ListGetBatch:成功即把所有 index 以 kUnconfirmed 初始化塞入等待队列 ----
int SendListGetBatch(const MainKey& mkey, const std::vector<int32_t>& element_indexes, uint64_t asyncId) {
TcaplusServiceRequest* req = tcaplus_server.GetRequest(TABLE_NAME);
if (req == NULL) return -1;
req->Init(TCAPLUS_API_LIST_GET_BATCH_REQ, NULL, 0, 0, 0, 0);
req->SetAsyncID(asyncId);
req->SetMultiResponseFlag(1); // 同一主 key 下 element 多 / value 大时仍可能分包,统一开启
TcaplusServiceRecord* rec = req->AddRecord(0);
// rec->SetKeyXxx(...) 按表定义把主 key mkey 填进 record
ListBatchWait w;
w.expire_ms = NowMs() + kListBatchTimeoutMs;
for (auto idx : element_indexes) {
req->AddElementIndex(idx); // 批量添加 element index(单批 ≤ 1024 个)
w.result[idx] = kUnconfirmed; // 同步占位
}
int ret = tcaplus_server.SendRequest(req);
if (ret != 0) return ret;
g_wait_queue[asyncId] = w;
return 0;
}
// ---- 2) 主循环:单线程顺序跑「收响应 + 超时扫描」----
void MainLoop() {
while (running) {
TcaplusServiceResponse* rsp = NULL;
int ret = tcaplus_server.RecvResponse(rsp, 10 /*ms*/);
if (ret == 0 && rsp != NULL) HandleResponse(rsp);
ScanTimeout();
}
}
void HandleResponse(TcaplusServiceResponse* rsp) {
uint64_t asyncId = rsp->GetAsynID();
auto it = g_wait_queue.find(asyncId);
if (it == g_wait_queue.end()) return; // 残包,丢弃
ListBatchWait& wait = it->second;
if (rsp->GetResult() != 0) {
// 总错误码非 0:表示本分包全是错误记录
// 仍要继续读分错误码逐条确认
}
for (int i = 0; i < rsp->GetRecordCount(); ++i) {
const TcaplusServiceRecord* rec = NULL;
int sub_ret = rsp->FetchRecord(rec); // 分错误码:0 命中 / 261 该 index 不存在 / 其它服务端错误
if (rec == NULL) continue;
int32_t element_idx = rec->GetIndex(); // 响应里每条 record 自带 element_index
wait.result[element_idx] = sub_ret; // 精确匹配 index 覆写哨兵
// sub_ret == 0 时业务可立即读 rec->GetData(...) 保存到业务容器
}
if (rsp->HaveMoreResPkgs() == 1) {
wait.expire_ms = NowMs() + kListBatchTimeoutMs; // 续期超时,等下一个分包
} else {
Finalize(wait.result); // 尾包到达:本 asyncId 任务结束,统一收口
g_wait_queue.erase(it);
}
}
// ---- 3) 超时扫描:仍未确认的 index 改写为超时错误码,统一收口 ----
void ScanTimeout() {
uint64_t now = NowMs();
for (auto it = g_wait_queue.begin(); it != g_wait_queue.end(); ) {
if (it->second.expire_ms > now) { ++it; continue; }
// 超时未收齐,把 kUnconfirmed 改写为 API_ERR_WAIT_RSP_TIMEOUT
for (auto& kv : it->second.result) {
if (kv.second == kUnconfirmed) kv.second = TcapErrCode::API_ERR_WAIT_RSP_TIMEOUT;
}
Finalize(it->second.result);
it = g_wait_queue.erase(it);
}
}
// ---- 4) 唯一收口点:遍历 result,按每个 element_index 的最终分错误码统一处理 ----
void Finalize(const std::map<int32_t, int>& result) {
for (auto& kv : result) {
int sub_ret = kv.second;
if (sub_ret == 0) {
// 命中:value 已在 HandleResponse 中读到(业务自行保存)
} else if (sub_ret == TcapErrCode::TXHDB_ERR_RECORD_NOT_EXIST) {
// 261:该 element index 在 svr 端不存在(含被淘汰 / 过期)
} else if (sub_ret == kUnconfirmed) {
// 走到这里仍是 kUnconfirmed —— 常规收齐路径不会出现(svr 必为每个请求 index 写一条 res);
// 仅作为 defensive,超时路径已被 ScanTimeout 改写为 API_ERR_WAIT_RSP_TIMEOUT
} else {
// 其它非 0:含 API_ERR_WAIT_RSP_TIMEOUT(ScanTimeout 改写)/ 服务端语义失败等
// 业务按 sub_ret 决定是否幂等重试
}
}
}
C++ PB SDK(async):
分包丢包识别:
- PB SDK 是单线程回调模型,SDK自己维护了超时队列,每次收到分包都会触发OnRecv回调
- 每次收到包,业务通过分错误码,确认哪些index成功,哪些index失败
- sdk自己会遍历超时队列,超时触发OnTimeout超时回调,业务确认哪些index超时,做超时处理
- 收尾时,确认哪些Index丢包了没收到
// ListBatchGet 用专用回调签名 OnRecv(const ListBatchGetRequest& req, ListBatchGetResponse* res)
// res 里:
// • m_mapMsg<int element_index, Message*>:SDK new 出来并 ParseFromArray 的 msg,所有权在 SDK,业务必须在 OnRecv 内读完字段(OnFinish 后 res 会被销毁)
// • m_mapErr<int element_index, int sub_ret>:对应 sub_ret(0 命中 / 261 不存在 等)
// 分包到达时 SDK 真实处理流程:
// 1) 每收到一个分包 → SDK 按响应里的 elementIndexArray 把 (index, msg, sub_ret) 累积写进 res,
// 再调一次 OnRecv(*req, res);res->m_mapMsg / m_mapErr 跨分包累积
// 2) HaveMoreResPkgs() == 1:SDK 自动续期 m_tmExpire,继续等下一个分包
// HaveMoreResPkgs() != 1(尾包):SDK 用 OnError(vecMsgs, API_ERR_NO_MORE_RECORD) 作为「结束哨兵」
// 3) 中间分包久未到达触发超时:SDK 调 OnTimeout(vecMsgs)
// 4) 上述任一终止路径都会在 MsgStatus 析构里调 OnFinish 作为本次操作的统一收尾
class ListGetBatchCallback : public TcaplusPbCallback {
public:
enum : int { kUnconfirmed = -1 };
// 「请求里所有 element_index → 分错误码」;发请求前业务把全部 index 以 kUnconfirmed 初始化塞入;
// OnRecv 每收到一个分包就把对应 index 的 sub_ret 覆写
std::map<int /*element_index*/, int /*sub_ret*/> m_result;
int OnRecv(const NS_TCAPLUS_PROTOBUF_API::ListBatchGetRequest& /*req*/,
NS_TCAPLUS_PROTOBUF_API::ListBatchGetResponse* res) override {
// res->m_mapErr 是累积型;遍历全量覆写哨兵是幂等的,无需区分"本次新增的 index"
for (auto& kv : res->m_mapErr) {
m_result[kv.first] = kv.second;
// sub_ret == 0 命中:业务此时可读 res->m_mapMsg[kv.first] 的字段并保存到业务容器
// (m_mapMsg 的 Message* 所有权在 SDK,OnFinish 后会被销毁)
}
return 0;
}
// OnError 的 API_ERR_NO_MORE_RECORD 是「尾包已收齐」的结束哨兵,业务忽略——
// 真正收口走 OnFinish;其它 errorcode 是协议层错误(API_ERR_UNPACK_MESSAGE / SVR_ERR_FAIL_SYSTEM_ERROR 等),
// 把仍是 kUnconfirmed 的 index 改写成该 errorcode,让 OnFinish 用一套分支处理
int OnError(const std::vector< ::google::protobuf::Message*>& /*msgs*/, int errorcode) override {
if (errorcode == TcapErrCode::API_ERR_NO_MORE_RECORD) return 0;
for (auto& kv : m_result) {
if (kv.second == kUnconfirmed) kv.second = errorcode;
}
return 0;
}
// SDK 等待超时(典型 Proxy↔SDK 段丢包 / Proxy↔Tcapsvr 段整批丢失):
// 把未确认的 index 改写为 API_ERR_WAIT_RSP_TIMEOUT,让 OnFinish 收口
int OnTimeout(const std::vector< ::google::protobuf::Message*>& /*msgs*/) override {
for (auto& kv : m_result) {
if (kv.second == kUnconfirmed) kv.second = TcapErrCode::API_ERR_WAIT_RSP_TIMEOUT;
}
return 0;
}
// 唯一收口点:任一终止路径(正常收齐 / 协议错误 / 超时)最终都会调到 OnFinish;
// 业务在这里遍历 m_result,按每个 element_index 的最终分错误码统一处理
int OnFinish(const NS_TCAPLUS_PROTOBUF_API::MsgParam& /*param*/) override {
for (auto& kv : m_result) {
int element_index = kv.first;
int sub_ret = kv.second;
(void)element_index;
if (sub_ret == TcapErrCode::GEN_ERR_SUC) {
// 命中:value 已在 OnRecv 中读到
} else if (sub_ret == TcapErrCode::TXHDB_ERR_RECORD_NOT_EXIST) {
// 261:该 element_index 在 svr 端不存在
} else if (sub_ret == kUnconfirmed) {
// OnFinish 仍是 kUnconfirmed ⇒ defensive;正常路径已被 OnError/OnTimeout 改写
} else {
// 其它非 0:含 API_ERR_WAIT_RSP_TIMEOUT / API_ERR_UNPACK_MESSAGE 等
// 业务按 sub_ret 决定是否幂等重试
}
}
m_result.clear();
return 0;
}
};
// ---- 发起 ListBatchGet:业务填好主 key + indexes 后,同步把 m_result 以 kUnconfirmed 初始化 ----
void SendListBatchGetRequest(struct schedule* /*S*/, void* /*arg*/) {
static tb_online_list t;
t.set_openid(1); t.set_tconndid(2); t.set_timekey("key");
static NS_TCAPLUS_PROTOBUF_API::ListBatchGetRequest req;
req.m_pMsg = &t;
req.m_setElemIndexes.clear();
static ListGetBatchCallback cb;
cb.m_result.clear();
for (int idx = 0; idx < 10; ++idx) {
req.m_setElemIndexes.insert(idx);
cb.m_result[idx] = ListGetBatchCallback::kUnconfirmed; // 初始全部 -1 = 未确认
}
int ret = g_stAsyncApi.ListBatchGet(req, &cb);
if (ret != TcapErrCode::GEN_ERR_SUC) {
// 发起失败:路由 / 序列化 / 配置错误,未进入网络阶段;本批不会触发 OnRecv/OnError/OnTimeout/OnFinish
cb.m_result.clear();
}
}
Go SDK 高级用法(推荐):
分包丢包识别:
- Go SDK是同步接口,通过多协程并发
- 用户直接调用DoListXXXBatch接口,SDK会聚合分包,通过option.BatchResult知道哪些Index成功,哪些Index失败或者丢包
opt := &option.TDROpt{
MultiFlag: 1, // 同一主 key 下 element 多 / value 大时仍可能分包,统一开启
Timeout: 10 * time.Second,
}
// DoListGetBatch 的语义:业务构造一个主 key 的 record + 要操作的 element index 列表
data := tcaplus_tb.NewTable_Traverser_List()
data.Key = mainKey // 只填主 key
data.Name = mainName
var indexs []int32
for i := 0; i < 10; i++ {
indexs = append(indexs, int32(i))
}
recs, err := client.DoListGetBatch(TABLE_NAME, data, indexs, opt)
if err != nil {
// Proxy↔SDK 整体超时:DoMore 失败、opt.BatchResult 不会被填,
// 遍历 BatchResult 无意义 ⇒ 业务直接整批兜底重试,提前返回
if tcErr, ok := err.(*terror.ErrorCode); ok && tcErr.Code == terror.API_ERR_WAIT_RSP_TIMEOUT {
return err
}
// 其它情况:err 表示「某 res 包总错误码非 0 / 任一 idx sub_ret 非 0」,继续按下面 BatchResult 逐 idx 判断
}
// 1) recs 里都是命中(FetchRecord 返回 nil 的 record),业务直接遍历读 value
for _, rec := range recs {
elementIdx := rec.GetIndex()
_ = elementIdx
_ = rec.GetData(data) // 业务读 data 字段
}
// 2) 失败的 idx 看 opt.BatchResult:sub_ret 非 0 的 record SDK 不会 append 到 recs
for i, subErr := range opt.BatchResult {
if subErr == nil {
continue // 命中,已在 recs 处理
}
idx := indexs[i]
_ = idx
if tcErr, ok := subErr.(*terror.ErrorCode); ok && tcErr.Code == terror.TXHDB_ERR_RECORD_NOT_EXIST {
// 261:该 element index 在 svr 端不存在(含被淘汰 / 过期),业务语义而非异常
} else {
// 其它失败丢包等
}
}
3.2.3 索引查询类
适用命令字:
GetByPartKey/ListGetAll
关键特性:
- proxy 整体转发请求给 svr,由 svr 流式返回多条 record,在 svr 端对应的是索引查询
- 如果索引下记录数太多,一旦堵包,非常容易出现丢包,记录数太多推荐用户使用 offset 和 Limit 方式进行查询
- 流式响应的丢包识别,关键在于分包的编号:
- 旧版本:SDK 与 tcapsvr 都没有分包编号,业务只能通过响应里
offset是否连续来判断是否丢包,但在条件过滤的场景下会失效 - 新版本复用了subcmd字段为分包编号,编号从1开始,SDK会自动识别丢包,返回丢包错误码给业务层,不用业务关注分包编号。
- 新版本要求:tcapsvr > 3.73.0 2024-11-22;Go SDK > 3.55.0 2026-01-16; C++ TDR/PB SDK > 3.55.0 2024-12-10
- 旧版本:SDK 与 tcapsvr 都没有分包编号,业务只能通过响应里
C++ TDR SDK:
分包丢包识别:
- 新版本自动识别丢包,会返回丢包错误码
- GetByPartKey和ListGetAll的响应处理丢包错误码
void HandleResponse(TcaplusServiceResponse* rsp) {
if (rsp->GetResult() != 0) {
// 总错误码非 0
if (rsp->GetResult() == TcapErrCode::API_ERR_DROP_SUB_PKG){
// 出现丢包, 业务可重试,或者offset limit降低包量
}
return;
}
for (int i = 0; i < rsp->GetRecordCount(); ++i) {
const TcaplusServiceRecord* rec = NULL;
rsp->FetchRecord(rec);
// 业务累计本批 record
}
if (rsp->HaveMoreResPkgs() == 1) {
// 继续等下一个分包
} else {
// 任务结束
}
}
C++ PB SDK(async)
分包丢包识别:
- 新版本自动识别丢包,会返回丢包错误码
- GetByPartKey和ListGetAll的OnError回调处理丢包
int OnError(const std::vector<google::protobuf::Message*>& msgs, int errorcode) override {
if (errorcode == TcapErrCode::API_ERR_DROP_SUB_PKG) {
// 出现丢包, 业务可重试,或者失败处理,或者offset limit降低包量
}
}
Go SDK 高级用法:
分包丢包识别:
- 新版本自动识别丢包,会返回丢包错误码
- 如果索引下记录数太多,GoSDK同步聚合响应记录会占据较多内存,推荐使用offset和limit控制,或者使用异步,一边收包,一边处理,及时释放内存
opt := &option.TDROpt{
MultiFlag: 1,
Timeout: 10 * time.Second,
}
records, err := client.DoGetByPartKey(TableName, partKeyData, indexName, opt)
if err != nil {
// 流式查询:超时(API_ERR_WAIT_RSP_TIMEOUT)或丢包错误码都会让 err != nil,
// 此时 records 可能只有部分字段,也可能没有,用户按需要读取
return err
}
for _, rec := range records {
// 收到的记录
_ = rec
}
3.2.4 遍历类
适用命令字:
Traverse(Generic 表全表扫)ListTraverse(List 表全表扫)
分包丢包识别: SDK会自己维护游标,自动做丢包重试,业务不用关注
4. 与其它文档的关系
- 协议层"必须开分包"的强红线、各 SDK 设置入口完整对照表,详见 批量操作接口的相关说明 §4。
ResultFlag与分包的交互(回带数据时更易超过 256 KB),详见 ResultFlag响应数据返回控制。