Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
233 lines
5.4 KiB
233 lines
5.4 KiB
package lz4 |
|
|
|
import ( |
|
"io" |
|
|
|
"github.com/pierrec/lz4/v4/internal/lz4block" |
|
"github.com/pierrec/lz4/v4/internal/lz4errors" |
|
"github.com/pierrec/lz4/v4/internal/lz4stream" |
|
) |
|
|
|
var writerStates = []aState{ |
|
noState: newState, |
|
newState: writeState, |
|
writeState: closedState, |
|
closedState: newState, |
|
errorState: newState, |
|
} |
|
|
|
// NewWriter returns a new LZ4 frame encoder. |
|
func NewWriter(w io.Writer) *Writer { |
|
zw := &Writer{frame: lz4stream.NewFrame()} |
|
zw.state.init(writerStates) |
|
_ = zw.Apply(DefaultBlockSizeOption, DefaultChecksumOption, DefaultConcurrency, defaultOnBlockDone) |
|
zw.Reset(w) |
|
return zw |
|
} |
|
|
|
// Writer allows writing an LZ4 stream. |
|
type Writer struct { |
|
state _State |
|
src io.Writer // destination writer |
|
level lz4block.CompressionLevel // how hard to try |
|
num int // concurrency level |
|
frame *lz4stream.Frame // frame being built |
|
data []byte // pending data |
|
idx int // size of pending data |
|
handler func(int) |
|
legacy bool |
|
} |
|
|
|
func (*Writer) private() {} |
|
|
|
func (w *Writer) Apply(options ...Option) (err error) { |
|
defer w.state.check(&err) |
|
switch w.state.state { |
|
case newState: |
|
case errorState: |
|
return w.state.err |
|
default: |
|
return lz4errors.ErrOptionClosedOrError |
|
} |
|
for _, o := range options { |
|
if err = o(w); err != nil { |
|
return |
|
} |
|
} |
|
w.Reset(w.src) |
|
return |
|
} |
|
|
|
func (w *Writer) isNotConcurrent() bool { |
|
return w.num == 1 |
|
} |
|
|
|
// init sets up the Writer when in newState. It does not change the Writer state. |
|
func (w *Writer) init() error { |
|
w.frame.InitW(w.src, w.num, w.legacy) |
|
if true || !w.isNotConcurrent() { |
|
size := w.frame.Descriptor.Flags.BlockSizeIndex() |
|
w.data = size.Get() |
|
} |
|
w.idx = 0 |
|
return w.frame.Descriptor.Write(w.frame, w.src) |
|
} |
|
|
|
func (w *Writer) Write(buf []byte) (n int, err error) { |
|
defer w.state.check(&err) |
|
switch w.state.state { |
|
case writeState: |
|
case closedState, errorState: |
|
return 0, w.state.err |
|
case newState: |
|
if err = w.init(); w.state.next(err) { |
|
return |
|
} |
|
default: |
|
return 0, w.state.fail() |
|
} |
|
|
|
zn := len(w.data) |
|
for len(buf) > 0 { |
|
if w.isNotConcurrent() && w.idx == 0 && len(buf) >= zn { |
|
// Avoid a copy as there is enough data for a block. |
|
if err = w.write(buf[:zn], false); err != nil { |
|
return |
|
} |
|
n += zn |
|
buf = buf[zn:] |
|
continue |
|
} |
|
// Accumulate the data to be compressed. |
|
m := copy(w.data[w.idx:], buf) |
|
n += m |
|
w.idx += m |
|
buf = buf[m:] |
|
|
|
if w.idx < len(w.data) { |
|
// Buffer not filled. |
|
return |
|
} |
|
|
|
// Buffer full. |
|
if err = w.write(w.data, true); err != nil { |
|
return |
|
} |
|
if !w.isNotConcurrent() { |
|
size := w.frame.Descriptor.Flags.BlockSizeIndex() |
|
w.data = size.Get() |
|
} |
|
w.idx = 0 |
|
} |
|
return |
|
} |
|
|
|
func (w *Writer) write(data []byte, safe bool) error { |
|
if w.isNotConcurrent() { |
|
block := w.frame.Blocks.Block |
|
err := block.Compress(w.frame, data, w.level).Write(w.frame, w.src) |
|
w.handler(len(block.Data)) |
|
return err |
|
} |
|
c := make(chan *lz4stream.FrameDataBlock) |
|
w.frame.Blocks.Blocks <- c |
|
go func(c chan *lz4stream.FrameDataBlock, data []byte, safe bool) { |
|
b := lz4stream.NewFrameDataBlock(w.frame) |
|
c <- b.Compress(w.frame, data, w.level) |
|
<-c |
|
w.handler(len(b.Data)) |
|
b.Close(w.frame) |
|
if safe { |
|
// safe to put it back as the last usage of it was FrameDataBlock.Write() called before c is closed |
|
lz4block.Put(data) |
|
} |
|
}(c, data, safe) |
|
|
|
return nil |
|
} |
|
|
|
// Close closes the Writer, flushing any unwritten data to the underlying io.Writer, |
|
// but does not close the underlying io.Writer. |
|
func (w *Writer) Close() (err error) { |
|
switch w.state.state { |
|
case writeState: |
|
case errorState: |
|
return w.state.err |
|
default: |
|
return nil |
|
} |
|
defer w.state.nextd(&err) |
|
if w.idx > 0 { |
|
// Flush pending data, disable w.data freeing as it is done later on. |
|
if err = w.write(w.data[:w.idx], false); err != nil { |
|
return err |
|
} |
|
w.idx = 0 |
|
} |
|
err = w.frame.CloseW(w.src, w.num) |
|
// It is now safe to free the buffer. |
|
if w.data != nil { |
|
lz4block.Put(w.data) |
|
w.data = nil |
|
} |
|
return |
|
} |
|
|
|
// Reset clears the state of the Writer w such that it is equivalent to its |
|
// initial state from NewWriter, but instead writing to writer. |
|
// Reset keeps the previous options unless overwritten by the supplied ones. |
|
// No access to writer is performed. |
|
// |
|
// w.Close must be called before Reset or pending data may be dropped. |
|
func (w *Writer) Reset(writer io.Writer) { |
|
w.frame.Reset(w.num) |
|
w.state.reset() |
|
w.src = writer |
|
} |
|
|
|
// ReadFrom efficiently reads from r and compressed into the Writer destination. |
|
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { |
|
switch w.state.state { |
|
case closedState, errorState: |
|
return 0, w.state.err |
|
case newState: |
|
if err = w.init(); w.state.next(err) { |
|
return |
|
} |
|
default: |
|
return 0, w.state.fail() |
|
} |
|
defer w.state.check(&err) |
|
|
|
size := w.frame.Descriptor.Flags.BlockSizeIndex() |
|
var done bool |
|
var rn int |
|
data := size.Get() |
|
if w.isNotConcurrent() { |
|
// Keep the same buffer for the whole process. |
|
defer lz4block.Put(data) |
|
} |
|
for !done { |
|
rn, err = io.ReadFull(r, data) |
|
switch err { |
|
case nil: |
|
case io.EOF, io.ErrUnexpectedEOF: // read may be partial |
|
done = true |
|
default: |
|
return |
|
} |
|
n += int64(rn) |
|
err = w.write(data[:rn], true) |
|
if err != nil { |
|
return |
|
} |
|
w.handler(rn) |
|
if !done && !w.isNotConcurrent() { |
|
// The buffer will be returned automatically by go routines (safe=true) |
|
// so get a new one fo the next round. |
|
data = size.Get() |
|
} |
|
} |
|
err = w.Close() |
|
return |
|
}
|
|
|