]> 127.0.0.1 Git - part/.git/commitdiff
pool v0.9.4
authorqydysky <qydysky@foxmail.com>
Thu, 12 May 2022 02:15:42 +0000 (10:15 +0800)
committerqydysky <qydysky@foxmail.com>
Thu, 12 May 2022 02:15:42 +0000 (10:15 +0800)
funcCtrl/FuncCtrl.go
idpool/Idpool.go
idpool/Idpool_test.go
reqf/Reqf.go
tmplKV/tmplK.go
tmplKV/tmplV.go
websocket/Server.go

index 799a1fcd3d070ac1e7879d35dc8fc482e1979f94..054c1581d15a4466a4eacbe71a892ef92ba51d49 100644 (file)
@@ -29,7 +29,7 @@ type FlashFunc struct { //新的替换旧的
 
 func (t *FlashFunc) Flash() (current uintptr) {
        if t.pool == nil {
-               t.pool = idpool.New()
+               t.pool = idpool.New(func() interface{} { return new(struct{}) })
        }
        if t.b == nil {
                t.b = list.New()
index ad35a4723ad9f61e2c4667296b155493c39c73b4..3ed36ac6b4e6ceb313e5e2bed2237ccbf6c2e1b0 100644 (file)
@@ -2,49 +2,48 @@ package part
 
 import (
        "sync"
+       "sync/atomic"
        "unsafe"
 )
 
 type Idpool struct {
        pool sync.Pool
-       sum uint
-       sync.Mutex
+       sum  int64
 }
 
 type Id struct {
-       Id uintptr
-       item interface{}
+       Id   uintptr
+       Item interface{}
 }
 
-func New() (*Idpool) {
+func New(f func() interface{}) *Idpool {
        return &Idpool{
-               pool:sync.Pool{
+               pool: sync.Pool{
                        New: func() interface{} {
-                               return new(struct{})
+                               var o = new(Id)
+                               o.Item = f()
+                               o.Id = uintptr(unsafe.Pointer(&o.Item))
+                               return o
                        },
                },
        }
 }
 
 func (t *Idpool) Get() (o *Id) {
-       o = new(Id)
-       o.item = t.pool.Get()
-       o.Id = uintptr(unsafe.Pointer(&o.item))
-       t.Lock()
-       t.sum += 1
-       t.Unlock()
+       o = t.pool.Get().(*Id)
+       atomic.AddInt64(&t.sum, 1)
        return
 }
 
 func (t *Idpool) Put(i *Id) {
-       if i.item == nil {return}
-       t.pool.Put(i.item)
-       i.item = nil
-       t.Lock()
-       t.sum -= 1
-       t.Unlock()
+       if i.Item == nil {
+               return
+       }
+       i.Item = nil
+       t.pool.Put(i)
+       atomic.AddInt64(&t.sum, -1)
 }
 
-func (t *Idpool) Len() uint {
-       return t.sum
-}
\ No newline at end of file
+func (t *Idpool) Len() int64 {
+       return atomic.LoadInt64(&t.sum)
+}
index 43ffd68b517fbfe7bcdbbef563e03f18bcf6d284..894b2f183b6c1b7867b5328048755b594f800407 100644 (file)
@@ -4,17 +4,19 @@ import (
        "testing"
 )
 
-func Test(t *testing.T){
-       pool := New()
+func Test(t *testing.T) {
+       pool := New(func() interface{} {
+               return new(int)
+       })
        a := pool.Get()
        b := pool.Get()
-       t.Log(a.Id,a.item,pool.Len())
-       t.Log(b.Id,b.item)
+       t.Log(a.Id, a.Item, pool.Len())
+       t.Log(b.Id, b.Item)
        pool.Put(a)
        pool.Put(a)
-       t.Log(a.Id,a.item,pool.Len())
-       t.Log(b.Id,b.item)
+       t.Log(a.Id, a.Item, pool.Len())
+       t.Log(b.Id, b.Item)
        a = pool.Get()
-       t.Log(a.Id,a.item,pool.Len())
-       t.Log(b.Id,b.item)
+       t.Log(a.Id, a.Item, pool.Len())
+       t.Log(b.Id, b.Item)
 }
index 4217827f54808284736331395dde6f80ad3ce553..fadabd3f44d422f16e5b7a8f73b6708f0746c40d 100644 (file)
@@ -15,7 +15,6 @@ import (
        "time"
 
        compress "github.com/qydysky/part/compress"
-       idpool "github.com/qydysky/part/idpool"
        pio "github.com/qydysky/part/io"
        signal "github.com/qydysky/part/signal"
        // "encoding/binary"
@@ -50,18 +49,12 @@ type Req struct {
        Response *http.Response
        UsedTime time.Duration
 
-       id     *idpool.Id
-       idp    *idpool.Idpool
        cancel *signal.Signal
        sync.Mutex
 }
 
 func New() *Req {
-       idp := idpool.New()
-       return &Req{
-               idp: idp,
-               id:  idp.Get(),
-       }
+       return new(Req)
 }
 
 // func main(){
@@ -82,10 +75,6 @@ func (t *Req) Reqf(val Rval) error {
 
        _val := val
 
-       defer func() {
-               t.idp.Put(t.id)
-       }()
-
        t.cancel = signal.Init()
 
        for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 {
@@ -374,13 +363,6 @@ func (t *Req) Close() {
        t.cancel.Done()
 }
 
-func (t *Req) Id() uintptr {
-       if t.id == nil {
-               return 0
-       }
-       return t.id.Id
-}
-
 func IsTimeout(e error) bool {
        if errors.Is(e, context.DeadlineExceeded) || errors.Is(e, ErrConnectTimeout) || errors.Is(e, ErrReadTimeout) {
                return true
index 2e3dd8c2c7e4b5af9420934b7eed438a3e669c99..f0530bf4ec56fab6f42eb7626dcc62d8376370d7 100644 (file)
@@ -1,39 +1,40 @@
 package part
 
 import (
-       "time"
        "container/list"
-       syncmap "github.com/qydysky/part/sync"
+       "time"
+
        idpool "github.com/qydysky/part/idpool"
+       syncmap "github.com/qydysky/part/sync"
 )
 
 type tmplK struct {
        SumInDruation int64
-       Druation int64
-       now int64
-       pool *idpool.Idpool
-       kvt_map syncmap.Map
-       slowBackList *list.List
+       Druation      int64
+       now           int64
+       pool          *idpool.Idpool
+       kvt_map       syncmap.Map
+       slowBackList  *list.List
 }
 
 type tmplK_item struct {
-       kv uintptr
-       kt int64
+       kv  uintptr
+       kt  int64
        uid *idpool.Id
 }
 
-func New_tmplK(SumInDruation,Druation int64) (*tmplK) {
+func New_tmplK(SumInDruation, Druation int64) *tmplK {
 
        s := &tmplK{
-               SumInDruation:SumInDruation,
-               Druation:Druation,
-               pool:idpool.New(),
-               slowBackList:list.New(),
+               SumInDruation: SumInDruation,
+               Druation:      Druation,
+               pool:          idpool.New(func() interface{} { return new(struct{}) }),
+               slowBackList:  list.New(),
        }
-       go func(){
+       go func() {
                ticker := time.NewTicker(time.Second)
-               for{
-                       s.now = (<- ticker.C).Unix()
+               for {
+                       s.now = (<-ticker.C).Unix()
                }
        }()
 
@@ -41,15 +42,15 @@ func New_tmplK(SumInDruation,Druation int64) (*tmplK) {
 }
 
 func (s *tmplK) Set(key interface{}) (id uintptr) {
-       
-       if tmp, oks := s.kvt_map.LoadV(key).(tmplK_item);oks {
+
+       if tmp, oks := s.kvt_map.LoadV(key).(tmplK_item); oks {
                s.free(tmp.uid)
-       } else if s.SumInDruation >= 0 && s.freeLen() >= s.SumInDruation{//不为无限&&达到限额 随机替代
-               s.kvt_map.Range(func(oldkey,item interface{})(bool){
+       } else if s.SumInDruation >= 0 && s.freeLen() >= s.SumInDruation { //不为无限&&达到限额 随机替代
+               s.kvt_map.Range(func(oldkey, item interface{}) bool {
                        id = item.(tmplK_item).kv
                        s.kvt_map.Store(key, tmplK_item{
-                               kv: id,
-                               kt: s.now,
+                               kv:  id,
+                               kt:  s.now,
                                uid: item.(tmplK_item).uid,
                        })
                        s.kvt_map.Delete(oldkey)
@@ -61,21 +62,21 @@ func (s *tmplK) Set(key interface{}) (id uintptr) {
        Uid := s.pool.Get()
 
        s.kvt_map.Store(key, tmplK_item{
-               kv: Uid.Id,
-               kt: s.now,
+               kv:  Uid.Id,
+               kt:  s.now,
                uid: Uid,
        })
 
        return Uid.Id
 }
 
-func (s *tmplK) Get(key interface{}) (isLive bool,id uintptr){
+func (s *tmplK) Get(key interface{}) (isLive bool, id uintptr) {
        tmp, ok := s.kvt_map.Load(key)
 
-       item,_ := tmp.(tmplK_item)
+       item, _ := tmp.(tmplK_item)
        id = item.kv
 
-       isLive = ok && s.Druation < 0 || s.now - item.kt <= s.Druation
+       isLive = ok && s.Druation < 0 || s.now-item.kt <= s.Druation
        if !isLive && ok {
                s.free(item.uid)
                s.kvt_map.Delete(key)
@@ -83,25 +84,25 @@ func (s *tmplK) Get(key interface{}) (isLive bool,id uintptr){
        return
 }
 
-func (s *tmplK) Check(key interface{},id uintptr) bool {
-       ok,k := s.Get(key)
+func (s *tmplK) Check(key interface{}, id uintptr) bool {
+       ok, k := s.Get(key)
        return ok && (k == id)
 }
 
-func (s *tmplK) Len() (int64,int) {
-       return s.now,s.kvt_map.Len()
+func (s *tmplK) Len() (int64, int) {
+       return s.now, s.kvt_map.Len()
 }
 
-func (s *tmplK) freeLen() (int64) {
+func (s *tmplK) freeLen() int64 {
        return int64(int(s.pool.Len()) + s.slowBackList.Len())
 }
 
 func (s *tmplK) free(i *idpool.Id) {
        s.slowBackList.PushBack(i)
        if s.freeLen() > s.SumInDruation {
-               if el := s.slowBackList.Front();el != nil && el.Value != nil{
+               if el := s.slowBackList.Front(); el != nil && el.Value != nil {
                        e := s.slowBackList.Remove(el)
                        s.pool.Put(e.(*idpool.Id))
                }
        }
-}
\ No newline at end of file
+}
index 8213eabda556cb60734409c223668310eaf94816..2acc30eb629bbae74ce56feb2fdf583914ace4f6 100644 (file)
@@ -1,39 +1,40 @@
 package part
 
 import (
-       "time"
        "sync"
+       "time"
+
        idpool "github.com/qydysky/part/idpool"
 )
 
 type tmplV struct {
        SumInDruation int64
-       Druation int64
-       now int64
-       deleteNum int
-       pool *idpool.Idpool
-       kvt_map map[uintptr]tmplV_item
+       Druation      int64
+       now           int64
+       deleteNum     int
+       pool          *idpool.Idpool
+       kvt_map       map[uintptr]tmplV_item
        sync.RWMutex
 }
 
 type tmplV_item struct {
-       kv string
-       kt int64
+       kv  string
+       kt  int64
        uid *idpool.Id
 }
 
-func New_tmplV(SumInDruation,Druation int64) (*tmplV) {
+func New_tmplV(SumInDruation, Druation int64) *tmplV {
 
        s := &tmplV{
-               SumInDruation:SumInDruation,
-               Druation:Druation,
-               kvt_map:make(map[uintptr]tmplV_item),
-               pool:idpool.New(),
+               SumInDruation: SumInDruation,
+               Druation:      Druation,
+               kvt_map:       make(map[uintptr]tmplV_item),
+               pool:          idpool.New(func() interface{} { return new(struct{}) }),
        }
-       go func(){
+       go func() {
                ticker := time.NewTicker(time.Second)
-               for{
-                       s.now = (<- ticker.C).Unix()
+               for {
+                       s.now = (<-ticker.C).Unix()
                }
        }()
 
@@ -42,12 +43,12 @@ func New_tmplV(SumInDruation,Druation int64) (*tmplV) {
 
 func (s *tmplV) Set(contect string) (key uintptr) {
 
-       if s.SumInDruation >= 0 && s.pool.Len() >= uint(s.SumInDruation) {//不为无限&&达到限额 随机替代
+       if s.SumInDruation >= 0 && s.pool.Len() >= uint(s.SumInDruation) { //不为无限&&达到限额 随机替代
                s.Lock()
-               for key,item := range s.kvt_map {
+               for key, item := range s.kvt_map {
                        s.kvt_map[key] = tmplV_item{
-                               kv: contect,
-                               kt: s.now,
+                               kv:  contect,
+                               kt:  s.now,
                                uid: item.uid,
                        }
                        s.Unlock()
@@ -59,8 +60,8 @@ func (s *tmplV) Set(contect string) (key uintptr) {
 
        s.Lock()
        s.kvt_map[Uid.Id] = tmplV_item{
-               kv: contect,
-               kt: s.now,
+               kv:  contect,
+               kt:  s.now,
                uid: Uid,
        }
        s.Unlock()
@@ -68,16 +69,16 @@ func (s *tmplV) Set(contect string) (key uintptr) {
        return Uid.Id
 }
 
-func (s *tmplV) Get(key uintptr) (isLive bool,contect string){
+func (s *tmplV) Get(key uintptr) (isLive bool, contect string) {
        s.RLock()
        K, ok := s.kvt_map[key]
        s.RUnlock()
        contect = K.kv
-       isLive = ok && s.Druation < 0 || s.now - K.kt <= s.Druation
+       isLive = ok && s.Druation < 0 || s.now-K.kt <= s.Druation
        if !isLive && ok {
                s.pool.Put(K.uid)
                s.Lock()
-               delete(s.kvt_map,key)
+               delete(s.kvt_map, key)
                if s.deleteNum > len(s.kvt_map) {
                        s.deleteNum = 0
                        go s.Tidy()
@@ -87,19 +88,21 @@ func (s *tmplV) Get(key uintptr) (isLive bool,contect string){
        return
 }
 
-func (s *tmplV) Check(key uintptr,contect string) bool {
-       ok,k := s.Get(key)
+func (s *tmplV) Check(key uintptr, contect string) bool {
+       ok, k := s.Get(key)
        return ok && (k == contect)
 }
 
-func (s *tmplV) Buf() (int64,int) {
-       return s.now,len(s.kvt_map)
+func (s *tmplV) Buf() (int64, int) {
+       return s.now, len(s.kvt_map)
 }
 
 func (s *tmplV) Tidy() {
        tmp := make(map[uintptr]tmplV_item)
        s.Lock()
-       for k,v := range s.kvt_map {tmp[k] = v}
+       for k, v := range s.kvt_map {
+               tmp[k] = v
+       }
        s.kvt_map = tmp
        s.Unlock()
-}
\ No newline at end of file
+}
index 7e78964f25f90f920131cbfdf349262cbd63a395..f9178bfc47c93ad168897f575cfb836cc4c4a369 100644 (file)
@@ -3,31 +3,32 @@ package part
 import (
        "net"
        "net/http"
-    "time"
+       "time"
+
        "github.com/gorilla/websocket"
        idpool "github.com/qydysky/part/idpool"
        mq "github.com/qydysky/part/msgq"
 )
 
 type Server struct {
-       ws_mq *mq.Msgq
+       ws_mq    *mq.Msgq
        userpool *idpool.Idpool
 }
 
 type Uinterface struct {
-       Id uintptr
+       Id   uintptr
        Data []byte
 }
 
-type uinterface struct {//内部消息
-       Id uintptr
+type uinterface struct { //内部消息
+       Id   uintptr
        Data interface{}
 }
 
-func New_server() (*Server) {
+func New_server() *Server {
        return &Server{
-               ws_mq: mq.New(200),//收发通道
-               userpool: idpool.New(),//浏览器标签页池
+               ws_mq:    mq.New(200),                                             //收发通道
+               userpool: idpool.New(func() interface{} { return new(struct{}) }), //浏览器标签页池
        }
 }
 
@@ -36,36 +37,36 @@ func (t *Server) WS(w http.ResponseWriter, r *http.Request) (o chan uintptr) {
 
        ws, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
-               t.ws_mq.Push_tag(`error`,err)
+               t.ws_mq.Push_tag(`error`, err)
                return
        }
 
-       o = make(chan uintptr,1)
+       o = make(chan uintptr, 1)
 
        //从池中获取本会话id
        User := t.userpool.Get()
 
        //发送
-       t.ws_mq.Pull_tag(map[string]func(interface{})(bool){
-               `send`:func(data interface{})(bool){
-                       if u,ok := data.(Uinterface);ok && u.Id == 0 || u.Id == User.Id{
-                               if err := ws.WriteMessage(websocket.TextMessage,u.Data);err != nil {
-                                       t.ws_mq.Push_tag(`error`,err)
+       t.ws_mq.Pull_tag(map[string]func(interface{}) bool{
+               `send`: func(data interface{}) bool {
+                       if u, ok := data.(Uinterface); ok && u.Id == 0 || u.Id == User.Id {
+                               if err := ws.WriteMessage(websocket.TextMessage, u.Data); err != nil {
+                                       t.ws_mq.Push_tag(`error`, err)
                                        return true
                                }
                        }
                        return false
                },
-               `close`:func(data interface{})(bool){
-                       if u,ok := data.(Uinterface);ok && u.Id == 0 || u.Id == User.Id{//服务器主动关闭
+               `close`: func(data interface{}) bool {
+                       if u, ok := data.(Uinterface); ok && u.Id == 0 || u.Id == User.Id { //服务器主动关闭
                                msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, string(u.Data))
-                               TO := time.Now().Add(time.Second*time.Duration(5))
+                               TO := time.Now().Add(time.Second * time.Duration(5))
 
-                               if err := ws.WriteControl(websocket.CloseMessage, msg, TO);err != nil {
-                                       t.ws_mq.Push_tag(`error`,err)
+                               if err := ws.WriteControl(websocket.CloseMessage, msg, TO); err != nil {
+                                       t.ws_mq.Push_tag(`error`, err)
                                }
                                return true
-                       } else if u,ok := data.(uinterface);ok{//接收发生错误关闭
+                       } else if u, ok := data.(uinterface); ok { //接收发生错误关闭
                                return ok && u.Data.(string) == `rev_close` && u.Id == 0 || u.Id == User.Id
                        }
                        return false
@@ -73,31 +74,31 @@ func (t *Server) WS(w http.ResponseWriter, r *http.Request) (o chan uintptr) {
        })
 
        //接收
-       go func(){
+       go func() {
                for {
-                       ws.SetReadDeadline(time.Now().Add(time.Second*time.Duration(300)))
-                       if _, message, err := ws.ReadMessage();err != nil {
-                               if websocket.IsCloseError(err,websocket.CloseGoingAway) {
+                       ws.SetReadDeadline(time.Now().Add(time.Second * time.Duration(300)))
+                       if _, message, err := ws.ReadMessage(); err != nil {
+                               if websocket.IsCloseError(err, websocket.CloseGoingAway) {
                                        //client close
-                               } else if e,ok := err.(net.Error);ok && e.Timeout() {
+                               } else if e, ok := err.(net.Error); ok && e.Timeout() {
                                        //Timeout
                                } else {
                                        //other
-                                       t.ws_mq.Push_tag(`error`,err)
+                                       t.ws_mq.Push_tag(`error`, err)
                                }
                                break
                        } else {
-                               t.ws_mq.Push_tag(`recv`,Uinterface{
-                                       Id:User.Id,
-                                       Data:message,
+                               t.ws_mq.Push_tag(`recv`, Uinterface{
+                                       Id:   User.Id,
+                                       Data: message,
                                })
                        }
                }
 
                //接收发生错误,通知发送关闭
-               t.ws_mq.Push_tag(`close`,uinterface{
-                       Id:User.Id,
-                       Data:`rev_close`,
+               t.ws_mq.Push_tag(`close`, uinterface{
+                       Id:   User.Id,
+                       Data: `rev_close`,
                })
                //归还
                t.userpool.Put(User)
@@ -121,7 +122,7 @@ func (t *Server) WS(w http.ResponseWriter, r *http.Request) (o chan uintptr) {
 //                             ws_mq.Push_tag(`close`,Uinterface{//close
 //                                     Id:0,//close all connect
 //                             })
-//                             //or 
+//                             //or
 //                             // ws_mq.Push_tag(`close`,Uinterface{//close
 //                             //      Id:tmp.Id,//close this connect
 //                             // })
@@ -145,10 +146,10 @@ func (t *Server) WS(w http.ResponseWriter, r *http.Request) (o chan uintptr) {
 //             return false
 //     },
 // })
-func (t *Server) Interface() (*mq.Msgq) {
+func (t *Server) Interface() *mq.Msgq {
        return t.ws_mq
 }
 
-func (t *Server) Len() uint {
+func (t *Server) Len() int64 {
        return t.userpool.Len()
-}
\ No newline at end of file
+}