-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsorting.f90
275 lines (218 loc) · 6.66 KB
/
sorting.f90
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
module m_sorting
implicit none
contains
recursive subroutine quicksort(x, lo, hi)
implicit none
real, dimension(:), intent(inout) :: x
integer :: lo, hi, i, N
N = size(X)
i = split(X, lo, hi)
if (i-1 > lo) then
call quicksort(x, lo, i-1);
end if
if (i+1 < hi) then
call quicksort(x, i+1, hi);
end if
end subroutine quicksort
integer function split(x, lo, hi)
implicit none
real, dimension(:), intent(inout) :: x
integer, intent(in) :: lo, hi
integer :: i, j, piv
real :: val, rnd
! pick piv , with lo ≤ piv ≤ hi;
call random_number(rnd)
piv = lo+anint(rnd*(hi-lo))
val = X(piv)
call swap(x(piv), x(hi))
i = lo
do j = lo, hi-1
if (x(j) < val) then
call swap(x(i), x(j))
i = i+1
end if
end do
call swap(x(i) , x(hi) )
split = i
end function split
subroutine swap(x, y)
implicit none
real, intent(inout) :: x, y
real :: tmp
tmp=x
x=y
y=tmp
end subroutine swap
! given an array X and integers a <= b <= c s.t.
! X(a:b-1) and X(b:c) are sorted already, sorts
! X(a:c) using merge-sort.
! tmp should be a real array allocated with at least c-a+1 elements,
! it's contents are arbitrary on input and output.
subroutine merge(x, a, b, c, tmp)
real, dimension(:), intent(inout) :: X
integer, intent(in) :: a, b, c
integer :: i, j, k
real, dimension(:), intent(out) :: tmp
if (a >= b .or. b > c) then ! one of the parts is empty
return
end if
i = a !index for range [a,b-1]
j = b !index for range [b,c]
k = 1 !index for tmp
! Compare values as long as no part is empty
do while (i < b .and. j <= c)
if (x(i) < x(j)) then
tmp(k) = x(i)
i = i+1
else
tmp(k) = x(j)
j = j+1
end if
k = k+1
end do
! Copy the remaining values
if (i >= b) then
tmp(k:k+(c-j)) = x(j:c)
end if
if (j > c) then
tmp(k:k+(b-i)) = x(i:b)
end if
! Copy the values back into x
X(a:c) = tmp(1:c-a+1)
end subroutine merge
! This subroutine sorts the array x for
! the index range start(1) <= i < start(p), p:=size(start).
!
! On input, subranges start(k) <= i < start(k+1) are assumed
! to have been sorted already, for k = 1,2,...,p-1.
!
subroutine mergeparts(x, start)
implicit none
real, dimension(:), intent(inout) :: x
integer, dimension(:), intent(in) :: start
integer :: p, nparts, i, k
integer, allocatable, dimension(:) :: cstart
real, allocatable, dimension(:) :: tmp
p = size(start)
allocate(tmp(start(p)-start(1)+1))
! Initialize current data
nparts= p-1 ! current number of parts
allocate(cstart(p)) ! current start
cstart(:) = start(:)
do while (nparts > 1)
!write(*,*) 'nparts=',nparts
!write(*,*) 'cstart=',cstart(1:nparts+1)
! Merge pairs of parts
do i=1,nparts/2
!write(*,*) 'merge ', cstart(2*i-1), cstart(2*i),cstart(2*i+1)
call merge(x, cstart(2*i-1), cstart(2*i),cstart(2*i+1)-1, tmp)
end do
do i=0,nparts/2-1
cstart(i+1)=cstart(2*i+1)
end do
if (modulo(nparts,2)==1) then
cstart((nparts+1)/2)=cstart(nparts)
end if
nparts = (nparts+1)/2
cstart(nparts+1) = start(p)
end do
end subroutine mergeparts
!!
subroutine sort_coarray(X, nloc, nloc_out)
implicit none
integer, intent(in) :: nloc
integer, intent(inout) :: nloc_out
real(kind=4), dimension(nloc_out), codimension[*], intent(inout) :: x
integer :: me, nproc, i, t, count, offset
real, dimension(:), allocatable, save :: sample[:]
real, dimension(:), allocatable :: all_samples, splitval, x_buf
integer, dimension(:), allocatable :: start
! counts(s)[t] is the number of elements process t receives from process s
! offset(s)[t] indicates that process t will obtain those values starting from
! position offset(s)[t] on process s.
integer, allocatable, save :: counts(:)[:], offsets(:)[:]
me = this_image()
nproc = num_images()
! We assume that the local block size is a multiple of nproc^2 for this algorithm,
if (modulo(nloc, nproc*nproc) /= 0) then
stop 'sort_coarray only works if the local block size is a multiple of nproc^2'
end if
! note: Fortran auto-deallocates arrays at the end of a subroutine,
! so whenever we call this we have to allocate the arrays:
allocate(start(nproc+1))
allocate(splitval(nproc+1))
allocate(all_samples(nproc*nproc))
! co-arrays are different because allocation/deallocation
! overhead is substantial: Therefore, the standard requires
! them to be declared 'save', which means that they persist
! between calls.
if (.not. allocated(sample)) then
allocate(sample(nproc)[*])
allocate(counts(nproc)[*])
allocate(offsets(nproc)[*])
end if
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!! SUPERSTEP 0: sort local block and create samples !!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
call quicksort(x, 1, nloc)
do i=1,nproc
sample(i) = x((i-1)*(nloc/nproc)+1)
end do
sync all
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!! SUPERSTEP 1-2: allgather the samples and sort them !!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
do t=1,nproc
all_samples((t-1)*nproc+1:t*nproc) = sample(1:nproc)[t]
start(t) = (t-1)*nproc+1
end do
start(nproc+1) = nproc*nproc+1
! note that we don't need to sync because we used a 'get'
!! Sort the samples: We can exploit that the intervals start[t]:start[t+1]-1
! are already sorted by using mergeparts instead of quicksort.
call mergeparts(all_samples, start)
!! Create splitters: elements s.t. splitval(t) <= X(i) < splitval(t+1) are sent to process t
do t= 1,nproc
splitval(t) = all_samples((t-1)*nproc+1)
end do
! 'huge' returns the largest possible value for the data type of x
splitval(nproc+1) = huge(x)
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!! SUPERSTEP 3: Split the local block and send the resulting parts !!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
i=1
do t=1,nproc
offset= i ! index of first value to be sent
count = 0 ! number of elements to be sent to process t
do while (i<=nloc .and. x(i)<splitval(t+1))
count=count+1
i=i+1
end do
! this way the other process knows how much memory to allocate
! and get from me:
counts(me)[t] = count
! and the offset where it should get from:
offsets(me)[t] = offset
end do
sync all
! get the values from all other processes and store them in the vararray
count = sum(counts(:))
allocate(x_buf(count))
i=1
do t=1,nproc
x_buf(i:i+counts(t)-1) = X(offsets(t):offsets(t)+counts(t)-1)[t]
i=i+counts(t)
end do
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!! SUPERSTEP 4: Sort the received parts !!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
sync all
start(1) = 1
do t=1,nproc
start(t+1) = start(t) + counts(t)
end do
nloc_out = count
x(1:count) = x_buf
call mergeparts(x, start)
end subroutine sort_coarray
end module m_sorting