Skip to content
Snippets Groups Projects
Unverified Commit c3ac1375 authored by XuanYang-cn's avatar XuanYang-cn Committed by GitHub
Browse files

Add buffer function for auto flush (#5271)


Auto-flush of a segment is not considered flush-completed.
So we need to buffer binlog paths generated by auto-flush.

See also: #5268 #5220 

Signed-off-by: default avataryangxuan <xuan.yang@zilliz.com>
parent 49f6542b
No related branches found
No related tags found
No related merge requests found
...@@ -8,7 +8,7 @@ require ( ...@@ -8,7 +8,7 @@ require (
github.com/apache/pulsar-client-go v0.4.0 github.com/apache/pulsar-client-go v0.4.0
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 // indirect github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 // indirect
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect
github.com/coreos/etcd v3.3.13+incompatible // indirect github.com/coreos/etcd v3.3.13+incompatible
github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
// Unless required by applicable law or agreed to in writing, software distributed under the License // Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License. // or implied. See the License for the specific language governing permissions and limitations under the License.
// GOOSE TODO remove this
package datanode package datanode
......
...@@ -42,6 +42,8 @@ type Replica interface { ...@@ -42,6 +42,8 @@ type Replica interface {
updateStatistics(segmentID UniqueID, numRows int64) error updateStatistics(segmentID UniqueID, numRows int64) error
getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
getSegmentByID(segmentID UniqueID) (*Segment, error) getSegmentByID(segmentID UniqueID) (*Segment, error)
bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error
getBufferPaths(segID UniqueID) (map[UniqueID][]string, error)
} }
// Segment is the data structure of segments in data node replica. // Segment is the data structure of segments in data node replica.
...@@ -59,6 +61,7 @@ type Segment struct { ...@@ -59,6 +61,7 @@ type Segment struct {
startPosition *internalpb.MsgPosition startPosition *internalpb.MsgPosition
endPosition *internalpb.MsgPosition // not using endPosition *internalpb.MsgPosition // not using
channelName string channelName string
field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered.
} }
// CollectionSegmentReplica is the data replication of persistent data in datanode. // CollectionSegmentReplica is the data replication of persistent data in datanode.
...@@ -69,6 +72,8 @@ type CollectionSegmentReplica struct { ...@@ -69,6 +72,8 @@ type CollectionSegmentReplica struct {
collections map[UniqueID]*Collection collections map[UniqueID]*Collection
} }
var _ Replica = &CollectionSegmentReplica{}
func newReplica() Replica { func newReplica() Replica {
segments := make(map[UniqueID]*Segment) segments := make(map[UniqueID]*Segment)
collections := make(map[UniqueID]*Collection) collections := make(map[UniqueID]*Collection)
...@@ -80,7 +85,43 @@ func newReplica() Replica { ...@@ -80,7 +85,43 @@ func newReplica() Replica {
return replica return replica
} }
// --- segment --- // bufferAutoFlushBinlogPaths buffers binlog paths generated by auto-flush
func (replica *CollectionSegmentReplica) bufferAutoFlushBinlogPaths(segID UniqueID, field2Path map[UniqueID]string) error {
replica.mu.RLock()
defer replica.mu.RUnlock()
seg, ok := replica.segments[segID]
if !ok {
return fmt.Errorf("Cannot find segment, id = %v", segID)
}
for fieldID, path := range field2Path {
buffpaths, ok := seg.field2Paths[fieldID]
if !ok {
buffpaths = make([]string, 0)
}
buffpaths = append(buffpaths, path)
seg.field2Paths[fieldID] = buffpaths
}
log.Info("Buffer auto flush binlog paths", zap.Int64("segment ID", segID))
return nil
}
func (replica *CollectionSegmentReplica) getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
if seg, ok := replica.segments[segID]; ok {
return seg.field2Paths, nil
}
return nil, fmt.Errorf("Cannot find segment, id = %v", segID)
}
func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) { func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
replica.mu.RLock() replica.mu.RLock()
defer replica.mu.RUnlock() defer replica.mu.RUnlock()
...@@ -118,6 +159,7 @@ func (replica *CollectionSegmentReplica) addSegment( ...@@ -118,6 +159,7 @@ func (replica *CollectionSegmentReplica) addSegment(
startPosition: position, startPosition: position,
endPosition: new(internalpb.MsgPosition), endPosition: new(internalpb.MsgPosition),
channelName: channelName, channelName: channelName,
field2Paths: make(map[UniqueID][]string),
} }
seg.isNew.Store(true) seg.isNew.Store(true)
......
...@@ -138,6 +138,24 @@ func TestReplica_Segment(t *testing.T) { ...@@ -138,6 +138,24 @@ func TestReplica_Segment(t *testing.T) {
assert.NotNil(t, update.StartPosition) assert.NotNil(t, update.StartPosition)
assert.Nil(t, update.EndPosition) assert.Nil(t, update.EndPosition)
f2p := map[UniqueID]string{
1: "a",
2: "b",
}
err = replica.bufferAutoFlushBinlogPaths(UniqueID(0), f2p)
assert.NoError(t, err)
r, err := replica.getBufferPaths(0)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{"a"}, r[1])
assert.ElementsMatch(t, []string{"b"}, r[2])
err = replica.bufferAutoFlushBinlogPaths(UniqueID(0), f2p)
assert.NoError(t, err)
r, err = replica.getBufferPaths(0)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{"a", "a"}, r[1])
assert.ElementsMatch(t, []string{"b", "b"}, r[2])
err = replica.setIsFlushed(0) err = replica.setIsFlushed(0)
assert.NoError(t, err) assert.NoError(t, err)
err = replica.setStartPosition(0, &internalpb.MsgPosition{}) err = replica.setStartPosition(0, &internalpb.MsgPosition{})
......
...@@ -61,8 +61,8 @@ type ParamTable struct { ...@@ -61,8 +61,8 @@ type ParamTable struct {
// --- ETCD --- // --- ETCD ---
EtcdAddress string EtcdAddress string
MetaRootPath string MetaRootPath string
SegFlushMetaSubPath string SegFlushMetaSubPath string // GOOSE TODO remove
DDLFlushMetaSubPath string DDLFlushMetaSubPath string // GOOSE TODO remove
// --- MinIO --- // --- MinIO ---
MinioAddress string MinioAddress string
...@@ -109,8 +109,8 @@ func (p *ParamTable) Init() { ...@@ -109,8 +109,8 @@ func (p *ParamTable) Init() {
// --- ETCD --- // --- ETCD ---
p.initEtcdAddress() p.initEtcdAddress()
p.initMetaRootPath() p.initMetaRootPath()
p.initSegFlushMetaSubPath() p.initSegFlushMetaSubPath() // GOOSE TODO remove
p.initDDLFlushMetaSubPath() p.initDDLFlushMetaSubPath() // GOOSE TODO remove
// --- MinIO --- // --- MinIO ---
p.initMinioAddress() p.initMinioAddress()
...@@ -220,6 +220,7 @@ func (p *ParamTable) initMetaRootPath() { ...@@ -220,6 +220,7 @@ func (p *ParamTable) initMetaRootPath() {
p.MetaRootPath = path.Join(rootPath, subPath) p.MetaRootPath = path.Join(rootPath, subPath)
} }
// GOOSE TODO remove
func (p *ParamTable) initSegFlushMetaSubPath() { func (p *ParamTable) initSegFlushMetaSubPath() {
subPath, err := p.Load("etcd.segFlushMetaSubPath") subPath, err := p.Load("etcd.segFlushMetaSubPath")
if err != nil { if err != nil {
...@@ -228,6 +229,7 @@ func (p *ParamTable) initSegFlushMetaSubPath() { ...@@ -228,6 +229,7 @@ func (p *ParamTable) initSegFlushMetaSubPath() {
p.SegFlushMetaSubPath = subPath p.SegFlushMetaSubPath = subPath
} }
// GOOSE TODO remove
func (p *ParamTable) initDDLFlushMetaSubPath() { func (p *ParamTable) initDDLFlushMetaSubPath() {
subPath, err := p.Load("etcd.ddlFlushMetaSubPath") subPath, err := p.Load("etcd.ddlFlushMetaSubPath")
if err != nil { if err != nil {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment