Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
616 lines
14 KiB
616 lines
14 KiB
package mssql |
|
|
|
import ( |
|
"bytes" |
|
"context" |
|
"encoding/binary" |
|
"fmt" |
|
"math" |
|
"reflect" |
|
"strconv" |
|
"strings" |
|
"time" |
|
) |
|
|
|
type Bulk struct { |
|
cn *Conn |
|
metadata []columnStruct |
|
bulkColumns []columnStruct |
|
columnsName []string |
|
tablename string |
|
numRows int |
|
|
|
headerSent bool |
|
Options BulkOptions |
|
Debug bool |
|
} |
|
type (
	// BulkOptions lists the optional hints that are rendered into the
	// WITH (...) clause of the INSERT BULK statement. A zero value adds no
	// hints at all.
	BulkOptions struct {
		// CheckConstraints adds the CHECK_CONSTRAINTS hint.
		CheckConstraints bool
		// FireTriggers adds the FIRE_TRIGGERS hint.
		FireTriggers bool
		// KeepNulls adds the KEEP_NULLS hint.
		KeepNulls bool
		// KilobytesPerBatch, when greater than zero, adds
		// "KILOBYTES_PER_BATCH = n".
		KilobytesPerBatch int
		// RowsPerBatch, when greater than zero, adds "ROWS_PER_BATCH = n".
		RowsPerBatch int
		// Order, when non-empty, adds "ORDER(...)" with the entries joined
		// by commas.
		Order []string
		// Tablock adds the TABLOCK hint.
		Tablock bool
	}

	// DataValue is the type of a single column value handed to the bulk
	// encoder; it accepts any Go value.
	DataValue interface{}
)
|
|
|
func (cn *Conn) CreateBulk(table string, columns []string) (_ *Bulk) { |
|
b := Bulk{cn: cn, tablename: table, headerSent: false, columnsName: columns} |
|
b.Debug = false |
|
return &b |
|
} |
|
|
|
func (b *Bulk) sendBulkCommand() (err error) { |
|
//get table columns info |
|
err = b.getMetadata() |
|
if err != nil { |
|
return err |
|
} |
|
|
|
//match the columns |
|
for _, colname := range b.columnsName { |
|
var bulkCol *columnStruct |
|
|
|
for _, m := range b.metadata { |
|
if m.ColName == colname { |
|
bulkCol = &m |
|
break |
|
} |
|
} |
|
if bulkCol != nil { |
|
|
|
if bulkCol.ti.TypeId == typeUdt { |
|
//send udt as binary |
|
bulkCol.ti.TypeId = typeBigVarBin |
|
} |
|
b.bulkColumns = append(b.bulkColumns, *bulkCol) |
|
b.dlogf("Adding column %s %s %#x", colname, bulkCol.ColName, bulkCol.ti.TypeId) |
|
} else { |
|
return fmt.Errorf("Column %s does not exist in destination table %s", colname, b.tablename) |
|
} |
|
} |
|
|
|
//create the bulk command |
|
|
|
//columns definitions |
|
var col_defs bytes.Buffer |
|
for i, col := range b.bulkColumns { |
|
if i != 0 { |
|
col_defs.WriteString(", ") |
|
} |
|
col_defs.WriteString("[" + col.ColName + "] " + makeDecl(col.ti)) |
|
} |
|
|
|
//options |
|
var with_opts []string |
|
|
|
if b.Options.CheckConstraints { |
|
with_opts = append(with_opts, "CHECK_CONSTRAINTS") |
|
} |
|
if b.Options.FireTriggers { |
|
with_opts = append(with_opts, "FIRE_TRIGGERS") |
|
} |
|
if b.Options.KeepNulls { |
|
with_opts = append(with_opts, "KEEP_NULLS") |
|
} |
|
if b.Options.KilobytesPerBatch > 0 { |
|
with_opts = append(with_opts, fmt.Sprintf("KILOBYTES_PER_BATCH = %d", b.Options.KilobytesPerBatch)) |
|
} |
|
if b.Options.RowsPerBatch > 0 { |
|
with_opts = append(with_opts, fmt.Sprintf("ROWS_PER_BATCH = %d", b.Options.RowsPerBatch)) |
|
} |
|
if len(b.Options.Order) > 0 { |
|
with_opts = append(with_opts, fmt.Sprintf("ORDER(%s)", strings.Join(b.Options.Order, ","))) |
|
} |
|
if b.Options.Tablock { |
|
with_opts = append(with_opts, "TABLOCK") |
|
} |
|
var with_part string |
|
if len(with_opts) > 0 { |
|
with_part = fmt.Sprintf("WITH (%s)", strings.Join(with_opts, ",")) |
|
} |
|
|
|
query := fmt.Sprintf("INSERT BULK %s (%s) %s", b.tablename, col_defs.String(), with_part) |
|
|
|
stmt, err := b.cn.Prepare(query) |
|
if err != nil { |
|
return fmt.Errorf("Prepare failed: %s", err.Error()) |
|
} |
|
b.dlogf(query) |
|
|
|
_, err = stmt.Exec(nil) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
b.headerSent = true |
|
|
|
var buf = b.cn.sess.buf |
|
buf.BeginPacket(packBulkLoadBCP) |
|
|
|
// send the columns metadata |
|
columnMetadata := b.createColMetadata() |
|
_, err = buf.Write(columnMetadata) |
|
|
|
return |
|
} |
|
|
|
// AddRow immediately writes the row to the destination table. |
|
// The arguments are the row values in the order they were specified. |
|
func (b *Bulk) AddRow(row []interface{}) (err error) { |
|
if !b.headerSent { |
|
err = b.sendBulkCommand() |
|
if err != nil { |
|
return |
|
} |
|
} |
|
|
|
if len(row) != len(b.bulkColumns) { |
|
return fmt.Errorf("Row does not have the same number of columns than the destination table %d %d", |
|
len(row), len(b.bulkColumns)) |
|
} |
|
|
|
bytes, err := b.makeRowData(row) |
|
if err != nil { |
|
return |
|
} |
|
|
|
_, err = b.cn.sess.buf.Write(bytes) |
|
if err != nil { |
|
return |
|
} |
|
|
|
b.numRows = b.numRows + 1 |
|
return |
|
} |
|
|
|
func (b *Bulk) makeRowData(row []interface{}) ([]byte, error) { |
|
buf := new(bytes.Buffer) |
|
buf.WriteByte(byte(tokenRow)) |
|
|
|
var logcol bytes.Buffer |
|
for i, col := range b.bulkColumns { |
|
|
|
if b.Debug { |
|
logcol.WriteString(fmt.Sprintf(" col[%d]='%v' ", i, row[i])) |
|
} |
|
param, err := b.makeParam(row[i], col) |
|
if err != nil { |
|
return nil, fmt.Errorf("bulkcopy: %s", err.Error()) |
|
} |
|
|
|
if col.ti.Writer == nil { |
|
return nil, fmt.Errorf("no writer for column: %s, TypeId: %#x", |
|
col.ColName, col.ti.TypeId) |
|
} |
|
err = col.ti.Writer(buf, param.ti, param.buffer) |
|
if err != nil { |
|
return nil, fmt.Errorf("bulkcopy: %s", err.Error()) |
|
} |
|
} |
|
|
|
b.dlogf("row[%d] %s\n", b.numRows, logcol.String()) |
|
|
|
return buf.Bytes(), nil |
|
} |
|
|
|
func (b *Bulk) Done() (rowcount int64, err error) { |
|
if b.headerSent == false { |
|
//no rows had been sent |
|
return 0, nil |
|
} |
|
var buf = b.cn.sess.buf |
|
buf.WriteByte(byte(tokenDone)) |
|
|
|
binary.Write(buf, binary.LittleEndian, uint16(doneFinal)) |
|
binary.Write(buf, binary.LittleEndian, uint16(0)) // curcmd |
|
|
|
if b.cn.sess.loginAck.TDSVersion >= verTDS72 { |
|
binary.Write(buf, binary.LittleEndian, uint64(0)) //rowcount 0 |
|
} else { |
|
binary.Write(buf, binary.LittleEndian, uint32(0)) //rowcount 0 |
|
} |
|
|
|
buf.FinishPacket() |
|
|
|
tokchan := make(chan tokenStruct, 5) |
|
go processResponse(context.Background(), b.cn.sess, tokchan, nil) |
|
|
|
var rowCount int64 |
|
for token := range tokchan { |
|
switch token := token.(type) { |
|
case doneStruct: |
|
if token.Status&doneCount != 0 { |
|
rowCount = int64(token.RowCount) |
|
} |
|
if token.isError() { |
|
return 0, token.getError() |
|
} |
|
case error: |
|
return 0, b.cn.checkBadConn(token) |
|
} |
|
} |
|
return rowCount, nil |
|
} |
|
|
|
func (b *Bulk) createColMetadata() []byte { |
|
buf := new(bytes.Buffer) |
|
buf.WriteByte(byte(tokenColMetadata)) // token |
|
binary.Write(buf, binary.LittleEndian, uint16(len(b.bulkColumns))) // column count |
|
|
|
for i, col := range b.bulkColumns { |
|
|
|
if b.cn.sess.loginAck.TDSVersion >= verTDS72 { |
|
binary.Write(buf, binary.LittleEndian, uint32(col.UserType)) // usertype, always 0? |
|
} else { |
|
binary.Write(buf, binary.LittleEndian, uint16(col.UserType)) |
|
} |
|
binary.Write(buf, binary.LittleEndian, uint16(col.Flags)) |
|
|
|
writeTypeInfo(buf, &b.bulkColumns[i].ti) |
|
|
|
if col.ti.TypeId == typeNText || |
|
col.ti.TypeId == typeText || |
|
col.ti.TypeId == typeImage { |
|
|
|
tablename_ucs2 := str2ucs2(b.tablename) |
|
binary.Write(buf, binary.LittleEndian, uint16(len(tablename_ucs2)/2)) |
|
buf.Write(tablename_ucs2) |
|
} |
|
colname_ucs2 := str2ucs2(col.ColName) |
|
buf.WriteByte(uint8(len(colname_ucs2) / 2)) |
|
buf.Write(colname_ucs2) |
|
} |
|
|
|
return buf.Bytes() |
|
} |
|
|
|
func (b *Bulk) getMetadata() (err error) { |
|
stmt, err := b.cn.Prepare("SET FMTONLY ON") |
|
if err != nil { |
|
return |
|
} |
|
|
|
_, err = stmt.Exec(nil) |
|
if err != nil { |
|
return |
|
} |
|
|
|
//get columns info |
|
stmt, err = b.cn.Prepare(fmt.Sprintf("select * from %s SET FMTONLY OFF", b.tablename)) |
|
if err != nil { |
|
return |
|
} |
|
stmt2 := stmt.(*Stmt) |
|
cols, err := stmt2.QueryMeta() |
|
if err != nil { |
|
return fmt.Errorf("get columns info failed: %v", err.Error()) |
|
} |
|
b.metadata = cols |
|
|
|
if b.Debug { |
|
for _, col := range b.metadata { |
|
b.dlogf("col: %s typeId: %#x size: %d scale: %d prec: %d flags: %d lcid: %#x\n", |
|
col.ColName, col.ti.TypeId, col.ti.Size, col.ti.Scale, col.ti.Prec, |
|
col.Flags, col.ti.Collation.LcidAndFlags) |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// QueryMeta is almost the same as mssql.Stmt.Query, but returns all the columns info. |
|
func (s *Stmt) QueryMeta() (cols []columnStruct, err error) { |
|
if err = s.sendQuery(nil); err != nil { |
|
return |
|
} |
|
tokchan := make(chan tokenStruct, 5) |
|
go processResponse(context.Background(), s.c.sess, tokchan, s.c.outs) |
|
s.c.clearOuts() |
|
loop: |
|
for tok := range tokchan { |
|
switch token := tok.(type) { |
|
case doneStruct: |
|
break loop |
|
case []columnStruct: |
|
cols = token |
|
break loop |
|
case error: |
|
return nil, s.c.checkBadConn(token) |
|
} |
|
} |
|
return cols, nil |
|
} |
|
|
|
func (b *Bulk) makeParam(val DataValue, col columnStruct) (res Param, err error) { |
|
res.ti.Size = col.ti.Size |
|
res.ti.TypeId = col.ti.TypeId |
|
|
|
if val == nil { |
|
res.ti.Size = 0 |
|
return |
|
} |
|
|
|
switch col.ti.TypeId { |
|
|
|
case typeInt1, typeInt2, typeInt4, typeInt8, typeIntN: |
|
var intvalue int64 |
|
|
|
switch val := val.(type) { |
|
case int: |
|
intvalue = int64(val) |
|
case int32: |
|
intvalue = int64(val) |
|
case int64: |
|
intvalue = val |
|
default: |
|
err = fmt.Errorf("mssql: invalid type for int column") |
|
return |
|
} |
|
|
|
res.buffer = make([]byte, res.ti.Size) |
|
if col.ti.Size == 1 { |
|
res.buffer[0] = byte(intvalue) |
|
} else if col.ti.Size == 2 { |
|
binary.LittleEndian.PutUint16(res.buffer, uint16(intvalue)) |
|
} else if col.ti.Size == 4 { |
|
binary.LittleEndian.PutUint32(res.buffer, uint32(intvalue)) |
|
} else if col.ti.Size == 8 { |
|
binary.LittleEndian.PutUint64(res.buffer, uint64(intvalue)) |
|
} |
|
case typeFlt4, typeFlt8, typeFltN: |
|
var floatvalue float64 |
|
|
|
switch val := val.(type) { |
|
case float32: |
|
floatvalue = float64(val) |
|
case float64: |
|
floatvalue = val |
|
case int: |
|
floatvalue = float64(val) |
|
case int64: |
|
floatvalue = float64(val) |
|
default: |
|
err = fmt.Errorf("mssql: invalid type for float column: %s", val) |
|
return |
|
} |
|
|
|
if col.ti.Size == 4 { |
|
res.buffer = make([]byte, 4) |
|
binary.LittleEndian.PutUint32(res.buffer, math.Float32bits(float32(floatvalue))) |
|
} else if col.ti.Size == 8 { |
|
res.buffer = make([]byte, 8) |
|
binary.LittleEndian.PutUint64(res.buffer, math.Float64bits(floatvalue)) |
|
} |
|
case typeNVarChar, typeNText, typeNChar: |
|
|
|
switch val := val.(type) { |
|
case string: |
|
res.buffer = str2ucs2(val) |
|
case []byte: |
|
res.buffer = val |
|
default: |
|
err = fmt.Errorf("mssql: invalid type for nvarchar column: %s", val) |
|
return |
|
} |
|
res.ti.Size = len(res.buffer) |
|
|
|
case typeVarChar, typeBigVarChar, typeText, typeChar, typeBigChar: |
|
switch val := val.(type) { |
|
case string: |
|
res.buffer = []byte(val) |
|
case []byte: |
|
res.buffer = val |
|
default: |
|
err = fmt.Errorf("mssql: invalid type for varchar column: %s", val) |
|
return |
|
} |
|
res.ti.Size = len(res.buffer) |
|
|
|
case typeBit, typeBitN: |
|
if reflect.TypeOf(val).Kind() != reflect.Bool { |
|
err = fmt.Errorf("mssql: invalid type for bit column: %s", val) |
|
return |
|
} |
|
res.ti.TypeId = typeBitN |
|
res.ti.Size = 1 |
|
res.buffer = make([]byte, 1) |
|
if val.(bool) { |
|
res.buffer[0] = 1 |
|
} |
|
|
|
case typeDateTime2N, typeDateTimeOffsetN: |
|
switch val := val.(type) { |
|
case time.Time: |
|
days, ns := dateTime2(val) |
|
ns /= int64(math.Pow10(int(col.ti.Scale)*-1) * 1000000000) |
|
|
|
var data = make([]byte, 5) |
|
|
|
data[0] = byte(ns) |
|
data[1] = byte(ns >> 8) |
|
data[2] = byte(ns >> 16) |
|
data[3] = byte(ns >> 24) |
|
data[4] = byte(ns >> 32) |
|
|
|
if col.ti.Scale <= 2 { |
|
res.ti.Size = 6 |
|
} else if col.ti.Scale <= 4 { |
|
res.ti.Size = 7 |
|
} else { |
|
res.ti.Size = 8 |
|
} |
|
var buf []byte |
|
buf = make([]byte, res.ti.Size) |
|
copy(buf, data[0:res.ti.Size-3]) |
|
|
|
buf[res.ti.Size-3] = byte(days) |
|
buf[res.ti.Size-2] = byte(days >> 8) |
|
buf[res.ti.Size-1] = byte(days >> 16) |
|
|
|
if col.ti.TypeId == typeDateTimeOffsetN { |
|
_, offset := val.Zone() |
|
var offsetMinute = uint16(offset / 60) |
|
buf = append(buf, byte(offsetMinute)) |
|
buf = append(buf, byte(offsetMinute>>8)) |
|
res.ti.Size = res.ti.Size + 2 |
|
} |
|
|
|
res.buffer = buf |
|
|
|
default: |
|
err = fmt.Errorf("mssql: invalid type for datetime2 column: %s", val) |
|
return |
|
} |
|
case typeDateN: |
|
switch val := val.(type) { |
|
case time.Time: |
|
days, _ := dateTime2(val) |
|
|
|
res.ti.Size = 3 |
|
res.buffer = make([]byte, 3) |
|
res.buffer[0] = byte(days) |
|
res.buffer[1] = byte(days >> 8) |
|
res.buffer[2] = byte(days >> 16) |
|
default: |
|
err = fmt.Errorf("mssql: invalid type for date column: %s", val) |
|
return |
|
} |
|
case typeDateTime, typeDateTimeN, typeDateTim4: |
|
switch val := val.(type) { |
|
case time.Time: |
|
if col.ti.Size == 4 { |
|
res.ti.Size = 4 |
|
res.buffer = make([]byte, 4) |
|
|
|
ref := time.Date(1900, 1, 1, 0, 0, 0, 0, time.UTC) |
|
dur := val.Sub(ref) |
|
days := dur / (24 * time.Hour) |
|
if days < 0 { |
|
err = fmt.Errorf("mssql: Date %s is out of range", val) |
|
return |
|
} |
|
mins := val.Hour()*60 + val.Minute() |
|
|
|
binary.LittleEndian.PutUint16(res.buffer[0:2], uint16(days)) |
|
binary.LittleEndian.PutUint16(res.buffer[2:4], uint16(mins)) |
|
} else if col.ti.Size == 8 { |
|
res.ti.Size = 8 |
|
res.buffer = make([]byte, 8) |
|
|
|
days := divFloor(val.Unix(), 24*60*60) |
|
//25567 - number of days since Jan 1 1900 UTC to Jan 1 1970 |
|
days = days + 25567 |
|
tm := (val.Hour()*60*60+val.Minute()*60+val.Second())*300 + int(val.Nanosecond()/10000000*3) |
|
|
|
binary.LittleEndian.PutUint32(res.buffer[0:4], uint32(days)) |
|
binary.LittleEndian.PutUint32(res.buffer[4:8], uint32(tm)) |
|
} else { |
|
err = fmt.Errorf("mssql: invalid size of column") |
|
} |
|
|
|
default: |
|
err = fmt.Errorf("mssql: invalid type for datetime column: %s", val) |
|
} |
|
|
|
// case typeMoney, typeMoney4, typeMoneyN: |
|
case typeDecimal, typeDecimalN, typeNumeric, typeNumericN: |
|
var value float64 |
|
switch v := val.(type) { |
|
case int: |
|
value = float64(v) |
|
case int8: |
|
value = float64(v) |
|
case int16: |
|
value = float64(v) |
|
case int32: |
|
value = float64(v) |
|
case int64: |
|
value = float64(v) |
|
case float32: |
|
value = float64(v) |
|
case float64: |
|
value = v |
|
case string: |
|
if value, err = strconv.ParseFloat(v, 64); err != nil { |
|
return res, fmt.Errorf("bulk: unable to convert string to float: %v", err) |
|
} |
|
default: |
|
return res, fmt.Errorf("unknown value for decimal: %#v", v) |
|
} |
|
|
|
perc := col.ti.Prec |
|
scale := col.ti.Scale |
|
var dec Decimal |
|
dec, err = Float64ToDecimalScale(value, scale) |
|
if err != nil { |
|
return res, err |
|
} |
|
dec.prec = perc |
|
|
|
var length byte |
|
switch { |
|
case perc <= 9: |
|
length = 4 |
|
case perc <= 19: |
|
length = 8 |
|
case perc <= 28: |
|
length = 12 |
|
default: |
|
length = 16 |
|
} |
|
|
|
buf := make([]byte, length+1) |
|
// first byte length written by typeInfo.writer |
|
res.ti.Size = int(length) + 1 |
|
// second byte sign |
|
if value < 0 { |
|
buf[0] = 0 |
|
} else { |
|
buf[0] = 1 |
|
} |
|
|
|
ub := dec.UnscaledBytes() |
|
l := len(ub) |
|
if l > int(length) { |
|
err = fmt.Errorf("decimal out of range: %s", dec) |
|
return res, err |
|
} |
|
// reverse the bytes |
|
for i, j := 1, l-1; j >= 0; i, j = i+1, j-1 { |
|
buf[i] = ub[j] |
|
} |
|
res.buffer = buf |
|
case typeBigVarBin: |
|
switch val := val.(type) { |
|
case []byte: |
|
res.ti.Size = len(val) |
|
res.buffer = val |
|
default: |
|
err = fmt.Errorf("mssql: invalid type for Binary column: %s", val) |
|
return |
|
} |
|
case typeGuid: |
|
switch val := val.(type) { |
|
case []byte: |
|
res.ti.Size = len(val) |
|
res.buffer = val |
|
default: |
|
err = fmt.Errorf("mssql: invalid type for Guid column: %s", val) |
|
return |
|
} |
|
|
|
default: |
|
err = fmt.Errorf("mssql: type %x not implemented", col.ti.TypeId) |
|
} |
|
return |
|
|
|
} |
|
|
|
func (b *Bulk) dlogf(format string, v ...interface{}) { |
|
if b.Debug { |
|
b.cn.sess.log.Printf(format, v...) |
|
} |
|
}
|
|
|