"sync"
"sync/atomic"
"time"
-
- ps "github.com/qydysky/part/slice"
)
// no close rc any time
}
}
+var (
+ ErrCacheWriterCapOverflow = errors.New(`ErrCacheWriterCapOverflow`)
+)
+
type CacheWriter struct {
ctx context.Context
cancelCauseFunc context.CancelCauseFunc
w io.Writer
- pushBuf *ps.FIFO[byte]
+ pushBuf []byte
+ ic chan int
+ maxCap int
+ l sync.RWMutex
}
-type CacheWriterSize ps.FIFOSize
-
-func NewCacheWriter[T CacheWriterSize](ws io.Writer, bufsize T) *CacheWriter {
- t := CacheWriter{w: ws, pushBuf: ps.NewFIFO[byte](bufsize)}
+func NewCacheWriter(ws io.Writer, maxWait uint, maxCap int) *CacheWriter {
+ t := CacheWriter{w: ws, ic: make(chan int, maxWait), maxCap: maxCap}
t.ctx, t.cancelCauseFunc = context.WithCancelCause(context.Background())
return &t
}
-func (t *CacheWriter) Write(b []byte) (int, error) {
+func (t *CacheWriter) Write(b []byte) (n int, e error) {
select {
case <-t.ctx.Done():
return 0, t.ctx.Err()
default:
}
- if e := t.pushBuf.In(b); e != nil {
- return 0, e
+
+ {
+ t.l.Lock()
+ if len(t.pushBuf)+len(b) > t.maxCap {
+ t.l.Unlock()
+ return 0, ErrCacheWriterCapOverflow
+ }
+ select {
+ case t.ic <- len(b):
+ t.pushBuf = append(t.pushBuf, b...)
+ t.l.Unlock()
+ default:
+ i := <-t.ic
+ if _, err := t.w.Write(t.pushBuf[:i]); err != nil {
+ t.cancelCauseFunc(err)
+ }
+ t.ic <- len(b)
+ t.pushBuf = append(t.pushBuf[i:], b...)
+ t.l.Unlock()
+ return len(b), t.ctx.Err()
+ }
}
+
go func() {
- if _, err := t.pushBuf.Out(t.w); err != nil && !errors.Is(err, ps.ErrFIFOEmpty) {
+ t.l.Lock()
+ i := <-t.ic
+ if _, err := t.w.Write(t.pushBuf[:i]); err != nil {
t.cancelCauseFunc(err)
}
+ t.pushBuf = t.pushBuf[:copy(t.pushBuf, t.pushBuf[i:])]
+ t.l.Unlock()
}()
return len(b), t.ctx.Err()
}
-func (t *CacheWriter) Size() int {
- return t.pushBuf.Size()
+func (t *CacheWriter) Cap() int {
+ return cap(t.pushBuf)
}
"io"
"testing"
"time"
-
- ps "github.com/qydysky/part/slice"
)
func Test_CopyIO(t *testing.T) {
t.Fatal()
}
}()
- writer := NewCacheWriter(w, 4)
+ writer := NewCacheWriter(w, 100, 4)
if n, err := writer.Write([]byte("123")); n != 3 || err != nil {
t.Fatal()
}
- if _, err := writer.Write([]byte("123")); err != ps.ErrFIFOOverflow {
- t.Fatal(err)
+ if _, err := writer.Write([]byte("123")); err == nil {
+ t.Fatal()
}
time.Sleep(time.Second)
}
+
+func BenchmarkCache(b *testing.B) {
+ writer := NewCacheWriter(io.Discard, 100, 4000)
+ tmp := []byte("1")
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ if _, err := writer.Write(tmp); err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+++ /dev/null
-package part
-
-import (
- "sync"
-
- "errors"
-)
-
-var (
- ErrFIFOOverflow = errors.New(`ErrFIFOOverflow`)
- ErrFIFOEmpty = errors.New(`ErrFIFOEmpty`)
-)
-
-type FIFOSize interface {
- int | int32 | int64 | uint | uint32 | uint64
-}
-
-type item struct {
- op, ed int
-}
-
-type FIFO[S any] struct {
- ed, op, opc int
- c chan item
- buf []S
- l sync.RWMutex
-}
-
-func NewFIFO[S any, T FIFOSize](size T) *FIFO[S] {
- return &FIFO[S]{
- c: make(chan item, size),
- buf: make([]S, size),
- }
-}
-
-func (t *FIFO[S]) lock() func() {
- t.l.Lock()
- return t.l.Unlock
-}
-
-func (t *FIFO[S]) rlock() func() {
- t.l.RLock()
- return t.l.RUnlock
-}
-
-func (t *FIFO[S]) inok(size int) bool {
- if t.ed+size > len(t.buf) {
- if size > t.op {
- return false
- }
- t.ed = 0
- } else if t.op > t.ed && t.ed+size > t.op {
- return false
- }
- return true
-}
-
-func (t *FIFO[S]) In(p []S) error {
- defer t.lock()()
-
- t.op = t.opc
- if !t.inok(len(p)) {
- return ErrFIFOOverflow
- }
- select {
- case t.c <- item{
- op: t.ed,
- ed: t.ed + len(p),
- }:
- t.ed = t.ed + copy(t.buf[t.ed:], p)
- default:
- return ErrFIFOOverflow
- }
- return nil
-}
-
-func (t *FIFO[S]) Out(w interface {
- Write(p []S) (n int, err error)
-}) (n int, err error) {
- defer t.rlock()()
-
- select {
- case item := <-t.c:
- n, err = w.Write(t.buf[item.op:item.ed])
- t.opc = item.ed
- default:
- err = ErrFIFOEmpty
- }
-
- return
-}
-
-func (t *FIFO[S]) OutDirect() (p []S, err error, used func()) {
- used = t.rlock()
-
- select {
- case item := <-t.c:
- p = t.buf[item.op:item.ed]
- t.opc = item.ed
- default:
- err = ErrFIFOEmpty
- }
-
- return
-}
-
-func (t *FIFO[S]) Size() int {
- defer t.rlock()()
-
- if t.opc > t.ed {
- return len(t.buf) - t.opc - t.ed
- } else {
- return t.ed - t.opc
- }
-}
-
-func (t *FIFO[S]) Num() int {
- return len(t.c)
-}
-
-func (t *FIFO[S]) Clear() {
- defer t.lock()()
-
- t.op = 0
- t.opc = 0
- t.ed = 0
- for {
- select {
- case <-t.c:
- default:
- return
- }
- }
-}
-
-func (t *FIFO[S]) Reset() {
- defer t.lock()()
-
- clear(t.buf)
- t.op = 0
- t.opc = 0
- t.ed = 0
- for {
- select {
- case <-t.c:
- default:
- return
- }
- }
-}
+++ /dev/null
-package part
-
-import (
- "bytes"
- "testing"
-)
-
-func TestFIFO(t *testing.T) {
- fifo := NewFIFO[byte](5)
-
- if fifo.In([]byte("012345")) != ErrFIFOOverflow {
- t.Fatal()
- }
- fifo.Clear()
-
- if fifo.In([]byte("012")) != nil {
- t.Fatal()
- }
- if fifo.In([]byte("345")) != ErrFIFOOverflow {
- t.Fatal()
- }
- fifo.Clear()
-
- if fifo.In([]byte("012")) != nil {
- t.Fatal()
- }
- if fifo.In([]byte("34")) != nil {
- t.Fatal()
- }
- fifo.Clear()
-
- if fifo.In([]byte("012")) != nil {
- t.Fatal()
- }
- if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("012")) {
- t.Fatal()
- } else {
- used()
- }
- fifo.Clear()
-
- if fifo.In([]byte("01")) != nil {
- t.Fatal()
- }
- if fifo.Size() != 2 {
- t.Fatal()
- }
- if e := fifo.In([]byte("234")); e != nil {
- t.Fatal(e)
- }
- if fifo.Size() != 5 {
- t.Fatal()
- }
- if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("01")) {
- t.Fatal()
- } else {
- used()
- }
- if fifo.In([]byte("56")) != nil {
- t.Fatal()
- }
- if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("234")) {
- t.Fatal()
- } else {
- used()
- }
- if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("56")) {
- t.Fatal()
- } else {
- used()
- }
- fifo.Clear()
-
- // if fifo.In([]byte("012")) != nil {
- // t.Fatal()
- // }
- // go func() {
- // time.Sleep(time.Millisecond * 500)
- // fifo.Out()
- // }()
- // if e := fifo.In([]byte("345")); e != nil {
- // t.Fatal(e)
- // }
- // time.Sleep(time.Second * 10)
- // fifo.Clear()
-}
-
-func BenchmarkFIFO(b *testing.B) {
- fifo := NewFIFO[byte](5)
- buf := []byte("12")
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- if e := fifo.In(buf); e != nil {
- b.FailNow()
- }
- if fifo.Num() > 1 {
- if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, buf) {
- b.FailNow()
- } else {
- used()
- }
- }
- }
-}
}
}
-type withCache struct {
+type WithCacheWiter struct {
cw *pio.CacheWriter
raw http.ResponseWriter
}
-func (t withCache) Header() http.Header {
+func (t *WithCacheWiter) Header() http.Header {
if t.raw != nil {
return t.raw.Header()
}
return make(http.Header)
}
-func (t withCache) Write(b []byte) (i int, e error) {
+func (t *WithCacheWiter) Write(b []byte) (i int, e error) {
if t.cw != nil {
return t.cw.Write(b)
}
return t.raw.Write(b)
}
-func (t withCache) WriteHeader(i int) {
+func (t *WithCacheWiter) WriteHeader(i int) {
if t.raw != nil {
t.raw.WriteHeader(i)
}
}
+func (t *WithCacheWiter) Cap() (i int) {
+ if t.cw != nil {
+ return t.cw.Cap()
+ }
+ return 0
+}
type Exprier struct {
max int
return withflush{w}
}
-func WithCache(w http.ResponseWriter, cw *pio.CacheWriter) http.ResponseWriter {
- t := withCache{raw: w}
- t.cw = cw
- return t
+func WithCache(w http.ResponseWriter, maxWait uint, maxCap int) *WithCacheWiter {
+ return &WithCacheWiter{raw: w, cw: pio.NewCacheWriter(w, maxWait, maxCap)}
}
func WithStatusCode(w http.ResponseWriter, code int) {