Skip to content

Commit

Permalink
Multithread support (#42)
Browse files Browse the repository at this point in the history
* Added new frame type for recording multi thread

* Handle start all threads frame message

* Add multithread flag to ETTraceRunner

* Added logic to record all threads

* Add threads stacks to response data

* Save each thread's stacks to a different json

* Save thread names and create merged json

* Add {} to single line ifs

* Use the multithread format to record single thread traces

* Code refactor

* Change JSON output depending on the number of threads

* More code refactor

* Fix if logic

* Serve main thread flamegraph through http server

* Skip merged flamegraph generation while UI is not ready

* Removed Threads to JSON logic
  • Loading branch information
Itaybre authored Aug 24, 2023
1 parent 428cbd1 commit da81f0b
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 88 deletions.
1 change: 1 addition & 0 deletions ETTrace/CommunicationFrame/Public/CommunicationFrame.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ enum {
PTFrameTypeResultsMetadata = 105,
PTFrameTypeResultsData = 106,
PTFrameTypeResultsTransferComplete = 107,
PTFrameTypeStartMultiThread = 108,
};

typedef struct _PTStartFrame {
Expand Down
16 changes: 8 additions & 8 deletions ETTrace/ETModels/Flamegraph.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,31 @@ public class Flamegraph: NSObject {
public let osBuild: String

@objc
public let device: String?
public let device: String

@objc
public let isSimulator: Bool

@objc
public var nodes: FlameNode

@objc
public var events: [FlamegraphEvent]

@objc
public var libraries: [String:UInt64]

@objc
public var threadNodes: [ThreadNode]

public init(osBuild: String,
device: String?,
device: String,
isSimulator: Bool,
nodes: FlameNode,
libraries: [String:UInt64],
events: [FlamegraphEvent]) {
events: [FlamegraphEvent],
threadNodes: [ThreadNode]) {
self.osBuild = osBuild
self.device = device
self.isSimulator = isSimulator
self.nodes = nodes
self.events = events
self.libraries = libraries
self.threadNodes = threadNodes
}
}
23 changes: 23 additions & 0 deletions ETTrace/ETModels/ThreadNode.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// ThreadNode.swift
//
//
// Created by Itay Brenner on 18/8/23.
//

import Foundation

@objc
public class ThreadNode: NSObject {
@objc
public let threadName: String?

@objc
public var nodes: FlameNode

public init(nodes: FlameNode,
threadName: String? = nil) {
self.nodes = nodes
self.threadName = threadName
}
}
10 changes: 6 additions & 4 deletions ETTrace/ETTrace/EMGChannelListener.m
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ - (BOOL)ioFrameChannel:(PTChannel*)channel shouldAcceptFrameOfType:(uint32_t)typ
if (channel != self.peerChannel) {
// A previous channel that has been canceled but not yet ended. Ignore.
return NO;
} else if (type == PTFrameTypeStart || type == PTFrameTypeStop || type == PTFrameTypeRequestResults){
} else if (type == PTFrameTypeStart || type == PTFrameTypeStop ||
type == PTFrameTypeRequestResults || type == PTFrameTypeStartMultiThread){
return YES;
} else {
NSLog(@"Unexpected frame of type %u", type);
Expand All @@ -53,14 +54,15 @@ - (BOOL)ioFrameChannel:(PTChannel*)channel shouldAcceptFrameOfType:(uint32_t)typ
}

- (void)ioFrameChannel:(PTChannel*)channel didReceiveFrameOfType:(uint32_t)type tag:(uint32_t)tag payload:(NSData *)payload {
if (type == PTFrameTypeStart) {
if (type == PTFrameTypeStart || type == PTFrameTypeStartMultiThread) {
PTStartFrame *startFrame = (PTStartFrame *)payload.bytes;
NSLog(@"Start received, with: %i", startFrame->runAtStartup);
BOOL runAtStartup = startFrame->runAtStartup;
BOOL recordAllThreads = type == PTFrameTypeStartMultiThread;
if (runAtStartup) {
[EMGPerfAnalysis setupRunAtStartup];
[EMGPerfAnalysis setupRunAtStartup:recordAllThreads];
} else {
[EMGPerfAnalysis setupStackRecording];
[EMGPerfAnalysis setupStackRecording:recordAllThreads];
}
} else if (type == PTFrameTypeStop) {
[EMGPerfAnalysis stopRecordingThread];
Expand Down
158 changes: 128 additions & 30 deletions ETTrace/ETTrace/EMGPerfAnalysis.mm
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,124 @@
#import "EMGChannelListener.h"
#import <QuartzCore/QuartzCore.h>
#import "PerfAnalysis.h"
#include <map>

NSString *const kEMGSpanStarted = @"EmergeMetricStarted";
NSString *const kEMGSpanEnded = @"EmergeMetricEnded";

@implementation EMGPerfAnalysis

static thread_t sMainMachThread = {0};
static thread_t sETTraceThread = {0};

static const int kMaxFramesPerStack = 512;
static NSThread *sStackRecordingThread = nil;
typedef struct {
CFTimeInterval time;
uint64_t frameCount;
uintptr_t frames[kMaxFramesPerStack];
} Stack;
static std::vector<Stack> *sStacks;
static std::mutex sStacksLock;

typedef struct {
std::vector<Stack> *stacks;
char name[256];
} Thread;
static std::map<unsigned int, Thread *> *sThreadsMap;
static std::mutex sThreadsLock;

static dispatch_queue_t fileEventsQueue;

static EMGChannelListener *channelListener;
static NSMutableArray <NSDictionary *> *sSpanTimes;
static BOOL sRecordAllThreads = false;

extern "C" {
void FIRCLSWriteThreadStack(thread_t thread, uintptr_t *frames, uint64_t framesCapacity, uint64_t *framesWritten);
}

+ (void)recordStack
+ (Thread *) createThread:(thread_t) threadId
{
Stack stack;
thread_suspend(sMainMachThread);
stack.time = CACurrentMediaTime();
FIRCLSWriteThreadStack(sMainMachThread, stack.frames, kMaxFramesPerStack, &(stack.frameCount));
thread_resume(sMainMachThread);
sStacksLock.lock();
try {
sStacks->emplace_back(stack);
} catch (const std::length_error& le) {
fflush(stdout);
fflush(stderr);
throw le;
Thread *thread = new Thread;

if(threadId == sMainMachThread) {
strcpy(thread->name,"Main Thread");
} else {
// Get thread Name
char name[256];
pthread_t pt = pthread_from_mach_thread_np(threadId);
if (pt) {
name[0] = '\0';
int rc = pthread_getname_np(pt, name, sizeof name);
strcpy(thread->name, name);
}
}
sStacksLock.unlock();

// Create stacks vector
thread->stacks = new std::vector<Stack>;
thread->stacks->reserve(400);

return thread;
}

+ (void)setupStackRecording
+ (void)recordStackForAllThreads
{
thread_act_array_t threads;
mach_msg_type_number_t thread_count;
if (sRecordAllThreads) {
if (task_threads(mach_task_self(), &threads, &thread_count) != KERN_SUCCESS) {
thread_count = 0;
}
} else {
threads = &sMainMachThread;
thread_count = 1;
}

// Suspend all threads but ETTrace's
for (mach_msg_type_number_t i = 0; i < thread_count; i++) {
if (threads[i] != sETTraceThread) {
thread_suspend(threads[i]);
}
}

CFTimeInterval time = CACurrentMediaTime();
for (mach_msg_type_number_t i = 0; i < thread_count; i++) {
if (threads[i] == sETTraceThread) {
continue;
}

Stack stack;
stack.time = time;
FIRCLSWriteThreadStack(threads[i], stack.frames, kMaxFramesPerStack, &(stack.frameCount));

std::vector<Stack> *threadStack;
sThreadsLock.lock();
if (sThreadsMap->find(threads[i]) == sThreadsMap->end()) {
Thread *thread = [self createThread:threads[i]];
// Add to hash map
sThreadsMap->insert(std::pair<unsigned int, Thread *>(threads[i], thread));

threadStack = thread->stacks;
} else {
threadStack = sThreadsMap->at(threads[i])->stacks;
}

try {
threadStack->emplace_back(stack);
} catch (const std::length_error& le) {
fflush(stdout);
fflush(stderr);
throw le;
}
sThreadsLock.unlock();
}

for (mach_msg_type_number_t i = 0; i < thread_count; i++) {
if (threads[i] != sETTraceThread)
thread_resume(threads[i]);
}
}

+ (void)setupStackRecording:(BOOL) recordAllThreads
{
if (sStackRecordingThread != nil) {
return;
Expand All @@ -78,12 +151,18 @@ + (void)setupStackRecording
// usleep is guaranteed to sleep more than that, in practice ~5ms. We could use a
// dispatch_timer, which at least tries to compensate for drift etc., but the
// timer's queue could theoretically end up run on the main thread
sStacks = new std::vector<Stack>;
sStacks->reserve(400);
sRecordAllThreads = recordAllThreads;

sThreadsMap = new std::map<unsigned int, Thread *>;

sStackRecordingThread = [[NSThread alloc] initWithBlock:^{
if (!sETTraceThread) {
sETTraceThread = mach_thread_self();
}

NSThread *thread = [NSThread currentThread];
while (!thread.cancelled) {
[self recordStack];
[self recordStackForAllThreads];
usleep(4500);
}
}];
Expand All @@ -99,8 +178,9 @@ + (void)stopRecordingThread {
});
}

+ (void)setupRunAtStartup {
+ (void)setupRunAtStartup:(BOOL) recordAllThreads {
[[NSUserDefaults standardUserDefaults] setBool:true forKey:@"runAtStartup"];
[[NSUserDefaults standardUserDefaults] setBool:recordAllThreads forKey:@"recordAllThreads"];
exit(0);
}

Expand Down Expand Up @@ -187,10 +267,9 @@ + (NSString *)deviceName {
return [NSString stringWithCString:systemInfo.machine encoding:NSUTF8StringEncoding];
}

+ (void)stopRecording {
sStacksLock.lock();
NSMutableArray <NSDictionary <NSString *, id> *> *stacks = [NSMutableArray array];
for (const auto &cStack : *sStacks) {
+ (NSArray <NSDictionary <NSString *, id> *> *) arrayFromStacks: (std::vector<Stack>)stacks {
NSMutableArray <NSDictionary <NSString *, id> *> *threadStacks = [NSMutableArray array];
for (const auto &cStack : stacks) {
NSMutableArray <NSNumber *> *stack = [NSMutableArray array];
// Add the addrs in reverse order so that they start with the lowest frame, e.g. `start`
for (int j = (int)cStack.frameCount - 1; j >= 0; j--) {
Expand All @@ -200,20 +279,37 @@ + (void)stopRecording {
@"stack": [stack copy],
@"time": @(cStack.time)
};
[stacks addObject:stackDictionary];
[threadStacks addObject:stackDictionary];
}
sStacks->clear();
sStacksLock.unlock();
return threadStacks;
}

+ (void)stopRecording {
sThreadsLock.lock();
NSMutableDictionary <NSString *, NSDictionary<NSString *, id> *> *threads = [NSMutableDictionary dictionary];

std::map<unsigned int, Thread *>::iterator it;
for (it = sThreadsMap->begin(); it != sThreadsMap->end(); it++) {
Thread thread = *it->second;
NSString *threadId = [[NSNumber numberWithUnsignedInt:it->first] stringValue];
threads[threadId] = @{
@"name": [NSString stringWithFormat:@"%s", thread.name],
@"stacks": [self arrayFromStacks: *thread.stacks]
};
}
sThreadsMap->empty();
sThreadsLock.unlock();

const NXArchInfo *archInfo = NXGetLocalArchInfo();
NSString *cpuType = [NSString stringWithUTF8String:archInfo->description];
NSMutableDictionary *info = [@{
@"stacks": stacks,
@"libraryInfo": EMGLibrariesData(),
@"isSimulator": @([self isRunningOnSimulator]),
@"osBuild": [self osBuild],
@"cpuType": cpuType,
@"device": [self deviceName],
@"events": sSpanTimes,
@"threads": threads,
} mutableCopy];

NSError *error = nil;
Expand Down Expand Up @@ -243,8 +339,10 @@ + (void)load {
EMGBeginCollectingLibraries();
BOOL infoPlistRunAtStartup = ((NSNumber *) NSBundle.mainBundle.infoDictionary[@"ETTraceRunAtStartup"]).boolValue;
if ([[NSUserDefaults standardUserDefaults] boolForKey:@"runAtStartup"] || infoPlistRunAtStartup) {
[EMGPerfAnalysis setupStackRecording];
sRecordAllThreads = [[NSUserDefaults standardUserDefaults] boolForKey:@"recordAllThreads"];
[EMGPerfAnalysis setupStackRecording:sRecordAllThreads];
[[NSUserDefaults standardUserDefaults] setBool:NO forKey:@"runAtStartup"];
[[NSUserDefaults standardUserDefaults] setBool:NO forKey:@"recordAllThreads"];
}
[EMGPerfAnalysis startObserving];
}
Expand Down
4 changes: 2 additions & 2 deletions ETTrace/ETTrace/EMGPerfAnalysis_Private.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
#import "PerfAnalysis.h"

@interface EMGPerfAnalysis (Private)
+ (void)setupStackRecording;
+ (void)setupRunAtStartup;
+ (void)setupStackRecording:(BOOL) recordAllThreads;
+ (void)setupRunAtStartup:(BOOL) recordAllThreads;
+ (void)stopRecordingThread;
+ (NSURL *)outputPath;
@end
Expand Down
5 changes: 3 additions & 2 deletions ETTrace/ETTraceRunner/Devices/DeviceManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ protocol DeviceManager {
}

extension DeviceManager {
func sendStartRecording(_ runAtStartup: Bool) async throws -> Void {
func sendStartRecording(_ runAtStartup: Bool, _ multiThread: Bool) async throws -> Void {
return try await withCheckedThrowingContinuation { continuation in
var boolValue = runAtStartup ? 1 : 0
let data = Data(bytes: &boolValue, count: 2)

communicationChannel.channel.sendFrame(type: UInt32(PTFrameTypeStart), tag: UInt32(PTNoFrameTag), payload: data) { error in
let type = multiThread ? PTFrameTypeStartMultiThread : PTFrameTypeStart
communicationChannel.channel.sendFrame(type: UInt32(type), tag: UInt32(PTNoFrameTag), payload: data) { error in
if let error = error {
continuation.resume(throwing: error)
} else {
Expand Down
5 changes: 4 additions & 1 deletion ETTrace/ETTraceRunner/PerfAnalysisRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ struct PerfAnalysisRunner: ParsableCommand {

@Flag(name: .shortAndLong, help: "Verbose logging")
var verbose: Bool = false

@Flag(name: .shortAndLong, help: "Record all threads")
var multiThread: Bool = false

mutating func run() throws {
if let dsym = dsyms, dsym.hasSuffix(".dSYM") {
PerfAnalysisRunner.exit(withError: ValidationError("The dsym argument should be set to a folder containing your dSYM files, not the dSYM itself"))
}
let helper = RunnerHelper(dsyms, launch, simulator, verbose)
let helper = RunnerHelper(dsyms, launch, simulator, verbose, multiThread)
Task {
do {
try await helper.start()
Expand Down
Loading

0 comments on commit da81f0b

Please sign in to comment.