From 9b1d195afc811bacf9147524049fb93245fdec1f Mon Sep 17 00:00:00 2001 From: qydysky Date: Tue, 4 Feb 2025 23:37:55 +0800 Subject: [PATCH] 1 (#22) --- io/io.go | 53 ++++++++++++---- io/io_test.go | 19 ++++-- slice/FIFO.go | 150 --------------------------------------------- slice/FIFO_test.go | 104 ------------------------------- web/Web.go | 20 +++--- 5 files changed, 66 insertions(+), 280 deletions(-) delete mode 100644 slice/FIFO.go delete mode 100644 slice/FIFO_test.go diff --git a/io/io.go b/io/io.go index 92bdfa5..8c300a5 100644 --- a/io/io.go +++ b/io/io.go @@ -8,8 +8,6 @@ import ( "sync" "sync/atomic" "time" - - ps "github.com/qydysky/part/slice" ) // no close rc any time @@ -580,38 +578,67 @@ func ReadAll(r io.Reader, b []byte) ([]byte, error) { } } +var ( + ErrCacheWriterCapOverflow = errors.New(`ErrCacheWriterCapOverflow`) +) + type CacheWriter struct { ctx context.Context cancelCauseFunc context.CancelCauseFunc w io.Writer - pushBuf *ps.FIFO[byte] + pushBuf []byte + ic chan int + maxCap int + l sync.RWMutex } -type CacheWriterSize ps.FIFOSize - -func NewCacheWriter[T CacheWriterSize](ws io.Writer, bufsize T) *CacheWriter { - t := CacheWriter{w: ws, pushBuf: ps.NewFIFO[byte](bufsize)} +func NewCacheWriter(ws io.Writer, maxWait uint, maxCap int) *CacheWriter { + t := CacheWriter{w: ws, ic: make(chan int, maxWait), maxCap: maxCap} t.ctx, t.cancelCauseFunc = context.WithCancelCause(context.Background()) return &t } -func (t *CacheWriter) Write(b []byte) (int, error) { +func (t *CacheWriter) Write(b []byte) (n int, e error) { select { case <-t.ctx.Done(): return 0, t.ctx.Err() default: } - if e := t.pushBuf.In(b); e != nil { - return 0, e + + { + t.l.Lock() + if len(t.pushBuf)+len(b) > t.maxCap { + t.l.Unlock() + return 0, ErrCacheWriterCapOverflow + } + select { + case t.ic <- len(b): + t.pushBuf = append(t.pushBuf, b...) + t.l.Unlock() + default: + i := <-t.ic + if _, err := t.w.Write(t.pushBuf[:i]); err != nil { + t.cancelCauseFunc(err) + } + t.ic <- len(b) + t.pushBuf = append(t.pushBuf[i:], b...) + t.l.Unlock() + return len(b), t.ctx.Err() + } } + go func() { - if _, err := t.pushBuf.Out(t.w); err != nil && !errors.Is(err, ps.ErrFIFOEmpty) { + t.l.Lock() + i := <-t.ic + if _, err := t.w.Write(t.pushBuf[:i]); err != nil { t.cancelCauseFunc(err) } + t.pushBuf = t.pushBuf[:copy(t.pushBuf, t.pushBuf[i:])] + t.l.Unlock() }() return len(b), t.ctx.Err() } -func (t *CacheWriter) Size() int { - return t.pushBuf.Size() +func (t *CacheWriter) Cap() int { + return cap(t.pushBuf) } diff --git a/io/io_test.go b/io/io_test.go index fe83fc4..53cb91c 100644 --- a/io/io_test.go +++ b/io/io_test.go @@ -5,8 +5,6 @@ import ( "io" "testing" "time" - - ps "github.com/qydysky/part/slice" ) func Test_CopyIO(t *testing.T) { @@ -110,12 +108,23 @@ func Test_CacheWrite(t *testing.T) { t.Fatal() } }() - writer := NewCacheWriter(w, 4) + writer := NewCacheWriter(w, 100, 4) if n, err := writer.Write([]byte("123")); n != 3 || err != nil { t.Fatal() } - if _, err := writer.Write([]byte("123")); err != ps.ErrFIFOOverflow { - t.Fatal(err) + if _, err := writer.Write([]byte("123")); err == nil { + t.Fatal() } time.Sleep(time.Second) } + +func BenchmarkCache(b *testing.B) { + writer := NewCacheWriter(io.Discard, 100, 4000) + tmp := []byte("1") + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := writer.Write(tmp); err != nil { + b.Fatal(err) + } + } +} diff --git a/slice/FIFO.go b/slice/FIFO.go deleted file mode 100644 index 7da12df..0000000 --- a/slice/FIFO.go +++ /dev/null @@ -1,150 +0,0 @@ -package part - -import ( - "sync" - - "errors" -) - -var ( - ErrFIFOOverflow = errors.New(`ErrFIFOOverflow`) - ErrFIFOEmpty = errors.New(`ErrFIFOEmpty`) -) - -type FIFOSize interface { - int | int32 | int64 | uint | uint32 | uint64 -} - -type item struct { - op, ed int -} - -type FIFO[S any] struct { - ed, op, opc int - c chan item - buf []S - l sync.RWMutex -} - -func NewFIFO[S any, T FIFOSize](size T) *FIFO[S] { - return &FIFO[S]{ - c: make(chan item, size), - buf: make([]S, size), - } -} - -func (t *FIFO[S]) lock() func() { - t.l.Lock() - return t.l.Unlock -} - -func (t *FIFO[S]) rlock() func() { - t.l.RLock() - return t.l.RUnlock -} - -func (t *FIFO[S]) inok(size int) bool { - if t.ed+size > len(t.buf) { - if size > t.op { - return false - } - t.ed = 0 - } else if t.op > t.ed && t.ed+size > t.op { - return false - } - return true -} - -func (t *FIFO[S]) In(p []S) error { - defer t.lock()() - - t.op = t.opc - if !t.inok(len(p)) { - return ErrFIFOOverflow - } - select { - case t.c <- item{ - op: t.ed, - ed: t.ed + len(p), - }: - t.ed = t.ed + copy(t.buf[t.ed:], p) - default: - return ErrFIFOOverflow - } - return nil -} - -func (t *FIFO[S]) Out(w interface { - Write(p []S) (n int, err error) -}) (n int, err error) { - defer t.rlock()() - - select { - case item := <-t.c: - n, err = w.Write(t.buf[item.op:item.ed]) - t.opc = item.ed - default: - err = ErrFIFOEmpty - } - - return -} - -func (t *FIFO[S]) OutDirect() (p []S, err error, used func()) { - used = t.rlock() - - select { - case item := <-t.c: - p = t.buf[item.op:item.ed] - t.opc = item.ed - default: - err = ErrFIFOEmpty - } - - return -} - -func (t *FIFO[S]) Size() int { - defer t.rlock()() - - if t.opc > t.ed { - return len(t.buf) - t.opc - t.ed - } else { - return t.ed - t.opc - } -} - -func (t *FIFO[S]) Num() int { - return len(t.c) -} - -func (t *FIFO[S]) Clear() { - defer t.lock()() - - t.op = 0 - t.opc = 0 - t.ed = 0 - for { - select { - case <-t.c: - default: - return - } - } -} - -func (t *FIFO[S]) Reset() { - defer t.lock()() - - clear(t.buf) - t.op = 0 - t.opc = 0 - t.ed = 0 - for { - select { - case <-t.c: - default: - return - } - } -} diff --git a/slice/FIFO_test.go b/slice/FIFO_test.go deleted file mode 100644 index 7c36d25..0000000 --- a/slice/FIFO_test.go +++ /dev/null @@ -1,104 +0,0 @@ -package part - -import ( - "bytes" - "testing" -) - -func TestFIFO(t *testing.T) { - fifo := NewFIFO[byte](5) - - if fifo.In([]byte("012345")) != ErrFIFOOverflow { - t.Fatal() - } - fifo.Clear() - - if fifo.In([]byte("012")) != nil { - t.Fatal() - } - if fifo.In([]byte("345")) != ErrFIFOOverflow { - t.Fatal() - } - fifo.Clear() - - if fifo.In([]byte("012")) != nil { - t.Fatal() - } - if fifo.In([]byte("34")) != nil { - t.Fatal() - } - fifo.Clear() - - if fifo.In([]byte("012")) != nil { - t.Fatal() - } - if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("012")) { - t.Fatal() - } else { - used() - } - fifo.Clear() - - if fifo.In([]byte("01")) != nil { - t.Fatal() - } - if fifo.Size() != 2 { - t.Fatal() - } - if e := fifo.In([]byte("234")); e != nil { - t.Fatal(e) - } - if fifo.Size() != 5 { - t.Fatal() - } - if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("01")) { - t.Fatal() - } else { - used() - } - if fifo.In([]byte("56")) != nil { - t.Fatal() - } - if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("234")) { - t.Fatal() - } else { - used() - } - if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("56")) { - t.Fatal() - } else { - used() - } - fifo.Clear() - - // if fifo.In([]byte("012")) != nil { - // t.Fatal() - // } - // go func() { - // time.Sleep(time.Millisecond * 500) - // fifo.Out() - // }() - // if e := fifo.In([]byte("345")); e != nil { - // t.Fatal(e) - // } - // time.Sleep(time.Second * 10) - // fifo.Clear() -} - -func BenchmarkFIFO(b *testing.B) { - fifo := NewFIFO[byte](5) - buf := []byte("12") - b.ResetTimer() - for i := 0; i < b.N; i++ { - if e := fifo.In(buf); e != nil { - b.FailNow() - } - if fifo.Num() > 1 { - if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, buf) { - b.FailNow() - } else { - used() - } - } - } -} diff --git a/web/Web.go b/web/Web.go index 61fd0dc..82f581f 100644 --- a/web/Web.go +++ b/web/Web.go @@ -571,28 +571,34 @@ func (t withflush) WriteHeader(i int) { } } -type withCache struct { +type WithCacheWiter struct { cw *pio.CacheWriter raw http.ResponseWriter } -func (t withCache) Header() http.Header { +func (t *WithCacheWiter) Header() http.Header { if t.raw != nil { return t.raw.Header() } return make(http.Header) } -func (t withCache) Write(b []byte) (i int, e error) { +func (t *WithCacheWiter) Write(b []byte) (i int, e error) { if t.cw != nil { return t.cw.Write(b) } return t.raw.Write(b) } -func (t withCache) WriteHeader(i int) { +func (t *WithCacheWiter) WriteHeader(i int) { if t.raw != nil { t.raw.WriteHeader(i) } } +func (t *WithCacheWiter) Cap() (i int) { + if t.cw != nil { + return t.cw.Cap() + } + return 0 +} type Exprier struct { max int @@ -711,10 +717,8 @@ func WithFlush(w http.ResponseWriter) http.ResponseWriter { return withflush{w} } -func WithCache(w http.ResponseWriter, cw *pio.CacheWriter) http.ResponseWriter { - t := withCache{raw: w} - t.cw = cw - return t +func WithCache(w http.ResponseWriter, maxWait uint, maxCap int) *WithCacheWiter { + return &WithCacheWiter{raw: w, cw: pio.NewCacheWriter(w, maxWait, maxCap)} } func WithStatusCode(w http.ResponseWriter, code int) { -- 2.39.2