]> 127.0.0.1 Git - part/.git/commitdiff
1 (#22) v0.28.20250204153805
authorqydysky <qydysky@foxmail.com>
Tue, 4 Feb 2025 15:37:55 +0000 (23:37 +0800)
committerGitHub <noreply@github.com>
Tue, 4 Feb 2025 15:37:55 +0000 (23:37 +0800)
io/io.go
io/io_test.go
slice/FIFO.go [deleted file]
slice/FIFO_test.go [deleted file]
web/Web.go

index 92bdfa53d867f71c39bab42c8a8c4e2c9da342f7..8c300a5ece425b7e71a6840a91cf0613e1ccd75d 100644 (file)
--- a/io/io.go
+++ b/io/io.go
@@ -8,8 +8,6 @@ import (
        "sync"
        "sync/atomic"
        "time"
-
-       ps "github.com/qydysky/part/slice"
 )
 
 // no close rc any time
@@ -580,38 +578,67 @@ func ReadAll(r io.Reader, b []byte) ([]byte, error) {
        }
 }
 
+var (
+       ErrCacheWriterCapOverflow = errors.New(`ErrCacheWriterCapOverflow`)
+)
+
 type CacheWriter struct {
        ctx             context.Context
        cancelCauseFunc context.CancelCauseFunc
        w               io.Writer
-       pushBuf         *ps.FIFO[byte]
+       pushBuf         []byte
+       ic              chan int
+       maxCap          int
+       l               sync.RWMutex
 }
 
-type CacheWriterSize ps.FIFOSize
-
-func NewCacheWriter[T CacheWriterSize](ws io.Writer, bufsize T) *CacheWriter {
-       t := CacheWriter{w: ws, pushBuf: ps.NewFIFO[byte](bufsize)}
+func NewCacheWriter(ws io.Writer, maxWait uint, maxCap int) *CacheWriter {
+       t := CacheWriter{w: ws, ic: make(chan int, maxWait), maxCap: maxCap}
        t.ctx, t.cancelCauseFunc = context.WithCancelCause(context.Background())
        return &t
 }
 
-func (t *CacheWriter) Write(b []byte) (int, error) {
+func (t *CacheWriter) Write(b []byte) (n int, e error) {
        select {
        case <-t.ctx.Done():
                return 0, t.ctx.Err()
        default:
        }
-       if e := t.pushBuf.In(b); e != nil {
-               return 0, e
+
+       {
+               t.l.Lock()
+               if len(t.pushBuf)+len(b) > t.maxCap {
+                       t.l.Unlock()
+                       return 0, ErrCacheWriterCapOverflow
+               }
+               select {
+               case t.ic <- len(b):
+                       t.pushBuf = append(t.pushBuf, b...)
+                       t.l.Unlock()
+               default:
+                       i := <-t.ic
+                       if _, err := t.w.Write(t.pushBuf[:i]); err != nil {
+                               t.cancelCauseFunc(err)
+                       }
+                       t.ic <- len(b)
+                       t.pushBuf = append(t.pushBuf[i:], b...)
+                       t.l.Unlock()
+                       return len(b), t.ctx.Err()
+               }
        }
+
        go func() {
-               if _, err := t.pushBuf.Out(t.w); err != nil && !errors.Is(err, ps.ErrFIFOEmpty) {
+               t.l.Lock()
+               i := <-t.ic
+               if _, err := t.w.Write(t.pushBuf[:i]); err != nil {
                        t.cancelCauseFunc(err)
                }
+               t.pushBuf = t.pushBuf[:copy(t.pushBuf, t.pushBuf[i:])]
+               t.l.Unlock()
        }()
        return len(b), t.ctx.Err()
 }
 
-func (t *CacheWriter) Size() int {
-       return t.pushBuf.Size()
+func (t *CacheWriter) Cap() int {
+       return cap(t.pushBuf)
 }
index fe83fc4de107a9b90489b5f1dfc261b403befa28..53cb91cc0b5341c8969a039013cd41fe41f8fd97 100644 (file)
@@ -5,8 +5,6 @@ import (
        "io"
        "testing"
        "time"
-
-       ps "github.com/qydysky/part/slice"
 )
 
 func Test_CopyIO(t *testing.T) {
@@ -110,12 +108,23 @@ func Test_CacheWrite(t *testing.T) {
                        t.Fatal()
                }
        }()
-       writer := NewCacheWriter(w, 4)
+       writer := NewCacheWriter(w, 100, 4)
        if n, err := writer.Write([]byte("123")); n != 3 || err != nil {
                t.Fatal()
        }
-       if _, err := writer.Write([]byte("123")); err != ps.ErrFIFOOverflow {
-               t.Fatal(err)
+       if _, err := writer.Write([]byte("123")); err == nil {
+               t.Fatal()
        }
        time.Sleep(time.Second)
 }
+
+func BenchmarkCache(b *testing.B) {
+       writer := NewCacheWriter(io.Discard, 100, 4000)
+       tmp := []byte("1")
+       b.ResetTimer()
+       for i := 0; i < b.N; i++ {
+               if _, err := writer.Write(tmp); err != nil {
+                       b.Fatal(err)
+               }
+       }
+}
diff --git a/slice/FIFO.go b/slice/FIFO.go
deleted file mode 100644 (file)
index 7da12df..0000000
+++ /dev/null
@@ -1,150 +0,0 @@
-package part
-
-import (
-       "sync"
-
-       "errors"
-)
-
-var (
-       ErrFIFOOverflow = errors.New(`ErrFIFOOverflow`)
-       ErrFIFOEmpty    = errors.New(`ErrFIFOEmpty`)
-)
-
-type FIFOSize interface {
-       int | int32 | int64 | uint | uint32 | uint64
-}
-
-type item struct {
-       op, ed int
-}
-
-type FIFO[S any] struct {
-       ed, op, opc int
-       c           chan item
-       buf         []S
-       l           sync.RWMutex
-}
-
-func NewFIFO[S any, T FIFOSize](size T) *FIFO[S] {
-       return &FIFO[S]{
-               c:   make(chan item, size),
-               buf: make([]S, size),
-       }
-}
-
-func (t *FIFO[S]) lock() func() {
-       t.l.Lock()
-       return t.l.Unlock
-}
-
-func (t *FIFO[S]) rlock() func() {
-       t.l.RLock()
-       return t.l.RUnlock
-}
-
-func (t *FIFO[S]) inok(size int) bool {
-       if t.ed+size > len(t.buf) {
-               if size > t.op {
-                       return false
-               }
-               t.ed = 0
-       } else if t.op > t.ed && t.ed+size > t.op {
-               return false
-       }
-       return true
-}
-
-func (t *FIFO[S]) In(p []S) error {
-       defer t.lock()()
-
-       t.op = t.opc
-       if !t.inok(len(p)) {
-               return ErrFIFOOverflow
-       }
-       select {
-       case t.c <- item{
-               op: t.ed,
-               ed: t.ed + len(p),
-       }:
-               t.ed = t.ed + copy(t.buf[t.ed:], p)
-       default:
-               return ErrFIFOOverflow
-       }
-       return nil
-}
-
-func (t *FIFO[S]) Out(w interface {
-       Write(p []S) (n int, err error)
-}) (n int, err error) {
-       defer t.rlock()()
-
-       select {
-       case item := <-t.c:
-               n, err = w.Write(t.buf[item.op:item.ed])
-               t.opc = item.ed
-       default:
-               err = ErrFIFOEmpty
-       }
-
-       return
-}
-
-func (t *FIFO[S]) OutDirect() (p []S, err error, used func()) {
-       used = t.rlock()
-
-       select {
-       case item := <-t.c:
-               p = t.buf[item.op:item.ed]
-               t.opc = item.ed
-       default:
-               err = ErrFIFOEmpty
-       }
-
-       return
-}
-
-func (t *FIFO[S]) Size() int {
-       defer t.rlock()()
-
-       if t.opc > t.ed {
-               return len(t.buf) - t.opc - t.ed
-       } else {
-               return t.ed - t.opc
-       }
-}
-
-func (t *FIFO[S]) Num() int {
-       return len(t.c)
-}
-
-func (t *FIFO[S]) Clear() {
-       defer t.lock()()
-
-       t.op = 0
-       t.opc = 0
-       t.ed = 0
-       for {
-               select {
-               case <-t.c:
-               default:
-                       return
-               }
-       }
-}
-
-func (t *FIFO[S]) Reset() {
-       defer t.lock()()
-
-       clear(t.buf)
-       t.op = 0
-       t.opc = 0
-       t.ed = 0
-       for {
-               select {
-               case <-t.c:
-               default:
-                       return
-               }
-       }
-}
diff --git a/slice/FIFO_test.go b/slice/FIFO_test.go
deleted file mode 100644 (file)
index 7c36d25..0000000
+++ /dev/null
@@ -1,104 +0,0 @@
-package part
-
-import (
-       "bytes"
-       "testing"
-)
-
-func TestFIFO(t *testing.T) {
-       fifo := NewFIFO[byte](5)
-
-       if fifo.In([]byte("012345")) != ErrFIFOOverflow {
-               t.Fatal()
-       }
-       fifo.Clear()
-
-       if fifo.In([]byte("012")) != nil {
-               t.Fatal()
-       }
-       if fifo.In([]byte("345")) != ErrFIFOOverflow {
-               t.Fatal()
-       }
-       fifo.Clear()
-
-       if fifo.In([]byte("012")) != nil {
-               t.Fatal()
-       }
-       if fifo.In([]byte("34")) != nil {
-               t.Fatal()
-       }
-       fifo.Clear()
-
-       if fifo.In([]byte("012")) != nil {
-               t.Fatal()
-       }
-       if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("012")) {
-               t.Fatal()
-       } else {
-               used()
-       }
-       fifo.Clear()
-
-       if fifo.In([]byte("01")) != nil {
-               t.Fatal()
-       }
-       if fifo.Size() != 2 {
-               t.Fatal()
-       }
-       if e := fifo.In([]byte("234")); e != nil {
-               t.Fatal(e)
-       }
-       if fifo.Size() != 5 {
-               t.Fatal()
-       }
-       if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("01")) {
-               t.Fatal()
-       } else {
-               used()
-       }
-       if fifo.In([]byte("56")) != nil {
-               t.Fatal()
-       }
-       if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("234")) {
-               t.Fatal()
-       } else {
-               used()
-       }
-       if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("56")) {
-               t.Fatal()
-       } else {
-               used()
-       }
-       fifo.Clear()
-
-       // if fifo.In([]byte("012")) != nil {
-       //      t.Fatal()
-       // }
-       // go func() {
-       //      time.Sleep(time.Millisecond * 500)
-       //      fifo.Out()
-       // }()
-       // if e := fifo.In([]byte("345")); e != nil {
-       //      t.Fatal(e)
-       // }
-       // time.Sleep(time.Second * 10)
-       // fifo.Clear()
-}
-
-func BenchmarkFIFO(b *testing.B) {
-       fifo := NewFIFO[byte](5)
-       buf := []byte("12")
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               if e := fifo.In(buf); e != nil {
-                       b.FailNow()
-               }
-               if fifo.Num() > 1 {
-                       if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, buf) {
-                               b.FailNow()
-                       } else {
-                               used()
-                       }
-               }
-       }
-}
index 61fd0dcaf8f08ea5f6f2f83c43ada1af1168593f..82f581f69eef42b444477d37f506ce9afbbcba60 100644 (file)
@@ -571,28 +571,34 @@ func (t withflush) WriteHeader(i int) {
        }
 }
 
-type withCache struct {
+type WithCacheWiter struct {
        cw  *pio.CacheWriter
        raw http.ResponseWriter
 }
 
-func (t withCache) Header() http.Header {
+func (t *WithCacheWiter) Header() http.Header {
        if t.raw != nil {
                return t.raw.Header()
        }
        return make(http.Header)
 }
-func (t withCache) Write(b []byte) (i int, e error) {
+func (t *WithCacheWiter) Write(b []byte) (i int, e error) {
        if t.cw != nil {
                return t.cw.Write(b)
        }
        return t.raw.Write(b)
 }
-func (t withCache) WriteHeader(i int) {
+func (t *WithCacheWiter) WriteHeader(i int) {
        if t.raw != nil {
                t.raw.WriteHeader(i)
        }
 }
+func (t *WithCacheWiter) Cap() (i int) {
+       if t.cw != nil {
+               return t.cw.Cap()
+       }
+       return 0
+}
 
 type Exprier struct {
        max int
@@ -711,10 +717,8 @@ func WithFlush(w http.ResponseWriter) http.ResponseWriter {
        return withflush{w}
 }
 
-func WithCache(w http.ResponseWriter, cw *pio.CacheWriter) http.ResponseWriter {
-       t := withCache{raw: w}
-       t.cw = cw
-       return t
+func WithCache(w http.ResponseWriter, maxWait uint, maxCap int) *WithCacheWiter {
+       return &WithCacheWiter{raw: w, cw: pio.NewCacheWriter(w, maxWait, maxCap)}
 }
 
 func WithStatusCode(w http.ResponseWriter, code int) {