diff --git a/build.zig b/build.zig index 675272a7..fd744f3b 100644 --- a/build.zig +++ b/build.zig @@ -368,6 +368,7 @@ pub fn build(b: *Build) !void { "src/lib/buzz_http.zig", "src/lib/buzz_ffi.zig", "src/lib/buzz_serialize.zig", + "src/lib/buzz_thread.zig", }; // Zig only libs const lib_names = [_][]const u8{ @@ -383,6 +384,7 @@ pub fn build(b: *Build) !void { "http", "ffi", "serialize", + "thread", }; const all_lib_names = [_][]const u8{ "std", @@ -398,6 +400,7 @@ pub fn build(b: *Build) !void { "errors", "ffi", "serialize", + "thread", }; // TODO: this section is slow. Modifying Buzz parser shouldn't trigger recompile of all buzz dynamic libraries diff --git a/src/builtin/list.zig b/src/builtin/list.zig index f5f8af8e..530d6fc6 100644 --- a/src/builtin/list.zig +++ b/src/builtin/list.zig @@ -104,18 +104,16 @@ const SortContext = struct { }; fn lessThan(context: SortContext, lhs: Value, rhs: Value) bool { - var args = std.ArrayList(*const Value).init(context.ctx.vm.gc.allocator); - defer args.deinit(); - - // TODO: handle error - args.append(&lhs) catch unreachable; - args.append(&rhs) catch unreachable; + var args = [_]Value{ + lhs, + rhs, + }; buzz_api.bz_call( context.ctx.vm, context.sort_closure, - @as([*]const *const Value, @ptrCast(args.items)), - @as(u8, @intCast(args.items.len)), + @ptrCast(args[0..]), + @intCast(args.len), null, ); @@ -271,19 +269,16 @@ pub fn forEach(ctx: *NativeCtx) c_int { const closure = ObjClosure.cast(ctx.vm.peek(0).obj()).?; for (list.items.items, 0..) |item, index| { - var args = std.ArrayList(*const Value).init(ctx.vm.gc.allocator); - defer args.deinit(); - - // TODO: handle error - const index_value = Value.fromInteger(@as(i32, @intCast(index))); - args.append(&index_value) catch unreachable; - args.append(&item) catch unreachable; + var args = [_]Value{ + Value.fromInteger(@as(i32, @intCast(index))), + item, + }; buzz_api.bz_call( ctx.vm, closure, - @as([*]const *const Value, @ptrCast(args.items)), - @as(u8, @intCast(args.items.len)), + @ptrCast(args[0..]), + @intCast(args.len), null, ); } @@ -297,20 +292,17 @@ pub fn reduce(ctx: *NativeCtx) c_int { var accumulator = ctx.vm.peek(0); for (list.items.items, 0..) |item, index| { - var args = std.ArrayList(*const Value).init(ctx.vm.gc.allocator); - defer args.deinit(); - - // TODO: handle error - const index_value = Value.fromInteger(@as(i32, @intCast(index))); - args.append(&index_value) catch unreachable; - args.append(&item) catch unreachable; - args.append(&accumulator) catch unreachable; + var args = [_]Value{ + Value.fromInteger(@as(i32, @intCast(index))), + item, + accumulator, + }; buzz_api.bz_call( ctx.vm, closure, - @as([*]const *const Value, @ptrCast(args.items)), - @as(u8, @intCast(args.items.len)), + @ptrCast(args[0..]), + @intCast(args.len), null, ); @@ -335,19 +327,16 @@ pub fn filter(ctx: *NativeCtx) c_int { ) catch unreachable; // TODO: handle error for (list.items.items, 0..) |item, index| { - var args = std.ArrayList(*const Value).init(ctx.vm.gc.allocator); - defer args.deinit(); - - // TODO: handle error - const index_value = Value.fromInteger(@as(i32, @intCast(index))); - args.append(&index_value) catch unreachable; - args.append(&item) catch unreachable; + var args = [_]Value{ + Value.fromInteger(@as(i32, @intCast(index))), + item, + }; buzz_api.bz_call( ctx.vm, closure, - @as([*]const *const Value, @ptrCast(args.items)), - @as(u8, @intCast(args.items.len)), + @ptrCast(args[0..]), + @intCast(args.len), null, ); @@ -376,19 +365,16 @@ pub fn map(ctx: *NativeCtx) c_int { ) catch unreachable; // TODO: handle error for (list.items.items, 0..) |item, index| { - var args = std.ArrayList(*const Value).init(ctx.vm.gc.allocator); - defer args.deinit(); - - // TODO: handle error - const index_value = Value.fromInteger(@as(i32, @intCast(index))); - args.append(&index_value) catch unreachable; - args.append(&item) catch unreachable; + var args = [_]Value{ + Value.fromInteger(@as(i32, @intCast(index))), + item, + }; buzz_api.bz_call( ctx.vm, closure, - @as([*]const *const Value, @ptrCast(args.items)), - @as(u8, @intCast(args.items.len)), + @ptrCast(args[0..]), + @intCast(args.len), null, ); diff --git a/src/buzz_api.zig b/src/buzz_api.zig index aacba8f4..b9324b8b 100644 --- a/src/buzz_api.zig +++ b/src/buzz_api.zig @@ -539,6 +539,10 @@ export fn bz_listSet(vm: *VM, self: Value, index: usize, value: Value) void { ) catch @panic("Could not set element in list"); } +export fn bz_listPtr(self: *ObjList) [*]Value { + return self.items.items.ptr; +} + export fn bz_listLen(self: *ObjList) usize { return self.items.items.len; } @@ -602,7 +606,7 @@ export fn bz_newVM(self: *VM) ?*VM { var gc = self.gc.allocator.create(GarbageCollector) catch { return null; }; - // FIXME: should share strings between gc + gc.* = GarbageCollector.init(self.gc.allocator); gc.type_registry = TypeRegistry{ .gc = gc, @@ -617,7 +621,20 @@ export fn bz_newVM(self: *VM) ?*VM { return vm; } +export fn bz_startVM(self: *VM) void { + self.current_fiber.* = _vm.Fiber.init( + self.gc.allocator, + null, // parent fiber + null, // stack_slice + .OP_CALL, // call_type + 1, // arg_count + false, // catch_count + null, // method/member + ) catch @panic("Out of memory"); +} + export fn bz_deinitVM(_: *VM) void { + // FIXME // self.deinit(); } @@ -719,14 +736,14 @@ pub export fn bz_invoke( pub export fn bz_call( self: *VM, closure: *ObjClosure, - arguments: ?[*]const *const Value, + arguments: ?[*]const Value, len: u8, - catch_value: ?*Value, + catch_value: ?*const Value, ) void { self.push(closure.toValue()); var i: usize = 0; while (i < len) : (i += 1) { - self.push(arguments.?[i].*); + self.push(arguments.?[i]); } // TODO: catch properly diff --git a/src/lib/buzz_api.zig b/src/lib/buzz_api.zig index 97964af6..ea9a8c94 100644 --- a/src/lib/buzz_api.zig +++ b/src/lib/buzz_api.zig @@ -37,7 +37,8 @@ pub const ZigType = opaque { pub const VM = opaque { pub const allocator = @import("../buzz_api.zig").allocator; - pub extern fn bz_newVM(self: *VM) *VM; + pub extern fn bz_newVM(self: *VM) ?*VM; + pub extern fn bz_startVM(self: *VM) void; pub extern fn bz_deinitVM(self: *VM) void; pub extern fn bz_compile( self: *VM, @@ -50,9 +51,9 @@ pub const VM = opaque { pub extern fn bz_call( self: *VM, closure: *ObjClosure, - arguments: ?[*]const *const Value, + arguments: ?[*]const Value, len: usize, - catch_value: ?*Value, + catch_value: ?*const Value, ) void; pub extern fn bz_push(self: *VM, value: Value) void; pub extern fn bz_pop(self: *VM) Value; @@ -311,6 +312,7 @@ pub const ObjList = opaque { pub extern fn bz_listConcat(vm: *VM, list: Value, other_list: Value) Value; pub extern fn bz_getListField(vm: *VM, list_value: Value, field_name_value: Value, bind: bool) Value; pub extern fn bz_listNext(vm: *VM, list_value: Value, index: *Value) Value; + pub extern fn bz_listPtr(self: *ObjList) [*]Value; }; pub const ObjMap = opaque { diff --git a/src/lib/buzz_io.zig b/src/lib/buzz_io.zig index db06466a..470e3ac9 100644 --- a/src/lib/buzz_io.zig +++ b/src/lib/buzz_io.zig @@ -343,7 +343,7 @@ export fn runFile(ctx: *api.NativeCtx) c_int { defer api.VM.allocator.free(source); // Init new VM - var vm = ctx.vm.bz_newVM(); + var vm = ctx.vm.bz_newVM() orelse @panic("Out of memory"); defer vm.bz_deinitVM(); // Compile diff --git a/src/lib/buzz_thread.zig b/src/lib/buzz_thread.zig new file mode 100644 index 00000000..a713fee5 --- /dev/null +++ b/src/lib/buzz_thread.zig @@ -0,0 +1,194 @@ +const std = @import("std"); +const api = @import("buzz_api.zig"); + +fn handleThreadError(ctx: *api.NativeCtx, err: anytype) void { + switch (err) { + error.ThreadQuotaExceeded, + error.SystemResources, + error.OutOfMemory, + error.LockedMemoryLimitExceeded, + error.Unexpected, + => ctx.vm.pushErrorEnum("thread.ThreadError", @errorName(err)), + } +} + +fn spawn( + current_vm: *api.VM, + closure: *api.ObjClosure, + arguments: ?[*]const api.Value, + len: usize, + catch_value: ?*const api.Value, +) void { + const thread_vm = current_vm.bz_newVM() orelse @panic("Out of memory"); + defer thread_vm.bz_deinitVM(); + + thread_vm.bz_startVM(); + + thread_vm.bz_call( + closure, + arguments, + len, + catch_value, + ); +} + +export fn ThreadSpawn(ctx: *api.NativeCtx) c_int { + const closure = ctx.vm.bz_peek(2).bz_valueToClosure(); + const arguments = ctx.vm.bz_peek(1).bz_valueToObjList(); + var catch_value = ctx.vm.bz_peek(0); + + var thread = api.VM.allocator.create(std.Thread) catch @panic("Out of memory"); + errdefer api.VM.allocator.destroy(thread); + + thread.* = std.Thread.spawn( + .{}, + spawn, + .{ + ctx.vm, + closure, + arguments.bz_listPtr(), + arguments.bz_listLen(), + &catch_value, + }, + ) catch |err| { + handleThreadError(ctx, err); + + return -1; + }; + + if (api.ObjUserData.bz_newUserData(ctx.vm, @intFromPtr(thread))) |userdata| { + ctx.vm.bz_pushUserData(userdata); + + return 1; + } else { + @panic("Out of memory"); + } + + return 1; +} + +export fn ThreadJoin(ctx: *api.NativeCtx) c_int { + const userdata = ctx.vm.bz_peek(0).bz_valueToUserData(); + const thread: *std.Thread = @ptrCast(@alignCast(@as(*anyopaque, @ptrFromInt(userdata)))); + + thread.join(); + + return 0; +} + +export fn ThreadDetach(ctx: *api.NativeCtx) c_int { + const userdata = ctx.vm.bz_peek(0).bz_valueToUserData(); + const thread: *std.Thread = @ptrCast(@alignCast(@as(*anyopaque, @ptrFromInt(userdata)))); + + thread.detach(); + + return 0; +} + +export fn ThreadCollect(ctx: *api.NativeCtx) c_int { + const userdata = ctx.vm.bz_peek(0).bz_valueToUserData(); + const thread: *std.Thread = @ptrCast(@alignCast(@as(*anyopaque, @ptrFromInt(userdata)))); + + api.VM.allocator.destroy(thread); + + return 0; +} + +export fn SemaphoreInit(ctx: *api.NativeCtx) c_int { + var semaphore = api.VM.allocator.create(std.Thread.Semaphore) catch @panic("Out of memory"); + errdefer api.VM.allocator.destroy(semaphore); + + semaphore.* = .{ + .permits = 1, + }; + + if (api.ObjUserData.bz_newUserData(ctx.vm, @intFromPtr(semaphore))) |userdata| { + ctx.vm.bz_pushUserData(userdata); + + return 1; + } else { + @panic("Out of memory"); + } + + return 1; +} + +export fn SemaphoreWait(ctx: *api.NativeCtx) c_int { + const userdata = ctx.vm.bz_peek(0).bz_valueToUserData(); + const semaphore: *std.Thread.Semaphore = @ptrCast(@alignCast(@as(*anyopaque, @ptrFromInt(userdata)))); + + semaphore.wait(); + + return 0; +} + +export fn SemaphorePost(ctx: *api.NativeCtx) c_int { + const userdata = ctx.vm.bz_peek(0).bz_valueToUserData(); + const semaphore: *std.Thread.Semaphore = @ptrCast(@alignCast(@as(*anyopaque, @ptrFromInt(userdata)))); + + semaphore.post(); + + return 0; +} + +export fn SemaphoreCollect(ctx: *api.NativeCtx) c_int { + const userdata = ctx.vm.bz_peek(0).bz_valueToUserData(); + const semaphore: *std.Thread.Semaphore = @ptrCast(@alignCast(@as(*anyopaque, @ptrFromInt(userdata)))); + + api.VM.allocator.destroy(semaphore); + + return 0; +} + +export fn MutexInit(ctx: *api.NativeCtx) c_int { + var mutex = api.VM.allocator.create(std.Thread.Mutex) catch @panic("Out of memory"); + errdefer api.VM.allocator.destroy(mutex); + + mutex.* = .{}; + + if (api.ObjUserData.bz_newUserData(ctx.vm, @intFromPtr(mutex))) |userdata| { + ctx.vm.bz_pushUserData(userdata); + + return 1; + } else { + @panic("Out of memory"); + } + + return 1; +} + +export fn MutexLock(ctx: *api.NativeCtx) c_int { + const userdata = ctx.vm.bz_peek(0).bz_valueToUserData(); + const mutex: *std.Thread.Mutex = @ptrCast(@alignCast(@as(*anyopaque, @ptrFromInt(userdata)))); + + mutex.lock(); + + return 0; +} + +export fn MutexTryLock(ctx: *api.NativeCtx) c_int { + const userdata = ctx.vm.bz_peek(0).bz_valueToUserData(); + const mutex: *std.Thread.Mutex = @ptrCast(@alignCast(@as(*anyopaque, @ptrFromInt(userdata)))); + + ctx.vm.bz_pushBool(mutex.tryLock()); + + return 1; +} + +export fn MutexUnlock(ctx: *api.NativeCtx) c_int { + const userdata = ctx.vm.bz_peek(0).bz_valueToUserData(); + const mutex: *std.Thread.Mutex = @ptrCast(@alignCast(@as(*anyopaque, @ptrFromInt(userdata)))); + + mutex.unlock(); + + return 1; +} + +export fn MutexCollect(ctx: *api.NativeCtx) c_int { + const userdata = ctx.vm.bz_peek(0).bz_valueToUserData(); + const mutex: *std.Thread.Mutex = @ptrCast(@alignCast(@as(*anyopaque, @ptrFromInt(userdata)))); + + api.VM.allocator.destroy(mutex); + + return 0; +} diff --git a/src/lib/thread.buzz b/src/lib/thread.buzz new file mode 100644 index 00000000..875099bc --- /dev/null +++ b/src/lib/thread.buzz @@ -0,0 +1,129 @@ +|| @private +extern fun ThreadSpawn(any function, [any] arguments, any catchValue) > ud !> ThreadError; +|| @private +extern fun ThreadJoin(ud thread) > void; +|| @private +extern fun ThreadDetach(ud thread) > void; +|| @private +extern fun ThreadCollect(ud thread) > void; +|| @private +extern fun SemaphoreInit() > ud; +|| @private +extern fun SemaphoreWait(ud handle) > void; +|| @private +extern fun SemaphorePost(ud handle) > void; +|| @private +extern fun SemaphoreCollect(ud handle) > void; +|| @private +extern fun MutexInit() > ud; +|| @private +extern fun MutexLock(ud handle) > void; +|| @private +extern fun MutexTryLock(ud handle) > bool; +|| @private +extern fun MutexUnlock(ud handle) > void; +|| @private +extern fun MutexCollect(ud handle) > void; + +export enum State { + Running, + Detached, + Completed, +} + +export enum ThreadError { + ThreadQuotaExceeded, + SystemResources, + OutOfMemory, + LockedMemoryLimitExceeded, + Unexpected, +} + +| Do we want yield to be able to yield across threads? +export object Thread { + ud handle, + State? state = null, + + | TODO: `Function` type to enforce function without specifying its signature + static fun spawn(any function, [any] arguments, any catchValue = null) > Thread !> ThreadError { + return Thread{ + handle = ThreadSpawn( + function, + arguments: arguments, + catchValue: catchValue + ), + state = State.Running, + }; + } + + fun join() > void !> ThreadError { + if (this.state != State.Running) { + return; + } + + ThreadJoin(this.handle); + this.state = State.Completed; + } + + fun detach() > void { + if (this.state != State.Running) { + return; + } + + ThreadDetach(this.handle); + this.state = State.Detached; + } + + fun collect() > void { + ThreadCollect(this.handle); + this.state = State.Detached; + } +} + +export object Semaphore { + ud handle, + + static fun init() > Semaphore { + return Semaphore{ + handle = SemaphoreInit(), + }; + } + + fun wait() > void { + SemaphoreWait(this.handle); + } + + fun post() > void { + SemaphorePost(this.handle); + } + + fun collect() > void { + SemaphoreCollect(this.handle); + } +} + +export object Mutex { + ud handle, + + static fun init() > Mutex { + return Mutex{ + handle = MutexInit(), + }; + } + + fun lock() > void { + MutexLock(this.handle); + } + + fun unlock() > void { + MutexUnlock(this.handle); + } + + fun tryLock() > bool { + return MutexTryLock(this.handle); + } + + fun collect() > void { + MutexCollect(this.handle); + } +} \ No newline at end of file diff --git a/tests/051-functional.buzz b/tests/051-functional.buzz index 1bd53553..8167756e 100644 --- a/tests/051-functional.buzz +++ b/tests/051-functional.buzz @@ -1,4 +1,5 @@ import "std"; +import "debug"; test "list.forEach" { [int] data = [1, 2, 3, 4]; @@ -44,6 +45,7 @@ test "list.sort" { data.sort(fun (int left, int right) -> left < right); + dump(data); foreach (int i, int value in [-3, 0, 1, 3, 10, 12]) { assert(data[i] == value, message: "list is not ordered"); } diff --git a/tests/062-threads.buzz b/tests/062-threads.buzz new file mode 100644 index 00000000..bcacd27a --- /dev/null +++ b/tests/062-threads.buzz @@ -0,0 +1,39 @@ +import "std"; +import "thread"; + +test "Spawning multiple threads and use mutex" { + [int?] sums = []; + foreach (int n in 0..10) { + sums.append(null); + } + + Mutex mutex = Mutex.init(); + [Thread] pool = []; + foreach (int n in 0..10) { + pool.append( + Thread.spawn( + function: fun (int threadId, int n) > void { + int sum = 0; + foreach (int i in 0..n) { + sum = sum + i; + } + + mutex.lock(); + + sums[threadId] = sum; + + mutex.unlock(); + }, + arguments: [ n, 10 ], + ) + ); + } + + foreach (Thread thread in pool) { + thread.join(); + } + + foreach (int i, int? sum in sums) { + assert(sum == 45, message: "could run multiple threads"); + } +} \ No newline at end of file