From: qydysky <32743305+qydysky@users.noreply.github.com> Date: Sat, 15 Oct 2022 18:29:20 +0000 (+0800) Subject: 添加文件读写、ws录制 X-Git-Tag: v0.10.0~1 X-Git-Url: http://127.0.0.1:8081/?a=commitdiff_plain;h=0ac8751eb1271bf6e413f7945858e4288d395e2d;p=part%2F.git 添加文件读写、ws录制 --- diff --git a/file/FileWR.go b/file/FileWR.go new file mode 100644 index 0000000..61bee9f --- /dev/null +++ b/file/FileWR.go @@ -0,0 +1,264 @@ +package part + +import ( + "bytes" + "errors" + "io" + "os" + "strings" + "sync" + + l "github.com/qydysky/part/Limit" +) + +var ( + ErrFilePathTooLong = errors.New("ErrFilePathTooLong") + ErrNewFileCantSeed = errors.New("ErrNewFileCantSeed") + ErrFailToLock = errors.New("ErrFailToLock") + ErrMaxReadSizeReach = errors.New("ErrMaxReadSizeReach") +) + +type File struct { + Config Config + file *os.File + sync.RWMutex +} + +type Config struct { + FilePath string //文件路径 + CurIndex int64 //初始化光标位置 + AutoClose bool //自动关闭句柄 +} + +func New(filePath string, curIndex int64, autoClose bool) *File { + return &File{ + Config: Config{ + FilePath: filePath, + CurIndex: curIndex, + AutoClose: autoClose, + }, + } +} + +func (t *File) CopyTo(to *File, byteInSec int64, tryLock bool) error { + t.getRWCloser() + if t.Config.AutoClose { + defer t.Close() + } + + if !t.TryRLock() { + return ErrFailToLock + } + defer t.RUnlock() + + to.getRWCloser() + if t.Config.AutoClose { + defer to.Close() + } + + if tryLock { + if !to.TryLock() { + return ErrFailToLock + } + } else { + to.Lock() + } + defer to.Unlock() + + return transfer(t.file, to.file, byteInSec) +} + +func (t *File) Write(data []byte, tryLock bool) (int, error) { + t.getRWCloser() + if t.Config.AutoClose { + defer t.Close() + } + + if tryLock { + if !t.TryLock() { + return 0, ErrFailToLock + } + } else { + t.Lock() + } + defer t.Unlock() + + return t.file.Write(data) +} + +func (t *File) Read(data []byte) (int, error) { + t.getRWCloser() + if t.Config.AutoClose { + defer t.Close() + } + + if !t.TryRLock() { + return 0, ErrFailToLock + } + defer t.RUnlock() + + return t.file.Read(data) +} + +func (t *File) ReadUntil(separation byte, perReadSize int, maxReadSize int) (data []byte, e error) { + t.getRWCloser() + if t.Config.AutoClose { + defer t.Close() + } + + if !t.TryRLock() { + return nil, ErrFailToLock + } + defer t.RUnlock() + + var ( + tmpArea = make([]byte, perReadSize) + n int + ) + + for maxReadSize > 0 { + n, e = t.file.Read(tmpArea) + + if e != nil { + if errors.Is(e, io.EOF) { + e = nil + } + return + } + maxReadSize = maxReadSize - n + + if i := bytes.Index(tmpArea[:n], []byte{separation}); i != -1 { + if n-i-1 != 0 { + t.file.Seek(-int64(n-i-1), 1) + } + if i != 0 { + data = append(data, tmpArea[:i]...) + } + break + } else { + data = append(data, tmpArea[:n]...) + } + } + + if maxReadSize <= 0 { + e = ErrMaxReadSizeReach + } + + return +} + +func (t *File) Seed(index int64) (e error) { + t.getRWCloser() + if t.Config.AutoClose { + defer t.Close() + } + + if !t.TryLock() { + return ErrFailToLock + } + defer t.Unlock() + + whenc := 0 + if index < 0 { + whenc = 2 + } + t.Config.CurIndex, e = t.file.Seek(index, whenc) + + return nil +} + +func (t *File) Delete() error { + if !t.TryLock() { + return ErrFailToLock + } + defer t.Unlock() + + return os.Remove(t.Config.FilePath) +} + +func (t *File) Close() error { + if t.file != nil { + return t.file.Close() + } + return nil +} + +func (t *File) getRWCloser() { + if t.Config.AutoClose || t.file == nil { + if !t.isExist() { + if f, e := os.Create(t.Config.FilePath); e != nil { + panic(e) + } else { + if t.Config.CurIndex != 0 { + whenc := 0 + if t.Config.CurIndex < 0 { + whenc = 2 + } + t.Config.CurIndex, e = f.Seek(t.Config.CurIndex, whenc) + if e != nil { + panic(e) + } + } + t.file = f + } + } else { + if f, e := os.OpenFile(t.Config.FilePath, os.O_RDWR|os.O_EXCL, 0644); e != nil { + panic(e) + } else { + if t.Config.CurIndex != 0 { + whenc := 0 + if t.Config.CurIndex < 0 { + whenc = 2 + } + t.Config.CurIndex, e = f.Seek(t.Config.CurIndex, whenc) + if e != nil { + panic(e) + } + } + t.file = f + } + } + } +} + +func transfer(r io.ReadCloser, w io.WriteCloser, byteInSec int64) (e error) { + if byteInSec > 0 { + limit := l.New(1, 1000, -1) + defer limit.Close() + + buf := make([]byte, byteInSec) + for { + n, err := r.Read(buf) + if n != 0 { + w.Write(buf[:n]) + } else if err != nil { + e = err + break + } + limit.TO() + } + } else { + _, e = io.Copy(w, r) + } + + return nil +} + +func (t *File) isExist() bool { + if len(t.Config.FilePath) > 4096 { + panic(ErrFilePathTooLong) + } + + _, err := os.Stat(t.Config.FilePath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return false + } else { + if !strings.Contains(err.Error(), "file name too long") { + panic(ErrFilePathTooLong) + } + return false + } + } + return true +} diff --git a/file/FileWR_test.go b/file/FileWR_test.go new file mode 100644 index 0000000..34b3a50 --- /dev/null +++ b/file/FileWR_test.go @@ -0,0 +1,156 @@ +package part + +import ( + "bytes" + "testing" +) + +func TestWriteReadDelSync(t *testing.T) { + f := New("rwd.txt", 0, true) + if i, e := f.Write([]byte("sss"), true); i == 0 || e != nil { + t.Fatal(e) + } + + var buf = make([]byte, 3) + if i, e := f.Read(buf); i == 0 || e != nil { + t.Fatal(i, e) + } else { + for _, v := range buf { + if v != 's' { + t.Fatal(v) + } + } + } + + if i, e := f.Read(buf); i == 0 || e != nil { + t.Fatal(i, e) + } else { + for _, v := range buf { + if v != 's' { + t.Fatal(v) + } + } + } + + if e := f.Delete(); e != nil { + t.Fatal(e) + } +} + +func TestWriteReadDel(t *testing.T) { + f := New("rwd.txt", 0, false) + if i, e := f.Write([]byte("sss"), true); i == 0 || e != nil { + t.Fatal(e) + } + + if e := f.Seed(0); e != nil { + t.Fatal(e) + } + + var buf = make([]byte, 3) + if i, e := f.Read(buf); i == 0 || e != nil { + t.Fatal(i, e) + } else { + for _, v := range buf { + if v != 's' { + t.Fatal(v) + } + } + } + + if e := f.Close(); e != nil { + t.Fatal(e) + } + + if e := f.Delete(); e != nil { + t.Fatal(e) + } +} + +func TestSeed(t *testing.T) { + f := New("rwd.txt", 0, false) + if i, e := f.Write([]byte("12er4x3"), true); i == 0 || e != nil { + t.Fatal(e) + } + + if e := f.Seed(1); e != nil { + t.Fatal(e) + } + + var buf = make([]byte, 1) + if i, e := f.Read(buf); i == 0 || e != nil { + t.Fatal(i, e) + } else { + if buf[0] != '2' { + t.Fatal(buf[0]) + } + } + + if e := f.Seed(-1); e != nil { + t.Fatal(e) + } + + if i, e := f.Read(buf); i == 0 || e != nil { + t.Fatal(i, e) + } else { + if buf[0] != '3' { + t.Fatal(buf[0]) + } + } + + if e := f.Close(); e != nil { + t.Fatal(e) + } + + if e := f.Delete(); e != nil { + t.Fatal(e) + } +} + +func TestCopy(t *testing.T) { + sf := New("s.txt", 0, true) + if i, e := sf.Write([]byte("12er4x3"), true); i == 0 || e != nil { + t.Fatal(e) + } + + tf := New("t.txt", 0, true) + if e := sf.CopyTo(tf, 1, true); e != nil { + t.Fatal(e) + } + + sf.Delete() + tf.Delete() +} + +func TestReadUntil(t *testing.T) { + f := New("s.txt", 0, false) + if i, e := f.Write([]byte("18u3y7\ns99s9\n"), true); i == 0 || e != nil { + t.Fatal(e) + } + + if e := f.Seed(0); e != nil { + t.Fatal(e) + } + + if data, e := f.ReadUntil('\n', 5, 20); e != nil { + t.Fatal(e) + } else if !bytes.Equal(data, []byte("18u3y7")) { + t.Fatal(string(data)) + } + + t.Log(f.Config.CurIndex) + + if data, e := f.ReadUntil('\n', 5, 20); e != nil { + t.Fatal(e) + } else if !bytes.Equal(data, []byte("s99s9")) { + t.Fatal(string(data)) + } + + if e := f.Close(); e != nil { + t.Fatal(e) + } + + if e := f.Delete(); e != nil { + t.Fatal(e) + } +} diff --git a/websocket/Recoder.go b/websocket/Recoder.go new file mode 100644 index 0000000..ce0ad34 --- /dev/null +++ b/websocket/Recoder.go @@ -0,0 +1,107 @@ +package part + +import ( + "bytes" + "errors" + "fmt" + "strconv" + "time" + + file "github.com/qydysky/part/file" + funcCtrl "github.com/qydysky/part/funcCtrl" + signal "github.com/qydysky/part/signal" +) + +var ( + ErrSerIsNil = errors.New("ErrSerIsNil") + ErrFileNoSet = errors.New("ErrFileNoSet") + ErrhadStart = errors.New("ErrhadStart") +) + +type Recorder struct { + Server *Server + FilePath string + onlyOnce funcCtrl.SkipFunc + stopflag *signal.Signal +} + +func (t *Recorder) Start() error { + if t.Server == nil { + return ErrSerIsNil + } + if t.FilePath == "" { + return ErrFileNoSet + } + if t.onlyOnce.NeedSkip() { + return ErrhadStart + } + + go func() { + f := file.New(t.FilePath, 0, false) + defer f.Close() + + var startTimeStamp time.Time + + t.stopflag = signal.Init() + if startTimeStamp.IsZero() { + startTimeStamp = time.Now() + } + t.Server.Interface().Pull_tag(map[string]func(interface{}) bool{ + `send`: func(data interface{}) bool { + if !t.stopflag.Islive() { + return true + } + if tmp, ok := data.(Uinterface); ok { + f.Write([]byte(fmt.Sprintf("%f,%d,%s\n", time.Since(startTimeStamp).Seconds(), tmp.Id, tmp.Data)), true) + } + return false + }, + }) + t.stopflag.Wait() + }() + return nil +} + +func (t *Recorder) Stop() { + if t.stopflag.Islive() { + t.stopflag.Done() + } + t.onlyOnce.UnSet() +} + +func Play(filePath string, perReadSize int, maxReadSize int) (s *Server) { + s = New_server() + + go func() { + f := file.New(filePath, 0, false) + defer f.Close() + + startT := time.Now() + timer := time.NewTicker(time.Second) + + for { + cu := (<-timer.C).Sub(startT).Seconds() + + for { + if data, err := f.ReadUntil('\n', perReadSize, maxReadSize); err != nil { + panic(err) + } else if len(data) != 0 { + datas := bytes.Split(data, []byte(",")) + d, _ := strconv.ParseFloat(string(datas[0]), 64) + s.Interface().Push_tag(`send`, Uinterface{ + Id: 0, //send to all + Data: datas[2], + }) + if d > cu { + break + } + } else { + break + } + } + + } + }() + + return +} diff --git a/websocket/Server_test.go b/websocket/Server_test.go index f865048..749e5ff 100644 --- a/websocket/Server_test.go +++ b/websocket/Server_test.go @@ -1,12 +1,13 @@ package part import ( + "net/http" "strconv" "testing" - "net/http" "time" - "github.com/skratchdot/open-golang/open" + web "github.com/qydysky/part/web" + "github.com/skratchdot/open-golang/open" ) func Test_Server(t *testing.T) { @@ -15,25 +16,33 @@ func Test_Server(t *testing.T) { num := 5 ws_mq := s.Interface() - ws_mq.Pull_tag(map[string]func(interface{})(bool){ - `error`:func(data interface{})(bool){ + + recoder := &Recorder{ + Server: s, + FilePath: "l.csv", + } + recoder.Start() + defer recoder.Stop() + + ws_mq.Pull_tag(map[string]func(interface{}) bool{ + `error`: func(data interface{}) bool { t.Log(data) return false }, - `recv`:func(data interface{})(bool){ - if tmp,ok := data.(Uinterface);ok { - t.Log(tmp.Id, `=>`,string(tmp.Data)) - t.Log(string(tmp.Data), `=>`,tmp.Id) + `recv`: func(data interface{}) bool { + if tmp, ok := data.(Uinterface); ok { + t.Log(tmp.Id, `=>`, string(tmp.Data)) + t.Log(string(tmp.Data), `=>`, tmp.Id) num -= 1 if num > 0 { - ws_mq.Push_tag(`send`,Uinterface{//just reply - Id:tmp.Id, - Data:append(tmp.Data,[]byte(` get.server:close after `+strconv.Itoa(num)+` s`)...), + ws_mq.Push_tag(`send`, Uinterface{ //just reply + Id: tmp.Id, + Data: append(tmp.Data, []byte(` get.server:close after `+strconv.Itoa(num)+` s`)...), }) } else { - ws_mq.Push_tag(`close`,Uinterface{//close - Id:tmp.Id, - Data:[]byte(`closeNormal`), + ws_mq.Push_tag(`close`, Uinterface{ //close + Id: tmp.Id, + Data: []byte(`closeNormal`), }) } } @@ -43,15 +52,32 @@ func Test_Server(t *testing.T) { } w := web.Easy_boot() - open.Run("http://"+w.Server.Addr) - w.Handle(map[string]func(http.ResponseWriter,*http.Request){ - `/ws`:func(w http.ResponseWriter,r *http.Request){ - conn := s.WS(w,r) - id :=<- conn - t.Log(`user connect!`,id) - <- conn - t.Log(`user disconnect!`,id) + open.Run("http://" + w.Server.Addr) + w.Handle(map[string]func(http.ResponseWriter, *http.Request){ + `/ws`: func(w http.ResponseWriter, r *http.Request) { + conn := s.WS(w, r) + id := <-conn + t.Log(`user connect!`, id) + <-conn + t.Log(`user disconnect!`, id) + }, + }) + time.Sleep(time.Second * time.Duration(10)) +} + +func Test_Recoder(t *testing.T) { + s := Play("l.csv", 50, 5000) + + w := web.Easy_boot() + open.Run("http://" + w.Server.Addr) + w.Handle(map[string]func(http.ResponseWriter, *http.Request){ + `/ws`: func(w http.ResponseWriter, r *http.Request) { + conn := s.WS(w, r) + id := <-conn + t.Log(`user connect!`, id) + <-conn + t.Log(`user disconnect!`, id) }, }) - time.Sleep(time.Second*time.Duration(100)) -} \ No newline at end of file + time.Sleep(time.Second * time.Duration(10)) +} diff --git a/websocket/l.csv b/websocket/l.csv new file mode 100644 index 0000000..080dc2b --- /dev/null +++ b/websocket/l.csv @@ -0,0 +1,3 @@ +3.325727,824637721992,send get.server:close after 4 s +6.333226,824637721992,send get.server:close after 3 s +9.335760,824637721992,send get.server:close after 2 s