go: [ '1.21' ]
steps:
- name: Set up Go${{ matrix.go }}
- uses: actions/setup-go@v4
+ uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
go: [ '1.21' ]
steps:
- name: Set up Go${{ matrix.go }}
- uses: actions/setup-go@v4
+ uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
go: [ '1.21' ]
steps:
- name: Set up Go${{ matrix.go }}
- uses: actions/setup-go@v4
+ uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
go: [ '1.21' ]
steps:
- name: Set up Go${{ matrix.go }}
- uses: actions/setup-go@v4
+ uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
time.Sleep(time.Second * time.Duration(100))
}
}()
-
t.ReqPool = pool.New(
- func() *reqf.Req {
- // if pc, file, line, ok := runtime.Caller(2); ok {
- // fmt.Printf("call by %s\n\t%s:%d\n", runtime.FuncForPC(pc).Name(), file, line)
- // }
- return reqf.New()
- },
- func(t *reqf.Req) bool {
- return t.IsLive()
- },
- func(t *reqf.Req) *reqf.Req {
- // if pc, file, line, ok := runtime.Caller(2); ok {
- // fmt.Printf("call by %s\n\t%s:%d\n", runtime.FuncForPC(pc).Name(), file, line)
- // }
- return t
+ pool.PoolFunc[reqf.Req]{
+ New: func() *reqf.Req {
+ return reqf.New()
+ },
+ InUse: func(r *reqf.Req) bool {
+ return r.IsLive()
+ },
+ Reuse: func(r *reqf.Req) *reqf.Req {
+ return r
+ },
+ Pool: func(r *reqf.Req) *reqf.Req {
+ return r
+ },
},
- func(t *reqf.Req) *reqf.Req {
- return t
- }, 100,
+ 100,
)
var (
return strconv.Atoi(base[:len(base)-4])
}
+func (link *m4s_link_item) download(reqPool *pool.Buf[reqf.Req], reqConfig reqf.Rval) {
+ link.status = 1 // 设置切片状态为正在下载
+ link.tryDownCount += 1
+
+ r := reqPool.Get()
+ defer reqPool.Put(r)
+ reqConfig.Url = link.Url
+
+ // fmt.Println(`T: `, `下载`, link.Base)
+ // defer t.log.L(`T: `, `下载完成`, link.Base, link.status, link.err)
+
+ if e := r.Reqf(reqConfig); e != nil && !errors.Is(e, io.EOF) {
+ // t.log.L(`T: `, `下载错误`, link.Base, e)
+ // if !reqf.IsTimeout(e) {
+ // // 发生非超时错误
+ // link.err = e
+ // link.tryDownCount = 3 // 设置切片状态为下载失败
+ // }
+ link.status = 3 // 设置切片状态为下载失败
+ } else {
+ link.data.Reset()
+ _ = link.data.Append(r.Respon)
+ link.status = 2 // 设置切片状态为下载完成
+ }
+}
+
func (t *M4SStream) getM4s() (p *m4s_link_item) {
return t.m4s_pool.Get()
}
} else if no == last_no {
// 只返回新增加的切片,去掉无用切片
t.putM4s(m4s_links[:k+1]...)
- m4s_links = m4s_links[k+1:]
+ slice.DelFront(&m4s_links, k+1)
// 只返回新增加的m3u8_addon字节
if index := bytes.Index(m3u8_addon, []byte(m4s_link.Base)); index != -1 {
index += len([]byte(m4s_link.Base))
//获取切片地址
p.Url = F.ResolveReferenceLast(v.Url, p.Base)
p.createdTime = time.Now()
- m4s_links = append([]*m4s_link_item{p}, m4s_links...)
+ slice.AddFront(&m4s_links, p)
}
// 请求解析成功,退出获取循环
var download_limit = &funcCtrl.BlockFuncN{Max: 3}
if v, ok := t.common.K_v.LoadV(`debug模式`).(bool); ok && v {
- ctx, can := context.WithCancel(context.Background())
- defer can()
+ cancle := make(chan struct{})
+ defer close(cancle)
go func() {
- ticker := time.NewTicker(time.Minute)
- defer ticker.Stop()
for {
select {
- case <-ctx.Done():
+ case <-cancle:
return
- case <-ticker.C:
+ case <-time.After(time.Minute):
}
reqState := t.m4s_pool.State()
t.log.L(`T: `, fmt.Sprintf("m4sPoolState pooled/no(%d/%d), inuse/no(%d/%d), sum(%d), qts(%.2f)",
go func(link *m4s_link_item) {
defer download_limit.UnBlock()
- link.status = 1 // 设置切片状态为正在下载
- link.tryDownCount += 1
-
- r := t.reqPool.Get()
- defer t.reqPool.Put(r)
- reqConfig := reqf.Rval{
- Url: link.Url,
+ link.download(t.reqPool, reqf.Rval{
Timeout: to * 1000,
WriteLoopTO: (to + 2) * 1000,
Proxy: t.common.Proxy,
Header: map[string]string{
`Connection`: `close`,
},
- }
-
- // t.log.L(`T: `, `下载`, link.Base)
- // defer t.log.L(`T: `, `下载完成`, link.Base, link.status, link.err)
-
- if e := r.Reqf(reqConfig); e != nil && !errors.Is(e, io.EOF) {
- // t.log.L(`T: `, `下载错误`, link.Base, e)
- // if !reqf.IsTimeout(e) {
- // // 发生非超时错误
- // link.err = e
- // link.tryDownCount = 3 // 设置切片状态为下载失败
- // }
- link.status = 3 // 设置切片状态为下载失败
- } else {
- link.data.Reset()
- _ = link.data.Append(r.Respon)
- link.status = 2 // 设置切片状态为下载完成
- }
+ })
}(v)
}
// 初始化池
t.m4s_pool = pool.New(
- func() *m4s_link_item {
- return &m4s_link_item{
- data: slice.New[byte](),
- }
- },
- func(t *m4s_link_item) bool {
- return t.createdTime.After(t.pooledTime) || time.Now().Before(t.pooledTime.Add(time.Second*10))
- },
- func(t *m4s_link_item) *m4s_link_item {
- return t.reset()
- },
- func(t *m4s_link_item) *m4s_link_item {
- t.pooledTime = time.Now()
- return t
+ pool.PoolFunc[m4s_link_item]{
+ New: func() *m4s_link_item {
+ return &m4s_link_item{
+ data: slice.New[byte](),
+ }
+ },
+ InUse: func(t *m4s_link_item) bool {
+ return t.createdTime.After(t.pooledTime) || time.Now().Before(t.pooledTime.Add(time.Second*10))
+ },
+ Reuse: func(t *m4s_link_item) *m4s_link_item {
+ return t.reset()
+ },
+ Pool: func(t *m4s_link_item) *m4s_link_item {
+ t.pooledTime = time.Now()
+ return t
+ },
},
50,
)