diff --git a/pkg/media/rtpdump/reader.go b/pkg/media/rtpdump/reader.go new file mode 100644 index 00000000000..e763f8b0cb8 --- /dev/null +++ b/pkg/media/rtpdump/reader.go @@ -0,0 +1,103 @@ +package rtpdump + +import ( + "bufio" + "io" + "regexp" + "sync" +) + +// The file starts with #!rtpplay1.0 address/port\n +var preambleRegexp = regexp.MustCompile(`#\!rtpplay1\.0 \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\/\d{1,5}\n`) + +// Reader reads the RTPDump file format +type Reader struct { + readerMu sync.Mutex + reader io.Reader +} + +// NewReader opens a new Reader and immediately reads the Header from the start +// of the input stream. +func NewReader(r io.Reader) (*Reader, Header, error) { + var hdr Header + + bio := bufio.NewReader(r) + + // Look ahead to see if there's a valid preamble + peek, err := bio.Peek(preambleLen) + if err == io.EOF { + return nil, hdr, errMalformed + } + if err != nil { + return nil, hdr, err + } + if !preambleRegexp.Match(peek) { + return nil, hdr, errMalformed + } + + // consume the preamble + _, _, err = bio.ReadLine() + if err == io.EOF { + return nil, hdr, errMalformed + } + if err != nil { + return nil, hdr, err + } + + hBuf := make([]byte, headerLen) + _, err = io.ReadFull(bio, hBuf) + if err == io.ErrUnexpectedEOF || err == io.EOF { + return nil, hdr, errMalformed + } + if err != nil { + return nil, hdr, err + } + + if err := hdr.Unmarshal(hBuf); err != nil { + return nil, hdr, err + } + + return &Reader{ + reader: bio, + }, hdr, nil +} + +// Next returns the next Packet in the Reader input stream +func (r *Reader) Next() (Packet, error) { + r.readerMu.Lock() + defer r.readerMu.Unlock() + + hBuf := make([]byte, pktHeaderLen) + + _, err := io.ReadFull(r.reader, hBuf) + if err == io.ErrUnexpectedEOF { + return Packet{}, errMalformed + } + if err != nil { + return Packet{}, err + } + + var h packetHeader + if err = h.Unmarshal(hBuf); err != nil { + return Packet{}, err + } + + if h.Length == 0 { + return Packet{}, errMalformed + } + + payload := make([]byte, h.Length-pktHeaderLen) + _, err = io.ReadFull(r.reader, payload) + if err == io.ErrUnexpectedEOF { + return Packet{}, errMalformed + } + if err != nil { + return Packet{}, err + } + + return Packet{ + Offset: h.Offset, + IsRTCP: h.PacketLength == 0, + Payload: payload, + }, nil +} diff --git a/pkg/media/rtpdump/reader_test.go b/pkg/media/rtpdump/reader_test.go new file mode 100644 index 00000000000..19dd9e3ad76 --- /dev/null +++ b/pkg/media/rtpdump/reader_test.go @@ -0,0 +1,282 @@ +package rtpdump + +import ( + "bytes" + "io" + "net" + "reflect" + "testing" + "time" +) + +var validPreamble = []byte("#!rtpplay1.0 224.2.0.1/3456\n") + +func TestReader(t *testing.T) { + for _, test := range []struct { + Name string + Data []byte + WantHeader Header + WantPackets []Packet + WantErr error + }{ + { + Name: "empty", + Data: nil, + WantErr: errMalformed, + }, + { + Name: "hashbang missing ip/port", + Data: append( + []byte("#!rtpplay1.0 \n"), + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + ), + WantErr: errMalformed, + }, + { + Name: "hashbang missing port", + Data: append( + []byte("#!rtpplay1.0 0.0.0.0\n"), + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + ), + WantErr: errMalformed, + }, + { + Name: "valid empty file", + Data: append( + validPreamble, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + 0x01, 0x01, 0x01, 0x01, + 0x22, 0xB8, 0x00, 0x00, + ), + WantHeader: Header{ + Start: time.Unix(1, 0).UTC(), + Source: net.IPv4(1, 1, 1, 1), + Port: 8888, + }, + }, + { + Name: "malformed packet header", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header + 0x00, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantErr: errMalformed, + }, + { + Name: "short packet payload", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header len=1048575 + 0xFF, 0xFF, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet paylaod + 0x00, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantErr: errMalformed, + }, + { + Name: "empty packet payload", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header len=0 + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantErr: errMalformed, + }, + { + Name: "valid rtcp packet", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header len=20, pLen=0, off=1 + 0x00, 0x14, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + // packet payload (BYE) + 0x81, 0xcb, 0x00, 0x0c, + 0x90, 0x2f, 0x9e, 0x2e, + 0x03, 0x46, 0x4f, 0x4f, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantPackets: []Packet{ + { + Offset: 1, + IsRTCP: true, + Payload: []byte{ + 0x81, 0xcb, 0x00, 0x0c, + 0x90, 0x2f, 0x9e, 0x2e, + 0x03, 0x46, 0x4f, 0x4f, + }, + }, + }, + WantErr: nil, + }, + { + Name: "truncated rtcp packet", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header len=9, pLen=0, off=1 + 0x00, 0x09, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + // invalid payload + 0x81, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantPackets: []Packet{ + { + Offset: 1, + IsRTCP: true, + Payload: []byte{0x81}, + }, + }, + }, + { + Name: "two valid packets", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header len=20, pLen=0, off=1 + 0x00, 0x14, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + // packet payload (BYE) + 0x81, 0xcb, 0x00, 0x0c, + 0x90, 0x2f, 0x9e, 0x2e, + 0x03, 0x46, 0x4f, 0x4f, + // packet header len=33, pLen=25, off=2 + 0x00, 0x21, 0x19, 0x00, + 0x00, 0x00, 0x00, 0x02, + // packet payload (RTP) + 0x90, 0x60, 0x69, 0x8f, + 0xd9, 0xc2, 0x93, 0xda, + 0x1c, 0x64, 0x27, 0x82, + 0x00, 0x01, 0x00, 0x01, + 0xFF, 0xFF, 0xFF, 0xFF, + 0x98, 0x36, 0xbe, 0x88, + 0x9e, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantPackets: []Packet{ + { + Offset: 1, + IsRTCP: true, + Payload: []byte{ + 0x81, 0xcb, 0x00, 0x0c, + 0x90, 0x2f, 0x9e, 0x2e, + 0x03, 0x46, 0x4f, 0x4f, + }, + }, + { + Offset: 2, + IsRTCP: false, + Payload: []byte{ + 0x90, 0x60, 0x69, 0x8f, + 0xd9, 0xc2, 0x93, 0xda, + 0x1c, 0x64, 0x27, 0x82, + 0x00, 0x01, 0x00, 0x01, + 0xFF, 0xFF, 0xFF, 0xFF, + 0x98, 0x36, 0xbe, 0x88, + 0x9e, + }, + }, + }, + WantErr: nil, + }, + } { + r, hdr, err := NewReader(bytes.NewReader(test.Data)) + if err != nil { + if got, want := err, test.WantErr; got != want { + t.Fatalf("NewReader(%s) err=%v want %v", test.Name, got, want) + } + continue + } + + if got, want := hdr, test.WantHeader; !reflect.DeepEqual(got, want) { + t.Fatalf("%q Header = %#v, want %#v", test.Name, got, want) + } + + var nextErr error + var packets []Packet + for { + pkt, err := r.Next() + if err == io.EOF { + break + } + if err != nil { + nextErr = err + break + } + + packets = append(packets, pkt) + } + + if got, want := nextErr, test.WantErr; got != want { + t.Fatalf("%s err=%v want %v", test.Name, got, want) + } + if got, want := packets, test.WantPackets; !reflect.DeepEqual(got, want) { + t.Fatalf("%q packets=%#v, want %#v", test.Name, got, want) + } + } +} diff --git a/pkg/media/rtpdump/rtpdump.go b/pkg/media/rtpdump/rtpdump.go new file mode 100644 index 00000000000..2f9a257efe6 --- /dev/null +++ b/pkg/media/rtpdump/rtpdump.go @@ -0,0 +1,159 @@ +// Package rtpdump implements the RTPDump file format documented at +// https://www.cs.columbia.edu/irt/software/rtptools/ +package rtpdump + +import ( + "encoding/binary" + "errors" + "net" + "time" +) + +const ( + pktHeaderLen = 8 + headerLen = 16 + preambleLen = 36 +) + +var errMalformed = errors.New("malformed rtpdump") + +// Header is the binary header at the top of the RTPDump file. It contains +// information about the source and start time of the packet stream included +// in the file. +type Header struct { + // start of recording (GMT) + Start time.Time + // network source (multicast address) + Source net.IP + // UDP port + Port uint16 +} + +// Marshal encodes the Header as binary. +func (h Header) Marshal() ([]byte, error) { + d := make([]byte, headerLen) + + startNano := h.Start.UnixNano() + startSec := uint32(startNano / int64(time.Second)) + startUsec := uint32( + (startNano % int64(time.Second)) / int64(time.Microsecond), + ) + binary.BigEndian.PutUint32(d[0:], startSec) + binary.BigEndian.PutUint32(d[4:], startUsec) + + source := h.Source.To4() + copy(d[8:], source) + + binary.BigEndian.PutUint16(d[12:], h.Port) + + return d, nil +} + +// Unmarshal decodes the Header from binary. +func (h *Header) Unmarshal(d []byte) error { + if len(d) < headerLen { + return errMalformed + } + + // time as a `struct timeval` + startSec := binary.BigEndian.Uint32(d[0:]) + startUsec := binary.BigEndian.Uint32(d[4:]) + h.Start = time.Unix(int64(startSec), int64(startUsec)*1e3).UTC() + + // ipv4 address + h.Source = net.IPv4(d[8], d[9], d[10], d[11]) + + h.Port = binary.BigEndian.Uint16(d[12:]) + + // 2 bytes of padding (ignored) + + return nil +} + +// Packet contains an RTP or RTCP packet along a time offset when it was logged +// (relative to the Start of the recording in Header). The Payload may contain +// truncated packets to support logging just the headers of RTP/RTCP packets. +type Packet struct { + // Offset is the time since the start of recording in millseconds + Offset uint32 + // IsRTCP is true if the payload is RTCP, false if the payload is RTP + IsRTCP bool + // Payload is the binary RTP or or RTCP payload. The contents may not parse + // as a valid packet if the contents have been truncated. + Payload []byte +} + +// Marshal encodes the Packet as binary. +func (p Packet) Marshal() ([]byte, error) { + packetLength := len(p.Payload) + 8 + if p.IsRTCP { + packetLength = 0 + } + + hdr := packetHeader{ + Length: uint16(len(p.Payload)) + 8, + PacketLength: uint16(packetLength), + Offset: p.Offset, + } + hdrData, err := hdr.Marshal() + if err != nil { + return nil, err + } + + data := append(hdrData, p.Payload...) + + return data, nil +} + +// Unmarshal decodes the Packet from binary. +func (p *Packet) Unmarshal(d []byte) error { + var hdr packetHeader + if err := hdr.Unmarshal(d); err != nil { + return err + } + + p.Offset = hdr.Offset + p.IsRTCP = hdr.Length != 0 && hdr.PacketLength == 0 + + if hdr.Length < 8 { + return errMalformed + } + if len(d) < int(hdr.Length) { + return errMalformed + } + p.Payload = d[8:hdr.Length] + + return nil +} + +type packetHeader struct { + // length of packet, including this header (may be smaller than + // plen if not whole packet recorded) + Length uint16 + // Actual header+payload length for RTP, 0 for RTCP + PacketLength uint16 + // milliseconds since the start of recording + Offset uint32 +} + +func (p packetHeader) Marshal() ([]byte, error) { + d := make([]byte, pktHeaderLen) + + binary.BigEndian.PutUint16(d[0:], p.Length) + binary.BigEndian.PutUint16(d[2:], p.PacketLength) + binary.BigEndian.PutUint32(d[4:], p.Offset) + + return d, nil +} + +func (p *packetHeader) Unmarshal(d []byte) error { + if len(d) < pktHeaderLen { + return errMalformed + } + + p.Length = binary.BigEndian.Uint16(d[0:]) + p.PacketLength = binary.BigEndian.Uint16(d[2:]) + p.Offset = binary.BigEndian.Uint32(d[4:]) + + return nil +} diff --git a/pkg/media/rtpdump/rtpdump_test.go b/pkg/media/rtpdump/rtpdump_test.go new file mode 100644 index 00000000000..b89fe066c3a --- /dev/null +++ b/pkg/media/rtpdump/rtpdump_test.go @@ -0,0 +1,118 @@ +package rtpdump + +import ( + "net" + "reflect" + "testing" + "time" +) + +func TestHeaderRoundTrip(t *testing.T) { + for _, test := range []struct { + Header Header + }{ + { + Header: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + }, + { + Header: Header{ + Start: time.Date(2019, 3, 25, 1, 1, 1, 0, time.UTC), + Source: net.IPv4(1, 2, 3, 4), + Port: 8080, + }, + }, + } { + d, err := test.Header.Marshal() + if err != nil { + t.Fatal(err) + } + + var hdr Header + if err := hdr.Unmarshal(d); err != nil { + t.Fatal(err) + } + + if got, want := hdr, test.Header; !reflect.DeepEqual(got, want) { + t.Fatalf("Unmarshal(%v.Marshal()) = %v, want identical", got, want) + } + } +} + +func TestMarshalHeader(t *testing.T) { + for _, test := range []struct { + Name string + Header Header + Want []byte + WantErr error + }{ + { + Name: "nil source", + Header: Header{ + Start: time.Unix(0, 0).UTC(), + Source: nil, + Port: 0, + }, + Want: []byte{ + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + }, + }, + } { + data, err := test.Header.Marshal() + if got, want := err, test.WantErr; got != want { + t.Fatalf("Marshal(%q) err=%v, want %v", test.Name, got, want) + } + + if got, want := data, test.Want; !reflect.DeepEqual(got, want) { + t.Fatalf("Marshal(%q) = %v, want %v", test.Name, got, want) + } + } +} + +func TestPacketRoundTrip(t *testing.T) { + for _, test := range []struct { + Packet Packet + }{ + { + Packet: Packet{ + Offset: 0, + IsRTCP: false, + Payload: []byte{0}, + }, + }, + { + Packet: Packet{ + Offset: 0, + IsRTCP: true, + Payload: []byte{0}, + }, + }, + { + Packet: Packet{ + Offset: 123, + IsRTCP: false, + Payload: []byte{1, 2, 3, 4}, + }, + }, + } { + d, err := test.Packet.Marshal() + if err != nil { + t.Fatal(err) + } + + var pkt Packet + if err := pkt.Unmarshal(d); err != nil { + t.Fatal(err) + } + + if got, want := pkt, test.Packet; !reflect.DeepEqual(got, want) { + t.Fatalf("Unmarshal(%v.Marshal()) = %v, want identical", got, want) + } + } +} diff --git a/pkg/media/rtpdump/writer.go b/pkg/media/rtpdump/writer.go new file mode 100644 index 00000000000..1c4239d6c83 --- /dev/null +++ b/pkg/media/rtpdump/writer.go @@ -0,0 +1,51 @@ +package rtpdump + +import ( + "fmt" + "io" + "sync" +) + +// Writer writes the RTPDump file format +type Writer struct { + writerMu sync.Mutex + writer io.Writer +} + +// NewWriter makes a new Writer and immediately writes the given Header +// to begin the file. +func NewWriter(w io.Writer, hdr Header) (*Writer, error) { + preamble := fmt.Sprintf( + "#!rtpplay1.0 %s/%d\n", + hdr.Source.To4().String(), + hdr.Port) + if _, err := w.Write([]byte(preamble)); err != nil { + return nil, err + } + + hData, err := hdr.Marshal() + if err != nil { + return nil, err + } + if _, err := w.Write(hData); err != nil { + return nil, err + } + + return &Writer{writer: w}, nil +} + +// WritePacket writes a Packet to the output +func (w *Writer) WritePacket(p Packet) error { + w.writerMu.Lock() + defer w.writerMu.Unlock() + + data, err := p.Marshal() + if err != nil { + return err + } + if _, err := w.writer.Write(data); err != nil { + return err + } + + return nil +} diff --git a/pkg/media/rtpdump/writer_test.go b/pkg/media/rtpdump/writer_test.go new file mode 100644 index 00000000000..3d7cfa5b210 --- /dev/null +++ b/pkg/media/rtpdump/writer_test.go @@ -0,0 +1,107 @@ +package rtpdump + +import ( + "bytes" + "io" + "net" + "reflect" + "testing" + "time" +) + +func TestWriter(t *testing.T) { + buf := bytes.NewBuffer(nil) + + writer, err := NewWriter(buf, Header{ + Start: time.Unix(9, 0), + Source: net.IPv4(2, 2, 2, 2), + Port: 2222, + }) + if err != nil { + t.Fatal(err) + } + + if err := writer.WritePacket(Packet{ + Offset: 1, + IsRTCP: false, + Payload: []byte{9}, + }); err != nil { + t.Fatal(err) + } + + expected := append( + []byte("#!rtpplay1.0 2.2.2.2/2222\n"), + // header + 0x00, 0x00, 0x00, 0x09, + 0x00, 0x00, 0x00, 0x00, + 0x02, 0x02, 0x02, 0x02, + 0x08, 0xae, 0x00, 0x00, + // packet header + 0x00, 0x09, 0x00, 0x09, + 0x00, 0x00, 0x00, 0x01, + 0x09, + ) + + if got, want := buf.Bytes(), expected; !reflect.DeepEqual(got, want) { + t.Fatalf("wrote %v, want %v", string(got), want) + } +} + +func TestRoundTrip(t *testing.T) { + buf := bytes.NewBuffer(nil) + + packets := []Packet{ + { + Offset: 1, + IsRTCP: false, + Payload: []byte{9}, + }, + { + Offset: 999, + IsRTCP: true, + Payload: []byte{9}, + }, + } + hdr := Header{ + Start: time.Unix(9, 0).UTC(), + Source: net.IPv4(2, 2, 2, 2), + Port: 2222, + } + + writer, err := NewWriter(buf, hdr) + if err != nil { + t.Fatal(err) + } + + for _, pkt := range packets { + if err = writer.WritePacket(pkt); err != nil { + t.Fatal(err) + } + } + + reader, hdr2, err := NewReader(buf) + if err != nil { + t.Fatal(err) + } + + if got, want := hdr2, hdr; !reflect.DeepEqual(got, want) { + t.Fatalf("round trip: header=%v, want %v", got, want) + } + + var packets2 []Packet + for { + pkt, err := reader.Next() + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + + packets2 = append(packets2, pkt) + } + + if got, want := packets2, packets; !reflect.DeepEqual(got, want) { + t.Fatalf("round trip: packets=%v, want %v", got, want) + } +}