func (t *FlashFunc) Flash() (current uintptr) {
if t.pool == nil {
- t.pool = idpool.New()
+ t.pool = idpool.New(func() interface{} { return new(struct{}) })
}
if t.b == nil {
t.b = list.New()
import (
"sync"
+ "sync/atomic"
"unsafe"
)
type Idpool struct {
pool sync.Pool
- sum uint
- sync.Mutex
+ sum int64
}
type Id struct {
- Id uintptr
- item interface{}
+ Id uintptr
+ Item interface{}
}
-func New() (*Idpool) {
+func New(f func() interface{}) *Idpool {
return &Idpool{
- pool:sync.Pool{
+ pool: sync.Pool{
New: func() interface{} {
- return new(struct{})
+ var o = new(Id)
+ o.Item = f()
+ o.Id = uintptr(unsafe.Pointer(&o.Item))
+ return o
},
},
}
}
func (t *Idpool) Get() (o *Id) {
- o = new(Id)
- o.item = t.pool.Get()
- o.Id = uintptr(unsafe.Pointer(&o.item))
- t.Lock()
- t.sum += 1
- t.Unlock()
+ o = t.pool.Get().(*Id)
+ atomic.AddInt64(&t.sum, 1)
return
}
func (t *Idpool) Put(i *Id) {
- if i.item == nil {return}
- t.pool.Put(i.item)
- i.item = nil
- t.Lock()
- t.sum -= 1
- t.Unlock()
+ if i.Item == nil {
+ return
+ }
+ i.Item = nil
+ t.pool.Put(i)
+ atomic.AddInt64(&t.sum, -1)
}
-func (t *Idpool) Len() uint {
- return t.sum
-}
\ No newline at end of file
+func (t *Idpool) Len() int64 {
+ return atomic.LoadInt64(&t.sum)
+}
"testing"
)
-func Test(t *testing.T){
- pool := New()
+func Test(t *testing.T) {
+ pool := New(func() interface{} {
+ return new(int)
+ })
a := pool.Get()
b := pool.Get()
- t.Log(a.Id,a.item,pool.Len())
- t.Log(b.Id,b.item)
+ t.Log(a.Id, a.Item, pool.Len())
+ t.Log(b.Id, b.Item)
pool.Put(a)
pool.Put(a)
- t.Log(a.Id,a.item,pool.Len())
- t.Log(b.Id,b.item)
+ t.Log(a.Id, a.Item, pool.Len())
+ t.Log(b.Id, b.Item)
a = pool.Get()
- t.Log(a.Id,a.item,pool.Len())
- t.Log(b.Id,b.item)
+ t.Log(a.Id, a.Item, pool.Len())
+ t.Log(b.Id, b.Item)
}
"time"
compress "github.com/qydysky/part/compress"
- idpool "github.com/qydysky/part/idpool"
pio "github.com/qydysky/part/io"
signal "github.com/qydysky/part/signal"
// "encoding/binary"
Response *http.Response
UsedTime time.Duration
- id *idpool.Id
- idp *idpool.Idpool
cancel *signal.Signal
sync.Mutex
}
func New() *Req {
- idp := idpool.New()
- return &Req{
- idp: idp,
- id: idp.Get(),
- }
+ return new(Req)
}
// func main(){
_val := val
- defer func() {
- t.idp.Put(t.id)
- }()
-
t.cancel = signal.Init()
for SleepTime, Retry := _val.SleepTime, _val.Retry; Retry >= 0; Retry -= 1 {
t.cancel.Done()
}
-func (t *Req) Id() uintptr {
- if t.id == nil {
- return 0
- }
- return t.id.Id
-}
-
func IsTimeout(e error) bool {
if errors.Is(e, context.DeadlineExceeded) || errors.Is(e, ErrConnectTimeout) || errors.Is(e, ErrReadTimeout) {
return true
package part
import (
- "time"
"container/list"
- syncmap "github.com/qydysky/part/sync"
+ "time"
+
idpool "github.com/qydysky/part/idpool"
+ syncmap "github.com/qydysky/part/sync"
)
type tmplK struct {
SumInDruation int64
- Druation int64
- now int64
- pool *idpool.Idpool
- kvt_map syncmap.Map
- slowBackList *list.List
+ Druation int64
+ now int64
+ pool *idpool.Idpool
+ kvt_map syncmap.Map
+ slowBackList *list.List
}
type tmplK_item struct {
- kv uintptr
- kt int64
+ kv uintptr
+ kt int64
uid *idpool.Id
}
-func New_tmplK(SumInDruation,Druation int64) (*tmplK) {
+func New_tmplK(SumInDruation, Druation int64) *tmplK {
s := &tmplK{
- SumInDruation:SumInDruation,
- Druation:Druation,
- pool:idpool.New(),
- slowBackList:list.New(),
+ SumInDruation: SumInDruation,
+ Druation: Druation,
+ pool: idpool.New(func() interface{} { return new(struct{}) }),
+ slowBackList: list.New(),
}
- go func(){
+ go func() {
ticker := time.NewTicker(time.Second)
- for{
- s.now = (<- ticker.C).Unix()
+ for {
+ s.now = (<-ticker.C).Unix()
}
}()
}
func (s *tmplK) Set(key interface{}) (id uintptr) {
-
- if tmp, oks := s.kvt_map.LoadV(key).(tmplK_item);oks {
+
+ if tmp, oks := s.kvt_map.LoadV(key).(tmplK_item); oks {
s.free(tmp.uid)
- } else if s.SumInDruation >= 0 && s.freeLen() >= s.SumInDruation{//不为无限&&达到限额 随机替代
- s.kvt_map.Range(func(oldkey,item interface{})(bool){
+ } else if s.SumInDruation >= 0 && s.freeLen() >= s.SumInDruation { //不为无限&&达到限额 随机替代
+ s.kvt_map.Range(func(oldkey, item interface{}) bool {
id = item.(tmplK_item).kv
s.kvt_map.Store(key, tmplK_item{
- kv: id,
- kt: s.now,
+ kv: id,
+ kt: s.now,
uid: item.(tmplK_item).uid,
})
s.kvt_map.Delete(oldkey)
Uid := s.pool.Get()
s.kvt_map.Store(key, tmplK_item{
- kv: Uid.Id,
- kt: s.now,
+ kv: Uid.Id,
+ kt: s.now,
uid: Uid,
})
return Uid.Id
}
-func (s *tmplK) Get(key interface{}) (isLive bool,id uintptr){
+func (s *tmplK) Get(key interface{}) (isLive bool, id uintptr) {
tmp, ok := s.kvt_map.Load(key)
- item,_ := tmp.(tmplK_item)
+ item, _ := tmp.(tmplK_item)
id = item.kv
- isLive = ok && s.Druation < 0 || s.now - item.kt <= s.Druation
+ isLive = ok && s.Druation < 0 || s.now-item.kt <= s.Druation
if !isLive && ok {
s.free(item.uid)
s.kvt_map.Delete(key)
return
}
-func (s *tmplK) Check(key interface{},id uintptr) bool {
- ok,k := s.Get(key)
+func (s *tmplK) Check(key interface{}, id uintptr) bool {
+ ok, k := s.Get(key)
return ok && (k == id)
}
-func (s *tmplK) Len() (int64,int) {
- return s.now,s.kvt_map.Len()
+func (s *tmplK) Len() (int64, int) {
+ return s.now, s.kvt_map.Len()
}
-func (s *tmplK) freeLen() (int64) {
+func (s *tmplK) freeLen() int64 {
return int64(int(s.pool.Len()) + s.slowBackList.Len())
}
func (s *tmplK) free(i *idpool.Id) {
s.slowBackList.PushBack(i)
if s.freeLen() > s.SumInDruation {
- if el := s.slowBackList.Front();el != nil && el.Value != nil{
+ if el := s.slowBackList.Front(); el != nil && el.Value != nil {
e := s.slowBackList.Remove(el)
s.pool.Put(e.(*idpool.Id))
}
}
-}
\ No newline at end of file
+}
package part
import (
- "time"
"sync"
+ "time"
+
idpool "github.com/qydysky/part/idpool"
)
type tmplV struct {
SumInDruation int64
- Druation int64
- now int64
- deleteNum int
- pool *idpool.Idpool
- kvt_map map[uintptr]tmplV_item
+ Druation int64
+ now int64
+ deleteNum int
+ pool *idpool.Idpool
+ kvt_map map[uintptr]tmplV_item
sync.RWMutex
}
type tmplV_item struct {
- kv string
- kt int64
+ kv string
+ kt int64
uid *idpool.Id
}
-func New_tmplV(SumInDruation,Druation int64) (*tmplV) {
+func New_tmplV(SumInDruation, Druation int64) *tmplV {
s := &tmplV{
- SumInDruation:SumInDruation,
- Druation:Druation,
- kvt_map:make(map[uintptr]tmplV_item),
- pool:idpool.New(),
+ SumInDruation: SumInDruation,
+ Druation: Druation,
+ kvt_map: make(map[uintptr]tmplV_item),
+ pool: idpool.New(func() interface{} { return new(struct{}) }),
}
- go func(){
+ go func() {
ticker := time.NewTicker(time.Second)
- for{
- s.now = (<- ticker.C).Unix()
+ for {
+ s.now = (<-ticker.C).Unix()
}
}()
func (s *tmplV) Set(contect string) (key uintptr) {
- if s.SumInDruation >= 0 && s.pool.Len() >= uint(s.SumInDruation) {//不为无限&&达到限额 随机替代
+ if s.SumInDruation >= 0 && s.pool.Len() >= uint(s.SumInDruation) { //不为无限&&达到限额 随机替代
s.Lock()
- for key,item := range s.kvt_map {
+ for key, item := range s.kvt_map {
s.kvt_map[key] = tmplV_item{
- kv: contect,
- kt: s.now,
+ kv: contect,
+ kt: s.now,
uid: item.uid,
}
s.Unlock()
s.Lock()
s.kvt_map[Uid.Id] = tmplV_item{
- kv: contect,
- kt: s.now,
+ kv: contect,
+ kt: s.now,
uid: Uid,
}
s.Unlock()
return Uid.Id
}
-func (s *tmplV) Get(key uintptr) (isLive bool,contect string){
+func (s *tmplV) Get(key uintptr) (isLive bool, contect string) {
s.RLock()
K, ok := s.kvt_map[key]
s.RUnlock()
contect = K.kv
- isLive = ok && s.Druation < 0 || s.now - K.kt <= s.Druation
+ isLive = ok && s.Druation < 0 || s.now-K.kt <= s.Druation
if !isLive && ok {
s.pool.Put(K.uid)
s.Lock()
- delete(s.kvt_map,key)
+ delete(s.kvt_map, key)
if s.deleteNum > len(s.kvt_map) {
s.deleteNum = 0
go s.Tidy()
return
}
-func (s *tmplV) Check(key uintptr,contect string) bool {
- ok,k := s.Get(key)
+func (s *tmplV) Check(key uintptr, contect string) bool {
+ ok, k := s.Get(key)
return ok && (k == contect)
}
-func (s *tmplV) Buf() (int64,int) {
- return s.now,len(s.kvt_map)
+func (s *tmplV) Buf() (int64, int) {
+ return s.now, len(s.kvt_map)
}
func (s *tmplV) Tidy() {
tmp := make(map[uintptr]tmplV_item)
s.Lock()
- for k,v := range s.kvt_map {tmp[k] = v}
+ for k, v := range s.kvt_map {
+ tmp[k] = v
+ }
s.kvt_map = tmp
s.Unlock()
-}
\ No newline at end of file
+}
import (
"net"
"net/http"
- "time"
+ "time"
+
"github.com/gorilla/websocket"
idpool "github.com/qydysky/part/idpool"
mq "github.com/qydysky/part/msgq"
)
type Server struct {
- ws_mq *mq.Msgq
+ ws_mq *mq.Msgq
userpool *idpool.Idpool
}
type Uinterface struct {
- Id uintptr
+ Id uintptr
Data []byte
}
-type uinterface struct {//内部消息
- Id uintptr
+type uinterface struct { //内部消息
+ Id uintptr
Data interface{}
}
-func New_server() (*Server) {
+func New_server() *Server {
return &Server{
- ws_mq: mq.New(200),//收发通道
- userpool: idpool.New(),//浏览器标签页池
+ ws_mq: mq.New(200), //收发通道
+ userpool: idpool.New(func() interface{} { return new(struct{}) }), //浏览器标签页池
}
}
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
- t.ws_mq.Push_tag(`error`,err)
+ t.ws_mq.Push_tag(`error`, err)
return
}
- o = make(chan uintptr,1)
+ o = make(chan uintptr, 1)
//从池中获取本会话id
User := t.userpool.Get()
//发送
- t.ws_mq.Pull_tag(map[string]func(interface{})(bool){
- `send`:func(data interface{})(bool){
- if u,ok := data.(Uinterface);ok && u.Id == 0 || u.Id == User.Id{
- if err := ws.WriteMessage(websocket.TextMessage,u.Data);err != nil {
- t.ws_mq.Push_tag(`error`,err)
+ t.ws_mq.Pull_tag(map[string]func(interface{}) bool{
+ `send`: func(data interface{}) bool {
+ if u, ok := data.(Uinterface); ok && u.Id == 0 || u.Id == User.Id {
+ if err := ws.WriteMessage(websocket.TextMessage, u.Data); err != nil {
+ t.ws_mq.Push_tag(`error`, err)
return true
}
}
return false
},
- `close`:func(data interface{})(bool){
- if u,ok := data.(Uinterface);ok && u.Id == 0 || u.Id == User.Id{//服务器主动关闭
+ `close`: func(data interface{}) bool {
+ if u, ok := data.(Uinterface); ok && u.Id == 0 || u.Id == User.Id { //服务器主动关闭
msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, string(u.Data))
- TO := time.Now().Add(time.Second*time.Duration(5))
+ TO := time.Now().Add(time.Second * time.Duration(5))
- if err := ws.WriteControl(websocket.CloseMessage, msg, TO);err != nil {
- t.ws_mq.Push_tag(`error`,err)
+ if err := ws.WriteControl(websocket.CloseMessage, msg, TO); err != nil {
+ t.ws_mq.Push_tag(`error`, err)
}
return true
- } else if u,ok := data.(uinterface);ok{//接收发生错误关闭
+ } else if u, ok := data.(uinterface); ok { //接收发生错误关闭
return ok && u.Data.(string) == `rev_close` && u.Id == 0 || u.Id == User.Id
}
return false
})
//接收
- go func(){
+ go func() {
for {
- ws.SetReadDeadline(time.Now().Add(time.Second*time.Duration(300)))
- if _, message, err := ws.ReadMessage();err != nil {
- if websocket.IsCloseError(err,websocket.CloseGoingAway) {
+ ws.SetReadDeadline(time.Now().Add(time.Second * time.Duration(300)))
+ if _, message, err := ws.ReadMessage(); err != nil {
+ if websocket.IsCloseError(err, websocket.CloseGoingAway) {
//client close
- } else if e,ok := err.(net.Error);ok && e.Timeout() {
+ } else if e, ok := err.(net.Error); ok && e.Timeout() {
//Timeout
} else {
//other
- t.ws_mq.Push_tag(`error`,err)
+ t.ws_mq.Push_tag(`error`, err)
}
break
} else {
- t.ws_mq.Push_tag(`recv`,Uinterface{
- Id:User.Id,
- Data:message,
+ t.ws_mq.Push_tag(`recv`, Uinterface{
+ Id: User.Id,
+ Data: message,
})
}
}
//接收发生错误,通知发送关闭
- t.ws_mq.Push_tag(`close`,uinterface{
- Id:User.Id,
- Data:`rev_close`,
+ t.ws_mq.Push_tag(`close`, uinterface{
+ Id: User.Id,
+ Data: `rev_close`,
})
//归还
t.userpool.Put(User)
// ws_mq.Push_tag(`close`,Uinterface{//close
// Id:0,//close all connect
// })
-// //or
+// //or
// // ws_mq.Push_tag(`close`,Uinterface{//close
// // Id:tmp.Id,//close this connect
// // })
// return false
// },
// })
-func (t *Server) Interface() (*mq.Msgq) {
+func (t *Server) Interface() *mq.Msgq {
return t.ws_mq
}
-func (t *Server) Len() uint {
+func (t *Server) Len() int64 {
return t.userpool.Len()
-}
\ No newline at end of file
+}