]> 127.0.0.1 Git - part/.git/commitdiff
1 (#21) v0.28.20250204114354
authorqydysky <qydysky@foxmail.com>
Tue, 4 Feb 2025 11:43:45 +0000 (19:43 +0800)
committerGitHub <noreply@github.com>
Tue, 4 Feb 2025 11:43:45 +0000 (19:43 +0800)
* 1

* 1

io/io.go
io/io_test.go
slice/FIFO.go
slice/FIFO_test.go
web/Web.go

index 153d6a192cac9efb507435a6b02afb4853f51ea3..92bdfa53d867f71c39bab42c8a8c4e2c9da342f7 100644 (file)
--- a/io/io.go
+++ b/io/io.go
@@ -8,6 +8,8 @@ import (
        "sync"
        "sync/atomic"
        "time"
+
+       ps "github.com/qydysky/part/slice"
 )
 
 // no close rc any time
@@ -582,16 +584,14 @@ type CacheWriter struct {
        ctx             context.Context
        cancelCauseFunc context.CancelCauseFunc
        w               io.Writer
-       pushLock        atomic.Bool
-       pushBuf         []byte
+       pushBuf         *ps.FIFO[byte]
 }
 
-var ErrBusy = errors.New(`ErrBusy`)
+type CacheWriterSize ps.FIFOSize
 
-func NewCacheWriter(ws io.Writer, ctx ...context.Context) *CacheWriter {
-       t := CacheWriter{w: ws}
-       ctx = append(ctx, context.Background())
-       t.ctx, t.cancelCauseFunc = context.WithCancelCause(ctx[0])
+func NewCacheWriter[T CacheWriterSize](ws io.Writer, bufsize T) *CacheWriter {
+       t := CacheWriter{w: ws, pushBuf: ps.NewFIFO[byte](bufsize)}
+       t.ctx, t.cancelCauseFunc = context.WithCancelCause(context.Background())
        return &t
 }
 
@@ -601,17 +601,17 @@ func (t *CacheWriter) Write(b []byte) (int, error) {
                return 0, t.ctx.Err()
        default:
        }
-       if !t.pushLock.CompareAndSwap(false, true) {
-               return 0, ErrBusy
+       if e := t.pushBuf.In(b); e != nil {
+               return 0, e
        }
-       t.pushBuf = append(t.pushBuf[:0], b...)
        go func() {
-               defer t.pushLock.Store(false)
-               if n, err := t.w.Write(t.pushBuf); err != nil || n == 0 {
-                       if !errors.Is(err, ErrBusy) {
-                               t.cancelCauseFunc(err)
-                       }
+               if _, err := t.pushBuf.Out(t.w); err != nil && !errors.Is(err, ps.ErrFIFOEmpty) {
+                       t.cancelCauseFunc(err)
                }
        }()
-       return len(t.pushBuf), t.ctx.Err()
+       return len(b), t.ctx.Err()
+}
+
+func (t *CacheWriter) Size() int {
+       return t.pushBuf.Size()
 }
index 8dba4e39a0c9af56b4d47e6e6e4399f0c9b32a56..fe83fc4de107a9b90489b5f1dfc261b403befa28 100644 (file)
@@ -2,10 +2,11 @@ package part
 
 import (
        "bytes"
-       "errors"
        "io"
        "testing"
        "time"
+
+       ps "github.com/qydysky/part/slice"
 )
 
 func Test_CopyIO(t *testing.T) {
@@ -109,12 +110,12 @@ func Test_CacheWrite(t *testing.T) {
                        t.Fatal()
                }
        }()
-       writer := NewCacheWriter(w)
+       writer := NewCacheWriter(w, 4)
        if n, err := writer.Write([]byte("123")); n != 3 || err != nil {
                t.Fatal()
        }
-       if _, err := writer.Write([]byte("123")); !errors.Is(err, ErrBusy) {
-               t.Fatal()
+       if _, err := writer.Write([]byte("123")); err != ps.ErrFIFOOverflow {
+               t.Fatal(err)
        }
        time.Sleep(time.Second)
 }
index 01f231151072ddfb48ff19e9cba40c76bc34308e..7da12dfe6aba549a34af489a3a5e521e578a01e4 100644 (file)
@@ -11,35 +11,39 @@ var (
        ErrFIFOEmpty    = errors.New(`ErrFIFOEmpty`)
 )
 
+type FIFOSize interface {
+       int | int32 | int64 | uint | uint32 | uint64
+}
+
 type item struct {
        op, ed int
 }
 
-type FIFO[S any, T []S] struct {
+type FIFO[S any] struct {
        ed, op, opc int
        c           chan item
        buf         []S
        l           sync.RWMutex
 }
 
-func NewFIFO[S any, T []S](size int) *FIFO[S, T] {
-       return &FIFO[S, T]{
+func NewFIFO[S any, T FIFOSize](size T) *FIFO[S] {
+       return &FIFO[S]{
                c:   make(chan item, size),
                buf: make([]S, size),
        }
 }
 
-func (t *FIFO[S, T]) lock() func() {
+func (t *FIFO[S]) lock() func() {
        t.l.Lock()
        return t.l.Unlock
 }
 
-func (t *FIFO[S, T]) rlock() func() {
+func (t *FIFO[S]) rlock() func() {
        t.l.RLock()
        return t.l.RUnlock
 }
 
-func (t *FIFO[S, T]) inok(size int) bool {
+func (t *FIFO[S]) inok(size int) bool {
        if t.ed+size > len(t.buf) {
                if size > t.op {
                        return false
@@ -51,7 +55,7 @@ func (t *FIFO[S, T]) inok(size int) bool {
        return true
 }
 
-func (t *FIFO[S, T]) In(p T) error {
+func (t *FIFO[S]) In(p []S) error {
        defer t.lock()()
 
        t.op = t.opc
@@ -70,21 +74,37 @@ func (t *FIFO[S, T]) In(p T) error {
        return nil
 }
 
-func (t *FIFO[S, T]) Out() (p T, e error) {
+func (t *FIFO[S]) Out(w interface {
+       Write(p []S) (n int, err error)
+}) (n int, err error) {
        defer t.rlock()()
 
+       select {
+       case item := <-t.c:
+               n, err = w.Write(t.buf[item.op:item.ed])
+               t.opc = item.ed
+       default:
+               err = ErrFIFOEmpty
+       }
+
+       return
+}
+
+func (t *FIFO[S]) OutDirect() (p []S, err error, used func()) {
+       used = t.rlock()
+
        select {
        case item := <-t.c:
                p = t.buf[item.op:item.ed]
                t.opc = item.ed
        default:
-               e = ErrFIFOEmpty
+               err = ErrFIFOEmpty
        }
 
        return
 }
 
-func (t *FIFO[S, T]) Size() int {
+func (t *FIFO[S]) Size() int {
        defer t.rlock()()
 
        if t.opc > t.ed {
@@ -94,11 +114,11 @@ func (t *FIFO[S, T]) Size() int {
        }
 }
 
-func (t *FIFO[S, T]) Num() int {
+func (t *FIFO[S]) Num() int {
        return len(t.c)
 }
 
-func (t *FIFO[S, T]) Clear() {
+func (t *FIFO[S]) Clear() {
        defer t.lock()()
 
        t.op = 0
@@ -113,7 +133,7 @@ func (t *FIFO[S, T]) Clear() {
        }
 }
 
-func (t *FIFO[S, T]) Reset() {
+func (t *FIFO[S]) Reset() {
        defer t.lock()()
 
        clear(t.buf)
index 4c8676fb65e70b82955b1b96e88fc3c7c2b159d7..7c36d25122c29073d9152d739a28902871957e62 100644 (file)
@@ -32,8 +32,10 @@ func TestFIFO(t *testing.T) {
        if fifo.In([]byte("012")) != nil {
                t.Fatal()
        }
-       if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("012")) {
+       if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("012")) {
                t.Fatal()
+       } else {
+               used()
        }
        fifo.Clear()
 
@@ -49,17 +51,23 @@ func TestFIFO(t *testing.T) {
        if fifo.Size() != 5 {
                t.Fatal()
        }
-       if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("01")) {
+       if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("01")) {
                t.Fatal()
+       } else {
+               used()
        }
        if fifo.In([]byte("56")) != nil {
                t.Fatal()
        }
-       if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("234")) {
+       if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("234")) {
                t.Fatal()
+       } else {
+               used()
        }
-       if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("56")) {
+       if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("56")) {
                t.Fatal()
+       } else {
+               used()
        }
        fifo.Clear()
 
@@ -86,8 +94,10 @@ func BenchmarkFIFO(b *testing.B) {
                        b.FailNow()
                }
                if fifo.Num() > 1 {
-                       if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, buf) {
+                       if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, buf) {
                                b.FailNow()
+                       } else {
+                               used()
                        }
                }
        }
index 8493a0bb4c7a098c6c05135d4076c36dd179ded2..61fd0dcaf8f08ea5f6f2f83c43ada1af1168593f 100644 (file)
@@ -711,17 +711,12 @@ func WithFlush(w http.ResponseWriter) http.ResponseWriter {
        return withflush{w}
 }
 
-// IsCacheBusy
-func WithCache(w http.ResponseWriter) http.ResponseWriter {
+func WithCache(w http.ResponseWriter, cw *pio.CacheWriter) http.ResponseWriter {
        t := withCache{raw: w}
-       t.cw = pio.NewCacheWriter(w)
+       t.cw = cw
        return t
 }
 
-func IsCacheBusy(e error) bool {
-       return errors.Is(e, pio.ErrBusy)
-}
-
 func WithStatusCode(w http.ResponseWriter, code int) {
        w.WriteHeader(code)
        _, _ = w.Write([]byte(http.StatusText(code)))