Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][fn] Record Pulsar Function processing time properly for asynchronous functions #23811

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

walkinggo
Copy link

@walkinggo walkinggo commented Jan 4, 2025

Fixes #23705

Motivation

See #23705

Modifications

@walkinggo Please update this. (You can use GenAI in assisting to summarize the modifications. Some hints in #23806 (comment) . GitHub's Co-pilot seems to support summarizing PR modifications too.)

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change doesn't seem to be related to the PR.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, uploading these file changes was a mistake. I've force-pushed in a later commit to remove those changes.

Comment on lines 341 to 359
CompletableFuture<InstanceObserver> future = CompletableFuture.supplyAsync(() -> {
JavaExecutionResult result;
InstanceObserver instanceObserver = new InstanceObserver();
instanceObserver.setStartTime(System.nanoTime());
// process the message
Thread.currentThread().setContextClassLoader(functionClassLoader);
result = javaInstance.handleMessage(
currentRecord,
currentRecord.getValue(),
asyncResultConsumer,
asyncErrorHandler);
Thread.currentThread().setContextClassLoader(instanceClassLoader);
instanceObserver.setJavaExecutionResult(result);
return instanceObserver;
}).whenComplete((res, ex) -> {
stats.processTimeEnd(res.getStartTime());
});

JavaExecutionResult result = future.join().getJavaExecutionResult();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't make sense and is completely unnecessary.
Instead, you should simply pass the starting time nanos as an argument that gets added to the JavaExecutionResult instance field (add a new field startTimeNanos to JavaExecutionResult).
The result can be recorded in the handleResult method. The processTimeStart method should be completely removed from the ComponentStatsManager class and the processTimeEnd method should accept the starting time as a parameter. The value gets taken from the startTimeNanos field of the JavaExecutionResult instance.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I have completed these tasks.

…m ComponentStatsManager to JavaExecutionResult.
@walkinggo walkinggo force-pushed the complete-get-proper-time branch from 58d57af to 3511c82 Compare January 4, 2025 17:10
@walkinggo
Copy link
Author

It looks like due to code formatting, many unintended changes were introduced. I added startTimeNanos in JavaExecutionResult and removed it from ComponentStatsManager. Now, the processTimeEnd method in ComponentStatsManager takes the start time as a parameter and calculates the final time. I also removed the processTimeStart method.

@walkinggo
Copy link
Author

Do I need to create a new PR to remove the unnecessary line changes caused by formatting? @ @lhotari

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good start, going in the right direction!

public abstract void processTimeStart();

public abstract void processTimeEnd();
public abstract void processTimeEnd(double processTimeStart);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the parameter should be a long instead of double.

@@ -29,6 +29,7 @@
public class JavaExecutionResult {
private Throwable userException;
private Object result;
private double startTimeNanos;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private double startTimeNanos;
private final long startTimeNanos = System.nanoTime();

It should be a long field and it should be made final.

@@ -117,6 +117,7 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
}

JavaExecutionResult executionResult = new JavaExecutionResult();
executionResult.setStartTimeNanos(System.nanoTime());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is obsolete since startTimeNanos is set when the instance gets constructed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will also need to pass the JavaExecutionResult instance into a field javaExecutionResult of the AsyncFuncRequest so that the same instance is being used for the asyncPreserveInputOrderForOutputMessages case on line 139. Replace the instantiation of JavaExecutionResult in processAsyncResultsInInputOrder method on line 191 with this instance that is carried in the javaExecutionResult of the AsyncFuncRequest instance.
Also replace the instance on line 152 with this same executionResult on line 152 so that the starting time will be handled for the non asyncPreserveInputOrderForOutputMessages case of async processing.
With these changes, the solution would have the basics in place. There would also need to be a way to test all of this.

For testing, it might be useful to have a java.util.function.LongSupplier function for getting the nanoTime. In production code, this would default to () -> System.nanoTime(), but in tests, this could be used to mock the "clock" for getting nanoTime. Adding testability would cause some additional refactorings.

Comment on lines +353 to +354
// register end time
stats.processTimeEnd(result.getStartTimeNanos());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this. This should be handled in the handleResult method instead.

@lhotari
Copy link
Member

lhotari commented Jan 4, 2025

Do I need to create a new PR to remove the unnecessary line changes caused by formatting? @ @lhotari

It makes it slightly harder to read the diff when there are formatting changes. However there's the "hide whitespace" option to address that in GitHub UI.
Are you using IntelliJ/IDEA for development? Have you setup the code style according to the instructions at https://pulsar.apache.org/contribute/setup-ide/#configure-code-style ? I noticed that the import formatting doesn't match the Pulsar formatting since there are no empty lines in the Pulsar code style.
In the past, I have noticed that some code in the Pulsar Functions packages is using some non-default formatting so if you reformat the code, it will result in many changes. As long as you are using the Pulsar code style, the formatting changes are fine to be included in this PR.

@lhotari lhotari changed the title [Bug][pulsar-function]Solve pulsar Function processing time doesn't get properly recorded for asynchronous functions with completefuture . [fix][fn] Record Pulsar Function processing time properly for asynchronous functions Jan 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] Pulsar Function processing time doesn't get properly recorded for asynchronous functions
2 participants