diff --git a/.gitignore b/.gitignore index 143de5d..b6c97dc 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ dist *.egg-info .vscode venv +.DS_Store diff --git a/PTMCMCSampler/PTMCMCSampler.py b/PTMCMCSampler/PTMCMCSampler.py index ef8a705..4aa1fee 100755 --- a/PTMCMCSampler/PTMCMCSampler.py +++ b/PTMCMCSampler/PTMCMCSampler.py @@ -22,6 +22,19 @@ pass +def shift_array(arr, num, fill_value=0.0): + result = np.empty_like(arr) + if num > 0: + result[:num] = fill_value + result[num:] = arr[:-num] + elif num < 0: + result[num:] = fill_value + result[:num] = arr[-num:] + else: + result[:] = arr + return result + + class PTSampler(object): """ @@ -74,6 +87,7 @@ def __init__( outDir="./chains", verbose=True, resume=False, + seed=None, ): # MPI initialization @@ -81,6 +95,14 @@ def __init__( self.MPIrank = self.comm.Get_rank() self.nchain = self.comm.Get_size() + if self.MPIrank == 0: + ss = np.random.SeedSequence(seed) + child_seeds = ss.generate_state(self.nchain) + self.stream = [np.random.default_rng(s) for s in child_seeds] + else: + self.stream = None + self.stream = comm.scatter(self.stream, root=0) + self.ndim = ndim self.logl = _function_wrapper(logl, loglargs, loglkwargs) self.logp = _function_wrapper(logp, logpargs, logpkwargs) @@ -146,7 +168,7 @@ def initialize( NUTSweight=20, HMCweight=20, MALAweight=0, - burn=10000, + burn=50000, HMCstepsize=0.1, HMCsteps=300, maxIter=None, @@ -165,7 +187,7 @@ def initialize( """ # get maximum number of iteration if maxIter is None and self.MPIrank > 0: - maxIter = 2 * Niter + maxIter = Niter elif maxIter is None and self.MPIrank == 0: maxIter = Niter @@ -192,9 +214,8 @@ def initialize( self.nswap_accepted = 0 # set up covariance matrix and DE buffers - # TODO: better way of allocating this to save memory if self.MPIrank == 0: - self._AMbuffer = np.zeros((self.Niter, self.ndim)) + self._AMbuffer = np.zeros((self.covUpdate, self.ndim)) self._DEbuffer = np.zeros((self.burn, self.ndim)) # ##### setup default jump proposal distributions ##### # @@ -288,7 +309,7 @@ def updateChains(self, p0, lnlike0, lnprob0, iter): """ # update buffer if self.MPIrank == 0: - self._AMbuffer[iter, :] = p0 + self._AMbuffer[iter % self.covUpdate, :] = p0 # put results into arrays if iter % self.thin == 0: @@ -368,7 +389,7 @@ def sample( # get maximum number of iteration if maxIter is None and self.MPIrank > 0: - maxIter = 2 * Niter + maxIter = Niter elif maxIter is None and self.MPIrank == 0: maxIter = Niter @@ -431,7 +452,7 @@ def sample( Neff = 0 while runComplete is False: iter += 1 - + self.comm.barrier() # make sure all processes are at the same iteration # call PTMCMCOneStep p0, lnlike0, lnprob0 = self.PTMCMCOneStep(p0, lnlike0, lnprob0, iter) @@ -440,9 +461,7 @@ def sample( try: Neff = iter / max( 1, - np.nanmax( - [acor.acor(self._AMbuffer[self.burn : (iter - 1), ii])[0] for ii in range(self.ndim)] - ), + np.nanmax([acor.acor(self._chain[self.burn : (iter - 1), ii])[0] for ii in range(self.ndim)]), ) # print('\n {0} effective samples'.format(Neff)) except NameError: @@ -461,14 +480,7 @@ def sample( print("\nRun Complete with {0} effective samples".format(int(Neff))) runComplete = True - if self.MPIrank == 0 and runComplete: - for jj in range(1, self.nchain): - self.comm.send(runComplete, dest=jj, tag=55) - - # check for other chains - if self.MPIrank > 0: - runComplete = self.comm.Iprobe(source=0, tag=55) - time.sleep(0.000001) # trick to get around + runComplete = self.comm.bcast(runComplete, root=0) def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter): """ @@ -491,10 +503,8 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter): # broadcast to other chains [self.comm.send(self.cov, dest=rank + 1, tag=111) for rank in range(self.nchain - 1)] - # check for sent covariance matrix from T = 0 chain - getCovariance = self.comm.Iprobe(source=0, tag=111) - time.sleep(0.000001) - if getCovariance and self.MPIrank > 0: + # update covariance matrix + if (iter - 1) % self.covUpdate == 0 and (iter - 1) != 0 and self.MPIrank > 0: self.cov[:, :] = self.comm.recv(source=0, tag=111) for ct, group in enumerate(self.groups): covgroup = np.zeros((len(group), len(group))) @@ -503,7 +513,6 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter): covgroup[ii, jj] = self.cov[group[ii], group[jj]] self.U[ct], self.S[ct], v = np.linalg.svd(covgroup) - getCovariance = 0 # update DE buffer if (iter - 1) % self.burn == 0 and (iter - 1) != 0 and self.MPIrank == 0: @@ -512,11 +521,8 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter): # broadcast to other chains [self.comm.send(self._DEbuffer, dest=rank + 1, tag=222) for rank in range(self.nchain - 1)] - # check for sent DE buffer from T = 0 chain - getDEbuf = self.comm.Iprobe(source=0, tag=222) - time.sleep(0.000001) - - if getDEbuf and self.MPIrank > 0: + # update DE buffer + if (iter - 1) % self.burn == 0 and (iter - 1) != 0 and self.MPIrank > 0: self._DEbuffer = self.comm.recv(source=0, tag=222) # randomize cycle @@ -524,9 +530,6 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter): self.addProposalToCycle(self.DEJump, self.DEweight) self.randomizeProposalCycle() - # reset - getDEbuf = 0 - # after burn in, add DE jumps if (iter - 1) == self.burn and self.MPIrank == 0: if self.verbose: @@ -562,7 +565,7 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter): # hastings step diff = newlnprob - lnprob0 + qxy - if diff > np.log(np.random.rand()): + if diff > np.log(self.stream.random()): # accept jump p0, lnlike0, lnprob0 = y, newlnlike, newlnprob @@ -570,15 +573,9 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter): # update acceptance counter self.naccepted += 1 self.jumpDict[jump_name][1] += 1 - # temperature swap - swapReturn, p0, lnlike0, lnprob0 = self.PTswap(p0, lnlike0, lnprob0, iter) - - # check return value - if swapReturn != 0: - self.swapProposed += 1 - if swapReturn == 2: - self.nswap_accepted += 1 + if iter % self.Tskip == 0 and self.nchain > 1: + p0, lnlike0, lnprob0 = self.PTswap(p0, lnlike0, lnprob0, iter) self.updateChains(p0, lnlike0, lnprob0, iter) @@ -601,92 +598,46 @@ def PTswap(self, p0, lnlike0, lnprob0, iter): @return lnlike0: new log-likelihood @return lnprob0: new log posterior value + Repurposed from Neil Cornish/Bence Becsy's code: """ + Ts = self.ladder - # initialize variables - readyToSwap = 0 - swapAccepted = 0 - swapProposed = 0 - - # if Tskip is reached, block until next chain in ladder is ready for - # swap proposal - if iter % self.Tskip == 0 and self.MPIrank < self.nchain - 1: - swapProposed = 1 - - # send current likelihood for swap proposal - self.comm.send(lnlike0, dest=self.MPIrank + 1, tag=18) - - # determine if swap was accepted - swapAccepted = self.comm.recv(source=self.MPIrank + 1, tag=888) + log_Ls = self.comm.gather(lnlike0, root=0) # list of likelihoods from each chain + p0s = self.comm.gather(p0, root=0) # list of parameter arrays from each chain - # perform swap - if swapAccepted: - - # exchange likelihood - lnlike0 = self.comm.recv(source=self.MPIrank + 1, tag=18) - - # exchange parameters - pnew = np.empty(self.ndim) - self.comm.Sendrecv( - p0, dest=self.MPIrank + 1, sendtag=19, recvbuf=pnew, source=self.MPIrank + 1, recvtag=19 - ) - p0 = pnew - - # calculate new posterior values - lnprob0 = 1 / self.temp * lnlike0 + self.logp(p0) - - # check if next lowest temperature is ready to swap - elif self.MPIrank > 0: - - readyToSwap = self.comm.Iprobe(source=self.MPIrank - 1, tag=18) - # trick to get around processor using 100% cpu while waiting - time.sleep(0.000001) - - # hotter chain decides acceptance - if readyToSwap: - newlnlike = self.comm.recv(source=self.MPIrank - 1, tag=18) - - # determine if swap is accepted and tell other chain - logChainSwap = (1 / self.ladder[self.MPIrank - 1] - 1 / self.ladder[self.MPIrank]) * ( - lnlike0 - newlnlike - ) - - if logChainSwap > np.log(np.random.rand()): - swapAccepted = 1 + if self.MPIrank == 0: + # set up map to help keep track of swaps + swap_map = list(range(self.nchain)) + + # loop through and propose a swap at each chain (starting from hottest chain and going down in T) + # and keep track of results in swap_map + for swap_chain in reversed(range(self.nchain - 1)): + log_acc_ratio = -log_Ls[swap_map[swap_chain]] / Ts[swap_chain] + log_acc_ratio += -log_Ls[swap_map[swap_chain + 1]] / Ts[swap_chain + 1] + log_acc_ratio += log_Ls[swap_map[swap_chain + 1]] / Ts[swap_chain] + log_acc_ratio += log_Ls[swap_map[swap_chain]] / Ts[swap_chain + 1] + + acc_ratio = np.exp(log_acc_ratio) + if self.stream.uniform() <= acc_ratio: + swap_map[swap_chain], swap_map[swap_chain + 1] = swap_map[swap_chain + 1], swap_map[swap_chain] + self.nswap_accepted += 1 + self.swapProposed += 1 else: - swapAccepted = 0 - - # send out result - self.comm.send(swapAccepted, dest=self.MPIrank - 1, tag=888) + self.swapProposed += 1 - # perform swap - if swapAccepted: + # loop through the chains and record the new samples and log_Ls + for j in range(self.nchain): + p0s[j] = p0s[swap_map[j]] + log_Ls[j] = log_Ls[swap_map[j]] - # exchange likelihood - self.comm.send(lnlike0, dest=self.MPIrank - 1, tag=18) - lnlike0 = newlnlike + # broadcast the new samples and log_Ls to all chains + p0 = self.comm.scatter(p0s, root=0) + lnlike0 = self.comm.scatter(log_Ls, root=0) - # exchange parameters - pnew = np.empty(self.ndim) - self.comm.Sendrecv( - p0, dest=self.MPIrank - 1, sendtag=19, recvbuf=pnew, source=self.MPIrank - 1, recvtag=19 - ) - p0 = pnew + # calculate new posterior values + lnprob0 = 1 / self.temp * lnlike0 + self.logp(p0) - # calculate new posterior values - lnprob0 = 1 / self.temp * lnlike0 + self.logp(p0) - - # Return values for colder chain: 0=nothing happened; 1=swap proposed, - # not accepted; 2=swap proposed & accepted - if swapProposed: - if swapAccepted: - swapReturn = 2 - else: - swapReturn = 1 - else: - swapReturn = 0 - - return swapReturn, p0, lnlike0, lnprob0 + return p0, lnlike0, lnprob0 def temperatureLadder(self, Tmin, Tmax=None, tstep=None): """ @@ -776,10 +727,10 @@ def _updateRecursive(self, iter, mem): it += 1 for jj in range(ndim): - diff[jj] = self._AMbuffer[iter - mem + ii, jj] - self.mu[jj] + diff[jj] = self._AMbuffer[ii, jj] - self.mu[jj] self.mu[jj] += diff[jj] / it - self.M2 += np.outer(diff, (self._AMbuffer[iter - mem + ii, :] - self.mu)) + self.M2 += np.outer(diff, (self._AMbuffer[ii, :] - self.mu)) self.cov[:, :] = self.M2 / (it - 1) @@ -803,7 +754,8 @@ def _updateDEbuffer(self, iter, burn): """ - self._DEbuffer = self._AMbuffer[iter - burn : iter] + self._DEbuffer = shift_array(self._DEbuffer, -len(self._AMbuffer)) # shift DEbuffer to the left + self._DEbuffer[-len(self._AMbuffer) :] = self._AMbuffer # add new samples to the new empty spaces # SCAM jump def covarianceJumpProposalSCAM(self, x, iter, beta): @@ -825,11 +777,11 @@ def covarianceJumpProposalSCAM(self, x, iter, beta): qxy = 0 # choose group - jumpind = np.random.randint(0, len(self.groups)) + jumpind = self.stream.integers(0, len(self.groups)) ndim = len(self.groups[jumpind]) # adjust step size - prob = np.random.rand() + prob = self.stream.random() # large jump if prob > 0.97: @@ -846,8 +798,6 @@ def covarianceJumpProposalSCAM(self, x, iter, beta): else: scale = 1.0 - # scale = np.random.uniform(0.5, 10) - # adjust scale based on temperature if self.temp <= 100: scale *= np.sqrt(self.temp) @@ -856,14 +806,12 @@ def covarianceJumpProposalSCAM(self, x, iter, beta): # y = np.dot(self.U.T, x[self.covinds]) # make correlated componentwise adaptive jump - ind = np.unique(np.random.randint(0, ndim, 1)) + ind = np.unique(self.stream.integers(0, ndim, 1)) neff = len(ind) cd = 2.4 / np.sqrt(2 * neff) * scale - # y[ind] = y[ind] + np.random.randn(neff) * cd * np.sqrt(self.S[ind]) - # q[self.covinds] = np.dot(self.U, y) q[self.groups[jumpind]] += ( - np.random.randn() * cd * np.sqrt(self.S[jumpind][ind]) * self.U[jumpind][:, ind].flatten() + self.stream.standard_normal() * cd * np.sqrt(self.S[jumpind][ind]) * self.U[jumpind][:, ind].flatten() ) return q, qxy @@ -887,10 +835,10 @@ def covarianceJumpProposalAM(self, x, iter, beta): qxy = 0 # choose group - jumpind = np.random.randint(0, len(self.groups)) + jumpind = self.stream.integers(0, len(self.groups)) # adjust step size - prob = np.random.rand() + prob = self.stream.random() # large jump if prob > 0.97: @@ -920,7 +868,7 @@ def covarianceJumpProposalAM(self, x, iter, beta): neff = len(ind) cd = 2.4 / np.sqrt(2 * neff) * scale - y[ind] = y[ind] + np.random.randn(neff) * cd * np.sqrt(self.S[jumpind][ind]) + y[ind] = y[ind] + self.stream.standard_normal(neff) * cd * np.sqrt(self.S[jumpind][ind]) q[self.groups[jumpind]] = np.dot(self.U[jumpind], y) return q, qxy @@ -945,28 +893,28 @@ def DEJump(self, x, iter, beta): qxy = 0 # choose group - jumpind = np.random.randint(0, len(self.groups)) + jumpind = self.stream.integers(0, len(self.groups)) ndim = len(self.groups[jumpind]) bufsize = len(self._DEbuffer) # draw a random integer from 0 - iter - mm = np.random.randint(0, bufsize) - nn = np.random.randint(0, bufsize) + mm = self.stream.integers(0, bufsize) + nn = self.stream.integers(0, bufsize) # make sure mm and nn are not the same iteration while mm == nn: - nn = np.random.randint(0, bufsize) + nn = self.stream.integers(0, bufsize) # get jump scale size - prob = np.random.rand() + prob = self.stream.random() # mode jump if prob > 0.5: scale = 1.0 else: - scale = np.random.rand() * 2.4 / np.sqrt(2 * ndim) * np.sqrt(1 / beta) + scale = self.stream.random() * 2.4 / np.sqrt(2 * ndim) * np.sqrt(1 / beta) for ii in range(ndim): @@ -1033,7 +981,7 @@ def randomizeProposalCycle(self): # get random integers index = np.arange(length) - np.random.shuffle(index) + self.stream.shuffle(index) # randomize proposal cycle self.randomizedPropCycle = [self.propCycle[ind] for ind in index] @@ -1049,7 +997,7 @@ def _jump(self, x, iter): length = len(self.propCycle) # call function - ind = np.random.randint(0, length) + ind = self.stream.integers(0, length) q, qxy = self.propCycle[ind](x, iter, 1 / self.temp) # axuilary jump