diff --git a/.jenkins/modules/Publish/Publish.groovy b/.jenkins/modules/Publish/Publish.groovy
index 608de67f81797d90c9623d01b1febd51e5aeddc7..e60bf34d2831dbf5bfa45b0c4af3b2fd8340709c 100644
--- a/.jenkins/modules/Publish/Publish.groovy
+++ b/.jenkins/modules/Publish/Publish.groovy
@@ -25,6 +25,11 @@ dir ('build/docker/deploy') {
sh 'docker pull ${SOURCE_REPO}/querynode:${SOURCE_TAG} || true'
sh 'docker-compose build --force-rm querynode'
sh 'docker-compose push querynode'
+
+ sh 'docker pull registry.zilliz.com/milvus-distributed/milvus-distributed-dev:latest || true'
+ sh 'docker pull ${SOURCE_REPO}/writenode:${SOURCE_TAG} || true'
+ sh 'docker-compose build --force-rm writenode'
+ sh 'docker-compose push writenode'
}
} catch (exc) {
throw exc
diff --git a/.jenkins/modules/Regression/PythonRegression.groovy b/.jenkins/modules/Regression/PythonRegression.groovy
index 3c702a3bf2caa9de2597e35574e3d0c5ee59c5b6..cb7d5a06fe9f7659dcced73c1ed525d3455aba55 100644
--- a/.jenkins/modules/Regression/PythonRegression.groovy
+++ b/.jenkins/modules/Regression/PythonRegression.groovy
@@ -9,6 +9,7 @@ try {
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d proxy'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=1 -d querynode'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=2 -d querynode'
+ sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e WRITE_NODE_ID=3 -d writenode'
}

dir ('build/docker/test') {
diff --git a/build/docker/deploy/docker-compose.yml b/build/docker/deploy/docker-compose.yml
index ee564c66b102ac2e349e9addf86d935550c8f2d0..045924ad24bd616066c545d67ab382442add8855 100644
--- a/build/docker/deploy/docker-compose.yml
+++ b/build/docker/deploy/docker-compose.yml
@@ -42,5 +42,20 @@ services:
networks:
- milvus

+ writenode:
+ image: ${TARGET_REPO}/writenode:${TARGET_TAG}
+ build:
+ context: ../../../
+ dockerfile: build/docker/deploy/writenode/DockerFile
+ cache_from:
+ - ${SOURCE_REPO}/writenode:${SOURCE_TAG}
+ environment:
+ PULSAR_ADDRESS: ${PULSAR_ADDRESS}
+ ETCD_ADDRESS: ${ETCD_ADDRESS}
+ MASTER_ADDRESS: ${MASTER_ADDRESS}
+ MINIO_ADDRESS: ${MINIO_ADDRESS}
+ networks:
+ - milvus
+
networks:
milvus:
diff --git a/build/docker/deploy/writenode/DockerFile b/build/docker/deploy/writenode/DockerFile
new file mode 100644
index 0000000000000000000000000000000000000000..6b16d984185a31ff419750fb49a6ef736cb4ef71
--- /dev/null
+++ b/build/docker/deploy/writenode/DockerFile
@@ -0,0 +1,39 @@
+# Copyright (C) 2019-2020 Zilliz. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under the License.
+
+FROM milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-latest AS openblas
+
+#FROM alpine
+FROM ubuntu:bionic-20200921
+
+RUN apt-get update && apt-get install -y --no-install-recommends libtbb-dev gfortran
+
+#RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/testing" >> /etc/apk/repositories
+
+#RUN sed -i "s/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g" /etc/apk/repositories \
+# && apk add --no-cache libtbb gfortran
+
+COPY --from=openblas /usr/lib/libopenblas-r0.3.9.so /usr/lib/
+
+RUN ln -s /usr/lib/libopenblas-r0.3.9.so /usr/lib/libopenblas.so.0 && \
+ ln -s /usr/lib/libopenblas.so.0 /usr/lib/libopenblas.so
+
+COPY ./bin/writenode /milvus-distributed/bin/writenode
+
+COPY ./configs/ /milvus-distributed/configs/
+
+COPY ./lib/ /milvus-distributed/lib/
+
+ENV LD_LIBRARY_PATH=/milvus-distributed/lib:$LD_LIBRARY_PATH:/usr/lib
+
+WORKDIR /milvus-distributed/
+
+CMD ["./bin/writenode"]
diff --git a/internal/indexbuilder/task.go b/internal/indexbuilder/task.go
index 17037ca741bab0cd55579bb9e00ab791017200cb..0c92b153e3cb2956fdda7000cc2e6fa7e2d3e66b 100644
--- a/internal/indexbuilder/task.go
+++ b/internal/indexbuilder/task.go
@@ -88,6 +88,7 @@ func (it *IndexAddTask) Execute() error {
t.table = it.table
t.indexID = it.indexID
t.kv = it.kv
+ t.req = it.req
var cancel func()
t.ctx, cancel = context.WithTimeout(it.ctx, reqTimeoutInterval)
defer cancel()
@@ -121,7 +122,7 @@ type IndexBuildTask struct {
indexID UniqueID
kv kv.Base
savePaths []string
- indexMeta *indexbuilderpb.IndexMeta
+ req *indexbuilderpb.BuildIndexRequest
}

func newIndexBuildTask() *IndexBuildTask {
@@ -151,7 +152,7 @@ func (it *IndexBuildTask) Execute() error {
}

typeParams := make(map[string]string)
- for _, kvPair := range it.indexMeta.Req.GetTypeParams() {
+ for _, kvPair := range it.req.GetTypeParams() {
key, value := kvPair.GetKey(), kvPair.GetValue()
_, ok := typeParams[key]
if ok {
@@ -161,7 +162,7 @@ func (it *IndexBuildTask) Execute() error {
}

indexParams := make(map[string]string)
- for _, kvPair := range it.indexMeta.Req.GetIndexParams() {
+ for _, kvPair := range it.req.GetIndexParams() {
key, value := kvPair.GetKey(), kvPair.GetValue()
_, ok := indexParams[key]
if ok {
@@ -201,7 +202,7 @@ func (it *IndexBuildTask) Execute() error {
return blobs
}

- toLoadDataPaths := it.indexMeta.Req.GetDataPaths()
+ toLoadDataPaths := it.req.GetDataPaths()
keys := make([]string, 0)
blobs := make([]*Blob, 0)
for _, path := range toLoadDataPaths {
diff --git a/internal/master/master.go b/internal/master/master.go
index 90093cc22ecfbacc36dd89baf82aaa3dab55b285..5476a7c8dc177e249c6cca4103a264f8c83d1e17 100644
--- a/internal/master/master.go
+++ b/internal/master/master.go
@@ -181,7 +181,7 @@ func CreateServer(ctx context.Context) (*Master, error) {
m.scheduler.SetDDMsgStream(pulsarDDStream)
m.scheduler.SetIDAllocator(func() (UniqueID, error) { return m.idAllocator.AllocOne() })

- flushClient, err := writerclient.NewWriterClient(Params.EtcdAddress, kvRootPath, Params.WriteNodeSegKvSubPath, pulsarDDStream)
+ flushClient, err := writerclient.NewWriterClient(Params.EtcdAddress, Params.MetaRootPath, Params.WriteNodeSegKvSubPath, pulsarDDStream)
if err != nil {
return nil, err
}
diff --git a/internal/storage/print_binlog.go b/internal/storage/print_binlog.go
index f8eb92e21e4ec4141baa34231b5872ddd2db1f5e..a28edd28f5aa4e0d673ef16fc7304d81f25c91c6 100644
--- a/internal/storage/print_binlog.go
+++ b/internal/storage/print_binlog.go
@@ -310,32 +310,28 @@ func printDDLPayloadValues(eventType EventTypeCode, colType schemapb.DataType, r
switch eventType {
case CreateCollectionEventType:
var req internalpb.CreateCollectionRequest
- if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
+ if err := proto.UnmarshalText(val, &req); err != nil {
return err
}
- msg := proto.MarshalTextString(&req)
- fmt.Printf("\t\t%d : create collection: %s\n", i, msg)
+ fmt.Printf("\t\t%d : create collection: %v\n", i, req)
case DropCollectionEventType:
- var req internalpb.DropPartitionRequest
- if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
+ var req internalpb.DropCollectionRequest
+ if err := proto.UnmarshalText(val, &req); err != nil {
return err
}
- msg := proto.MarshalTextString(&req)
- fmt.Printf("\t\t%d : drop collection: %s\n", i, msg)
+ fmt.Printf("\t\t%d : drop collection: %v\n", i, req)
case CreatePartitionEventType:
var req internalpb.CreatePartitionRequest
- if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
+ if err := proto.UnmarshalText(val, &req); err != nil {
return err
}
- msg := proto.MarshalTextString(&req)
- fmt.Printf("\t\t%d : create partition: %s\n", i, msg)
+ fmt.Printf("\t\t%d : create partition: %v\n", i, req)
case DropPartitionEventType:
var req internalpb.DropPartitionRequest
- if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
+ if err := proto.UnmarshalText(val, &req); err != nil {
return err
}
- msg := proto.MarshalTextString(&req)
- fmt.Printf("\t\t%d : drop partition: %s\n", i, msg)
+ fmt.Printf("\t\t%d : drop partition: %v\n", i, req)
default:
return errors.Errorf("undefined ddl event type %d", eventType)
}
diff --git a/internal/writenode/flow_graph_insert_buffer_node.go b/internal/writenode/flow_graph_insert_buffer_node.go
index bf6180d269e4f93caeafccf56e8ae43209855c8e..0c001ee55d802fc3de1316de49679ce99a986e76 100644
--- a/internal/writenode/flow_graph_insert_buffer_node.go
+++ b/internal/writenode/flow_graph_insert_buffer_node.go
@@ -542,15 +542,21 @@ func (ibNode *insertBufferNode) getMeta(segID UniqueID) (*etcdpb.SegmentMeta, *e
segMeta := &etcdpb.SegmentMeta{}
key := path.Join(SegmentPrefix, strconv.FormatInt(segID, 10))
- value, _ := ibNode.kvClient.Load(key)
- err := proto.UnmarshalText(value, segMeta)
+ value, err := ibNode.kvClient.Load(key)
+ if err != nil {
+ return nil, nil, err
+ }
+ err = proto.UnmarshalText(value, segMeta)
if err != nil {
return nil, nil, err
}
collMeta := &etcdpb.CollectionMeta{}
key = path.Join(CollectionPrefix, strconv.FormatInt(segMeta.GetCollectionID(), 10))
- value, _ = ibNode.kvClient.Load(key)
+ value, err = ibNode.kvClient.Load(key)
+ if err != nil {
+ return nil, nil, err
+ }
err = proto.UnmarshalText(value, collMeta)
if err != nil {
return nil, nil, err