Skip to content

Commit b9369bf

Browse files
authored
[Concurrency] Fix default actor isolation issue in face of TaskExecutor preference (swiftlang#74658)
* [Concurrency] Fix task excutor handling of default actor isolation The task executor API did not properly account for taking the default actor locking into account when running code on it, we just took the job and ran it without checking with the serial executor at all, which resulted in potential concurrent executions inside the actor -- violating actor isolation. Here we change the TaskExecutor enqueue API to accept the "target" serial executor, which in practice will be either generic or a specific default actor, and coordinate with it when we perform a runSynchronously. The SE proposal needs to be amended to showcase this new API, however without this change we are introducing races so we must do this before the API is stable. * Remove _swift_task_enqueueOnTaskExecutor as we don't use it anymore * no need for the new protocol requirement * remove the enqueue(_ job: UnownedJob, isolatedTo unownedSerialExecutor: UnownedSerialExecutor) Thankfully we dont need it after all * Don't add swift_defaultActor_enqueue_withTaskExecutor and centralize the task executor getting to enqueue() * move around extern definitions
1 parent 0350023 commit b9369bf

9 files changed

+404
-114
lines changed

include/swift/ABI/Executor.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ class SerialExecutorRef {
124124
return Identity;
125125
}
126126

127+
const char* getIdentityDebugName() const {
128+
return isMainExecutor() ? " (MainActorExecutor)"
129+
: isGeneric() ? " (GenericExecutor)"
130+
: "";
131+
}
132+
127133
/// Is this the generic executor reference?
128134
bool isGeneric() const {
129135
return Identity == 0;
@@ -233,6 +239,10 @@ class TaskExecutorRef {
233239
return TaskExecutorRef(identity, wtable);
234240
}
235241

242+
/// If the job is an 'AsyncTask' return its task executor preference,
243+
/// otherwise return 'undefined', meaning "no preference".
244+
static TaskExecutorRef fromTaskExecutorPreference(Job *job);
245+
236246
HeapObject *getIdentity() const {
237247
return Identity;
238248
}

include/swift/Runtime/Concurrency.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ swift_task_pushTaskExecutorPreference(TaskExecutorRef executor);
558558
/// signals a bug in record handling by the concurrency library -- a record push
559559
/// must always be paired with a record pop.
560560
///
561-
/// Runtime availability: Swift 9999.
561+
/// Runtime availability: Swift 6.0
562562
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
563563
void swift_task_popTaskExecutorPreference(TaskExecutorPreferenceStatusRecord* record);
564564

@@ -926,15 +926,14 @@ void swift_job_run(Job *job, SerialExecutorRef executor);
926926
/// Establish that the current thread is running as the given
927927
/// executor, then run a job.
928928
///
929-
/// Runtime availability: Swift 9999
929+
/// Runtime availability: Swift 6.0
930930
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
931-
void swift_job_run_on_task_executor(Job *job,
932-
TaskExecutorRef executor);
931+
void swift_job_run_on_task_executor(Job *job, TaskExecutorRef executor);
933932

934933
/// Establish that the current thread is running as the given
935934
/// executor, then run a job.
936935
///
937-
/// Runtime availability: Swift 9999
936+
/// Runtime availability: Swift 6.0
938937
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
939938
void swift_job_run_on_serial_and_task_executor(Job *job,
940939
SerialExecutorRef serialExecutor,

stdlib/public/Concurrency/Actor.cpp

Lines changed: 109 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ ExecutorTrackingInfo::ActiveInfoInThread;
217217

218218
void swift::runJobInEstablishedExecutorContext(Job *job) {
219219
_swift_tsan_acquire(job);
220+
SWIFT_TASK_DEBUG_LOG("Run job in established context %p", job);
220221

221222
#if SWIFT_OBJC_INTEROP
222223
auto pool = objc_autoreleasePoolPush();
@@ -386,6 +387,20 @@ static void checkIsCurrentExecutorMode(void *context) {
386387
: Swift6_UseCheckIsolated_AllowCrash;
387388
}
388389

390+
// Implemented in Swift to avoid some annoying hard-coding about
391+
// TaskExecutor's protocol witness table. We could inline this
392+
// with effort, though.
393+
extern "C" SWIFT_CC(swift) void _swift_task_enqueueOnTaskExecutor(
394+
Job *job, HeapObject *executor, const Metadata *selfType,
395+
const TaskExecutorWitnessTable *wtable);
396+
397+
// Implemented in Swift to avoid some annoying hard-coding about
398+
// SerialExecutor's protocol witness table. We could inline this
399+
// with effort, though.
400+
extern "C" SWIFT_CC(swift) void _swift_task_enqueueOnExecutor(
401+
Job *job, HeapObject *executor, const Metadata *executorType,
402+
const SerialExecutorWitnessTable *wtable);
403+
389404
SWIFT_CC(swift)
390405
static bool swift_task_isCurrentExecutorImpl(SerialExecutorRef expectedExecutor) {
391406
auto current = ExecutorTrackingInfo::current();
@@ -464,9 +479,17 @@ static bool swift_task_isCurrentExecutorImpl(SerialExecutorRef expectedExecutor)
464479

465480
// Complex equality means that if two executors of the same type have some
466481
// special logic to check if they are "actually the same".
482+
//
483+
// If any of the executors does not have a witness table we can't complex
484+
// equality compare with it.
485+
//
486+
// We may be able to prove we're on the same executor as expected by
487+
// using 'checkIsolated' later on though.
467488
if (expectedExecutor.isComplexEquality()) {
468489
if (currentExecutor.getIdentity() &&
490+
currentExecutor.hasSerialExecutorWitnessTable() &&
469491
expectedExecutor.getIdentity() &&
492+
expectedExecutor.hasSerialExecutorWitnessTable() &&
470493
swift_compareWitnessTables(
471494
reinterpret_cast<const WitnessTable *>(
472495
currentExecutor.getSerialExecutorWitnessTable()),
@@ -1171,7 +1194,10 @@ class DefaultActorImpl
11711194
/// Schedule a processing job.
11721195
/// It can be done when actor transitions from Idle to Scheduled or
11731196
/// when actor gets a priority override and we schedule a stealer.
1174-
void scheduleActorProcessJob(JobPriority priority);
1197+
///
1198+
/// When the task executor is `undefined` ths task will be scheduled on the
1199+
/// default global executor.
1200+
void scheduleActorProcessJob(JobPriority priority, TaskExecutorRef taskExecutor);
11751201

11761202
/// Processes claimed incoming jobs into `prioritizedJobs`.
11771203
/// Incoming jobs are of mixed priorities and in LIFO order.
@@ -1271,14 +1297,36 @@ dispatch_lock_t *DefaultActorImpl::drainLockAddr() {
12711297
}
12721298
#endif /* SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION */
12731299

1274-
void DefaultActorImpl::scheduleActorProcessJob(JobPriority priority) {
1300+
void DefaultActorImpl::scheduleActorProcessJob(
1301+
JobPriority priority, TaskExecutorRef taskExecutor) {
12751302
Job *job = new ProcessOutOfLineJob(this, priority);
12761303
SWIFT_TASK_DEBUG_LOG(
1277-
"Scheduling processing job %p for actor %p at priority %#zx", job, this,
1278-
priority);
1304+
"Scheduling processing job %p for actor %p at priority %#zx, with taskExecutor %p", job, this,
1305+
priority, taskExecutor.getIdentity());
1306+
1307+
if (taskExecutor.isDefined()) {
1308+
#if SWIFT_CONCURRENCY_EMBEDDED
1309+
swift_unreachable("task executors not supported in embedded Swift");
1310+
#else
1311+
auto taskExecutorIdentity = taskExecutor.getIdentity();
1312+
auto taskExecutorType = swift_getObjectType(taskExecutorIdentity);
1313+
auto taskExecutorWtable = taskExecutor.getTaskExecutorWitnessTable();
1314+
1315+
return _swift_task_enqueueOnTaskExecutor(
1316+
job, taskExecutorIdentity, taskExecutorType, taskExecutorWtable);
1317+
#endif
1318+
}
1319+
12791320
swift_task_enqueueGlobal(job);
12801321
}
12811322

1323+
TaskExecutorRef TaskExecutorRef::fromTaskExecutorPreference(Job *job) {
1324+
if (auto task = dyn_cast<AsyncTask>(job)) {
1325+
return task->getPreferredTaskExecutor();
1326+
}
1327+
return TaskExecutorRef::undefined();
1328+
}
1329+
12821330
void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
12831331
// We can do relaxed loads here, we are just using the current head in the
12841332
// atomic state and linking that into the new job we are inserting, we don't
@@ -1287,6 +1335,7 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
12871335
this, priority);
12881336
concurrency::trace::actor_enqueue(this, job);
12891337
bool distributedActorIsRemote = swift_distributed_actor_is_remote(this);
1338+
12901339
auto oldState = _status().load(std::memory_order_relaxed);
12911340
while (true) {
12921341
auto newState = oldState;
@@ -1317,7 +1366,10 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
13171366
if (!oldState.isScheduled() && newState.isScheduled()) {
13181367
// We took responsibility to schedule the actor for the first time. See
13191368
// also ownership rule (1)
1320-
return scheduleActorProcessJob(newState.getMaxPriority());
1369+
TaskExecutorRef taskExecutor =
1370+
TaskExecutorRef::fromTaskExecutorPreference(job);
1371+
1372+
return scheduleActorProcessJob(newState.getMaxPriority(), taskExecutor);
13211373
}
13221374

13231375
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
@@ -1336,7 +1388,10 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
13361388
"[Override] Scheduling a stealer for actor %p at %#x priority",
13371389
this, newState.getMaxPriority());
13381390
swift_retain(this);
1339-
scheduleActorProcessJob(newState.getMaxPriority());
1391+
1392+
TaskExecutorRef taskExecutor =
1393+
TaskExecutorRef::fromTaskExecutorPreference(job);
1394+
scheduleActorProcessJob(newState.getMaxPriority(), taskExecutor);
13401395
}
13411396
}
13421397
#endif
@@ -1411,7 +1466,8 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {
14111466
"[Override] Scheduling a stealer for actor %p at %#x priority",
14121467
this, newState.getMaxPriority());
14131468
swift_retain(this);
1414-
scheduleActorProcessJob(newState.getMaxPriority());
1469+
auto taskExecutor = TaskExecutorRef::fromTaskExecutorPreference(job);
1470+
scheduleActorProcessJob(newState.getMaxPriority(), taskExecutor);
14151471
}
14161472
#endif
14171473
}
@@ -1805,7 +1861,7 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
18051861
} else {
18061862
// There is no work left to do - actor goes idle
18071863

1808-
// R becomes 0 and N descreases by 1.
1864+
// R becomes 0 and N decreases by 1.
18091865
// But, we may still have stealers scheduled so N could be > 0. This is
18101866
// fine since N >= R. Every such stealer, once scheduled, will observe
18111867
// actor as idle, will release its ref and return. (See tryLock function.)
@@ -1822,7 +1878,8 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
18221878

18231879
if (newState.isScheduled()) {
18241880
// See ownership rule (6) in DefaultActorImpl
1825-
scheduleActorProcessJob(newState.getMaxPriority());
1881+
// FIXME: should we specify some task executor here, since otherwise we'll schedule on the global pool
1882+
scheduleActorProcessJob(newState.getMaxPriority(), TaskExecutorRef::undefined());
18261883
} else {
18271884
// See ownership rule (5) in DefaultActorImpl
18281885
SWIFT_TASK_DEBUG_LOG("Actor %p is idle now", this);
@@ -1854,7 +1911,11 @@ static void swift_job_runImpl(Job *job, SerialExecutorRef executor) {
18541911
// is generic.
18551912
if (!executor.isGeneric()) trackingInfo.disallowSwitching();
18561913

1857-
trackingInfo.enterAndShadow(executor, TaskExecutorRef::undefined());
1914+
auto taskExecutor = executor.isGeneric()
1915+
? TaskExecutorRef::fromTaskExecutorPreference(job)
1916+
: TaskExecutorRef::undefined();
1917+
1918+
trackingInfo.enterAndShadow(executor, taskExecutor);
18581919

18591920
SWIFT_TASK_DEBUG_LOG("job %p", job);
18601921
runJobInEstablishedExecutorContext(job);
@@ -1872,9 +1933,11 @@ static void swift_job_runImpl(Job *job, SerialExecutorRef executor) {
18721933

18731934
SWIFT_CC(swift)
18741935
static void swift_job_run_on_serial_and_task_executorImpl(Job *job,
1875-
SerialExecutorRef serialExecutor,
1876-
TaskExecutorRef taskExecutor) {
1936+
SerialExecutorRef serialExecutor,
1937+
TaskExecutorRef taskExecutor) {
18771938
ExecutorTrackingInfo trackingInfo;
1939+
SWIFT_TASK_DEBUG_LOG("Run job %p on serial executor %p task executor %p", job,
1940+
serialExecutor.getIdentity(), taskExecutor.getIdentity());
18781941

18791942
// TODO: we don't allow switching
18801943
trackingInfo.disallowSwitching();
@@ -2090,16 +2153,13 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex
20902153
auto currentTaskExecutor = (trackingInfo ? trackingInfo->getTaskExecutor()
20912154
: TaskExecutorRef::undefined());
20922155
auto newTaskExecutor = task->getPreferredTaskExecutor();
2093-
SWIFT_TASK_DEBUG_LOG("Task %p trying to switch executors: executor %p to "
2094-
"%p%s; task executor: from %p%s to %p%s",
2095-
task, currentExecutor.getIdentity(),
2096-
currentExecutor.isMainExecutor() ? " (MainActorExecutor)"
2097-
: currentExecutor.isGeneric() ? " (GenericExecutor)"
2098-
: "",
2156+
SWIFT_TASK_DEBUG_LOG("Task %p trying to switch executors: executor %p%s to "
2157+
"new serial executor: %p%s; task executor: from %p%s to %p%s",
2158+
task,
2159+
currentExecutor.getIdentity(),
2160+
currentExecutor.getIdentityDebugName(),
20992161
newExecutor.getIdentity(),
2100-
newExecutor.isMainExecutor() ? " (MainActorExecutor)"
2101-
: newExecutor.isGeneric() ? " (GenericExecutor)"
2102-
: "",
2162+
newExecutor.getIdentityDebugName(),
21032163
currentTaskExecutor.getIdentity(),
21042164
currentTaskExecutor.isDefined() ? "" : " (undefined)",
21052165
newTaskExecutor.getIdentity(),
@@ -2147,86 +2207,66 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex
21472207
/************************* GENERIC ACTOR INTERFACES **************************/
21482208
/*****************************************************************************/
21492209

2150-
// Implemented in Swift to avoid some annoying hard-coding about
2151-
// SerialExecutor's protocol witness table. We could inline this
2152-
// with effort, though.
2153-
extern "C" SWIFT_CC(swift)
2154-
void _swift_task_enqueueOnExecutor(Job *job, HeapObject *executor,
2155-
const Metadata *selfType,
2156-
const SerialExecutorWitnessTable *wtable);
2157-
extern "C" SWIFT_CC(swift) void _swift_task_enqueueOnTaskExecutor(
2158-
Job *job, HeapObject *executor, const Metadata *selfType,
2159-
const TaskExecutorWitnessTable *wtable);
2160-
21612210
extern "C" SWIFT_CC(swift) void _swift_task_makeAnyTaskExecutor(
21622211
HeapObject *executor, const Metadata *selfType,
21632212
const TaskExecutorWitnessTable *wtable);
21642213

21652214
SWIFT_CC(swift)
2166-
static void swift_task_enqueueImpl(Job *job, SerialExecutorRef executor) {
2167-
SWIFT_TASK_DEBUG_LOG("enqueue job %p on serial executor %p", job,
2168-
executor.getIdentity());
2215+
static void swift_task_enqueueImpl(Job *job, SerialExecutorRef serialExecutorRef) {
2216+
#ifndef NDEBUG
2217+
auto _taskExecutorRef = TaskExecutorRef::undefined();
2218+
if (auto task = dyn_cast<AsyncTask>(job)) {
2219+
_taskExecutorRef = task->getPreferredTaskExecutor();
2220+
}
2221+
SWIFT_TASK_DEBUG_LOG("enqueue job %p on serial serialExecutor %p, taskExecutor = %p", job,
2222+
serialExecutorRef.getIdentity(),
2223+
__taskExecutorRef.getIdentity());
2224+
#endif
21692225

21702226
assert(job && "no job provided");
21712227
job->SchedulerPrivate[0] = NULL;
21722228
job->SchedulerPrivate[1] = NULL;
21732229

21742230
_swift_tsan_release(job);
21752231

2176-
if (executor.isGeneric()) {
2177-
// TODO: check the task for a flag if we need to look for task executor
2232+
if (serialExecutorRef.isGeneric()) {
21782233
if (auto task = dyn_cast<AsyncTask>(job)) {
2179-
auto taskExecutor = task->getPreferredTaskExecutor();
2180-
if (taskExecutor.isDefined()) {
2234+
auto taskExecutorRef = task->getPreferredTaskExecutor();
2235+
if (taskExecutorRef.isDefined()) {
21812236
#if SWIFT_CONCURRENCY_EMBEDDED
21822237
swift_unreachable("task executors not supported in embedded Swift");
21832238
#else
2184-
auto wtable = taskExecutor.getTaskExecutorWitnessTable();
2185-
auto taskExecutorObject = taskExecutor.getIdentity();
2186-
auto taskExecutorType = swift_getObjectType(taskExecutorObject);
2187-
return _swift_task_enqueueOnTaskExecutor(job, taskExecutorObject,
2188-
taskExecutorType, wtable);
2239+
auto taskExecutorIdentity = taskExecutorRef.getIdentity();
2240+
auto taskExecutorType = swift_getObjectType(taskExecutorIdentity);
2241+
auto taskExecutorWtable = taskExecutorRef.getTaskExecutorWitnessTable();
2242+
2243+
return _swift_task_enqueueOnTaskExecutor(
2244+
job,
2245+
taskExecutorIdentity, taskExecutorType, taskExecutorWtable);
21892246
#endif // SWIFT_CONCURRENCY_EMBEDDED
21902247
} // else, fall-through to the default global enqueue
21912248
}
21922249
return swift_task_enqueueGlobal(job);
21932250
}
21942251

2195-
if (executor.isDefaultActor()) {
2196-
auto taskExecutor = TaskExecutorRef::undefined();
2197-
if (auto task = dyn_cast<AsyncTask>(job)) {
2198-
taskExecutor = task->getPreferredTaskExecutor();
2199-
}
2200-
2201-
#if SWIFT_CONCURRENCY_EMBEDDED
2202-
swift_unreachable("task executors not supported in embedded Swift");
2203-
#else
2204-
if (taskExecutor.isDefined()) {
2205-
auto wtable = taskExecutor.getTaskExecutorWitnessTable();
2206-
auto executorObject = taskExecutor.getIdentity();
2207-
auto executorType = swift_getObjectType(executorObject);
2208-
return _swift_task_enqueueOnTaskExecutor(job, executorObject,
2209-
executorType, wtable);
2210-
} else {
2211-
return swift_defaultActor_enqueue(job, executor.getDefaultActor());
2212-
}
2213-
#endif // SWIFT_CONCURRENCY_EMBEDDED
2252+
if (serialExecutorRef.isDefaultActor()) {
2253+
return swift_defaultActor_enqueue(job, serialExecutorRef.getDefaultActor());
22142254
}
22152255

22162256
#if SWIFT_CONCURRENCY_EMBEDDED
22172257
swift_unreachable("custom executors not supported in embedded Swift");
22182258
#else
22192259
// For main actor or actors with custom executors
2220-
auto wtable = executor.getSerialExecutorWitnessTable();
2221-
auto executorObject = executor.getIdentity();
2222-
auto executorType = swift_getObjectType(executorObject);
2223-
_swift_task_enqueueOnExecutor(job, executorObject, executorType, wtable);
2260+
auto serialExecutorIdentity = serialExecutorRef.getIdentity();
2261+
auto serialExecutorType = swift_getObjectType(serialExecutorIdentity);
2262+
auto serialExecutorWtable = serialExecutorRef.getSerialExecutorWitnessTable();
2263+
_swift_task_enqueueOnExecutor(job, serialExecutorIdentity, serialExecutorType,
2264+
serialExecutorWtable);
22242265
#endif // SWIFT_CONCURRENCY_EMBEDDED
22252266
}
22262267

22272268
static void
2228-
swift_actor_escalate(DefaultActorImpl *actor, AsyncTask *task, JobPriority newPriority)
2229-
{
2269+
swift_actor_escalate(DefaultActorImpl *actor, AsyncTask *task, JobPriority newPriority) {
22302270
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
22312271
return actor->enqueueStealer(task, newPriority);
22322272
#endif

stdlib/public/Concurrency/Executor.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,6 @@ public protocol TaskExecutor: Executor {
242242
// avoid drilling down to the base conformance just for the basic
243243
// work-scheduling operation.
244244
@_nonoverride
245-
@available(*, deprecated, message: "Implement 'enqueue(_: consuming ExecutorJob)' instead")
246245
func enqueue(_ job: consuming Job)
247246
#endif // !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
248247

@@ -520,6 +519,7 @@ where E: SerialExecutor {
520519
#endif // !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
521520
}
522521

522+
/// Used by Swift Concurrency runtime to call into a task executor's `enqueue`.
523523
@_unavailableInEmbedded
524524
@available(SwiftStdlib 6.0, *)
525525
@_silgen_name("_swift_task_enqueueOnTaskExecutor")

0 commit comments

Comments
 (0)