diff --git a/internal/dataservice/binlog_helper.go b/internal/dataservice/binlog_helper.go
index a2f7d6ec4614102191b113492055718c77e23a73..f53d6c3a98fb44566f608c012cce697e1ede610f 100644
--- a/internal/dataservice/binlog_helper.go
+++ b/internal/dataservice/binlog_helper.go
@@ -194,11 +194,11 @@ func (s *Server) prepareSegmentPos(segInfo *datapb.SegmentInfo, dmlPos, ddlPos *
return nil, err
}
msPosPair := proto.MarshalTextString(ddlPos)
- result[path.Join(Params.SegmentDmlPosSubPath, key)] = msPosPair //segment pos
+ result[path.Join(Params.SegmentDdlPosSubPath, key)] = msPosPair //segment pos
result[path.Join(Params.DdlChannelPosSubPath, segInfo.InsertChannel)] = msPosPair // DdlChannel pos(use dm channel as Key, since dd channel may share same channel name)
}
- return map[string]string{}, nil
+ return result, nil
}
// GetVChanPositions get vchannel latest postitions with provided dml channel names
@@ -245,7 +245,7 @@ func (s *Server) GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelPair, e
pairs = append(pairs, &datapb.VchannelPair{
CollectionID: vchan.CollectionID,
DmlVchannelName: vchan.DmlChannel,
- DdlVchannelName: vchan.DmlChannel,
+ DdlVchannelName: vchan.DdlChannel,
DdlPosition: ddlPos,
DmlPosition: dmlPos,
})
diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go
index 4300122aed72027372a656205825c9dd03b5e9cc..2a38ffaddbf9b2aae308477edd690fb8b29f6826 100644
--- a/internal/dataservice/cluster.go
+++ b/internal/dataservice/cluster.go
@@ -62,7 +62,7 @@ func withAssignPolicy(p channelAssignPolicy) clusterOption {
}
func defaultStartupPolicy() clusterStartupPolicy {
- return newReWatchOnRestartsStartupPolicy()
+ return newWatchRestartsStartupPolicy()
}
func defaultRegisterPolicy() dataNodeRegisterPolicy {
@@ -74,7 +74,7 @@ func defaultUnregisterPolicy() dataNodeUnregisterPolicy {
}
func defaultAssignPolicy() channelAssignPolicy {
- return newAllAssignPolicy()
+ return newAssignAllPolicy()
}
func newCluster(ctx context.Context, dataManager *clusterNodeManager, sessionManager sessionManager, posProvider positionProvider, opts ...clusterOption) *cluster {
@@ -165,6 +165,7 @@ func (c *cluster) register(n *datapb.DataNodeInfo) {
func (c *cluster) unregister(n *datapb.DataNodeInfo) {
c.mu.Lock()
defer c.mu.Unlock()
+ c.sessionManager.releaseSession(n.Address)
c.dataManager.unregister(n)
cNodes := c.dataManager.getDataNodes(true)
rets := c.unregisterPolicy.apply(cNodes, n)
diff --git a/internal/dataservice/datanode_helper.go b/internal/dataservice/datanode_helper.go
index 687c2d60fe4cd7abe50853dfadde654cf6f8b7d1..171364980e56072c26c47b38c6dff7ec2f7752e8 100644
--- a/internal/dataservice/datanode_helper.go
+++ b/internal/dataservice/datanode_helper.go
@@ -12,9 +12,7 @@
package dataservice
import (
- "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
- "go.uber.org/zap"
)
type vchannel struct {
@@ -57,30 +55,3 @@ func (dp dummyPosProvider) GetDdlChannel() string {
func (s *Server) GetDdlChannel() string {
return s.ddChannelName
}
-
-// getAllActiveVChannels get all vchannels with unflushed segments
-func (s *Server) getAllActiveVChannels() []vchannel {
- segments := s.meta.GetUnFlushedSegments()
-
- mChanCol := make(map[string]UniqueID)
- for _, segment := range segments {
- ocid, has := mChanCol[segment.InsertChannel]
- if has && ocid != segment.CollectionID {
- log.Error("col:vchan not 1:N",
- zap.Int64("colid 1", ocid),
- zap.Int64("colid 2", segment.CollectionID),
- zap.String("channel", segment.InsertChannel))
- }
- mChanCol[segment.InsertChannel] = segment.CollectionID
- }
-
- vchans := make([]vchannel, 0, len(mChanCol))
- for dmChan, colID := range mChanCol {
- vchans = append(vchans, vchannel{
- CollectionID: colID,
- DmlChannel: dmChan,
- DdlChannel: s.ddChannelName,
- })
- }
- return vchans
-}
diff --git a/internal/dataservice/policy.go b/internal/dataservice/policy.go
index 1d66d0a093b8b130927e61478624b2b66487160c..6bcb35ff093e5fa8f0d9d9f14bf29393a9438109 100644
--- a/internal/dataservice/policy.go
+++ b/internal/dataservice/policy.go
@@ -25,17 +25,18 @@ type clusterDeltaChange struct {
restarts []string
}
type clusterStartupPolicy interface {
+ // apply accept all nodes and new/offline/restarts nodes and returns datanodes whose status need to be changed
apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo
}
-type reWatchOnRestartsStartupPolicy struct {
+type watchRestartsStartupPolicy struct {
}
-func newReWatchOnRestartsStartupPolicy() clusterStartupPolicy {
- return &reWatchOnRestartsStartupPolicy{}
+func newWatchRestartsStartupPolicy() clusterStartupPolicy {
+ return &watchRestartsStartupPolicy{}
}
-func (p *reWatchOnRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo {
+func (p *watchRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo {
ret := make([]*datapb.DataNodeInfo, 0)
for _, addr := range delta.restarts {
node := cluster[addr]
@@ -48,6 +49,7 @@ func (p *reWatchOnRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNo
}
type dataNodeRegisterPolicy interface {
+ // apply accept all online nodes and new created node, returns nodes needed to be changed
apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo
}
@@ -63,6 +65,7 @@ func (p *doNothingRegisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo,
}
type dataNodeUnregisterPolicy interface {
+ // apply accept all online nodes and unregistered node, returns nodes needed to be changed
apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo
}
@@ -77,9 +80,9 @@ func (p *doNothingUnregisterPolicy) apply(cluster map[string]*datapb.DataNodeInf
return nil
}
-type reassignRandomUnregisterPolicy struct{}
+type randomAssignUnregisterPolicy struct{}
-func (p *reassignRandomUnregisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
+func (p *randomAssignUnregisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
if len(cluster) == 0 || // no available node
len(session.Channels) == 0 { // lost node not watching any channels
return []*datapb.DataNodeInfo{}
@@ -113,17 +116,18 @@ func (p *reassignRandomUnregisterPolicy) apply(cluster map[string]*datapb.DataNo
}
type channelAssignPolicy interface {
+ // apply accept all online nodes and new created channel with collectionID, returns node needed to be changed
apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo
}
-type allAssignPolicy struct {
+type assignAllPolicy struct {
}
-func newAllAssignPolicy() channelAssignPolicy {
- return &allAssignPolicy{}
+func newAssignAllPolicy() channelAssignPolicy {
+ return &assignAllPolicy{}
}
-func (p *allAssignPolicy) apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo {
+func (p *assignAllPolicy) apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo {
ret := make([]*datapb.DataNodeInfo, 0)
for _, node := range cluster {
has := false
diff --git a/internal/dataservice/policy_test.go b/internal/dataservice/policy_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..fa161c4cec829431d974d921bafcaaaa0e0806fc
--- /dev/null
+++ b/internal/dataservice/policy_test.go
@@ -0,0 +1,46 @@
+package dataservice
+
+import (
+ "testing"
+
+ "github.com/milvus-io/milvus/internal/proto/datapb"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestWatchRestartsPolicy(t *testing.T) {
+ p := newWatchRestartsStartupPolicy()
+ c := make(map[string]*datapb.DataNodeInfo)
+ c["localhost:1111"] = &datapb.DataNodeInfo{
+ Address: "localhost:1111",
+ Version: 0,
+ Channels: []*datapb.ChannelStatus{
+ {
+ Name: "vch1",
+ State: datapb.ChannelWatchState_Complete,
+ CollectionID: 0,
+ },
+ },
+ }
+
+ c["localhost:2222"] = &datapb.DataNodeInfo{
+ Address: "localhost:2222",
+ Version: 0,
+ Channels: []*datapb.ChannelStatus{
+ {
+ Name: "vch2",
+ State: datapb.ChannelWatchState_Complete,
+ CollectionID: 0,
+ },
+ },
+ }
+
+ dchange := &clusterDeltaChange{
+ newNodes: []string{},
+ offlines: []string{},
+ restarts: []string{"localhost:2222"},
+ }
+
+ nodes := p.apply(c, dchange)
+ assert.EqualValues(t, 1, len(nodes))
+ assert.EqualValues(t, datapb.ChannelWatchState_Uncomplete, nodes[0].Channels[0].State)
+}
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index 9a09ef90d154b3ece84a58c6ff6b3daf0cd29b37..7899ec212a136a75b27db7d55e3eda07e574285c 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -560,7 +560,6 @@ func (s *Server) prepareBinlogAndPos(req *datapb.SaveBinlogPathsRequest) (map[st
log.Error("Failed to get segment info", zap.Int64("segmentID", req.GetSegmentID()), zap.Error(err))
return nil, err
}
- log.Debug("segment", zap.Int64("segment", segInfo.CollectionID))
for _, fieldBlp := range req.Field2BinlogPaths {
fieldMeta, err := s.prepareField2PathMeta(req.SegmentID, fieldBlp)
diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go
index 0448dab7bba699add2f535c11c91f00a3790fc93..bd3fc37a06badcba891ea6ff894175178485c946 100644
--- a/internal/dataservice/server_test.go
+++ b/internal/dataservice/server_test.go
@@ -494,8 +494,11 @@ func TestSaveBinlogPaths(t *testing.T) {
CollectionID: 0,
Field2BinlogPaths: []*datapb.ID2PathList{
{
- ID: 1,
- Paths: []string{"/by-dev/test/0/1/2/1/Allo1", "/by-dev/test/0/1/2/1/Allo2"},
+ ID: 1,
+ Paths: []string{
+ "/by-dev/test/0/1/2/1/Allo1",
+ "/by-dev/test/0/1/2/1/Allo2",
+ },
},
},
DdlBinlogPaths: []*datapb.DDLBinlogMeta{
@@ -508,6 +511,34 @@ func TestSaveBinlogPaths(t *testing.T) {
TsBinlogPath: "/by-dev/test/0/ts/Allo8",
},
},
+ DmlPosition: &datapb.PositionPair{
+ StartPosition: &internalpb.MsgPosition{
+ ChannelName: "ch1",
+ MsgID: []byte{1, 2, 3},
+ MsgGroup: "",
+ Timestamp: 0,
+ },
+ EndPosition: &internalpb.MsgPosition{
+ ChannelName: "ch1",
+ MsgID: []byte{3, 4, 5},
+ MsgGroup: "",
+ Timestamp: 0,
+ },
+ },
+ DdlPosition: &datapb.PositionPair{
+ StartPosition: &internalpb.MsgPosition{
+ ChannelName: "ch2",
+ MsgID: []byte{1, 2, 3},
+ MsgGroup: "",
+ Timestamp: 0,
+ },
+ EndPosition: &internalpb.MsgPosition{
+ ChannelName: "ch2",
+ MsgID: []byte{3, 4, 5},
+ MsgGroup: "",
+ Timestamp: 0,
+ },
+ },
})
assert.Nil(t, err)
assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success)
@@ -767,6 +798,94 @@ func TestResumeChannel(t *testing.T) {
})
}
+func TestGetVChannelPos(t *testing.T) {
+ svr := newTestServer(t, nil)
+ defer closeTestServer(t, svr)
+ schema := newTestSchema()
+ err := svr.meta.AddCollection(&datapb.CollectionInfo{
+ ID: 0,
+ Schema: schema,
+ })
+ assert.Nil(t, err)
+ err = svr.meta.AddSegment(&datapb.SegmentInfo{
+ ID: 1,
+ CollectionID: 0,
+ PartitionID: 0,
+ InsertChannel: "ch1",
+ })
+ assert.Nil(t, err)
+ req := &datapb.SaveBinlogPathsRequest{
+ SegmentID: 1,
+ CollectionID: 0,
+ Field2BinlogPaths: []*datapb.ID2PathList{},
+ DdlBinlogPaths: []*datapb.DDLBinlogMeta{},
+ DmlPosition: &datapb.PositionPair{
+ StartPosition: &internalpb.MsgPosition{
+ ChannelName: "ch1",
+ MsgID: []byte{1, 2, 3},
+ MsgGroup: "",
+ Timestamp: 0,
+ },
+ EndPosition: &internalpb.MsgPosition{
+ ChannelName: "ch1",
+ MsgID: []byte{3, 4, 5},
+ MsgGroup: "",
+ Timestamp: 0,
+ },
+ },
+ DdlPosition: &datapb.PositionPair{
+ StartPosition: &internalpb.MsgPosition{
+ ChannelName: "ch2",
+ MsgID: []byte{1, 2, 3},
+ MsgGroup: "",
+ Timestamp: 0,
+ },
+ EndPosition: &internalpb.MsgPosition{
+ ChannelName: "ch2",
+ MsgID: []byte{3, 4, 5},
+ MsgGroup: "",
+ Timestamp: 0,
+ },
+ },
+ }
+ status, err := svr.SaveBinlogPaths(context.TODO(), req)
+ assert.Nil(t, err)
+ assert.EqualValues(t, commonpb.ErrorCode_Success, status.ErrorCode)
+
+ t.Run("get unexisted channel", func(t *testing.T) {
+ pair, err := svr.GetVChanPositions([]vchannel{
+ {
+ CollectionID: 0,
+ DmlChannel: "chx1",
+ DdlChannel: "chx2",
+ },
+ })
+ assert.Nil(t, err)
+ assert.EqualValues(t, 1, len(pair))
+ assert.Nil(t, pair[0].DmlPosition.StartPosition.MsgID)
+ assert.Nil(t, pair[0].DmlPosition.EndPosition.MsgID)
+ assert.Nil(t, pair[0].DdlPosition.StartPosition.MsgID)
+ assert.Nil(t, pair[0].DdlPosition.EndPosition.MsgID)
+ })
+
+ t.Run("get existed channel", func(t *testing.T) {
+ pair, err := svr.GetVChanPositions([]vchannel{
+ {
+ CollectionID: 0,
+ DmlChannel: "ch1",
+ DdlChannel: "ch2",
+ },
+ })
+ assert.Nil(t, err)
+ assert.EqualValues(t, 1, len(pair))
+ assert.EqualValues(t, 0, pair[0].CollectionID)
+ assert.EqualValues(t, []byte{1, 2, 3}, pair[0].DmlPosition.StartPosition.MsgID)
+ assert.EqualValues(t, []byte{3, 4, 5}, pair[0].DmlPosition.EndPosition.MsgID)
+ assert.EqualValues(t, []byte{1, 2, 3}, pair[0].DdlPosition.StartPosition.MsgID)
+ assert.EqualValues(t, []byte{3, 4, 5}, pair[0].DdlPosition.EndPosition.MsgID)
+ })
+}
+
func newTestServer(t *testing.T, receiveCh chan interface{}) *Server {
Params.Init()
var err error