Skip to content

Commit

Permalink
add delta updates
Browse files Browse the repository at this point in the history
  • Loading branch information
wusteven815 committed Dec 18, 2024
1 parent 2738a1d commit 1970f81
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 30 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ repos:
sphinx,
click,
watchdog,
pyjsonpatch,
]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.2.2
Expand Down
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions plugins/ui/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ install_requires =
deephaven-core>=0.37.0
deephaven-plugin>=0.6.0
json-rpc
pyjsonpatch
deephaven-plugin-utilities>=0.0.2
typing_extensions;python_version<'3.11'
include_package_data = True
Expand Down
17 changes: 10 additions & 7 deletions plugins/ui/src/deephaven/ui/object_types/ElementMessageStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from deephaven.server.executors import submit_task
from deephaven.execution_context import ExecutionContext, get_exec_ctx
from deephaven.liveness_scope import liveness_scope
from pyjsonpatch import generate_patch

from .._internal import wrap_callable
from ..elements import Element
Expand Down Expand Up @@ -194,6 +195,7 @@ def __init__(self, element: Element, connection: MessageStream):
self._render_state = _RenderState.IDLE
self._exec_context = get_exec_ctx()
self._is_closed = False
self._last_document = {}

def _render(self) -> None:
logger.debug("ElementMessageStream._render")
Expand All @@ -208,7 +210,7 @@ def _render(self) -> None:
try:
node = self._renderer.render(self._element)
state = self._context.export_state()
self._send_document_update(node, state)
self._send_document_patch(node, state)
except Exception as e:
# Send the error to the client for displaying to the user
# If there's an error sending it to the client, then it will be caught by the render exception handler
Expand Down Expand Up @@ -450,11 +452,11 @@ def _close_callable(self, callable_id: str) -> None:
self._callable_dict.pop(callable_id, None)
self._temp_callable_dict.pop(callable_id, None)

def _send_document_update(
def _send_document_patch(
self, root: RenderedNode, state: ExportedRenderState
) -> None:
"""
Send a document update to the client. Currently just sends the entire document for each update.
Send a document update to the client in the form of a JSON Patch (RFC 6902).
Args:
root: The root node of the document to send
Expand All @@ -464,18 +466,19 @@ def _send_document_update(
logger.error("Stream is closed, cannot render document")
sys.exit()

# TODO(#67): Send a diff of the document instead of the entire document.
encoder_result = self._encoder.encode_node(root)
encoded_document = encoder_result["encoded_node"]
new_objects = encoder_result["new_objects"]
callable_id_dict = encoder_result["callable_id_dict"]

document = json.loads(encoded_document)
patch = generate_patch(self._last_document, document)
self._last_document = document

logger.debug("Exported state: %s", state)
encoded_state = json.dumps(state)

request = self._make_notification(
"documentUpdated", encoded_document, encoded_state
)
request = self._make_notification("documentPatched", patch, encoded_state)
payload = json.dumps(request)
logger.debug(f"Sending payload: {payload}")

Expand Down
1 change: 1 addition & 0 deletions plugins/ui/src/js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
"@fortawesome/react-fontawesome": "^0.2.0",
"@internationalized/date": "^3.5.5",
"classnames": "^2.5.1",
"fast-json-patch": "^3.1.1",
"json-rpc-2.0": "^1.6.0",
"nanoid": "^5.0.7",
"react-markdown": "^8.0.7",
Expand Down
165 changes: 142 additions & 23 deletions plugins/ui/src/js/src/widget/WidgetHandler.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import React, {
} from 'react';
// eslint-disable-next-line camelcase
import { unstable_batchedUpdates } from 'react-dom';
import { applyPatch, type Operation } from 'fast-json-patch';
import {
JSONRPCClient,
JSONRPCServer,
Expand All @@ -21,20 +22,22 @@ import { useWidget } from '@deephaven/jsapi-bootstrap';
import type { dh } from '@deephaven/jsapi-types';
import Log from '@deephaven/log';
import { EMPTY_FUNCTION, assertNotNull } from '@deephaven/utils';

import {
CALLABLE_KEY,
OBJECT_KEY,
isCallableNode,
isElementNode,
isObjectNode,
isPrimitive,
} from '../elements/utils/ElementUtils';
import {
ReadonlyWidgetData,
WidgetDataUpdate,
WidgetMessageEvent,
WidgetError,
METHOD_DOCUMENT_ERROR,
METHOD_DOCUMENT_UPDATED,
METHOD_DOCUMENT_PATCHED,
METHOD_EVENT,
} from './WidgetTypes';
import DocumentHandler from './DocumentHandler';
Expand Down Expand Up @@ -73,6 +76,9 @@ function WidgetHandler({
initialData: initialDataProp,
}: WidgetHandlerProps): JSX.Element | null {
const { widget, error: widgetError } = useWidget(widgetDescriptor);
const uiDomRef = useRef({});
const reactDomRef: any = useRef(null); // eslint-disable-line @typescript-eslint/no-explicit-any
const reactDomNewRef: any = useRef(null); // eslint-disable-line @typescript-eslint/no-explicit-any
const [isLoading, setIsLoading] = useState(true);
const [prevWidgetDescriptor, setPrevWidgetDescriptor] =
useState(widgetDescriptor);
Expand Down Expand Up @@ -166,36 +172,49 @@ function WidgetHandler({
[jsonClient]
);

const parseDocument = useCallback(
const parseDocumentFromObject = useCallback(
/**
* Parse the data from the server, replacing some of the nodes on the way.
* Parses an object representing a document, making a deep copy and replacing some of the nodes on the way.
* Replaces all Callables with an async callback that will automatically call the server use JSON-RPC.
* Replaces all Objects with the exported object from the server.
* Replaces all Element nodes with the ReactNode derived from that Element.
*
* @param data The data to parse
* @returns The parsed data
*/
(data: string) => {
(data: object) => {
assertNotNull(jsonClient);
// Keep track of exported objects that are no longer in use after this render.
// We close those objects that are no longer referenced, as they will never be referenced again.
const deadObjectMap = new Map(exportedObjectMap.current);

const parsedData = JSON.parse(data, (key, value) => {
// Need to re-hydrate any objects that are defined
if (isCallableNode(value)) {
const callableId = value[CALLABLE_KEY];
const deepCopyAndParse = (obj: unknown, map = new WeakMap()): unknown => {
// make a deep copy of the object and recurse on children before making any replacements
if (obj === null || typeof obj !== 'object') return obj;
if (map.has(obj)) return map.get(obj);
const clone = Array.isArray(obj)
? []
: Object.create(Object.getPrototypeOf(obj));
map.set(obj, clone);
const keys = Reflect.ownKeys(obj);
keys.forEach(key => {
const value = obj[key as keyof typeof obj];
clone[key] = deepCopyAndParse(value, map);
});

if (isCallableNode(clone)) {
const callableId = clone[CALLABLE_KEY];
log.debug2('Registering callableId', callableId);
return wrapCallable(
const res = wrapCallable(
jsonClient,
callableId,
callableFinalizationRegistry
);
return res;
}
if (isObjectNode(value)) {
if (isObjectNode(clone)) {
// Replace this node with the exported object
const objectKey = value[OBJECT_KEY];
const objectKey = clone[OBJECT_KEY];
const exportedObject = exportedObjectMap.current.get(objectKey);
if (exportedObject === undefined) {
// The map should always have the exported object for a key, otherwise the protocol is broken
Expand All @@ -205,19 +224,21 @@ function WidgetHandler({
return exportedObject;
}

if (isElementNode(value)) {
if (isElementNode(clone)) {
// Replace the elements node with the Component it maps to
try {
return getComponentForElement(value);
const res = getComponentForElement(clone);
return res;
} catch (e) {
log.warn('Error getting component for element', e);
return value;
return clone;
}
}

return value;
});
return clone;
};

const parsedData = deepCopyAndParse(data);
// Close any objects that are no longer referenced
deadObjectMap.forEach((deadObject, objectKey) => {
log.debug('Closing dead object', objectKey);
Expand Down Expand Up @@ -250,6 +271,93 @@ function WidgetHandler({
[]
);

const optimizeObject = useCallback(
/**
* Optimizes an object for React, by reusing an existing object if it hasn't changed. If any children object has
* changed, a shallow copy is made.
*
* @param oldObj The old object to use for optimization
* @param newObj The new object to optimize
* @returns An object with parts of OldObj that are unchanged and parts of newObj that are changed
*/
(
oldObj: any, // eslint-disable-line @typescript-eslint/no-explicit-any
newObj: any // eslint-disable-line @typescript-eslint/no-explicit-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
): { changed: boolean; obj: any } => {
if (typeof oldObj !== typeof newObj) {
return { changed: true, obj: newObj };
}
if (
isPrimitive(oldObj) ||
typeof oldObj === 'symbol' ||
typeof oldObj === 'function'
) {
if (oldObj === newObj) return { changed: false, obj: oldObj };
return { changed: true, obj: newObj };
}

// typeof null is object
if (oldObj == null && newObj == null) {
return { changed: false, obj: oldObj };
}
if (oldObj == null || newObj == null) {
return { changed: true, obj: newObj };
}

if (typeof newObj !== 'object') {
return { changed: true, obj: newObj };
}
if (Array.isArray(oldObj) !== Array.isArray(newObj)) {
return { changed: true, obj: newObj };
}

// For object nodes
if (
'connection' in newObj &&
'fetched' in newObj &&
'ticket_0' in newObj
) {
if (oldObj.connection === newObj.connection) {
return { changed: false, obj: oldObj };
}
return { changed: true, obj: newObj };
}

const obj = newObj;
if (Array.isArray(obj)) {
let changed = oldObj.length !== obj.length;
for (let i = 0; i < obj.length; i += 1) {
const { changed: thisChanged, obj: newValue } = optimizeObject(
oldObj[i],
newObj[i]
);
if (thisChanged) {
changed = true;
obj[i] = newValue;
}
}
if (changed) return { changed: true, obj: [...obj] };
return { changed: false, obj: oldObj };
}

let changed = Object.keys(oldObj).length !== Object.keys(obj).length;
Reflect.ownKeys(obj).forEach(key => {
const { changed: thisChanged, obj: newValue } = optimizeObject(
oldObj[key as keyof typeof oldObj],
obj[key as keyof typeof obj]
);
obj[key] = newValue;
if (thisChanged) {
changed = true;
}
});
if (changed) return { changed: true, obj: { ...obj } };
return { changed: false, obj: oldObj };
},
[]
);

useEffect(
function initMethods() {
if (jsonClient == null) {
Expand All @@ -258,15 +366,25 @@ function WidgetHandler({

log.debug('Adding methods to jsonClient');
jsonClient.addMethod(
METHOD_DOCUMENT_UPDATED,
async (params: [string, string]) => {
log.debug2(METHOD_DOCUMENT_UPDATED, params);
const [documentParam, stateParam] = params;
const newDocument = parseDocument(documentParam);
METHOD_DOCUMENT_PATCHED,
async (params: [Operation[], string]) => {
log.debug2(METHOD_DOCUMENT_PATCHED, params);
const [patch, stateParam] = params;

applyPatch(uiDomRef.current, patch);
reactDomRef.current = reactDomNewRef.current;
reactDomNewRef.current = parseDocumentFromObject(uiDomRef.current);

const { obj: updatedDocument } = optimizeObject(
reactDomRef.current,
reactDomNewRef.current
);
reactDomNewRef.current = updatedDocument;

// TODO: Remove unstable_batchedUpdates wrapper when upgrading to React 18
unstable_batchedUpdates(() => {
setInternalError(undefined);
setDocument(newDocument);
setDocument(updatedDocument);
setIsLoading(false);
});
if (stateParam != null) {
Expand Down Expand Up @@ -338,7 +456,8 @@ function WidgetHandler({
[
jsonClient,
onDataChange,
parseDocument,
parseDocumentFromObject,
optimizeObject,
sendSetState,
callableFinalizationRegistry,
]
Expand Down
Loading

0 comments on commit 1970f81

Please sign in to comment.