-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.html
2001 lines (1598 loc) · 347 KB
/
index.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Hexo</title>
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<meta property="og:type" content="website">
<meta property="og:title" content="Hexo">
<meta property="og:url" content="http://example.com/index.html">
<meta property="og:site_name" content="Hexo">
<meta property="og:locale" content="en_US">
<meta property="article:author" content="zhengchenyu">
<meta name="twitter:card" content="summary">
<link rel="alternate" href="/atom.xml" title="Hexo" type="application/atom+xml">
<link rel="shortcut icon" href="/favicon.png">
<link rel="stylesheet" href="/css/style.css">
<link rel="stylesheet" href="/fancybox/jquery.fancybox.min.css">
<meta name="generator" content="Hexo 7.0.0"></head>
<body>
<div id="container">
<div id="wrap">
<header id="header">
<div id="banner"></div>
<div id="header-outer" class="outer">
<div id="header-title" class="inner">
<h1 id="logo-wrap">
<a href="/" id="logo">Hexo</a>
</h1>
</div>
<div id="header-inner" class="inner">
<nav id="main-nav">
<a id="main-nav-toggle" class="nav-icon"><span class="fa fa-bars"></span></a>
<a class="main-nav-link" href="/">Home</a>
<a class="main-nav-link" href="/archives">Archives</a>
</nav>
<nav id="sub-nav">
<a class="nav-icon" href="/atom.xml" title="RSS Feed"><span class="fa fa-rss"></span></a>
<a class="nav-icon nav-search-btn" title="Search"><span class="fa fa-search"></span></a>
</nav>
<div id="search-form-wrap">
<form action="//google.com/search" method="get" accept-charset="UTF-8" class="search-form"><input type="search" name="q" class="search-form-input" placeholder="Search"><button type="submit" class="search-form-submit"></button><input type="hidden" name="sitesearch" value="http://example.com"></form>
</div>
</div>
</div>
</header>
<div class="outer">
<section id="main">
<article id="post-uniffle-0.9.1发布" class="h-entry article article-type-post" itemprop="blogPost" itemscope itemtype="https://schema.org/BlogPosting">
<div class="article-meta">
<a href="/2024/08/02/uniffle-0.9.1%E5%8F%91%E5%B8%83/" class="article-date">
<time class="dt-published" datetime="2024-08-02T07:15:25.788Z" itemprop="datePublished">2024-08-02</time>
</a>
</div>
<div class="article-inner">
<div class="e-content article-entry" itemprop="articleBody">
<h1 id="uniffle-091-release工作"><a class="markdownIt-Anchor" href="#uniffle-091-release工作"></a> uniffle 0.9.1 release工作</h1>
<h2 id="branch-09分离点"><a class="markdownIt-Anchor" href="#branch-09分离点"></a> branch-0.9分离点</h2>
<ul>
<li>
<p>branch-0.9与master分离点对应issue: #1631</p>
</li>
<li>
<p>branch-0.9与master分离点对应commit: 1431c8a1b8b53ffd2b9ab45372416e9842eba944</p>
</li>
</ul>
<h2 id="branch-090已经合并的issue"><a class="markdownIt-Anchor" href="#branch-090已经合并的issue"></a> branch-0.9.0已经合并的issue</h2>
<table>
<thead>
<tr>
<th style="text-align:left">issue</th>
<th style="text-align:left">PR</th>
<th style="text-align:left">master中的commit</th>
<th style="text-align:left">其他说明</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:left">1626</td>
<td style="text-align:left">1627</td>
<td style="text-align:left">a7a0b4ce41a213f8f3e05a1e1198c41857706229</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1596</td>
<td style="text-align:left">1641</td>
<td style="text-align:left">a238a734bc0f7b5b5ecbfe04477230047c098123</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">MINOR</td>
<td style="text-align:left">1637</td>
<td style="text-align:left">b37fa06c7a8303c0b98872a6f23d5f2ab0388a50</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1634</td>
<td style="text-align:left">1635</td>
<td style="text-align:left">6ea43002dc005f5127cccf45fa953769628a2965</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1629</td>
<td style="text-align:left">1630</td>
<td style="text-align:left">b4c92b84d7bf6d4f68fa7580fa7a2caaea2e5631</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1662</td>
<td style="text-align:left">1663</td>
<td style="text-align:left">3bc0ee50d1e3f60ba5e42c4c92697f1a6e927496</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1662</td>
<td style="text-align:left">1663</td>
<td style="text-align:left">3bc0ee50d1e3f60ba5e42c4c92697f1a6e927496</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">378</td>
<td style="text-align:left">1669</td>
<td style="text-align:left">6f6d35aab8e9fe7002de3768e6d5a30cad2d4cb7</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1341</td>
<td style="text-align:left">1666</td>
<td style="text-align:left">636d0be219b24898fcc7bbd59d74ba2d558acaa9</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1459</td>
<td style="text-align:left">1634</td>
<td style="text-align:left">5ad9f488e2fb39bed82b479aa0f08275334810d5</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1459</td>
<td style="text-align:left">1648</td>
<td style="text-align:left">222f5d403419e59e2f345ff6be7f51de01bcb806</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">MINOR</td>
<td style="text-align:left">1674</td>
<td style="text-align:left">65ccce5431f61f403ddf2760849ed618e66509be</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1657</td>
<td style="text-align:left">1671</td>
<td style="text-align:left">6d193278fa6bc44c4a61014ff8a91e8e053107af</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1678</td>
<td style="text-align:left">1679</td>
<td style="text-align:left">ae72831615b05b56055dcfda8f129189792130ba</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1684</td>
<td style="text-align:left">1685</td>
<td style="text-align:left">40bd14b9368e748b4d8ac6585c645a02cafad100</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1675</td>
<td style="text-align:left">1676</td>
<td style="text-align:left">917456c5eb0d7b70f7faa5da489aaa29d54cde40</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1680</td>
<td style="text-align:left">1681</td>
<td style="text-align:left">bfcba2e19f8a63e28f70eb32e8558997c8264f80</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">MINOR</td>
<td style="text-align:left">1692</td>
<td style="text-align:left">8e26a34e02aaf62243464dd60103551cc7c9c044</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">MINOR</td>
<td style="text-align:left">1697</td>
<td style="text-align:left">d4a5fb753a17f9c34addff0b28790f1d27796ce6</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1675</td>
<td style="text-align:left">1696</td>
<td style="text-align:left">4f4f7e39e1b87aea952e06cc791e32446da4e3b7</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1673</td>
<td style="text-align:left">1694</td>
<td style="text-align:left">3cc052bc2dae3f90c9b29cc78d1cab303b58b6ed</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1721</td>
<td style="text-align:left">1722</td>
<td style="text-align:left">d9b1d9fba58f70347ec2a352f15502eeec53deb9</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">MINOR</td>
<td style="text-align:left">1734</td>
<td style="text-align:left">168cf741ae5dfb806edb040b11a6e496c0d73afa</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1675</td>
<td style="text-align:left">1730</td>
<td style="text-align:left">fddace29d6a7ed4cee69adcb9a656fb62983dc43</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1149</td>
<td style="text-align:left">1240</td>
<td style="text-align:left">59b899eb19e3dbca32b6a81251ec17fc21671a52</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1764</td>
<td style="text-align:left">1766</td>
<td style="text-align:left">a6a715fb2bf6660253299608b18fb1794dddb06e</td>
<td style="text-align:left"></td>
</tr>
<tr>
<td style="text-align:left">1751</td>
<td style="text-align:left">1857</td>
<td style="text-align:left">f7c6d2da237bd487d3cd0e21231108df90559cbe</td>
<td style="text-align:left"></td>
</tr>
</tbody>
</table>
<h2 id="branch-090未合并但是commit已经过时的需要被合并的"><a class="markdownIt-Anchor" href="#branch-090未合并但是commit已经过时的需要被合并的"></a> branch-0.9.0未合并,但是commit已经过时的需要被合并的</h2>
<table>
<thead>
<tr>
<th style="text-align:left">issue</th>
<th style="text-align:left">PR</th>
<th style="text-align:left">master中的commit</th>
<th style="text-align:left">其他说明</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:left">1698</td>
<td style="text-align:left">1726</td>
<td style="text-align:left">f738a886cd2337eabe128d58edea9e90403e753a</td>
<td style="text-align:left">合并到0.9.1: 优化编译测试</td>
</tr>
<tr>
<td style="text-align:left">MINOR</td>
<td style="text-align:left">1733</td>
<td style="text-align:left">a0e88da59f9c7c30d43120c7fa88ec1d0736a616</td>
<td style="text-align:left">暂不合并: 日志, 影响不大</td>
</tr>
<tr>
<td style="text-align:left">1698</td>
<td style="text-align:left">1739</td>
<td style="text-align:left">37d64a437568a1a7030939922fd7b5d883f2959a</td>
<td style="text-align:left">合并到0.9.1: 优化编译测试</td>
</tr>
<tr>
<td style="text-align:left">1711</td>
<td style="text-align:left">1737</td>
<td style="text-align:left">59b899eb19e3dbca32b6a81251ec17fc21671a52</td>
<td style="text-align:left">暂不合并: 日志, 影响不大</td>
</tr>
<tr>
<td style="text-align:left">1675</td>
<td style="text-align:left">1740</td>
<td style="text-align:left">417674d779cc2654f7f0adca2c0a53a0da7f5b4d</td>
<td style="text-align:left">合并到0.9.1冲突,待定: 优化了测试</td>
</tr>
<tr>
<td style="text-align:left">1743</td>
<td style="text-align:left">1744</td>
<td style="text-align:left">8d49063115ee49b18c366e8cf4c4b65de9782b45</td>
<td style="text-align:left">合并到0.9.1: 完善异常处理</td>
</tr>
<tr>
<td style="text-align:left">1755</td>
<td style="text-align:left">1756</td>
<td style="text-align:left">d182a039685e4c50d5b500b9ac7aee52dcec623c</td>
<td style="text-align:left">合并到0.9.1冲突,需要合并: fix bug spill size计算问题</td>
</tr>
<tr>
<td style="text-align:left">MINOR</td>
<td style="text-align:left">1752</td>
<td style="text-align:left">5e49017672820533bf5a59d1e8bc0fa5bae95978</td>
<td style="text-align:left">合并到0.9.1: 显示问题</td>
</tr>
<tr>
<td style="text-align:left">1699</td>
<td style="text-align:left">1742</td>
<td style="text-align:left">27d6fca88354a26a1b51284c82b4fde639c60e8b</td>
<td style="text-align:left">合并到0.9.1: shaded</td>
</tr>
<tr>
<td style="text-align:left">1755</td>
<td style="text-align:left">1760</td>
<td style="text-align:left">eb164a8fa79cf70dcdbb13774686663cf11bc8af</td>
<td style="text-align:left">合并到0.9.1冲突,待定: 日志显示</td>
</tr>
<tr>
<td style="text-align:left">1579</td>
<td style="text-align:left">1762</td>
<td style="text-align:left">375262ad9236efebe0010a65a556c2dec3e9ae82</td>
<td style="text-align:left">合并到0.9.1冲突,修改了proto, 不合并</td>
</tr>
<tr>
<td style="text-align:left">MINOR</td>
<td style="text-align:left">1777</td>
<td style="text-align:left">44956681887c4b32db6da3c0acc387018950d47b</td>
<td style="text-align:left">合并到0.9.1: 文档</td>
</tr>
<tr>
<td style="text-align:left">MINOR</td>
<td style="text-align:left">1815</td>
<td style="text-align:left">ceae615e3566cc988ce0603ff8bea477c4de05bf</td>
<td style="text-align:left">暂不合并: 日志, 影响不大</td>
</tr>
<tr>
<td style="text-align:left">1668</td>
<td style="text-align:left">1804</td>
<td style="text-align:left">1482804f3bd365adce9af251a0a71ca463928083</td>
<td style="text-align:left">暂不合并: 可以通过制定-ea解决</td>
</tr>
<tr>
<td style="text-align:left">1818</td>
<td style="text-align:left">1819</td>
<td style="text-align:left">097114811620a68254929133e8d22a0250681239</td>
<td style="text-align:left">合并到0.9.1: 避免多次merge shuffle read metrics</td>
</tr>
<tr>
<td style="text-align:left">1826</td>
<td style="text-align:left">1827</td>
<td style="text-align:left">a16184aa2994fced4264698b894ee7c7d5c289e9</td>
<td style="text-align:left">合并到0.9.1: 修复编译脚本</td>
</tr>
<tr>
<td style="text-align:left">1826</td>
<td style="text-align:left">1830</td>
<td style="text-align:left">30cefcc06b11ac93a4fd50625596938460669283</td>
<td style="text-align:left">合并到0.9.1: 修复编译脚本</td>
</tr>
<tr>
<td style="text-align:left">MINOR</td>
<td style="text-align:left">1850</td>
<td style="text-align:left">441fad03716b43e72d599379a0ab43e9be3e6062</td>
<td style="text-align:left">合并到0.9.1冲突,需要合并: 页面修复</td>
</tr>
<tr>
<td style="text-align:left">1809</td>
<td style="text-align:left">1846</td>
<td style="text-align:left">6599ef27e257d117ffb01855cf0d2e8351b90eec</td>
<td style="text-align:left">暂不合并: improvement</td>
</tr>
<tr>
<td style="text-align:left">MINOR</td>
<td style="text-align:left">1851</td>
<td style="text-align:left">88152e98c8b84d5094d93f4710157d21105ec6b8</td>
<td style="text-align:left">合并到0.9.1: 页面修复</td>
</tr>
</tbody>
</table>
<blockquote>
<p>v0.9.0版本release工作: <a target="_blank" rel="noopener" href="https://github.com/apache/incubator-uniffle/issues/1654">https://github.com/apache/incubator-uniffle/issues/1654</a><br />
v0.9.0版本跟踪到#1751, commit为f7c6d2da237bd487d3cd0e21231108df90559cbe</p>
</blockquote>
<h2 id="branch-091-要合并的patch"><a class="markdownIt-Anchor" href="#branch-091-要合并的patch"></a> branch-0.9.1 要合并的patch</h2>
</div>
<footer class="article-footer">
<a data-url="http://example.com/2024/08/02/uniffle-0.9.1%E5%8F%91%E5%B8%83/" data-id="clzc8m5h50005hws83ja81r9g" data-title="" class="article-share-link"><span class="fa fa-share">Share</span></a>
</footer>
</div>
</article>
<article id="post-ErasuceCode算法实现" class="h-entry article article-type-post" itemprop="blogPost" itemscope itemtype="https://schema.org/BlogPosting">
<div class="article-meta">
<a href="/2024/05/17/ErasuceCode%E7%AE%97%E6%B3%95%E5%AE%9E%E7%8E%B0/" class="article-date">
<time class="dt-published" datetime="2024-05-17T02:46:33.253Z" itemprop="datePublished">2024-05-17</time>
</a>
</div>
<div class="article-inner">
<div class="e-content article-entry" itemprop="articleBody">
<blockquote>
<p>本文侧重于如何从依据理论指导实践,在不适用任何第三方库的情况下实现ErasuceCode算法。本文仅会对用到的理论做简单的解释并标记引用,不会做详细解释。因此阅读本文前请熟读文献[1]第四章内容。<br />
本文代码存储于<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode">https://github.com/zhengchenyu/SimpleErasureCode</a>, SimpleErasureCode只是为了便于方便理解,为了保持与文章保持一致,因此没有优化。部分关键流程文章中会有链接到对应的代码。</p>
</blockquote>
<p>EC算法在存储领域和通信领域都有广泛的应用。在分布式存储领域,为了避免机器宕机,需要存储3份冗余副本,这3台机器最多允许2台宕机。如果使用EC RS-6-3的话,可以实现使用9个副本冗余6份数据,这9台机器最多允许3台宕机,而且存储量却减少了一半。分布式存储使用EC在几乎不降低可用性的前提下,降低了冗余副本数,大大节约了存储资源。</p>
<p>实现EC算法是检验是否理解EC算法的重要手段。对EC算法的理解对工程上也有很大的帮助,笔者实现EC算法的初衷也是为了解决HADOOP-19180提出的问题。</p>
<h1 id="1-算法概述"><a class="markdownIt-Anchor" href="#1-算法概述"></a> 1 算法概述</h1>
<p>本文以RS-6-3算法为例, 存在6个数据块和3个校验块。EC算法的两个基本问题:</p>
<ul>
<li>如何通过数据块生成校验块?</li>
<li>如何通过部分数据块和校验块恢复丢失的数据块?</li>
</ul>
<h2 id="11-生成校验块"><a class="markdownIt-Anchor" href="#11-生成校验块"></a> 1.1 生成校验块</h2>
<p>生成校验块即编码过程(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/ErasureCoder.java#L29">encode</a>)。对6个数据块依次取出byte, 即d<sub>0</sub>,d<sub>1</sub>,d<sub>2</sub>,d<sub>3</sub>,d<sub>4</sub>,d<sub>5</sub>。如下图所示,用编码矩阵(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/ErasureCoder.java#L14C24-L14C36">encodeMatrix</a>)乘以对应的数据块,即得到原有的数据d<sub>0</sub>,d<sub>1</sub>,d<sub>2</sub>,d<sub>3</sub>,d<sub>4</sub>,d<sub>5</sub>和对应的校验字节c<sub>0</sub>,c<sub>1</sub>,c<sub>2</sub>(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/ErasureCoder.java#L50">编码过程</a>)。对数据块的所有字节依次执行上述的操作就得到了校验块。</p>
<img src="/images/ec/ec编码矩阵.png" width=50% height=50% text-align=center/>
<h2 id="12-恢复数据块"><a class="markdownIt-Anchor" href="#12-恢复数据块"></a> 1.2 恢复数据块</h2>
<p>恢复数据块的过程即解码过程(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/ErasureCoder.java#L65">decode</a>)。假如d2,d3,d4丢失了,我们需要通过d<sub>0</sub>,d<sub>1</sub>,d<sub>5</sub>,c<sub>0</sub>,c<sub>1</sub>,c<sub>2</sub>恢复d<sub>2</sub>,d<sub>3</sub>,d<sub>4</sub>。在上面公式中删除对应的行,公式中用d<sub>2?</sub>,d<sub>3?</sub>,d<sub>4?</sub>表示数据块丢失。有如下公式:</p>
<img src="/images/ec/ec解码过程1.png" width=50% height=50% text-align=center/>
<p>对上面的公式两侧都乘以裁剪后的矩阵的逆矩阵,这个逆矩阵即解码矩阵(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/ErasureCoder.java#L117C18-L117C30">decodeMatrix</a>)。可以得到如下的公式:</p>
<img src="/images/ec/ec解码过程2.png" width=50% height=50% text-align=center/>
<p>得到解码矩阵后其实计算过程与编码类似,只是输入为为d<sub>0</sub>,d<sub>1</sub>,d<sub>5</sub>,c<sub>0</sub>,c<sub>1</sub>,c<sub>2</sub>,输出为d<sub>0</sub>,d<sub>1</sub>,d<sub>2</sub>,d<sub>3</sub>,d<sub>4</sub>,d<sub>5</sub>。</p>
<h1 id="2-关于矩阵"><a class="markdownIt-Anchor" href="#2-关于矩阵"></a> 2 关于矩阵</h1>
<p>上一章节已经介绍了EC编解码的基本过程,但工程实现上让然会有一些问题需要解决。这一小节主要介绍一下关于矩阵方面的问题。</p>
<h2 id="21-如何选择矩阵"><a class="markdownIt-Anchor" href="#21-如何选择矩阵"></a> 2.1 如何选择矩阵</h2>
<p>根据第一小节的分析,编码矩阵是一个9 * 6的矩阵。而解码矩阵是在编码矩阵的基础上删除任意三行,然后再求逆的。因此我们定义编码矩阵的时候要保证,对于这个9 * 6的编码矩阵,任意删除三行得到的6 * 6的矩阵是一个可逆矩阵。<br />
为了便于计算,编码矩阵的上半部分使用的是6 * 6 的单位矩阵,单位矩阵是可逆的,是满足条件的。接下来就是给下半部分的3 * 6的找到合理的矩阵。本文使用的是范德蒙矩阵。</p>
<img src="/images/ec/ec范德蒙矩阵.png" width=50% height=50% text-align=center/>
<blockquote>
<p>由于MathType不支持新版Mac, 而且markdown经常因为调整不好乱码,这里的共识还是贴图吧…</p>
</blockquote>
<p>如下为范德蒙矩阵的行列式:</p>
<img src="/images/ec/ec范德蒙矩阵行列式.png" width=50% height=50% text-align=center/>
<p>只要ai各不相等且不为0,则范德蒙矩阵一定可逆,意味着任意挑出3行向量,一定是线性无关的。那我们就挑出前三行,令a<sub>1</sub>=1,a<sub>2</sub>=2,a<sub>3</sub>=3,便可以得到如下编码矩阵:</p>
<img src="/images/ec/ec真实编码矩阵.png" width=50% height=50% text-align=center/>
<p>前面已经说明了前6行向量是互为线性无关,后3行向量也是互为线性无关的。如果在前6行中选取n行,在后三行中去6-n行,那么这6行向量还是线性无关的吗?假设n为5,由于后三行的没有元素为0,因此肯定是缺一个维度来保证线性相关。如果n为4和3也是同样的道理。因此这个9 * 6的矩阵随意挑出6行组成的6 * 6阶矩阵一定是可逆的。</p>
<h2 id="22-高斯消元求逆"><a class="markdownIt-Anchor" href="#22-高斯消元求逆"></a> 2.2 高斯消元求逆</h2>
<p>本文使用了容易理解的高斯消元法求逆(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/math/Matrix.java#L73">inverse</a>)。假设当前6 * 6矩阵为A, 在右侧再拼接6 * 6的单位矩阵E,得到矩阵[A | E]。如果使用A<sup>-1</sup>乘以这个矩阵,会得到[E|A<sup>-1</sup>]。这样我们只需将A矩阵转化为单元矩阵,E矩阵自然就变成了A<sup>-1</sup>。(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/math/Matrix.java#L73C24-L73C31">高斯消元求逆</a>)<br />
对每一行行依次执行如下的过程:</p>
<ul>
<li>(1) 对第i行, 找到第i行第i列的值。(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/math/Matrix.java#L120">Step1</a>)</li>
<li>(2) 然后计算该值的乘法逆元。然后让该行的每个元素均乘以这个乘法逆元。(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/math/Matrix.java#L126">Step2</a>)</li>
<li>(3) 对其他行j,使用第i行经过线性变换将地地j行第i列的值消为0。</li>
</ul>
<p>经过这三步变换为,对于第i列,有且只有地i行的数据为1,其他均为0。对每一行均执行如下操作,左侧变得到了单位矩阵,右侧的结果也即A<sup>-1</sup>。</p>
<blockquote>
<p>对于有理数域中,乘以乘法逆元即除法,加上加法逆元即减法。这样描述对伽罗华域中的数学运算的描述更准确。</p>
</blockquote>
<p>对于第0行计算的过程如下,其他行同理。</p>
<img src="/images/ec/ec高斯消元处理第0行.png" width=50% height=50% text-align=center/>
<p>还有一种特殊的情况,譬如上述矩阵如果处理第2行。第2行和第2列的元素值为0,0是没有乘法逆元的。所以这时候就需要找到第2行下面的行中第2列不为0的情况,把该行加到第2行上,这样可以保证算法可运行(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/math/Matrix.java#L105">setup for step1</a>)。其实移位可能更高效,但是为了便于理解,采用这样的方式。</p>
<h1 id="3-伽罗华域"><a class="markdownIt-Anchor" href="#3-伽罗华域"></a> 3 伽罗华域</h1>
<p>矩阵运算还存在一个问题,数据都是byte存储的,是一个0-255的数值。经过有理数运算后,数值是很容易越界的。因此我们需要一套新的计算体系同时满足以下两个条件:</p>
<ul>
<li>该算法体系的值和计算得到的值只能是在有限的集合范围内,即不越界。</li>
<li>可以通过有限的运算恢复得到原有值,即可解码。</li>
</ul>
<p>这就需要使用伽罗华有限域。伽罗华有限域可以保证任何计算得到的结果均在有限集合内,这满足了不越界的要求。另外伽罗华有限域的运算都有对应的逆运算。譬如对a,如果a乘以b,再乘以b的加法逆元,结果仍然是a。这满足了可解码的要求。</p>
<p>这里为了便于理解先简单的介绍GF(7),然后再解释实际使用的GF(2<sup>8</sup>)。</p>
<blockquote>
<p>本章并没有详细的证明。譬如将7扩张到无穷大的质数的时候,如何证明相应的定理为什么成立。事实上笔者也不知道,参考文献上也没有严谨的证明。也只是简单的说明了合数不能作为模数。但对于工程上有限的集合内,很容易通过穷举法来证明算法的可行性。</p>
</blockquote>
<h2 id="31-gf7"><a class="markdownIt-Anchor" href="#31-gf7"></a> 3.1 GF(7)</h2>
<p>首先给出如下定义:</p>
<ul>
<li>加法逆元: 给定x,如果存在x’,使得x+x’=x’+x=0,则称x’是x的加法逆元。</li>
<li>乘法逆元: 给定x,如果存在x’,使得x * x’=x’ * x=e,则称x’为x的乘法逆元。其中e为该群的单位元。对于GF(7),e为1。</li>
</ul>
<p>我们定义一种新的运算,即模7运算。对于加法和乘法运算,我们会将结果然后模7。譬如, 8 + 3 = 11 mod 7 = 4。8 * 3 = 24 mod 7 = 3。<br />
对于加法逆元和乘法逆元,根据定义我们可以穷举加法和乘法计算,根据结果得到对应的逆元。譬如, 4 + 3 = 7 mod 7 = 0,我们就说4和3互为加法逆元。2 * 4 = 8 mod 7 = 1,我们就说2和4互为乘法逆元。<br />
下表穷举了GF7的所有加法和乘法运算,根据码表也得到了GF(7)下所有元素对应的加法逆元和乘法逆元。</p>
<img src="/images/ec/ecgf7table.png" width=50% height=50% text-align=center/>
<p>来看下是否满足我们的要求,不越界在定义上就满足了。可解码的特点,我们随机指定数组假设为5,用5乘以3在乘以3的乘法逆元5, 得到的值为5 * 3 * 5 = 75 mod 1 = 5。事实上根据乘法交换律很容易证明这个。对于加法逆元的解码同理。</p>
<h2 id="32-gf2sup8sup"><a class="markdownIt-Anchor" href="#32-gf2sup8sup"></a> 3.2 GF(2<sup>8</sup>)</h2>
<h3 id="321-基本原理"><a class="markdownIt-Anchor" href="#321-基本原理"></a> 3.2.1 基本原理</h3>
<p>实际的数字存储的值是0-255, 那么是否意味着我们可以直接使用GF(256)呢? 答案是不可以的。因为256是合数,譬如16 * 16 = 256 mod 256 = 0,那么16就不存在乘法逆元,也就难以进行边界吗。<br />
因此需要引入多项式的运算,且要求同指数幂下遵循GF(2)。模数为不可约多项式。对于不可约多项式a, 无法找到两个不为1的多项式b和c使得b * c = a。可以使用穷举法得到每个多项式的加法逆元和乘法逆元。事实上只要集合内的每个元素都有1对1对应的乘法逆元,就可以满足我们的要求。文献1的表4.6也通过穷举证明了GF(2<sup>3</sup>)的有效性。<br />
多项式的计算与对byte的编码有什么关系呢?由于多项式的同指数幂是GF(2),也就意味着对于多项式a<sub>0</sub> + a<sub>1</sub>x + a<sub>2</sub>x<sup>2</sup> + a<sub>3</sub>x<sup>3</sup> + a<sub>4</sub>x<sup>4</sup> + a<sub>5</sub>x<sup>5</sup> + a<sub>6</sub>x<sup>6</sup> + a<sub>7</sub>x<sup>7</sup> ,a<sub>i</sub> 为0和1。如果a<sub>0</sub>为第0位,a<sub>1</sub>为第1位,依次类推,这组系数就是一个byte。这样就把要存储的byte与多项式运算结合了。</p>
<h3 id="322-计算"><a class="markdownIt-Anchor" href="#322-计算"></a> 3.2.2 计算</h3>
<p>本章主要介绍如何计算GF(2<sup>8</sup>)。对于加法和乘法,我们直接使用多项式运算。对于加法逆元和乘法逆元,我们穷举加法和乘法运算,然后通过码表来得到加法和乘法逆元。<br />
本文的算法的不可约多项式为x<sup>8</sup> + x<sup>4</sup>+ x<sup>3</sup> + 1。<br />
f(x) = x<sup>6</sup> + x<sup>4</sup>+ x<sup>2</sup> + x + 1, g(x) = x<sup>7</sup> + x + 1<br />
对于加法, 指数幂执行GF(2)运算,G(2)运算实际上就是异或。得到f(x)+g(x)= x<sup>7</sup> + x<sup>6</sup> +x<sup>4</sup>+ x<sup>2</sup> + 1。实际可以理解为f(x)对应的二进制0b01010111与g(x)对应的二进制0b10000011进行按位的异或计算。(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/operator/GaloisFieldComputation.java#L101">GF(2<sup>8</sup>)</a>)<br />
对于乘法, 先考虑h(x) = a<sub>0</sub> + a<sub>1</sub>x + a<sub>2</sub>x<sup>2</sup> + a<sub>3</sub>x<sup>3</sup> + a<sub>4</sub>x<sup>4</sup> + a<sub>5</sub>x<sup>5</sup> + a<sub>6</sub>x<sup>6</sup> + a<sub>7</sub>x<sup>7</sup>, 计算 h(x) * x = (a<sub>0</sub>x + a<sub>1</sub>x<sup>2</sup> + a<sub>2</sub>x<sup>3</sup> + a<sub>3</sub>x<sup>4</sup> + a<sub>4</sub>x<sup>5</sup> + a<sub>5</sub>x<sup>6</sup> + a<sub>6</sub>x<sup>7</sup> + a<sub>7</sub>x<sup>8</sup> ) mod (x<sup>8</sup> + x<sup>4</sup>+ x<sup>3</sup> + x + 1)。存在两种情况</p>
<ul>
<li>(1) a<sub>7</sub> 等于 0<br />
那么该式已经不可约,因此h(x) * x= a<sub>0</sub>x + a<sub>1</sub>x<sup>2</sup> + a<sub>2</sub>x<sup>3</sup> + a<sub>3</sub>x<sup>4</sup> + a<sub>4</sub>x<sup>5</sup> + a<sub>5</sub>x<sup>6</sup> + a<sub>6</sub>x<sup>7</sup>。也就意味着[a<sub>7</sub>a<sub>6</sub>a<sub>5</sub>a<sub>4</sub>a<sub>3</sub>a<sub>2</sub>a<sub>1</sub>a<sub>0</sub>] * [00000010] = [a<sub>6</sub>a<sub>5</sub>a<sub>4</sub>a<sub>3</sub>a<sub>2</sub>a<sub>1</sub>a<sub>0</sub>0],即向左移动一位。(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/operator/GaloisFieldComputation.java#L85">a<sub>7</sub> 等于 0</a>)</li>
<li>(1) a<sub>7</sub> 不等于0<br />
h(x) * x = (a<sub>0</sub>x + a<sub>1</sub>x<sup>2</sup> + a<sub>2</sub>x<sup>3</sup> + a<sub>3</sub>x<sup>4</sup> + a<sub>4</sub>x<sup>5</sup> + a<sub>5</sub>x<sup>6</sup> + a<sub>6</sub>x<sup>7</sup> + x<sup>8</sup>) mod (x<sup>8</sup> + x<sup>4</sup>+ x<sup>3</sup> +x+ 1) = a<sub>0</sub>x + a<sub>1</sub>x<sup>2</sup> + a<sub>2</sub>x<sup>3</sup> + a<sub>3</sub>x<sup>4</sup> + a<sub>4</sub>x<sup>5</sup> + a<sub>5</sub>x<sup>6</sup> + a<sub>6</sub>x<sup>7</sup> + x<sup>8</sup> - x<sup>8</sup> - x<sup>4</sup> - x<sup>3</sup> -1 = 1 + (a<sub>0</sub>+1)x + a<sub>1</sub>x<sup>2</sup> + (a<sub>2</sub>+1)x<sup>3</sup> + (a<sub>3</sub>+1)x<sup>4</sup> + a<sub>4</sub>x<sup>5</sup> + a<sub>5</sub>x<sup>6</sup> + a<sub>6</sub>x<sup>7</sup>。以为意味着[a<sub>7</sub>a<sub>6</sub>a<sub>5</sub>a<sub>4</sub>a<sub>3</sub>a<sub>2</sub>a<sub>1</sub>a<sub>0</sub>] * [00000010] = [a<sub>6</sub>a<sub>5</sub>a<sub>4</sub>a<sub>3</sub>a<sub>2</sub>a<sub>1</sub>a<sub>0</sub>0] ^ [00011011]。即左移一位后,在于不包含最高位系统的字节进行异或。(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/operator/GaloisFieldComputation.java#L88">a<sub>7</sub> 不等于 0</a>)</li>
</ul>
<blockquote>
<p>这里减法为加法逆元,对于GF(2),加法逆元就是自己。</p>
</blockquote>
<p>f(x) * g(x) = ((x<sup>6</sup> + x<sup>4</sup>+ x<sup>2</sup> + x + 1) * ( x<sup>7</sup> + x + 1))。我们可以把他分解为一下三个运算之和。</p>
<ul>
<li>(1) (x<sup>6</sup> + x<sup>4</sup>+ x<sup>2</sup> + x + 1) * x<sup>7</sup></li>
<li>(2) (x<sup>6</sup> + x<sup>4</sup>+ x<sup>2</sup> + x + 1) * x</li>
<li>(3) (x<sup>6</sup> + x<sup>4</sup>+ x<sup>2</sup> + x + 1) * 1<br />
对于(2)在前面已经介绍了,对于(3)实质上不用计算。对于(1), 我们可以使用(2)的计算方法进行递归计算(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/operator/GaloisFieldComputation.java#L73">fxxn</a>)。然后分别计算的三个值进行相加便得到了最终的结果(<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode/blob/24be76083a0c3172f1d2fe7af8e1ad972935657f/src/main/java/zcy/ec/coder/operator/GaloisFieldComputation.java#L63">_mul</a>)。</li>
</ul>
<p>至此,ErasureCode算法的所有问题都已经得到了解释。本文以自顶向下的方式描述了如何从理论到实践来实现ErasureCode算法。完整的代码见<a target="_blank" rel="noopener" href="https://github.com/zhengchenyu/SimpleErasureCode">SimpleErasureCode</a>。</p>
<p>参考文献:</p>
<ul>
<li>[1]William Stallings. 密码编码学与网络安全 原理与实践 第六版[M] 第四章</li>
<li>[2]<a target="_blank" rel="noopener" href="https://drmingdrmer.github.io/tech/distributed/2017/02/01/ec.html">https://drmingdrmer.github.io/tech/distributed/2017/02/01/ec.html</a></li>
</ul>
</div>
<footer class="article-footer">
<a data-url="http://example.com/2024/05/17/ErasuceCode%E7%AE%97%E6%B3%95%E5%AE%9E%E7%8E%B0/" data-id="clzc8m5h10001hws89o6070kz" data-title="" class="article-share-link"><span class="fa fa-share">Share</span></a>
</footer>
</div>
</article>
<article id="post-2024-04-22" class="h-entry article article-type-post" itemprop="blogPost" itemscope itemtype="https://schema.org/BlogPosting">
<div class="article-meta">
<a href="/2024/04/22/2024-04-22/" class="article-date">
<time class="dt-published" datetime="2024-04-22T07:02:30.058Z" itemprop="datePublished">2024-04-22</time>
</a>
</div>
<div class="article-inner">
<div class="e-content article-entry" itemprop="articleBody">
</div>
<footer class="article-footer">
<a data-url="http://example.com/2024/04/22/2024-04-22/" data-id="clzc8m5gw0000hws80p6e4kz4" data-title="" class="article-share-link"><span class="fa fa-share">Share</span></a>
</footer>
</div>
</article>
<article id="post-RSS-远程Merge的设计" class="h-entry article article-type-post" itemprop="blogPost" itemscope itemtype="https://schema.org/BlogPosting">
<div class="article-meta">
<a href="/2023/12/25/RSS-%E8%BF%9C%E7%A8%8BMerge%E7%9A%84%E8%AE%BE%E8%AE%A1/" class="article-date">
<time class="dt-published" datetime="2023-12-25T10:58:15.000Z" itemprop="datePublished">2023-12-25</time>
</a>
</div>
<div class="article-inner">
<header class="article-header">
<h1 itemprop="name">
<a class="p-name article-title" href="/2023/12/25/RSS-%E8%BF%9C%E7%A8%8BMerge%E7%9A%84%E8%AE%BE%E8%AE%A1/">RSS-远程Merge的设计</a>
</h1>
</header>
<div class="e-content article-entry" itemprop="articleBody">
<h1 id="1-默认的shuffle"><a class="markdownIt-Anchor" href="#1-默认的shuffle"></a> 1 默认的shuffle</h1>
<blockquote>
<p>注: 第一节简单介绍了主流框架默认的shuffle的原理,目的为了那里会使用本地磁盘,从而设计远程Merge。如果足够了解,可以忽略这部分内容。</p>
</blockquote>
<p>我们依次对MapReduce, Tez, Spark的shuffle进行分析。</p>
<h2 id="11-mapreduce"><a class="markdownIt-Anchor" href="#11-mapreduce"></a> 1.1 MapReduce</h2>
<p>Map将Record写入到内存,当内存超过阈值的时候,会将内存的数据spill到磁盘文件中,按照partitionid+key的顺序依次将Record写入到磁盘文件。当Map处理完所有Record后, 会将当前内存中的数据spill到磁盘文件。然后读取所有的spill到磁盘的文件,并按照partitionid+key的顺序进行merge,得到排序的Records。</p>
<blockquote>
<p>注: 按照partitionid排序的目的是Reduce端从Map端获取的数据的时候,尽可能顺序读。对于MR、Tez、Spark, 无论是否排序,只要有分区,都需要按照partitionid进行排序。</p>
</blockquote>
<p>Reduce端会从Map端以远程或本地的方式拉取对应分区的Records,称之为MapOutput。正常情况下会直接使用内存,如果内存超过阈值会将这些Records写入到磁盘。然后Reduce端会对所有MapOutput使用最小堆K路归并排序进行一些Merge操作,得到全局排序的Reccords。Merge的过程中,可能会因为内存超过阈值,会将临时的结果spill到磁盘。另外如果spill到磁盘的文件过多,也会触发额外的merge。</p>
<h2 id="12-tez"><a class="markdownIt-Anchor" href="#12-tez"></a> 1.2 Tez</h2>
<p>Tez的情况略微复杂。Tez分为ordered io和unordered io。</p>
<p>ordered io与MapReduce相同,不再展开分析。</p>
<p>unordered io一般用于hashjoin等不需要key进行排序的情况。非排序的io采用来之即用方案。Map直接将Record写入文件或者通过缓存在写入文件。Reduce端也是读数据的时候也是读之即用。</p>
<h2 id="13-spark"><a class="markdownIt-Anchor" href="#13-spark"></a> 1.3 Spark</h2>
<p>spark的shuffle更复杂,也更合理。部分任务是不需要sort和combine的,因此spark用户可以需求决定shuffle的逻辑。</p>
<h3 id="131-shuffle写操作"><a class="markdownIt-Anchor" href="#131-shuffle写操作"></a> 1.3.1 Shuffle写操作</h3>
<p>写shuffle数据的时候,支持三种writer:</p>
<ul>
<li>(1) BypassMergeSortShuffleWriter</li>
</ul>
<p>为每个partition都生成一个临时文件。在写Record的时候,找到对应的分区直接写入对应的临时文件。然后当所有数据都处理完成后,将这些临时的文件按照分区的顺序依次写入到一个最终的文件,并删除临时文件。</p>
<ul>
<li>(2) UnsafeShuffleWriter</li>
</ul>
<p>UnsafeShuffleWriter主要通过ShuffleExternalSorter来实现具体的逻辑。当写Record的时候,直接序列化操作,并将序列化的字节拷贝到申请的内存中。同时也会将Record的地址和分区记录到内存中(inMemSorter)。</p>
<p>当内存的Record达到阈值的时候,会进行spill操作。根据内存(inMemSorter)中的信息,我们很容易得到一个按照分区排序的Record,并写到文件中。</p>
<p>当处理完所有Record,我们会把当前内存中的Records spill到文件中。最后对所有spilled文件进行一次聚合。由于之前spilled的文件已经是按照分区排序的,所以我们可以按照分区的顺序依次将所有spill的文件的对应自己拷贝到最终文件。这样得到的最终文件即为分区排序的文件。</p>
<ul>
<li>(3) SortShuffleWriter</li>
</ul>
<p>SortShuffleWriter主要通过ExternalSorter来实现具体逻辑。ExternalSorter根据用户的需求决定是否combine和sort。</p>
<p>当写Record的时候,会直接插入到内存中。如果需要combine,内存架构是map,否则是buffer。</p>
<p>如果当前评估内存大于阈值会触发spill操作。spill操作的时候,会将Record,然后spill到磁盘。这个过程是需要进行排序的。而具体的比较器会根据用户需求的不同使用不同的值。如果设置了keyordering会按照key进行排序。如果没有设置keyordering,但是设置了aggregator(即combine),则按照key的hashcode进行排序,这样保证相同的key组织一起,便于combine操作。如果keyordering和aggregator都没有设置,则会按照partiton进行排序。</p>
<p>所有Record都写完时,需要读取spill的文件,并合并成一个全局有序的文件。</p>
<p>三种writer的比较</p>
<table>
<thead>
<tr>
<th>writer</th>
<th>优点</th>
<th>缺点</th>
<th>场景</th>
</tr>
</thead>
<tbody>
<tr>
<td>BypassMergeSortShuffleWriter</td>
<td>(1) 只经过一次序列化。<br>(2) 采用类hashmap的数据结构,插入数据快。</td>
<td>(1) 不支持combine和sort <br>(2) 每个分区都要对应生成一个临时文件,会产生过多的临时文件。</td>
<td>适合分区数较少(默认小于等于200)且没有combine的的情况.</td>
</tr>
<tr>
<td>UnsafeShuffleWriter</td>
<td>(1) 只经过一次序列化。<br>(2) spill到磁盘的文件数目有限,不再基于分区数,可以支持更大的分区。</td>
<td>(1) 不支持combine, sort <br>(2) 写入顺序Record顺序会打乱,要求supportsRelocationOfSerializedObjects。</td>
<td>适用于没有combine的情况,且支持supportsRelocationOfSerializedObjects,并且支持最大支持分区数为16777216。</td>
</tr>
<tr>
<td>SortShuffleWriter</td>
<td>(1) 支持combine, sort <br> (2) 适合于所有场景 <br> (3) spill到磁盘的文件数目有限</td>
<td>(1) 需要进行多次序列化</td>
<td>适用于所有场景。</td>
</tr>
</tbody>
</table>
<h3 id="132-shuffle读"><a class="markdownIt-Anchor" href="#132-shuffle读"></a> 1.3.2 shuffle读</h3>
<p>当前只有BlockStoreShuffleReader一个实现。实现与MapReduce类似。<br />
Reduce端会从Map端以远程或本地的方式拉取对应分区的Records。正常情况下会直接写到内存中,但如果要获取的block大小超过阈值则会使用磁盘。<br />
然后会根据用户的需求决定是否进行combine或sort,最终形成一个用户要求的record iterator。<br />
combine和sort分别使用了ExternalAppendOnlyMap和ExternalSorter,当内存超过阈值后,会将数据spill到本地磁盘中。</p>
<h2 id="14-总结"><a class="markdownIt-Anchor" href="#14-总结"></a> 1.4 总结</h2>
<p>(1) 关于各个框架语义</p>
<p>对于MapReduce和Tez的ordered io, 实质就是spark的排序的特例。对于Tez的unordered io,实质上就是spark的非排序的特例。实质各个框架上语义是相同的, spark更加泛化。</p>
<p>(2) 哪里会产生本地磁盘文件?</p>
<p>分析三种计算框架后,我们得知如下过程会使用磁盘:</p>
<ul>
<li>(1) Map由于内存超越阈值,可能会产生中间的临时文件。</li>
<li>(2) Map端最终必然会产生磁盘文件,用于提供shuffle服务。</li>
<li>(3) Reduce拉取records时候,可能因为超越阈值,产生磁盘文件。</li>
<li>(4) Reduce端Merge的时候,可能会产生临时的磁盘文件,用于全局排序。</li>
</ul>
<p>而事实上, uniffle已经解决(1), (2)。对于(3), 如果有效的调整参数,是很难产生磁盘文件的。事实上只有(4)是本文需要讨论的。</p>
<h1 id="2-方案的选择"><a class="markdownIt-Anchor" href="#2-方案的选择"></a> 2 方案的选择</h1>
<p>为了解决在Reduce端Merge可能会spill到磁盘的问题,主要有两个方案:</p>
<ul>
<li>(1) Shuffle Server端进行Merge</li>
<li>(2) Reduce端按需Merge</li>
</ul>
<h2 id="21-方案1-shuffleserver端进行merge"><a class="markdownIt-Anchor" href="#21-方案1-shuffleserver端进行merge"></a> 2.1 方案1: ShuffleServer端进行Merge</h2>
<p>将Reduce的Merge过程移到ShuffleServer端,ShuffleServer会对Map端发来的局部排序后的Records进行Merge,合并成一个全局排序的Records序列。Reduce端直接按照哦Records序列的顺序读取。</p>
<ul>
<li>优点: 不需要过多内存和网络RPC。</li>
<li>缺点: Shuffle Server端需要解析Key, Value和Comparator。Shuffle端不能combine。</li>
</ul>
<h2 id="22-方案2-reduce端按需merge"><a class="markdownIt-Anchor" href="#22-方案2-reduce端按需merge"></a> 2.2 方案2: Reduce端按需Merge</h2>
<img src="/images/rss/on_demand_merge.png" width=50% height=50% text-align=center/>
由于Reduce端内存有限,为了避免在Reduce端进行Merge的时候spill数据到磁盘。Reduce在获取Segment只能读取每个segment的部分buffer,然后对所有buffer进行Merge。然后对然后当某一个segment的部分buffer读取完成,会继续读取这个segment的下一块buffer,将这块buffer继续加到merge过程中。
这样有一个问题,Reduce端从ShuffleServer读取数据的次数大约为为segments_num * (segment_size / buffer_size),对于大任务这是一个很大的值。过多的RPC意味着性能的下降。
<blockquote>
<p>这里的segment是指排序后record集合,可以理解为record已经按照key排序后的block。</p>
</blockquote>
<ul>
<li>优点: Shuffle Server不需要做额外的任何事情。</li>
<li>缺点: 过多的RPC。</li>
</ul>
<p><strong>本文选择方案1,接下来的内容主要针对于方案1进行讨论。</strong></p>
<h1 id="3-需求分析"><a class="markdownIt-Anchor" href="#3-需求分析"></a> 3 需求分析</h1>
<h2 id="31-哪类任务需要远程merge"><a class="markdownIt-Anchor" href="#31-哪类任务需要远程merge"></a> 3.1 哪类任务需要远程merge?</h2>
<p>当前uniffle的map端操作已经不再需要磁盘操作。本文主要考虑reduce端的情况。主要分如下几种情况:</p>
<ul>
<li>(1) 对于spark的非排序且非聚集、tez unordered io,Record是来之即用的,不需要有任何的全局的聚合和排序操作,只需要非常少的内存。当前版本的uniffle在内存设置合理的情况下是不会使用磁盘的。使用当前uniffle的方案即可。本文不会讨论这方面的内容。</li>
<li>(2) 对于spark的排序或聚集任务、tez ordered io、mapreduce,由于需要全局排序或聚集,内存可能不够用,可能会将Record spill到磁盘。本文主要讨论这种情况。<br />
<strong>综上可知,remote merge仅用于需要排序或聚集的shuffle。</strong></li>
</ul>
<h2 id="32-shuffleserver如何进行排序"><a class="markdownIt-Anchor" href="#32-shuffleserver如何进行排序"></a> 3.2 ShuffleServer如何进行排序?</h2>
<p>对于排序类的操作,一般会在Map进行排序得到一组局部排序的记录,这里称之为segment。然后Reduce会获取所有的segment, 并进行归并,Spark, MR, Tez都是用了最小堆K路归并排序。远程排序依旧可以使用这种方式。</p>
<p>BufferManager和FlushManager维护着block在内存和磁盘中的信息。我们只需要在需要在ShuffleServer中新增MergeManager,并将同一个Shuffle下的block进行Merge,得到全局排序的Records即可。</p>
<p>在ShuffleServer端引入排序后产生一个副作用: 即需要将该Shuffle的KeyClass和ValueClass以及KeyComparator传递给ShuffleServer。</p>
<h2 id="33-shuffleserver禁止combine"><a class="markdownIt-Anchor" href="#33-shuffleserver禁止combine"></a> 3.3 ShuffleServer禁止combine</h2>
<p>Combine一般都是用户自定义的操作,因此禁止ShuffleServer端进行Combine操作。</p>
<h1 id="4-架构设计"><a class="markdownIt-Anchor" href="#4-架构设计"></a> 4 架构设计</h1>
<h2 id="41-remotemerge的基本流程"><a class="markdownIt-Anchor" href="#41-remotemerge的基本流程"></a> 4.1 RemoteMerge的基本流程</h2>
<img src="/images/rss/remote_merge_structure.png" width=50% height=50% text-align=center/>
<p>下面介绍一下Remote Merge的流程:</p>
<ul>
<li>(1) 注册<br />
AM/Driver调用registerShuffle方法,会额外注册keyClass, valueClass和keyComparator. 这些信息主要用于ShuffleServer在合并的时候对Record进行解析和排序。</li>
<li>(2) sendShuffleData<br />
sendShuffleData逻辑与现有的RSS任务基本保持一致。唯一区别是使用统一的序列化器和反序列化器,这样可以保证无论是哪一种框架,ShuffleServer都可以正常的解析Record.</li>
<li>(3) buffer and flush<br />
shuffle server端会将数据存在缓存中,或者通过flush manager缓存到本地文件系统或远程文件系统。这里还是复用原来的ShuffleServer的逻辑。</li>
<li>(4) reportUniqueBlocks<br />
提供了一个新的API, 即reportUniqueBlocks。Reduce端会根据map产生的block进行去重,然后将得到有效block集合通过reportUniqueBlocks发送给ShuffleServer。ShuffleServer收到有效的blocks集合后,会触发Remote Merge。Remote Merge的结果会像普通的block一样写入到bufferPool, 避免的时候会flush到磁盘中。RemoteMerge产生的结果即为普通的block,但是为了方便说明,这里称之为merged block。merged block记录的是按照key排序后的结果,因此读取merged block的时候,需要按照blockid的顺序依次递增读取。</li>
<li>(5) getSortedShuffleData<br />
Reduce会按照block序号的顺序读取merged block,然后根据一定的条件选择何时为reduce计算使用。</li>
</ul>
<h2 id="42-从record的视角分析流程"><a class="markdownIt-Anchor" href="#42-从record的视角分析流程"></a> 4.2 从Record的视角分析流程</h2>
<p>我们可以WordCount为例子解释Record在整个过程中的流转。本例子中有两个分区以及一个Reduce,即一个Reduce处理两个分区的数据。</p>
<img src="/images/rss/remote_merge_from_record_perspective.jpg" width=50% height=50% text-align=center/>
<ul>
<li>(1) MAP端<br />
Map端处理文档数据后,会进行排序。对于Map1, 由于存在两个分区,以奇数为key的record会写入到block101中,以偶数为key的record会写入到block102中。Map2同理。注意这里block中的Record都是已经排序后的。</li>
<li>(2) Map端发送数据<br />
Map端通过sendShuffleData将block发送给ShuffleServer, ShuffleServer会将其存储到bufferPool中。<br />
这里指的注意的是,在注册的时候会会注册APP1名字的app的同时,也会注册APP1@RemoteMerge的app,稍后会介绍它。</li>
<li>(3) ShuffleServer端Merge<br />
Reduce启动后,会调用reportUniqueBlocks汇报可用的block集合,同时触发ShuffleServer中对应的partition进行Merge。Merge的结果在这个分区下全局排序的Record集合。<br />
然后的问题是Merge的结果存在那里?Merge过程是在内存中发生的,每当Merge一定数量的Record后,会将这些结果写到一个新的block中。为了与原来的appid区分,这里会将这组block放在一个以"@RemoteMerge"结尾的appid进行管理。这组新的block的blockid是从1开始递增的,而且是经过全局排序的。即每个block内部的record是排序的,blockid=1的records一定小于等于blockid=2的records。</li>
<li>(4) Reduce端读<br />
根据前面的分析,Reduce端只要读取以”@RemoteMerge“结尾的appid管理的block即可。Reduce读取block的时候从blockid=1的block开始,按照blockid顺序读取即可。我们知道Reduce进行计算的时候,是按照顺序计算的。由于我们在ShuffleServer端获取的数据已经是排序后的,所以每次只需要从ShuffleServer端获取少量的数据即可,这样就实现了从ShuffleServer端按需读取,大大降低了使用内存。<br />
这里还存在两种特殊的情况,详细5.5</li>
</ul>
<h1 id="5-计划"><a class="markdownIt-Anchor" href="#5-计划"></a> 5 计划</h1>
<h2 id="51-统一序列化器"><a class="markdownIt-Anchor" href="#51-统一序列化器"></a> 5.1 统一序列化器</h2>
<p>由于需要在ShuffleServer端进行Merge, 需要提取出独立于计算框架的统一序列化器。这里提炼出两类序列化器: (1) Writable (2) Kryo。Writable序列化用于处理org.apache.hadoop.io.Writable接口的类,用于MR和TEZ框架。Kryo可以序列化绝大多数的类,一般用于Spark框架。</p>
<h2 id="52-recordsfilewriterrecordfilereader"><a class="markdownIt-Anchor" href="#52-recordsfilewriterrecordfilereader"></a> 5.2 RecordsFileWriter/RecordFileReader</h2>
<p>提供关于处理Record的抽象方法</p>
<h2 id="53-merger"><a class="markdownIt-Anchor" href="#53-merger"></a> 5.3 Merger</h2>
<p>提供基础的Merger服务,对多个数据流按照key进行merge。采用最小堆K路归并排序,对已经进行局部排序的数据流进行归并排序。</p>
<h2 id="54-mergemanager"><a class="markdownIt-Anchor" href="#54-mergemanager"></a> 5.4 MergeManager</h2>
<p>用于在服务端对Records进行Merge。</p>
<h2 id="55-rmrecordsreader"><a class="markdownIt-Anchor" href="#55-rmrecordsreader"></a> 5.5 RMRecordsReader</h2>
<p>一般来讲Reduce端在读取数据的情况,直接发给下游计算即可。但是存在两种特殊的情况:<br />
(1) 对于存在需要在Merge进行combine的情况,我们需要等待所有相同的key都达到后进行combine,然后再发给下游。<br />
(2) 对于spark和tez, reduce端可以会读取多个分区的数据。因此我们需要对多个分区的数据在reduce端再进行一次merge,然后在发给下游。<br />
RMRecordsReader是用于读取排序后数据的工具。大致的架构如下:</p>
<img src="/images/rss/rm_records_reader.png" width=50% height=50% text-align=center/>
<p>图中描述了单个Reduce处理两个分区的情况。RecordsFetcher线程会读取对应分区的block,然后解析成Records。然后发送到combineBuffers中。RecordCombiner从combineBuffer中读取Records,当某个key的所有records都收集完成,会进行combine操作。结果会发送给mergedBuffer。RecordMerge会获取所有mergedBuffer,然后在内存中再进行一次归并排序。最终得到全局排序的结果给下游使用。</p>
<h2 id="56-框架适配"><a class="markdownIt-Anchor" href="#56-框架适配"></a> 5.6 框架适配</h2>
<p>适配MR,Tez,Spark三种架构。</p>
<blockquote>
<p>笔者已使用线上任务对MR和Tez进行了大规模压测。Spark目前仅进行了一些基础的examples的测试,仍需要大量测试。</p>
</blockquote>
<h2 id="57-隔离的classloader"><a class="markdownIt-Anchor" href="#57-隔离的classloader"></a> 5.7 隔离的classloader</h2>
<p>对于不同版本的keyclass, valueclass以及comparatorclass, 使用隔离的classloader加载。</p>
<h1 id="5-特别注意"><a class="markdownIt-Anchor" href="#5-特别注意"></a> 5 特别注意</h1>
<ul>
<li>不支持spark的javardd,因为spark javardd的类型会被擦除。</li>
<li>适当提高服务器的max open file的配置。因为合并的时候可能会长时间持有文件。</li>
<li>适当降低rss.server.buffer.capacity, 因为remote merge的过程需要更多的额外内存。</li>
</ul>
</div>
<footer class="article-footer">
<a data-url="http://example.com/2023/12/25/RSS-%E8%BF%9C%E7%A8%8BMerge%E7%9A%84%E8%AE%BE%E8%AE%A1/" data-id="clzc8m5h40003hws87jo05a06" data-title="RSS-远程Merge的设计" class="article-share-link"><span class="fa fa-share">Share</span></a>
</footer>
</div>
</article>
<article id="post-RSS-远程Merge的设计EN" class="h-entry article article-type-post" itemprop="blogPost" itemscope itemtype="https://schema.org/BlogPosting">
<div class="article-meta">
<a href="/2023/12/25/RSS-%E8%BF%9C%E7%A8%8BMerge%E7%9A%84%E8%AE%BE%E8%AE%A1EN/" class="article-date">
<time class="dt-published" datetime="2023-12-25T10:58:15.000Z" itemprop="datePublished">2023-12-25</time>
</a>
</div>
<div class="article-inner">
<header class="article-header">
<h1 itemprop="name">
<a class="p-name article-title" href="/2023/12/25/RSS-%E8%BF%9C%E7%A8%8BMerge%E7%9A%84%E8%AE%BE%E8%AE%A1EN/">RSS-Remote Merge Design</a>
</h1>
</header>
<div class="e-content article-entry" itemprop="articleBody">
<h1 id="1-default-shuffle"><a class="markdownIt-Anchor" href="#1-default-shuffle"></a> 1 Default shuffle</h1>
<blockquote>
<p>Note: The first chapter briefly introduces the principle of default shuffle, with the purpose of find where local disks are used, then design remote merge. If you know enough, you can ignore this part.</p>
</blockquote>
<p>We will analyze the shuffle of MapReduce, Tez, and Spark in turn.</p>
<h2 id="11-mapreduce"><a class="markdownIt-Anchor" href="#11-mapreduce"></a> 1.1 MapReduce</h2>
<p>Map writes the record to the memory. When the memory exceeds the threshold, the memory data is spilled to the disk file, and the Record is written to the disk file in order of partitionid+key. After Map has processed all records, it will spill the data currently in memory to a disk file. Then read all the files spilled to the disk and merge them in the order of partitionid+key to get the sorted Records.</p>
<blockquote>
<p>Note: The purpose of sorting according to partitionid is that when the Reduce side obtains the data from the Map side, it should be read as sequentially as possible. For MR, Tez, and Spark, regardless of whether they are sorted or not, as long as there are partitioned, they need to be sorted according to partitionid.</p>
</blockquote>
<p>The reduce will pull the records of the corresponding partition remotely or locally from the Map, which is called MapOutput. Under normal circumstances, the memory will be used directly. If the memory exceeds the threshold, these records will be written to the disk. Then the reduce will perform merge operations on MapOutputs using minimum heap K-way merge sorting to obtain globally sorted records. During the Merge process, temporary results may be spilled to disk because the memory exceeds the threshold. In addition, if there are too many files spilled to disk, additional merges will be triggered.</p>
<h2 id="12-tez"><a class="markdownIt-Anchor" href="#12-tez"></a> 1.2 Tez</h2>
<p>There are two cases of tez: (1) ordered io (2) unordered io.</p>
<p>Ordered io is the same as MapReduce and so ignore it here.</p>
<p>Unordered io is generally used in hashjoin and other situations where keys are not required for sorting. Non-sorted io adopts a ready-to-use solution. Map writes the Record directly to the file or writes it to the file through cache. The Reduce side can also read and use it when reading data.</p>
<h2 id="13-spark"><a class="markdownIt-Anchor" href="#13-spark"></a> 1.3 Spark</h2>
<p>Spark’s shuffle is more complex and more reasonable. Some tasks do not require sort and combine, so spark users can determine the shuffle logic according to their needs.</p>
<h3 id="131-shuffle-write-operation"><a class="markdownIt-Anchor" href="#131-shuffle-write-operation"></a> 1.3.1 Shuffle write operation</h3>
<p>When writing shuffle data, three writers are supported:</p>
<ul>
<li>(1) BypassMergeSortShuffleWriter</li>
</ul>
<p>A temporary file is generated for each partition. When writing record, find the corresponding partition and write it directly to the corresponding temporary file. Then when all data is processed, these temporary files are written to a final file in order of the partitions, and the temporary files are deleted.</p>
<ul>
<li>(2) UnsafeShuffleWriter</li>
</ul>
<p>UnsafeShuffleWriter mainly implements specific logic through ShuffleExternalSorter. When writing a Record, the serialization operation is performed directly and the serialized bytes are copied to the requested memory. At the same time, the address and partition of the record will also be recorded into the memory (inMemSorter).</p>
<p>When the memory reaches the threshold, spill operation will be performed. Based on the information in memory (inMemSorter), we can easily get a Record sorted by partition and write it to a file.</p>
<p>When all Records are processed, we will spill the records currently in memory into the file. Finally, all spilled files are aggregated once. Since the previously spilled files have been sorted according to the partition, we can copy the corresponding copies of all the spilled files to the final file in the order of the partitions. The final file obtained in this way is the partition-sorted file.</p>
<ul>
<li>(3) SortShuffleWriter</li>
</ul>
<p>SortShuffleWriter mainly implements specific logic through ExternalSorter. ExternalSorter decides whether to combine and sort based on the user’s needs.</p>
<p>When writing record, it will be inserted directly into memory. If combine is required, the memory architecture is map, otherwise it is buffer.</p>
<p>If the current evaluation memory is greater than the threshold, the spill operation will be triggered. During the spill operation, the Record will be spilled to the disk. This process requires sorting. The specific comparator will use different values according to different user needs. If keyordering is set, it will be sorted by key. If keyordering is not set, but aggregator (i.e. combine) is set, the keys are sorted according to the hashcode of key, thus ensuring that the same keys are organized together to facilitate combine operations. If neither keyordering nor aggregator is set, it will be sorted according to partition.</p>
<p>When all Records are written, the spill files need to be read and merged into a globally ordered file.<br />
<br />
Comparison of three writers</p>
<table>
<thead>
<tr>
<th>writer</th>
<th>advantages</th>
<th>disadvantages</th>
<th>scene</th>
</tr>
</thead>
<tbody>
<tr>
<td>BypassMergeSortShuffleWriter</td>
<td>(1) Only serialized once. <br>(2) Using hashmap-like data structure, inserting data is fast.</td>
<td>(1) Combine and sort are not supported <br>(2) Each partition must generate a temporary file, which will generate too many temporary files.</td>
<td>Suitable for situations where the number of partitions is small (default is less than or equal to 200) and there is no combine.</td>
</tr>
<tr>
<td>UnsafeShuffleWriter</td>
<td>(1) Only serialized once. <br>(2) The number of files spilled to disk is limited and is no longer based on the number of partitions, and can support larger partitions.</td>
<td>(1) Combine, sort is not supported <br>(2) The writing order Record order will be disrupted, and supportsRelocationOfSerializedObjects is required.</td>
<td>Applicable to situations where combine does not exist, and supportsRelocationOfSerializedObjects is true, and the maximum number of supported partitions is 16777216.</td>
</tr>
<tr>
<td>SortShuffleWriter</td>
<td>(1) Supports combine, sort <br> (2) Suitable for all scenarios <br> (3) The number of files spilled to disk is limited</td>
<td>(1) Multiple serializations are required</td>
<td>Suitable for all scenarios.</td>
</tr>
</tbody>
</table>
<h3 id="132-shuffle-read"><a class="markdownIt-Anchor" href="#132-shuffle-read"></a> 1.3.2 shuffle read</h3>
<p>Currently there is only one implementation of BlockStoreShuffleReader. The implementation is similar to MapReduce.<br />
The reduce will pull the records of the corresponding partition remotely or locally from the map. Under normal circumstances, it will be written directly to the memory, but if the block size to be obtained exceeds the threshold, will use disk.<br />
Then it will be decided according to the user’s needs whether to combine or sort, and finally form a record iterator required by the user.<br />
Combine and sort use ExternalAppendOnlyMap and ExternalSorter respectively. When the memory exceeds the threshold, the data will be spilled to the local disk.</p>
<h2 id="14-summary"><a class="markdownIt-Anchor" href="#14-summary"></a> 1.4 Summary</h2>
<p>(1) About the semantics of each framework</p>
<p>For MapReduce and the ordered io of Tez, it is a special case of spark sorting. For Tez’s unordered io, it is essentially a special case of spark’s non-sorting. In essence, the semantics of each framework are the same, and spark is more general.</p>
<p>(2) Where will generate local disk files?</p>
<p>After analyzing the three computing frameworks, we learned that the following processes will use disks:</p>
<ul>
<li>(1) Map may generate intermediate temporary files because the memory exceeds the threshold.</li>
<li>(2) The map will eventually generate disk files to provide shuffle services.</li>
<li>(3) When reduce pulls records, disk files may be generated because the threshold is exceeded.</li>
<li>(4) When merging on the reduce side, temporary disk files may be generated for global sorting.</li>
</ul>
<p>In fact, uniffle has solved (1), (2). For (3), if the parameters are adjusted effectively, it is difficult to generate disk files. In fact, only (4) needs to be discussed in this article.</p>
<h1 id="2-plans"><a class="markdownIt-Anchor" href="#2-plans"></a> 2 Plans</h1>
<p>In order to solve the problem that Merge on the Reduce side may spill to disk, there are two main solutions:</p>
<ul>
<li>(1) Merge on Shuffle Server</li>
<li>(2) Reduce side Merge on demand</li>
</ul>
<h2 id="21-option-1-merge-on-shuffleserver"><a class="markdownIt-Anchor" href="#21-option-1-merge-on-shuffleserver"></a> 2.1 Option 1: Merge on ShuffleServer</h2>
<p>Move the merge process of reduce to the ShuffleServer side. ShuffleServer will merge the locally sorted Records sent from the map side into a globally sorted records sequence. The reduce side reads directly in the order of the records sequence.</p>
<ul>
<li>Advantages: Does not require too much memory and network RPC.</li>
<li>Disadvantages: Shuffle Server needs to parse Key, Value and Comparator. The Shuffle side cannot combine.</li>
</ul>
<h2 id="22-option-2-on-demand-merge-on-the-reduce-side"><a class="markdownIt-Anchor" href="#22-option-2-on-demand-merge-on-the-reduce-side"></a> 2.2 Option 2: On-demand Merge on the Reduce side</h2>
<img src="/images/rss/on_demand_merge.png" width=50% height=50% text-align=center/>
Since the memory on the reduce side is limited, in order to avoid spilling data to disk when merging on the reduce side. When reduce obtains segment, it can only read part of the buffer of each segment, and then merge all the buffers. Then when the partial buffer reading of a certain segment is completed, the next buffer of this segment will continue to be read, and this buffer will continue to be added to the merge process.
There is a problem with this. The number of times the Reduce side reads data from ShuffleServer is approximately segments_num * (segment_size / buffer_size), which is a large value for large tasks. Too many RPCs means decreased performance.
<blockquote>
<p>The segment here refers to the sorted record collection, which can be understood as the block in which the records have been sorted according to key.</p>
</blockquote>
<ul>
<li>Advantages: Shuffle Server does not need to do anything extra.</li>
<li>Disadvantages: Too many RPCs.</li>
</ul>
<p>**This article chooses option 1, and the following content mainly discusses option 1. **</p>
<h1 id="3-demand-analysis"><a class="markdownIt-Anchor" href="#3-demand-analysis"></a> 3 Demand analysis</h1>
<h2 id="31-what-types-of-tasks-require-remote-merge"><a class="markdownIt-Anchor" href="#31-what-types-of-tasks-require-remote-merge"></a> 3.1 What types of tasks require remote merge?</h2>
<p>Currently, uniffle’s map-side no longer spill disk. This article mainly considers the situation on the reduce. Mainly divided into the following situations:</p>
<ul>
<li>(1) For spark’s non-sorted, non-aggregated, tez unordered io. It does not require any global aggregation and sorting operations, and only requires very little memory. The current version of uniffle will not use disk if related settings are reasonable. Just use the current uniffle solution. This article will not discuss this aspect.</li>
<li>(2) For spark sorting or aggregation tasks, tez ordered io, mapreduce, due to the need for global sorting or aggregation, the memory may not be enough, and the record may be spilled to the disk. This article mainly discusses this situation.<br />
**In summary, it can be seen that remote merge is only used for shuffles that require sorting or aggregation. **</li>
</ul>
<h2 id="32-how-does-shuffleserver-sort"><a class="markdownIt-Anchor" href="#32-how-does-shuffleserver-sort"></a> 3.2 How does ShuffleServer sort?</h2>
<p>For sorting, map is generally sorted to obtain a set of partially sorted records, which is called segment here. Then reduce will obtain all segments and merge them. Spark, MR, and Tez all use minimum heap K-way merge sorting. This method can still be used for remote sorting.</p>
<p>BufferManager and FlushManager maintain block information in memory and disk. We only need to add MergeManager to ShuffleServer and merge the blocks under the same Shuffle to obtain globally sorted Records.</p>
<p>Introducing sorting on the ShuffleServer produces a side effect: the Shuffle’s KeyClass, ValueClass and KeyComparator need to be passed to ShuffleServer.</p>
<h2 id="33-how-does-shuffleserver-combine"><a class="markdownIt-Anchor" href="#33-how-does-shuffleserver-combine"></a> 3.3 How does ShuffleServer combine?</h2>
<p>Combine is generally a user-defined operation, so ShuffleServer is prohibited from performing combine operations. If combine is performed on the Reduce side, wouldn’t it violate our theme of avoiding spill to disk on the task side? In fact we don’t have to use ExternalAppendOnlyMap for combine. If the Records obtained from ShuffleServer are sorted by key, it means that the same keys have been organized together, and only a small amount of memory is needed to combine.</p>
<h2 id="34-how-does-writer-write"><a class="markdownIt-Anchor" href="#34-how-does-writer-write"></a> 3.4 How does Writer write?</h2>
<p>Just write it the way we have it.</p>
<h2 id="35-how-does-reader-read"><a class="markdownIt-Anchor" href="#35-how-does-reader-read"></a> 3.5 How does Reader read?</h2>
<p>Currently, Uniffle’s shuffle reader uses blockid as the read mark, which makes it easy to verify whether an accurate and complete records are obtained. For remote merge, MergeManager has merged the original Block collection into a new sequence sorted records by key. Therefore, the blockid generated by the map segment cannot be used:<br />
We will use a new way to read Records. When MergerManager performs global Merge, an index will be generated. Reader will read according to this index.</p>
<blockquote>
<p>Note: In principle, using key as a read index is more semantic, and the first version of the demo program was also implemented by this way. However, this proposal was not friendly enough to deal with the problem of data skew, so gave up the plan.</p>
</blockquote>
<h1 id="4-scheme-design"><a class="markdownIt-Anchor" href="#4-scheme-design"></a> 4 Scheme Design</h1>
<h2 id="41-basic-procedures-for-remotemerge"><a class="markdownIt-Anchor" href="#41-basic-procedures-for-remotemerge"></a> 4.1 Basic procedures for RemoteMerge</h2>
<img src="/images/rss/remote_merge_structure.png" width=50% height=50% text-align=center/>
<p>The following introduces the process of Remote Merge:</p>
<ul>
<li>(1) Register<br />
When AM/Driver calls the registerShuffle method, it will additionally register keyClass, valueClass and keyComparator. This information is mainly used by ShuffleServer to parse and sort the Record during merging.</li>
<li>(2) sendShuffleData<br />
The sendShuffleData logic is basically consistent with existing RSS tasks. The only difference is the use of unified serializers and deserializers, which ensures that ShuffleServer can parse the Record normally no matter which framework it is.</li>
<li>(3) buffer and flush<br />
The shuffle server will store the data in the cache, or cache it to the local file system or remote file system through the flush manager. The logic of the original ShuffleServer is still reused here.</li>
<li>(4) reportUniqueBlocks<br />
A new API is provided, reportUniqueBlocks. The Reduce end will deduplicate the blocks generated by the map, and then send the valid block set to ShuffleServer through reportUniqueBlocks. After ShuffleServer receives a valid blocks collection, it will trigger Remote Merge. The results of Remote Merge will be written to the bufferPool like a normal block, and may be flushed to disk when necessary. The result generated by RemoteMerge is an ordinary block, but for convenience of explanation, it is called merged block here. The merged block records the results sorted by key, so when reading the merged block, you need to read it in ascending order in the order of blockid.</li>
<li>(5) getSortedShuffleData<br />
Reduce will read merged blocks in the order of block serial numbers, and then choose when to use them for reduce calculations based on certain conditions.</li>
</ul>
<h2 id="42-analyze-the-process-from-the-perspective-of-record"><a class="markdownIt-Anchor" href="#42-analyze-the-process-from-the-perspective-of-record"></a> 4.2 Analyze the process from the perspective of Record</h2>
<p>We can use WordCount as an example to explain the flow of record in the entire process. In this example, there are two partitions and one reduce, that is, one reduce processes the data of two partitions.</p>
<img src="/images/rss/remote_merge_from_record_perspective.jpg" width=50% height=50% text-align=center/>
<ul>
<li>(1) MAP side<br />
After the document data is processed on the map side, it will be sorted. For Map1, since there are two partitions, records with odd numbers as keys will be written to block101, and records with even numbers as keys will be written to block102. The same goes for Map2. Note that the Records in the block here are all sorted.</li>
<li>(2) Map side sends data<br />
The map side sends the block to ShuffleServer through sendShuffleData, and ShuffleServer stores it in the bufferPool.<br />
What I mean here is that when registering, the app named APP1 will also be registered, and the app named APP1@RemoteMerge will also be registered, which will be introduced later.</li>
<li>(3) ShuffleServer side Merge<br />
After reduce is started, reportUniqueBlocks will be called to report the available block set, and the corresponding partition in ShuffleServer will be triggered to merge. The result of Merge is a globally sorted record collection under this partition.<br />
Then the question is where are the results of merge stored? The merge process occurs in memory. Whenever a certain number of records are merged, the results are written to a new block. In order to distinguish it from the original appid, this group of blocks will be managed in an appid ending with “@RemoteMerge”. The blockid of this new group of blocks increases from 1 and is sorted globally. That is, the records inside each block are sorted, and the records with blockid=1 must be less than or equal to the records with blockid=2.</li>
<li>(4) Reduce end reading<br />
According to the previous analysis, the reduce side only needs to read the block managed by the appid ending with “@RemoteMerge”. When reduce reads a block, it starts from the block with blockid=1 and reads in the order of blockid. We know that when reduce performs calculations, it is calculated in order. Since the data we obtain on the ShuffleServer side is already sorted, we only need to obtain a small amount of data from the ShuffleServer side each time. This enables on-demand reading from the ShuffleServer side, which greatly reduces memory usage.<br />
There are two special situations here, detailed in 5.5.</li>
</ul>
<h1 id="5-plan"><a class="markdownIt-Anchor" href="#5-plan"></a> 5 Plan</h1>
<h2 id="51-unified-serializer"><a class="markdownIt-Anchor" href="#51-unified-serializer"></a> 5.1 Unified serializer</h2>
<p>Since Merge needs to be performed on the ShuffleServer side, a unified serializer that is independent of the computing framework needs to be extracted. Two types of serializers are extracted here: (1) Writable (2) Kryo. Writable serialization is used for classes that handle the org.apache.hadoop.io.Writable interface, used in the MR and TEZ frameworks. Kryo can serialize most classes and is generally used in the Spark framework.</p>
<h2 id="52-recordsfilewriterrecordfilereader"><a class="markdownIt-Anchor" href="#52-recordsfilewriterrecordfilereader"></a> 5.2 RecordsFileWriter/RecordFileReader</h2>
<p>Provides abstract methods for processing Records</p>
<h2 id="53-merger"><a class="markdownIt-Anchor" href="#53-merger"></a> 5.3 Merger</h2>
<p>Provides basic Merger service to merge multiple data streams according to key. Minimum heap K-way merge sorting is used to merge and sort the data streams that have been partially sorted.</p>
<h2 id="54-mergemanager"><a class="markdownIt-Anchor" href="#54-mergemanager"></a> 5.4 MergeManager</h2>
<p>Used to merge Records on the server side.</p>
<h2 id="55-tools-for-reading-sorted-data"><a class="markdownIt-Anchor" href="#55-tools-for-reading-sorted-data"></a> 5.5 Tools for reading sorted data</h2>
<p>Generally speaking, when the Reduce side reads data, it can be sent directly to downstream calculations. But there are two special situations:<br />