--- /dev/null
+package part
+
+import (
+ "bytes"
+ "errors"
+ "io"
+ "os"
+ "strings"
+ "sync"
+
+ l "github.com/qydysky/part/Limit"
+)
+
+var (
+ ErrFilePathTooLong = errors.New("ErrFilePathTooLong")
+ ErrNewFileCantSeed = errors.New("ErrNewFileCantSeed")
+ ErrFailToLock = errors.New("ErrFailToLock")
+ ErrMaxReadSizeReach = errors.New("ErrMaxReadSizeReach")
+)
+
+type File struct {
+ Config Config
+ file *os.File
+ sync.RWMutex
+}
+
+type Config struct {
+ FilePath string //文件路径
+ CurIndex int64 //初始化光标位置
+ AutoClose bool //自动关闭句柄
+}
+
+func New(filePath string, curIndex int64, autoClose bool) *File {
+ return &File{
+ Config: Config{
+ FilePath: filePath,
+ CurIndex: curIndex,
+ AutoClose: autoClose,
+ },
+ }
+}
+
+func (t *File) CopyTo(to *File, byteInSec int64, tryLock bool) error {
+ t.getRWCloser()
+ if t.Config.AutoClose {
+ defer t.Close()
+ }
+
+ if !t.TryRLock() {
+ return ErrFailToLock
+ }
+ defer t.RUnlock()
+
+ to.getRWCloser()
+ if t.Config.AutoClose {
+ defer to.Close()
+ }
+
+ if tryLock {
+ if !to.TryLock() {
+ return ErrFailToLock
+ }
+ } else {
+ to.Lock()
+ }
+ defer to.Unlock()
+
+ return transfer(t.file, to.file, byteInSec)
+}
+
+func (t *File) Write(data []byte, tryLock bool) (int, error) {
+ t.getRWCloser()
+ if t.Config.AutoClose {
+ defer t.Close()
+ }
+
+ if tryLock {
+ if !t.TryLock() {
+ return 0, ErrFailToLock
+ }
+ } else {
+ t.Lock()
+ }
+ defer t.Unlock()
+
+ return t.file.Write(data)
+}
+
+func (t *File) Read(data []byte) (int, error) {
+ t.getRWCloser()
+ if t.Config.AutoClose {
+ defer t.Close()
+ }
+
+ if !t.TryRLock() {
+ return 0, ErrFailToLock
+ }
+ defer t.RUnlock()
+
+ return t.file.Read(data)
+}
+
+func (t *File) ReadUntil(separation byte, perReadSize int, maxReadSize int) (data []byte, e error) {
+ t.getRWCloser()
+ if t.Config.AutoClose {
+ defer t.Close()
+ }
+
+ if !t.TryRLock() {
+ return nil, ErrFailToLock
+ }
+ defer t.RUnlock()
+
+ var (
+ tmpArea = make([]byte, perReadSize)
+ n int
+ )
+
+ for maxReadSize > 0 {
+ n, e = t.file.Read(tmpArea)
+
+ if e != nil {
+ if errors.Is(e, io.EOF) {
+ e = nil
+ }
+ return
+ }
+ maxReadSize = maxReadSize - n
+
+ if i := bytes.Index(tmpArea[:n], []byte{separation}); i != -1 {
+ if n-i-1 != 0 {
+ t.file.Seek(-int64(n-i-1), 1)
+ }
+ if i != 0 {
+ data = append(data, tmpArea[:i]...)
+ }
+ break
+ } else {
+ data = append(data, tmpArea[:n]...)
+ }
+ }
+
+ if maxReadSize <= 0 {
+ e = ErrMaxReadSizeReach
+ }
+
+ return
+}
+
+func (t *File) Seed(index int64) (e error) {
+ t.getRWCloser()
+ if t.Config.AutoClose {
+ defer t.Close()
+ }
+
+ if !t.TryLock() {
+ return ErrFailToLock
+ }
+ defer t.Unlock()
+
+ whenc := 0
+ if index < 0 {
+ whenc = 2
+ }
+ t.Config.CurIndex, e = t.file.Seek(index, whenc)
+
+ return nil
+}
+
+func (t *File) Delete() error {
+ if !t.TryLock() {
+ return ErrFailToLock
+ }
+ defer t.Unlock()
+
+ return os.Remove(t.Config.FilePath)
+}
+
+func (t *File) Close() error {
+ if t.file != nil {
+ return t.file.Close()
+ }
+ return nil
+}
+
+func (t *File) getRWCloser() {
+ if t.Config.AutoClose || t.file == nil {
+ if !t.isExist() {
+ if f, e := os.Create(t.Config.FilePath); e != nil {
+ panic(e)
+ } else {
+ if t.Config.CurIndex != 0 {
+ whenc := 0
+ if t.Config.CurIndex < 0 {
+ whenc = 2
+ }
+ t.Config.CurIndex, e = f.Seek(t.Config.CurIndex, whenc)
+ if e != nil {
+ panic(e)
+ }
+ }
+ t.file = f
+ }
+ } else {
+ if f, e := os.OpenFile(t.Config.FilePath, os.O_RDWR|os.O_EXCL, 0644); e != nil {
+ panic(e)
+ } else {
+ if t.Config.CurIndex != 0 {
+ whenc := 0
+ if t.Config.CurIndex < 0 {
+ whenc = 2
+ }
+ t.Config.CurIndex, e = f.Seek(t.Config.CurIndex, whenc)
+ if e != nil {
+ panic(e)
+ }
+ }
+ t.file = f
+ }
+ }
+ }
+}
+
+func transfer(r io.ReadCloser, w io.WriteCloser, byteInSec int64) (e error) {
+ if byteInSec > 0 {
+ limit := l.New(1, 1000, -1)
+ defer limit.Close()
+
+ buf := make([]byte, byteInSec)
+ for {
+ n, err := r.Read(buf)
+ if n != 0 {
+ w.Write(buf[:n])
+ } else if err != nil {
+ e = err
+ break
+ }
+ limit.TO()
+ }
+ } else {
+ _, e = io.Copy(w, r)
+ }
+
+ return nil
+}
+
+func (t *File) isExist() bool {
+ if len(t.Config.FilePath) > 4096 {
+ panic(ErrFilePathTooLong)
+ }
+
+ _, err := os.Stat(t.Config.FilePath)
+ if err != nil {
+ if errors.Is(err, os.ErrNotExist) {
+ return false
+ } else {
+ if !strings.Contains(err.Error(), "file name too long") {
+ panic(ErrFilePathTooLong)
+ }
+ return false
+ }
+ }
+ return true
+}
--- /dev/null
+package part
+
+import (
+ "bytes"
+ "testing"
+)
+
+func TestWriteReadDelSync(t *testing.T) {
+ f := New("rwd.txt", 0, true)
+ if i, e := f.Write([]byte("sss"), true); i == 0 || e != nil {
+ t.Fatal(e)
+ }
+
+ var buf = make([]byte, 3)
+ if i, e := f.Read(buf); i == 0 || e != nil {
+ t.Fatal(i, e)
+ } else {
+ for _, v := range buf {
+ if v != 's' {
+ t.Fatal(v)
+ }
+ }
+ }
+
+ if i, e := f.Read(buf); i == 0 || e != nil {
+ t.Fatal(i, e)
+ } else {
+ for _, v := range buf {
+ if v != 's' {
+ t.Fatal(v)
+ }
+ }
+ }
+
+ if e := f.Delete(); e != nil {
+ t.Fatal(e)
+ }
+}
+
+func TestWriteReadDel(t *testing.T) {
+ f := New("rwd.txt", 0, false)
+ if i, e := f.Write([]byte("sss"), true); i == 0 || e != nil {
+ t.Fatal(e)
+ }
+
+ if e := f.Seed(0); e != nil {
+ t.Fatal(e)
+ }
+
+ var buf = make([]byte, 3)
+ if i, e := f.Read(buf); i == 0 || e != nil {
+ t.Fatal(i, e)
+ } else {
+ for _, v := range buf {
+ if v != 's' {
+ t.Fatal(v)
+ }
+ }
+ }
+
+ if e := f.Close(); e != nil {
+ t.Fatal(e)
+ }
+
+ if e := f.Delete(); e != nil {
+ t.Fatal(e)
+ }
+}
+
+func TestSeed(t *testing.T) {
+ f := New("rwd.txt", 0, false)
+ if i, e := f.Write([]byte("12er4x3"), true); i == 0 || e != nil {
+ t.Fatal(e)
+ }
+
+ if e := f.Seed(1); e != nil {
+ t.Fatal(e)
+ }
+
+ var buf = make([]byte, 1)
+ if i, e := f.Read(buf); i == 0 || e != nil {
+ t.Fatal(i, e)
+ } else {
+ if buf[0] != '2' {
+ t.Fatal(buf[0])
+ }
+ }
+
+ if e := f.Seed(-1); e != nil {
+ t.Fatal(e)
+ }
+
+ if i, e := f.Read(buf); i == 0 || e != nil {
+ t.Fatal(i, e)
+ } else {
+ if buf[0] != '3' {
+ t.Fatal(buf[0])
+ }
+ }
+
+ if e := f.Close(); e != nil {
+ t.Fatal(e)
+ }
+
+ if e := f.Delete(); e != nil {
+ t.Fatal(e)
+ }
+}
+
+func TestCopy(t *testing.T) {
+ sf := New("s.txt", 0, true)
+ if i, e := sf.Write([]byte("12er4x3"), true); i == 0 || e != nil {
+ t.Fatal(e)
+ }
+
+ tf := New("t.txt", 0, true)
+ if e := sf.CopyTo(tf, 1, true); e != nil {
+ t.Fatal(e)
+ }
+
+ sf.Delete()
+ tf.Delete()
+}
+
+func TestReadUntil(t *testing.T) {
+ f := New("s.txt", 0, false)
+ if i, e := f.Write([]byte("18u3y7\ns99s9\n"), true); i == 0 || e != nil {
+ t.Fatal(e)
+ }
+
+ if e := f.Seed(0); e != nil {
+ t.Fatal(e)
+ }
+
+ if data, e := f.ReadUntil('\n', 5, 20); e != nil {
+ t.Fatal(e)
+ } else if !bytes.Equal(data, []byte("18u3y7")) {
+ t.Fatal(string(data))
+ }
+
+ t.Log(f.Config.CurIndex)
+
+ if data, e := f.ReadUntil('\n', 5, 20); e != nil {
+ t.Fatal(e)
+ } else if !bytes.Equal(data, []byte("s99s9")) {
+ t.Fatal(string(data))
+ }
+
+ if e := f.Close(); e != nil {
+ t.Fatal(e)
+ }
+
+ if e := f.Delete(); e != nil {
+ t.Fatal(e)
+ }
+}
--- /dev/null
+package part
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "strconv"
+ "time"
+
+ file "github.com/qydysky/part/file"
+ funcCtrl "github.com/qydysky/part/funcCtrl"
+ signal "github.com/qydysky/part/signal"
+)
+
+var (
+ ErrSerIsNil = errors.New("ErrSerIsNil")
+ ErrFileNoSet = errors.New("ErrFileNoSet")
+ ErrhadStart = errors.New("ErrhadStart")
+)
+
+type Recorder struct {
+ Server *Server
+ FilePath string
+ onlyOnce funcCtrl.SkipFunc
+ stopflag *signal.Signal
+}
+
+func (t *Recorder) Start() error {
+ if t.Server == nil {
+ return ErrSerIsNil
+ }
+ if t.FilePath == "" {
+ return ErrFileNoSet
+ }
+ if t.onlyOnce.NeedSkip() {
+ return ErrhadStart
+ }
+
+ go func() {
+ f := file.New(t.FilePath, 0, false)
+ defer f.Close()
+
+ var startTimeStamp time.Time
+
+ t.stopflag = signal.Init()
+ if startTimeStamp.IsZero() {
+ startTimeStamp = time.Now()
+ }
+ t.Server.Interface().Pull_tag(map[string]func(interface{}) bool{
+ `send`: func(data interface{}) bool {
+ if !t.stopflag.Islive() {
+ return true
+ }
+ if tmp, ok := data.(Uinterface); ok {
+ f.Write([]byte(fmt.Sprintf("%f,%d,%s\n", time.Since(startTimeStamp).Seconds(), tmp.Id, tmp.Data)), true)
+ }
+ return false
+ },
+ })
+ t.stopflag.Wait()
+ }()
+ return nil
+}
+
+func (t *Recorder) Stop() {
+ if t.stopflag.Islive() {
+ t.stopflag.Done()
+ }
+ t.onlyOnce.UnSet()
+}
+
+func Play(filePath string, perReadSize int, maxReadSize int) (s *Server) {
+ s = New_server()
+
+ go func() {
+ f := file.New(filePath, 0, false)
+ defer f.Close()
+
+ startT := time.Now()
+ timer := time.NewTicker(time.Second)
+
+ for {
+ cu := (<-timer.C).Sub(startT).Seconds()
+
+ for {
+ if data, err := f.ReadUntil('\n', perReadSize, maxReadSize); err != nil {
+ panic(err)
+ } else if len(data) != 0 {
+ datas := bytes.Split(data, []byte(","))
+ d, _ := strconv.ParseFloat(string(datas[0]), 64)
+ s.Interface().Push_tag(`send`, Uinterface{
+ Id: 0, //send to all
+ Data: datas[2],
+ })
+ if d > cu {
+ break
+ }
+ } else {
+ break
+ }
+ }
+
+ }
+ }()
+
+ return
+}
package part
import (
+ "net/http"
"strconv"
"testing"
- "net/http"
"time"
- "github.com/skratchdot/open-golang/open"
+
web "github.com/qydysky/part/web"
+ "github.com/skratchdot/open-golang/open"
)
func Test_Server(t *testing.T) {
num := 5
ws_mq := s.Interface()
- ws_mq.Pull_tag(map[string]func(interface{})(bool){
- `error`:func(data interface{})(bool){
+
+ recoder := &Recorder{
+ Server: s,
+ FilePath: "l.csv",
+ }
+ recoder.Start()
+ defer recoder.Stop()
+
+ ws_mq.Pull_tag(map[string]func(interface{}) bool{
+ `error`: func(data interface{}) bool {
t.Log(data)
return false
},
- `recv`:func(data interface{})(bool){
- if tmp,ok := data.(Uinterface);ok {
- t.Log(tmp.Id, `=>`,string(tmp.Data))
- t.Log(string(tmp.Data), `=>`,tmp.Id)
+ `recv`: func(data interface{}) bool {
+ if tmp, ok := data.(Uinterface); ok {
+ t.Log(tmp.Id, `=>`, string(tmp.Data))
+ t.Log(string(tmp.Data), `=>`, tmp.Id)
num -= 1
if num > 0 {
- ws_mq.Push_tag(`send`,Uinterface{//just reply
- Id:tmp.Id,
- Data:append(tmp.Data,[]byte(` get.server:close after `+strconv.Itoa(num)+` s`)...),
+ ws_mq.Push_tag(`send`, Uinterface{ //just reply
+ Id: tmp.Id,
+ Data: append(tmp.Data, []byte(` get.server:close after `+strconv.Itoa(num)+` s`)...),
})
} else {
- ws_mq.Push_tag(`close`,Uinterface{//close
- Id:tmp.Id,
- Data:[]byte(`closeNormal`),
+ ws_mq.Push_tag(`close`, Uinterface{ //close
+ Id: tmp.Id,
+ Data: []byte(`closeNormal`),
})
}
}
}
w := web.Easy_boot()
- open.Run("http://"+w.Server.Addr)
- w.Handle(map[string]func(http.ResponseWriter,*http.Request){
- `/ws`:func(w http.ResponseWriter,r *http.Request){
- conn := s.WS(w,r)
- id :=<- conn
- t.Log(`user connect!`,id)
- <- conn
- t.Log(`user disconnect!`,id)
+ open.Run("http://" + w.Server.Addr)
+ w.Handle(map[string]func(http.ResponseWriter, *http.Request){
+ `/ws`: func(w http.ResponseWriter, r *http.Request) {
+ conn := s.WS(w, r)
+ id := <-conn
+ t.Log(`user connect!`, id)
+ <-conn
+ t.Log(`user disconnect!`, id)
+ },
+ })
+ time.Sleep(time.Second * time.Duration(10))
+}
+
+func Test_Recoder(t *testing.T) {
+ s := Play("l.csv", 50, 5000)
+
+ w := web.Easy_boot()
+ open.Run("http://" + w.Server.Addr)
+ w.Handle(map[string]func(http.ResponseWriter, *http.Request){
+ `/ws`: func(w http.ResponseWriter, r *http.Request) {
+ conn := s.WS(w, r)
+ id := <-conn
+ t.Log(`user connect!`, id)
+ <-conn
+ t.Log(`user disconnect!`, id)
},
})
- time.Sleep(time.Second*time.Duration(100))
-}
\ No newline at end of file
+ time.Sleep(time.Second * time.Duration(10))
+}
--- /dev/null
+3.325727,824637721992,send get.server:close after 4 s
+6.333226,824637721992,send get.server:close after 3 s
+9.335760,824637721992,send get.server:close after 2 s