diff --git a/cmd/distributed/components/index_node.go b/cmd/distributed/components/index_node.go index ab80a132226ea7e76d2c8127d7bb33f7e23ec845..11ec3da4bfb1e0a548b74179898e821c9c3f3bdd 100644 --- a/cmd/distributed/components/index_node.go +++ b/cmd/distributed/components/index_node.go @@ -2,16 +2,37 @@ package components import ( "context" + "fmt" + "io" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" grpcindexnode "github.com/zilliztech/milvus-distributed/internal/distributed/indexnode" ) type IndexNode struct { svr *grpcindexnode.Server + + tracer opentracing.Tracer + closer io.Closer } func NewIndexNode(ctx context.Context) (*IndexNode, error) { + var err error n := &IndexNode{} + cfg := &config.Configuration{ + ServiceName: "indexnode", + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + n.tracer, n.closer, err = cfg.NewTracer() + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(n.tracer) + svr, err := grpcindexnode.NewServer(ctx) if err != nil { return nil, err diff --git a/cmd/distributed/components/index_service.go b/cmd/distributed/components/index_service.go index c42fba525872ec0fbcf5eeda81ee477e1fe4d49d..07db95c121ce0ba835d072fd1847b78cf37691b4 100644 --- a/cmd/distributed/components/index_service.go +++ b/cmd/distributed/components/index_service.go @@ -2,17 +2,39 @@ package components import ( "context" + "fmt" + "io" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" grpcindexserver "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice" ) type IndexService struct { svr *grpcindexserver.Server + + tracer opentracing.Tracer + closer io.Closer } func NewIndexService(ctx context.Context) (*IndexService, error) { + var err error s := &IndexService{} + + cfg := &config.Configuration{ + ServiceName: "indexservice", + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + s.tracer, s.closer, err = cfg.NewTracer() + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(s.tracer) svr, err := grpcindexserver.NewServer(ctx) + if err != nil { return nil, err } @@ -26,6 +48,7 @@ func (s *IndexService) Run() error { return nil } func (s *IndexService) Stop() error { + s.closer.Close() if err := s.svr.Stop(); err != nil { return err } diff --git a/docs/developer_guides/chap04_message_stream.md b/docs/developer_guides/chap04_message_stream.md index 5f4312ad103bb5d8d02d0b66b98fd431e79bb079..fbc40b41da4673d8c3f780113f58e813e7342e59 100644 --- a/docs/developer_guides/chap04_message_stream.md +++ b/docs/developer_guides/chap04_message_stream.md @@ -90,6 +90,7 @@ const { } type TsMsg interface { + ID() UniqueID SetTs(ts Timestamp) BeginTs() Timestamp EndTs() Timestamp diff --git a/go.mod b/go.mod index 172b01907ac22924ea139276860f9077c71c3a03..bb64f8f9f6be0e0eb5835a271e1467e7805443a6 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/zilliztech/milvus-distributed go 1.15 require ( + github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/apache/pulsar-client-go v0.1.1 github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4 github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect @@ -21,6 +22,7 @@ require ( github.com/mitchellh/mapstructure v1.1.2 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/onsi/gomega v1.10.5 // indirect + github.com/opentracing/opentracing-go v1.2.0 github.com/pierrec/lz4 v2.5.2+incompatible // indirect github.com/sirupsen/logrus v1.6.0 // indirect github.com/spaolacci/murmur3 v1.1.0 @@ -28,6 +30,8 @@ require ( github.com/spf13/viper v1.7.1 github.com/stretchr/testify v1.6.1 github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c + github.com/uber/jaeger-client-go v2.25.0+incompatible + github.com/uber/jaeger-lib v2.4.0+incompatible // indirect github.com/yahoo/athenz v1.9.16 // indirect go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 go.uber.org/zap v1.15.0 diff --git a/go.sum b/go.sum index 7c577ff8eb486c58a851d712b74d493b85b1d7fc..d36ea03ef8ca0cdfb993cdf4e269e6eef4235d46 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpH github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bEn0jTI6LJU0mpw= +github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= @@ -295,6 +297,7 @@ github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible h1:CAG0PUvo1fen+ZEfxKJjFIc8GuuN5RuaBuCAuaP2Hno= github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible/go.mod h1:iIubILNIN6Jq9h8uiSLrN9L1tuj3iSSFwz3R61skm/A= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -386,6 +389,12 @@ github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c/go.mod h1:ahpPrc7 github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/uber/jaeger-client-go v1.6.0 h1:3+zLlq+4npI5fg8IsgAje3YsP7TcEdNzJScyqFIzxEQ= +github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U= +github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/uber/jaeger-lib v1.5.0 h1:OHbgr8l656Ub3Fw5k9SWnBfIEwvoHQ+W2y+Aa9D1Uyo= +github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ= +github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/valyala/gozstd v1.7.0 h1:Ljh5c9zboqLhwTI33al32R72iCZfn0mCbVGcFWbGwRQ= github.com/valyala/gozstd v1.7.0/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ= diff --git a/internal/dataservice/allocator.go b/internal/dataservice/allocator.go index 66f23f048e724b9dc412241e380f2ea49c943429..975ec4ef15e3a1f8944788d6debd665a4fbf0083 100644 --- a/internal/dataservice/allocator.go +++ b/internal/dataservice/allocator.go @@ -30,7 +30,7 @@ func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) { }, Count: 1, }) - if err != nil { + if err = VerifyResponse(resp, err); err != nil { return 0, err } return resp.Timestamp, nil @@ -46,8 +46,9 @@ func (allocator *allocatorImpl) allocID() (UniqueID, error) { }, Count: 1, }) - if err != nil { + if err = VerifyResponse(resp, err); err != nil { return 0, err } + return resp.ID, nil } diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go index 99113292f57d21b256a5cf409eb636d57ddfedb2..5d79a0094a76f01d0585ac734ebceeb5e3a59ffd 100644 --- a/internal/dataservice/cluster.go +++ b/internal/dataservice/cluster.go @@ -1,9 +1,12 @@ package dataservice import ( - "log" + "fmt" "sync" + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -28,6 +31,10 @@ type ( } ) +func (node *dataNode) String() string { + return fmt.Sprintf("id: %d, address: %s:%d", node.id, node.address.ip, node.address.port) +} + func newDataNodeCluster(finishCh chan struct{}) *dataNodeCluster { return &dataNodeCluster{ finishCh: finishCh, @@ -92,12 +99,8 @@ func (c *dataNodeCluster) WatchInsertChannels(channels []string) { }, ChannelNames: group, }) - if err != nil { - log.Println(err.Error()) - continue - } - if resp.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Println(resp.Reason) + if err = VerifyResponse(resp, err); err != nil { + log.Error("watch dm channels error", zap.Stringer("dataNode", c.nodes[i]), zap.Error(err)) continue } c.nodes[i].channelNum += len(group) @@ -111,7 +114,7 @@ func (c *dataNodeCluster) GetDataNodeStates() ([]*internalpb2.ComponentInfo, err for _, node := range c.nodes { states, err := node.client.GetComponentStates(&commonpb.Empty{}) if err != nil { - log.Println(err.Error()) + log.Error("get component states error", zap.Stringer("dataNode", node), zap.Error(err)) continue } ret = append(ret, states.State) @@ -124,7 +127,7 @@ func (c *dataNodeCluster) FlushSegment(request *datapb.FlushSegRequest) { defer c.mu.RUnlock() for _, node := range c.nodes { if _, err := node.client.FlushSegments(request); err != nil { - log.Println(err.Error()) + log.Error("flush segment err", zap.Stringer("dataNode", node), zap.Error(err)) continue } } @@ -133,7 +136,7 @@ func (c *dataNodeCluster) FlushSegment(request *datapb.FlushSegRequest) { func (c *dataNodeCluster) ShutDownClients() { for _, node := range c.nodes { if err := node.client.Stop(); err != nil { - log.Println(err.Error()) + log.Error("stop client error", zap.Stringer("dataNode", node), zap.Error(err)) continue } } diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 91cf889e33e3491950d4f1d5c700252e7073f344..7e42b4ffc492e42c5dd7c34d8ebd98fcc84d9f6d 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -2,7 +2,6 @@ package dataservice import ( "context" - "errors" "fmt" "path" "strconv" @@ -213,7 +212,8 @@ func (s *Server) initMsgProducer() error { func (s *Server) loadMetaFromMaster() error { log.Debug("loading collection meta from master") - if err := s.checkMasterIsHealthy(); err != nil { + var err error + if err = s.checkMasterIsHealthy(); err != nil { return err } if s.ddChannelName == "" { @@ -232,7 +232,7 @@ func (s *Server) loadMetaFromMaster() error { }, DbName: "", }) - if err != nil { + if err = VerifyResponse(collections, err); err != nil { return err } for _, collectionName := range collections.CollectionNames { @@ -246,8 +246,8 @@ func (s *Server) loadMetaFromMaster() error { DbName: "", CollectionName: collectionName, }) - if err != nil { - log.Error("describe collection error", zap.Error(err)) + if err = VerifyResponse(collection, err); err != nil { + log.Error("describe collection error", zap.String("collectionName", collectionName), zap.Error(err)) continue } partitions, err := s.masterClient.ShowPartitions(&milvuspb.ShowPartitionRequest{ @@ -261,8 +261,8 @@ func (s *Server) loadMetaFromMaster() error { CollectionName: collectionName, CollectionID: collection.CollectionID, }) - if err != nil { - log.Error("show partitions error", zap.Error(err)) + if err = VerifyResponse(partitions, err); err != nil { + log.Error("show partitions error", zap.String("collectionName", collectionName), zap.Int64("collectionID", collection.CollectionID), zap.Error(err)) continue } err = s.meta.AddCollection(&collectionInfo{ @@ -271,7 +271,7 @@ func (s *Server) loadMetaFromMaster() error { Partitions: partitions.PartitionIDs, }) if err != nil { - log.Error("add collection error", zap.Error(err)) + log.Error("add collection to meta error", zap.Int64("collectionID", collection.CollectionID), zap.Error(err)) continue } } @@ -294,12 +294,9 @@ func (s *Server) checkMasterIsHealthy() error { return fmt.Errorf("master is not healthy") case <-ticker.C: resp, err = s.masterClient.GetComponentStates() - if err != nil { + if err = VerifyResponse(resp, err); err != nil { return err } - if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return errors.New(resp.Status.Reason) - } } if resp.State.StateCode == internalpb2.StateCode_HEALTHY { break @@ -330,10 +327,13 @@ func (s *Server) startStatsChannel(ctx context.Context) { } msgPack := statsStream.Consume() for _, msg := range msgPack.Msgs { - statistics := msg.(*msgstream.SegmentStatisticsMsg) + statistics, ok := msg.(*msgstream.SegmentStatisticsMsg) + if !ok { + log.Error("receive unknown type msg from stats channel", zap.Stringer("msgType", msg.Type())) + } for _, stat := range statistics.SegStats { if err := s.statsHandler.HandleSegmentStat(stat); err != nil { - log.Error("handle segment stat error", zap.Error(err)) + log.Error("handle segment stat error", zap.Int64("segmentID", stat.SegmentID), zap.Error(err)) continue } } @@ -363,7 +363,7 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) { segmentInfo, err := s.meta.GetSegment(realMsg.SegmentID) if err != nil { - log.Error("get segment error", zap.Error(err)) + log.Error("get segment from meta error", zap.Int64("segmentID", realMsg.SegmentID), zap.Error(err)) continue } segmentInfo.FlushedTime = realMsg.BeginTimestamp @@ -473,7 +473,7 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register s.cluster.Register(node) if s.ddChannelName == "" { resp, err := s.masterClient.GetDdChannel() - if err != nil { + if err = VerifyResponse(resp, err); err != nil { ret.Status.Reason = err.Error() return ret, err } diff --git a/internal/dataservice/util.go b/internal/dataservice/util.go new file mode 100644 index 0000000000000000000000000000000000000000..989d4ed542bc715561539d14b0eb85d4084d379d --- /dev/null +++ b/internal/dataservice/util.go @@ -0,0 +1,33 @@ +package dataservice + +import ( + "errors" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" +) + +type Response interface { + GetStatus() *commonpb.Status +} + +func VerifyResponse(response interface{}, err error) error { + if err != nil { + return err + } + if response == nil { + return errors.New("response is nil") + } + switch resp := response.(type) { + case Response: + if resp.GetStatus().ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(resp.GetStatus().Reason) + } + case *commonpb.Status: + if resp.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(resp.Reason) + } + default: + return errors.New("unknown response type") + } + return nil +} diff --git a/internal/dataservice/watcher.go b/internal/dataservice/watcher.go index 02159eae587dd7bff6de52a502f16c7264d2d736..9a3a33450b00af27fcc7d48e19925662af329f83 100644 --- a/internal/dataservice/watcher.go +++ b/internal/dataservice/watcher.go @@ -1,10 +1,10 @@ package dataservice import ( - "log" - + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "go.uber.org/zap" "golang.org/x/net/context" @@ -35,11 +35,11 @@ func (watcher *proxyTimeTickWatcher) StartBackgroundLoop(ctx context.Context) { for { select { case <-ctx.Done(): - log.Println("proxy time tick watcher closed") + log.Debug("proxy time tick watcher closed") return case msg := <-watcher.msgQueue: if err := watcher.allocator.ExpireAllocations(msg.Base.Timestamp); err != nil { - log.Printf("expire allocations error : %s", err.Error()) + log.Error("expire allocations error", zap.Error(err)) } } } @@ -66,11 +66,11 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context) for { select { case <-ctx.Done(): - log.Println("data node time tick watcher closed") + log.Debug("data node time tick watcher closed") return case msg := <-watcher.msgQueue: if err := watcher.handleTimeTickMsg(msg); err != nil { - log.Println(err.Error()) + log.Error("handle time tick error", zap.Error(err)) continue } } @@ -85,17 +85,17 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic for _, id := range segments { expired, err := watcher.allocator.IsAllocationsExpired(id, msg.Base.Timestamp) if err != nil { - log.Printf("check allocations expired error %s", err.Error()) + log.Error("check allocations expired error", zap.Int64("segmentID", id), zap.Error(err)) continue } if expired { segmentInfo, err := watcher.meta.GetSegment(id) if err != nil { - log.Println(err.Error()) + log.Error("get segment from meta error", zap.Int64("segmentID", id), zap.Error(err)) continue } if err = watcher.meta.SetSegmentState(id, commonpb.SegmentState_SegmentSealed); err != nil { - log.Println(err.Error()) + log.Error("set segment state error", zap.Int64("segmentID", id), zap.Error(err)) continue } watcher.cluster.FlushSegment(&datapb.FlushSegRequest{ diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 43a1e27c0bfc1d543df206ab2378dcb8f1903727..a1eca52e5b1f4b78d3fb4cb1ba1a43a64a356d0c 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -2,6 +2,8 @@ package grpcproxynode import ( "context" + "fmt" + "io" "log" "net" "os" @@ -9,22 +11,21 @@ import ( "sync" "time" - grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client" - - "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - "github.com/zilliztech/milvus-distributed/internal/util/funcutil" - - "google.golang.org/grpc" - + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" grpcdataservice "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" grcpmasterservice "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client" grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/proxynode" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "google.golang.org/grpc" ) type Server struct { @@ -46,16 +47,32 @@ type Server struct { dataServiceClient *grpcdataservice.Client queryServiceClient *grpcqueryserviceclient.Client indexServiceClient *grpcindexserviceclient.Client + + tracer opentracing.Tracer + closer io.Closer } func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { + var err error server := &Server{ ctx: ctx, grpcErrChan: make(chan error), } - var err error + cfg := &config.Configuration{ + ServiceName: "proxynode", + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + server.tracer, server.closer, err = cfg.NewTracer() + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(server.tracer) + server.impl, err = proxynode.NewProxyNodeImpl(server.ctx, factory) if err != nil { return nil, err @@ -220,6 +237,7 @@ func (s *Server) start() error { func (s *Server) Stop() error { var err error + s.closer.Close() if s.grpcServer != nil { s.grpcServer.GracefulStop() diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index e40f5c9746add7c05bfbb4588c1943b3724a32d1..3765813cbd15db4c73b279ff3dec4a47492a83eb 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -2,11 +2,15 @@ package grpcproxyservice import ( "context" + "fmt" + "io" "log" "net" "strconv" "sync" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -26,10 +30,14 @@ type Server struct { grpcErrChan chan error impl *proxyservice.ServiceImpl + + tracer opentracing.Tracer + closer io.Closer } func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error) { ctx, cancel := context.WithCancel(ctx1) + var err error server := &Server{ ctx: ctx, @@ -37,7 +45,19 @@ func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error) grpcErrChan: make(chan error), } - var err error + cfg := &config.Configuration{ + ServiceName: "proxyservice", + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + server.tracer, server.closer, err = cfg.NewTracer() + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(server.tracer) + server.impl, err = proxyservice.NewServiceImpl(server.ctx, factory) if err != nil { return nil, err @@ -113,6 +133,7 @@ func (s *Server) start() error { } func (s *Server) Stop() error { + s.closer.Close() err := s.impl.Stop() if err != nil { return err diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 97b1ad55b613876815ed491141ee8c5a0f2e408b..b8a63fc309e3ca38210210da33c4994ee0c09f67 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -146,11 +146,17 @@ func (i *NodeImpl) SetIndexServiceClient(serviceClient typeutil.IndexServiceInte } func (i *NodeImpl) BuildIndex(request *indexpb.BuildIndexCmd) (*commonpb.Status, error) { - t := newIndexBuildTask() - t.cmd = request - t.kv = i.kv - t.serviceClient = i.serviceClient - t.nodeID = Params.NodeID + ctx := context.Background() + t := &IndexBuildTask{ + BaseTask: BaseTask{ + ctx: ctx, + done: make(chan error), // intend to do this + }, + cmd: request, + kv: i.kv, + serviceClient: i.serviceClient, + nodeID: Params.NodeID, + } ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index ac19c3a67ca80d51d14f362bfd11842e01a35e6d..34a35035821fe40902796d65aae2379413f8343b 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -18,15 +18,18 @@ import ( ) const ( - paramsKeyToParse = "params" + paramsKeyToParse = "params" + IndexBuildTaskName = "IndexBuildTask" ) type task interface { - ID() UniqueID // return ReqID + Ctx() context.Context + ID() UniqueID // return ReqID + Name() string SetID(uid UniqueID) // set ReqID - PreExecute() error - Execute() error - PostExecute() error + PreExecute(ctx context.Context) error + Execute(ctx context.Context) error + PostExecute(ctx context.Context) error WaitToFinish() error Notify(err error) OnEnqueue() error @@ -69,32 +72,34 @@ type IndexBuildTask struct { nodeID UniqueID } -func newIndexBuildTask() *IndexBuildTask { - ctx := context.Background() - return &IndexBuildTask{ - BaseTask: BaseTask{ - ctx: ctx, - done: make(chan error), // intend to do this - }, - } +func (it *IndexBuildTask) Ctx() context.Context { + return it.ctx +} + +func (it *IndexBuildTask) ID() UniqueID { + return it.id } func (it *IndexBuildTask) SetID(ID UniqueID) { it.BaseTask.setID(ID) } +func (bt *BaseTask) Name() string { + return IndexBuildTaskName +} + func (it *IndexBuildTask) OnEnqueue() error { it.SetID(it.cmd.IndexBuildID) log.Printf("[IndexBuilderTask] Enqueue TaskID: %v", it.ID()) return nil } -func (it *IndexBuildTask) PreExecute() error { +func (it *IndexBuildTask) PreExecute(ctx context.Context) error { log.Println("preExecute...") return nil } -func (it *IndexBuildTask) PostExecute() error { +func (it *IndexBuildTask) PostExecute(ctx context.Context) error { log.Println("PostExecute...") var err error defer func() { @@ -129,21 +134,7 @@ func (it *IndexBuildTask) PostExecute() error { return err } -func (it *IndexBuildTask) Rollback() error { - - if it.savePaths == nil { - return nil - } - - err := it.kv.MultiRemove(it.savePaths) - if err != nil { - log.Println("IndexBuildTask Rollback Failed:", err.Error()) - return err - } - return nil -} - -func (it *IndexBuildTask) Execute() error { +func (it *IndexBuildTask) Execute(ctx context.Context) error { log.Println("start build index ...") var err error @@ -313,3 +304,16 @@ func (it *IndexBuildTask) Execute() error { // } return nil } +func (it *IndexBuildTask) Rollback() error { + + if it.savePaths == nil { + return nil + } + + err := it.kv.MultiRemove(it.savePaths) + if err != nil { + log.Println("IndexBuildTask Rollback Failed:", err.Error()) + return err + } + return nil +} diff --git a/internal/indexnode/task_scheduler.go b/internal/indexnode/task_scheduler.go index c4a43ca940f8c2f1accba0c2029fee69911f584c..b6bd95148d92f23e8bec8842281a34a303a8c0b1 100644 --- a/internal/indexnode/task_scheduler.go +++ b/internal/indexnode/task_scheduler.go @@ -7,7 +7,10 @@ import ( "log" "sync" + "github.com/opentracing/opentracing-go" + oplog "github.com/opentracing/opentracing-go/log" "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/util/trace" ) type TaskQueue interface { @@ -182,31 +185,42 @@ func (sched *TaskScheduler) scheduleIndexBuildTask() []task { } func (sched *TaskScheduler) processTask(t task, q TaskQueue) { - - err := t.PreExecute() + span, ctx := trace.StartSpanFromContext(t.Ctx(), + opentracing.Tags{ + "Type": t.Name(), + "ID": t.ID(), + }) + defer span.Finish() + span.LogFields(oplog.Int64("scheduler process PreExecute", t.ID())) + err := t.PreExecute(ctx) defer func() { t.Notify(err) // log.Printf("notify with error: %v", err) }() if err != nil { + trace.LogError(span, err) return } + span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID())) q.AddActiveTask(t) // log.Printf("task add to active list ...") defer func() { + span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID())) q.PopActiveTask(t.ID()) // log.Printf("pop from active list ...") }() - err = t.Execute() + span.LogFields(oplog.Int64("scheduler process Execute", t.ID())) + err = t.Execute(ctx) if err != nil { log.Printf("execute definition task failed, error = %v", err) return } // log.Printf("task execution done ...") - err = t.PostExecute() + span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID())) + err = t.PostExecute(ctx) // log.Printf("post execute task done ...") } diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index 2f2dfebbf45607f3bd6a263baebd932ac8f01a87..eb1167986838d9ab2d672510e7e2a5c7b9d82b77 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -187,11 +187,17 @@ func (i *ServiceImpl) BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.Build ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, }, } - t := NewIndexAddTask() - t.req = req - t.idAllocator = i.idAllocator - t.table = i.metaTable - t.kv = i.kv + ctx := context.Background() + t := &IndexAddTask{ + BaseTask: BaseTask{ + ctx: ctx, + done: make(chan error), + table: i.metaTable, + }, + req: req, + idAllocator: i.idAllocator, + kv: i.kv, + } if i.nodeClients == nil || i.nodeClients.Len() <= 0 { ret.Status.Reason = "IndexBuilding Service not available" @@ -200,7 +206,6 @@ func (i *ServiceImpl) BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.Build t.nodeClients = i.nodeClients var cancel func() - ctx := context.Background() t.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() diff --git a/internal/indexservice/task.go b/internal/indexservice/task.go index 28643b02482c599226216d603b3ca97bfec1b484..8fa164c597fa888663b8b675fa9af7833d8d84ca 100644 --- a/internal/indexservice/task.go +++ b/internal/indexservice/task.go @@ -12,12 +12,18 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" ) +const ( + IndexAddTaskName = "IndexAddTask" +) + type task interface { + Ctx() context.Context ID() UniqueID // return ReqID SetID(uid UniqueID) // set ReqID - PreExecute() error - Execute() error - PostExecute() error + Name() string + PreExecute(ctx context.Context) error + Execute(ctx context.Context) error + PostExecute(ctx context.Context) error WaitToFinish() error Notify(err error) OnEnqueue() error @@ -63,10 +69,22 @@ type IndexAddTask struct { buildClientNodeID UniqueID } +func (it *IndexAddTask) Ctx() context.Context { + return it.ctx +} + +func (it *IndexAddTask) ID() UniqueID { + return it.id +} + func (it *IndexAddTask) SetID(ID UniqueID) { it.BaseTask.setID(ID) } +func (it *IndexAddTask) Name() string { + return IndexAddTaskName +} + func (it *IndexAddTask) OnEnqueue() error { var err error it.indexBuildID, err = it.idAllocator.AllocOne() @@ -76,7 +94,7 @@ func (it *IndexAddTask) OnEnqueue() error { return nil } -func (it *IndexAddTask) PreExecute() error { +func (it *IndexAddTask) PreExecute(ctx context.Context) error { log.Println("pretend to check Index Req") nodeID, builderClient := it.nodeClients.PeekClient() if builderClient == nil { @@ -91,7 +109,7 @@ func (it *IndexAddTask) PreExecute() error { return nil } -func (it *IndexAddTask) Execute() error { +func (it *IndexAddTask) Execute(ctx context.Context) error { cmd := &indexpb.BuildIndexCmd{ IndexBuildID: it.indexBuildID, Req: it.req, @@ -109,14 +127,6 @@ func (it *IndexAddTask) Execute() error { return nil } -func (it *IndexAddTask) PostExecute() error { +func (it *IndexAddTask) PostExecute(ctx context.Context) error { return nil } - -func NewIndexAddTask() *IndexAddTask { - return &IndexAddTask{ - BaseTask: BaseTask{ - done: make(chan error), - }, - } -} diff --git a/internal/indexservice/task_scheduler.go b/internal/indexservice/task_scheduler.go index 3367ce7dd7a83a855e2f11b051358495e8c2e5be..c58901506a54cec9fc4aa1180bb7e55f0e61d811 100644 --- a/internal/indexservice/task_scheduler.go +++ b/internal/indexservice/task_scheduler.go @@ -7,7 +7,10 @@ import ( "log" "sync" + "github.com/opentracing/opentracing-go" + oplog "github.com/opentracing/opentracing-go/log" "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/util/trace" ) type TaskQueue interface { @@ -184,32 +187,38 @@ func (sched *TaskScheduler) scheduleIndexAddTask() task { //} func (sched *TaskScheduler) processTask(t task, q TaskQueue) { - - err := t.PreExecute() + span, ctx := trace.StartSpanFromContext(t.Ctx(), + opentracing.Tags{ + "Type": t.Name(), + }) + defer span.Finish() + span.LogFields(oplog.String("scheduler process PreExecute", t.Name())) + err := t.PreExecute(ctx) defer func() { t.Notify(err) log.Printf("notify with error: %v", err) }() if err != nil { + trace.LogError(span, err) return } + span.LogFields(oplog.String("scheduler process AddActiveTask", t.Name())) q.AddActiveTask(t) - log.Printf("task add to active list ...") defer func() { + span.LogFields(oplog.String("scheduler process PopActiveTask", t.Name())) q.PopActiveTask(t.ID()) - log.Printf("pop from active list ...") }() - err = t.Execute() + span.LogFields(oplog.String("scheduler process Execute", t.Name())) + err = t.Execute(ctx) if err != nil { - log.Printf("execute definition task failed, error = %v", err) + trace.LogError(span, err) return } - log.Printf("task execution done ...") - err = t.PostExecute() - log.Printf("post execute task done ...") + span.LogFields(oplog.String("scheduler process PostExecute", t.Name())) + err = t.PostExecute(ctx) } func (sched *TaskScheduler) indexAddLoop() { diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index caa08b1ede037565f51192474a19a20599b1e7c3..8ca43e6860ee5e59287711ba000e5c2d616529f5 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -473,7 +473,6 @@ func (c *Core) setMsgStreams() error { c.SendTimeTick = func(t typeutil.Timestamp) error { msgPack := ms.MsgPack{} baseMsg := ms.BaseMsg{ - MsgCtx: nil, BeginTimestamp: t, EndTimestamp: t, HashValues: []uint32{0}, diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 3a4231be11fb757ed3225e64862804605bbb9313..1e487205a69d79902c5da68986cdd9eb8b29ab06 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -235,7 +235,6 @@ func TestMasterService(t *testing.T) { var timeTick typeutil.Timestamp = 100 msgPack := ms.MsgPack{} baseMsg := ms.BaseMsg{ - MsgCtx: nil, BeginTimestamp: timeTick, EndTimestamp: timeTick, HashValues: []uint32{0}, diff --git a/internal/msgstream/msg.go b/internal/msgstream/msg.go index f23bf92e939e71fcc7948df72ccc6dd109b11004..90c2341e7a29d400df2fd7dc83591813cd799d93 100644 --- a/internal/msgstream/msg.go +++ b/internal/msgstream/msg.go @@ -1,7 +1,6 @@ package msgstream import ( - "context" "errors" "github.com/golang/protobuf/proto" @@ -14,6 +13,7 @@ type MsgType = commonpb.MsgType type MarshalType = interface{} type TsMsg interface { + ID() UniqueID BeginTs() Timestamp EndTs() Timestamp Type() MsgType @@ -25,7 +25,6 @@ type TsMsg interface { } type BaseMsg struct { - MsgCtx context.Context BeginTimestamp Timestamp EndTimestamp Timestamp HashValues []uint32 @@ -67,6 +66,10 @@ type InsertMsg struct { internalpb2.InsertRequest } +func (it *InsertMsg) ID() UniqueID { + return it.Base.MsgID +} + func (it *InsertMsg) Type() MsgType { return it.Base.MsgType } @@ -115,6 +118,10 @@ type FlushCompletedMsg struct { internalpb2.SegmentFlushCompletedMsg } +func (fl *FlushCompletedMsg) ID() UniqueID { + return fl.Base.MsgID +} + func (fl *FlushCompletedMsg) Type() MsgType { return fl.Base.MsgType } @@ -153,6 +160,10 @@ type FlushMsg struct { internalpb2.FlushMsg } +func (fl *FlushMsg) ID() UniqueID { + return fl.Base.MsgID +} + func (fl *FlushMsg) Type() MsgType { return fl.Base.MsgType } @@ -190,6 +201,10 @@ type DeleteMsg struct { internalpb2.DeleteRequest } +func (dt *DeleteMsg) ID() UniqueID { + return dt.Base.MsgID +} + func (dt *DeleteMsg) Type() MsgType { return dt.Base.MsgType } @@ -239,6 +254,10 @@ type SearchMsg struct { internalpb2.SearchRequest } +func (st *SearchMsg) ID() UniqueID { + return st.Base.MsgID +} + func (st *SearchMsg) Type() MsgType { return st.Base.MsgType } @@ -276,6 +295,10 @@ type SearchResultMsg struct { internalpb2.SearchResults } +func (srt *SearchResultMsg) ID() UniqueID { + return srt.Base.MsgID +} + func (srt *SearchResultMsg) Type() MsgType { return srt.Base.MsgType } @@ -313,6 +336,10 @@ type TimeTickMsg struct { internalpb2.TimeTickMsg } +func (tst *TimeTickMsg) ID() UniqueID { + return tst.Base.MsgID +} + func (tst *TimeTickMsg) Type() MsgType { return tst.Base.MsgType } @@ -351,6 +378,10 @@ type QueryNodeStatsMsg struct { internalpb2.QueryNodeStats } +func (qs *QueryNodeStatsMsg) ID() UniqueID { + return qs.Base.MsgID +} + func (qs *QueryNodeStatsMsg) Type() MsgType { return qs.Base.MsgType } @@ -386,6 +417,10 @@ type SegmentStatisticsMsg struct { internalpb2.SegmentStatistics } +func (ss *SegmentStatisticsMsg) ID() UniqueID { + return ss.Base.MsgID +} + func (ss *SegmentStatisticsMsg) Type() MsgType { return ss.Base.MsgType } @@ -431,6 +466,10 @@ type CreateCollectionMsg struct { internalpb2.CreateCollectionRequest } +func (cc *CreateCollectionMsg) ID() UniqueID { + return cc.Base.MsgID +} + func (cc *CreateCollectionMsg) Type() MsgType { return cc.Base.MsgType } @@ -468,6 +507,10 @@ type DropCollectionMsg struct { internalpb2.DropCollectionRequest } +func (dc *DropCollectionMsg) ID() UniqueID { + return dc.Base.MsgID +} + func (dc *DropCollectionMsg) Type() MsgType { return dc.Base.MsgType } @@ -505,6 +548,10 @@ type CreatePartitionMsg struct { internalpb2.CreatePartitionRequest } +func (cc *CreatePartitionMsg) ID() UniqueID { + return cc.Base.MsgID +} + func (cc *CreatePartitionMsg) Type() MsgType { return cc.Base.MsgType } @@ -542,6 +589,10 @@ type DropPartitionMsg struct { internalpb2.DropPartitionRequest } +func (dc *DropPartitionMsg) ID() UniqueID { + return dc.Base.MsgID +} + func (dc *DropPartitionMsg) Type() MsgType { return dc.Base.MsgType } @@ -579,16 +630,12 @@ type LoadIndexMsg struct { internalpb2.LoadIndex } -func (lim *LoadIndexMsg) Type() MsgType { - return lim.Base.MsgType -} - -func (lim *LoadIndexMsg) GetMsgContext() context.Context { - return lim.MsgCtx +func (lim *LoadIndexMsg) ID() UniqueID { + return lim.Base.MsgID } -func (lim *LoadIndexMsg) SetMsgContext(ctx context.Context) { - lim.MsgCtx = ctx +func (lim *LoadIndexMsg) Type() MsgType { + return lim.Base.MsgType } func (lim *LoadIndexMsg) Marshal(input TsMsg) (MarshalType, error) { @@ -622,6 +669,10 @@ type SegmentInfoMsg struct { datapb.SegmentMsg } +func (sim *SegmentInfoMsg) ID() UniqueID { + return sim.Base.MsgID +} + func (sim *SegmentInfoMsg) Type() MsgType { return sim.Base.MsgType } diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index 8b67f2ddfb18dbe5b99d9a6823b4864137e64bab..ee191e7aa89f3e3c7523f3ea633403c44533dec0 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -39,6 +39,7 @@ func (node *NodeImpl) CreateCollection(request *milvuspb.CreateCollectionRequest defer cancel() cct := &CreateCollectionTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), CreateCollectionRequest: request, masterClient: node.masterClient, @@ -70,6 +71,7 @@ func (node *NodeImpl) DropCollection(request *milvuspb.DropCollectionRequest) (* defer cancel() dct := &DropCollectionTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), DropCollectionRequest: request, masterClient: node.masterClient, @@ -100,6 +102,7 @@ func (node *NodeImpl) HasCollection(request *milvuspb.HasCollectionRequest) (*mi defer cancel() hct := &HasCollectionTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), HasCollectionRequest: request, masterClient: node.masterClient, @@ -134,6 +137,7 @@ func (node *NodeImpl) LoadCollection(request *milvuspb.LoadCollectionRequest) (* defer cancel() lct := &LoadCollectionTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), LoadCollectionRequest: request, queryserviceClient: node.queryServiceClient, @@ -164,6 +168,7 @@ func (node *NodeImpl) ReleaseCollection(request *milvuspb.ReleaseCollectionReque defer cancel() rct := &ReleaseCollectionTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), ReleaseCollectionRequest: request, queryserviceClient: node.queryServiceClient, @@ -194,6 +199,7 @@ func (node *NodeImpl) DescribeCollection(request *milvuspb.DescribeCollectionReq defer cancel() dct := &DescribeCollectionTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), DescribeCollectionRequest: request, masterClient: node.masterClient, @@ -227,6 +233,7 @@ func (node *NodeImpl) GetCollectionStatistics(request *milvuspb.CollectionStatsR ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() g := &GetCollectionsStatisticsTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), CollectionStatsRequest: request, dataServiceClient: node.dataServiceClient, @@ -260,6 +267,7 @@ func (node *NodeImpl) ShowCollections(request *milvuspb.ShowCollectionRequest) ( ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() sct := &ShowCollectionsTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), ShowCollectionRequest: request, masterClient: node.masterClient, @@ -293,6 +301,7 @@ func (node *NodeImpl) CreatePartition(request *milvuspb.CreatePartitionRequest) ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() cpt := &CreatePartitionTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), CreatePartitionRequest: request, masterClient: node.masterClient, @@ -321,6 +330,7 @@ func (node *NodeImpl) DropPartition(request *milvuspb.DropPartitionRequest) (*co ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() dpt := &DropPartitionTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), DropPartitionRequest: request, masterClient: node.masterClient, @@ -350,6 +360,7 @@ func (node *NodeImpl) HasPartition(request *milvuspb.HasPartitionRequest) (*milv ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() hpt := &HasPartitionTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), HasPartitionRequest: request, masterClient: node.masterClient, @@ -386,6 +397,7 @@ func (node *NodeImpl) LoadPartitions(request *milvuspb.LoadPartitonRequest) (*co defer cancel() lpt := &LoadPartitionTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), LoadPartitonRequest: request, queryserviceClient: node.queryServiceClient, @@ -416,6 +428,7 @@ func (node *NodeImpl) ReleasePartitions(request *milvuspb.ReleasePartitionReques defer cancel() rpt := &ReleasePartitionTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), ReleasePartitionRequest: request, queryserviceClient: node.queryServiceClient, @@ -449,6 +462,7 @@ func (node *NodeImpl) ShowPartitions(request *milvuspb.ShowPartitionRequest) (*m ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() spt := &ShowPartitionsTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), ShowPartitionRequest: request, masterClient: node.masterClient, @@ -483,6 +497,7 @@ func (node *NodeImpl) CreateIndex(request *milvuspb.CreateIndexRequest) (*common ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() cit := &CreateIndexTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), CreateIndexRequest: request, masterClient: node.masterClient, @@ -512,6 +527,7 @@ func (node *NodeImpl) DescribeIndex(request *milvuspb.DescribeIndexRequest) (*mi ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() dit := &DescribeIndexTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), DescribeIndexRequest: request, masterClient: node.masterClient, @@ -545,6 +561,7 @@ func (node *NodeImpl) DropIndex(request *milvuspb.DropIndexRequest) (*commonpb.S ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() dit := &DropIndexTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), DropIndexRequest: request, masterClient: node.masterClient, @@ -571,6 +588,7 @@ func (node *NodeImpl) GetIndexState(request *milvuspb.IndexStateRequest) (*milvu ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() dipt := &GetIndexStateTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), IndexStateRequest: request, indexServiceClient: node.indexServiceClient, @@ -605,6 +623,7 @@ func (node *NodeImpl) Insert(request *milvuspb.InsertRequest) (*milvuspb.InsertR defer cancel() it := &InsertTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), dataServiceClient: node.dataServiceClient, BaseInsertTask: BaseInsertTask{ @@ -656,8 +675,9 @@ func (node *NodeImpl) Search(request *milvuspb.SearchRequest) (*milvuspb.SearchR defer cancel() qt := &SearchTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), - SearchRequest: internalpb2.SearchRequest{ + SearchRequest: &internalpb2.SearchRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kSearch, SourceID: Params.ProxyID, @@ -697,6 +717,7 @@ func (node *NodeImpl) Flush(request *milvuspb.FlushRequest) (*commonpb.Status, e ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() ft := &FlushTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), FlushRequest: request, dataServiceClient: node.dataServiceClient, diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index cfd6ebb7b20aea4b232afd18f68f865b872cca36..c9bb10e6559365ae2dcbdb8e3021db5c627e68b8 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -21,17 +21,43 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) +const ( + InsertTaskName = "InsertTask" + CreateCollectionTaskName = "CreateCollectionTask" + DropCollectionTaskName = "DropCollectionTask" + SearchTaskName = "SearchTask" + HasCollectionTaskName = "HasCollectionTask" + DescribeCollectionTaskName = "DescribeCollectionTask" + GetCollectionStatisticsTaskName = "GetCollectionStatisticsTask" + ShowCollectionTaskName = "ShowCollectionTask" + CreatePartitionTaskName = "CreatePartitionTask" + DropPartitionTaskName = "DropPartitionTask" + HasPartitionTaskName = "HasPartitionTask" + ShowPartitionTaskName = "ShowPartitionTask" + CreateIndexTaskName = "CreateIndexTask" + DescribeIndexTaskName = "DescribeIndexTask" + DropIndexTaskName = "DropIndexTask" + GetIndexStateTaskName = "GetIndexStateTask" + FlushTaskName = "FlushTask" + LoadCollectionTaskName = "LoadCollectionTask" + ReleaseCollectionTaskName = "ReleaseCollectionTask" + LoadPartitionTaskName = "LoadPartitionTask" + ReleasePartitionTaskName = "ReleasePartitionTask" +) + type task interface { + Ctx() context.Context ID() UniqueID // return ReqID SetID(uid UniqueID) // set ReqID + Name() string Type() commonpb.MsgType BeginTs() Timestamp EndTs() Timestamp SetTs(ts Timestamp) OnEnqueue() error - PreExecute() error - Execute() error - PostExecute() error + PreExecute(ctx context.Context) error + Execute(ctx context.Context) error + PostExecute(ctx context.Context) error WaitToFinish() error Notify(err error) } @@ -41,19 +67,36 @@ type BaseInsertTask = msgstream.InsertMsg type InsertTask struct { BaseInsertTask Condition + ctx context.Context dataServiceClient DataServiceClient result *milvuspb.InsertResponse rowIDAllocator *allocator.IDAllocator } -func (it *InsertTask) OnEnqueue() error { - return nil +func (it *InsertTask) Ctx() context.Context { + return it.ctx +} + +func (it *InsertTask) ID() UniqueID { + return it.Base.MsgID } func (it *InsertTask) SetID(uid UniqueID) { it.Base.MsgID = uid } +func (it *InsertTask) Name() string { + return InsertTaskName +} + +func (it *InsertTask) Type() commonpb.MsgType { + return it.Base.MsgType +} + +func (it *InsertTask) BeginTs() Timestamp { + return it.BeginTimestamp +} + func (it *InsertTask) SetTs(ts Timestamp) { rowNum := len(it.RowData) it.Timestamps = make([]uint64, rowNum) @@ -64,23 +107,15 @@ func (it *InsertTask) SetTs(ts Timestamp) { it.EndTimestamp = ts } -func (it *InsertTask) BeginTs() Timestamp { - return it.BeginTimestamp -} - func (it *InsertTask) EndTs() Timestamp { return it.EndTimestamp } -func (it *InsertTask) ID() UniqueID { - return it.Base.MsgID -} - -func (it *InsertTask) Type() commonpb.MsgType { - return it.Base.MsgType +func (it *InsertTask) OnEnqueue() error { + return nil } -func (it *InsertTask) PreExecute() error { +func (it *InsertTask) PreExecute(ctx context.Context) error { it.Base.MsgType = commonpb.MsgType_kInsert it.Base.SourceID = Params.ProxyID @@ -96,7 +131,7 @@ func (it *InsertTask) PreExecute() error { return nil } -func (it *InsertTask) Execute() error { +func (it *InsertTask) Execute(ctx context.Context) error { collectionName := it.BaseInsertTask.CollectionName collSchema, err := globalMetaCache.GetCollectionSchema(collectionName) if err != nil { @@ -199,22 +234,22 @@ func (it *InsertTask) Execute() error { return nil } -func (it *InsertTask) PostExecute() error { +func (it *InsertTask) PostExecute(ctx context.Context) error { return nil } type CreateCollectionTask struct { Condition *milvuspb.CreateCollectionRequest + ctx context.Context masterClient MasterClient dataServiceClient DataServiceClient result *commonpb.Status schema *schemapb.CollectionSchema } -func (cct *CreateCollectionTask) OnEnqueue() error { - cct.Base = &commonpb.MsgBase{} - return nil +func (cct *CreateCollectionTask) Ctx() context.Context { + return cct.ctx } func (cct *CreateCollectionTask) ID() UniqueID { @@ -225,6 +260,10 @@ func (cct *CreateCollectionTask) SetID(uid UniqueID) { cct.Base.MsgID = uid } +func (cct *CreateCollectionTask) Name() string { + return CreateCollectionTaskName +} + func (cct *CreateCollectionTask) Type() commonpb.MsgType { return cct.Base.MsgType } @@ -241,7 +280,12 @@ func (cct *CreateCollectionTask) SetTs(ts Timestamp) { cct.Base.Timestamp = ts } -func (cct *CreateCollectionTask) PreExecute() error { +func (cct *CreateCollectionTask) OnEnqueue() error { + cct.Base = &commonpb.MsgBase{} + return nil +} + +func (cct *CreateCollectionTask) PreExecute(ctx context.Context) error { cct.Base.MsgType = commonpb.MsgType_kCreateCollection cct.Base.SourceID = Params.ProxyID @@ -308,7 +352,7 @@ func (cct *CreateCollectionTask) PreExecute() error { return nil } -func (cct *CreateCollectionTask) Execute() error { +func (cct *CreateCollectionTask) Execute(ctx context.Context) error { var err error cct.result, err = cct.masterClient.CreateCollection(cct.CreateCollectionRequest) if err != nil { @@ -343,20 +387,20 @@ func (cct *CreateCollectionTask) Execute() error { return nil } -func (cct *CreateCollectionTask) PostExecute() error { +func (cct *CreateCollectionTask) PostExecute(ctx context.Context) error { return nil } type DropCollectionTask struct { Condition *milvuspb.DropCollectionRequest + ctx context.Context masterClient MasterClient result *commonpb.Status } -func (dct *DropCollectionTask) OnEnqueue() error { - dct.Base = &commonpb.MsgBase{} - return nil +func (dct *DropCollectionTask) Ctx() context.Context { + return dct.ctx } func (dct *DropCollectionTask) ID() UniqueID { @@ -367,6 +411,10 @@ func (dct *DropCollectionTask) SetID(uid UniqueID) { dct.Base.MsgID = uid } +func (dct *DropCollectionTask) Name() string { + return DropCollectionTaskName +} + func (dct *DropCollectionTask) Type() commonpb.MsgType { return dct.Base.MsgType } @@ -383,7 +431,12 @@ func (dct *DropCollectionTask) SetTs(ts Timestamp) { dct.Base.Timestamp = ts } -func (dct *DropCollectionTask) PreExecute() error { +func (dct *DropCollectionTask) OnEnqueue() error { + dct.Base = &commonpb.MsgBase{} + return nil +} + +func (dct *DropCollectionTask) PreExecute(ctx context.Context) error { dct.Base.MsgType = commonpb.MsgType_kDropCollection dct.Base.SourceID = Params.ProxyID @@ -393,7 +446,7 @@ func (dct *DropCollectionTask) PreExecute() error { return nil } -func (dct *DropCollectionTask) Execute() error { +func (dct *DropCollectionTask) Execute(ctx context.Context) error { collID, err := globalMetaCache.GetCollectionID(dct.CollectionName) if err != nil { return err @@ -412,22 +465,23 @@ func (dct *DropCollectionTask) Execute() error { return nil } -func (dct *DropCollectionTask) PostExecute() error { +func (dct *DropCollectionTask) PostExecute(ctx context.Context) error { globalMetaCache.RemoveCollection(dct.CollectionName) return nil } type SearchTask struct { Condition - internalpb2.SearchRequest + *internalpb2.SearchRequest + ctx context.Context queryMsgStream msgstream.MsgStream resultBuf chan []*internalpb2.SearchResults result *milvuspb.SearchResults query *milvuspb.SearchRequest } -func (st *SearchTask) OnEnqueue() error { - return nil +func (st *SearchTask) Ctx() context.Context { + return st.ctx } func (st *SearchTask) ID() UniqueID { @@ -438,6 +492,10 @@ func (st *SearchTask) SetID(uid UniqueID) { st.Base.MsgID = uid } +func (st *SearchTask) Name() string { + return SearchTaskName +} + func (st *SearchTask) Type() commonpb.MsgType { return st.Base.MsgType } @@ -454,7 +512,11 @@ func (st *SearchTask) SetTs(ts Timestamp) { st.Base.Timestamp = ts } -func (st *SearchTask) PreExecute() error { +func (st *SearchTask) OnEnqueue() error { + return nil +} + +func (st *SearchTask) PreExecute(ctx context.Context) error { st.Base.MsgType = commonpb.MsgType_kSearch st.Base.SourceID = Params.ProxyID @@ -503,9 +565,9 @@ func (st *SearchTask) PreExecute() error { return nil } -func (st *SearchTask) Execute() error { +func (st *SearchTask) Execute(ctx context.Context) error { var tsMsg msgstream.TsMsg = &msgstream.SearchMsg{ - SearchRequest: st.SearchRequest, + SearchRequest: *st.SearchRequest, BaseMsg: msgstream.BaseMsg{ HashValues: []uint32{uint32(Params.ProxyID)}, BeginTimestamp: st.Base.Timestamp, @@ -526,7 +588,7 @@ func (st *SearchTask) Execute() error { return err } -func (st *SearchTask) PostExecute() error { +func (st *SearchTask) PostExecute(ctx context.Context) error { for { select { case <-st.Ctx().Done(): @@ -682,13 +744,13 @@ func (st *SearchTask) PostExecute() error { type HasCollectionTask struct { Condition *milvuspb.HasCollectionRequest + ctx context.Context masterClient MasterClient result *milvuspb.BoolResponse } -func (hct *HasCollectionTask) OnEnqueue() error { - hct.Base = &commonpb.MsgBase{} - return nil +func (hct *HasCollectionTask) Ctx() context.Context { + return hct.ctx } func (hct *HasCollectionTask) ID() UniqueID { @@ -699,6 +761,10 @@ func (hct *HasCollectionTask) SetID(uid UniqueID) { hct.Base.MsgID = uid } +func (hct *HasCollectionTask) Name() string { + return HasCollectionTaskName +} + func (hct *HasCollectionTask) Type() commonpb.MsgType { return hct.Base.MsgType } @@ -715,7 +781,12 @@ func (hct *HasCollectionTask) SetTs(ts Timestamp) { hct.Base.Timestamp = ts } -func (hct *HasCollectionTask) PreExecute() error { +func (hct *HasCollectionTask) OnEnqueue() error { + hct.Base = &commonpb.MsgBase{} + return nil +} + +func (hct *HasCollectionTask) PreExecute(ctx context.Context) error { hct.Base.MsgType = commonpb.MsgType_kHasCollection hct.Base.SourceID = Params.ProxyID @@ -725,7 +796,7 @@ func (hct *HasCollectionTask) PreExecute() error { return nil } -func (hct *HasCollectionTask) Execute() error { +func (hct *HasCollectionTask) Execute(ctx context.Context) error { var err error hct.result, err = hct.masterClient.HasCollection(hct.HasCollectionRequest) if hct.result == nil { @@ -737,20 +808,20 @@ func (hct *HasCollectionTask) Execute() error { return err } -func (hct *HasCollectionTask) PostExecute() error { +func (hct *HasCollectionTask) PostExecute(ctx context.Context) error { return nil } type DescribeCollectionTask struct { Condition *milvuspb.DescribeCollectionRequest + ctx context.Context masterClient MasterClient result *milvuspb.DescribeCollectionResponse } -func (dct *DescribeCollectionTask) OnEnqueue() error { - dct.Base = &commonpb.MsgBase{} - return nil +func (dct *DescribeCollectionTask) Ctx() context.Context { + return dct.ctx } func (dct *DescribeCollectionTask) ID() UniqueID { @@ -761,6 +832,10 @@ func (dct *DescribeCollectionTask) SetID(uid UniqueID) { dct.Base.MsgID = uid } +func (dct *DescribeCollectionTask) Name() string { + return DescribeCollectionTaskName +} + func (dct *DescribeCollectionTask) Type() commonpb.MsgType { return dct.Base.MsgType } @@ -777,7 +852,12 @@ func (dct *DescribeCollectionTask) SetTs(ts Timestamp) { dct.Base.Timestamp = ts } -func (dct *DescribeCollectionTask) PreExecute() error { +func (dct *DescribeCollectionTask) OnEnqueue() error { + dct.Base = &commonpb.MsgBase{} + return nil +} + +func (dct *DescribeCollectionTask) PreExecute(ctx context.Context) error { dct.Base.MsgType = commonpb.MsgType_kDescribeCollection dct.Base.SourceID = Params.ProxyID @@ -787,7 +867,7 @@ func (dct *DescribeCollectionTask) PreExecute() error { return nil } -func (dct *DescribeCollectionTask) Execute() error { +func (dct *DescribeCollectionTask) Execute(ctx context.Context) error { var err error dct.result, err = dct.masterClient.DescribeCollection(dct.DescribeCollectionRequest) if dct.result == nil { @@ -799,17 +879,22 @@ func (dct *DescribeCollectionTask) Execute() error { return err } -func (dct *DescribeCollectionTask) PostExecute() error { +func (dct *DescribeCollectionTask) PostExecute(ctx context.Context) error { return nil } type GetCollectionsStatisticsTask struct { Condition *milvuspb.CollectionStatsRequest + ctx context.Context dataServiceClient DataServiceClient result *milvuspb.CollectionStatsResponse } +func (g *GetCollectionsStatisticsTask) Ctx() context.Context { + return g.ctx +} + func (g *GetCollectionsStatisticsTask) ID() UniqueID { return g.Base.MsgID } @@ -818,6 +903,10 @@ func (g *GetCollectionsStatisticsTask) SetID(uid UniqueID) { g.Base.MsgID = uid } +func (g *GetCollectionsStatisticsTask) Name() string { + return GetCollectionStatisticsTaskName +} + func (g *GetCollectionsStatisticsTask) Type() commonpb.MsgType { return g.Base.MsgType } @@ -839,13 +928,13 @@ func (g *GetCollectionsStatisticsTask) OnEnqueue() error { return nil } -func (g *GetCollectionsStatisticsTask) PreExecute() error { +func (g *GetCollectionsStatisticsTask) PreExecute(ctx context.Context) error { g.Base.MsgType = commonpb.MsgType_kGetCollectionStatistics g.Base.SourceID = Params.ProxyID return nil } -func (g *GetCollectionsStatisticsTask) Execute() error { +func (g *GetCollectionsStatisticsTask) Execute(ctx context.Context) error { collID, err := globalMetaCache.GetCollectionID(g.CollectionName) if err != nil { return err @@ -877,21 +966,20 @@ func (g *GetCollectionsStatisticsTask) Execute() error { return nil } -func (g *GetCollectionsStatisticsTask) PostExecute() error { +func (g *GetCollectionsStatisticsTask) PostExecute(ctx context.Context) error { return nil } type ShowCollectionsTask struct { Condition *milvuspb.ShowCollectionRequest + ctx context.Context masterClient MasterClient result *milvuspb.ShowCollectionResponse - ctx context.Context } -func (sct *ShowCollectionsTask) OnEnqueue() error { - sct.Base = &commonpb.MsgBase{} - return nil +func (sct *ShowCollectionsTask) Ctx() context.Context { + return sct.ctx } func (sct *ShowCollectionsTask) ID() UniqueID { @@ -902,6 +990,10 @@ func (sct *ShowCollectionsTask) SetID(uid UniqueID) { sct.Base.MsgID = uid } +func (sct *ShowCollectionsTask) Name() string { + return ShowCollectionTaskName +} + func (sct *ShowCollectionsTask) Type() commonpb.MsgType { return sct.Base.MsgType } @@ -918,14 +1010,19 @@ func (sct *ShowCollectionsTask) SetTs(ts Timestamp) { sct.Base.Timestamp = ts } -func (sct *ShowCollectionsTask) PreExecute() error { +func (sct *ShowCollectionsTask) OnEnqueue() error { + sct.Base = &commonpb.MsgBase{} + return nil +} + +func (sct *ShowCollectionsTask) PreExecute(ctx context.Context) error { sct.Base.MsgType = commonpb.MsgType_kShowCollections sct.Base.SourceID = Params.ProxyID return nil } -func (sct *ShowCollectionsTask) Execute() error { +func (sct *ShowCollectionsTask) Execute(ctx context.Context) error { var err error sct.result, err = sct.masterClient.ShowCollections(sct.ShowCollectionRequest) if sct.result == nil { @@ -937,20 +1034,20 @@ func (sct *ShowCollectionsTask) Execute() error { return err } -func (sct *ShowCollectionsTask) PostExecute() error { +func (sct *ShowCollectionsTask) PostExecute(ctx context.Context) error { return nil } type CreatePartitionTask struct { Condition *milvuspb.CreatePartitionRequest + ctx context.Context masterClient MasterClient result *commonpb.Status } -func (cpt *CreatePartitionTask) OnEnqueue() error { - cpt.Base = &commonpb.MsgBase{} - return nil +func (cpt *CreatePartitionTask) Ctx() context.Context { + return cpt.ctx } func (cpt *CreatePartitionTask) ID() UniqueID { @@ -961,6 +1058,10 @@ func (cpt *CreatePartitionTask) SetID(uid UniqueID) { cpt.Base.MsgID = uid } +func (cpt *CreatePartitionTask) Name() string { + return CreatePartitionTaskName +} + func (cpt *CreatePartitionTask) Type() commonpb.MsgType { return cpt.Base.MsgType } @@ -977,7 +1078,12 @@ func (cpt *CreatePartitionTask) SetTs(ts Timestamp) { cpt.Base.Timestamp = ts } -func (cpt *CreatePartitionTask) PreExecute() error { +func (cpt *CreatePartitionTask) OnEnqueue() error { + cpt.Base = &commonpb.MsgBase{} + return nil +} + +func (cpt *CreatePartitionTask) PreExecute(ctx context.Context) error { cpt.Base.MsgType = commonpb.MsgType_kCreatePartition cpt.Base.SourceID = Params.ProxyID @@ -994,7 +1100,7 @@ func (cpt *CreatePartitionTask) PreExecute() error { return nil } -func (cpt *CreatePartitionTask) Execute() (err error) { +func (cpt *CreatePartitionTask) Execute(ctx context.Context) (err error) { cpt.result, err = cpt.masterClient.CreatePartition(cpt.CreatePartitionRequest) if cpt.result == nil { return errors.New("get collection statistics resp is nil") @@ -1005,20 +1111,20 @@ func (cpt *CreatePartitionTask) Execute() (err error) { return err } -func (cpt *CreatePartitionTask) PostExecute() error { +func (cpt *CreatePartitionTask) PostExecute(ctx context.Context) error { return nil } type DropPartitionTask struct { Condition *milvuspb.DropPartitionRequest + ctx context.Context masterClient MasterClient result *commonpb.Status } -func (dpt *DropPartitionTask) OnEnqueue() error { - dpt.Base = &commonpb.MsgBase{} - return nil +func (dpt *DropPartitionTask) Ctx() context.Context { + return dpt.ctx } func (dpt *DropPartitionTask) ID() UniqueID { @@ -1029,6 +1135,10 @@ func (dpt *DropPartitionTask) SetID(uid UniqueID) { dpt.Base.MsgID = uid } +func (dpt *DropPartitionTask) Name() string { + return DropPartitionTaskName +} + func (dpt *DropPartitionTask) Type() commonpb.MsgType { return dpt.Base.MsgType } @@ -1045,7 +1155,12 @@ func (dpt *DropPartitionTask) SetTs(ts Timestamp) { dpt.Base.Timestamp = ts } -func (dpt *DropPartitionTask) PreExecute() error { +func (dpt *DropPartitionTask) OnEnqueue() error { + dpt.Base = &commonpb.MsgBase{} + return nil +} + +func (dpt *DropPartitionTask) PreExecute(ctx context.Context) error { dpt.Base.MsgType = commonpb.MsgType_kDropPartition dpt.Base.SourceID = Params.ProxyID @@ -1062,7 +1177,7 @@ func (dpt *DropPartitionTask) PreExecute() error { return nil } -func (dpt *DropPartitionTask) Execute() (err error) { +func (dpt *DropPartitionTask) Execute(ctx context.Context) (err error) { dpt.result, err = dpt.masterClient.DropPartition(dpt.DropPartitionRequest) if dpt.result == nil { return errors.New("get collection statistics resp is nil") @@ -1073,20 +1188,20 @@ func (dpt *DropPartitionTask) Execute() (err error) { return err } -func (dpt *DropPartitionTask) PostExecute() error { +func (dpt *DropPartitionTask) PostExecute(ctx context.Context) error { return nil } type HasPartitionTask struct { Condition *milvuspb.HasPartitionRequest + ctx context.Context masterClient MasterClient result *milvuspb.BoolResponse } -func (hpt *HasPartitionTask) OnEnqueue() error { - hpt.Base = &commonpb.MsgBase{} - return nil +func (hpt *HasPartitionTask) Ctx() context.Context { + return hpt.ctx } func (hpt *HasPartitionTask) ID() UniqueID { @@ -1097,6 +1212,10 @@ func (hpt *HasPartitionTask) SetID(uid UniqueID) { hpt.Base.MsgID = uid } +func (hpt *HasPartitionTask) Name() string { + return HasPartitionTaskName +} + func (hpt *HasPartitionTask) Type() commonpb.MsgType { return hpt.Base.MsgType } @@ -1113,7 +1232,12 @@ func (hpt *HasPartitionTask) SetTs(ts Timestamp) { hpt.Base.Timestamp = ts } -func (hpt *HasPartitionTask) PreExecute() error { +func (hpt *HasPartitionTask) OnEnqueue() error { + hpt.Base = &commonpb.MsgBase{} + return nil +} + +func (hpt *HasPartitionTask) PreExecute(ctx context.Context) error { hpt.Base.MsgType = commonpb.MsgType_kHasPartition hpt.Base.SourceID = Params.ProxyID @@ -1129,7 +1253,7 @@ func (hpt *HasPartitionTask) PreExecute() error { return nil } -func (hpt *HasPartitionTask) Execute() (err error) { +func (hpt *HasPartitionTask) Execute(ctx context.Context) (err error) { hpt.result, err = hpt.masterClient.HasPartition(hpt.HasPartitionRequest) if hpt.result == nil { return errors.New("get collection statistics resp is nil") @@ -1140,20 +1264,20 @@ func (hpt *HasPartitionTask) Execute() (err error) { return err } -func (hpt *HasPartitionTask) PostExecute() error { +func (hpt *HasPartitionTask) PostExecute(ctx context.Context) error { return nil } type ShowPartitionsTask struct { Condition *milvuspb.ShowPartitionRequest + ctx context.Context masterClient MasterClient result *milvuspb.ShowPartitionResponse } -func (spt *ShowPartitionsTask) OnEnqueue() error { - spt.Base = &commonpb.MsgBase{} - return nil +func (spt *ShowPartitionsTask) Ctx() context.Context { + return spt.ctx } func (spt *ShowPartitionsTask) ID() UniqueID { @@ -1164,6 +1288,10 @@ func (spt *ShowPartitionsTask) SetID(uid UniqueID) { spt.Base.MsgID = uid } +func (spt *ShowPartitionsTask) Name() string { + return ShowPartitionTaskName +} + func (spt *ShowPartitionsTask) Type() commonpb.MsgType { return spt.Base.MsgType } @@ -1180,7 +1308,12 @@ func (spt *ShowPartitionsTask) SetTs(ts Timestamp) { spt.Base.Timestamp = ts } -func (spt *ShowPartitionsTask) PreExecute() error { +func (spt *ShowPartitionsTask) OnEnqueue() error { + spt.Base = &commonpb.MsgBase{} + return nil +} + +func (spt *ShowPartitionsTask) PreExecute(ctx context.Context) error { spt.Base.MsgType = commonpb.MsgType_kShowPartitions spt.Base.SourceID = Params.ProxyID @@ -1190,7 +1323,7 @@ func (spt *ShowPartitionsTask) PreExecute() error { return nil } -func (spt *ShowPartitionsTask) Execute() error { +func (spt *ShowPartitionsTask) Execute(ctx context.Context) error { var err error spt.result, err = spt.masterClient.ShowPartitions(spt.ShowPartitionRequest) if spt.result == nil { @@ -1202,20 +1335,20 @@ func (spt *ShowPartitionsTask) Execute() error { return err } -func (spt *ShowPartitionsTask) PostExecute() error { +func (spt *ShowPartitionsTask) PostExecute(ctx context.Context) error { return nil } type CreateIndexTask struct { Condition *milvuspb.CreateIndexRequest + ctx context.Context masterClient MasterClient result *commonpb.Status } -func (cit *CreateIndexTask) OnEnqueue() error { - cit.Base = &commonpb.MsgBase{} - return nil +func (cit *CreateIndexTask) Ctx() context.Context { + return cit.ctx } func (cit *CreateIndexTask) ID() UniqueID { @@ -1226,6 +1359,10 @@ func (cit *CreateIndexTask) SetID(uid UniqueID) { cit.Base.MsgID = uid } +func (cit *CreateIndexTask) Name() string { + return CreateIndexTaskName +} + func (cit *CreateIndexTask) Type() commonpb.MsgType { return cit.Base.MsgType } @@ -1242,7 +1379,12 @@ func (cit *CreateIndexTask) SetTs(ts Timestamp) { cit.Base.Timestamp = ts } -func (cit *CreateIndexTask) PreExecute() error { +func (cit *CreateIndexTask) OnEnqueue() error { + cit.Base = &commonpb.MsgBase{} + return nil +} + +func (cit *CreateIndexTask) PreExecute(ctx context.Context) error { cit.Base.MsgType = commonpb.MsgType_kCreateIndex cit.Base.SourceID = Params.ProxyID @@ -1259,7 +1401,7 @@ func (cit *CreateIndexTask) PreExecute() error { return nil } -func (cit *CreateIndexTask) Execute() error { +func (cit *CreateIndexTask) Execute(ctx context.Context) error { var err error cit.result, err = cit.masterClient.CreateIndex(cit.CreateIndexRequest) if cit.result == nil { @@ -1271,20 +1413,20 @@ func (cit *CreateIndexTask) Execute() error { return err } -func (cit *CreateIndexTask) PostExecute() error { +func (cit *CreateIndexTask) PostExecute(ctx context.Context) error { return nil } type DescribeIndexTask struct { Condition *milvuspb.DescribeIndexRequest + ctx context.Context masterClient MasterClient result *milvuspb.DescribeIndexResponse } -func (dit *DescribeIndexTask) OnEnqueue() error { - dit.Base = &commonpb.MsgBase{} - return nil +func (dit *DescribeIndexTask) Ctx() context.Context { + return dit.ctx } func (dit *DescribeIndexTask) ID() UniqueID { @@ -1295,6 +1437,10 @@ func (dit *DescribeIndexTask) SetID(uid UniqueID) { dit.Base.MsgID = uid } +func (dit *DescribeIndexTask) Name() string { + return DescribeIndexTaskName +} + func (dit *DescribeIndexTask) Type() commonpb.MsgType { return dit.Base.MsgType } @@ -1311,7 +1457,12 @@ func (dit *DescribeIndexTask) SetTs(ts Timestamp) { dit.Base.Timestamp = ts } -func (dit *DescribeIndexTask) PreExecute() error { +func (dit *DescribeIndexTask) OnEnqueue() error { + dit.Base = &commonpb.MsgBase{} + return nil +} + +func (dit *DescribeIndexTask) PreExecute(ctx context.Context) error { dit.Base.MsgType = commonpb.MsgType_kDescribeIndex dit.Base.SourceID = Params.ProxyID @@ -1333,7 +1484,7 @@ func (dit *DescribeIndexTask) PreExecute() error { return nil } -func (dit *DescribeIndexTask) Execute() error { +func (dit *DescribeIndexTask) Execute(ctx context.Context) error { var err error dit.result, err = dit.masterClient.DescribeIndex(dit.DescribeIndexRequest) log.Println("YYYYY:", dit.result) @@ -1346,20 +1497,20 @@ func (dit *DescribeIndexTask) Execute() error { return err } -func (dit *DescribeIndexTask) PostExecute() error { +func (dit *DescribeIndexTask) PostExecute(ctx context.Context) error { return nil } type DropIndexTask struct { Condition + ctx context.Context *milvuspb.DropIndexRequest masterClient MasterClient result *commonpb.Status } -func (dit *DropIndexTask) OnEnqueue() error { - dit.Base = &commonpb.MsgBase{} - return nil +func (dit *DropIndexTask) Ctx() context.Context { + return dit.ctx } func (dit *DropIndexTask) ID() UniqueID { @@ -1370,6 +1521,10 @@ func (dit *DropIndexTask) SetID(uid UniqueID) { dit.Base.MsgID = uid } +func (dit *DropIndexTask) Name() string { + return DropIndexTaskName +} + func (dit *DropIndexTask) Type() commonpb.MsgType { return dit.Base.MsgType } @@ -1386,7 +1541,12 @@ func (dit *DropIndexTask) SetTs(ts Timestamp) { dit.Base.Timestamp = ts } -func (dit *DropIndexTask) PreExecute() error { +func (dit *DropIndexTask) OnEnqueue() error { + dit.Base = &commonpb.MsgBase{} + return nil +} + +func (dit *DropIndexTask) PreExecute(ctx context.Context) error { dit.Base.MsgType = commonpb.MsgType_kDropIndex dit.Base.SourceID = Params.ProxyID @@ -1403,7 +1563,7 @@ func (dit *DropIndexTask) PreExecute() error { return nil } -func (dit *DropIndexTask) Execute() error { +func (dit *DropIndexTask) Execute(ctx context.Context) error { var err error dit.result, err = dit.masterClient.DropIndex(dit.DropIndexRequest) if dit.result == nil { @@ -1415,52 +1575,61 @@ func (dit *DropIndexTask) Execute() error { return err } -func (dit *DropIndexTask) PostExecute() error { +func (dit *DropIndexTask) PostExecute(ctx context.Context) error { return nil } type GetIndexStateTask struct { Condition *milvuspb.IndexStateRequest + ctx context.Context indexServiceClient IndexServiceClient masterClient MasterClient result *milvuspb.IndexStateResponse } -func (dipt *GetIndexStateTask) OnEnqueue() error { - dipt.Base = &commonpb.MsgBase{} - return nil +func (gist *GetIndexStateTask) Ctx() context.Context { + return gist.ctx } -func (dipt *GetIndexStateTask) ID() UniqueID { - return dipt.Base.MsgID +func (gist *GetIndexStateTask) ID() UniqueID { + return gist.Base.MsgID } -func (dipt *GetIndexStateTask) SetID(uid UniqueID) { - dipt.Base.MsgID = uid +func (gist *GetIndexStateTask) SetID(uid UniqueID) { + gist.Base.MsgID = uid } -func (dipt *GetIndexStateTask) Type() commonpb.MsgType { - return dipt.Base.MsgType +func (gist *GetIndexStateTask) Name() string { + return GetIndexStateTaskName } -func (dipt *GetIndexStateTask) BeginTs() Timestamp { - return dipt.Base.Timestamp +func (gist *GetIndexStateTask) Type() commonpb.MsgType { + return gist.Base.MsgType } -func (dipt *GetIndexStateTask) EndTs() Timestamp { - return dipt.Base.Timestamp +func (gist *GetIndexStateTask) BeginTs() Timestamp { + return gist.Base.Timestamp } -func (dipt *GetIndexStateTask) SetTs(ts Timestamp) { - dipt.Base.Timestamp = ts +func (gist *GetIndexStateTask) EndTs() Timestamp { + return gist.Base.Timestamp } -func (dipt *GetIndexStateTask) PreExecute() error { - dipt.Base.MsgType = commonpb.MsgType_kGetIndexState - dipt.Base.SourceID = Params.ProxyID +func (gist *GetIndexStateTask) SetTs(ts Timestamp) { + gist.Base.Timestamp = ts +} + +func (gist *GetIndexStateTask) OnEnqueue() error { + gist.Base = &commonpb.MsgBase{} + return nil +} - collName, fieldName := dipt.CollectionName, dipt.FieldName +func (gist *GetIndexStateTask) PreExecute(ctx context.Context) error { + gist.Base.MsgType = commonpb.MsgType_kGetIndexState + gist.Base.SourceID = Params.ProxyID + + collName, fieldName := gist.CollectionName, gist.FieldName if err := ValidateCollectionName(collName); err != nil { return err @@ -1473,8 +1642,8 @@ func (dipt *GetIndexStateTask) PreExecute() error { return nil } -func (dipt *GetIndexStateTask) Execute() error { - collectionName := dipt.CollectionName +func (gist *GetIndexStateTask) Execute(ctx context.Context) error { + collectionName := gist.CollectionName collectionID, err := globalMetaCache.GetCollectionID(collectionName) if err != nil { // err is not nil if collection not exists return err @@ -1483,37 +1652,37 @@ func (dipt *GetIndexStateTask) Execute() error { showPartitionRequest := &milvuspb.ShowPartitionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowPartitions, - MsgID: dipt.Base.MsgID, - Timestamp: dipt.Base.Timestamp, + MsgID: gist.Base.MsgID, + Timestamp: gist.Base.Timestamp, SourceID: Params.ProxyID, }, - DbName: dipt.DbName, + DbName: gist.DbName, CollectionName: collectionName, CollectionID: collectionID, } - partitions, err := dipt.masterClient.ShowPartitions(showPartitionRequest) + partitions, err := gist.masterClient.ShowPartitions(showPartitionRequest) if err != nil { return err } - if dipt.IndexName == "" { - dipt.IndexName = Params.DefaultIndexName + if gist.IndexName == "" { + gist.IndexName = Params.DefaultIndexName } describeIndexReq := milvuspb.DescribeIndexRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDescribeIndex, - MsgID: dipt.Base.MsgID, - Timestamp: dipt.Base.Timestamp, + MsgID: gist.Base.MsgID, + Timestamp: gist.Base.Timestamp, SourceID: Params.ProxyID, }, - DbName: dipt.DbName, - CollectionName: dipt.CollectionName, - FieldName: dipt.FieldName, - IndexName: dipt.IndexName, + DbName: gist.DbName, + CollectionName: gist.CollectionName, + FieldName: gist.FieldName, + IndexName: gist.IndexName, } - indexDescriptionResp, err2 := dipt.masterClient.DescribeIndex(&describeIndexReq) + indexDescriptionResp, err2 := gist.masterClient.DescribeIndex(&describeIndexReq) if err2 != nil { return err2 } @@ -1521,14 +1690,14 @@ func (dipt *GetIndexStateTask) Execute() error { matchIndexID := int64(-1) foundIndexID := false for _, desc := range indexDescriptionResp.IndexDescriptions { - if desc.IndexName == dipt.IndexName { + if desc.IndexName == gist.IndexName { matchIndexID = desc.IndexID foundIndexID = true break } } if !foundIndexID { - return errors.New(fmt.Sprint("Can't found IndexID for indexName", dipt.IndexName)) + return errors.New(fmt.Sprint("Can't found IndexID for indexName", gist.IndexName)) } var allSegmentIDs []UniqueID @@ -1536,14 +1705,14 @@ func (dipt *GetIndexStateTask) Execute() error { showSegmentsRequest := &milvuspb.ShowSegmentRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowSegment, - MsgID: dipt.Base.MsgID, - Timestamp: dipt.Base.Timestamp, + MsgID: gist.Base.MsgID, + Timestamp: gist.Base.Timestamp, SourceID: Params.ProxyID, }, CollectionID: collectionID, PartitionID: partitionID, } - segments, err := dipt.masterClient.ShowSegments(showSegmentsRequest) + segments, err := gist.masterClient.ShowSegments(showSegmentsRequest) if err != nil { return err } @@ -1561,14 +1730,14 @@ func (dipt *GetIndexStateTask) Execute() error { describeSegmentRequest := &milvuspb.DescribeSegmentRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDescribeSegment, - MsgID: dipt.Base.MsgID, - Timestamp: dipt.Base.Timestamp, + MsgID: gist.Base.MsgID, + Timestamp: gist.Base.Timestamp, SourceID: Params.ProxyID, }, CollectionID: collectionID, SegmentID: segmentID, } - segmentDesc, err := dipt.masterClient.DescribeSegment(describeSegmentRequest) + segmentDesc, err := gist.masterClient.DescribeSegment(describeSegmentRequest) if err != nil { return err } @@ -1579,7 +1748,7 @@ func (dipt *GetIndexStateTask) Execute() error { log.Println("GetIndexState:: len of allSegmentIDs:", len(allSegmentIDs), " len of IndexBuildIDs", len(getIndexStatesRequest.IndexBuildIDs)) if len(allSegmentIDs) != len(getIndexStatesRequest.IndexBuildIDs) { - dipt.result = &milvuspb.IndexStateResponse{ + gist.result = &milvuspb.IndexStateResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1589,13 +1758,13 @@ func (dipt *GetIndexStateTask) Execute() error { return err } - states, err := dipt.indexServiceClient.GetIndexStates(getIndexStatesRequest) + states, err := gist.indexServiceClient.GetIndexStates(getIndexStatesRequest) if err != nil { return err } if states.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - dipt.result = &milvuspb.IndexStateResponse{ + gist.result = &milvuspb.IndexStateResponse{ Status: states.Status, State: commonpb.IndexState_FAILED, } @@ -1605,7 +1774,7 @@ func (dipt *GetIndexStateTask) Execute() error { for _, state := range states.States { if state.State != commonpb.IndexState_FINISHED { - dipt.result = &milvuspb.IndexStateResponse{ + gist.result = &milvuspb.IndexStateResponse{ Status: states.Status, State: state.State, } @@ -1614,7 +1783,7 @@ func (dipt *GetIndexStateTask) Execute() error { } } - dipt.result = &milvuspb.IndexStateResponse{ + gist.result = &milvuspb.IndexStateResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1625,20 +1794,20 @@ func (dipt *GetIndexStateTask) Execute() error { return nil } -func (dipt *GetIndexStateTask) PostExecute() error { +func (gist *GetIndexStateTask) PostExecute(ctx context.Context) error { return nil } type FlushTask struct { Condition *milvuspb.FlushRequest + ctx context.Context dataServiceClient DataServiceClient result *commonpb.Status } -func (ft *FlushTask) OnEnqueue() error { - ft.Base = &commonpb.MsgBase{} - return nil +func (ft *FlushTask) Ctx() context.Context { + return ft.ctx } func (ft *FlushTask) ID() UniqueID { @@ -1649,6 +1818,10 @@ func (ft *FlushTask) SetID(uid UniqueID) { ft.Base.MsgID = uid } +func (ft *FlushTask) Name() string { + return FlushTaskName +} + func (ft *FlushTask) Type() commonpb.MsgType { return ft.Base.MsgType } @@ -1665,13 +1838,18 @@ func (ft *FlushTask) SetTs(ts Timestamp) { ft.Base.Timestamp = ts } -func (ft *FlushTask) PreExecute() error { +func (ft *FlushTask) OnEnqueue() error { + ft.Base = &commonpb.MsgBase{} + return nil +} + +func (ft *FlushTask) PreExecute(ctx context.Context) error { ft.Base.MsgType = commonpb.MsgType_kFlush ft.Base.SourceID = Params.ProxyID return nil } -func (ft *FlushTask) Execute() error { +func (ft *FlushTask) Execute(ctx context.Context) error { for _, collName := range ft.CollectionNames { collID, err := globalMetaCache.GetCollectionID(collName) if err != nil { @@ -1702,20 +1880,20 @@ func (ft *FlushTask) Execute() error { return nil } -func (ft *FlushTask) PostExecute() error { +func (ft *FlushTask) PostExecute(ctx context.Context) error { return nil } type LoadCollectionTask struct { Condition *milvuspb.LoadCollectionRequest + ctx context.Context queryserviceClient QueryServiceClient result *commonpb.Status } -func (lct *LoadCollectionTask) OnEnqueue() error { - lct.Base = &commonpb.MsgBase{} - return nil +func (lct *LoadCollectionTask) Ctx() context.Context { + return lct.ctx } func (lct *LoadCollectionTask) ID() UniqueID { @@ -1726,6 +1904,10 @@ func (lct *LoadCollectionTask) SetID(uid UniqueID) { lct.Base.MsgID = uid } +func (lct *LoadCollectionTask) Name() string { + return LoadCollectionTaskName +} + func (lct *LoadCollectionTask) Type() commonpb.MsgType { return lct.Base.MsgType } @@ -1742,7 +1924,12 @@ func (lct *LoadCollectionTask) SetTs(ts Timestamp) { lct.Base.Timestamp = ts } -func (lct *LoadCollectionTask) PreExecute() error { +func (lct *LoadCollectionTask) OnEnqueue() error { + lct.Base = &commonpb.MsgBase{} + return nil +} + +func (lct *LoadCollectionTask) PreExecute(ctx context.Context) error { lct.Base.MsgType = commonpb.MsgType_kLoadCollection lct.Base.SourceID = Params.ProxyID @@ -1755,7 +1942,7 @@ func (lct *LoadCollectionTask) PreExecute() error { return nil } -func (lct *LoadCollectionTask) Execute() (err error) { +func (lct *LoadCollectionTask) Execute(ctx context.Context) (err error) { collID, err := globalMetaCache.GetCollectionID(lct.CollectionName) if err != nil { return err @@ -1780,20 +1967,20 @@ func (lct *LoadCollectionTask) Execute() (err error) { return err } -func (lct *LoadCollectionTask) PostExecute() error { +func (lct *LoadCollectionTask) PostExecute(ctx context.Context) error { return nil } type ReleaseCollectionTask struct { Condition *milvuspb.ReleaseCollectionRequest + ctx context.Context queryserviceClient QueryServiceClient result *commonpb.Status } -func (rct *ReleaseCollectionTask) OnEnqueue() error { - rct.Base = &commonpb.MsgBase{} - return nil +func (rct *ReleaseCollectionTask) Ctx() context.Context { + return rct.ctx } func (rct *ReleaseCollectionTask) ID() UniqueID { @@ -1804,6 +1991,10 @@ func (rct *ReleaseCollectionTask) SetID(uid UniqueID) { rct.Base.MsgID = uid } +func (rct *ReleaseCollectionTask) Name() string { + return ReleaseCollectionTaskName +} + func (rct *ReleaseCollectionTask) Type() commonpb.MsgType { return rct.Base.MsgType } @@ -1820,7 +2011,12 @@ func (rct *ReleaseCollectionTask) SetTs(ts Timestamp) { rct.Base.Timestamp = ts } -func (rct *ReleaseCollectionTask) PreExecute() error { +func (rct *ReleaseCollectionTask) OnEnqueue() error { + rct.Base = &commonpb.MsgBase{} + return nil +} + +func (rct *ReleaseCollectionTask) PreExecute(ctx context.Context) error { rct.Base.MsgType = commonpb.MsgType_kReleaseCollection rct.Base.SourceID = Params.ProxyID @@ -1833,7 +2029,7 @@ func (rct *ReleaseCollectionTask) PreExecute() error { return nil } -func (rct *ReleaseCollectionTask) Execute() (err error) { +func (rct *ReleaseCollectionTask) Execute(ctx context.Context) (err error) { collID, err := globalMetaCache.GetCollectionID(rct.CollectionName) if err != nil { return err @@ -1852,22 +2048,18 @@ func (rct *ReleaseCollectionTask) Execute() (err error) { return err } -func (rct *ReleaseCollectionTask) PostExecute() error { +func (rct *ReleaseCollectionTask) PostExecute(ctx context.Context) error { return nil } type LoadPartitionTask struct { Condition *milvuspb.LoadPartitonRequest + ctx context.Context queryserviceClient QueryServiceClient result *commonpb.Status } -func (lpt *LoadPartitionTask) OnEnqueue() error { - lpt.Base = &commonpb.MsgBase{} - return nil -} - func (lpt *LoadPartitionTask) ID() UniqueID { return lpt.Base.MsgID } @@ -1876,6 +2068,10 @@ func (lpt *LoadPartitionTask) SetID(uid UniqueID) { lpt.Base.MsgID = uid } +func (lpt *LoadPartitionTask) Name() string { + return LoadPartitionTaskName +} + func (lpt *LoadPartitionTask) Type() commonpb.MsgType { return lpt.Base.MsgType } @@ -1892,7 +2088,12 @@ func (lpt *LoadPartitionTask) SetTs(ts Timestamp) { lpt.Base.Timestamp = ts } -func (lpt *LoadPartitionTask) PreExecute() error { +func (lpt *LoadPartitionTask) OnEnqueue() error { + lpt.Base = &commonpb.MsgBase{} + return nil +} + +func (lpt *LoadPartitionTask) PreExecute(ctx context.Context) error { lpt.Base.MsgType = commonpb.MsgType_kLoadPartition lpt.Base.SourceID = Params.ProxyID @@ -1905,7 +2106,7 @@ func (lpt *LoadPartitionTask) PreExecute() error { return nil } -func (lpt *LoadPartitionTask) Execute() (err error) { +func (lpt *LoadPartitionTask) Execute(ctx context.Context) error { var partitionIDs []int64 collID, err := globalMetaCache.GetCollectionID(lpt.CollectionName) if err != nil { @@ -1938,20 +2139,20 @@ func (lpt *LoadPartitionTask) Execute() (err error) { return err } -func (lpt *LoadPartitionTask) PostExecute() error { +func (lpt *LoadPartitionTask) PostExecute(ctx context.Context) error { return nil } type ReleasePartitionTask struct { Condition *milvuspb.ReleasePartitionRequest + ctx context.Context queryserviceClient QueryServiceClient result *commonpb.Status } -func (rpt *ReleasePartitionTask) OnEnqueue() error { - rpt.Base = &commonpb.MsgBase{} - return nil +func (rpt *ReleasePartitionTask) Ctx() context.Context { + return rpt.ctx } func (rpt *ReleasePartitionTask) ID() UniqueID { @@ -1966,6 +2167,10 @@ func (rpt *ReleasePartitionTask) Type() commonpb.MsgType { return rpt.Base.MsgType } +func (rpt *ReleasePartitionTask) Name() string { + return ReleasePartitionTaskName +} + func (rpt *ReleasePartitionTask) BeginTs() Timestamp { return rpt.Base.Timestamp } @@ -1978,7 +2183,12 @@ func (rpt *ReleasePartitionTask) SetTs(ts Timestamp) { rpt.Base.Timestamp = ts } -func (rpt *ReleasePartitionTask) PreExecute() error { +func (rpt *ReleasePartitionTask) OnEnqueue() error { + rpt.Base = &commonpb.MsgBase{} + return nil +} + +func (rpt *ReleasePartitionTask) PreExecute(ctx context.Context) error { rpt.Base.MsgType = commonpb.MsgType_kReleasePartition rpt.Base.SourceID = Params.ProxyID @@ -1991,7 +2201,7 @@ func (rpt *ReleasePartitionTask) PreExecute() error { return nil } -func (rpt *ReleasePartitionTask) Execute() (err error) { +func (rpt *ReleasePartitionTask) Execute(ctx context.Context) (err error) { var partitionIDs []int64 collID, err := globalMetaCache.GetCollectionID(rpt.CollectionName) if err != nil { @@ -2019,6 +2229,6 @@ func (rpt *ReleasePartitionTask) Execute() (err error) { return err } -func (rpt *ReleasePartitionTask) PostExecute() error { +func (rpt *ReleasePartitionTask) PostExecute(ctx context.Context) error { return nil } diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go index fdfab974bd34efafcff1da5f8f107253d8f8c2d9..6347bdc95c2bce9365e9cce4974a589dccf468b5 100644 --- a/internal/proxynode/task_scheduler.go +++ b/internal/proxynode/task_scheduler.go @@ -9,10 +9,12 @@ import ( "strconv" "sync" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - + "github.com/opentracing/opentracing-go" + oplog "github.com/opentracing/opentracing-go/log" "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/util/trace" ) type TaskQueue interface { @@ -295,31 +297,42 @@ func (sched *TaskScheduler) getTaskByReqID(collMeta UniqueID) task { } func (sched *TaskScheduler) processTask(t task, q TaskQueue) { - - err := t.PreExecute() + span, ctx := trace.StartSpanFromContext(t.Ctx(), + opentracing.Tags{ + "Type": t.Name(), + "ID": t.ID(), + }) + defer span.Finish() + span.LogFields(oplog.Int64("scheduler process PreExecute", t.ID())) + err := t.PreExecute(ctx) defer func() { t.Notify(err) // log.Printf("notify with error: %v", err) }() if err != nil { + trace.LogError(span, err) return } + span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID())) q.AddActiveTask(t) + // log.Printf("task add to active list ...") defer func() { + span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID())) q.PopActiveTask(t.EndTs()) // log.Printf("pop from active list ...") }() - - err = t.Execute() + span.LogFields(oplog.Int64("scheduler process Execute", t.ID())) + err = t.Execute(ctx) if err != nil { - log.Printf("execute definition task failed, error = %v", err) + trace.LogError(span, err) return } // log.Printf("task execution done ...") - err = t.PostExecute() + span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID())) + err = t.PostExecute(ctx) // log.Printf("post execute task done ...") } diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index 6a70511d8b00f44394e1e2bc93a300963d111b69..21a5996ceef9616e6ed5dbfa883d637f6914f7ef 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -199,6 +199,7 @@ func (s *ServiceImpl) RegisterLink() (*milvuspb.RegisterLinkResponse, error) { defer cancel() t := &RegisterLinkTask{ + ctx: ctx, Condition: NewTaskCondition(ctx), nodeInfos: s.nodeInfos, } @@ -236,6 +237,7 @@ func (s *ServiceImpl) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxy defer cancel() t := &RegisterNodeTask{ + ctx: ctx, request: request, startParams: s.nodeStartParams, Condition: NewTaskCondition(ctx), @@ -276,6 +278,7 @@ func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateC defer cancel() t := &InvalidateCollectionMetaCacheTask{ + ctx: ctx, request: request, Condition: NewTaskCondition(ctx), nodeInfos: s.nodeInfos, diff --git a/internal/proxyservice/task.go b/internal/proxyservice/task.go index 68e977234ec4d980ac4f92b59b0a687b30ff24ac..b6870e0057ba1802f92345cdb42fac7c9fcfdbbd 100644 --- a/internal/proxyservice/task.go +++ b/internal/proxyservice/task.go @@ -18,10 +18,19 @@ const ( FromNode TaskEnum = 2 ) +const ( + RegisterLinkTaskName = "RegisLinkTask" + RegisterNodeTaskName = "RegisNodeTask" + InvalidateCollectionMetaCacheTaskName = "InvalidateCollectionMetaCacheTask" +) + type task interface { - PreExecute() error - Execute() error - PostExecute() error + Ctx() context.Context + ID() UniqueID // return ReqID + Name() string + PreExecute(ctx context.Context) error + Execute(ctx context.Context) error + PostExecute(ctx context.Context) error WaitToFinish() error Notify(err error) } @@ -58,15 +67,28 @@ func NewTaskCondition(ctx context.Context) Condition { type RegisterLinkTask struct { Condition + ctx context.Context response *milvuspb.RegisterLinkResponse nodeInfos *GlobalNodeInfoTable } -func (t *RegisterLinkTask) PreExecute() error { +func (t *RegisterLinkTask) Ctx() context.Context { + return t.ctx +} + +func (t *RegisterLinkTask) ID() UniqueID { + return 0 +} + +func (t *RegisterLinkTask) Name() string { + return RegisterLinkTaskName +} + +func (t *RegisterLinkTask) PreExecute(ctx context.Context) error { return nil } -func (t *RegisterLinkTask) Execute() error { +func (t *RegisterLinkTask) Execute(ctx context.Context) error { info, err := t.nodeInfos.Pick() if err != nil { return err @@ -84,12 +106,13 @@ func (t *RegisterLinkTask) Execute() error { return nil } -func (t *RegisterLinkTask) PostExecute() error { +func (t *RegisterLinkTask) PostExecute(ctx context.Context) error { return nil } type RegisterNodeTask struct { Condition + ctx context.Context request *proxypb.RegisterNodeRequest response *proxypb.RegisterNodeResponse startParams []*commonpb.KeyValuePair @@ -97,11 +120,23 @@ type RegisterNodeTask struct { nodeInfos *GlobalNodeInfoTable } -func (t *RegisterNodeTask) PreExecute() error { +func (t *RegisterNodeTask) Ctx() context.Context { + return t.ctx +} + +func (t *RegisterNodeTask) ID() UniqueID { + return t.request.Base.MsgID +} + +func (t *RegisterNodeTask) Name() string { + return RegisterNodeTaskName +} + +func (t *RegisterNodeTask) PreExecute(ctx context.Context) error { return nil } -func (t *RegisterNodeTask) Execute() error { +func (t *RegisterNodeTask) Execute(ctx context.Context) error { nodeID := t.allocator.AllocOne() info := NodeInfo{ ip: t.request.Address.Ip, @@ -122,22 +157,35 @@ func (t *RegisterNodeTask) Execute() error { return err } -func (t *RegisterNodeTask) PostExecute() error { +func (t *RegisterNodeTask) PostExecute(ctx context.Context) error { return nil } type InvalidateCollectionMetaCacheTask struct { Condition + ctx context.Context request *proxypb.InvalidateCollMetaCacheRequest response *commonpb.Status nodeInfos *GlobalNodeInfoTable } -func (t *InvalidateCollectionMetaCacheTask) PreExecute() error { +func (t *InvalidateCollectionMetaCacheTask) Ctx() context.Context { + return t.ctx +} + +func (t *InvalidateCollectionMetaCacheTask) ID() UniqueID { + return t.request.Base.MsgID +} + +func (t *InvalidateCollectionMetaCacheTask) Name() string { + return InvalidateCollectionMetaCacheTaskName +} + +func (t *InvalidateCollectionMetaCacheTask) PreExecute(ctx context.Context) error { return nil } -func (t *InvalidateCollectionMetaCacheTask) Execute() error { +func (t *InvalidateCollectionMetaCacheTask) Execute(ctx context.Context) error { var err error clients, err := t.nodeInfos.ObtainAllClients() if err != nil { @@ -158,6 +206,6 @@ func (t *InvalidateCollectionMetaCacheTask) Execute() error { return nil } -func (t *InvalidateCollectionMetaCacheTask) PostExecute() error { +func (t *InvalidateCollectionMetaCacheTask) PostExecute(ctx context.Context) error { return nil } diff --git a/internal/proxyservice/task_scheduler.go b/internal/proxyservice/task_scheduler.go index 1194c918c1c2c5ed585d8eaf2adc6d0bb3986ee1..1ebaf57b287c6afad1f17a82a48fb441c9aaa593 100644 --- a/internal/proxyservice/task_scheduler.go +++ b/internal/proxyservice/task_scheduler.go @@ -3,6 +3,10 @@ package proxyservice import ( "context" "sync" + + "github.com/opentracing/opentracing-go" + oplog "github.com/opentracing/opentracing-go/log" + "github.com/zilliztech/milvus-distributed/internal/util/trace" ) type TaskScheduler struct { @@ -40,21 +44,30 @@ func (sched *TaskScheduler) scheduleInvalidateCollectionMetaCacheTask() task { } func (sched *TaskScheduler) processTask(t task, q TaskQueue) { - var err error - err = t.PreExecute() + span, ctx := trace.StartSpanFromContext(t.Ctx(), + opentracing.Tags{ + "Type": t.Name(), + }) + defer span.Finish() + span.LogFields(oplog.String("scheduler process PreExecute", t.Name())) + err := t.PreExecute(ctx) defer func() { + trace.LogError(span, err) t.Notify(err) }() if err != nil { return } - err = t.Execute() + span.LogFields(oplog.String("scheduler process Execute", t.Name())) + err = t.Execute(ctx) if err != nil { + trace.LogError(span, err) return } - err = t.PostExecute() + span.LogFields(oplog.String("scheduler process PostExecute", t.Name())) + err = t.PostExecute(ctx) } func (sched *TaskScheduler) registerLinkLoop() { diff --git a/internal/util/trace/util.go b/internal/util/trace/util.go new file mode 100644 index 0000000000000000000000000000000000000000..2234f5c04a890b5dd57e4543d2d77f0b9b051951 --- /dev/null +++ b/internal/util/trace/util.go @@ -0,0 +1,178 @@ +package trace + +import ( + "context" + "errors" + "runtime" + "strings" + + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/opentracing/opentracing-go/log" + "github.com/uber/jaeger-client-go" + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" +) + +func StartSpanFromContext(ctx context.Context, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) { + if ctx == nil { + panic("StartSpanFromContext called with nil context") + } + + var pcs [1]uintptr + n := runtime.Callers(2, pcs[:]) + if n < 1 { + span, ctx := opentracing.StartSpanFromContext(ctx, "unknown", opts...) + span.LogFields(log.Error(errors.New("runtime.Callers failed"))) + return span, ctx + } + fn := runtime.FuncForPC(pcs[0]) + name := fn.Name() + if lastSlash := strings.LastIndexByte(name, '/'); lastSlash > 0 { + name = name[lastSlash+1:] + } + + if parent := opentracing.SpanFromContext(ctx); parent != nil { + opts = append(opts, opentracing.ChildOf(parent.Context())) + } + span := opentracing.StartSpan(name, opts...) + + file, line := fn.FileLine(pcs[0]) + span.LogFields(log.String("filename", file), log.Int("line", line)) + + return span, opentracing.ContextWithSpan(ctx, span) +} + +func StartSpanFromContextWithOperationName(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) { + if ctx == nil { + panic("StartSpanFromContextWithOperationName called with nil context") + } + + var pcs [1]uintptr + n := runtime.Callers(2, pcs[:]) + if n < 1 { + span, ctx := opentracing.StartSpanFromContext(ctx, operationName, opts...) + span.LogFields(log.Error(errors.New("runtime.Callers failed"))) + return span, ctx + } + file, line := runtime.FuncForPC(pcs[0]).FileLine(pcs[0]) + + if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { + opts = append(opts, opentracing.ChildOf(parentSpan.Context())) + } + span := opentracing.StartSpan(operationName, opts...) + ctx = opentracing.ContextWithSpan(ctx, span) + + span.LogFields(log.String("filename", file), log.Int("line", line)) + + return span, ctx +} + +func LogError(span opentracing.Span, err error) error { + if err == nil { + return nil + } + + // Get caller frame. + var pcs [1]uintptr + n := runtime.Callers(2, pcs[:]) + if n < 1 { + span.LogFields(log.Error(err)) + span.LogFields(log.Error(errors.New("runtime.Callers failed"))) + return err + } + + file, line := runtime.FuncForPC(pcs[0]).FileLine(pcs[0]) + span.LogFields(log.String("filename", file), log.Int("line", line), log.Error(err)) + + return err +} + +func InfoFromSpan(span opentracing.Span) (traceID string, sampled bool, found bool) { + if spanContext, ok := span.Context().(jaeger.SpanContext); ok { + traceID = spanContext.TraceID().String() + sampled = spanContext.IsSampled() + return traceID, sampled, true + } + return "", false, false +} + +func InfoFromContext(ctx context.Context) (traceID string, sampled bool, found bool) { + if span := opentracing.SpanFromContext(ctx); span != nil { + return InfoFromSpan(span) + } + return "", false, false +} + +func InjectContextToPulsarMsgProperties(sc opentracing.SpanContext, properties map[string]string) { + tracer := opentracing.GlobalTracer() + tracer.Inject(sc, opentracing.TextMap, propertiesReaderWriter{properties}) +} + +func ExtractFromPulsarMsgProperties(msg msgstream.TsMsg, properties map[string]string) opentracing.Span { + if !allowTrace(msg) { + return noopSpan() + } + tracer := opentracing.GlobalTracer() + sc, _ := tracer.Extract(opentracing.TextMap, propertiesReaderWriter{properties}) + name := "receive pulsar msg" + opts := []opentracing.StartSpanOption{ + ext.RPCServerOption(sc), + opentracing.Tags{ + "ID": msg.ID(), + "Type": msg.Type(), + "HashKeys": msg.HashKeys(), + "Position": msg.Position(), + }} + return opentracing.StartSpan(name, opts...) +} + +func MsgSpanFromCtx(ctx context.Context, msg msgstream.TsMsg, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) { + if !allowTrace(msg) { + return noopSpan(), ctx + } + name := "send pulsar msg" + opts = append(opts, opentracing.Tags{ + "ID": msg.ID(), + "Type": msg.Type(), + "HashKeys": msg.HashKeys(), + "Position": msg.Position(), + }) + return StartSpanFromContextWithOperationName(ctx, name, opts...) +} + +type propertiesReaderWriter struct { + ppMap map[string]string +} + +func (ppRW propertiesReaderWriter) Set(key, val string) { + key = strings.ToLower(key) + ppRW.ppMap[key] = val +} + +func (ppRW propertiesReaderWriter) ForeachKey(handler func(key, val string) error) error { + for k, val := range ppRW.ppMap { + if err := handler(k, val); err != nil { + return err + } + } + return nil +} + +func allowTrace(in interface{}) bool { + if in == nil { + return false + } + switch res := in.(type) { + case msgstream.TsMsg: + return !(res.Type() == commonpb.MsgType_kTimeTick || + res.Type() == commonpb.MsgType_kQueryNodeStats || + res.Type() == commonpb.MsgType_kLoadIndex) + default: + return false + } +} + +func noopSpan() opentracing.Span { + return opentracing.NoopTracer{}.StartSpan("Default-span") +} diff --git a/internal/util/trace/util_test.go b/internal/util/trace/util_test.go new file mode 100644 index 0000000000000000000000000000000000000000..cd5f343d0006253ce22a5bb99342e787627eeace --- /dev/null +++ b/internal/util/trace/util_test.go @@ -0,0 +1,86 @@ +package trace + +import ( + "context" + "errors" + "fmt" + "io" + "testing" + + "github.com/opentracing/opentracing-go" + oplog "github.com/opentracing/opentracing-go/log" + "github.com/uber/jaeger-client-go/config" +) + +func InitTracing() io.Closer { + cfg := &config.Configuration{ + ServiceName: "test", + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + tracer, closer, err := cfg.NewTracer() + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(tracer) + return closer +} + +type simpleStruct struct { + name string + value string +} + +func TestTracing(t *testing.T) { + //Already Init in each framework, this can be ignored in debug + closer := InitTracing() + defer closer.Close() + + // context normally can be propagated through func params + ctx := context.Background() + + //start span + //default use function name for operation name + sp, ctx := StartSpanFromContext(ctx) + sp.SetTag("tag1", "tag1") + // use self-defined operation name for span + // sp, ctx := StartSpanFromContextWithOperationName(ctx, "self-defined name") + defer sp.Finish() + + ss := &simpleStruct{ + name: "name", + value: "value", + } + sp.LogFields(oplog.String("key", "value"), oplog.Object("key", ss)) + + err := caller(ctx) + + if err != nil { + LogError(sp, err) //LogError do something error log in trace and returns origin error. + } + +} + +func caller(ctx context.Context) error { + for i := 0; i < 2; i++ { + // if span starts in a loop, defer is not allowed. + // manually call span.Finish() if error occurs or one loop ends + sp, _ := StartSpanFromContextWithOperationName(ctx, fmt.Sprintf("test:%d", i)) + sp.SetTag(fmt.Sprintf("tags:%d", i), fmt.Sprintf("tags:%d", i)) + + var err error + if i == 1 { + err = errors.New("test") + } + + if err != nil { + sp.Finish() + return LogError(sp, err) + } + + sp.Finish() + } + return nil +}