-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathminimal.go
161 lines (130 loc) · 4.14 KB
/
minimal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package logplexc
import (
"bytes"
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"sync"
"time"
)
// MiniStats represents statistics on MiniClient operation.
//
// It is embedded in Bundle, so the values describe a single bundle of
// buffered messages.
type MiniStats struct {
// NumberFramed counts the messages framed into the bundle;
// BufferMessage increments it once per call.
NumberFramed uint64
// Buffered is the length in bytes of the bundle's outbox, refreshed
// by BufferMessage after each write.
Buffered int
}
// MiniConfig represents configuration of a MiniClient.
type MiniConfig struct {
// The configuration is by-value to prevent most kinds of accidental
// sharing of modifications between clients. Also, modification of
// the URL is compulsory in the constructor, and it's not desirable to
// modify the version passed by the user as a side effect of
// constructing a client instance.
//
// NOTE(review): NewMiniClient only reads the URL's user information
// and does not appear to modify the URL; confirm whether the remark
// about compulsory modification is still accurate.

// Logplex is the URL that bundles are posted to. NewMiniClient
// requires it to carry user information with a password, which is
// used as the client's token.
Logplex url.URL
// HTTPClient is the HTTP client used by Post to execute requests.
HTTPClient http.Client
}
// Bundle represents a bundle of log messages. Bundles act as buffers
// of multiple log records as well as the unit of work submitted to
// Logplex.
type Bundle struct {
// MiniStats holds the message count and buffered size of this bundle.
MiniStats
// outbox accumulates the length-prefixed messages written by
// BufferMessage; its contents form the request body sent by Post.
outbox bytes.Buffer
}
// MiniClient implements a low-level, synchronous Logplex client. It
// can format messages and gather statistics. Most uses of logplexc
// are anticipated to use logplexc.Client.
type MiniClient struct {
// Configuration that should not be mutated after creation
MiniConfig
// Cached copy of the token, extracted from the Logplex URL.
token string
// reqInFlight tracks Post calls that are currently executing, so
// that a clean shutdown can wait for them to complete.
reqInFlight sync.WaitGroup
// Messages that have been collected but not yet sent.
//
// bSwapLock is held by Statistics, BufferMessage and SwapBundle
// whenever they read, write or replace b.
bSwapLock sync.Mutex
b *Bundle
}
// NewMiniClient creates a new MiniClient based on the given config.
//
// The config struct is copied by value into the client, so the caller's
// MiniConfig is never modified. The Logplex URL must carry user
// information with a password; that password is cached as the token
// written into every message framed by BufferMessage. The client starts
// with an empty Bundle.
//
// An error is returned when cfg is nil, when the URL has no user
// information, or when the user information has no password.
func NewMiniClient(cfg *MiniConfig) (client *MiniClient, err error) {
	// Report a missing config through the error return, like every other
	// configuration problem, instead of panicking on the dereference.
	if cfg == nil {
		return nil, errors.New("No logplex configuration provided")
	}

	user := cfg.Logplex.User
	if user == nil {
		return nil, errors.New("No logplex user information provided")
	}

	token, ok := user.Password()
	if !ok {
		return nil, errors.New("No logplex password provided")
	}

	// Only build the client once the configuration is known to be valid.
	// The zero Bundle is ready to use, so no explicit buffer is needed.
	return &MiniClient{
		MiniConfig: *cfg, // private copy
		token:      token,
		b:          &Bundle{},
	}, nil
}
// unsyncStats returns a copy of the statistics embedded in b without
// taking any lock.
//
// It is meant as a subroutine for procedures that have already taken
// care of synchronization themselves.
func unsyncStats(b *Bundle) MiniStats {
	snapshot := b.MiniStats
	return snapshot
}
// Statistics returns a copy of the statistics of the client's current
// bundle, read while holding the bundle swap lock.
func (c *MiniClient) Statistics() MiniStats {
	c.bSwapLock.Lock()
	defer c.bSwapLock.Unlock()

	current := c.b
	return unsyncStats(current)
}
// BufferMessage frames one log message and appends it to the client's
// current bundle for best-effort delivery to Logplex.
//
// Each frame is the decimal byte length of the message, a space, a
// syslog header built from priority, the UTC RFC 3339 form of when,
// host, the client's token and procID, and finally the raw log bytes.
//
// The returned statistics describe the bundle after this message was
// added, so the caller can decide when to swap the bundle out and post
// it.
//
// No effort is expended to clean up bad bytes disallowed by syslog,
// as Logplex has a length-prefixed format.
func (c *MiniClient) BufferMessage(
	priority int, when time.Time, host string, procID string,
	log []byte) MiniStats {
	// The header depends only on the arguments and the cached token, so
	// it is assembled before the lock is taken.
	stamp := when.UTC().Format(time.RFC3339)
	header := "<" + strconv.Itoa(priority) + ">1 " + stamp + " " +
		host + " " + c.token + " " + procID + " - - "
	frameLen := len(header) + len(log)

	// Hold the swap lock so the bundle cannot be replaced while the
	// frame is written and the statistics are updated.
	c.bSwapLock.Lock()
	defer c.bSwapLock.Unlock()

	bundle := c.b
	fmt.Fprintf(&bundle.outbox, "%d %s%s", frameLen, header, log)
	bundle.NumberFramed++
	bundle.Buffered = bundle.outbox.Len()

	return unsyncStats(bundle)
}
// SwapBundle replaces the client's current bundle with a fresh, empty
// one and returns the bundle that was in place, so that buffering can
// continue again immediately. It's the caller's prerogative to submit
// the returned, completed Bundle to Logplex.
func (c *MiniClient) SwapBundle() Bundle {
	c.bSwapLock.Lock()
	defer c.bSwapLock.Unlock()

	full := *c.b
	c.b = &Bundle{}

	return full
}
// Post sends a Bundle of logs to Logplex via HTTP POST.
//
// The request goes to the configured Logplex URL. Its body is the
// bundle's buffered messages and its Logplex-Msg-Count header carries
// the bundle's NumberFramed. The body is read through an independent
// reader over the buffered bytes, so posting does not drain the bundle:
// the same bundle can be posted again (for example to retry after a
// failure) and its body still matches its message count.
//
// On success the caller owns the returned response and is responsible
// for closing its Body. On failure the response is nil and the error
// from building or executing the request is returned unchanged.
func (c *MiniClient) Post(b *Bundle) (*http.Response, error) {
	// Record that a request is in progress so that a clean
	// shutdown can wait for it to complete.
	c.reqInFlight.Add(1)
	defer c.reqInFlight.Done()

	// Handing &b.outbox to the request would let the transport consume
	// the buffer, leaving a later Post of this bundle with an empty or
	// truncated body but an unchanged Logplex-Msg-Count.
	body := bytes.NewReader(b.outbox.Bytes())

	req, err := http.NewRequest("POST", c.Logplex.String(), body)
	if err != nil {
		return nil, err
	}

	req.Header.Add("User-Agent", "logplexc")
	req.Header.Add("Content-Type", "application/logplex-1")
	req.Header.Add("Logplex-Msg-Count",
		strconv.FormatUint(b.NumberFramed, 10))

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, err
	}

	return resp, nil
}