"sync"
"sync/atomic"
"time"
+
+ ps "github.com/qydysky/part/slice"
)
// no close rc any time
ctx context.Context
cancelCauseFunc context.CancelCauseFunc
w io.Writer
- pushLock atomic.Bool
- pushBuf []byte
+ pushBuf *ps.FIFO[byte]
}
-var ErrBusy = errors.New(`ErrBusy`)
+type CacheWriterSize ps.FIFOSize
-func NewCacheWriter(ws io.Writer, ctx ...context.Context) *CacheWriter {
- t := CacheWriter{w: ws}
- ctx = append(ctx, context.Background())
- t.ctx, t.cancelCauseFunc = context.WithCancelCause(ctx[0])
+func NewCacheWriter[T CacheWriterSize](ws io.Writer, bufsize T) *CacheWriter {
+ t := CacheWriter{w: ws, pushBuf: ps.NewFIFO[byte](bufsize)}
+ t.ctx, t.cancelCauseFunc = context.WithCancelCause(context.Background())
return &t
}
return 0, t.ctx.Err()
default:
}
- if !t.pushLock.CompareAndSwap(false, true) {
- return 0, ErrBusy
+ if e := t.pushBuf.In(b); e != nil {
+ return 0, e
}
- t.pushBuf = append(t.pushBuf[:0], b...)
go func() {
- defer t.pushLock.Store(false)
- if n, err := t.w.Write(t.pushBuf); err != nil || n == 0 {
- if !errors.Is(err, ErrBusy) {
- t.cancelCauseFunc(err)
- }
+ if _, err := t.pushBuf.Out(t.w); err != nil && !errors.Is(err, ps.ErrFIFOEmpty) {
+ t.cancelCauseFunc(err)
}
}()
- return len(t.pushBuf), t.ctx.Err()
+ return len(b), t.ctx.Err()
+}
+
+func (t *CacheWriter) Size() int {
+ return t.pushBuf.Size()
}
import (
"bytes"
- "errors"
"io"
"testing"
"time"
+
+ ps "github.com/qydysky/part/slice"
)
func Test_CopyIO(t *testing.T) {
t.Fatal()
}
}()
- writer := NewCacheWriter(w)
+ writer := NewCacheWriter(w, 4)
if n, err := writer.Write([]byte("123")); n != 3 || err != nil {
t.Fatal()
}
- if _, err := writer.Write([]byte("123")); !errors.Is(err, ErrBusy) {
- t.Fatal()
+ if _, err := writer.Write([]byte("123")); err != ps.ErrFIFOOverflow {
+ t.Fatal(err)
}
time.Sleep(time.Second)
}
ErrFIFOEmpty = errors.New(`ErrFIFOEmpty`)
)
+type FIFOSize interface {
+ int | int32 | int64 | uint | uint32 | uint64
+}
+
type item struct {
op, ed int
}
-type FIFO[S any, T []S] struct {
+type FIFO[S any] struct {
ed, op, opc int
c chan item
buf []S
l sync.RWMutex
}
-func NewFIFO[S any, T []S](size int) *FIFO[S, T] {
- return &FIFO[S, T]{
+func NewFIFO[S any, T FIFOSize](size T) *FIFO[S] {
+ return &FIFO[S]{
c: make(chan item, size),
buf: make([]S, size),
}
}
-func (t *FIFO[S, T]) lock() func() {
+func (t *FIFO[S]) lock() func() {
t.l.Lock()
return t.l.Unlock
}
-func (t *FIFO[S, T]) rlock() func() {
+func (t *FIFO[S]) rlock() func() {
t.l.RLock()
return t.l.RUnlock
}
-func (t *FIFO[S, T]) inok(size int) bool {
+func (t *FIFO[S]) inok(size int) bool {
if t.ed+size > len(t.buf) {
if size > t.op {
return false
return true
}
-func (t *FIFO[S, T]) In(p T) error {
+func (t *FIFO[S]) In(p []S) error {
defer t.lock()()
t.op = t.opc
return nil
}
-func (t *FIFO[S, T]) Out() (p T, e error) {
+func (t *FIFO[S]) Out(w interface {
+ Write(p []S) (n int, err error)
+}) (n int, err error) {
defer t.rlock()()
+ select {
+ case item := <-t.c:
+ n, err = w.Write(t.buf[item.op:item.ed])
+ t.opc = item.ed
+ default:
+ err = ErrFIFOEmpty
+ }
+
+ return
+}
+
+func (t *FIFO[S]) OutDirect() (p []S, err error, used func()) {
+ used = t.rlock()
+
select {
case item := <-t.c:
p = t.buf[item.op:item.ed]
t.opc = item.ed
default:
- e = ErrFIFOEmpty
+ err = ErrFIFOEmpty
}
return
}
-func (t *FIFO[S, T]) Size() int {
+func (t *FIFO[S]) Size() int {
defer t.rlock()()
if t.opc > t.ed {
}
}
-func (t *FIFO[S, T]) Num() int {
+func (t *FIFO[S]) Num() int {
return len(t.c)
}
-func (t *FIFO[S, T]) Clear() {
+func (t *FIFO[S]) Clear() {
defer t.lock()()
t.op = 0
}
}
-func (t *FIFO[S, T]) Reset() {
+func (t *FIFO[S]) Reset() {
defer t.lock()()
clear(t.buf)
if fifo.In([]byte("012")) != nil {
t.Fatal()
}
- if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("012")) {
+ if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("012")) {
t.Fatal()
+ } else {
+ used()
}
fifo.Clear()
if fifo.Size() != 5 {
t.Fatal()
}
- if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("01")) {
+ if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("01")) {
t.Fatal()
+ } else {
+ used()
}
if fifo.In([]byte("56")) != nil {
t.Fatal()
}
- if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("234")) {
+ if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("234")) {
t.Fatal()
+ } else {
+ used()
}
- if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, []byte("56")) {
+ if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, []byte("56")) {
t.Fatal()
+ } else {
+ used()
}
fifo.Clear()
b.FailNow()
}
if fifo.Num() > 1 {
- if tmp, e := fifo.Out(); e != nil || !bytes.Equal(tmp, buf) {
+ if tmp, e, used := fifo.OutDirect(); e != nil || !bytes.Equal(tmp, buf) {
b.FailNow()
+ } else {
+ used()
}
}
}
return withflush{w}
}
-// IsCacheBusy
-func WithCache(w http.ResponseWriter) http.ResponseWriter {
+func WithCache(w http.ResponseWriter, cw *pio.CacheWriter) http.ResponseWriter {
t := withCache{raw: w}
- t.cw = pio.NewCacheWriter(w)
+ t.cw = cw
return t
}
-func IsCacheBusy(e error) bool {
- return errors.Is(e, pio.ErrBusy)
-}
-
func WithStatusCode(w http.ResponseWriter, code int) {
w.WriteHeader(code)
_, _ = w.Write([]byte(http.StatusText(code)))