Skip to content

Commit

Permalink
[query] Remove BlockMatrix persist from Backend interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ehigham committed Oct 29, 2024
1 parent 539e25f commit 6a100de
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 94 deletions.
12 changes: 1 addition & 11 deletions hail/src/main/scala/is/hail/backend/Backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import is.hail.io.{BufferSpec, TypedCodecSpec}
import is.hail.io.fs._
import is.hail.io.plink.LoadPlink
import is.hail.io.vcf.LoadVCF
import is.hail.linalg.BlockMatrix
import is.hail.types._
import is.hail.types.encoded.EType
import is.hail.types.physical.PTuple
import is.hail.types.virtual.{BlockMatrixType, TFloat64}
import is.hail.types.virtual.TFloat64
import is.hail.utils._
import is.hail.variant.ReferenceGenome

Expand Down Expand Up @@ -90,15 +89,6 @@ abstract class Backend extends Closeable {

def broadcast[T: ClassTag](value: T): BroadcastValue[T]

def persist(backendContext: BackendContext, id: String, value: BlockMatrix, storageLevel: String)
: Unit

def unpersist(backendContext: BackendContext, id: String): Unit

def getPersistedBlockMatrix(backendContext: BackendContext, id: String): BlockMatrix

def getPersistedBlockMatrixType(backendContext: BackendContext, id: String): BlockMatrixType

def parallelizeAndComputeWithIndex(
backendContext: BackendContext,
fs: FS,
Expand Down
6 changes: 6 additions & 0 deletions hail/src/main/scala/is/hail/backend/ExecuteContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import is.hail.asm4s.HailClassLoader
import is.hail.backend.local.LocalTaskContext
import is.hail.expr.ir.lowering.IrMetadata
import is.hail.io.fs.FS
import is.hail.linalg.BlockMatrix
import is.hail.utils._
import is.hail.variant.ReferenceGenome

Expand Down Expand Up @@ -71,6 +72,7 @@ object ExecuteContext {
flags: HailFeatureFlags,
backendContext: BackendContext,
irMetadata: IrMetadata,
blockMatrixCache: mutable.Map[String, BlockMatrix],
)(
f: ExecuteContext => T
): T = {
Expand All @@ -89,6 +91,7 @@ object ExecuteContext {
flags,
backendContext,
irMetadata,
blockMatrixCache,
))(f(_))
}
}
Expand Down Expand Up @@ -118,6 +121,7 @@ class ExecuteContext(
val flags: HailFeatureFlags,
val backendContext: BackendContext,
val irMetadata: IrMetadata,
val BlockMatrixCache: mutable.Map[String, BlockMatrix],
) extends Closeable {

val rngNonce: Long =
Expand Down Expand Up @@ -186,6 +190,7 @@ class ExecuteContext(
flags: HailFeatureFlags = this.flags,
backendContext: BackendContext = this.backendContext,
irMetadata: IrMetadata = this.irMetadata,
blockMatrixCache: mutable.Map[String, BlockMatrix] = this.BlockMatrixCache,
)(
f: ExecuteContext => A
): A =
Expand All @@ -202,5 +207,6 @@ class ExecuteContext(
flags,
backendContext,
irMetadata,
blockMatrixCache,
))(f)
}
30 changes: 30 additions & 0 deletions hail/src/main/scala/is/hail/backend/caching/BlockMatrixCache.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package is.hail.backend.caching

import is.hail.linalg.BlockMatrix

import scala.collection.mutable

/** In-memory registry of persisted [[BlockMatrix]] values, keyed by id.
  *
  * Backed by a `LinkedHashMap`, so iteration follows insertion order. Removing
  * an entry unpersists the matrix before dropping the mapping, and `close()`
  * unpersists and clears every entry.
  *
  * NOTE(review): `+=` does not unpersist a value previously bound to the same
  * key — assumes callers never overwrite an id; confirm with call sites.
  */
class BlockMatrixCache extends mutable.AbstractMap[String, BlockMatrix] with AutoCloseable {

  // Backing store; LinkedHashMap gives deterministic (insertion-order) iteration.
  private[this] val cache: mutable.Map[String, BlockMatrix] =
    mutable.LinkedHashMap.empty

  override def get(key: String): Option[BlockMatrix] =
    cache.get(key)

  override def iterator: Iterator[(String, BlockMatrix)] =
    cache.iterator

  override def +=(kv: (String, BlockMatrix)): this.type = {
    cache += kv
    this
  }

  override def -=(key: String): this.type = {
    // Unpersist first, then drop the mapping; no-op when the key is absent.
    cache.get(key).foreach { bm =>
      bm.unpersist()
      cache -= key
    }
    this
  }

  override def close(): Unit = {
    cache.valuesIterator.foreach(_.unpersist())
    cache.clear()
  }
}
13 changes: 2 additions & 11 deletions hail/src/main/scala/is/hail/backend/local/LocalBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import is.hail.expr.ir._
import is.hail.expr.ir.analyses.SemanticHash
import is.hail.expr.ir.lowering._
import is.hail.io.fs._
import is.hail.linalg.BlockMatrix
import is.hail.types._
import is.hail.types.physical.PTuple
import is.hail.types.physical.stypes.PTypeReferenceSingleCodeType
import is.hail.types.virtual.{BlockMatrixType, TVoid}
import is.hail.types.virtual.TVoid
import is.hail.utils._
import is.hail.variant.ReferenceGenome

Expand Down Expand Up @@ -100,6 +99,7 @@ class LocalBackend(
ExecutionCache.fromFlags(flags, fs, tmpdir)
},
new IrMetadata(),
mutable.Map.empty,
)(f)
}

Expand Down Expand Up @@ -202,15 +202,6 @@ class LocalBackend(
): TableReader =
LowerDistributedSort.distributedSort(ctx, stage, sortFields, rt, nPartitions)

def persist(backendContext: BackendContext, id: String, value: BlockMatrix, storageLevel: String)
: Unit = ???

def unpersist(backendContext: BackendContext, id: String): Unit = ???

def getPersistedBlockMatrix(backendContext: BackendContext, id: String): BlockMatrix = ???

def getPersistedBlockMatrixType(backendContext: BackendContext, id: String): BlockMatrixType = ???

def tableToTableStage(ctx: ExecuteContext, inputIR: TableIR, analyses: LoweringAnalyses)
: TableStage =
LowerTableIR.applyTable(inputIR, DArrayLowering.All, ctx, analyses)
Expand Down
11 changes: 1 addition & 10 deletions hail/src/main/scala/is/hail/backend/service/ServiceBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import is.hail.expr.ir.functions.IRFunctionRegistry
import is.hail.expr.ir.lowering._
import is.hail.io.fs._
import is.hail.io.reference.{IndexedFastaSequenceFile, LiftOver}
import is.hail.linalg.BlockMatrix
import is.hail.services.{BatchClient, JobGroupRequest, _}
import is.hail.services.JobGroupStates.{Cancelled, Failure, Running, Success}
import is.hail.types._
Expand Down Expand Up @@ -373,15 +372,6 @@ class ServiceBackend(
): TableReader =
LowerDistributedSort.distributedSort(ctx, inputStage, sortFields, rt, nPartitions)

def persist(backendContext: BackendContext, id: String, value: BlockMatrix, storageLevel: String)
: Unit = ???

def unpersist(backendContext: BackendContext, id: String): Unit = ???

def getPersistedBlockMatrix(backendContext: BackendContext, id: String): BlockMatrix = ???

def getPersistedBlockMatrixType(backendContext: BackendContext, id: String): BlockMatrixType = ???

def tableToTableStage(ctx: ExecuteContext, inputIR: TableIR, analyses: LoweringAnalyses)
: TableStage =
LowerTableIR.applyTable(inputIR, DArrayLowering.All, ctx, analyses)
Expand All @@ -400,6 +390,7 @@ class ServiceBackend(
flags,
serviceBackendContext,
new IrMetadata(),
mutable.Map.empty,
)(f)
}

Expand Down
21 changes: 6 additions & 15 deletions hail/src/main/scala/is/hail/backend/spark/SparkBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import is.hail.{HailContext, HailFeatureFlags}
import is.hail.annotations._
import is.hail.asm4s._
import is.hail.backend._
import is.hail.backend.caching.BlockMatrixCache
import is.hail.backend.py4j.Py4JBackendExtensions
import is.hail.expr.Validate
import is.hail.expr.ir._
import is.hail.expr.ir.analyses.SemanticHash
import is.hail.expr.ir.lowering._
import is.hail.io.{BufferSpec, TypedCodecSpec}
import is.hail.io.fs._
import is.hail.linalg.BlockMatrix
import is.hail.rvd.RVD
import is.hail.types._
import is.hail.types.physical.{PStruct, PTuple}
Expand Down Expand Up @@ -338,20 +338,8 @@ class SparkBackend(
override val longLifeTempFileManager: TempFileManager =
new OwningTempFileManager(fs)

val bmCache: SparkBlockMatrixCache = SparkBlockMatrixCache()

def persist(backendContext: BackendContext, id: String, value: BlockMatrix, storageLevel: String)
: Unit = bmCache.persistBlockMatrix(id, value, storageLevel)

def unpersist(backendContext: BackendContext, id: String): Unit = unpersist(id)

def getPersistedBlockMatrix(backendContext: BackendContext, id: String): BlockMatrix =
bmCache.getPersistedBlockMatrix(id)

def getPersistedBlockMatrixType(backendContext: BackendContext, id: String): BlockMatrixType =
bmCache.getPersistedBlockMatrixType(id)

def unpersist(id: String): Unit = bmCache.unpersistBlockMatrix(id)
private[this] val bmCache: BlockMatrixCache =
new BlockMatrixCache()

def createExecuteContextForTests(
timer: ExecutionTimer,
Expand All @@ -374,6 +362,7 @@ class SparkBackend(
ExecutionCache.forTesting
},
new IrMetadata(),
null,
)

override def withExecuteContext[T](f: ExecuteContext => T)(implicit E: Enclosing): T =
Expand All @@ -393,6 +382,7 @@ class SparkBackend(
ExecutionCache.fromFlags(flags, fs, tmpdir)
},
new IrMetadata(),
bmCache,
)(f)
}

Expand Down Expand Up @@ -457,6 +447,7 @@ class SparkBackend(
override def asSpark(op: String): SparkBackend = this

def close(): Unit = {
bmCache.close()
SparkBackend.stop()
longLifeTempFileManager.close()
}
Expand Down

This file was deleted.

13 changes: 5 additions & 8 deletions hail/src/main/scala/is/hail/expr/ir/BlockMatrixIR.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package is.hail.expr.ir

import is.hail.HailContext
import is.hail.annotations.NDArray
import is.hail.backend.{BackendContext, ExecuteContext}
import is.hail.backend.ExecuteContext
import is.hail.expr.Nat
import is.hail.expr.ir.lowering.{BMSContexts, BlockMatrixStage2, LowererUnsupportedOperation}
import is.hail.io.{StreamBufferSpec, TypedCodecSpec}
Expand Down Expand Up @@ -106,7 +105,7 @@ object BlockMatrixReader {
def fromJValue(ctx: ExecuteContext, jv: JValue): BlockMatrixReader =
(jv \ "name").extract[String] match {
case "BlockMatrixNativeReader" => BlockMatrixNativeReader.fromJValue(ctx.fs, jv)
case "BlockMatrixPersistReader" => BlockMatrixPersistReader.fromJValue(ctx.backendContext, jv)
case "BlockMatrixPersistReader" => BlockMatrixPersistReader.fromJValue(ctx, jv)
case _ => jv.extract[BlockMatrixReader]
}
}
Expand Down Expand Up @@ -274,22 +273,20 @@ case class BlockMatrixBinaryReader(path: String, shape: IndexedSeq[Long], blockS
case class BlockMatrixNativePersistParameters(id: String)

object BlockMatrixPersistReader {
def fromJValue(ctx: BackendContext, jv: JValue): BlockMatrixPersistReader = {
def fromJValue(ctx: ExecuteContext, jv: JValue): BlockMatrixPersistReader = {
implicit val formats: Formats = BlockMatrixReader.formats
val params = jv.extract[BlockMatrixNativePersistParameters]
BlockMatrixPersistReader(
params.id,
HailContext.backend.getPersistedBlockMatrixType(ctx, params.id),
BlockMatrixType.fromBlockMatrix(ctx.BlockMatrixCache(params.id)),
)
}
}

case class BlockMatrixPersistReader(id: String, typ: BlockMatrixType) extends BlockMatrixReader {
def pathsUsed: Seq[String] = FastSeq()
lazy val fullType: BlockMatrixType = typ

def apply(ctx: ExecuteContext): BlockMatrix =
HailContext.backend.getPersistedBlockMatrix(ctx.backendContext, id)
def apply(ctx: ExecuteContext): BlockMatrix = ctx.BlockMatrixCache(id)
}

case class BlockMatrixMap(child: BlockMatrixIR, eltName: Name, f: IR, needsDense: Boolean)
Expand Down
3 changes: 1 addition & 2 deletions hail/src/main/scala/is/hail/expr/ir/BlockMatrixWriter.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package is.hail.expr.ir

import is.hail.HailContext
import is.hail.annotations.Region
import is.hail.asm4s._
import is.hail.backend.ExecuteContext
Expand Down Expand Up @@ -190,7 +189,7 @@ case class BlockMatrixPersistWriter(id: String, storageLevel: String) extends Bl
def pathOpt: Option[String] = None

def apply(ctx: ExecuteContext, bm: BlockMatrix): Unit =
HailContext.backend.persist(ctx.backendContext, id, bm, storageLevel)
ctx.BlockMatrixCache += id -> bm.persist(storageLevel)

def loweredTyp: Type = TVoid
}
Expand Down
2 changes: 1 addition & 1 deletion hail/src/main/scala/is/hail/utils/ErrorHandling.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class HailWorkerException(
trait ErrorHandling {
def fatal(msg: String): Nothing = throw new HailException(msg)

def fatal(msg: String, errorId: Int) = throw new HailException(msg, errorId)
def fatal(msg: String, errorId: Int): Nothing = throw new HailException(msg, errorId)

def fatal(msg: String, cause: Throwable): Nothing = throw new HailException(msg, None, cause)

Expand Down
31 changes: 20 additions & 11 deletions hail/src/test/scala/is/hail/expr/ir/IRSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import is.hail.ExecStrategy.ExecStrategy
import is.hail.TestUtils._
import is.hail.annotations.{BroadcastRow, ExtendedOrdering, SafeNDArray}
import is.hail.backend.ExecuteContext
import is.hail.backend.caching.BlockMatrixCache
import is.hail.expr.Nat
import is.hail.expr.ir.ArrayZipBehavior.ArrayZipBehavior
import is.hail.expr.ir.agg._
import is.hail.expr.ir.functions._
import is.hail.io.{BufferSpec, TypedCodecSpec}
import is.hail.io.bgen.MatrixBGENReader
import is.hail.linalg.BlockMatrix
import is.hail.methods._
import is.hail.rvd.{PartitionBoundOrdering, RVD, RVDPartitioner}
import is.hail.types.{tcoerce, VirtualTypeWithReq}
Expand Down Expand Up @@ -3906,17 +3906,26 @@ class IRSuite extends HailSuite {
assert(x2 == x)
}

def testBlockMatrixIRParserPersist(): Unit = {
val bm = BlockMatrix.fill(1, 1, 0.0, 5)
backend.persist(ctx.backendContext, "x", bm, "MEMORY_ONLY")
val persist =
BlockMatrixRead(BlockMatrixPersistReader("x", BlockMatrixType.fromBlockMatrix(bm)))
@Test def testBlockMatrixIRParserPersist(): Unit =
using(new BlockMatrixCache()) { cache =>
val bm = BlockMatrixRandom(0, gaussian = true, shape = Array(5L, 6L), blockSize = 3)

val s = Pretty.sexprStyle(persist, elideLiterals = false)
val x2 = IRParser.parse_blockmatrix_ir(ctx, s)
assert(x2 == persist)
backend.unpersist(ctx.backendContext, "x")
}
backend.withExecuteContext { ctx =>
ctx.local(blockMatrixCache = cache) { ctx =>
backend.execute(ctx, BlockMatrixWrite(bm, BlockMatrixPersistWriter("x", "MEMORY_ONLY")))
}
}

backend.withExecuteContext { ctx =>
ctx.local(blockMatrixCache = cache) { ctx =>
val persist = BlockMatrixRead(BlockMatrixPersistReader("x", bm.typ))

val s = Pretty.sexprStyle(persist, elideLiterals = false)
val x2 = IRParser.parse_blockmatrix_ir(ctx, s)
assert(x2 == persist)
}
}
}

@Test def testCachedIR(): Unit = {
val cached = Literal(TSet(TInt32), Set(1))
Expand Down

0 comments on commit 6a100de

Please sign in to comment.