//数据整合
{
+ type id_close struct {
+ id uintptr
+ close func()
+ }
+
var (
- reqs_used_id []uintptr
- reqs_remove_id []uintptr
+ reqs_used_id []id_close
+ // reqs_remove_id []id_close
reqs_keyframe [][][]byte
reqs_func_block funcCtrl.BlockFunc
- last_time_stamp int
+ last_keyframe_timestamp int
)
reqs.Pull_tag(map[string]func(interface{})(bool){
`req`:func(data interface{})(bool){
reqs_func_block.Block()
defer reqs_func_block.UnBlock()
- {
- var isclose bool
- for i:=0;i<len(reqs_remove_id);i+=1 {
- if reqs_remove_id[i] == req.id.Id {
- isclose = true
- reqs_remove_id = append(reqs_remove_id[:i], reqs_remove_id[i+1:]...)
- break
- }
- }
- if isclose {
- // fmt.Println(`移除req`,req.id.Id)
- req.close()
- return false
- }
- }
-
-
+ // {
+ // var isclose bool
+ // for i:=0;i<len(reqs_remove_id);i+=1 {
+ // if reqs_remove_id[i].id == req.id.Id {
+ // isclose = true
+ // reqs_remove_id = append(reqs_remove_id[:i], reqs_remove_id[i+1:]...)
+ // break
+ // }
+ // }
+ // if isclose {
+ // // fmt.Println(`移除req`,req.id.Id)
+ // req.close()
+ // return false
+ // }
+ // }
+
var reqs_keyframe_index int = len(reqs_used_id)
{
var isnew bool = true
for i:=0;i<len(reqs_used_id);i+=1 {
- if reqs_used_id[i] == req.id.Id {
+ if reqs_used_id[i].id == req.id.Id {
reqs_keyframe_index = i
isnew = false
break
}
if isnew {
// fmt.Println(`新req`,req.id.Id,reqs_keyframe_index)
- reqs_used_id = append(reqs_used_id, req.id.Id)
+ reqs_used_id = append(reqs_used_id, id_close{
+ id:req.id.Id,
+ close:req.close,
+ })
}
}
-
-
+
if len(reqs_used_id) == 1 {
// fmt.Println(`单req写入`,len(req.keyframe))
-
- last_time_stamp,_ = Keyframe_timebase(req.keyframe,last_time_stamp)
+ last_keyframe_timestamp,_ = Keyframe_timebase(req.keyframe,last_keyframe_timestamp)
for i:=0;i<len(req.keyframe);i+=1 {
//stream
}
return false
}
-
+
for reqs_keyframe_index >= len(reqs_keyframe) {
reqs_keyframe = append(reqs_keyframe, [][]byte{})
}
}
}
- if success_last_keyframe_timestramp,b,merged := Merge_stream(reqs_keyframe);merged == 0 {
+ if success_last_keyframe_timestamp,b,merged := Merge_stream(reqs_keyframe,last_keyframe_timestamp);merged == 0 {
// fmt.Println(`merge失败,reqs_keyframe[1]`,reqs_keyframe[1][0][:11],reqs_keyframe[1][len(reqs_keyframe[1])-1][:11])
if reqs_keyframe_index == 0 {
// fmt.Println(`merge失败,reqs_keyframe[0]写入`,len(req.keyframe))
- last_time_stamp,_ = Keyframe_timebase(req.keyframe,last_time_stamp)
+ last_keyframe_timestamp,_ = Keyframe_timebase(req.keyframe,last_keyframe_timestamp)
for i:=0;i<len(req.keyframe);i+=1 {
//stream
savestream.flv_stream.Push_tag("stream",req.keyframe[i])
out.Write(req.keyframe[i])
}
- reqs_keyframe[0] = [][]byte{reqs_keyframe[0][len(reqs_keyframe[0])-1]}
+ // reqs_keyframe[0] = [][]byte{reqs_keyframe[0][len(reqs_keyframe[0])-1]}
} else if len(reqs_keyframe[len(reqs_used_id)-1]) > 5 {
// fmt.Println(`强行merge`)
-
+ // reqs_remove_id = append(reqs_remove_id, reqs_used_id[0])
// last_time_stamp = int(F.Btoi32([]byte{reqs_keyframe[0][0][7], reqs_keyframe[0][0][4], reqs_keyframe[0][0][5], reqs_keyframe[0][0][6]},0))
-
+
reqs_keyframe = [][][]byte{}
for i:=0;i<len(reqs_used_id)-1;i+=1 {
- reqs_remove_id = append(reqs_remove_id, reqs_used_id[i])
+ reqs_used_id[i].close()
+ // reqs_remove_id = append(reqs_remove_id, reqs_used_id[i])
}
- reqs_used_id = []uintptr{reqs_used_id[len(reqs_used_id)-1]}
+ // reqs_used_id = []uintptr{reqs_used_id[len(reqs_used_id)-1]}
+ reqs_used_id = reqs_used_id[1:]
- last_time_stamp,_ = Keyframe_timebase(req.keyframe,last_time_stamp)
+ last_keyframe_timestamp,_ = Keyframe_timebase(req.keyframe,last_keyframe_timestamp)
for i:=0;i<len(req.keyframe);i+=1 {
//stream
savestream.flv_stream.Push_tag("stream",req.keyframe[i])
out.Write(req.keyframe[i])
}
+ } else if len(reqs_keyframe[len(reqs_used_id)-1]) > 2 {
+ // fmt.Println(`merge 旧连接退出`)
+ reqs_used_id[0].close()
+ // reqs_used_id = reqs_used_id[1:]
+ // last_time_stamp = int(F.Btoi32([]byte{reqs_keyframe[0][0][7], reqs_keyframe[0][0][4], reqs_keyframe[0][0][5], reqs_keyframe[0][0][6]},0))
+
+ // reqs_keyframe = [][][]byte{}
+
+ // for i:=0;i<len(reqs_used_id)-1;i+=1 {
+ // reqs_remove_id = append(reqs_remove_id, reqs_used_id[i])
+ // }
+ // reqs_used_id = []uintptr{reqs_used_id[len(reqs_used_id)-1]}
+
+ // last_keyframe_timestamp,_ = Keyframe_timebase(req.keyframe,last_keyframe_timestamp)
+
+ // for i:=0;i<len(req.keyframe);i+=1 {
+ // //stream
+ // savestream.flv_stream.Push_tag("stream",req.keyframe[i])
+ // out.Write(req.keyframe[i])
+ // }
}
} else {
- // fmt.Println(`merge成功`,len(b))
+ fmt.Println(`merge成功`,len(b))
- last_time_stamp = success_last_keyframe_timestramp
+ last_keyframe_timestamp = success_last_keyframe_timestamp
- for i:=0;i<merged;i+=1 {
- reqs_keyframe[i] = [][]byte{}
- reqs_remove_id = append(reqs_remove_id, reqs_used_id[i])
- }
+ // for i:=0;i<merged;i+=1 {
+ // reqs_used_id[i].close()
+ // reqs_keyframe[i] = [][]byte{}
+ // // reqs_remove_id = append(reqs_remove_id, reqs_used_id[i])
+ // }
+ reqs_keyframe = [][][]byte{}
- reqs_used_id = []uintptr{reqs_used_id[merged]}
+ reqs_used_id = reqs_used_id[merged:merged]
//stream
savestream.flv_stream.Push_tag("stream",b)
return false
},
- `closereq`:func(data interface{})(bool){
- req,ok := data.(link_stream)
-
- if !ok {return false}
-
- req.close()
-
- for i:=0;i<len(reqs_used_id);i+=1 {
- if reqs_used_id[i] == req.id.Id {
- reqs_used_id = append(reqs_used_id[:i],reqs_used_id[i+1:]...)
- if len(reqs_used_id) != 1 {
- reqs_keyframe = append(reqs_keyframe[:i],reqs_keyframe[i+1:]...)
- }
- return false
- }
- }
-
- for i:=0;i<len(reqs_remove_id);i+=1 {
- if reqs_remove_id[i] == req.id.Id {
- reqs_remove_id = append(reqs_remove_id[:i], reqs_remove_id[i+1:]...)
- return false
- }
- }
-
- return false
- },
+ // 11区 1
`close`:func(data interface{})(bool){
- reqs_used_id = []uintptr{}
- reqs_remove_id = []uintptr{}
+ reqs_used_id = []id_close{}
+ // reqs_remove_id = []id_close{}
reqs_keyframe = [][][]byte{}
- last_time_stamp = 0
+ last_keyframe_timestamp = 0
return true
},
})
//SaveToPath:savestream.path + ".flv",
SaveToChan:bc,
Timeout:int(int64(exp) - p.Sys().GetSTime()),
- ReadTimeout:7,
+ ReadTimeout:5,
})
//返回通道
- var item = link_stream{
- close:req.Close,
- id:id_pool.Get(),
- }
+ var (
+ item = link_stream{
+ close:req.Close,
+ id:id_pool.Get(),
+ }
+ http_error = make(chan error)
+ )
//解析
- go func(bc chan[]byte,item *link_stream){
+ go func(http_error chan error,bc chan[]byte,item *link_stream){
var (
buf []byte
skip_buf_size int
if len(b) == 0 {
// fmt.Println(`req退出`,item.id.Id)
id_pool.Put(item.id)
- reqs.Push_tag(`closereq`,*item)
+ http_error <- errors.New(`EOF`)
+ // reqs.Push_tag(`closereq`,*item)
return
}
reqs.Push_tag(`req`,*item)
}
}
- }(bc,&item)
+ }(http_error,bc,&item)
//等待过期/退出
{
select {
case <- savestream.cancel.Chan:
exit_sign = true
- case <- time.After(time.Second*time.Duration(int(int64(exp) - p.Sys().GetSTime())-120)):;
+ case <- time.After(time.Second*time.Duration(int(int64(exp) - p.Sys().GetSTime())-60)):;
+ case e :=<- http_error:
+ l.L(`W: `,`连接`,item.id.Id,e)
}
if exit_sign {break}
}
//this fuction merge two stream and return the merge buffer,which has the newest frame.
//once len(merge_buf) isn't 0,old_buf can be drop and new_buf can be used from now on.or it's still need to keep buf until find the same tag.
-func Merge_stream(keyframe_lists [][][]byte)(last_keyframe_timestramp int,merge_buf []byte,merged int){
+func Merge_stream(keyframe_lists [][][]byte,last_keyframe_timestramp int)(keyframe_timestamp int,merge_buf []byte,merged int){
if len(keyframe_lists) == 0 {return}
// old_buf_o := buf_o
if bytes.Index(buf[n][tag_header_size:],keyframe_lists[i][o][tag_header_size:]) != -1 {
- last_time_stamp := int(F.Btoi32([]byte{buf[n][7], buf[n][4], buf[n][5], buf[n][6]},0))
+ // last_time_stamp := int(F.Btoi32([]byte{buf[n][7], buf[n][4], buf[n][5], buf[n][6]},0))
// tmp_kfs := make([][]byte,len(keyframe_lists[i][o:]))
- last_keyframe_timestramp,_ = Keyframe_timebase(keyframe_lists[i][o:],last_time_stamp)
+ keyframe_timestamp,_ = Keyframe_timebase(keyframe_lists[i][o:],last_keyframe_timestramp)
buf = append(buf[:n], keyframe_lists[i][o:]...)
merged = i
// return
}
-func Keyframe_timebase(buf [][]byte,last_time_stamp int)(newest_time_stamp int,err error){
+func Keyframe_timebase(buf [][]byte,last_keyframe_timestamp int)(keyframe_timestamp int,err error){
var (
tag_num int
- diff_time int
+ base_keyframe_time int
)
// defer func(){
- // fmt.Printf("时间戳调整 newest:%d\n",newest_time_stamp)
+ // fmt.Printf("时间戳调整 newest:%d\n",keyframe_timestamp)
// }()
+ // keyframe_timestamp = last_keyframe_timestamp+3000
for i:=0;i<len(buf);i+=1 {
+ keyframe_timestamp = last_keyframe_timestamp+3000
+
for buf_offset:=0;buf_offset+tag_header_size<len(buf[i]); {
tag_offset := buf_offset+bytes.IndexAny(buf[i][buf_offset:], string([]byte{video_tag,audio_tag,script_tag}));
time_stamp := int(F.Btoi32([]byte{buf[i][tag_offset+7], buf[i][tag_offset+4], buf[i][tag_offset+5], buf[i][tag_offset+6]},0))
- if tag_num == 1 && last_time_stamp != 0 {
- diff_time = last_time_stamp + 3000 - time_stamp
- // fmt.Printf("时间戳调整 last:%d now:%d diff:%d\n",last_time_stamp,time_stamp,diff_time)
- }
-
- if buf[i][tag_offset] == video_tag {
- if buf[i][tag_offset+11] & 0xf0 == 0x10 {//key frame
- newest_time_stamp = time_stamp+diff_time
+ // if tag_num == 1 && last_keyframe_timestamp != 0 {
+ // diff_time = last_keyframe_timestamp + 3000 - time_stamp
+ // fmt.Printf("时间戳调整 last:%d now:%d diff:%d\n",last_keyframe_timestamp,time_stamp,diff_time)
+
+ if buf[i][tag_offset] == video_tag && buf[i][tag_offset+11] & 0xf0 == 0x10{
+ // if {//key frame
+ base_keyframe_time = time_stamp
+ time_stamp = keyframe_timestamp
+ last_keyframe_timestamp = keyframe_timestamp
+ // fmt.Printf("当前关键帧时间戳 %d %d=>%d\n",last_keyframe_timestamp,base_keyframe_time,keyframe_timestamp)
+ // }
+ } else {
+ time_stamp += keyframe_timestamp-base_keyframe_time
}
- }
+ // }
- time_stamp_byte := F.Itob32(int32(time_stamp+diff_time))
+ time_stamp_byte := F.Itob32(int32(time_stamp))
buf[i][tag_offset+7] = time_stamp_byte[0]
buf[i][tag_offset+4] = time_stamp_byte[1]