From: qydysky Date: Sat, 25 Jan 2025 17:29:25 +0000 (+0800) Subject: 1 (#15) X-Git-Tag: v0.28.20250125173504 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=138b3d01ce3d70fa21e032bc77371b17936a8415;p=part%2F.git 1 (#15) --- diff --git a/io/io.go b/io/io.go index 8b80863..153d6a1 100644 --- a/io/io.go +++ b/io/io.go @@ -577,3 +577,41 @@ func ReadAll(r io.Reader, b []byte) ([]byte, error) { } } } + +type CacheWriter struct { + ctx context.Context + cancelCauseFunc context.CancelCauseFunc + w io.Writer + pushLock atomic.Bool + pushBuf []byte +} + +var ErrBusy = errors.New(`ErrBusy`) + +func NewCacheWriter(ws io.Writer, ctx ...context.Context) *CacheWriter { + t := CacheWriter{w: ws} + ctx = append(ctx, context.Background()) + t.ctx, t.cancelCauseFunc = context.WithCancelCause(ctx[0]) + return &t +} + +func (t *CacheWriter) Write(b []byte) (int, error) { + select { + case <-t.ctx.Done(): + return 0, t.ctx.Err() + default: + } + if !t.pushLock.CompareAndSwap(false, true) { + return 0, ErrBusy + } + t.pushBuf = append(t.pushBuf[:0], b...) + go func() { + defer t.pushLock.Store(false) + if n, err := t.w.Write(t.pushBuf); err != nil || n == 0 { + if !errors.Is(err, ErrBusy) { + t.cancelCauseFunc(err) + } + } + }() + return len(t.pushBuf), t.ctx.Err() +} diff --git a/io/io_test.go b/io/io_test.go index e3c02e1..8dba4e3 100644 --- a/io/io_test.go +++ b/io/io_test.go @@ -2,8 +2,10 @@ package part import ( "bytes" + "errors" "io" "testing" + "time" ) func Test_CopyIO(t *testing.T) { @@ -96,3 +98,23 @@ func Benchmark_readall1(b *testing.B) { r.Reset(data) } } + +func Test_CacheWrite(t *testing.T) { + r, w := io.Pipe() + rc, _ := RW2Chan(r, nil) + go func() { + time.Sleep(time.Millisecond * 500) + b := <-rc + if !bytes.Equal(b, []byte("123")) { + t.Fatal() + } + }() + writer := NewCacheWriter(w) + if n, err := writer.Write([]byte("123")); n != 3 || err != nil { + t.Fatal() + } + if _, err := writer.Write([]byte("123")); !errors.Is(err, ErrBusy) { + t.Fatal() + } + time.Sleep(time.Second) +} diff --git a/web/Web.go b/web/Web.go index f261ca8..8493a0b 100644 --- a/web/Web.go +++ b/web/Web.go @@ -14,6 +14,7 @@ import ( "github.com/dustin/go-humanize" "github.com/google/uuid" + pio "github.com/qydysky/part/io" psync "github.com/qydysky/part/sync" sys "github.com/qydysky/part/sys" ) @@ -570,6 +571,29 @@ func (t withflush) WriteHeader(i int) { } } +type withCache struct { + cw *pio.CacheWriter + raw http.ResponseWriter +} + +func (t withCache) Header() http.Header { + if t.raw != nil { + return t.raw.Header() + } + return make(http.Header) +} +func (t withCache) Write(b []byte) (i int, e error) { + if t.cw != nil { + return t.cw.Write(b) + } + return t.raw.Write(b) +} +func (t withCache) WriteHeader(i int) { + if t.raw != nil { + t.raw.WriteHeader(i) + } +} + type Exprier struct { max int m psync.Map @@ -687,6 +711,17 @@ func WithFlush(w http.ResponseWriter) http.ResponseWriter { return withflush{w} } +// IsCacheBusy +func WithCache(w http.ResponseWriter) http.ResponseWriter { + t := withCache{raw: w} + t.cw = pio.NewCacheWriter(w) + return t +} + +func IsCacheBusy(e error) bool { + return errors.Is(e, pio.ErrBusy) +} + func WithStatusCode(w http.ResponseWriter, code int) { w.WriteHeader(code) _, _ = w.Write([]byte(http.StatusText(code)))