diff --git a/go.mod b/go.mod index b1db075aa9c06711e3287c91e314eb60aa9219d9..94fe702b1058c081e4d292e03fe84bed46e3a6e9 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/apache/pulsar-client-go v0.4.0 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 // indirect github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect - github.com/coreos/etcd v3.3.13+incompatible // indirect + github.com/coreos/etcd v3.3.13+incompatible github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect diff --git a/internal/datanode/binlog_meta.go b/internal/datanode/binlog_meta.go index 4eb56ed9b871238355d0c857db029a55c909eac3..838c0f5fb1c27929bfc0905a978530efd6537589 100644 --- a/internal/datanode/binlog_meta.go +++ b/internal/datanode/binlog_meta.go @@ -8,6 +8,7 @@ // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. +// GOOSE TODO remove this package datanode diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index bb09e3994577052d9d00888a35376685b305b848..760e67eec452b2c9ad53477d0938ae0371c289f3 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -42,6 +42,8 @@ type Replica interface { updateStatistics(segmentID UniqueID, numRows int64) error getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) getSegmentByID(segmentID UniqueID) (*Segment, error) + bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error + getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) } // Segment is the data structure of segments in data node replica. @@ -59,6 +61,7 @@ type Segment struct { startPosition *internalpb.MsgPosition endPosition *internalpb.MsgPosition // not using channelName string + field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered. } // CollectionSegmentReplica is the data replication of persistent data in datanode. @@ -69,6 +72,8 @@ type CollectionSegmentReplica struct { collections map[UniqueID]*Collection } +var _ Replica = &CollectionSegmentReplica{} + func newReplica() Replica { segments := make(map[UniqueID]*Segment) collections := make(map[UniqueID]*Collection) @@ -80,7 +85,43 @@ func newReplica() Replica { return replica } -// --- segment --- +// bufferAutoFlushBinlogPaths buffers binlog paths generated by auto-flush +func (replica *CollectionSegmentReplica) bufferAutoFlushBinlogPaths(segID UniqueID, field2Path map[UniqueID]string) error { + replica.mu.RLock() + defer replica.mu.RUnlock() + + seg, ok := replica.segments[segID] + if !ok { + return fmt.Errorf("Cannot find segment, id = %v", segID) + } + + for fieldID, path := range field2Path { + buffpaths, ok := seg.field2Paths[fieldID] + + if !ok { + buffpaths = make([]string, 0) + } + + buffpaths = append(buffpaths, path) + seg.field2Paths[fieldID] = buffpaths + } + log.Info("Buffer auto flush binlog paths", zap.Int64("segment ID", segID)) + + return nil +} + +func (replica *CollectionSegmentReplica) getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() + + if seg, ok := replica.segments[segID]; ok { + return seg.field2Paths, nil + } + + return nil, fmt.Errorf("Cannot find segment, id = %v", segID) + +} + func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) { replica.mu.RLock() defer replica.mu.RUnlock() @@ -118,6 +159,7 @@ func (replica *CollectionSegmentReplica) addSegment( startPosition: position, endPosition: new(internalpb.MsgPosition), channelName: channelName, + field2Paths: make(map[UniqueID][]string), } seg.isNew.Store(true) diff --git a/internal/datanode/collection_replica_test.go b/internal/datanode/collection_replica_test.go index 4953750f1c24978b05db0bbc548e5892601230a4..dc747404701068d0ed0040be7248ec2c27c3001b 100644 --- a/internal/datanode/collection_replica_test.go +++ b/internal/datanode/collection_replica_test.go @@ -138,6 +138,24 @@ func TestReplica_Segment(t *testing.T) { assert.NotNil(t, update.StartPosition) assert.Nil(t, update.EndPosition) + f2p := map[UniqueID]string{ + 1: "a", + 2: "b", + } + + err = replica.bufferAutoFlushBinlogPaths(UniqueID(0), f2p) + assert.NoError(t, err) + r, err := replica.getBufferPaths(0) + assert.NoError(t, err) + assert.ElementsMatch(t, []string{"a"}, r[1]) + assert.ElementsMatch(t, []string{"b"}, r[2]) + err = replica.bufferAutoFlushBinlogPaths(UniqueID(0), f2p) + assert.NoError(t, err) + r, err = replica.getBufferPaths(0) + assert.NoError(t, err) + assert.ElementsMatch(t, []string{"a", "a"}, r[1]) + assert.ElementsMatch(t, []string{"b", "b"}, r[2]) + err = replica.setIsFlushed(0) assert.NoError(t, err) err = replica.setStartPosition(0, &internalpb.MsgPosition{}) diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index 35aae041331fa41414f64d10d2ec5c38bae79264..8b218b780cb264d8af21e7c30858958937d04502 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -61,8 +61,8 @@ type ParamTable struct { // --- ETCD --- EtcdAddress string MetaRootPath string - SegFlushMetaSubPath string - DDLFlushMetaSubPath string + SegFlushMetaSubPath string // GOOSE TODO remove + DDLFlushMetaSubPath string // GOOSE TODO remove // --- MinIO --- MinioAddress string @@ -109,8 +109,8 @@ func (p *ParamTable) Init() { // --- ETCD --- p.initEtcdAddress() p.initMetaRootPath() - p.initSegFlushMetaSubPath() - p.initDDLFlushMetaSubPath() + p.initSegFlushMetaSubPath() // GOOSE TODO remove + p.initDDLFlushMetaSubPath() // GOOSE TODO remove // --- MinIO --- p.initMinioAddress() @@ -220,6 +220,7 @@ func (p *ParamTable) initMetaRootPath() { p.MetaRootPath = path.Join(rootPath, subPath) } +// GOOSE TODO remove func (p *ParamTable) initSegFlushMetaSubPath() { subPath, err := p.Load("etcd.segFlushMetaSubPath") if err != nil { @@ -228,6 +229,7 @@ func (p *ParamTable) initSegFlushMetaSubPath() { p.SegFlushMetaSubPath = subPath } +// GOOSE TODO remove func (p *ParamTable) initDDLFlushMetaSubPath() { subPath, err := p.Load("etcd.ddlFlushMetaSubPath") if err != nil {