-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMongoProjector.php
120 lines (109 loc) · 3.6 KB
/
MongoProjector.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
<?php
use Google\Protobuf\Internal\Message;
use Google\Protobuf\Struct;
use Projection\DeleteOne;
use Projection\InsertOne;
use Projection\UpdateOne;
/**
 * Base class for projectors that translate incoming events into MongoDB
 * write operations (InsertOne / UpdateOne / DeleteOne messages).
 *
 * Subclasses register one handler per event type through InsertOneOn(),
 * UpdateOneOn() and DeleteOneOn(); Project() then dispatches each incoming
 * event to the handler registered for its type.
 */
abstract class MongoProjector
{
    /**
     * Registered dispatch closures, keyed by event type string.
     */
    private \Aimeos\Map $projectors;

    public function __construct()
    {
        $this->projectors = new \Aimeos\Map();
    }

    /**
     * This function should only be called by the projector service
     *
     * Dispatches the event to the handler registered for $eventType. When no
     * handler is registered, a \Projection\Ignore message is returned.
     *
     * @param string $eventType Event type string
     * @param Struct $evt Event payload as it comes in Protobuf
     * @return Message The response operation message
     * @throws Exception When the payload cannot be decoded or the handler result cannot be converted
     */
    public function Project(string $eventType, Struct $evt): Message
    {
        // `??` checks offsetExists() before offsetGet(), so an unregistered
        // event type does not read a missing key from the map.
        $handler = $this->projectors[$eventType] ?? null;
        if ($handler === null) {
            return new \Projection\Ignore();
        }

        // Handlers receive the payload as plain decoded JSON (stdClass), not as a Protobuf Struct.
        $payload = json_decode($evt->serializeToJsonString(), false, 512, JSON_THROW_ON_ERROR);

        return $handler($payload);
    }

    /**
     * Registers a handler whose result is inserted as a new document.
     *
     * @param string $eventType Event type string
     * @param callable $handler Projection handler returning a complete document to insert
     * @return void
     */
    protected function InsertOneOn(string $eventType, callable $handler): void
    {
        $this->projectors->set($eventType, function ($evt) use ($handler) {
            return $this->InsertOne($evt, $handler);
        });
    }

    /**
     * Registers a handler whose result describes an update of one document.
     *
     * @param string $eventType Event type string
     * @param callable $handler Projection handler returning an object with `filter` and `update` properties
     * @return void
     */
    protected function UpdateOneOn(string $eventType, callable $handler): void
    {
        $this->projectors->set($eventType, function ($evt) use ($handler) {
            return $this->UpdateOne($evt, $handler);
        });
    }

    /**
     * Registers a handler whose result describes a deletion of one document.
     *
     * @param string $eventType Event type string
     * @param callable $handler Projection handler returning an object with a `filter` property
     * @return void
     */
    protected function DeleteOneOn(string $eventType, callable $handler): void
    {
        $this->projectors->set($eventType, function ($evt) use ($handler) {
            return $this->DeleteOne($evt, $handler);
        });
    }

    /**
     * Runs the projection and wraps the returned document in an InsertOne message.
     *
     * @param object $evt Deserialized event
     * @param callable $projection Projection handler returning a complete document
     * @return InsertOne
     * @throws Exception
     */
    private function InsertOne(object $evt, callable $projection): InsertOne
    {
        $result = new InsertOne();
        $result->setDocument($this->toStruct($projection($evt)));

        return $result;
    }

    /**
     * Runs the projection and wraps its `filter` and `update` properties in an UpdateOne message.
     *
     * @param object $evt Deserialized event
     * @param callable $projection Projection handler returning an object with filter and update fields
     * @return UpdateOne
     * @throws Exception
     */
    private function UpdateOne(object $evt, callable $projection): UpdateOne
    {
        $r = $projection($evt);

        $result = new UpdateOne();
        $result->setFilter($this->toStruct($r->filter));
        $result->setUpdate($this->toStruct($r->update));

        return $result;
    }

    /**
     * Runs the projection and wraps its `filter` property in a DeleteOne message.
     *
     * @param object $evt Deserialized event
     * @param callable $projection Projection handler returning an object with a filter field
     * @return DeleteOne
     * @throws Exception
     */
    private function DeleteOne(object $evt, callable $projection): DeleteOne
    {
        $r = $projection($evt);

        $result = new DeleteOne();
        $result->setFilter($this->toStruct($r->filter));

        return $result;
    }

    /**
     * Converts a JSON-encodable PHP value into a Protobuf Struct by
     * round-tripping it through its JSON representation.
     *
     * @param mixed $value Value to convert
     * @return Struct
     * @throws Exception When the value cannot be JSON-encoded or merged into a Struct
     */
    private function toStruct($value): Struct
    {
        $struct = new Struct();
        // JSON_THROW_ON_ERROR: fail loudly here instead of handing `false`
        // to mergeFromJsonString() when encoding fails.
        $struct->mergeFromJsonString(json_encode($value, JSON_THROW_ON_ERROR));

        return $struct;
    }
}