package file

import (
	"fmt"
	"io"
	"os"
	"sync"
	"time"

	"linkfog.com/public/lib/l"
)

// rotatePollInterval is how long the reader goroutine waits between draining
// the file and checking whether the path has been rotated or truncated.
const rotatePollInterval = 5 * time.Second

// FileStreamer follows a file on disk and delivers its lines on Lines.
// The file at path is reopened when it is replaced by a different file
// (rotation) or shrinks below the current read offset (truncation).
//
// A FileStreamer must be created with NewStreamer and must not be copied.
type FileStreamer struct {
	// Lines receives every line read from the file. It is closed by the
	// reader goroutine once the streamer has shut down.
	Lines chan string

	path  string      // path that is re-examined on every poll
	fd    *os.File    // currently open file; nil after a failed reopen
	fInfo os.FileInfo // identity of the file behind fd, used for rotation detection

	done      chan struct{} // closed by Close to stop the reader goroutine
	closeOnce sync.Once     // makes Close idempotent
}

// NewStreamer opens path and starts a goroutine that publishes the file's
// lines on the returned streamer's Lines channel. When seekToEnd is true the
// read position is moved to the current end of the file before streaming
// starts. The path is reopened if the file is rotated.
//
// The caller should call Close when the streamer is no longer needed.
func NewStreamer(path string, seekToEnd bool) (*FileStreamer, error) {
	fInfo, err := os.Stat(path)
	if err != nil {
		return nil, fmt.Errorf("stat file err:%w", err)
	}

	fd, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("open file err:%w", err)
	}

	if seekToEnd {
		offset, err := fd.Seek(0, io.SeekEnd)
		if err != nil {
			if closeErr := fd.Close(); closeErr != nil {
				l.Warn("file close err", closeErr)
			}
			return nil, fmt.Errorf("seek to file end err:%w", err)
		}
		l.Debug("seek offset:", offset)
	}

	f := &FileStreamer{
		path:  path,
		fd:    fd,
		fInfo: statOpenFile(fd, fInfo),
		Lines: make(chan string, 1),
		done:  make(chan struct{}),
	}
	go f.readLines()

	return f, nil
}

// statOpenFile returns the FileInfo of the already opened fd so that the
// recorded identity always belongs to the file actually being read, even if
// the path was rotated between the path-based stat and the open. If the
// descriptor cannot be stat'ed, fallback is returned instead.
func statOpenFile(fd *os.File, fallback os.FileInfo) os.FileInfo {
	info, err := fd.Stat()
	if err != nil {
		l.Warn("stat file err:", err)
		return fallback
	}
	return info
}

// isClosed reports whether Close has been called.
func (f *FileStreamer) isClosed() bool {
	select {
	case <-f.done:
		return true
	default:
		return false
	}
}

// pause waits for d or until the streamer is closed, whichever comes first.
// It returns false when the streamer was closed during the wait.
func (f *FileStreamer) pause(d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-f.done:
		return false
	case <-timer.C:
		return true
	}
}

// emit hands one line to the consumer. It returns an error instead of
// blocking forever when the streamer is closed while the consumer is not
// receiving.
func (f *FileStreamer) emit(line string) error {
	// Check done first so that a closed streamer never emits another line,
	// even when the channel still has buffer space.
	if f.isClosed() {
		return fmt.Errorf("stream closed, break readline")
	}

	select {
	case f.Lines <- line:
		return nil
	case <-f.done:
		return fmt.Errorf("stream closed, break readline")
	}
}

// readLines is the reader goroutine started by NewStreamer. It repeatedly
// drains the open file through ReadLine, waits rotatePollInterval, and then
// checks whether the path needs to be reopened. It exits once Close has been
// called, closing the file and the Lines channel on the way out.
func (f *FileStreamer) readLines() {
	defer func() {
		// A panic in the read path must not take the process down; recover,
		// log it, and still release the descriptor and signal consumers.
		if r := recover(); r != nil {
			l.Error("readLines recover:", r)
		}
		if f.fd != nil {
			if err := f.fd.Close(); err != nil {
				l.Warn("file close err", err)
			}
		}
		// Only this goroutine sends on Lines, so closing it here is safe.
		close(f.Lines)
	}()

	for !f.isClosed() {
		if f.fd != nil {
			// NOTE(review): ReadLine and FadviseSwitch are defined elsewhere
			// in the package; ReadLine is assumed to return once it reaches
			// the current end of the file or the callback fails.
			if err := ReadLine(f.fd, f.emit); err != nil {
				l.Error("read line err:", err)
			}
			FadviseSwitch(f.fd)
		}

		if !f.pause(rotatePollInterval) {
			break
		}
		f.reopenIfRotated()
	}
}

// reopenIfRotated reopens path when it no longer refers to the file being
// read, when the file has shrunk below the current read offset, or when a
// previous reopen attempt failed and no file is currently open.
func (f *FileStreamer) reopenIfRotated() {
	fInfo, err := os.Stat(f.path)
	if err != nil {
		l.Warn("stat file err:", err)
		return
	}

	if f.fd != nil {
		// On a seek failure offset is 0, so only a change of file identity
		// triggers a reopen in that case.
		offset, err := f.fd.Seek(0, io.SeekCurrent)
		if err != nil {
			l.Warn("seek current err:", err)
		}
		if os.SameFile(fInfo, f.fInfo) && offset <= fInfo.Size() {
			return
		}

		if err := f.fd.Close(); err != nil {
			l.Warn("file close err", err)
		}
		// Drop the stale descriptor so a failed open below is retried on the
		// next poll instead of operating on a closed or nil file.
		f.fd = nil
	}

	fd, err := os.Open(f.path)
	if err != nil {
		l.Error("open file err:", err)
		return
	}
	f.fd = fd
	f.fInfo = statOpenFile(fd, fInfo)
}

// Close stops the streamer. It is safe to call from any goroutine and more
// than once. The reader goroutine closes the underlying file and the Lines
// channel shortly after Close returns; lines not yet delivered are dropped.
func (f *FileStreamer) Close() {
	f.closeOnce.Do(func() {
		// done is nil only for a streamer not built by NewStreamer, which
		// has no reader goroutine to stop.
		if f.done != nil {
			close(f.done)
		}
	})
}