var tfhdBuf = make([]byte, 12)
var boxBuf = make([]byte, 4)
var trackBuf = make([]byte, 4)
- var timescaleBuf = make([]byte, 4)
- var timescale int32
+ var mdhdBuf = make([]byte, 4)
+ var timescale = make(map[int32]int32)
var opTs = make(map[int32]int)
var cuTs = make(map[int32]int)
- if e := f.SeekUntil([]byte("mvhd"), file.AtCurrent, 1<<17, 1<<22); e != nil && !errors.Is(e, file.ErrMaxReadSizeReach) {
- return e
- }
- if _, e := f.Read(boxBuf); e != nil {
- return e
- } else if !bytes.Equal(boxBuf, []byte("mvhd")) {
- return fmt.Errorf("wrong box:%v", string(boxBuf))
- }
- _ = f.SeekIndex(12, file.AtCurrent)
- if _, e := f.Read(timescaleBuf); e != nil {
- return e
- }
- timescale = btoi32(timescaleBuf, 0)
- fmt.Printf("resetTS timescale:%v\n", timescale)
-
for {
if e := f.SeekUntil([]byte("tkhd"), file.AtCurrent, 1<<17, 1<<22); e != nil {
if errors.Is(e, file.ErrMaxReadSizeReach) {
return e
}
trackId := btoi32(trackBuf, 0)
- fmt.Printf("trackId %v\n", trackId)
+
+ if e := f.SeekUntil([]byte("mdhd"), file.AtCurrent, 1<<17, 1<<22); e != nil {
+ if errors.Is(e, file.ErrMaxReadSizeReach) {
+ break
+ }
+ if errors.Is(e, io.EOF) {
+ break
+ }
+ return e
+ }
+ if _, e := f.Read(boxBuf); e != nil {
+ return e
+ } else if !bytes.Equal(boxBuf, []byte("mdhd")) {
+ return fmt.Errorf("wrong box:%v", string(boxBuf))
+ }
+ _ = f.SeekIndex(12, file.AtCurrent)
+ if _, e := f.Read(mdhdBuf); e != nil {
+ return e
+ }
+
opTs[trackId] = -1
cuTs[trackId] = 0
+ timescale[trackId] = btoi32(mdhdBuf, 0)
}
- // fmt.Println("resetTimeStamp Druation %v(%v-%v)", time.Duration(int(time.Second)*(cuTS-opTs)), cuTS, opTs)
-
_ = f.SeekIndex(0, file.AtOrigin)
for {
for k, v := range opTs {
fmt.Printf("track %v opTs:%v cuTS:%v\n", k, v, cuTs[k])
}
- // fmt.Println("resetTimeStamp Druation %v(%v-%v)", time.Duration(int(time.Second)*(cuTS-opTs)), cuTS, opTs)
-
- var duration int32
- for k, v := range opTs {
- duration = int32(cuTs[k]-v) / timescale
- break
- }
- fmt.Printf("resetTS dur:%v\n", duration)
- _ = f.SeekIndex(0, file.AtOrigin)
-
- if e := f.SeekUntil([]byte("mvhd"), file.AtCurrent, 1<<17, 1<<22); !errors.Is(e, file.ErrMaxReadSizeReach) {
- return e
- }
- _ = f.SeekIndex(20, file.AtCurrent)
- if _, e := f.Write(itob32(duration), false); e != nil {
- return e
+ // reset timestamp
+ // write mvhd
+ {
+ var duration int32
+ for k, v := range opTs {
+ duration = int32(cuTs[k]-v) / timescale[k]
+ break
+ }
+ _ = f.SeekIndex(0, file.AtOrigin)
+ if e := f.SeekUntil([]byte("mvhd"), file.AtCurrent, 1<<17, 1<<22); e != nil {
+ return e
+ }
+ _ = f.SeekIndex(20, file.AtCurrent)
+ fmt.Printf("mvhd %v \n", duration)
+ if _, e := f.Write(itob32(duration), false); e != nil {
+ return e
+ }
}
- // write tkhd
+ // write tkhd mdhd
_ = f.SeekIndex(0, file.AtOrigin)
for i := 0; i < len(opTs); i++ {
if e := f.SeekUntil([]byte("tkhd"), file.AtCurrent, 1<<17, 1<<20); e != nil {
}
trackID := btoi32(trackBuf, 0)
_ = f.SeekIndex(4, file.AtCurrent)
- if _, e := f.Write(itob32(int32(cuTs[trackID]-opTs[trackID])/timescale), false); e != nil {
+ fmt.Printf("tkhd %v \n", int32(cuTs[trackID]-opTs[trackID])/timescale[trackID])
+ if _, e := f.Write(itob32(int32(cuTs[trackID]-opTs[trackID])/timescale[trackID]), false); e != nil {
+ return e
+ }
+
+ if e := f.SeekUntil([]byte("mdhd"), file.AtCurrent, 1<<17, 1<<22); e != nil {
+ if errors.Is(e, file.ErrMaxReadSizeReach) {
+ continue
+ }
+ if errors.Is(e, io.EOF) {
+ break
+ }
+ return e
+ }
+ if _, e := f.Read(boxBuf); e != nil {
+ return e
+ } else if !bytes.Equal(boxBuf, []byte("mdhd")) {
+ return fmt.Errorf("wrong box:%v", string(boxBuf))
+ }
+ _ = f.SeekIndex(16, file.AtCurrent)
+ if _, e := f.Write(itob32(0), false); e != nil {
return e
}
}
signal "github.com/qydysky/part/signal"
slice "github.com/qydysky/part/slice"
pstring "github.com/qydysky/part/strings"
+ pweb "github.com/qydysky/part/web"
)
type M4SStream struct {
}
// 流服务推送方法
+//
+// 在客户端存在某种代理时,将有可能无法监测到客户端关闭,这有可能导致goroutine泄漏
func (t *M4SStream) PusherToHttp(conn net.Conn, w http.ResponseWriter, r *http.Request, startFunc func(*M4SStream) error, stopFunc func(*M4SStream) error) error {
switch t.stream_type {
case `m3u8`:
return e
}
- flusher, flushSupport := w.(http.Flusher)
- if flushSupport {
- flusher.Flush()
- }
+ w = pweb.WithFlush(w)
//写入头
{
if len(t.getFirstBuf()) != 0 {
if _, err := w.Write(t.getFirstBuf()); err != nil {
return err
- } else if flushSupport {
- flusher.Flush()
}
break
}
if _, err := w.Write(t.boot_buf); err != nil {
return err
}
- if flushSupport {
- flusher.Flush()
- }
}
- //
- var cancelRec func()
- cancelRec = t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
+ var cancelRec = t.Stream_msg.Pull_tag_async(map[string]func([]byte) bool{
`data`: func(b []byte) bool {
select {
case <-r.Context().Done():
if len(b) == 0 {
return true
}
-
- // 1s内写入失败,关闭conn,防止协程泄漏
- done := time.AfterFunc(time.Second, func() {
- cancelRec()
- if conn != nil {
- conn.Close()
- }
- }).Stop
- defer done()
-
- if _, err := w.Write(b); err != nil {
+ if n, err := w.Write(b); err != nil || n == 0 {
+ return true
+ } else if e := conn.SetWriteDeadline(time.Now().Add(time.Second * 10)); e != nil {
return true
- } else if flushSupport {
- flusher.Flush()
}
return false
},
return true
},
})
+
<-r.Context().Done()
cancelRec()