From: qydysky Date: Tue, 4 Feb 2025 11:43:45 +0000 (+0800) Subject: 1 (#21) X-Git-Tag: v0.28.20250204114354 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=fba58a92071d83e5ff50bcb34dfb072bd4bb91c5;p=part%2F.git 1 (#21) * 1 * 1 --- diff --git a/io/io.go b/io/io.go index 153d6a1..92bdfa5 100644 --- a/io/io.go +++ b/io/io.go @@ -8,6 +8,8 @@ import ( "sync" "sync/atomic" "time" + + ps "github.com/qydysky/part/slice" ) // no close rc any time @@ -582,16 +584,14 @@ type CacheWriter struct { ctx context.Context cancelCauseFunc context.CancelCauseFunc w io.Writer - pushLock atomic.Bool - pushBuf []byte + pushBuf *ps.FIFO[byte] } -var ErrBusy = errors.New(`ErrBusy`) +type CacheWriterSize ps.FIFOSize -func NewCacheWriter(ws io.Writer, ctx ...context.Context) *CacheWriter { - t := CacheWriter{w: ws} - ctx = append(ctx, context.Background()) - t.ctx, t.cancelCauseFunc = context.WithCancelCause(ctx[0]) +func NewCacheWriter[T CacheWriterSize](ws io.Writer, bufsize T) *CacheWriter { + t := CacheWriter{w: ws, pushBuf: ps.NewFIFO[byte](bufsize)} + t.ctx, t.cancelCauseFunc = context.WithCancelCause(context.Background()) return &t } @@ -601,17 +601,17 @@ func (t *CacheWriter) Write(b []byte) (int, error) { return 0, t.ctx.Err() default: } - if !t.pushLock.CompareAndSwap(false, true) { - return 0, ErrBusy + if e := t.pushBuf.In(b); e != nil { + return 0, e } - t.pushBuf = append(t.pushBuf[:0], b...) go func() { - defer t.pushLock.Store(false) - if n, err := t.w.Write(t.pushBuf); err != nil || n == 0 { - if !errors.Is(err, ErrBusy) { - t.cancelCauseFunc(err) - } + if _, err := t.pushBuf.Out(t.w); err != nil && !errors.Is(err, ps.ErrFIFOEmpty) { + t.cancelCauseFunc(err) } }() - return len(t.pushBuf), t.ctx.Err() + return len(b), t.ctx.Err() +} + +func (t *CacheWriter) Size() int { + return t.pushBuf.Size() } diff --git a/io/io_test.go b/io/io_test.go index 8dba4e3..fe83fc4 100644 --- a/io/io_test.go +++ b/io/io_test.go @@ -2,10 +2,11 @@ package part import ( "bytes" - "errors" "io" "testing" "time" + + ps "github.com/qydysky/part/slice" ) func Test_CopyIO(t *testing.T) { @@ -109,12 +110,12 @@ func Test_CacheWrite(t *testing.T) { t.Fatal() } }() - writer := NewCacheWriter(w) + writer := NewCacheWriter(w, 4) if n, err := writer.Write([]byte("123")); n != 3 || err != nil { t.Fatal() } - if _, err := writer.Write([]byte("123")); !errors.Is(err, ErrBusy) { - t.Fatal() + if _, err := writer.Write([]byte("123")); err != ps.ErrFIFOOverflow { + t.Fatal(err) } time.Sleep(time.Second) } diff --git a/slice/FIFO.go b/slice/FIFO.go index 01f2311..7da12df 100644 --- a/slice/FIFO.go +++ b/slice/FIFO.go @@ -11,35 +11,39 @@ var ( ErrFIFOEmpty = errors.New(`ErrFIFOEmpty`) ) +type FIFOSize interface { + int | int32 | int64 | uint | uint32 | uint64 +} + type item struct { op, ed int } -type FIFO[S any, T []S] struct { +type FIFO[S any] struct { ed, op, opc int c chan item buf []S l sync.RWMutex } -func NewFIFO[S any, T []S](size int) *FIFO[S, T] { - return &FIFO[S, T]{ +func NewFIFO[S any, T FIFOSize](size T) *FIFO[S] { + return &FIFO[S]{ c: make(chan item, size), buf: make([]S, size), } } -func (t *FIFO[S, T]) lock() func() { +func (t *FIFO[S]) lock() func() { t.l.Lock() return t.l.Unlock } -func (t *FIFO[S, T]) rlock() func() { +func (t *FIFO[S]) rlock() func() { t.l.RLock() return t.l.RUnlock } -func (t *FIFO[S, T]) inok(size int) bool { +func (t *FIFO[S]) inok(size int) bool { if t.ed+size > len(t.buf) { if size > t.op { return false @@ -51,7 +55,7 @@ func (t *FIFO[S, T]) inok(size int) bool { return true } -func (t *FIFO[S, T]) In(p T) error { +func (t *FIFO[S]) In(p []S) error { defer t.lock()() t.op = t.opc @@ -70,21 +74,37 @@ func (t *FIFO[S, T]) In(p T) error { return nil } -func (t *FIFO[S, T]) Out() (p T, e error) { +func (t *FIFO[S]) Out(w interface { + Write(p []S) (n int, err error) +}) (n int, err error) { defer t.rlock()() + select { + case item := <-t.c: + n, err = w.Write(t.buf[item.op:item.ed]) + t.opc = item.ed + default: + err = ErrFIFOEmpty + } + + return +} + +func (t *FIFO[S]) OutDirect() (p []S, err error, used func()) { + used = t.rlock() + select { case item := <-t.c: p = t.buf[item.op:item.ed] t.opc = item.ed default: - e = ErrFIFOEmpty + err = ErrFIFOEmpty } return } -func (t *FIFO[S, T]) Size() int { +func (t *FIFO[S]) Size() int { defer t.rlock()() if t.opc > t.ed { @@ -94,11 +114,11 @@ func (t *FIFO[S, T]) Size() int { } } -func (t *FIFO[S, T]) Num() int { +func (t *FIFO[S]) Num() int { return len(t.c) } -func (t *FIFO[S, T]) Clear() { +func (t *FIFO[S]) Clear() { defer t.lock()() t.op = 0 @@ -113,7 +133,7 @@ func (t *FIFO[S, T]) Clear() { } } -func (t *FIFO[S, T]) Reset() { +func (t *FIFO[S]) Reset() { defer t.lock()() clear(t.buf) diff --git a/slice/FIFO_test.go b/slice/FIFO_test.go index 4c8676f..7c36d25 100644 --- a/slice/FIFO_test.go +++ b/slice/FIFO_test.go @@ -32,8 +32,10 @@ func TestFIFO(t *testing.T) { if fifo.In([]byte("012")) != nil { t.Fatal() } - if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("012")) { + if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("012")) { t.Fatal() + } else { + used() } fifo.Clear() @@ -49,17 +51,23 @@ func TestFIFO(t *testing.T) { if fifo.Size() != 5 { t.Fatal() } - if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("01")) { + if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("01")) { t.Fatal() + } else { + used() } if fifo.In([]byte("56")) != nil { t.Fatal() } - if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("234")) { + if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("234")) { t.Fatal() + } else { + used() } - if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("56")) { + if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("56")) { t.Fatal() + } else { + used() } fifo.Clear() @@ -86,8 +94,10 @@ func BenchmarkFIFO(b *testing.B) { b.FailNow() } if fifo.Num() > 1 { - if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, buf) { + if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, buf) { b.FailNow() + } else { + used() } } } diff --git a/web/Web.go b/web/Web.go index 8493a0b..61fd0dc 100644 --- a/web/Web.go +++ b/web/Web.go @@ -711,17 +711,12 @@ func WithFlush(w http.ResponseWriter) http.ResponseWriter { return withflush{w} } -// IsCacheBusy -func WithCache(w http.ResponseWriter) http.ResponseWriter { +func WithCache(w http.ResponseWriter, cw *pio.CacheWriter) http.ResponseWriter { t := withCache{raw: w} - t.cw = pio.NewCacheWriter(w) + t.cw = cw return t } -func IsCacheBusy(e error) bool { - return errors.Is(e, pio.ErrBusy) -} - func WithStatusCode(w http.ResponseWriter, code int) { w.WriteHeader(code) _, _ = w.Write([]byte(http.StatusText(code)))