-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathAUDFAccess.cs
215 lines (185 loc) · 8.51 KB
/
AUDFAccess.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
using System;
using System.Collections.Generic;
using Aerospike.Client;
using System.Data;
using System.Collections;
using System.Diagnostics;
using System.Linq;
using LPU = LINQPad.Util;
namespace Aerospike.Database.LINQPadDriver.Extensions
{
/// <summary>
/// Placeholder type for module-level access. It currently declares no members
/// other than a constructor, which does not use its argument.
/// NOTE(review): presumably intended to represent an Aerospike UDF module as a whole — confirm intent.
/// </summary>
public class AModuleAccess
{
/// <summary>
/// Creates an instance. The body is empty, so <paramref name="dbConnection"/> is ignored.
/// </summary>
/// <param name="dbConnection">Database connection; neither stored nor read by this constructor.</param>
public AModuleAccess(IDbConnection dbConnection)
{
}
}
/// <summary>
/// Describes one User Defined Function (UDF) inside a registered module and provides
/// helpers to invoke it, either against a single record (<c>Execute</c>) or as an
/// aggregation over a query (<c>QueryAggregate</c>).
/// </summary>
public class AUDFAccess
{
    /// <summary>File extension that is stripped from the end of a module name.</summary>
    private const string LuaExtension = ".lua";

    /// <summary>
    /// Creates a UDF accessor bound to <paramref name="dbConnection"/>.
    /// The default query and write policies are copied from the connection's client.
    /// </summary>
    /// <param name="dbConnection">The connection; must be an <see cref="AerospikeConnection"/>.</param>
    /// <param name="moduleName">
    /// Module (package) name. A trailing ".lua" extension, if present, is removed.
    /// </param>
    /// <param name="udfName">Name of the function inside the module.</param>
    /// <param name="udfSourceCode">Source code of the UDF (kept for display purposes).</param>
    /// <exception cref="ArgumentNullException"><paramref name="dbConnection"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="dbConnection"/> is not an <see cref="AerospikeConnection"/>.
    /// </exception>
    public AUDFAccess(IDbConnection dbConnection, string moduleName, string udfName, string udfSourceCode)
    {
        if (dbConnection is null)
        {
            throw new ArgumentNullException(nameof(dbConnection));
        }

        // Fail with a descriptive error instead of a NullReferenceException on the next line.
        this.AerospikeConnection = dbConnection as AerospikeConnection
            ?? throw new ArgumentException(
                $"Expected an Aerospike connection but received '{dbConnection.GetType().FullName}'.",
                nameof(dbConnection));

        this.DefaultQueryPolicy = new QueryPolicy(this.AerospikeConnection.AerospikeClient.QueryPolicyDefault);
        this.DefaultWritePolicy = new WritePolicy(this.AerospikeConnection.AerospikeClient.WritePolicyDefault);
        this.Name = udfName;

        // Strip only the trailing extension; a ".lua" in the middle of the name is preserved.
        this.Module = moduleName != null && moduleName.EndsWith(LuaExtension, StringComparison.Ordinal)
            ? moduleName.Substring(0, moduleName.Length - LuaExtension.Length)
            : moduleName;

        this.SourceCode = udfSourceCode;
        this.Type = Language.LUA;
        this.FullName = $"{this.Module}.{this.Name}";
    }

    /// <summary>
    /// Copy constructor. The connection is shared with <paramref name="cloneUDF"/>;
    /// the default policies are copied so they can be changed independently.
    /// </summary>
    /// <param name="cloneUDF">The instance to copy.</param>
    /// <exception cref="ArgumentNullException"><paramref name="cloneUDF"/> is null.</exception>
    public AUDFAccess(AUDFAccess cloneUDF)
    {
        if (cloneUDF is null)
        {
            throw new ArgumentNullException(nameof(cloneUDF));
        }

        this.AerospikeConnection = cloneUDF.AerospikeConnection;
        this.DefaultQueryPolicy = new QueryPolicy(cloneUDF.DefaultQueryPolicy);
        this.DefaultWritePolicy = new WritePolicy(cloneUDF.DefaultWritePolicy);
        this.Name = cloneUDF.Name;
        this.Module = cloneUDF.Module;
        this.FullName = cloneUDF.FullName; // previously omitted, leaving clones with a null FullName
        this.SourceCode = cloneUDF.SourceCode;
        this.Type = cloneUDF.Type;
    }

    /// <summary>Returns a copy of this instance (see the copy constructor).</summary>
    public AUDFAccess Clone() => new AUDFAccess(this);

    /// <summary>The connection used to reach the cluster.</summary>
    public AerospikeConnection AerospikeConnection { get; }

    /// <summary>Policy used by the <c>QueryAggregate</c> overloads.</summary>
    public QueryPolicy DefaultQueryPolicy { get; set; }

    /// <summary>Policy used by the <c>Execute</c> overloads.</summary>
    public WritePolicy DefaultWritePolicy { get; set; }

    /// <summary>Module name without a trailing ".lua" extension.</summary>
    public string Module { get; }

    /// <summary>Function name inside <see cref="Module"/>.</summary>
    public string Name { get; }

    /// <summary>"<see cref="Module"/>.<see cref="Name"/>".</summary>
    public string FullName { get; }

    /// <summary>Source code supplied at construction.</summary>
    public string SourceCode { get; }

    /// <summary>UDF language; set to LUA by the primary constructor.</summary>
    public Client.Language Type { get; }

    /// <summary>
    /// Converts caller-supplied arguments into client values. Items that already are a
    /// <see cref="Client.Value"/> are passed through; everything else goes through
    /// <c>Value.Get</c>.
    /// </summary>
    /// <param name="functionArgs">Arguments to convert; null is treated as "no arguments".</param>
    /// <returns>An array with one value per argument (empty when there are none).</returns>
    private static Value[] ToValues(object[] functionArgs)
    {
        // A params array can legally arrive as null (e.g. Execute(key, null)).
        if (functionArgs is null || functionArgs.Length == 0)
        {
            return Array.Empty<Value>();
        }

        var converted = new Value[functionArgs.Length];
        for (int idx = 0; idx < functionArgs.Length; ++idx)
        {
            converted[idx] = functionArgs[idx] is Value alreadyValue
                ? alreadyValue
                : Value.Get(functionArgs[idx]);
        }

        return converted;
    }

    /// <summary>
    /// Runs this UDF as an aggregation over <paramref name="statement"/>, optionally
    /// restricted by <paramref name="filterExpression"/>.
    /// </summary>
    /// <param name="statement">Aerospike Statement instance describing the query.</param>
    /// <param name="filterExpression">
    /// Optional filter. When null, <see cref="DefaultQueryPolicy"/> is used as is; otherwise a
    /// copy of it is made with the built expression assigned to its filter.
    /// </param>
    /// <param name="functionArgs">
    /// The values that are passed to the UDF.
    /// These values can be a <see cref="Client.Value"/> or a C# native type.
    /// </param>
    /// <returns>
    /// A lazily evaluated sequence of the objects in the result set. The query is issued when
    /// the sequence is enumerated (and again on each new enumeration).
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="statement"/> is null.</exception>
    public IEnumerable<object> QueryAggregate(Statement statement, Client.Exp filterExpression, params object[] functionArgs)
    {
        // Validate and convert eagerly; inside an iterator this would be deferred to first MoveNext.
        if (statement is null)
        {
            throw new ArgumentNullException(nameof(statement));
        }

        return this.EnumerateAggregate(statement, filterExpression, ToValues(functionArgs));
    }

    /// <summary>
    /// Iterator behind <c>QueryAggregate</c>: issues the query and streams the result objects.
    /// The result set is disposed when enumeration completes or the enumerator is disposed.
    /// </summary>
    private IEnumerable<object> EnumerateAggregate(Statement statement, Client.Exp filterExpression, Value[] funcValues)
    {
        var queryPolicy = filterExpression == null
            ? this.DefaultQueryPolicy
            : new QueryPolicy(this.DefaultQueryPolicy) { filterExp = Exp.Build(filterExpression) };

        using var resultSet = this.AerospikeConnection.AerospikeClient.QueryAggregate(queryPolicy,
                                                                                        statement,
                                                                                        this.Module,
                                                                                        this.Name,
                                                                                        funcValues);

        while (resultSet.Next())
        {
            yield return resultSet.Object;
        }
    }

    /// <summary>
    /// Executes the UDF based on <paramref name="statement"/> and <paramref name="functionArgs"/>
    /// </summary>
    /// <param name="statement">Aerospike Statement instance</param>
    /// <param name="functionArgs">
    /// The values that are passed to the UDF.
    /// These values can be a <see cref="Client.Value"/> or a C# native type.
    /// </param>
    /// <returns>The objects produced by the UDF, evaluated lazily.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="statement"/> is null.</exception>
    /// <example>
    /// Calls a UDF that produces an aggregate value in namespace "test", set "users", on bins
    /// "tweetcount" and "region". It also uses an index with a filter.
    /// <code>
    /// var bins = new[] { "tweetcount", "region" };
    /// var stmt = new Statement();
    ///
    /// stmt.SetNamespace("test");
    /// stmt.SetSetName("users");
    /// stmt.SetIndexName("tweetcount_index");
    /// stmt.SetBinNames(bins);
    /// stmt.SetFilters(Filter.Range("tweetcount", min, max));
    ///
    /// var result = this.QueryAggregate(stmt);
    /// </code>
    /// </example>
    public IEnumerable<object> QueryAggregate(Statement statement, params object[] functionArgs)
        => this.QueryAggregate(statement, (Client.Exp)null, functionArgs);

    /// <summary>
    /// Runs this UDF as an aggregation over every record of <paramref name="set"/>.
    /// </summary>
    /// <param name="set">The Aerospike set to query.</param>
    /// <param name="functionArgs">
    /// The values that are passed to the UDF.
    /// These values can be a <see cref="Client.Value"/> or a C# native type.
    /// </param>
    /// <returns>The objects produced by the UDF, evaluated lazily.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="set"/> is null.</exception>
    public IEnumerable<object> QueryAggregate(SetRecords set, params object[] functionArgs)
        => this.QueryAggregate(set, (Client.Exp)null, functionArgs);

    /// <summary>
    /// Runs this UDF as an aggregation over <paramref name="set"/>, optionally restricted by
    /// <paramref name="filterExpression"/>. A statement is built from the set's namespace and
    /// set name.
    /// </summary>
    /// <param name="set">The Aerospike set to query.</param>
    /// <param name="filterExpression">Optional filter expression; may be null.</param>
    /// <param name="functionArgs">
    /// The values that are passed to the UDF.
    /// These values can be a <see cref="Client.Value"/> or a C# native type.
    /// </param>
    /// <returns>The objects produced by the UDF, evaluated lazily.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="set"/> is null.</exception>
    public IEnumerable<object> QueryAggregate(SetRecords set, Client.Exp filterExpression, params object[] functionArgs)
    {
        if (set is null)
        {
            throw new ArgumentNullException(nameof(set));
        }

        var statement = new Statement();
        statement.SetNamespace(set.Namespace);
        statement.SetSetName(set.SetName);

        return this.QueryAggregate(statement, filterExpression, functionArgs);
    }

    /// <summary>
    /// Executes the UDF based on <paramref name="key"/> and <paramref name="functionArgs"/>
    /// </summary>
    /// <param name="key">The primary key for the target set</param>
    /// <param name="functionArgs">
    /// The values that are passed to the UDF.
    /// These values can be a <see cref="Client.Value"/> or a C# native type.
    /// </param>
    /// <returns>
    /// The value returned from the UDF or null.
    /// </returns>
    /// <example>
    /// Executing a UDF that will create a new record in namespace and set "MyNS.MySet"
    /// <code>
    /// Key key = new Key("MyNS", "MySet", "udfkey1");
    /// Bin bin = new Bin("udfbin1", "string value");
    ///
    /// this.Execute(key, Value.Get(bin.name), bin.value);
    ///
    /// //Check to see if record was actually written
    /// Record record = client.Get(null, key, bin.name);
    /// AssertBinEqual(key, record, bin);
    /// </code>
    /// </example>
    public object Execute(Aerospike.Client.Key key, params object[] functionArgs)
    {
        return this.AerospikeConnection.AerospikeClient.Execute(this.DefaultWritePolicy,
                                                                key,
                                                                this.Module,
                                                                this.Name,
                                                                ToValues(functionArgs));
    }

    /// <summary>
    /// Executes the UDF based on <paramref name="primaryKey"/> and <paramref name="functionArgs"/>
    /// </summary>
    /// <param name="set">The Aerospike Set</param>
    /// <param name="primaryKey">
    /// The primary value. This can be a <see cref="Client.Key"/>, a <see cref="Client.Value"/>, or any object value.
    /// When a <see cref="Client.Key"/> is given only its user key is used; the namespace and set
    /// always come from <paramref name="set"/>.
    /// </param>
    /// <param name="functionArgs">
    /// Arguments passed to the UDF
    /// </param>
    /// <returns>
    /// The UDF result or null.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="set"/> is null.</exception>
    public object Execute(SetRecords set, object primaryKey, params object[] functionArgs)
    {
        if (set is null)
        {
            throw new ArgumentNullException(nameof(set));
        }

        Value keyValue = primaryKey switch
        {
            Client.Key existingKey => existingKey.userKey,
            Value value => value,
            _ => Value.Get(primaryKey),
        };

        return this.Execute(new Client.Key(set.Namespace, set.SetName, keyValue), functionArgs);
    }

    /// <summary>
    /// Executes the UDF against the record identified by <paramref name="asRecord"/>'s key.
    /// </summary>
    /// <param name="asRecord">The record whose key selects the target.</param>
    /// <param name="functionArgs">
    /// The values that are passed to the UDF.
    /// These values can be a <see cref="Client.Value"/> or a C# native type.
    /// </param>
    /// <returns>The value returned from the UDF or null.</returns>
    public object Execute(ARecord asRecord, params object[] functionArgs)
    {
        return this.Execute(asRecord.Aerospike.Key, functionArgs);
    }

    /// <summary>
    /// Builds the object LINQPad displays for this instance, limited to the module, name,
    /// source code and type properties.
    /// </summary>
    public virtual object ToDump()
    {
        return LPU.ToExpando(this, include: "Module,Name,SourceCode,Type");
    }
}
}