From 7f16081e29eb0e8629f36dff079791c79bd0537c Mon Sep 17 00:00:00 2001
From: 54liuyao <54liuyao@163.com>
Date: Wed, 19 Oct 2022 09:24:18 +0800
Subject: [PATCH] feat(stream): stream task and meta

---
 source/libs/stream/src/streamExec.c | 2 ++
 source/libs/stream/src/streamMeta.c | 2 ++
 source/libs/stream/src/streamTask.c | 5 ++++-
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 5ad5aa549d..149b1a8447 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -137,6 +137,8 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
       if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
         streamDispatch(pTask);
       }
+    } else {
+      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
     }
   }
 
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 53e49a6ba5..cf72533b31 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -262,12 +262,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
     if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
       tdbFree(pKey);
       tdbFree(pVal);
+      tdbTbcClose(pCur);
       return -1;
     }
 
     if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
       tdbFree(pKey);
       tdbFree(pVal);
+      tdbTbcClose(pCur);
       return -1;
     }
   }
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index ce5917de29..5304938195 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -119,7 +119,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
   for (int32_t i = 0; i < epSz; i++) {
     SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
     if (pInfo == NULL) return -1;
-    if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) return -1;
+    if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
+      taosMemoryFreeClear(pInfo);
+      return -1;
+    }
     taosArrayPush(pTask->childEpInfo, &pInfo);
   }
 
-- 
GitLab