From 55bfe5488b483f82a8606be28210338ebe61bb44 Mon Sep 17 00:00:00 2001 From: qydysky Date: Sun, 27 Aug 2023 17:42:07 +0800 Subject: [PATCH] add --- file/FileWR.go | 127 ++++++++++++++++++++++++++++++++++++++----- file/FileWR_test.go | 72 ++++++++++++++++++++---- io/io.go | 46 ++++++++++++---- websocket/Recoder.go | 2 +- 4 files changed, 212 insertions(+), 35 deletions(-) diff --git a/file/FileWR.go b/file/FileWR.go index debe6f6..781179b 100644 --- a/file/FileWR.go +++ b/file/FileWR.go @@ -21,6 +21,7 @@ var ( ErrFailToLock = errors.New("ErrFailToLock") ErrMaxReadSizeReach = errors.New("ErrMaxReadSizeReach") ErrNoDir = errors.New("ErrNoDir") + ErrArg = errors.New("ErrArg") ) type File struct { @@ -126,7 +127,10 @@ func (t *File) Read(data []byte) (int, error) { return t.read().Read(data) } -func (t *File) ReadUntil(separation byte, perReadSize int, maxReadSize int) (data []byte, e error) { +// stop after untilBytes +// +// data not include untilBytes +func (t *File) ReadUntil(untilBytes []byte, perReadSize int, maxReadSize int) (data []byte, e error) { t.getRWCloser() if t.Config.AutoClose { defer t.Close() @@ -138,13 +142,45 @@ func (t *File) ReadUntil(separation byte, perReadSize int, maxReadSize int) (dat defer t.l.RUnlock() var ( - tmpArea = make([]byte, perReadSize) + reserve = len(untilBytes) - 1 + tmpArea = make([]byte, reserve+perReadSize) n int reader = t.read() ) - for maxReadSize > 0 { + { + var seekN int + if reserve != 0 { + //avoid spik + if _, e := t.file.Seek(-int64(reserve), int(AtCurrent)); e == nil { + seekN = reserve + } + } n, e = reader.Read(tmpArea) + if n == 0 && e != nil { + return + } + + maxReadSize = maxReadSize - n + + if i := bytes.Index(tmpArea[:n], untilBytes); i != -1 { + if n-i-len(untilBytes) != 0 { + _, _ = t.file.Seek(-int64(n-i-len(untilBytes)), int(AtCurrent)) + } + if i != 0 { + data = append(data, tmpArea[seekN:i]...) + } + return + } else { + data = append(data, tmpArea[seekN:n]...) + } + } + + for maxReadSize > 0 { + if reserve != 0 { + copy(tmpArea, tmpArea[reserve:]) + } + n, e = reader.Read(tmpArea[reserve:]) if n == 0 && e != nil { return @@ -152,16 +188,16 @@ func (t *File) ReadUntil(separation byte, perReadSize int, maxReadSize int) (dat maxReadSize = maxReadSize - n - if i := bytes.Index(tmpArea[:n], []byte{separation}); i != -1 { - if n-i-1 != 0 { - t.file.Seek(-int64(n-i-1), int(AtCurrent)) + if i := bytes.Index(tmpArea[:reserve+n], untilBytes); i != -1 { + if reserve+n-i-len(untilBytes) != 0 { + _, _ = t.file.Seek(-int64(reserve+n-i-len(untilBytes)), int(AtCurrent)) } if i != 0 { - data = append(data, tmpArea[:i]...) + data = append(data, tmpArea[reserve:i]...) } break } else { - data = append(data, tmpArea[:n]...) + data = append(data, tmpArea[reserve:n]...) } } @@ -185,7 +221,7 @@ func (t *File) ReadAll(perReadSize int, maxReadSize int) (data []byte, e error) var ( tmpArea = make([]byte, perReadSize) - n = 0 + n int reader = t.read() ) @@ -197,7 +233,6 @@ func (t *File) ReadAll(perReadSize int, maxReadSize int) (data []byte, e error) } maxReadSize = maxReadSize - n - data = append(data, tmpArea[:n]...) } @@ -217,7 +252,7 @@ const ( ) // Seek sets the offset for the next Read or Write on file to offset -func (t *File) Seed(index int64, whence FileWhence) (e error) { +func (t *File) SeekIndex(index int64, whence FileWhence) (e error) { t.getRWCloser() if t.Config.AutoClose { defer t.Close() @@ -230,7 +265,73 @@ func (t *File) Seed(index int64, whence FileWhence) (e error) { t.cu, e = t.file.Seek(index, int(whence)) - return nil + return +} + +// stop before untilBytes +func (t *File) SeekUntil(untilBytes []byte, whence FileWhence, perReadSize int, maxReadSize int) (e error) { + t.getRWCloser() + if t.Config.AutoClose { + defer t.Close() + } + + if !t.l.TryRLock() { + return ErrFailToLock + } + defer t.l.RUnlock() + + var ( + reserve = len(untilBytes) - 1 + tmpArea = make([]byte, reserve+perReadSize) + n int + reader = t.read() + ) + + if reserve != 0 { + //avoid spik + _, _ = t.file.Seek(-int64(reserve), int(AtCurrent)) + } + + { + n, e = reader.Read(tmpArea) + if n == 0 && e != nil { + return + } + + maxReadSize = maxReadSize - n + + if i := bytes.Index(tmpArea[:n], untilBytes); i != -1 { + if n-i != 0 { + _, _ = t.file.Seek(-int64(n-i), int(AtCurrent)) + } + return + } + } + + for maxReadSize > 0 { + if reserve != 0 { + copy(tmpArea, tmpArea[reserve:]) + } + n, e = reader.Read(tmpArea[reserve:]) + if n == 0 && e != nil { + return + } + + maxReadSize = maxReadSize - n + + if i := bytes.Index(tmpArea[:reserve+n], untilBytes); i != -1 { + if reserve+n-i != 0 { + _, _ = t.file.Seek(-int64(reserve+n-i), int(AtCurrent)) + } + break + } + } + + if maxReadSize <= 0 { + e = ErrMaxReadSizeReach + } + + return } func (t *File) Sync() (e error) { @@ -409,7 +510,7 @@ func newPath(path string, mode fs.FileMode) { } rawPath += string(os.PathSeparator) + p if _, err := os.Stat(rawPath); os.IsNotExist(err) { - os.Mkdir(rawPath, mode) + _ = os.Mkdir(rawPath, mode) } } } diff --git a/file/FileWR_test.go b/file/FileWR_test.go index 04217d2..81665ea 100644 --- a/file/FileWR_test.go +++ b/file/FileWR_test.go @@ -98,7 +98,7 @@ func TestWriteReadDel(t *testing.T) { t.Fatal(e) } - if e := f.Seed(0, AtOrigin); e != nil { + if e := f.SeekIndex(0, AtOrigin); e != nil { t.Fatal(e) } @@ -124,13 +124,13 @@ func TestWriteReadDel(t *testing.T) { } } -func TestSeed(t *testing.T) { +func TestSeek(t *testing.T) { f := New("rwd.txt", 0, false) if i, e := f.Write([]byte("12er4x3"), true); i == 0 || e != nil { t.Fatal(e) } - if e := f.Seed(1, AtOrigin); e != nil { + if e := f.SeekIndex(1, AtOrigin); e != nil { t.Fatal(e) } @@ -143,7 +143,7 @@ func TestSeed(t *testing.T) { } } - if e := f.Seed(-1, AtEnd); e != nil { + if e := f.SeekIndex(-1, AtEnd); e != nil { t.Fatal(e) } @@ -164,6 +164,52 @@ func TestSeed(t *testing.T) { } } +func TestSeek2(t *testing.T) { + f := New("rwd.txt", 0, false) + if f.IsExist() { + if e := f.Delete(); e != nil { + t.Fatal(e) + } + } + if i, e := f.Write([]byte("12345sser4x3"), true); i == 0 || e != nil { + t.Fatal(e) + } + + f.SeekIndex(0, AtOrigin) + + if e := f.SeekUntil([]byte("sser"), AtCurrent, 3, 1<<20); e != nil && !errors.Is(e, io.EOF) { + t.Fatal(e) + } + + if data, e := f.ReadAll(5, 1<<20); e != nil && !errors.Is(e, io.EOF) { + t.Fatal(e) + } else if !bytes.Equal(data, []byte("sser4x3")) { + t.Fatal(string(data), data) + } + + if e := f.Close(); e != nil { + t.Fatal(e) + } + + if e := f.Delete(); e != nil { + t.Fatal(e) + } +} + +func TestSeek3(t *testing.T) { + f := New("0.mp4", 0, false) + defer f.Close() + var boxBuf = make([]byte, 4) + if e := f.SeekUntil([]byte("mvhd"), AtCurrent, 1<<17, 1<<22); e != nil && !errors.Is(e, ErrMaxReadSizeReach) { + t.Fatal(e) + } + if _, e := f.Read(boxBuf); e != nil { + t.Fatal(e) + } else if !bytes.Equal(boxBuf, []byte("mvhd")) { + t.Fatalf("wrong box:%v", string(boxBuf)) + } +} + func TestCopy(t *testing.T) { sf := New("s.txt", 0, true) if i, e := sf.Write([]byte("12er4x3"), true); i == 0 || e != nil { @@ -181,7 +227,7 @@ func TestCopy(t *testing.T) { func TestReadUntil(t *testing.T) { f := New("s.txt", 0, false) - if i, e := f.Write([]byte("18u3y7\ns99s9\n"), true); i == 0 || e != nil { + if i, e := f.Write([]byte("18u3y7\ns99s9\nuqienbs\n"), true); i == 0 || e != nil { t.Fatal(e) } @@ -189,23 +235,29 @@ func TestReadUntil(t *testing.T) { t.Fatal(e) } - if e := f.Seed(0, AtOrigin); e != nil { + if e := f.SeekIndex(0, AtOrigin); e != nil { t.Fatal(e) } - if data, e := f.ReadUntil('\n', 5, 20); e != nil { + if data, e := f.ReadUntil([]byte{'\n'}, 5, 20); e != nil { t.Fatal(e) } else if !bytes.Equal(data, []byte("18u3y7")) { t.Fatal(string(data)) } - if data, e := f.ReadUntil('\n', 5, 20); e != nil { + if data, e := f.ReadUntil([]byte{'\n'}, 5, 20); e != nil { t.Fatal(e) } else if !bytes.Equal(data, []byte("s99s9")) { t.Fatal(string(data)) } - if data, e := f.ReadUntil('\n', 5, 20); e == nil || !errors.Is(e, io.EOF) || len(data) != 0 { + if data, e := f.ReadUntil([]byte("s\n"), 5, 20); e != nil { + t.Fatal(e) + } else if !bytes.Equal(data, []byte("uqienb")) { + t.Fatal(string(data)) + } + + if data, e := f.ReadUntil([]byte{'\n'}, 5, 20); e == nil || !errors.Is(e, io.EOF) || len(data) != 0 { t.Fatal(e) } @@ -231,7 +283,7 @@ func TestEncoderDecoder(t *testing.T) { t.Fatal(e) } - if data, e := tf.ReadUntil('\n', 3, 100); e != nil && !errors.Is(e, io.EOF) { + if data, e := tf.ReadUntil([]byte{'\n'}, 3, 100); e != nil && !errors.Is(e, io.EOF) { t.Fatal(string(data), e) } else if !bytes.Equal(data, []byte("测1试s啊是3大家看s法$和")) { t.Fatal(string(data)) diff --git a/io/io.go b/io/io.go index dbb4daa..b4fb4b1 100644 --- a/io/io.go +++ b/io/io.go @@ -309,13 +309,19 @@ func Copy(r io.Reader, w io.Writer, c CopyConfig) (e error) { if c.MaxLoop == 0 { if c.MaxByte != 0 { leftN = c.MaxByte % c.BytePerLoop - c.MaxLoop = c.MaxByte/c.BytePerLoop + 1 + c.MaxLoop = c.MaxByte / c.BytePerLoop + if leftN > 0 { + c.MaxLoop += 1 + } } } else { if c.MaxByte != 0 { c.MaxByte = min(c.MaxByte, c.MaxLoop*c.BytePerLoop) leftN = c.MaxByte % c.BytePerLoop - c.MaxLoop = c.MaxByte/c.BytePerLoop + 1 + c.MaxLoop = c.MaxByte / c.BytePerLoop + if leftN > 0 { + c.MaxLoop += 1 + } } } } else if c.BytePerLoop > 1<<17 { @@ -323,7 +329,10 @@ func Copy(r io.Reader, w io.Writer, c CopyConfig) (e error) { if c.MaxByte != 0 { c.BytePerLoop = 1 << 17 leftN = c.MaxByte % c.BytePerLoop - c.MaxLoop = c.MaxByte/c.BytePerLoop + 1 + c.MaxLoop = c.MaxByte / c.BytePerLoop + if leftN > 0 { + c.MaxLoop += 1 + } } else { c.BytePerLoop = 1 << 17 } @@ -332,29 +341,44 @@ func Copy(r io.Reader, w io.Writer, c CopyConfig) (e error) { c.MaxByte = min(c.MaxByte, c.MaxLoop*c.BytePerLoop) c.BytePerLoop = 1 << 17 leftN = c.MaxByte % c.BytePerLoop - c.MaxLoop = c.MaxByte/c.BytePerLoop + 1 + c.MaxLoop = c.MaxByte / c.BytePerLoop + if leftN > 0 { + c.MaxLoop += 1 + } } else { c.MaxByte = c.MaxLoop * c.BytePerLoop c.BytePerLoop = 1 << 17 leftN = c.MaxByte % c.BytePerLoop - c.MaxLoop = c.MaxByte/c.BytePerLoop + 1 + c.MaxLoop = c.MaxByte / c.BytePerLoop + if leftN > 0 { + c.MaxLoop += 1 + } } } } else { if c.MaxLoop == 0 { if c.MaxByte != 0 { leftN = c.MaxByte % c.BytePerLoop - c.MaxLoop = c.MaxByte/c.BytePerLoop + 1 + c.MaxLoop = c.MaxByte / c.BytePerLoop + if leftN > 0 { + c.MaxLoop += 1 + } } } else { if c.MaxByte != 0 { c.MaxByte = min(c.MaxByte, c.MaxLoop*c.BytePerLoop) leftN = c.MaxByte % c.BytePerLoop - c.MaxLoop = c.MaxByte/c.BytePerLoop + 1 + c.MaxLoop = c.MaxByte / c.BytePerLoop + if leftN > 0 { + c.MaxLoop += 1 + } } else { c.MaxByte = c.MaxLoop * c.BytePerLoop leftN = c.MaxByte % c.BytePerLoop - c.MaxLoop = c.MaxByte/c.BytePerLoop + 1 + c.MaxLoop = c.MaxByte / c.BytePerLoop + if leftN > 0 { + c.MaxLoop += 1 + } } } } @@ -378,10 +402,10 @@ func Copy(r io.Reader, w io.Writer, c CopyConfig) (e error) { if c.MaxLoop > 0 { c.MaxLoop -= 1 - if c.MaxLoop == 0 { - return nil - } else if c.BytePerLoop == 1<<17 && leftN != 0 { + if c.MaxLoop == 1 && leftN != 0 { buf = buf[:leftN] + } else if c.MaxLoop == 0 { + return nil } } if c.BytePerSec != 0 && readC >= c.BytePerSec { diff --git a/websocket/Recoder.go b/websocket/Recoder.go index 31abe15..472f852 100644 --- a/websocket/Recoder.go +++ b/websocket/Recoder.go @@ -123,7 +123,7 @@ func Play(filePath string) (s *Server, close func()) { for sg.Islive() { if data == nil { - if data, e = f.ReadUntil('\n', 70, humanize.MByte); e != nil && !errors.Is(e, io.EOF) { + if data, e = f.ReadUntil([]byte{'\n'}, 70, humanize.MByte); e != nil && !errors.Is(e, io.EOF) { panic(e) } if len(data) == 0 { -- 2.39.2