Skip to content

Commit

Permalink
runtime: rewrite channel implementation
Browse files Browse the repository at this point in the history
This rewrite simplifies the channel implementation considerably, with
34% less LOC. Perhaps the most important change is the removal of the
channel state, which made sense when we had only send and receive
operations but only makes things more compliated when multiple select
operations can be pending on a single channel.

I did this rewrite originally to make it possible to make channels
parallelism-safe. The current implementation is not parallelism-safe,
but it will be easy to make it so (the main additions will be a channel
lock, a global select lock, and an atomic compare-and-swap in
chanQueue.pop).
  • Loading branch information
aykevl committed Oct 27, 2024
1 parent 2a76ceb commit 01498f6
Show file tree
Hide file tree
Showing 2 changed files with 353 additions and 534 deletions.
37 changes: 20 additions & 17 deletions compiler/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ func (b *builder) createChanSend(instr *ssa.Send) {
b.CreateStore(chanValue, valueAlloca)
}

// Allocate blockedlist buffer.
channelBlockedList := b.getLLVMRuntimeType("channelBlockedList")
channelBlockedListAlloca, channelBlockedListAllocaSize := b.createTemporaryAlloca(channelBlockedList, "chan.blockedList")
// Allocate buffer for the channel operation.
channelOp := b.getLLVMRuntimeType("channelOp")
channelOpAlloca, channelOpAllocaSize := b.createTemporaryAlloca(channelOp, "chan.op")

// Do the send.
b.createRuntimeCall("chanSend", []llvm.Value{ch, valueAlloca, channelBlockedListAlloca}, "")
b.createRuntimeCall("chanSend", []llvm.Value{ch, valueAlloca, channelOpAlloca}, "")

// End the lifetime of the allocas.
// This also works around a bug in CoroSplit, at least in LLVM 8:
// https://bugs.llvm.org/show_bug.cgi?id=41742
b.emitLifetimeEnd(channelBlockedListAlloca, channelBlockedListAllocaSize)
b.emitLifetimeEnd(channelOpAlloca, channelOpAllocaSize)
if !isZeroSize {
b.emitLifetimeEnd(valueAlloca, valueAllocaSize)
}
Expand All @@ -72,20 +72,20 @@ func (b *builder) createChanRecv(unop *ssa.UnOp) llvm.Value {
valueAlloca, valueAllocaSize = b.createTemporaryAlloca(valueType, "chan.value")
}

// Allocate blockedlist buffer.
channelBlockedList := b.getLLVMRuntimeType("channelBlockedList")
channelBlockedListAlloca, channelBlockedListAllocaSize := b.createTemporaryAlloca(channelBlockedList, "chan.blockedList")
// Allocate buffer for the channel operation.
channelOp := b.getLLVMRuntimeType("channelOp")
channelOpAlloca, channelOpAllocaSize := b.createTemporaryAlloca(channelOp, "chan.op")

// Do the receive.
commaOk := b.createRuntimeCall("chanRecv", []llvm.Value{ch, valueAlloca, channelBlockedListAlloca}, "")
commaOk := b.createRuntimeCall("chanRecv", []llvm.Value{ch, valueAlloca, channelOpAlloca}, "")
var received llvm.Value
if isZeroSize {
received = llvm.ConstNull(valueType)
} else {
received = b.CreateLoad(valueType, valueAlloca, "chan.received")
b.emitLifetimeEnd(valueAlloca, valueAllocaSize)
}
b.emitLifetimeEnd(channelBlockedListAlloca, channelBlockedListAllocaSize)
b.emitLifetimeEnd(channelOpAlloca, channelOpAllocaSize)

if unop.CommaOk {
tuple := llvm.Undef(b.ctx.StructType([]llvm.Type{valueType, b.ctx.Int1Type()}, false))
Expand Down Expand Up @@ -198,26 +198,29 @@ func (b *builder) createSelect(expr *ssa.Select) llvm.Value {
if expr.Blocking {
// Stack-allocate operation structures.
// If these were simply created as a slice, they would heap-allocate.
chBlockAllocaType := llvm.ArrayType(b.getLLVMRuntimeType("channelBlockedList"), len(selectStates))
chBlockAlloca, chBlockSize := b.createTemporaryAlloca(chBlockAllocaType, "select.block.alloca")
chBlockLen := llvm.ConstInt(b.uintptrType, uint64(len(selectStates)), false)
chBlockPtr := b.CreateGEP(chBlockAllocaType, chBlockAlloca, []llvm.Value{
opsAllocaType := llvm.ArrayType(b.getLLVMRuntimeType("channelOp"), len(selectStates))
opsAlloca, opsSize := b.createTemporaryAlloca(opsAllocaType, "select.block.alloca")
opsLen := llvm.ConstInt(b.uintptrType, uint64(len(selectStates)), false)
opsPtr := b.CreateGEP(opsAllocaType, opsAlloca, []llvm.Value{
llvm.ConstInt(b.ctx.Int32Type(), 0, false),
llvm.ConstInt(b.ctx.Int32Type(), 0, false),
}, "select.block")

results = b.createRuntimeCall("chanSelect", []llvm.Value{
recvbuf,
statesPtr, statesLen, statesLen, // []chanSelectState
chBlockPtr, chBlockLen, chBlockLen, // []channelBlockList
opsPtr, opsLen, opsLen, // []channelOp
}, "select.result")

// Terminate the lifetime of the operation structures.
b.emitLifetimeEnd(chBlockAlloca, chBlockSize)
b.emitLifetimeEnd(opsAlloca, opsSize)
} else {
results = b.createRuntimeCall("tryChanSelect", []llvm.Value{
opsPtr := llvm.ConstNull(b.dataPtrType)
opsLen := llvm.ConstInt(b.uintptrType, 0, false)
results = b.createRuntimeCall("chanSelect", []llvm.Value{
recvbuf,
statesPtr, statesLen, statesLen, // []chanSelectState
opsPtr, opsLen, opsLen, // []channelOp (nil slice)
}, "select.result")
}

Expand Down
Loading

0 comments on commit 01498f6

Please sign in to comment.