Skip to content
Snippets Groups Projects
Commit 10c70d84 authored by daviszhen, committed by Yingfeng Zhang
Browse files

Fix the confusing error message reported when LOAD DATA encounters a parsing error (#962)

parent 15e21473
No related branches found
No related tags found
No related merge requests found
......@@ -128,6 +128,11 @@ func newNotifyEvent(t notifyEventType,e error,w *WriteBatchHandler)*notifyEvent
}
}
// PoolElement is the unit that circulates through
// ParseLineHandler.simdCsvBatchPool. It bundles a reusable batch with a
// line buffer, so whoever takes an element from the pool gets both at once.
type PoolElement struct {
// bat is the batch built by makeBatch (batch.New over the handler's attrName).
bat *batch.Batch
// lineArray stores parsed lines, one []string of fields per line.
// makeBatch allocates it with handler.batchSize entries, and
// initWriteBatchHandler copies the parser's pending lines into it.
lineArray [][]string
}
type ParseLineHandler struct {
SharePart
DebugTime
......@@ -140,7 +145,7 @@ type ParseLineHandler struct {
simdCsvConcurrencyCountOfWriteBatch int
//wait write routines to quit
simdCsvWaitWriteRoutineToQuit *sync.WaitGroup
simdCsvBatchPool chan *batch.Batch
simdCsvBatchPool chan *PoolElement
simdCsvNotiyEventChan chan *notifyEvent
closeOnce sync.Once
......@@ -152,6 +157,7 @@ type WriteBatchHandler struct {
DebugTime
batchData *batch.Batch
pl *PoolElement
batchFilled int
simdCsvErr error
......@@ -213,6 +219,7 @@ func (plh *ParseLineHandler) getLineOutFromSimdCsvRoutine() error {
//step 2 : append line into line array
plh.simdCsvLineArray[plh.lineIdx] = lineOut.Line
plh.lineIdx++
plh.lineCount++
plh.maxFieldCnt = Max(plh.maxFieldCnt,len(lineOut.Line))
//for _, ll := range lineOut.Line {
// plh.bytes += uint64(utf8.RuneCount([]byte(ll)))
......@@ -292,7 +299,7 @@ func (plh *ParseLineHandler) close() {
/*
alloc space for the batch
*/
func makeBatch(handler *ParseLineHandler) *batch.Batch {
func makeBatch(handler *ParseLineHandler) *PoolElement {
batchData := batch.New(true,handler.attrName)
//fmt.Printf("----- batchSize %d attrName %v \n",batchSize,handler.attrName)
......@@ -336,7 +343,10 @@ func makeBatch(handler *ParseLineHandler) *batch.Batch {
batchData.Vecs[i] = vec
}
return batchData
return &PoolElement{
bat : batchData,
lineArray: make([][]string,handler.batchSize),
}
}
/*
......@@ -397,7 +407,7 @@ func initParseLineHandler(handler *ParseLineHandler) error {
Allocate a batch from the pool.
If the pool has no batch available, the calling goroutine blocks until one is returned to the pool.
*/
func allocBatch(handler *ParseLineHandler ) *batch.Batch{
func allocBatch(handler *ParseLineHandler) *PoolElement {
	// Receiving from the pool channel blocks until an element is available.
	return <-handler.simdCsvBatchPool
}
......@@ -405,10 +415,10 @@ func allocBatch(handler *ParseLineHandler ) *batch.Batch{
/*
return a batch into the pool
*/
func releaseBatch(handler *ParseLineHandler,bat *batch.Batch) {
func releaseBatch(handler *ParseLineHandler,pl *PoolElement) {
//clear batch
//clear vector.nulls.Nulls
for _, vec := range bat.Vecs {
for _, vec := range pl.bat.Vecs {
vec.Nsp = &nulls.Nulls{}
switch vec.Typ.Oid {
case types.T_char, types.T_varchar:
......@@ -416,7 +426,7 @@ func releaseBatch(handler *ParseLineHandler,bat *batch.Batch) {
vBytes.Data = vBytes.Data[:0]
}
}
handler.simdCsvBatchPool <- bat
handler.simdCsvBatchPool <- pl
}
/**
......@@ -435,7 +445,13 @@ func initWriteBatchHandler(handler *ParseLineHandler,wHandler *WriteBatchHandler
wHandler.closeRef = handler.closeRef
wHandler.lineCount = handler.lineCount
wHandler.batchData = allocBatch(handler)
wHandler.pl = allocBatch(handler)
wHandler.simdCsvLineArray = wHandler.pl.lineArray
for i := 0; i < handler.lineIdx; i++ {
wHandler.simdCsvLineArray[i] = handler.simdCsvLineArray[i]
}
wHandler.batchData = wHandler.pl.bat
return nil
}
......@@ -539,6 +555,11 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
//fmt.Printf("line %d %v \n",i,line)
//wait_a := time.Now()
rowIdx := batchBegin + i
offset := i + 1
base := handler.lineCount - uint64(fetchCnt)
//fmt.Println(line)
//fmt.Printf("------ linecount %d fetchcnt %d base %d offset %d \n",
// handler.lineCount,fetchCnt,base,offset)
//record missing column
for k := 0; k < len(columnFLags); k++ {
columnFLags[k] = 0
......@@ -581,7 +602,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
if err != nil {
logutil.Errorf("parse field[%v] err:%v", field, err)
if !ignoreFieldError {
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
}
//mysql warning ER_TRUNCATED_WRONG_VALUE_FOR_FIELD
result.Warnings++
......@@ -599,7 +620,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
if err != nil {
logutil.Errorf("parse field[%v] err:%v", field, err)
if !ignoreFieldError {
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
}
result.Warnings++
d = 0
......@@ -616,7 +637,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
if err != nil {
logutil.Errorf("parse field[%v] err:%v", field, err)
if !ignoreFieldError {
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
}
result.Warnings++
d = 0
......@@ -633,7 +654,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
if err != nil {
logutil.Errorf("parse field[%v] err:%v", field, err)
if !ignoreFieldError {
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
}
result.Warnings++
d = 0
......@@ -650,7 +671,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
if err != nil {
logutil.Errorf("parse field[%v] err:%v", field, err)
if !ignoreFieldError {
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
}
result.Warnings++
d = 0
......@@ -667,7 +688,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
if err != nil {
logutil.Errorf("parse field[%v] err:%v", field, err)
if !ignoreFieldError {
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
}
result.Warnings++
d = 0
......@@ -684,7 +705,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
if err != nil {
logutil.Errorf("parse field[%v] err:%v", field, err)
if !ignoreFieldError {
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
}
result.Warnings++
d = 0
......@@ -701,7 +722,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
if err != nil {
logutil.Errorf("parse field[%v] err:%v", field, err)
if !ignoreFieldError {
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
}
result.Warnings++
d = 0
......@@ -718,7 +739,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
if err != nil {
logutil.Errorf("parse field[%v] err:%v", field, err)
if !ignoreFieldError {
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
}
result.Warnings++
d = 0
......@@ -737,7 +758,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
if err != nil {
logutil.Errorf("parse field[%v] err:%v", field, err)
if !ignoreFieldError {
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
}
result.Warnings++
d = 0
......@@ -1071,7 +1092,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
fillBlank += time.Since(wait_b)
handler.choose_false += time.Since(wait_d)
}
handler.lineCount += uint64(fetchCnt)
handler.batchFilled = batchBegin + fetchCnt
//if handler.batchFilled == handler.batchSize {
......@@ -1239,7 +1260,6 @@ func saveLinesToStorage(handler *ParseLineHandler, force bool) error {
writeHandler := &WriteBatchHandler{
SharePart:SharePart{
lineIdx: handler.lineIdx,
simdCsvLineArray: handler.simdCsvLineArray[:handler.lineIdx],
maxFieldCnt: handler.maxFieldCnt,
},
}
......@@ -1257,8 +1277,9 @@ func saveLinesToStorage(handler *ParseLineHandler, force bool) error {
err = rowToColumnAndSaveToStorage(writeHandler,force)
writeHandler.simdCsvErr = err
releaseBatch(handler,writeHandler.batchData)
releaseBatch(handler, writeHandler.pl)
writeHandler.batchData = nil
writeHandler.simdCsvLineArray = nil
if err != nil {
handler.simdCsvNotiyEventChan <- newNotifyEvent(NOTIFY_EVENT_WRITE_BATCH_ERROR,err,writeHandler)
......@@ -1283,7 +1304,6 @@ func (mce *MysqlCmdExecutor) LoadLoop(load *tree.Load, dbHandler engine.Database
logutil.Errorf("loadLoop panic")
}
}()
var err error
ses := mce.routine.GetSession()
//begin:= time.Now()
......@@ -1331,7 +1351,7 @@ func (mce *MysqlCmdExecutor) LoadLoop(load *tree.Load, dbHandler engine.Database
handler.simdCsvConcurrencyCountOfWriteBatch = Min(int(ses.Pu.SV.GetLoadDataConcurrencyCount()),runtime.NumCPU())
handler.simdCsvConcurrencyCountOfWriteBatch = Max(1,handler.simdCsvConcurrencyCountOfWriteBatch)
handler.simdCsvBatchPool = make(chan *batch.Batch, handler.simdCsvConcurrencyCountOfWriteBatch)
handler.simdCsvBatchPool = make(chan *PoolElement, handler.simdCsvConcurrencyCountOfWriteBatch)
//fmt.Printf("-----write concurrent count %d \n",handler.simdCsvConcurrencyCountOfWriteBatch)
......@@ -1385,7 +1405,7 @@ func (mce *MysqlCmdExecutor) LoadLoop(load *tree.Load, dbHandler engine.Database
wg.Add(1)
go func() {
defer wg.Done()
err = handler.getLineOutFromSimdCsvRoutine()
err := handler.getLineOutFromSimdCsvRoutine()
if err != nil {
logutil.Errorf("get line from simdcsv failed. err:%v",err)
handler.simdCsvNotiyEventChan <- newNotifyEvent(NOTIFY_EVENT_OUTPUT_SIMDCSV_ERROR,err,nil)
......@@ -1400,7 +1420,7 @@ func (mce *MysqlCmdExecutor) LoadLoop(load *tree.Load, dbHandler engine.Database
defer wg.Done()
wait_b := time.Now()
err = handler.simdCsvReader.ReadLoop(handler.simdCsvGetParsedLinesChan)
err := handler.simdCsvReader.ReadLoop(handler.simdCsvGetParsedLinesChan)
if err != nil {
handler.simdCsvNotiyEventChan <- newNotifyEvent(NOTIFY_EVENT_READ_SIMDCSV_ERROR,err,nil)
}
......@@ -1429,7 +1449,6 @@ func (mce *MysqlCmdExecutor) LoadLoop(load *tree.Load, dbHandler engine.Database
case NOTIFY_EVENT_READ_SIMDCSV_ERROR,
NOTIFY_EVENT_OUTPUT_SIMDCSV_ERROR,
NOTIFY_EVENT_WRITE_BATCH_ERROR:
fmt.Println(ne.err)
if !errorCanBeIgnored(ne.err) {
err = ne.err
quit = true
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment