"io"
"log"
"os"
- "time"
- p "github.com/qydysky/part"
+ f "github.com/qydysky/part/file"
m "github.com/qydysky/part/msgq"
- s "github.com/qydysky/part/signal"
)
var (
)
type Log_interface struct {
- MQ *m.Msgq
+ MQ *m.MsgType[Msg_item]
Config
}
type Config struct {
- File string
- Stdout bool
+ File string
+ Stdout bool
+
Prefix_string map[string]struct{}
- Base_string []interface{}
+ Base_string []any
}
type Msg_item struct {
Prefix string
- Msg_obj []interface{}
+ Msg_obj []any
Config
}
Config: c,
}
if c.File != `` {
- p.File().NewPath(c.File)
+ f.New(c.File, 0, true).Create()
}
- o.MQ = m.New()
- o.MQ.Pull_tag(map[string]func(interface{}) bool{
- `block`: func(data interface{}) bool {
- if v, ok := data.(*s.Signal); ok {
- v.Done()
- }
- return false
- },
- `L`: func(data interface{}) bool {
- msg := data.(Msg_item)
- var showObj = []io.Writer{}
- if msg.Stdout {
- showObj = append(showObj, os.Stdout)
- }
- if msg.File != `` {
- file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
- if err == nil {
- showObj = append(showObj, file)
- defer file.Close()
- } else {
- log.Println(err)
- }
+ o.MQ = m.NewType[Msg_item]()
+ o.MQ.Pull_tag_only(`L`, func(msg Msg_item) bool {
+ var showObj = []io.Writer{}
+ if msg.Stdout {
+ showObj = append(showObj, os.Stdout)
+ }
+ if msg.File != `` {
+ file, err := os.OpenFile(msg.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+ if err == nil {
+ showObj = append(showObj, file)
+ defer file.Close()
+ } else {
+ log.Println(err)
}
- log.New(io.MultiWriter(showObj...),
- msg.Prefix,
- log.Ldate|log.Ltime).Println(msg.Msg_obj...)
- return false
- },
- })
- { //启动阻塞
- b := s.Init()
- for b.Islive() {
- o.MQ.Push_tag(`block`, b)
- time.Sleep(time.Duration(20) * time.Millisecond)
}
- }
+ log.New(io.MultiWriter(showObj...),
+ msg.Prefix,
+ log.Ldate|log.Ltime).Println(msg.Msg_obj...)
+ return false
+ })
+ //启动阻塞
+ o.MQ.PushLock_tag(`block`, Msg_item{})
return
}
Config: (*i).Config,
MQ: (*i).MQ,
}
- { //启动阻塞
- b := s.Init()
- for b.Islive() {
- o.MQ.Push_tag(`block`, b)
- time.Sleep(time.Duration(20) * time.Millisecond)
- }
- }
+ //启动阻塞
+ o.MQ.PushLock_tag(`block`, Msg_item{})
return
}
// Level 设置之后日志等级
func (I *Log_interface) Level(log map[string]struct{}) (O *Log_interface) {
O = Copy(I)
- for k, _ := range O.Prefix_string {
+ for k := range O.Prefix_string {
if _, ok := log[k]; !ok {
delete(O.Prefix_string, k)
}
return
}
+func (I *Log_interface) LShow(show bool) (O *Log_interface) {
+ return I.Log_show_control(show)
+}
+
// Open 日志输出至文件
func (I *Log_interface) Log_to_file(fileP string) (O *Log_interface) {
O = I
//
O.Block(100)
- O.File = fileP
if O.File != `` {
- p.File().NewPath(O.File)
+ O.File = fileP
+ f.New(O.File, 0, true).Create()
+ } else {
+ O.File = ``
}
return
}
+func (I *Log_interface) LFile(fileP string) (O *Log_interface) {
+ return I.Log_to_file(fileP)
+}
+
// Block 阻塞直到本轮日志输出完毕
-func (I *Log_interface) Block(timeout int) (O *Log_interface) {
+func (I *Log_interface) Block(ms int) (O *Log_interface) {
O = I
- b := s.Init()
- O.MQ.Push_tag(`block`, b)
- b.Wait()
+ O.MQ.PushLock_tag(`block`, Msg_item{})
return
}
+func (I *Log_interface) Close() {
+ I.MQ.ClearAll()
+}
+
// 日志等级
// Base 追加到后续输出
-func (I *Log_interface) Base(i ...interface{}) (O *Log_interface) {
+func (I *Log_interface) Base(i ...any) (O *Log_interface) {
O = Copy(I)
O.Base_string = i
return
}
-func (I *Log_interface) Base_add(i ...interface{}) (O *Log_interface) {
+func (I *Log_interface) Base_add(i ...any) (O *Log_interface) {
O = Copy(I)
O.Base_string = append(O.Base_string, i...)
return
}
-func (I *Log_interface) L(prefix string, i ...interface{}) (O *Log_interface) {
+func (I *Log_interface) L(prefix string, i ...any) (O *Log_interface) {
O = I
if _, ok := O.Prefix_string[prefix]; !ok {
return
package part
import (
- // "fmt"
- "runtime"
- "time"
- "testing"
-
+ // "fmt"
+
+ "testing"
+ "time"
+
"net/http"
_ "net/http/pprof"
)
-type test_item struct {
- data string
-}
-
func Test_1(t *testing.T) {
- n := New(Config{
- File:`1.log`,
- Stdout:true,
- Prefix_string:map[string]struct{}{`T:`:On,`I:`:On,`W:`:On,`E:`:On},
- })
-
- n.L(`T:`,`s`).L(`I:`,`s`).Block(1000)
- n.Log_to_file(`2.log`).L(`W:`,`s`).L(`E:`,`s`)
-
- {
- n1 := n.Base(`>1`)
- n1.L(`T:`,`s`).L(`I:`,`s`)
- {
- n2 := n1.Base_add(`>2`)
- n2.L(`T:`,`s`).L(`I:`,`s`)
- }
- }
-
- n.Level(map[string]struct{}{`W:`:On}).L(`T:`,`s`).L(`I:`,`s`).L(`W:`,`s`).L(`E:`,`s`)
- n.Block(1000)
+ n := New(Config{
+ File: `1.log`,
+ Stdout: true,
+ Prefix_string: map[string]struct{}{`T:`: On, `I:`: On, `W:`: On, `E:`: On},
+ })
+
+ n.L(`T:`, `s`).L(`I:`, `s`).Block(1000)
+ n.Log_to_file(`2.log`).L(`W:`, `s`).L(`E:`, `s`)
+
+ {
+ n1 := n.Base(`>1`)
+ n1.L(`T:`, `s`).L(`I:`, `s`)
+ {
+ n2 := n1.Base_add(`>2`)
+ n2.L(`T:`, `s`).L(`I:`, `s`)
+ }
+ }
+
+ n.Level(map[string]struct{}{`W:`: On}).L(`T:`, `s`).L(`I:`, `s`).L(`W:`, `s`).L(`E:`, `s`)
+ n.Block(1000)
}
-
var n *Log_interface
func Test_2(t *testing.T) {
- n = New(Config{
- File:`1.log`,
- Stdout:true,
- Prefix_string:map[string]struct{}{`T:`:On,`I:`:On,`W:`:On,`E:`:On},
- })
+ n = New(Config{
+ File: `1.log`,
+ Stdout: true,
+ Prefix_string: map[string]struct{}{`T:`: On, `I:`: On, `W:`: On, `E:`: On},
+ })
go func() {
http.ListenAndServe("0.0.0.0:8899", nil)
- }()
- // n = nil
- for {
- n:=n.Base_add(`>1`)
- n.L(`T:`,`s`)
- time.Sleep(time.Second*time.Duration(1))
- // n=nil
- }
- n.L(`T:`,`s`)
- runtime.GC()
- time.Sleep(time.Second*time.Duration(1000))
- // fmt.Printf("%p %p\n",n.MQ,n1.MQ)
- // n1 = nil
- // fmt.Println(n)
-}
\ No newline at end of file
+ }()
+ // n = nil
+ for {
+ n := n.Base_add(`>1`)
+ n.L(`T:`, `s`)
+ time.Sleep(time.Second * time.Duration(1))
+ // n=nil
+ }
+}
}
func NewTo(to time.Duration) *Msgq {
+ fmt.Println("Warn: NewTo is slow, consider New")
m := new(Msgq)
m.funcs = list.New()
if to != 0 {
close(ch)
return true
default:
- if len(ch) == size {
+ for len(ch) != 0 {
<-ch
}
ch <- d.Data
- return false
}
}
return false
}
type MsgType[T any] struct {
- m *Msgq
+ to []time.Duration
+ funcs *list.List
+ someNeedRemove atomic.Int32
+ lock sync.RWMutex
+ runTag sync.Map
+}
+
+type MsgType_tag_data[T any] struct {
+ Tag string
+ Data T
}
func NewType[T any]() *MsgType[T] {
- return &MsgType[T]{
- m: New(),
- }
+ m := new(MsgType[T])
+ m.funcs = list.New()
+ return m
}
func NewTypeTo[T any](to time.Duration) *MsgType[T] {
- return &MsgType[T]{
- m: NewTo(to),
+ fmt.Println("Warn: NewTypeTo[T any] is slow, consider NewType[T any]")
+ m := new(MsgType[T])
+ m.funcs = list.New()
+ if to != 0 {
+ m.to = append(m.to, to)
+ } else {
+ m.to = append(m.to, time.Second*30)
+ }
+ return m
+}
+
+func (m *MsgType[T]) push(msg MsgType_tag_data[T]) {
+ for m.someNeedRemove.Load() != 0 {
+ time.Sleep(time.Millisecond)
+ runtime.Gosched()
+ }
+
+ var removes []*list.Element
+
+ ul := m.lock.RLock(m.to...)
+ for el := m.funcs.Front(); el != nil; el = el.Next() {
+ if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
+ m.someNeedRemove.Add(1)
+ removes = append(removes, el)
+ }
+ }
+ ul()
+
+ if len(removes) != 0 {
+ ul := m.lock.Lock(m.to...)
+ m.someNeedRemove.Add(-int32(len(removes)))
+ for i := 0; i < len(removes); i++ {
+ m.funcs.Remove(removes[i])
+ }
+ ul()
+ }
+}
+
+func (m *MsgType[T]) pushLock(msg MsgType_tag_data[T]) {
+ for m.someNeedRemove.Load() != 0 {
+ time.Sleep(time.Millisecond)
+ runtime.Gosched()
+ }
+
+ ul := m.lock.Lock(m.to...)
+ defer ul()
+
+ var removes []*list.Element
+
+ for el := m.funcs.Front(); el != nil; el = el.Next() {
+ if disable := el.Value.(func(MsgType_tag_data[T]) bool)(msg); disable {
+ m.someNeedRemove.Add(1)
+ removes = append(removes, el)
+ }
+ }
+
+ if len(removes) != 0 {
+ m.someNeedRemove.Add(-int32(len(removes)))
+ for i := 0; i < len(removes); i++ {
+ m.funcs.Remove(removes[i])
+ }
}
}
+func (m *MsgType[T]) register(f func(MsgType_tag_data[T]) (disable bool)) {
+ ul := m.lock.Lock(m.to...)
+ m.funcs.PushBack(f)
+ ul()
+}
+
+func (m *MsgType[T]) register_front(f func(MsgType_tag_data[T]) (disable bool)) {
+ ul := m.lock.Lock(m.to...)
+ m.funcs.PushFront(f)
+ ul()
+}
+
func (m *MsgType[T]) Push_tag(Tag string, Data T) {
- if len(m.m.to) > 0 {
+ if len(m.to) > 0 {
ptr := uintptr(unsafe.Pointer(&Data))
- m.m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)")
+ m.runTag.Store(ptr, "[T]Push_tag(`"+Tag+"`,...)")
defer func() {
if e := recover(); e != nil {
- m.m.runTag.Range(func(key, value any) bool {
+ m.runTag.Range(func(key, value any) bool {
if key == ptr {
fmt.Printf("%v panic > %v\n", value, e)
} else {
}
return true
})
- m.m.runTag.ClearAll()
+ m.runTag.ClearAll()
panic(e)
}
- m.m.runTag.Delete(ptr)
+ m.runTag.Delete(ptr)
}()
}
- m.m.Push(Msgq_tag_data{
+ m.push(MsgType_tag_data[T]{
Tag: Tag,
Data: Data,
})
}
func (m *MsgType[T]) PushLock_tag(Tag string, Data T) {
- if len(m.m.to) > 0 {
+ if len(m.to) > 0 {
ptr := uintptr(unsafe.Pointer(&Data))
- m.m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)")
+ m.runTag.Store(ptr, "[T]PushLock_tag(`"+Tag+"`,...)")
defer func() {
if e := recover(); e != nil {
- m.m.runTag.Range(func(key, value any) bool {
+ m.runTag.Range(func(key, value any) bool {
if key == ptr {
fmt.Printf("%v panic > %v\n", value, e)
} else {
}
return true
})
- m.m.runTag.ClearAll()
+ m.runTag.ClearAll()
panic(e)
}
- m.m.runTag.Delete(ptr)
+ m.runTag.Delete(ptr)
}()
}
- m.m.PushLock(Msgq_tag_data{
+ m.pushLock(MsgType_tag_data[T]{
Tag: Tag,
Data: Data,
})
}
func (m *MsgType[T]) ClearAll() {
- m.m.ClearAll()
+ for m.someNeedRemove.Load() != 0 {
+ time.Sleep(time.Millisecond)
+ runtime.Gosched()
+ }
+
+ ul := m.lock.Lock(m.to...)
+ defer ul()
+
+ var removes []*list.Element
+
+ for el := m.funcs.Front(); el != nil; el = el.Next() {
+ m.someNeedRemove.Add(1)
+ removes = append(removes, el)
+ }
+
+ if len(removes) != 0 {
+ m.someNeedRemove.Add(-int32(len(removes)))
+ for i := 0; i < len(removes); i++ {
+ m.funcs.Remove(removes[i])
+ }
+ }
}
func (m *MsgType[T]) Pull_tag_chan(key string, size int, ctx context.Context) <-chan T {
var ch = make(chan T, size)
- m.m.Register(func(data any) bool {
- if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+ m.register(func(data MsgType_tag_data[T]) bool {
+ if data.Tag == key {
select {
case <-ctx.Done():
close(ch)
return true
default:
- if len(ch) == size {
+ for len(ch) != 0 {
<-ch
}
- ch <- d.Data.(T)
- return false
+ ch <- data.Data
}
}
return false
}
func (m *MsgType[T]) Pull_tag_only(key string, f func(T) (disable bool)) {
- m.m.Register(func(data any) (disable bool) {
- if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
- return f(d.Data.(T))
+ m.register(func(data MsgType_tag_data[T]) (disable bool) {
+ if data.Tag == key {
+ return f(data.Data)
}
return false
})
}
func (m *MsgType[T]) Pull_tag(func_map map[string]func(T) (disable bool)) {
- m.m.Register(func(data any) (disable bool) {
- if d, ok := data.(Msgq_tag_data); ok {
- if f, ok := func_map[d.Tag]; ok {
- return f(d.Data.(T))
- }
+ m.register(func(data MsgType_tag_data[T]) (disable bool) {
+ if f, ok := func_map[data.Tag]; ok {
+ return f(data.Data)
}
return false
})
func (m *MsgType[T]) Pull_tag_async_only(key string, f func(T) (disable bool)) {
var disable = signal.Init()
- m.m.Register_front(func(data any) bool {
+ m.register_front(func(data MsgType_tag_data[T]) bool {
if !disable.Islive() {
return true
}
- if d, ok := data.(Msgq_tag_data); ok && d.Tag == key {
+ if data.Tag == key {
go func() {
- if f(d.Data.(T)) {
+ if f(data.Data) {
disable.Done()
}
}()
func (m *MsgType[T]) Pull_tag_async(func_map map[string]func(T) (disable bool)) {
var disable = signal.Init()
- m.m.Register_front(func(data any) bool {
+ m.register_front(func(data MsgType_tag_data[T]) bool {
if !disable.Islive() {
return true
}
- if d, ok := data.(Msgq_tag_data); ok {
- if f, ok := func_map[d.Tag]; ok {
- go func() {
- if f(d.Data.(T)) {
- disable.Done()
- }
- }()
- }
+ if f, ok := func_map[data.Tag]; ok {
+ go func() {
+ if f(data.Data) {
+ disable.Done()
+ }
+ }()
}
return false
})
func TestPushLock(t *testing.T) {
defer func() {
- if e := recover(); e.(string) != "timeout to wait rlock, rlc:1" {
+ if e := recover(); e != nil {
t.Fatal(e)
}
}()
for i := 0; i < 5; i++ {
mq.Push_tag(`a`, i)
}
+ if len(ch) != 1 {
+ t.Fatal()
+ }
var o = 0
for s := true; s; {
select {
s = false
}
}
- if o != 7 {
+ if o != 4 {
t.Fatal()
}
select {
}
}
+func Test_Pull_tag_chan2(t *testing.T) {
+ mq := New()
+
+ mq.Pull_tag_chan(`a`, 1, context.Background())
+ go func() {
+ mq.PushLock_tag(`a`, nil)
+ mq.PushLock_tag(`a`, nil)
+ }()
+}
+
func Test_msgq1(t *testing.T) {
mq := New()
c := make(chan time.Time, 10)
for len(t.cancelFs) != 0 {
<-t.cancelFs
}
+ t.Respon = t.Respon[:0]
+ t.responBuf.Reset()
+
t.err = t.Reqf_1(_val)
if t.err == nil || IsCancel(t.err) {
break
}
func (m *RWMutex) RLock(to ...time.Duration) (unrlock func()) {
+ var callC atomic.Bool
if len(to) > 0 {
+ var calls []string
+ for i := 1; true; i++ {
+ if pc, file, line, ok := runtime.Caller(i); !ok {
+ break
+ } else {
+ calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
+ }
+ }
c := time.Now()
- for m.rlc.Load() < ulock {
+ for m.rlc.Load() < ulock || m.cul.Load() != m.oll.Load() {
if time.Since(c) > to[0] {
panic(fmt.Sprintf("timeout to wait lock, rlc:%d", m.rlc.Load()))
}
runtime.Gosched()
}
+ c = time.Now()
+ go func() {
+ for !callC.Load() {
+ if time.Since(c) > to[0] {
+ panicS := fmt.Sprintf("timeout to run rlock %v > %v\n", time.Since(c), to[0])
+ for i := 0; i < len(calls); i++ {
+ panicS += fmt.Sprintf("%s\n", calls[i])
+ }
+ panic(panicS)
+ }
+ runtime.Gosched()
+ }
+ }()
} else {
- for m.rlc.Load() < ulock {
+ for m.rlc.Load() < ulock || m.cul.Load() != m.oll.Load() {
time.Sleep(time.Millisecond)
runtime.Gosched()
}
}
m.rlc.Add(1)
- var callC atomic.Bool
return func() {
if !callC.CompareAndSwap(false, true) {
panic("had unrlock")
func (m *RWMutex) Lock(to ...time.Duration) (unlock func()) {
lockid := m.cul.Add(1)
+ var callC atomic.Bool
if len(to) > 0 {
+ var calls []string
+ for i := 1; true; i++ {
+ if pc, file, line, ok := runtime.Caller(i); !ok {
+ break
+ } else {
+ calls = append(calls, fmt.Sprintf("%s\n\t%s:%d", runtime.FuncForPC(pc).Name(), file, line))
+ }
+ }
c := time.Now()
for m.rlc.Load() > ulock {
if time.Since(c) > to[0] {
runtime.Gosched()
}
if !m.rlc.CompareAndSwap(ulock, lock) {
- panic("csa error, bug")
+ panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load()))
}
+ c = time.Now()
+ go func() {
+ for !callC.Load() {
+ if time.Since(c) > to[0] {
+ panicS := fmt.Sprintf("timeout to run lock %v > %v\n", time.Since(c), to[0])
+ for i := 0; i < len(calls); i++ {
+ panicS += fmt.Sprintf("call by %s\n", calls[i])
+ }
+ panic(panicS)
+ }
+ runtime.Gosched()
+ }
+ }()
} else {
for m.rlc.Load() > ulock {
time.Sleep(time.Millisecond)
runtime.Gosched()
}
if !m.rlc.CompareAndSwap(ulock, lock) {
- panic("csa error, bug")
+ panic(fmt.Sprintf("csa error, rlc:%d", m.rlc.Load()))
}
}
- var callC atomic.Bool
return func() {
if !callC.CompareAndSwap(false, true) {
panic("had unlock")
if wm.Type == 0 {
wm.Type = websocket.TextMessage
}
+ c.SetWriteDeadline(time.Now().Add(time.Duration(o.TO * int(time.Millisecond))))
if err := c.WriteMessage(wm.Type, wm.Msg); err != nil {
o.error(err)
o.msg.ClearAll()