From: qydysky Date: Thu, 12 May 2022 02:15:42 +0000 (+0800) Subject: pool X-Git-Tag: v0.9.4 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=4cccf8804f92c25dea8b6b9a4e4459d11ecf38f5;p=part%2F.git pool --- diff --git a/funcCtrl/FuncCtrl.go b/funcCtrl/FuncCtrl.go index 799a1fc..054c158 100644 --- a/funcCtrl/FuncCtrl.go +++ b/funcCtrl/FuncCtrl.go @@ -29,7 +29,7 @@ type FlashFunc struct { //新的替换旧的 func (t *FlashFunc) Flash() (current uintptr) { if t.pool == nil { - t.pool = idpool.New() + t.pool = idpool.New(func() interface{} { return new(struct{}) }) } if t.b == nil { t.b = list.New() diff --git a/idpool/Idpool.go b/idpool/Idpool.go index ad35a47..3ed36ac 100644 --- a/idpool/Idpool.go +++ b/idpool/Idpool.go @@ -2,49 +2,48 @@ package part import ( "sync" + "sync/atomic" "unsafe" ) type Idpool struct { pool sync.Pool - sum uint - sync.Mutex + sum int64 } type Id struct { - Id uintptr - item interface{} + Id uintptr + Item interface{} } -func New() (*Idpool) { +func New(f func() interface{}) *Idpool { return &Idpool{ - pool:sync.Pool{ + pool: sync.Pool{ New: func() interface{} { - return new(struct{}) + var o = new(Id) + o.Item = f() + o.Id = uintptr(unsafe.Pointer(&o.Item)) + return o }, }, } } func (t *Idpool) Get() (o *Id) { - o = new(Id) - o.item = t.pool.Get() - o.Id = uintptr(unsafe.Pointer(&o.item)) - t.Lock() - t.sum += 1 - t.Unlock() + o = t.pool.Get().(*Id) + atomic.AddInt64(&t.sum, 1) return } func (t *Idpool) Put(i *Id) { - if i.item == nil {return} - t.pool.Put(i.item) - i.item = nil - t.Lock() - t.sum -= 1 - t.Unlock() + if i.Item == nil { + return + } + i.Item = nil + t.pool.Put(i) + atomic.AddInt64(&t.sum, -1) } -func (t *Idpool) Len() uint { - return t.sum -} \ No newline at end of file +func (t *Idpool) Len() int64 { + return atomic.LoadInt64(&t.sum) +} diff --git a/idpool/Idpool_test.go b/idpool/Idpool_test.go index 43ffd68..894b2f1 100644 --- a/idpool/Idpool_test.go +++ b/idpool/Idpool_test.go @@ -4,17 +4,19 @@ import ( "testing" ) -func Test(t *testing.T){ - pool := New() +func Test(t *testing.T) { + pool := New(func() interface{} { + return new(int) + }) a := pool.Get() b := pool.Get() - t.Log(a.Id,a.item,pool.Len()) - t.Log(b.Id,b.item) + t.Log(a.Id, a.Item, pool.Len()) + t.Log(b.Id, b.Item) pool.Put(a) pool.Put(a) - t.Log(a.Id,a.item,pool.Len()) - t.Log(b.Id,b.item) + t.Log(a.Id, a.Item, pool.Len()) + t.Log(b.Id, b.Item) a = pool.Get() - t.Log(a.Id,a.item,pool.Len()) - t.Log(b.Id,b.item) + t.Log(a.Id, a.Item, pool.Len()) + t.Log(b.Id, b.Item) } diff --git a/reqf/Reqf.go b/reqf/Reqf.go index 4217827..fadabd3 100644 --- a/reqf/Reqf.go +++ b/reqf/Reqf.go @@ -15,7 +15,6 @@ import ( "time" compress "github.com/qydysky/part/compress" - idpool "github.com/qydysky/part/idpool" pio "github.com/qydysky/part/io" signal "github.com/qydysky/part/signal" // "encoding/binary" @@ -50,18 +49,12 @@ type Req struct { Response *http.Response UsedTime time.Duration - id *idpool.Id - idp *idpool.Idpool cancel *signal.Signal sync.Mutex } func New() *Req { - idp := idpool.New() - return &Req{ - idp: idp, - id: idp.Get(), - } + return new(Req) } // func main(){ @@ -82,10 +75,6 @@ func (t *Req) Reqf(val Rval) error { _val := val - defer func() { - t.idp.Put(t.id) - }() - t.cancel = signal.Init() for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 { @@ -374,13 +363,6 @@ func (t *Req) Close() { t.cancel.Done() } -func (t *Req) Id() uintptr { - if t.id == nil { - return 0 - } - return t.id.Id -} - func IsTimeout(e error) bool { if errors.Is(e, context.DeadlineExceeded) || errors.Is(e, ErrConnectTimeout) || errors.Is(e, ErrReadTimeout) { return true diff --git a/tmplKV/tmplK.go b/tmplKV/tmplK.go index 2e3dd8c..f0530bf 100644 --- a/tmplKV/tmplK.go +++ b/tmplKV/tmplK.go @@ -1,39 +1,40 @@ package part import ( - "time" "container/list" - syncmap "github.com/qydysky/part/sync" + "time" + idpool "github.com/qydysky/part/idpool" + syncmap "github.com/qydysky/part/sync" ) type tmplK struct { SumInDruation int64 - Druation int64 - now int64 - pool *idpool.Idpool - kvt_map syncmap.Map - slowBackList *list.List + Druation int64 + now int64 + pool *idpool.Idpool + kvt_map syncmap.Map + slowBackList *list.List } type tmplK_item struct { - kv uintptr - kt int64 + kv uintptr + kt int64 uid *idpool.Id } -func New_tmplK(SumInDruation,Druation int64) (*tmplK) { +func New_tmplK(SumInDruation, Druation int64) *tmplK { s := &tmplK{ - SumInDruation:SumInDruation, - Druation:Druation, - pool:idpool.New(), - slowBackList:list.New(), + SumInDruation: SumInDruation, + Druation: Druation, + pool: idpool.New(func() interface{} { return new(struct{}) }), + slowBackList: list.New(), } - go func(){ + go func() { ticker := time.NewTicker(time.Second) - for{ - s.now = (<- ticker.C).Unix() + for { + s.now = (<-ticker.C).Unix() } }() @@ -41,15 +42,15 @@ func New_tmplK(SumInDruation,Druation int64) (*tmplK) { } func (s *tmplK) Set(key interface{}) (id uintptr) { - - if tmp, oks := s.kvt_map.LoadV(key).(tmplK_item);oks { + + if tmp, oks := s.kvt_map.LoadV(key).(tmplK_item); oks { s.free(tmp.uid) - } else if s.SumInDruation >= 0 && s.freeLen() >= s.SumInDruation{//不为无限&&达到限额 随机替代 - s.kvt_map.Range(func(oldkey,item interface{})(bool){ + } else if s.SumInDruation >= 0 && s.freeLen() >= s.SumInDruation { //不为无限&&达到限额 随机替代 + s.kvt_map.Range(func(oldkey, item interface{}) bool { id = item.(tmplK_item).kv s.kvt_map.Store(key, tmplK_item{ - kv: id, - kt: s.now, + kv: id, + kt: s.now, uid: item.(tmplK_item).uid, }) s.kvt_map.Delete(oldkey) @@ -61,21 +62,21 @@ func (s *tmplK) Set(key interface{}) (id uintptr) { Uid := s.pool.Get() s.kvt_map.Store(key, tmplK_item{ - kv: Uid.Id, - kt: s.now, + kv: Uid.Id, + kt: s.now, uid: Uid, }) return Uid.Id } -func (s *tmplK) Get(key interface{}) (isLive bool,id uintptr){ +func (s *tmplK) Get(key interface{}) (isLive bool, id uintptr) { tmp, ok := s.kvt_map.Load(key) - item,_ := tmp.(tmplK_item) + item, _ := tmp.(tmplK_item) id = item.kv - isLive = ok && s.Druation < 0 || s.now - item.kt <= s.Druation + isLive = ok && s.Druation < 0 || s.now-item.kt <= s.Druation if !isLive && ok { s.free(item.uid) s.kvt_map.Delete(key) @@ -83,25 +84,25 @@ func (s *tmplK) Get(key interface{}) (isLive bool,id uintptr){ return } -func (s *tmplK) Check(key interface{},id uintptr) bool { - ok,k := s.Get(key) +func (s *tmplK) Check(key interface{}, id uintptr) bool { + ok, k := s.Get(key) return ok && (k == id) } -func (s *tmplK) Len() (int64,int) { - return s.now,s.kvt_map.Len() +func (s *tmplK) Len() (int64, int) { + return s.now, s.kvt_map.Len() } -func (s *tmplK) freeLen() (int64) { +func (s *tmplK) freeLen() int64 { return int64(int(s.pool.Len()) + s.slowBackList.Len()) } func (s *tmplK) free(i *idpool.Id) { s.slowBackList.PushBack(i) if s.freeLen() > s.SumInDruation { - if el := s.slowBackList.Front();el != nil && el.Value != nil{ + if el := s.slowBackList.Front(); el != nil && el.Value != nil { e := s.slowBackList.Remove(el) s.pool.Put(e.(*idpool.Id)) } } -} \ No newline at end of file +} diff --git a/tmplKV/tmplV.go b/tmplKV/tmplV.go index 8213eab..2acc30e 100644 --- a/tmplKV/tmplV.go +++ b/tmplKV/tmplV.go @@ -1,39 +1,40 @@ package part import ( - "time" "sync" + "time" + idpool "github.com/qydysky/part/idpool" ) type tmplV struct { SumInDruation int64 - Druation int64 - now int64 - deleteNum int - pool *idpool.Idpool - kvt_map map[uintptr]tmplV_item + Druation int64 + now int64 + deleteNum int + pool *idpool.Idpool + kvt_map map[uintptr]tmplV_item sync.RWMutex } type tmplV_item struct { - kv string - kt int64 + kv string + kt int64 uid *idpool.Id } -func New_tmplV(SumInDruation,Druation int64) (*tmplV) { +func New_tmplV(SumInDruation, Druation int64) *tmplV { s := &tmplV{ - SumInDruation:SumInDruation, - Druation:Druation, - kvt_map:make(map[uintptr]tmplV_item), - pool:idpool.New(), + SumInDruation: SumInDruation, + Druation: Druation, + kvt_map: make(map[uintptr]tmplV_item), + pool: idpool.New(func() interface{} { return new(struct{}) }), } - go func(){ + go func() { ticker := time.NewTicker(time.Second) - for{ - s.now = (<- ticker.C).Unix() + for { + s.now = (<-ticker.C).Unix() } }() @@ -42,12 +43,12 @@ func New_tmplV(SumInDruation,Druation int64) (*tmplV) { func (s *tmplV) Set(contect string) (key uintptr) { - if s.SumInDruation >= 0 && s.pool.Len() >= uint(s.SumInDruation) {//不为无限&&达到限额 随机替代 + if s.SumInDruation >= 0 && s.pool.Len() >= uint(s.SumInDruation) { //不为无限&&达到限额 随机替代 s.Lock() - for key,item := range s.kvt_map { + for key, item := range s.kvt_map { s.kvt_map[key] = tmplV_item{ - kv: contect, - kt: s.now, + kv: contect, + kt: s.now, uid: item.uid, } s.Unlock() @@ -59,8 +60,8 @@ func (s *tmplV) Set(contect string) (key uintptr) { s.Lock() s.kvt_map[Uid.Id] = tmplV_item{ - kv: contect, - kt: s.now, + kv: contect, + kt: s.now, uid: Uid, } s.Unlock() @@ -68,16 +69,16 @@ func (s *tmplV) Set(contect string) (key uintptr) { return Uid.Id } -func (s *tmplV) Get(key uintptr) (isLive bool,contect string){ +func (s *tmplV) Get(key uintptr) (isLive bool, contect string) { s.RLock() K, ok := s.kvt_map[key] s.RUnlock() contect = K.kv - isLive = ok && s.Druation < 0 || s.now - K.kt <= s.Druation + isLive = ok && s.Druation < 0 || s.now-K.kt <= s.Druation if !isLive && ok { s.pool.Put(K.uid) s.Lock() - delete(s.kvt_map,key) + delete(s.kvt_map, key) if s.deleteNum > len(s.kvt_map) { s.deleteNum = 0 go s.Tidy() @@ -87,19 +88,21 @@ func (s *tmplV) Get(key uintptr) (isLive bool,contect string){ return } -func (s *tmplV) Check(key uintptr,contect string) bool { - ok,k := s.Get(key) +func (s *tmplV) Check(key uintptr, contect string) bool { + ok, k := s.Get(key) return ok && (k == contect) } -func (s *tmplV) Buf() (int64,int) { - return s.now,len(s.kvt_map) +func (s *tmplV) Buf() (int64, int) { + return s.now, len(s.kvt_map) } func (s *tmplV) Tidy() { tmp := make(map[uintptr]tmplV_item) s.Lock() - for k,v := range s.kvt_map {tmp[k] = v} + for k, v := range s.kvt_map { + tmp[k] = v + } s.kvt_map = tmp s.Unlock() -} \ No newline at end of file +} diff --git a/websocket/Server.go b/websocket/Server.go index 7e78964..f9178bf 100644 --- a/websocket/Server.go +++ b/websocket/Server.go @@ -3,31 +3,32 @@ package part import ( "net" "net/http" - "time" + "time" + "github.com/gorilla/websocket" idpool "github.com/qydysky/part/idpool" mq "github.com/qydysky/part/msgq" ) type Server struct { - ws_mq *mq.Msgq + ws_mq *mq.Msgq userpool *idpool.Idpool } type Uinterface struct { - Id uintptr + Id uintptr Data []byte } -type uinterface struct {//内部消息 - Id uintptr +type uinterface struct { //内部消息 + Id uintptr Data interface{} } -func New_server() (*Server) { +func New_server() *Server { return &Server{ - ws_mq: mq.New(200),//收发通道 - userpool: idpool.New(),//浏览器标签页池 + ws_mq: mq.New(200), //收发通道 + userpool: idpool.New(func() interface{} { return new(struct{}) }), //浏览器标签页池 } } @@ -36,36 +37,36 @@ func (t *Server) WS(w http.ResponseWriter, r *http.Request) (o chan uintptr) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { - t.ws_mq.Push_tag(`error`,err) + t.ws_mq.Push_tag(`error`, err) return } - o = make(chan uintptr,1) + o = make(chan uintptr, 1) //从池中获取本会话id User := t.userpool.Get() //发送 - t.ws_mq.Pull_tag(map[string]func(interface{})(bool){ - `send`:func(data interface{})(bool){ - if u,ok := data.(Uinterface);ok && u.Id == 0 || u.Id == User.Id{ - if err := ws.WriteMessage(websocket.TextMessage,u.Data);err != nil { - t.ws_mq.Push_tag(`error`,err) + t.ws_mq.Pull_tag(map[string]func(interface{}) bool{ + `send`: func(data interface{}) bool { + if u, ok := data.(Uinterface); ok && u.Id == 0 || u.Id == User.Id { + if err := ws.WriteMessage(websocket.TextMessage, u.Data); err != nil { + t.ws_mq.Push_tag(`error`, err) return true } } return false }, - `close`:func(data interface{})(bool){ - if u,ok := data.(Uinterface);ok && u.Id == 0 || u.Id == User.Id{//服务器主动关闭 + `close`: func(data interface{}) bool { + if u, ok := data.(Uinterface); ok && u.Id == 0 || u.Id == User.Id { //服务器主动关闭 msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, string(u.Data)) - TO := time.Now().Add(time.Second*time.Duration(5)) + TO := time.Now().Add(time.Second * time.Duration(5)) - if err := ws.WriteControl(websocket.CloseMessage, msg, TO);err != nil { - t.ws_mq.Push_tag(`error`,err) + if err := ws.WriteControl(websocket.CloseMessage, msg, TO); err != nil { + t.ws_mq.Push_tag(`error`, err) } return true - } else if u,ok := data.(uinterface);ok{//接收发生错误关闭 + } else if u, ok := data.(uinterface); ok { //接收发生错误关闭 return ok && u.Data.(string) == `rev_close` && u.Id == 0 || u.Id == User.Id } return false @@ -73,31 +74,31 @@ func (t *Server) WS(w http.ResponseWriter, r *http.Request) (o chan uintptr) { }) //接收 - go func(){ + go func() { for { - ws.SetReadDeadline(time.Now().Add(time.Second*time.Duration(300))) - if _, message, err := ws.ReadMessage();err != nil { - if websocket.IsCloseError(err,websocket.CloseGoingAway) { + ws.SetReadDeadline(time.Now().Add(time.Second * time.Duration(300))) + if _, message, err := ws.ReadMessage(); err != nil { + if websocket.IsCloseError(err, websocket.CloseGoingAway) { //client close - } else if e,ok := err.(net.Error);ok && e.Timeout() { + } else if e, ok := err.(net.Error); ok && e.Timeout() { //Timeout } else { //other - t.ws_mq.Push_tag(`error`,err) + t.ws_mq.Push_tag(`error`, err) } break } else { - t.ws_mq.Push_tag(`recv`,Uinterface{ - Id:User.Id, - Data:message, + t.ws_mq.Push_tag(`recv`, Uinterface{ + Id: User.Id, + Data: message, }) } } //接收发生错误,通知发送关闭 - t.ws_mq.Push_tag(`close`,uinterface{ - Id:User.Id, - Data:`rev_close`, + t.ws_mq.Push_tag(`close`, uinterface{ + Id: User.Id, + Data: `rev_close`, }) //归还 t.userpool.Put(User) @@ -121,7 +122,7 @@ func (t *Server) WS(w http.ResponseWriter, r *http.Request) (o chan uintptr) { // ws_mq.Push_tag(`close`,Uinterface{//close // Id:0,//close all connect // }) -// //or +// //or // // ws_mq.Push_tag(`close`,Uinterface{//close // // Id:tmp.Id,//close this connect // // }) @@ -145,10 +146,10 @@ func (t *Server) WS(w http.ResponseWriter, r *http.Request) (o chan uintptr) { // return false // }, // }) -func (t *Server) Interface() (*mq.Msgq) { +func (t *Server) Interface() *mq.Msgq { return t.ws_mq } -func (t *Server) Len() uint { +func (t *Server) Len() int64 { return t.userpool.Len() -} \ No newline at end of file +}