From 92b0330a652ed54dc52b8aa084920ce26ab28264 Mon Sep 17 00:00:00 2001 From: qydysky Date: Wed, 5 Feb 2025 01:48:52 +0800 Subject: [PATCH] 1 (#23) --- io/io.go | 61 +++++++++++++++++++++------------------------------ io/io_test.go | 4 ++-- web/Web.go | 10 ++------- 3 files changed, 29 insertions(+), 46 deletions(-) diff --git a/io/io.go b/io/io.go index 8c300a5..6bbb9da 100644 --- a/io/io.go +++ b/io/io.go @@ -579,21 +579,27 @@ func ReadAll(r io.Reader, b []byte) ([]byte, error) { } var ( - ErrCacheWriterCapOverflow = errors.New(`ErrCacheWriterCapOverflow`) + ErrCacheWriterBusy = errors.New(`ErrCacheWriterBusy`) ) type CacheWriter struct { ctx context.Context cancelCauseFunc context.CancelCauseFunc w io.Writer - pushBuf []byte - ic chan int - maxCap int - l sync.RWMutex + max uint32 + is []cacheWriterItem + rc sync.Mutex + cc chan uint32 + c atomic.Uint32 } -func NewCacheWriter(ws io.Writer, maxWait uint, maxCap int) *CacheWriter { - t := CacheWriter{w: ws, ic: make(chan int, maxWait), maxCap: maxCap} +type cacheWriterItem struct { + buf []byte + l atomic.Bool +} + +func NewCacheWriter(ws io.Writer, max uint32) *CacheWriter { + t := CacheWriter{w: ws, cc: make(chan uint32, max), max: max, is: make([]cacheWriterItem, max)} t.ctx, t.cancelCauseFunc = context.WithCancelCause(context.Background()) return &t } @@ -605,40 +611,23 @@ func (t *CacheWriter) Write(b []byte) (n int, e error) { default: } - { - t.l.Lock() - if len(t.pushBuf)+len(b) > t.maxCap { - t.l.Unlock() - return 0, ErrCacheWriterCapOverflow - } - select { - case t.ic <- len(b): - t.pushBuf = append(t.pushBuf, b...) - t.l.Unlock() - default: - i := <-t.ic - if _, err := t.w.Write(t.pushBuf[:i]); err != nil { - t.cancelCauseFunc(err) - } - t.ic <- len(b) - t.pushBuf = append(t.pushBuf[i:], b...) - t.l.Unlock() - return len(b), t.ctx.Err() - } + i := t.c.Add(1) % t.max + if !t.is[i].l.CompareAndSwap(false, true) { + return 0, ErrCacheWriterBusy } + t.is[i].buf = append(t.is[i].buf[:0], b...) + t.cc <- i + go func() { - t.l.Lock() - i := <-t.ic - if _, err := t.w.Write(t.pushBuf[:i]); err != nil { + t.rc.Lock() + defer t.rc.Unlock() + + i := <-t.cc + defer t.is[i].l.Store(false) + if _, err := t.w.Write(t.is[i].buf); err != nil { t.cancelCauseFunc(err) } - t.pushBuf = t.pushBuf[:copy(t.pushBuf, t.pushBuf[i:])] - t.l.Unlock() }() return len(b), t.ctx.Err() } - -func (t *CacheWriter) Cap() int { - return cap(t.pushBuf) -} diff --git a/io/io_test.go b/io/io_test.go index 53cb91c..071f55a 100644 --- a/io/io_test.go +++ b/io/io_test.go @@ -108,7 +108,7 @@ func Test_CacheWrite(t *testing.T) { t.Fatal() } }() - writer := NewCacheWriter(w, 100, 4) + writer := NewCacheWriter(w, 1) if n, err := writer.Write([]byte("123")); n != 3 || err != nil { t.Fatal() } @@ -119,7 +119,7 @@ func Test_CacheWrite(t *testing.T) { } func BenchmarkCache(b *testing.B) { - writer := NewCacheWriter(io.Discard, 100, 4000) + writer := NewCacheWriter(io.Discard, 2000) tmp := []byte("1") b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/web/Web.go b/web/Web.go index 82f581f..ab7ba84 100644 --- a/web/Web.go +++ b/web/Web.go @@ -593,12 +593,6 @@ func (t *WithCacheWiter) WriteHeader(i int) { t.raw.WriteHeader(i) } } -func (t *WithCacheWiter) Cap() (i int) { - if t.cw != nil { - return t.cw.Cap() - } - return 0 -} type Exprier struct { max int @@ -717,8 +711,8 @@ func WithFlush(w http.ResponseWriter) http.ResponseWriter { return withflush{w} } -func WithCache(w http.ResponseWriter, maxWait uint, maxCap int) *WithCacheWiter { - return &WithCacheWiter{raw: w, cw: pio.NewCacheWriter(w, maxWait, maxCap)} +func WithCache(w http.ResponseWriter, maxWait uint32) *WithCacheWiter { + return &WithCacheWiter{raw: w, cw: pio.NewCacheWriter(w, maxWait)} } func WithStatusCode(w http.ResponseWriter, code int) { -- 2.39.2