feat: decode entrance support mtp #245
Conversation
Pull Request Overview
This PR adds MTP (Multi-Token Prediction) support to the RPC decode entrance by implementing speculative execution and propose token handling. The changes enable the decode service to receive and process propose tokens from prefill operations and configure MTP-specific parameters.
- Implements MTP support in decode entrance with propose token handling
- Adds MTP cache mapping logic for multi-model scenarios
- Configures speculative execution based on MTP availability
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| model_rpc_service.proto | Updated protobuf schema to support propose tokens and simplified field numbering |
| PrefillRpcServerNew.cc | Added MTP propose token extraction and cache mapping logic for multi-model support |
| PrefillGenerateContextNew.cc | Enabled MTP-based speculative execution configuration |
| DecodeRpcServerNew.cc | Implemented propose token handling and context position ID setup for decode operations |
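The table's description of model_rpc_service.proto suggests the prefill response gained a repeated token field. A hypothetical sketch of what that could look like follows; the message name and field number are illustrative assumptions, not taken from the PR's actual proto:

```proto
// Hypothetical sketch only: the message name and field number below are
// illustrative assumptions, not the PR's actual schema.
message RemoteGeneratePrefillResponse {
  // Draft tokens produced by the prefill MTP step, read back by decode.
  repeated int32 propose_token_ids = 1;
}
```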
```cpp
prefill_context.response->mutable_propose_token_ids()->CopyFrom(
    {propose_tokens.begin(), propose_tokens.end()});
```
Copilot AI, Oct 16, 2025
The CopyFrom operation with initializer list creates unnecessary temporary objects. Use RepeatedField::Add() in a loop or assign directly to avoid the temporary vector creation.
```cpp
prefill_context.response->mutable_context_position_ids()->CopyFrom(
    {context_position_ids->data<int32_t>(),
     context_position_ids->data<int32_t>() + context_position_ids->size()});
```
Copilot AI, Oct 16, 2025
Similar to the propose_token_ids, this CopyFrom with iterator range creates a temporary vector. Consider using RepeatedField methods for better performance.
Suggested change:

```cpp
prefill_context.response->mutable_context_position_ids()->Add(
    context_position_ids->data<int32_t>(), context_position_ids->size());
```
```cpp
std::vector<int> propose_tokens;
propose_tokens.assign(response.propose_token_ids().begin(), response.propose_token_ids().end());
generate_stream->setProposeToken(propose_tokens);
RTP_LLM_LOG_DEBUG("request [%s] received %d propose tokens from prefill",
                  decode_context.request_key.c_str(),
                  propose_tokens.size());
```
Copilot AI, Oct 16, 2025
Creating a temporary vector and copying data is inefficient. If setProposeToken accepts a RepeatedField or can be modified to accept iterators, pass the protobuf data directly to avoid the copy.
Suggested change:

```cpp
generate_stream->setProposeToken(response.propose_token_ids().begin(), response.propose_token_ids().end());
RTP_LLM_LOG_DEBUG("request [%s] received %d propose tokens from prefill",
                  decode_context.request_key.c_str(),
                  response.propose_token_ids_size());
```
Force-pushed from 0b8acd7 to 5396b35.
```cpp
    response_output, &(result.value()), maga_init_params_.gpt_init_parameter.misc_config.aux_string);
// should only generate one token
break;
if (stream->queryPdSep() && stream->waitForRemoteGenerate()) {
```
Why does the prefill in the decode entrance path call waitForRemoteGenerate?
SpeculativeEngine::prefillMtpStep calls setNeedRemoteGenerate(true) after filling the stream's propose tokens. If the prefill instance does not call waitForRemoteGenerate here, the stream's propose tokens might still be empty.
```cpp
}

if (stream->getContextPositionIds()) {
    auto context_position_ids = stream->getContextPositionIds();
```
context_position_ids does not exist in the proto.
Already deleted it.
Force-pushed from 5396b35 to d2e25e2.
Pull Request Overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
```cpp
if (stream->queryPdSep() && stream->waitForRemoteGenerate()) {
    break;
}
```
Copilot AI, Oct 17, 2025
The closing brace on line 250 appears to be orphaned. This suggests a control flow structure may have been modified incorrectly, potentially breaking the loop logic.
```cpp
if (stream->getContextPositionIds()) {
    auto context_position_ids = stream->getContextPositionIds();
    prefill_context.response->mutable_context_position_ids()->CopyFrom(
        {context_position_ids->data<int32_t>(),
         context_position_ids->data<int32_t>() + context_position_ids->size()});
}
```
Copilot AI, Oct 17, 2025
The context_position_ids field is being set in the response but is not defined in the protocol buffer schema. This will cause compilation errors, since the mutable_context_position_ids() method doesn't exist.
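For this to compile, the response message would need a matching schema entry. A hypothetical sketch follows; the field number is an illustrative assumption, not the PR's actual numbering:

```proto
// Hypothetical addition to the prefill response message in
// model_rpc_service.proto; the field number is an illustrative assumption.
repeated int32 context_position_ids = 2;
```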
Force-pushed from d2e25e2 to db0bca7.
Force-pushed from db0bca7 to 4eeeb3f.
Pull Request Overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 8 comments.
```cpp
    RTP_LLM_LOG_DEBUG("stream [%ld] set setNeedRemoteGenerate", stream->streamId());
    stream->setNeedRemoteGenerate(true);
}
stream->setPreillMtpReady(true);
```
Copilot AI, Oct 19, 2025
Corrected spelling of 'Preill' to 'Prefill' in method name.
Suggested change:

```cpp
stream->setPrefillMtpReady(true);
```
```cpp
bool GenerateStream::waitForPreillMtpReady() {
    std::unique_lock<std::mutex> lock(*output_mutex_);

    cv_->wait(lock, [this] {
        return preill_mtp_ready_ || generate_status_->status == StreamState::STOPPED
            || generate_status_->status == StreamState::FINISHED;
    });

    if(!preill_mtp_ready_ && generate_status_->status == StreamState::STOPPED) {
        RTP_LLM_LOG_WARNING("waitForPreillMtpReady exits due to stream [%ld] stopped, error: %s",
                            streamId(),
                            generate_status_->error_info.ToString().c_str());
    }

    return preill_mtp_ready_;
```
Copilot AI, Oct 19, 2025
Corrected spelling of 'Preill' to 'Prefill' in method name, variable names, and log message.
Suggested change:

```cpp
bool GenerateStream::waitForPrefillMtpReady() {
    std::unique_lock<std::mutex> lock(*output_mutex_);
    cv_->wait(lock, [this] {
        return prefill_mtp_ready_ || generate_status_->status == StreamState::STOPPED
            || generate_status_->status == StreamState::FINISHED;
    });
    if(!prefill_mtp_ready_ && generate_status_->status == StreamState::STOPPED) {
        RTP_LLM_LOG_WARNING("waitForPrefillMtpReady exits due to stream [%ld] stopped, error: %s",
                            streamId(),
                            generate_status_->error_info.ToString().c_str());
    }
    return prefill_mtp_ready_;
```
```cpp
break;

if (engine_->isMTPEagle()) {
    stream->waitForPreillMtpReady();
```
Copilot AI, Oct 19, 2025
Corrected spelling of 'Preill' to 'Prefill' in method call.
Suggested change:

```cpp
stream->waitForPrefillMtpReady();
```
```cpp
        || generate_status_->status == StreamState::FINISHED;
});

if(!preill_mtp_ready_ && generate_status_->status == StreamState::STOPPED) {
```
Copilot AI, Oct 19, 2025
Missing space after 'if' keyword. Should be 'if (' instead of 'if('.
Suggested change:

```cpp
if (!preill_mtp_ready_ && generate_status_->status == StreamState::STOPPED) {
```
Force-pushed from 4eeeb3f to 1d982c5.
Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
Force-pushed from 45f8b20 to df4c0b3.
Force-pushed from df4c0b3 to 040e6fb.
Pull Request Overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
```cpp
if (engine_->isMTPEagle()) {
    stream->waitForPrefillMtpReady();
    break;
```
Copilot AI, Oct 20, 2025
The break statement causes the loop to exit early only for MTP streams, potentially skipping output processing for non-finished MTP streams. The loop should continue processing outputs until the stream is finished, regardless of MTP status.
Suggested change: delete the `break;` line.
```cpp
auto modified_request = const_cast<RemoteGenerateRequestPBNew*>(request);
GenerateInputPB* mutable_input = modified_request->mutable_input();

```
Copilot AI, Oct 20, 2025
[nitpick] Unnecessary blank line added without purpose.
```cpp
ErrorInfo PrefillRpcServerNew::generateFirstToken(PrefillGenerateContextNew& prefill_context) {
    auto stream = prefill_context.getStream();
    engine_->enqueue(stream);
```
Copilot AI, Oct 20, 2025
[nitpick] Unnecessary trailing whitespace added.