}
var (
- ErrCacheWriterCapOverflow = errors.New(`ErrCacheWriterCapOverflow`)
+ ErrCacheWriterBusy = errors.New(`ErrCacheWriterBusy`)
)
type CacheWriter struct {
ctx context.Context
cancelCauseFunc context.CancelCauseFunc
w io.Writer
- pushBuf []byte
- ic chan int
- maxCap int
- l sync.RWMutex
+ max uint32
+ is []cacheWriterItem
+ rc sync.Mutex
+ cc chan uint32
+ c atomic.Uint32
}
-func NewCacheWriter(ws io.Writer, maxWait uint, maxCap int) *CacheWriter {
- t := CacheWriter{w: ws, ic: make(chan int, maxWait), maxCap: maxCap}
+type cacheWriterItem struct {
+ buf []byte
+ l atomic.Bool
+}
+
+func NewCacheWriter(ws io.Writer, max uint32) *CacheWriter {
+ t := CacheWriter{w: ws, cc: make(chan uint32, max), max: max, is: make([]cacheWriterItem, max)}
t.ctx, t.cancelCauseFunc = context.WithCancelCause(context.Background())
return &t
}
default:
}
- {
- t.l.Lock()
- if len(t.pushBuf)+len(b) > t.maxCap {
- t.l.Unlock()
- return 0, ErrCacheWriterCapOverflow
- }
- select {
- case t.ic <- len(b):
- t.pushBuf = append(t.pushBuf, b...)
- t.l.Unlock()
- default:
- i := <-t.ic
- if _, err := t.w.Write(t.pushBuf[:i]); err != nil {
- t.cancelCauseFunc(err)
- }
- t.ic <- len(b)
- t.pushBuf = append(t.pushBuf[i:], b...)
- t.l.Unlock()
- return len(b), t.ctx.Err()
- }
+ i := t.c.Add(1) % t.max
+ if !t.is[i].l.CompareAndSwap(false, true) {
+ return 0, ErrCacheWriterBusy
}
+ t.is[i].buf = append(t.is[i].buf[:0], b...)
+ t.cc <- i
+
go func() {
- t.l.Lock()
- i := <-t.ic
- if _, err := t.w.Write(t.pushBuf[:i]); err != nil {
+ t.rc.Lock()
+ defer t.rc.Unlock()
+
+ i := <-t.cc
+ defer t.is[i].l.Store(false)
+ if _, err := t.w.Write(t.is[i].buf); err != nil {
t.cancelCauseFunc(err)
}
- t.pushBuf = t.pushBuf[:copy(t.pushBuf, t.pushBuf[i:])]
- t.l.Unlock()
}()
return len(b), t.ctx.Err()
}
-
-func (t *CacheWriter) Cap() int {
- return cap(t.pushBuf)
-}
t.Fatal()
}
}()
- writer := NewCacheWriter(w, 100, 4)
+ writer := NewCacheWriter(w, 1)
if n, err := writer.Write([]byte("123")); n != 3 || err != nil {
t.Fatal()
}
}
func BenchmarkCache(b *testing.B) {
- writer := NewCacheWriter(io.Discard, 100, 4000)
+ writer := NewCacheWriter(io.Discard, 2000)
tmp := []byte("1")
b.ResetTimer()
for i := 0; i < b.N; i++ {
t.raw.WriteHeader(i)
}
}
-func (t *WithCacheWiter) Cap() (i int) {
- if t.cw != nil {
- return t.cw.Cap()
- }
- return 0
-}
type Exprier struct {
max int
return withflush{w}
}
-func WithCache(w http.ResponseWriter, maxWait uint, maxCap int) *WithCacheWiter {
- return &WithCacheWiter{raw: w, cw: pio.NewCacheWriter(w, maxWait, maxCap)}
+func WithCache(w http.ResponseWriter, maxWait uint32) *WithCacheWiter {
+ return &WithCacheWiter{raw: w, cw: pio.NewCacheWriter(w, maxWait)}
}
func WithStatusCode(w http.ResponseWriter, code int) {