From d6da587e569babffe7246a351960353d77b4dff6 Mon Sep 17 00:00:00 2001 From: Max Hawkins Date: Sat, 16 Mar 2019 17:23:24 -0700 Subject: [PATCH] Implement RTPDump Reader/Writer RTPDump is a widely-implemented file format for saving RTP packet dumps without the overhead of UDP and IP headers found in pcap dumps. libWebRTC, Wireshark, and RTPTools all have an implementation. For more information see: https://www.cs.columbia.edu/irt/software/rtptools Relates to #549 --- pkg/media/rtpdump/reader.go | 103 +++++++++++ pkg/media/rtpdump/reader_test.go | 282 ++++++++++++++++++++++++++++++ pkg/media/rtpdump/rtpdump.go | 159 +++++++++++++++++ pkg/media/rtpdump/rtpdump_test.go | 118 +++++++++++++ pkg/media/rtpdump/writer.go | 51 ++++++ pkg/media/rtpdump/writer_test.go | 107 ++++++++++++ 6 files changed, 820 insertions(+) create mode 100644 pkg/media/rtpdump/reader.go create mode 100644 pkg/media/rtpdump/reader_test.go create mode 100644 pkg/media/rtpdump/rtpdump.go create mode 100644 pkg/media/rtpdump/rtpdump_test.go create mode 100644 pkg/media/rtpdump/writer.go create mode 100644 pkg/media/rtpdump/writer_test.go diff --git a/pkg/media/rtpdump/reader.go b/pkg/media/rtpdump/reader.go new file mode 100644 index 00000000000..e763f8b0cb8 --- /dev/null +++ b/pkg/media/rtpdump/reader.go @@ -0,0 +1,103 @@ +package rtpdump + +import ( + "bufio" + "io" + "regexp" + "sync" +) + +// The file starts with #!rtpplay1.0 address/port\n +var preambleRegexp = regexp.MustCompile(`#\!rtpplay1\.0 \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\/\d{1,5}\n`) + +// Reader reads the RTPDump file format +type Reader struct { + readerMu sync.Mutex + reader io.Reader +} + +// NewReader opens a new Reader and immediately reads the Header from the start +// of the input stream. +func NewReader(r io.Reader) (*Reader, Header, error) { + var hdr Header + + bio := bufio.NewReader(r) + + // Look ahead to see if there's a valid preamble + peek, err := bio.Peek(preambleLen) + if err == io.EOF { + return nil, hdr, errMalformed + } + if err != nil { + return nil, hdr, err + } + if !preambleRegexp.Match(peek) { + return nil, hdr, errMalformed + } + + // consume the preamble + _, _, err = bio.ReadLine() + if err == io.EOF { + return nil, hdr, errMalformed + } + if err != nil { + return nil, hdr, err + } + + hBuf := make([]byte, headerLen) + _, err = io.ReadFull(bio, hBuf) + if err == io.ErrUnexpectedEOF || err == io.EOF { + return nil, hdr, errMalformed + } + if err != nil { + return nil, hdr, err + } + + if err := hdr.Unmarshal(hBuf); err != nil { + return nil, hdr, err + } + + return &Reader{ + reader: bio, + }, hdr, nil +} + +// Next returns the next Packet in the Reader input stream +func (r *Reader) Next() (Packet, error) { + r.readerMu.Lock() + defer r.readerMu.Unlock() + + hBuf := make([]byte, pktHeaderLen) + + _, err := io.ReadFull(r.reader, hBuf) + if err == io.ErrUnexpectedEOF { + return Packet{}, errMalformed + } + if err != nil { + return Packet{}, err + } + + var h packetHeader + if err = h.Unmarshal(hBuf); err != nil { + return Packet{}, err + } + + if h.Length == 0 { + return Packet{}, errMalformed + } + + payload := make([]byte, h.Length-pktHeaderLen) + _, err = io.ReadFull(r.reader, payload) + if err == io.ErrUnexpectedEOF { + return Packet{}, errMalformed + } + if err != nil { + return Packet{}, err + } + + return Packet{ + Offset: h.Offset, + IsRTCP: h.PacketLength == 0, + Payload: payload, + }, nil +} diff --git a/pkg/media/rtpdump/reader_test.go b/pkg/media/rtpdump/reader_test.go new file mode 100644 index 00000000000..19dd9e3ad76 --- /dev/null +++ b/pkg/media/rtpdump/reader_test.go @@ -0,0 +1,282 @@ +package rtpdump + +import ( + "bytes" + "io" + "net" + "reflect" + "testing" + "time" +) + +var validPreamble = []byte("#!rtpplay1.0 224.2.0.1/3456\n") + +func TestReader(t *testing.T) { + for _, test := range []struct { + Name string + Data []byte + WantHeader Header + WantPackets []Packet + WantErr error + }{ + { + Name: "empty", + Data: nil, + WantErr: errMalformed, + }, + { + Name: "hashbang missing ip/port", + Data: append( + []byte("#!rtpplay1.0 \n"), + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + ), + WantErr: errMalformed, + }, + { + Name: "hashbang missing port", + Data: append( + []byte("#!rtpplay1.0 0.0.0.0\n"), + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + ), + WantErr: errMalformed, + }, + { + Name: "valid empty file", + Data: append( + validPreamble, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + 0x01, 0x01, 0x01, 0x01, + 0x22, 0xB8, 0x00, 0x00, + ), + WantHeader: Header{ + Start: time.Unix(1, 0).UTC(), + Source: net.IPv4(1, 1, 1, 1), + Port: 8888, + }, + }, + { + Name: "malformed packet header", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header + 0x00, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantErr: errMalformed, + }, + { + Name: "short packet payload", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header len=1048575 + 0xFF, 0xFF, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet paylaod + 0x00, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantErr: errMalformed, + }, + { + Name: "empty packet payload", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header len=0 + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantErr: errMalformed, + }, + { + Name: "valid rtcp packet", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header len=20, pLen=0, off=1 + 0x00, 0x14, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + // packet payload (BYE) + 0x81, 0xcb, 0x00, 0x0c, + 0x90, 0x2f, 0x9e, 0x2e, + 0x03, 0x46, 0x4f, 0x4f, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantPackets: []Packet{ + { + Offset: 1, + IsRTCP: true, + Payload: []byte{ + 0x81, 0xcb, 0x00, 0x0c, + 0x90, 0x2f, 0x9e, 0x2e, + 0x03, 0x46, 0x4f, 0x4f, + }, + }, + }, + WantErr: nil, + }, + { + Name: "truncated rtcp packet", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header len=9, pLen=0, off=1 + 0x00, 0x09, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + // invalid payload + 0x81, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantPackets: []Packet{ + { + Offset: 1, + IsRTCP: true, + Payload: []byte{0x81}, + }, + }, + }, + { + Name: "two valid packets", + Data: append( + validPreamble, + // header + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + // packet header len=20, pLen=0, off=1 + 0x00, 0x14, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + // packet payload (BYE) + 0x81, 0xcb, 0x00, 0x0c, + 0x90, 0x2f, 0x9e, 0x2e, + 0x03, 0x46, 0x4f, 0x4f, + // packet header len=33, pLen=25, off=2 + 0x00, 0x21, 0x19, 0x00, + 0x00, 0x00, 0x00, 0x02, + // packet payload (RTP) + 0x90, 0x60, 0x69, 0x8f, + 0xd9, 0xc2, 0x93, 0xda, + 0x1c, 0x64, 0x27, 0x82, + 0x00, 0x01, 0x00, 0x01, + 0xFF, 0xFF, 0xFF, 0xFF, + 0x98, 0x36, 0xbe, 0x88, + 0x9e, + ), + WantHeader: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + WantPackets: []Packet{ + { + Offset: 1, + IsRTCP: true, + Payload: []byte{ + 0x81, 0xcb, 0x00, 0x0c, + 0x90, 0x2f, 0x9e, 0x2e, + 0x03, 0x46, 0x4f, 0x4f, + }, + }, + { + Offset: 2, + IsRTCP: false, + Payload: []byte{ + 0x90, 0x60, 0x69, 0x8f, + 0xd9, 0xc2, 0x93, 0xda, + 0x1c, 0x64, 0x27, 0x82, + 0x00, 0x01, 0x00, 0x01, + 0xFF, 0xFF, 0xFF, 0xFF, + 0x98, 0x36, 0xbe, 0x88, + 0x9e, + }, + }, + }, + WantErr: nil, + }, + } { + r, hdr, err := NewReader(bytes.NewReader(test.Data)) + if err != nil { + if got, want := err, test.WantErr; got != want { + t.Fatalf("NewReader(%s) err=%v want %v", test.Name, got, want) + } + continue + } + + if got, want := hdr, test.WantHeader; !reflect.DeepEqual(got, want) { + t.Fatalf("%q Header = %#v, want %#v", test.Name, got, want) + } + + var nextErr error + var packets []Packet + for { + pkt, err := r.Next() + if err == io.EOF { + break + } + if err != nil { + nextErr = err + break + } + + packets = append(packets, pkt) + } + + if got, want := nextErr, test.WantErr; got != want { + t.Fatalf("%s err=%v want %v", test.Name, got, want) + } + if got, want := packets, test.WantPackets; !reflect.DeepEqual(got, want) { + t.Fatalf("%q packets=%#v, want %#v", test.Name, got, want) + } + } +} diff --git a/pkg/media/rtpdump/rtpdump.go b/pkg/media/rtpdump/rtpdump.go new file mode 100644 index 00000000000..2f9a257efe6 --- /dev/null +++ b/pkg/media/rtpdump/rtpdump.go @@ -0,0 +1,159 @@ +// Package rtpdump implements the RTPDump file format documented at +// https://www.cs.columbia.edu/irt/software/rtptools/ +package rtpdump + +import ( + "encoding/binary" + "errors" + "net" + "time" +) + +const ( + pktHeaderLen = 8 + headerLen = 16 + preambleLen = 36 +) + +var errMalformed = errors.New("malformed rtpdump") + +// Header is the binary header at the top of the RTPDump file. It contains +// information about the source and start time of the packet stream included +// in the file. +type Header struct { + // start of recording (GMT) + Start time.Time + // network source (multicast address) + Source net.IP + // UDP port + Port uint16 +} + +// Marshal encodes the Header as binary. +func (h Header) Marshal() ([]byte, error) { + d := make([]byte, headerLen) + + startNano := h.Start.UnixNano() + startSec := uint32(startNano / int64(time.Second)) + startUsec := uint32( + (startNano % int64(time.Second)) / int64(time.Microsecond), + ) + binary.BigEndian.PutUint32(d[0:], startSec) + binary.BigEndian.PutUint32(d[4:], startUsec) + + source := h.Source.To4() + copy(d[8:], source) + + binary.BigEndian.PutUint16(d[12:], h.Port) + + return d, nil +} + +// Unmarshal decodes the Header from binary. +func (h *Header) Unmarshal(d []byte) error { + if len(d) < headerLen { + return errMalformed + } + + // time as a `struct timeval` + startSec := binary.BigEndian.Uint32(d[0:]) + startUsec := binary.BigEndian.Uint32(d[4:]) + h.Start = time.Unix(int64(startSec), int64(startUsec)*1e3).UTC() + + // ipv4 address + h.Source = net.IPv4(d[8], d[9], d[10], d[11]) + + h.Port = binary.BigEndian.Uint16(d[12:]) + + // 2 bytes of padding (ignored) + + return nil +} + +// Packet contains an RTP or RTCP packet along a time offset when it was logged +// (relative to the Start of the recording in Header). The Payload may contain +// truncated packets to support logging just the headers of RTP/RTCP packets. +type Packet struct { + // Offset is the time since the start of recording in millseconds + Offset uint32 + // IsRTCP is true if the payload is RTCP, false if the payload is RTP + IsRTCP bool + // Payload is the binary RTP or or RTCP payload. The contents may not parse + // as a valid packet if the contents have been truncated. + Payload []byte +} + +// Marshal encodes the Packet as binary. +func (p Packet) Marshal() ([]byte, error) { + packetLength := len(p.Payload) + 8 + if p.IsRTCP { + packetLength = 0 + } + + hdr := packetHeader{ + Length: uint16(len(p.Payload)) + 8, + PacketLength: uint16(packetLength), + Offset: p.Offset, + } + hdrData, err := hdr.Marshal() + if err != nil { + return nil, err + } + + data := append(hdrData, p.Payload...) + + return data, nil +} + +// Unmarshal decodes the Packet from binary. +func (p *Packet) Unmarshal(d []byte) error { + var hdr packetHeader + if err := hdr.Unmarshal(d); err != nil { + return err + } + + p.Offset = hdr.Offset + p.IsRTCP = hdr.Length != 0 && hdr.PacketLength == 0 + + if hdr.Length < 8 { + return errMalformed + } + if len(d) < int(hdr.Length) { + return errMalformed + } + p.Payload = d[8:hdr.Length] + + return nil +} + +type packetHeader struct { + // length of packet, including this header (may be smaller than + // plen if not whole packet recorded) + Length uint16 + // Actual header+payload length for RTP, 0 for RTCP + PacketLength uint16 + // milliseconds since the start of recording + Offset uint32 +} + +func (p packetHeader) Marshal() ([]byte, error) { + d := make([]byte, pktHeaderLen) + + binary.BigEndian.PutUint16(d[0:], p.Length) + binary.BigEndian.PutUint16(d[2:], p.PacketLength) + binary.BigEndian.PutUint32(d[4:], p.Offset) + + return d, nil +} + +func (p *packetHeader) Unmarshal(d []byte) error { + if len(d) < pktHeaderLen { + return errMalformed + } + + p.Length = binary.BigEndian.Uint16(d[0:]) + p.PacketLength = binary.BigEndian.Uint16(d[2:]) + p.Offset = binary.BigEndian.Uint32(d[4:]) + + return nil +} diff --git a/pkg/media/rtpdump/rtpdump_test.go b/pkg/media/rtpdump/rtpdump_test.go new file mode 100644 index 00000000000..b89fe066c3a --- /dev/null +++ b/pkg/media/rtpdump/rtpdump_test.go @@ -0,0 +1,118 @@ +package rtpdump + +import ( + "net" + "reflect" + "testing" + "time" +) + +func TestHeaderRoundTrip(t *testing.T) { + for _, test := range []struct { + Header Header + }{ + { + Header: Header{ + Start: time.Unix(0, 0).UTC(), + Source: net.IPv4(0, 0, 0, 0), + Port: 0, + }, + }, + { + Header: Header{ + Start: time.Date(2019, 3, 25, 1, 1, 1, 0, time.UTC), + Source: net.IPv4(1, 2, 3, 4), + Port: 8080, + }, + }, + } { + d, err := test.Header.Marshal() + if err != nil { + t.Fatal(err) + } + + var hdr Header + if err := hdr.Unmarshal(d); err != nil { + t.Fatal(err) + } + + if got, want := hdr, test.Header; !reflect.DeepEqual(got, want) { + t.Fatalf("Unmarshal(%v.Marshal()) = %v, want identical", got, want) + } + } +} + +func TestMarshalHeader(t *testing.T) { + for _, test := range []struct { + Name string + Header Header + Want []byte + WantErr error + }{ + { + Name: "nil source", + Header: Header{ + Start: time.Unix(0, 0).UTC(), + Source: nil, + Port: 0, + }, + Want: []byte{ + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + }, + }, + } { + data, err := test.Header.Marshal() + if got, want := err, test.WantErr; got != want { + t.Fatalf("Marshal(%q) err=%v, want %v", test.Name, got, want) + } + + if got, want := data, test.Want; !reflect.DeepEqual(got, want) { + t.Fatalf("Marshal(%q) = %v, want %v", test.Name, got, want) + } + } +} + +func TestPacketRoundTrip(t *testing.T) { + for _, test := range []struct { + Packet Packet + }{ + { + Packet: Packet{ + Offset: 0, + IsRTCP: false, + Payload: []byte{0}, + }, + }, + { + Packet: Packet{ + Offset: 0, + IsRTCP: true, + Payload: []byte{0}, + }, + }, + { + Packet: Packet{ + Offset: 123, + IsRTCP: false, + Payload: []byte{1, 2, 3, 4}, + }, + }, + } { + d, err := test.Packet.Marshal() + if err != nil { + t.Fatal(err) + } + + var pkt Packet + if err := pkt.Unmarshal(d); err != nil { + t.Fatal(err) + } + + if got, want := pkt, test.Packet; !reflect.DeepEqual(got, want) { + t.Fatalf("Unmarshal(%v.Marshal()) = %v, want identical", got, want) + } + } +} diff --git a/pkg/media/rtpdump/writer.go b/pkg/media/rtpdump/writer.go new file mode 100644 index 00000000000..1c4239d6c83 --- /dev/null +++ b/pkg/media/rtpdump/writer.go @@ -0,0 +1,51 @@ +package rtpdump + +import ( + "fmt" + "io" + "sync" +) + +// Writer writes the RTPDump file format +type Writer struct { + writerMu sync.Mutex + writer io.Writer +} + +// NewWriter makes a new Writer and immediately writes the given Header +// to begin the file. +func NewWriter(w io.Writer, hdr Header) (*Writer, error) { + preamble := fmt.Sprintf( + "#!rtpplay1.0 %s/%d\n", + hdr.Source.To4().String(), + hdr.Port) + if _, err := w.Write([]byte(preamble)); err != nil { + return nil, err + } + + hData, err := hdr.Marshal() + if err != nil { + return nil, err + } + if _, err := w.Write(hData); err != nil { + return nil, err + } + + return &Writer{writer: w}, nil +} + +// WritePacket writes a Packet to the output +func (w *Writer) WritePacket(p Packet) error { + w.writerMu.Lock() + defer w.writerMu.Unlock() + + data, err := p.Marshal() + if err != nil { + return err + } + if _, err := w.writer.Write(data); err != nil { + return err + } + + return nil +} diff --git a/pkg/media/rtpdump/writer_test.go b/pkg/media/rtpdump/writer_test.go new file mode 100644 index 00000000000..3d7cfa5b210 --- /dev/null +++ b/pkg/media/rtpdump/writer_test.go @@ -0,0 +1,107 @@ +package rtpdump + +import ( + "bytes" + "io" + "net" + "reflect" + "testing" + "time" +) + +func TestWriter(t *testing.T) { + buf := bytes.NewBuffer(nil) + + writer, err := NewWriter(buf, Header{ + Start: time.Unix(9, 0), + Source: net.IPv4(2, 2, 2, 2), + Port: 2222, + }) + if err != nil { + t.Fatal(err) + } + + if err := writer.WritePacket(Packet{ + Offset: 1, + IsRTCP: false, + Payload: []byte{9}, + }); err != nil { + t.Fatal(err) + } + + expected := append( + []byte("#!rtpplay1.0 2.2.2.2/2222\n"), + // header + 0x00, 0x00, 0x00, 0x09, + 0x00, 0x00, 0x00, 0x00, + 0x02, 0x02, 0x02, 0x02, + 0x08, 0xae, 0x00, 0x00, + // packet header + 0x00, 0x09, 0x00, 0x09, + 0x00, 0x00, 0x00, 0x01, + 0x09, + ) + + if got, want := buf.Bytes(), expected; !reflect.DeepEqual(got, want) { + t.Fatalf("wrote %v, want %v", string(got), want) + } +} + +func TestRoundTrip(t *testing.T) { + buf := bytes.NewBuffer(nil) + + packets := []Packet{ + { + Offset: 1, + IsRTCP: false, + Payload: []byte{9}, + }, + { + Offset: 999, + IsRTCP: true, + Payload: []byte{9}, + }, + } + hdr := Header{ + Start: time.Unix(9, 0).UTC(), + Source: net.IPv4(2, 2, 2, 2), + Port: 2222, + } + + writer, err := NewWriter(buf, hdr) + if err != nil { + t.Fatal(err) + } + + for _, pkt := range packets { + if err = writer.WritePacket(pkt); err != nil { + t.Fatal(err) + } + } + + reader, hdr2, err := NewReader(buf) + if err != nil { + t.Fatal(err) + } + + if got, want := hdr2, hdr; !reflect.DeepEqual(got, want) { + t.Fatalf("round trip: header=%v, want %v", got, want) + } + + var packets2 []Packet + for { + pkt, err := reader.Next() + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + + packets2 = append(packets2, pkt) + } + + if got, want := packets2, packets; !reflect.DeepEqual(got, want) { + t.Fatalf("round trip: packets=%v, want %v", got, want) + } +}