From 404f4b1030c851b0c439159dcbdda90249a0621c Mon Sep 17 00:00:00 2001 From: Tomohiro Matsuzawa Date: Fri, 3 Apr 2020 19:27:47 +0900 Subject: [PATCH 1/2] updated srt version 1.41 --- .travis.yml | 7 ++-- Dockerfile | 4 +- internal/poll/runtime/netpoll_epoll.go | 51 +++++++++++++++++++++++++- srt/mockserver_test.go | 1 + srt/sockopt.go | 4 +- srtapi/srtapi_cgo.go | 47 +++++++++++++++++++++++- srtapi/types.go | 13 ++++++- 7 files changed, 117 insertions(+), 10 deletions(-) diff --git a/.travis.yml b/.travis.yml index 479ebce..47c120d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,11 +1,12 @@ language: go sudo: false go: - - 1.12.x + - 1.14.x + - 1.13.x - master env: - - SRT_VERSION=v1.3.4 + - SRT_VERSION=v1.4.1 matrix: allow_failures: @@ -37,4 +38,4 @@ script: after_script: - gover - - goveralls -coverprofile=gover.coverprofile -repotoken $COVERALLS_TOKEN \ No newline at end of file + - goveralls -coverprofile=gover.coverprofile -repotoken $COVERALLS_TOKEN diff --git a/Dockerfile b/Dockerfile index 8962e6f..dde9c79 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ #build stage -ARG GO_VERSION=1.12.9 +ARG GO_VERSION=1.14 FROM golang:${GO_VERSION}-alpine AS build-stage -ENV SRT_VERSION v1.3.4 +ENV SRT_VERSION v1.4.1 ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib64 RUN wget -O srt.tar.gz "https://github.com/Haivision/srt/archive/${SRT_VERSION}.tar.gz" \ diff --git a/internal/poll/runtime/netpoll_epoll.go b/internal/poll/runtime/netpoll_epoll.go index f6b81ef..6ca6d3b 100644 --- a/internal/poll/runtime/netpoll_epoll.go +++ b/internal/poll/runtime/netpoll_epoll.go @@ -49,7 +49,7 @@ func netpolldescriptor() int { } func netpollopen(fd int, pd *pollDesc) error { - events := srtapi.EpollIn | srtapi.EpollErr + events := srtapi.EpollIn | srtapi.EpollErr | srtapi.EpollEt pdsLock.Lock() pds[fd] = pd pdsLock.Unlock() @@ -64,7 +64,7 @@ func netpollclose(fd int) error { } func netpoll_wait_for_write(fd int, enable bool) { - events := srtapi.EpollIn | srtapi.EpollErr + events := srtapi.EpollIn | srtapi.EpollErr | srtapi.EpollEt if enable { events |= srtapi.EpollOut } @@ -88,6 +88,11 @@ func run() { for atomic.LoadInt32(&intState) == 0 { rfdslen = len(rfds) wfdslen = len(wfds) + + if _, err := srtapi.EpollSet(epfd, srtapi.EpollEnableEmpty); err != nil { + println("runtime: srt_epoll_set failed with", err.Error()) + panic("runtime: netpoll::run failed") + } n := srtapi.EpollWait(epfd, &rfds[0], &rfdslen, &wfds[0], &wfdslen, 100) if n > 0 { pdsLock.RLock() @@ -107,3 +112,45 @@ func run() { } } } + +// this version may be better but it get deadlock state when tring to connect to closed SRT socket currently +func runUWaitVersion() { + const fdsSize = 128 + var fdsSet [fdsSize]srtapi.SrtEpollEvent + + defer func() { + for s, pd := range pds { + if !pd.closing { + srtapi.Close(s) + } + } + srtapi.Cleanup() + done <- true + }() + + for atomic.LoadInt32(&intState) == 0 { + if _, err := srtapi.EpollSet(epfd, srtapi.EpollEnableEmpty); err != nil { + println("runtime: srt_epoll_set failed with", err.Error()) + panic("runtime: netpoll::run failed") + } + n := srtapi.EpollUwait(epfd, &fdsSet[0], fdsSize, 100) + if n > 0 { + pdsLock.RLock() + for i := 0; i < n; i++ { + fds := fdsSet[i] + fd := int(srtapi.GetFdFromEpollEvent(&fds)) + events := srtapi.GetEventsFromEpollEvent(&fds) + if pd := pds[fd]; pd != nil { + var mode int + if (events & srtapi.EpollOut) == 0 { + mode = 'r' + } else { + mode = 'w' + } + netpollready(pd, mode) + } + } + pdsLock.RUnlock() + } + } +} diff --git a/srt/mockserver_test.go b/srt/mockserver_test.go index 4022e3a..fa69090 100644 --- a/srt/mockserver_test.go +++ b/srt/mockserver_test.go @@ -213,6 +213,7 @@ func transponder(ln net.Listener, ch chan<- error) { ch <- err return } + time.Sleep(time.Second * 1) } func transceiver(c net.Conn, wb []byte, ch chan<- error) { diff --git a/srt/sockopt.go b/srt/sockopt.go index 0c83363..e8ce3b4 100644 --- a/srt/sockopt.go +++ b/srt/sockopt.go @@ -92,7 +92,9 @@ var srtOptions = []socketOption{ {"payloadsize", 0, srtapi.OptionPayloadsize, bindPre, typeInt}, {"kmrefreshrate", 0, srtapi.OptionKmrefreshrate, bindPre, typeInt}, {"kmpreannounce", 0, srtapi.OptionKmpreannounce, bindPre, typeInt}, - {"strictenc", 0, srtapi.OptionStrictenc, bindPre, typeInt}, + {"enforcedencryption", 0, srtapi.OptionEnforcedencryption, bindPre, typeBool}, + {"peeridletimeo", 0, srtapi.OptionPeeridletimeo, bindPre, typeInt}, + {"packetfilter", 0, srtapi.OptionPacketfilter, bindPre, typeString}, } type option struct { diff --git a/srtapi/srtapi_cgo.go b/srtapi/srtapi_cgo.go index 0032e73..0f7ed92 100644 --- a/srtapi/srtapi_cgo.go +++ b/srtapi/srtapi_cgo.go @@ -81,7 +81,7 @@ func EpollUpdateUsock(epfd int, fd int, events int) (err error) { } // EpollWait call srt_epoll_wait -func EpollWait(epfd int, rfds *SrtSocket, rfdslen *int, wfds *SrtSocket, wfdslen *int, timeout int) (n int) { +func EpollWait(epfd int, rfds *SrtSocket, rfdslen *int, wfds *SrtSocket, wfdslen *int, timeout int64) (n int) { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -104,6 +104,47 @@ func EpollWait(epfd int, rfds *SrtSocket, rfdslen *int, wfds *SrtSocket, wfdslen return } +// EpollUwait call srt_epoll_uwait +func EpollUwait(epfd int, fdsSet *SrtEpollEvent, fdsSize int, msTimeOut int64) (n int) { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + n = int(C.srt_epoll_uwait(C.int(epfd), (*C.SRT_EPOLL_EVENT)(fdsSet), C.int(fdsSize), C.int64_t(msTimeOut))) + if n < 0 { + err := getLastError() + switch err { + case ETIMEOUT: + default: + println("runtime: srt_epoll_uwait on fd", epfd, "failed with", err.Error()) + panic("runtime: netpoll failed") + } + ClearLastError() + n = 0 + } + return +} + +// GetFdFromEpollEvent return fd from SrtEpollEvent +func GetFdFromEpollEvent(fds *SrtEpollEvent) SrtSocket { + return SrtSocket(fds.fd) +} + +// GetEventsFromEpollEvent return events from SrtEpollEvent +func GetEventsFromEpollEvent(fds *SrtEpollEvent) int { + return int(fds.events) +} + +// EpollSet call srt_epoll_set +func EpollSet(epfd int, flags int) (oflags int, err error) { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + oflags = int(C.srt_epoll_set(C.int(epfd), C.int(flags))) + if oflags == APIError { + err = getLastError() + } + return +} + func accept(s int, rsa *syscall.RawSockaddrAny, addrlen *_Socklen) (fd int, err error) { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -310,6 +351,7 @@ func GetStats(fd int, clear bool) map[string]interface{} { "packetsLost": mon.pktSndLoss, "packetsDropped": mon.pktSndDrop, "packetsRetransmitted": mon.pktRetrans, + "packetsFilterExtra": mon.pktSndFilterExtra, "bytes": mon.byteSent, "bytesDropped": mon.byteSndDrop, "mbitRate": mon.mbpsSendRate, @@ -320,6 +362,9 @@ func GetStats(fd int, clear bool) map[string]interface{} { "packetsDropped": mon.pktRcvDrop, "packetsRetransmitted": mon.pktRcvRetrans, "packetsBelated": mon.pktRcvBelated, + "packetsFilterExtra": mon.pktRcvFilterExtra, + "packetsFilterSupply": mon.pktRcvFilterSupply, + "packetsFilterLoss": mon.pktRcvFilterLoss, "bytes": mon.byteRecv, "bytesLost": mon.byteRcvLoss, "bytesDropped": mon.byteRcvDrop, diff --git a/srtapi/types.go b/srtapi/types.go index c5c7a7b..1da8c67 100644 --- a/srtapi/types.go +++ b/srtapi/types.go @@ -16,6 +16,9 @@ type _Socklen C.int // SrtSocket represents SRT C API SRTSOCKET type type SrtSocket C.SRTSOCKET +// SrtEpollEvent represent SRT C API SRT_EPOLL_EVENT structure +type SrtEpollEvent C.SRT_EPOLL_EVENT + //lint:ignore U1000 we want to use it to calculate size var rsa syscall.RawSockaddrAny //lint:ignore U1000 we want to use it to calculate size @@ -94,9 +97,10 @@ const ( OptionTranstype = C.SRTO_TRANSTYPE OptionKmrefreshrate = C.SRTO_KMREFRESHRATE OptionKmpreannounce = C.SRTO_KMPREANNOUNCE - OptionStrictenc = C.SRTO_STRICTENC + OptionEnforcedencryption = C.SRTO_ENFORCEDENCRYPTION OptionIpv60only = C.SRTO_IPV6ONLY OptionPeeridletimeo = C.SRTO_PEERIDLETIMEO + OptionPacketfilter = C.SRTO_PACKETFILTER ) // SRT trans type @@ -141,6 +145,7 @@ const ( EpollIn = C.SRT_EPOLL_IN EpollOut = C.SRT_EPOLL_OUT EpollErr = C.SRT_EPOLL_ERR + EpollEt = C.SRT_EPOLL_ET ) // SRT const @@ -150,3 +155,9 @@ const ( DefaultSendfileBlock = 364000 DefaultRecvfileBlock = 7280000 ) + +// SRT_EPOLL_FLAGS +const ( + EpollEnableEmpty = C.SRT_EPOLL_ENABLE_EMPTY + EpollEnableOutputcheck = C.SRT_EPOLL_ENABLE_OUTPUTCHECK +) \ No newline at end of file From ea225f7b11799da28d40d5a25e9367c2910227cf Mon Sep 17 00:00:00 2001 From: Tomohiro Matsuzawa Date: Fri, 3 Apr 2020 19:37:23 +0900 Subject: [PATCH 2/2] removed unused function --- internal/poll/runtime/netpoll_epoll.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/poll/runtime/netpoll_epoll.go b/internal/poll/runtime/netpoll_epoll.go index 6ca6d3b..912fadd 100644 --- a/internal/poll/runtime/netpoll_epoll.go +++ b/internal/poll/runtime/netpoll_epoll.go @@ -114,7 +114,7 @@ func run() { } // this version may be better but it get deadlock state when tring to connect to closed SRT socket currently -func runUWaitVersion() { +/*func runUWaitVersion() { const fdsSize = 128 var fdsSet [fdsSize]srtapi.SrtEpollEvent @@ -153,4 +153,4 @@ func runUWaitVersion() { pdsLock.RUnlock() } } -} +}*/