]> 127.0.0.1 Git - part/.git/commitdiff
1 (#23) v0.28.20250204174904
authorqydysky <qydysky@foxmail.com>
Tue, 4 Feb 2025 17:48:52 +0000 (01:48 +0800)
committerGitHub <noreply@github.com>
Tue, 4 Feb 2025 17:48:52 +0000 (01:48 +0800)
io/io.go
io/io_test.go
web/Web.go

index 8c300a5ece425b7e71a6840a91cf0613e1ccd75d..6bbb9da204e5357ab58acdfa5877f5c22ccc16ca 100644 (file)
--- a/io/io.go
+++ b/io/io.go
@@ -579,21 +579,27 @@ func ReadAll(r io.Reader, b []byte) ([]byte, error) {
 }
 
 var (
-       ErrCacheWriterCapOverflow = errors.New(`ErrCacheWriterCapOverflow`)
+       ErrCacheWriterBusy = errors.New(`ErrCacheWriterBusy`)
 )
 
 type CacheWriter struct {
        ctx             context.Context
        cancelCauseFunc context.CancelCauseFunc
        w               io.Writer
-       pushBuf         []byte
-       ic              chan int
-       maxCap          int
-       l               sync.RWMutex
+       max             uint32
+       is              []cacheWriterItem
+       rc              sync.Mutex
+       cc              chan uint32
+       c               atomic.Uint32
 }
 
-func NewCacheWriter(ws io.Writer, maxWait uint, maxCap int) *CacheWriter {
-       t := CacheWriter{w: ws, ic: make(chan int, maxWait), maxCap: maxCap}
+type cacheWriterItem struct {
+       buf []byte
+       l   atomic.Bool
+}
+
+func NewCacheWriter(ws io.Writer, max uint32) *CacheWriter {
+       t := CacheWriter{w: ws, cc: make(chan uint32, max), max: max, is: make([]cacheWriterItem, max)}
        t.ctx, t.cancelCauseFunc = context.WithCancelCause(context.Background())
        return &t
 }
@@ -605,40 +611,23 @@ func (t *CacheWriter) Write(b []byte) (n int, e error) {
        default:
        }
 
-       {
-               t.l.Lock()
-               if len(t.pushBuf)+len(b) > t.maxCap {
-                       t.l.Unlock()
-                       return 0, ErrCacheWriterCapOverflow
-               }
-               select {
-               case t.ic <- len(b):
-                       t.pushBuf = append(t.pushBuf, b...)
-                       t.l.Unlock()
-               default:
-                       i := <-t.ic
-                       if _, err := t.w.Write(t.pushBuf[:i]); err != nil {
-                               t.cancelCauseFunc(err)
-                       }
-                       t.ic <- len(b)
-                       t.pushBuf = append(t.pushBuf[i:], b...)
-                       t.l.Unlock()
-                       return len(b), t.ctx.Err()
-               }
+       i := t.c.Add(1) % t.max
+       if !t.is[i].l.CompareAndSwap(false, true) {
+               return 0, ErrCacheWriterBusy
        }
 
+       t.is[i].buf = append(t.is[i].buf[:0], b...)
+       t.cc <- i
+
        go func() {
-               t.l.Lock()
-               i := <-t.ic
-               if _, err := t.w.Write(t.pushBuf[:i]); err != nil {
+               t.rc.Lock()
+               defer t.rc.Unlock()
+
+               i := <-t.cc
+               defer t.is[i].l.Store(false)
+               if _, err := t.w.Write(t.is[i].buf); err != nil {
                        t.cancelCauseFunc(err)
                }
-               t.pushBuf = t.pushBuf[:copy(t.pushBuf, t.pushBuf[i:])]
-               t.l.Unlock()
        }()
        return len(b), t.ctx.Err()
 }
-
-func (t *CacheWriter) Cap() int {
-       return cap(t.pushBuf)
-}
index 53cb91cc0b5341c8969a039013cd41fe41f8fd97..071f55ab2f01d3d7a190186b9f0968614bb8e5c5 100644 (file)
@@ -108,7 +108,7 @@ func Test_CacheWrite(t *testing.T) {
                        t.Fatal()
                }
        }()
-       writer := NewCacheWriter(w, 100, 4)
+       writer := NewCacheWriter(w, 1)
        if n, err := writer.Write([]byte("123")); n != 3 || err != nil {
                t.Fatal()
        }
@@ -119,7 +119,7 @@ func Test_CacheWrite(t *testing.T) {
 }
 
 func BenchmarkCache(b *testing.B) {
-       writer := NewCacheWriter(io.Discard, 100, 4000)
+       writer := NewCacheWriter(io.Discard, 2000)
        tmp := []byte("1")
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
index 82f581f69eef42b444477d37f506ce9afbbcba60..ab7ba8461d898d0190a887c68ed17665765265f1 100644 (file)
@@ -593,12 +593,6 @@ func (t *WithCacheWiter) WriteHeader(i int) {
                t.raw.WriteHeader(i)
        }
 }
-func (t *WithCacheWiter) Cap() (i int) {
-       if t.cw != nil {
-               return t.cw.Cap()
-       }
-       return 0
-}
 
 type Exprier struct {
        max int
@@ -717,8 +711,8 @@ func WithFlush(w http.ResponseWriter) http.ResponseWriter {
        return withflush{w}
 }
 
-func WithCache(w http.ResponseWriter, maxWait uint, maxCap int) *WithCacheWiter {
-       return &WithCacheWiter{raw: w, cw: pio.NewCacheWriter(w, maxWait, maxCap)}
+func WithCache(w http.ResponseWriter, maxWait uint32) *WithCacheWiter {
+       return &WithCacheWiter{raw: w, cw: pio.NewCacheWriter(w, maxWait)}
 }
 
 func WithStatusCode(w http.ResponseWriter, code int) {