}
}
}
+
+type CacheWriter struct {
+ ctx context.Context
+ cancelCauseFunc context.CancelCauseFunc
+ w io.Writer
+ pushLock atomic.Bool
+ pushBuf []byte
+}
+
+var ErrBusy = errors.New(`ErrBusy`)
+
+func NewCacheWriter(ws io.Writer, ctx ...context.Context) *CacheWriter {
+ t := CacheWriter{w: ws}
+ ctx = append(ctx, context.Background())
+ t.ctx, t.cancelCauseFunc = context.WithCancelCause(ctx[0])
+ return &t
+}
+
+func (t *CacheWriter) Write(b []byte) (int, error) {
+ select {
+ case <-t.ctx.Done():
+ return 0, t.ctx.Err()
+ default:
+ }
+ if !t.pushLock.CompareAndSwap(false, true) {
+ return 0, ErrBusy
+ }
+ t.pushBuf = append(t.pushBuf[:0], b...)
+ go func() {
+ defer t.pushLock.Store(false)
+ if n, err := t.w.Write(t.pushBuf); err != nil || n == 0 {
+ if !errors.Is(err, ErrBusy) {
+ t.cancelCauseFunc(err)
+ }
+ }
+ }()
+ return len(t.pushBuf), t.ctx.Err()
+}
import (
"bytes"
+ "errors"
"io"
"testing"
+ "time"
)
func Test_CopyIO(t *testing.T) {
r.Reset(data)
}
}
+
+func Test_CacheWrite(t *testing.T) {
+ r, w := io.Pipe()
+ rc, _ := RW2Chan(r, nil)
+ go func() {
+ time.Sleep(time.Millisecond * 500)
+ b := <-rc
+ if !bytes.Equal(b, []byte("123")) {
+ t.Fatal()
+ }
+ }()
+ writer := NewCacheWriter(w)
+ if n, err := writer.Write([]byte("123")); n != 3 || err != nil {
+ t.Fatal()
+ }
+ if _, err := writer.Write([]byte("123")); !errors.Is(err, ErrBusy) {
+ t.Fatal()
+ }
+ time.Sleep(time.Second)
+}
"github.com/dustin/go-humanize"
"github.com/google/uuid"
+ pio "github.com/qydysky/part/io"
psync "github.com/qydysky/part/sync"
sys "github.com/qydysky/part/sys"
)
}
}
+type withCache struct {
+ cw *pio.CacheWriter
+ raw http.ResponseWriter
+}
+
+func (t withCache) Header() http.Header {
+ if t.raw != nil {
+ return t.raw.Header()
+ }
+ return make(http.Header)
+}
+func (t withCache) Write(b []byte) (i int, e error) {
+ if t.cw != nil {
+ return t.cw.Write(b)
+ }
+ return t.raw.Write(b)
+}
+func (t withCache) WriteHeader(i int) {
+ if t.raw != nil {
+ t.raw.WriteHeader(i)
+ }
+}
+
type Exprier struct {
max int
m psync.Map
return withflush{w}
}
+// IsCacheBusy
+func WithCache(w http.ResponseWriter) http.ResponseWriter {
+ t := withCache{raw: w}
+ t.cw = pio.NewCacheWriter(w)
+ return t
+}
+
+func IsCacheBusy(e error) bool {
+ return errors.Is(e, pio.ErrBusy)
+}
+
func WithStatusCode(w http.ResponseWriter, code int) {
w.WriteHeader(code)
_, _ = w.Write([]byte(http.StatusText(code)))