]> 127.0.0.1 Git - part/.git/commitdiff
添加文件读写、ws录制
authorqydysky <32743305+qydysky@users.noreply.github.com>
Sat, 15 Oct 2022 18:29:20 +0000 (02:29 +0800)
committerqydysky <32743305+qydysky@users.noreply.github.com>
Sat, 15 Oct 2022 18:29:20 +0000 (02:29 +0800)
file/FileWR.go [new file with mode: 0644]
file/FileWR_test.go [new file with mode: 0644]
websocket/Recoder.go [new file with mode: 0644]
websocket/Server_test.go
websocket/l.csv [new file with mode: 0644]

diff --git a/file/FileWR.go b/file/FileWR.go
new file mode 100644 (file)
index 0000000..61bee9f
--- /dev/null
@@ -0,0 +1,264 @@
+package part
+
+import (
+       "bytes"
+       "errors"
+       "io"
+       "os"
+       "strings"
+       "sync"
+
+       l "github.com/qydysky/part/Limit"
+)
+
+var (
+       ErrFilePathTooLong  = errors.New("ErrFilePathTooLong")
+       ErrNewFileCantSeed  = errors.New("ErrNewFileCantSeed")
+       ErrFailToLock       = errors.New("ErrFailToLock")
+       ErrMaxReadSizeReach = errors.New("ErrMaxReadSizeReach")
+)
+
+type File struct {
+       Config Config
+       file   *os.File
+       sync.RWMutex
+}
+
+type Config struct {
+       FilePath  string //文件路径
+       CurIndex  int64  //初始化光标位置
+       AutoClose bool   //自动关闭句柄
+}
+
+func New(filePath string, curIndex int64, autoClose bool) *File {
+       return &File{
+               Config: Config{
+                       FilePath:  filePath,
+                       CurIndex:  curIndex,
+                       AutoClose: autoClose,
+               },
+       }
+}
+
+func (t *File) CopyTo(to *File, byteInSec int64, tryLock bool) error {
+       t.getRWCloser()
+       if t.Config.AutoClose {
+               defer t.Close()
+       }
+
+       if !t.TryRLock() {
+               return ErrFailToLock
+       }
+       defer t.RUnlock()
+
+       to.getRWCloser()
+       if t.Config.AutoClose {
+               defer to.Close()
+       }
+
+       if tryLock {
+               if !to.TryLock() {
+                       return ErrFailToLock
+               }
+       } else {
+               to.Lock()
+       }
+       defer to.Unlock()
+
+       return transfer(t.file, to.file, byteInSec)
+}
+
+func (t *File) Write(data []byte, tryLock bool) (int, error) {
+       t.getRWCloser()
+       if t.Config.AutoClose {
+               defer t.Close()
+       }
+
+       if tryLock {
+               if !t.TryLock() {
+                       return 0, ErrFailToLock
+               }
+       } else {
+               t.Lock()
+       }
+       defer t.Unlock()
+
+       return t.file.Write(data)
+}
+
+func (t *File) Read(data []byte) (int, error) {
+       t.getRWCloser()
+       if t.Config.AutoClose {
+               defer t.Close()
+       }
+
+       if !t.TryRLock() {
+               return 0, ErrFailToLock
+       }
+       defer t.RUnlock()
+
+       return t.file.Read(data)
+}
+
+func (t *File) ReadUntil(separation byte, perReadSize int, maxReadSize int) (data []byte, e error) {
+       t.getRWCloser()
+       if t.Config.AutoClose {
+               defer t.Close()
+       }
+
+       if !t.TryRLock() {
+               return nil, ErrFailToLock
+       }
+       defer t.RUnlock()
+
+       var (
+               tmpArea = make([]byte, perReadSize)
+               n       int
+       )
+
+       for maxReadSize > 0 {
+               n, e = t.file.Read(tmpArea)
+
+               if e != nil {
+                       if errors.Is(e, io.EOF) {
+                               e = nil
+                       }
+                       return
+               }
+               maxReadSize = maxReadSize - n
+
+               if i := bytes.Index(tmpArea[:n], []byte{separation}); i != -1 {
+                       if n-i-1 != 0 {
+                               t.file.Seek(-int64(n-i-1), 1)
+                       }
+                       if i != 0 {
+                               data = append(data, tmpArea[:i]...)
+                       }
+                       break
+               } else {
+                       data = append(data, tmpArea[:n]...)
+               }
+       }
+
+       if maxReadSize <= 0 {
+               e = ErrMaxReadSizeReach
+       }
+
+       return
+}
+
+func (t *File) Seed(index int64) (e error) {
+       t.getRWCloser()
+       if t.Config.AutoClose {
+               defer t.Close()
+       }
+
+       if !t.TryLock() {
+               return ErrFailToLock
+       }
+       defer t.Unlock()
+
+       whenc := 0
+       if index < 0 {
+               whenc = 2
+       }
+       t.Config.CurIndex, e = t.file.Seek(index, whenc)
+
+       return nil
+}
+
+func (t *File) Delete() error {
+       if !t.TryLock() {
+               return ErrFailToLock
+       }
+       defer t.Unlock()
+
+       return os.Remove(t.Config.FilePath)
+}
+
+func (t *File) Close() error {
+       if t.file != nil {
+               return t.file.Close()
+       }
+       return nil
+}
+
+func (t *File) getRWCloser() {
+       if t.Config.AutoClose || t.file == nil {
+               if !t.isExist() {
+                       if f, e := os.Create(t.Config.FilePath); e != nil {
+                               panic(e)
+                       } else {
+                               if t.Config.CurIndex != 0 {
+                                       whenc := 0
+                                       if t.Config.CurIndex < 0 {
+                                               whenc = 2
+                                       }
+                                       t.Config.CurIndex, e = f.Seek(t.Config.CurIndex, whenc)
+                                       if e != nil {
+                                               panic(e)
+                                       }
+                               }
+                               t.file = f
+                       }
+               } else {
+                       if f, e := os.OpenFile(t.Config.FilePath, os.O_RDWR|os.O_EXCL, 0644); e != nil {
+                               panic(e)
+                       } else {
+                               if t.Config.CurIndex != 0 {
+                                       whenc := 0
+                                       if t.Config.CurIndex < 0 {
+                                               whenc = 2
+                                       }
+                                       t.Config.CurIndex, e = f.Seek(t.Config.CurIndex, whenc)
+                                       if e != nil {
+                                               panic(e)
+                                       }
+                               }
+                               t.file = f
+                       }
+               }
+       }
+}
+
+func transfer(r io.ReadCloser, w io.WriteCloser, byteInSec int64) (e error) {
+       if byteInSec > 0 {
+               limit := l.New(1, 1000, -1)
+               defer limit.Close()
+
+               buf := make([]byte, byteInSec)
+               for {
+                       n, err := r.Read(buf)
+                       if n != 0 {
+                               w.Write(buf[:n])
+                       } else if err != nil {
+                               e = err
+                               break
+                       }
+                       limit.TO()
+               }
+       } else {
+               _, e = io.Copy(w, r)
+       }
+
+       return nil
+}
+
+func (t *File) isExist() bool {
+       if len(t.Config.FilePath) > 4096 {
+               panic(ErrFilePathTooLong)
+       }
+
+       _, err := os.Stat(t.Config.FilePath)
+       if err != nil {
+               if errors.Is(err, os.ErrNotExist) {
+                       return false
+               } else {
+                       if !strings.Contains(err.Error(), "file name too long") {
+                               panic(ErrFilePathTooLong)
+                       }
+                       return false
+               }
+       }
+       return true
+}
diff --git a/file/FileWR_test.go b/file/FileWR_test.go
new file mode 100644 (file)
index 0000000..34b3a50
--- /dev/null
@@ -0,0 +1,156 @@
+package part
+
+import (
+       "bytes"
+       "testing"
+)
+
+func TestWriteReadDelSync(t *testing.T) {
+       f := New("rwd.txt", 0, true)
+       if i, e := f.Write([]byte("sss"), true); i == 0 || e != nil {
+               t.Fatal(e)
+       }
+
+       var buf = make([]byte, 3)
+       if i, e := f.Read(buf); i == 0 || e != nil {
+               t.Fatal(i, e)
+       } else {
+               for _, v := range buf {
+                       if v != 's' {
+                               t.Fatal(v)
+                       }
+               }
+       }
+
+       if i, e := f.Read(buf); i == 0 || e != nil {
+               t.Fatal(i, e)
+       } else {
+               for _, v := range buf {
+                       if v != 's' {
+                               t.Fatal(v)
+                       }
+               }
+       }
+
+       if e := f.Delete(); e != nil {
+               t.Fatal(e)
+       }
+}
+
+func TestWriteReadDel(t *testing.T) {
+       f := New("rwd.txt", 0, false)
+       if i, e := f.Write([]byte("sss"), true); i == 0 || e != nil {
+               t.Fatal(e)
+       }
+
+       if e := f.Seed(0); e != nil {
+               t.Fatal(e)
+       }
+
+       var buf = make([]byte, 3)
+       if i, e := f.Read(buf); i == 0 || e != nil {
+               t.Fatal(i, e)
+       } else {
+               for _, v := range buf {
+                       if v != 's' {
+                               t.Fatal(v)
+                       }
+               }
+       }
+
+       if e := f.Close(); e != nil {
+               t.Fatal(e)
+       }
+
+       if e := f.Delete(); e != nil {
+               t.Fatal(e)
+       }
+}
+
+func TestSeed(t *testing.T) {
+       f := New("rwd.txt", 0, false)
+       if i, e := f.Write([]byte("12er4x3"), true); i == 0 || e != nil {
+               t.Fatal(e)
+       }
+
+       if e := f.Seed(1); e != nil {
+               t.Fatal(e)
+       }
+
+       var buf = make([]byte, 1)
+       if i, e := f.Read(buf); i == 0 || e != nil {
+               t.Fatal(i, e)
+       } else {
+               if buf[0] != '2' {
+                       t.Fatal(buf[0])
+               }
+       }
+
+       if e := f.Seed(-1); e != nil {
+               t.Fatal(e)
+       }
+
+       if i, e := f.Read(buf); i == 0 || e != nil {
+               t.Fatal(i, e)
+       } else {
+               if buf[0] != '3' {
+                       t.Fatal(buf[0])
+               }
+       }
+
+       if e := f.Close(); e != nil {
+               t.Fatal(e)
+       }
+
+       if e := f.Delete(); e != nil {
+               t.Fatal(e)
+       }
+}
+
+func TestCopy(t *testing.T) {
+       sf := New("s.txt", 0, true)
+       if i, e := sf.Write([]byte("12er4x3"), true); i == 0 || e != nil {
+               t.Fatal(e)
+       }
+
+       tf := New("t.txt", 0, true)
+       if e := sf.CopyTo(tf, 1, true); e != nil {
+               t.Fatal(e)
+       }
+
+       sf.Delete()
+       tf.Delete()
+}
+
+func TestReadUntil(t *testing.T) {
+       f := New("s.txt", 0, false)
+       if i, e := f.Write([]byte("18u3y7\ns99s9\n"), true); i == 0 || e != nil {
+               t.Fatal(e)
+       }
+
+       if e := f.Seed(0); e != nil {
+               t.Fatal(e)
+       }
+
+       if data, e := f.ReadUntil('\n', 5, 20); e != nil {
+               t.Fatal(e)
+       } else if !bytes.Equal(data, []byte("18u3y7")) {
+               t.Fatal(string(data))
+       }
+
+       t.Log(f.Config.CurIndex)
+
+       if data, e := f.ReadUntil('\n', 5, 20); e != nil {
+               t.Fatal(e)
+       } else if !bytes.Equal(data, []byte("s99s9")) {
+               t.Fatal(string(data))
+       }
+
+       if e := f.Close(); e != nil {
+               t.Fatal(e)
+       }
+
+       if e := f.Delete(); e != nil {
+               t.Fatal(e)
+       }
+}
diff --git a/websocket/Recoder.go b/websocket/Recoder.go
new file mode 100644 (file)
index 0000000..ce0ad34
--- /dev/null
@@ -0,0 +1,107 @@
+package part
+
+import (
+       "bytes"
+       "errors"
+       "fmt"
+       "strconv"
+       "time"
+
+       file "github.com/qydysky/part/file"
+       funcCtrl "github.com/qydysky/part/funcCtrl"
+       signal "github.com/qydysky/part/signal"
+)
+
+var (
+       ErrSerIsNil  = errors.New("ErrSerIsNil")
+       ErrFileNoSet = errors.New("ErrFileNoSet")
+       ErrhadStart  = errors.New("ErrhadStart")
+)
+
+type Recorder struct {
+       Server   *Server
+       FilePath string
+       onlyOnce funcCtrl.SkipFunc
+       stopflag *signal.Signal
+}
+
+func (t *Recorder) Start() error {
+       if t.Server == nil {
+               return ErrSerIsNil
+       }
+       if t.FilePath == "" {
+               return ErrFileNoSet
+       }
+       if t.onlyOnce.NeedSkip() {
+               return ErrhadStart
+       }
+
+       go func() {
+               f := file.New(t.FilePath, 0, false)
+               defer f.Close()
+
+               var startTimeStamp time.Time
+
+               t.stopflag = signal.Init()
+               if startTimeStamp.IsZero() {
+                       startTimeStamp = time.Now()
+               }
+               t.Server.Interface().Pull_tag(map[string]func(interface{}) bool{
+                       `send`: func(data interface{}) bool {
+                               if !t.stopflag.Islive() {
+                                       return true
+                               }
+                               if tmp, ok := data.(Uinterface); ok {
+                                       f.Write([]byte(fmt.Sprintf("%f,%d,%s\n", time.Since(startTimeStamp).Seconds(), tmp.Id, tmp.Data)), true)
+                               }
+                               return false
+                       },
+               })
+               t.stopflag.Wait()
+       }()
+       return nil
+}
+
+func (t *Recorder) Stop() {
+       if t.stopflag.Islive() {
+               t.stopflag.Done()
+       }
+       t.onlyOnce.UnSet()
+}
+
+func Play(filePath string, perReadSize int, maxReadSize int) (s *Server) {
+       s = New_server()
+
+       go func() {
+               f := file.New(filePath, 0, false)
+               defer f.Close()
+
+               startT := time.Now()
+               timer := time.NewTicker(time.Second)
+
+               for {
+                       cu := (<-timer.C).Sub(startT).Seconds()
+
+                       for {
+                               if data, err := f.ReadUntil('\n', perReadSize, maxReadSize); err != nil {
+                                       panic(err)
+                               } else if len(data) != 0 {
+                                       datas := bytes.Split(data, []byte(","))
+                                       d, _ := strconv.ParseFloat(string(datas[0]), 64)
+                                       s.Interface().Push_tag(`send`, Uinterface{
+                                               Id:   0, //send to all
+                                               Data: datas[2],
+                                       })
+                                       if d > cu {
+                                               break
+                                       }
+                               } else {
+                                       break
+                               }
+                       }
+
+               }
+       }()
+
+       return
+}
index f86504898bba5c6f28579ac94716954134fcf3a6..749e5ffe416c859c10ddc12db6c4e8b0705ef24e 100644 (file)
@@ -1,12 +1,13 @@
 package part
 
 import (
+       "net/http"
        "strconv"
        "testing"
-       "net/http"
        "time"
-       "github.com/skratchdot/open-golang/open"
+
        web "github.com/qydysky/part/web"
+       "github.com/skratchdot/open-golang/open"
 )
 
 func Test_Server(t *testing.T) {
@@ -15,25 +16,33 @@ func Test_Server(t *testing.T) {
                num := 5
 
                ws_mq := s.Interface()
-               ws_mq.Pull_tag(map[string]func(interface{})(bool){
-                       `error`:func(data interface{})(bool){
+
+               recoder := &Recorder{
+                       Server:   s,
+                       FilePath: "l.csv",
+               }
+               recoder.Start()
+               defer recoder.Stop()
+
+               ws_mq.Pull_tag(map[string]func(interface{}) bool{
+                       `error`: func(data interface{}) bool {
                                t.Log(data)
                                return false
                        },
-                       `recv`:func(data interface{})(bool){
-                               if tmp,ok := data.(Uinterface);ok {
-                                       t.Log(tmp.Id, `=>`,string(tmp.Data))
-                                       t.Log(string(tmp.Data), `=>`,tmp.Id)
+                       `recv`: func(data interface{}) bool {
+                               if tmp, ok := data.(Uinterface); ok {
+                                       t.Log(tmp.Id, `=>`, string(tmp.Data))
+                                       t.Log(string(tmp.Data), `=>`, tmp.Id)
                                        num -= 1
                                        if num > 0 {
-                                               ws_mq.Push_tag(`send`,Uinterface{//just reply
-                                                       Id:tmp.Id,
-                                                       Data:append(tmp.Data,[]byte(` get.server:close after `+strconv.Itoa(num)+` s`)...),
+                                               ws_mq.Push_tag(`send`, Uinterface{ //just reply
+                                                       Id:   tmp.Id,
+                                                       Data: append(tmp.Data, []byte(` get.server:close after `+strconv.Itoa(num)+` s`)...),
                                                })
                                        } else {
-                                               ws_mq.Push_tag(`close`,Uinterface{//close
-                                                       Id:tmp.Id,
-                                                       Data:[]byte(`closeNormal`),
+                                               ws_mq.Push_tag(`close`, Uinterface{ //close
+                                                       Id:   tmp.Id,
+                                                       Data: []byte(`closeNormal`),
                                                })
                                        }
                                }
@@ -43,15 +52,32 @@ func Test_Server(t *testing.T) {
        }
 
        w := web.Easy_boot()
-       open.Run("http://"+w.Server.Addr)
-       w.Handle(map[string]func(http.ResponseWriter,*http.Request){
-               `/ws`:func(w http.ResponseWriter,r *http.Request){
-                       conn := s.WS(w,r)
-                       id :=<- conn
-                       t.Log(`user connect!`,id)
-                       <- conn
-                       t.Log(`user disconnect!`,id)
+       open.Run("http://" + w.Server.Addr)
+       w.Handle(map[string]func(http.ResponseWriter, *http.Request){
+               `/ws`: func(w http.ResponseWriter, r *http.Request) {
+                       conn := s.WS(w, r)
+                       id := <-conn
+                       t.Log(`user connect!`, id)
+                       <-conn
+                       t.Log(`user disconnect!`, id)
+               },
+       })
+       time.Sleep(time.Second * time.Duration(10))
+}
+
+func Test_Recoder(t *testing.T) {
+       s := Play("l.csv", 50, 5000)
+
+       w := web.Easy_boot()
+       open.Run("http://" + w.Server.Addr)
+       w.Handle(map[string]func(http.ResponseWriter, *http.Request){
+               `/ws`: func(w http.ResponseWriter, r *http.Request) {
+                       conn := s.WS(w, r)
+                       id := <-conn
+                       t.Log(`user connect!`, id)
+                       <-conn
+                       t.Log(`user disconnect!`, id)
                },
        })
-       time.Sleep(time.Second*time.Duration(100))
-}
\ No newline at end of file
+       time.Sleep(time.Second * time.Duration(10))
+}
diff --git a/websocket/l.csv b/websocket/l.csv
new file mode 100644 (file)
index 0000000..080dc2b
--- /dev/null
@@ -0,0 +1,3 @@
+3.325727,824637721992,send get.server:close after 4 s
+6.333226,824637721992,send get.server:close after 3 s
+9.335760,824637721992,send get.server:close after 2 s