Skip to content

Commit

Permalink
Refactor ClusterLogging and ClusterLogForwarder generation code
Browse files Browse the repository at this point in the history
  • Loading branch information
DebakelOrakel committed Jul 9, 2024
1 parent f47ae20 commit 5a7dee1
Show file tree
Hide file tree
Showing 12 changed files with 381 additions and 219 deletions.
32 changes: 1 addition & 31 deletions class/defaults.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,37 +56,7 @@ parameters:
for: 6h
severity: warning

clusterLogging:
managementState: Managed
logStore:
type: lokistack
lokistack:
name: loki
elasticsearch:
nodeCount: 3
storage:
size: 200Gi
redundancyPolicy: SingleRedundancy
nodeSelector:
node-role.kubernetes.io/infra: ''
retentionPolicy:
application:
maxAge: 7d
pruneNamespacesInterval: 15m
infra:
maxAge: 30d
pruneNamespacesInterval: 15m
audit:
maxAge: 30d
pruneNamespacesInterval: 15m
visualization:
type: kibana
kibana:
replicas: 2
nodeSelector:
node-role.kubernetes.io/infra: ''
collection:
type: vector
clusterLogging: {}

clusterLogForwarding:
enabled: false
Expand Down
186 changes: 186 additions & 0 deletions component/config_forwarding.libsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
local kap = import 'lib/kapitan.libjsonnet';
local lib = import 'lib/openshift4-logging.libsonnet';

local inv = kap.inventory();
local params = inv.parameters.openshift4_logging;

local deployLokistack = params.components.lokistack.enabled;
local deployElasticsearch = params.components.elasticsearch.enabled;
local forwardingOnly = !deployLokistack && !deployElasticsearch;

local pipelineOutputRefs(pipeline) =
local default = if forwardingOnly then [] else [ 'default' ];
std.get(pipeline, 'forwarders', []) + default;

// Apply default config for application logs.
local patchAppLogDefaults = {
local outputRefs = pipelineOutputRefs(params.clusterLogForwarding.application_logs),
local enablePipeline = std.length(outputRefs) > 0,

pipelines: {
[if enablePipeline then 'application-logs']: {
inputRefs: [ 'application' ],
outputRefs: outputRefs,
},
},
};

// Apply default config for infra logs.
local patchInfraLogDefaults = {
local outputRefs = pipelineOutputRefs(params.clusterLogForwarding.infrastructure_logs),
local enablePipeline = params.clusterLogForwarding.infrastructure_logs.enabled && std.length(outputRefs) > 0,

pipelines: {
[if enablePipeline then 'infrastructure-logs']: {
inputRefs: [ 'infrastructure' ],
outputRefs: outputRefs,
},
},
};

// Apply default config for audit logs.
local patchAuditLogDefaults = {
local outputRefs = pipelineOutputRefs(params.clusterLogForwarding.audit_logs),
local enablePipeline = params.clusterLogForwarding.audit_logs.enabled && std.length(outputRefs) > 0,

pipelines: {
[if enablePipeline then 'audit-logs']: {
inputRefs: [ 'audit' ],
outputRefs: outputRefs,
},
},
};

// Enable json parsing for default pipelines if configured.
local patchJsonLogging = {
local enableAppLogs = std.get(params.clusterLogForwarding.application_logs, 'json', false),
local enableInfraLogs = std.get(params.clusterLogForwarding.infrastructure_logs, 'json', false),

pipelines: {
[if enableAppLogs then 'application-logs']: { parse: 'json' },
[if enableInfraLogs then 'infrastructure-logs']: { parse: 'json' },
},
[if deployElasticsearch && params.clusterLogForwarding.json.enabled then 'outputDefaults']: {
elasticsearch: {
structuredTypeKey: params.clusterLogForwarding.json.typekey,
structuredTypeName: params.clusterLogForwarding.json.typename,
},
},
};

// Enable detectMultilineErrors for default pipelines if configured.
local patchMultilineErrors = {
local enableAppLogs = std.get(params.clusterLogForwarding.application_logs, 'detectMultilineErrors', false),
local enableInfraLogs = std.get(params.clusterLogForwarding.infrastructure_logs, 'detectMultilineErrors', false),

pipelines: {
[if enableAppLogs then 'application-logs']: { detectMultilineErrors: true },
[if enableInfraLogs then 'infrastructure-logs']: { detectMultilineErrors: true },
},
};

// --- patch deprecated `clusterLogForwarding.namespace` config
local namespaceGroups = (
if std.objectHas(params.clusterLogForwarding, 'namespaces') then
{
[ns]: {
namespaces: [ ns ],
forwarders: [ params.clusterLogForwarding.namespaces[ns].forwarder ],
}
for ns in std.objectFields(params.clusterLogForwarding.namespaces)
} else {}
) + params.clusterLogForwarding.namespace_groups;
// --- patch end

// Add inputs entry for every namespace_group defined in `clusterLogForwarding.namespace_groups`.
local patchCustomInputs = {
[if std.length(namespaceGroups) > 0 then 'inputs']: {
[group]: {
application: {
namespaces: namespaceGroups[group].namespaces,
},
}
for group in std.objectFields(namespaceGroups)
},
};

// Add pipelines entry for every namespace_group defined in `clusterLogForwarding.namespace_groups`.
local patchCustomPipelines = {
[if std.length(namespaceGroups) > 0 then 'pipelines']: {
local enableJson = std.get(namespaceGroups[group], 'json', false),
local enableMultilineError = std.get(namespaceGroups[group], 'detectMultilineErrors', false),

[group]: {
inputRefs: [ group ],
outputRefs: std.get(namespaceGroups[group], 'forwarders', []),
[if enableJson then 'parse']: 'json',
[if enableMultilineError then 'detectMultilineErrors']: true,
}
for group in std.objectFields(namespaceGroups)
},
};

// Add outputs entry for every forwarder defined in `clusterLogForwarding.forwarders`.
local patchCustomOutputs = {
[if std.length(params.clusterLogForwarding.forwarders) > 0 then 'outputs']: {
[name]: params.clusterLogForwarding.forwarders[name]
for name in std.objectFields(params.clusterLogForwarding.forwarders)
},
};

// ClusterLogForwarderSpecs:
// Consecutively apply patches to result of previous apply.
local clusterLogForwarderSpec = std.foldl(
function(manifest, patch) std.mergePatch(manifest, patch),
[
patchAppLogDefaults,
patchInfraLogDefaults,
patchAuditLogDefaults,
patchJsonLogging,
patchMultilineErrors,
patchCustomInputs,
patchCustomOutputs,
patchCustomPipelines,
],
{
inputs: {},
outputs: {},
pipelines: {},
}
);

// ClusterLogForwarder:
// Create definitive ClusterLogForwarder resource from specs.
local clusterLogForwarder = lib.ClusterLogForwarder(params.namespace, 'instance') {
spec: {
// Unfold objects into array.
[if std.length(clusterLogForwarderSpec.inputs) > 0 then 'inputs']: [
{ name: name } + clusterLogForwarderSpec.inputs[name]
for name in std.objectFields(clusterLogForwarderSpec.inputs)
],
[if std.length(clusterLogForwarderSpec.outputs) > 0 then 'outputs']: [
{ name: name } + clusterLogForwarderSpec.outputs[name]
for name in std.objectFields(clusterLogForwarderSpec.outputs)
],
[if std.length(clusterLogForwarderSpec.pipelines) > 0 then 'pipelines']: [
{ name: name } + clusterLogForwarderSpec.pipelines[name]
for name in std.objectFields(clusterLogForwarderSpec.pipelines)
],
} + {
// Import remaining specs as is.
[key]: clusterLogForwarderSpec[key]
for key in std.objectFields(clusterLogForwarderSpec)
if !std.member([ 'inputs', 'outputs', 'pipelines' ], key)
},
};

// Define outputs below
if params.clusterLogForwarding.enabled then
{
'31_cluster_logforwarding': clusterLogForwarder,
}
else
std.trace(
'Log forwarding disabled, not deploying ClusterLogForwarder',
{}
)
121 changes: 121 additions & 0 deletions component/config_logging.libsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
local kap = import 'lib/kapitan.libjsonnet';
local lib = import 'lib/openshift4-logging.libsonnet';

local inv = kap.inventory();
local params = inv.parameters.openshift4_logging;

local deployLokistack = params.components.lokistack.enabled;
local deployElasticsearch = params.components.elasticsearch.enabled;

// Apply defaults for Lokistack.
local patchLokistackDefaults = {
[if deployLokistack then 'spec']: {
logStore: {
type: 'lokistack',
lokistack: {
name: 'loki',
},
},
},
};

// Apply defaults for Elasticsearch.
local patchElasticsearchDefaults = {
[if deployElasticsearch then 'spec']: {
logStore: {
elasticsearch: {
nodeCount: 3,
storage: {
size: '200Gi',
},
redundancyPolicy: 'SingleRedundancy',
nodeSelector: {
'node-role.kubernetes.io/infra': '',
},
},
retentionPolicy: {
application: {
maxAge: '7d',
pruneNamespacesInterval: '15m',
},
infra: {
maxAge: '30d',
pruneNamespacesInterval: '15m',
},
audit: {
maxAge: '30d',
pruneNamespacesInterval: '15m',
},
},
},
visualization: {
type: 'kibana',
kibana: {
replicas: 2,
nodeSelector: {
'node-role.kubernetes.io/infra': '',
},
},
},
},
};

// Apply customisations from params.clusterLogging.
local patchLoggingConfig = {
spec: params.clusterLogging {
collection: {
// Don't include legacy config key 'collection.logs'.
[it]: params.clusterLogging.collection[it]
for it in std.objectFields(std.get(params.clusterLogging, 'collection', {}))
if it != 'logs'
},
},
};

// --- patch deprecated logging resource
local patchLegacyConfig = {
local legacyConfig = std.get(std.get(params.clusterLogging, 'collection', { collection: {} }), 'logs', {}),
local legacyType = std.get(legacyConfig, 'type', ''),
local legacyFluentd = std.get(legacyConfig, 'fluentd', {}),

spec: {
collection: if std.length(legacyConfig) > 0 then std.trace(
'Parameter `clusterLogging.collector.logs` is deprecated. Please update your config to use `clusterLogging.collector`',
{
[if legacyType != '' then 'type']: legacyType,
} + legacyFluentd,
) else {},
},
};
// --- patch end


// ClusterLogging specs:
// Consecutively apply patches to result of previous apply.
local clusterLogging = std.foldl(
function(manifest, patch) std.mergePatch(manifest, patch),
[
patchLokistackDefaults,
patchElasticsearchDefaults,
patchLoggingConfig,
patchLegacyConfig,
],
lib.ClusterLogging(params.namespace, 'instance') {
metadata+: {
annotations+: {
'argocd.argoproj.io/sync-options': 'SkipDryRunOnMissingResource=true',
},
},
spec: {
managementState: 'Managed',
collection: {
type: 'vector',
},
},
}
);

// Define outputs below
{
'30_cluster_logging': clusterLogging,
}
Loading

0 comments on commit 5a7dee1

Please sign in to comment.