|
1 | 1 | use delegate::delegate;
|
2 | 2 | use ion_rs_old::{Decimal, Int, IonError, IonReader, IonType, StreamItem, Symbol};
|
3 | 3 | use once_cell::sync::Lazy;
|
4 |
| -use partiql_value::{Bag, DateTime, List, Tuple, Value, Variant}; |
| 4 | +use partiql_value::{Bag, DateTime, EdgeSpec, Graph, List, SimpleGraph, Tuple, Value, Variant}; |
5 | 5 | use regex::RegexSet;
|
6 | 6 | use rust_decimal::prelude::ToPrimitive;
|
| 7 | +use std::collections::HashSet; |
7 | 8 |
|
8 | 9 | use crate::boxed_ion::BoxedIonType;
|
9 | 10 | use crate::common::{
|
10 |
| - Encoding, BAG_ANNOT, BOXED_ION_ANNOT, DATE_ANNOT, MISSING_ANNOT, RE_SET_TIME_PARTS, TIME_ANNOT, |
11 |
| - TIME_PARTS_HOUR, TIME_PARTS_MINUTE, TIME_PARTS_SECOND, TIME_PARTS_TZ_HOUR, |
12 |
| - TIME_PARTS_TZ_MINUTE, |
| 11 | + Encoding, BAG_ANNOT, BOXED_ION_ANNOT, DATE_ANNOT, GRAPH_ANNOT, MISSING_ANNOT, |
| 12 | + RE_SET_TIME_PARTS, TIME_ANNOT, TIME_PARTS_HOUR, TIME_PARTS_MINUTE, TIME_PARTS_SECOND, |
| 13 | + TIME_PARTS_TZ_HOUR, TIME_PARTS_TZ_MINUTE, |
13 | 14 | };
|
14 | 15 | use std::num::NonZeroU8;
|
| 16 | +use std::rc::Rc; |
15 | 17 | use std::str::FromStr;
|
16 | 18 | use thiserror::Error;
|
17 | 19 | use time::Duration;
|
@@ -368,6 +370,23 @@ fn has_annotation(
|
368 | 370 | static TIME_PARTS_PATTERN_SET: Lazy<RegexSet> =
|
369 | 371 | Lazy::new(|| RegexSet::new(RE_SET_TIME_PARTS).unwrap());
|
370 | 372 |
|
| 373 | +type GNode = (String, HashSet<String>, Option<Value>); |
| 374 | +type GNodes = (Vec<String>, Vec<HashSet<String>>, Vec<Option<Value>>); |
| 375 | +#[allow(clippy::type_complexity)] |
| 376 | +type GEdge = ( |
| 377 | + String, |
| 378 | + HashSet<String>, |
| 379 | + (String, String, String), |
| 380 | + Option<Value>, |
| 381 | +); |
| 382 | +#[allow(clippy::type_complexity)] |
| 383 | +type GEdges = ( |
| 384 | + Vec<String>, |
| 385 | + Vec<HashSet<String>>, |
| 386 | + Vec<(String, String, String)>, |
| 387 | + Vec<Option<Value>>, |
| 388 | +); |
| 389 | + |
371 | 390 | impl PartiqlEncodedIonValueDecoder {
|
372 | 391 | fn decode_date<R>(&self, reader: &mut R) -> IonDecodeResult
|
373 | 392 | where
|
@@ -527,6 +546,211 @@ impl PartiqlEncodedIonValueDecoder {
|
527 | 546 | .map_err(|e| IonDecodeError::StreamError(e.to_string()))?,
|
528 | 547 | ))
|
529 | 548 | }
|
| 549 | + |
| 550 | + fn decode_graph<R>(&self, reader: &mut R) -> IonDecodeResult |
| 551 | + where |
| 552 | + R: IonReader<Item = StreamItem, Symbol = Symbol>, |
| 553 | + { |
| 554 | + let err = || IonDecodeError::ConversionError("Invalid graph specified".into()); |
| 555 | + let mut nodes = None; |
| 556 | + let mut edges = None; |
| 557 | + reader.step_in()?; |
| 558 | + 'kv: loop { |
| 559 | + match reader.next()? { |
| 560 | + StreamItem::Value(typ) => match typ { |
| 561 | + IonType::List => match reader.field_name()?.text_or_error()? { |
| 562 | + "nodes" => nodes = Some(self.decode_nodes(reader)?), |
| 563 | + "edges" => edges = Some(self.decode_edges(reader)?), |
| 564 | + _ => return Err(err()), |
| 565 | + }, |
| 566 | + _ => return Err(err()), |
| 567 | + }, |
| 568 | + StreamItem::Null(_) => return Err(err()), |
| 569 | + StreamItem::Nothing => break 'kv, |
| 570 | + } |
| 571 | + } |
| 572 | + reader.step_out()?; |
| 573 | + |
| 574 | + let nodes = nodes.ok_or_else(err)?; |
| 575 | + let (ids, labels, ends, payloads) = edges.ok_or_else(err)?; |
| 576 | + let edge_specs = ends |
| 577 | + .into_iter() |
| 578 | + .map(|(l, dir, r)| match dir.as_str() { |
| 579 | + "->" => Ok(EdgeSpec::Directed(l, r)), |
| 580 | + "<-" => Ok(EdgeSpec::Directed(r, l)), |
| 581 | + "--" => Ok(EdgeSpec::Undirected(l, r)), |
| 582 | + _ => Err(err()), |
| 583 | + }) |
| 584 | + .collect::<Result<Vec<EdgeSpec>, _>>()?; |
| 585 | + Ok(Value::Graph(Box::new(Graph::Simple(Rc::new( |
| 586 | + SimpleGraph::from_spec(nodes, (ids, labels, edge_specs, payloads)), |
| 587 | + ))))) |
| 588 | + } |
| 589 | + |
| 590 | + fn decode_nodes<R>(&self, reader: &mut R) -> Result<GNodes, IonDecodeError> |
| 591 | + where |
| 592 | + R: IonReader<Item = StreamItem, Symbol = Symbol>, |
| 593 | + { |
| 594 | + let err = || IonDecodeError::ConversionError("Invalid graph specified".into()); |
| 595 | + reader.step_in()?; |
| 596 | + let mut ids = vec![]; |
| 597 | + let mut labels = vec![]; |
| 598 | + let mut payloads = vec![]; |
| 599 | + 'values: loop { |
| 600 | + let item = reader.next()?; |
| 601 | + match item { |
| 602 | + StreamItem::Nothing => break 'values, |
| 603 | + StreamItem::Value(IonType::Struct) => { |
| 604 | + let (id, labelset, payload) = self.decode_node(reader)?; |
| 605 | + ids.push(id); |
| 606 | + labels.push(labelset); |
| 607 | + payloads.push(payload); |
| 608 | + } |
| 609 | + _ => return Err(err()), |
| 610 | + } |
| 611 | + } |
| 612 | + reader.step_out()?; |
| 613 | + Ok((ids, labels, payloads)) |
| 614 | + } |
| 615 | + |
| 616 | + fn decode_node<R>(&self, reader: &mut R) -> Result<GNode, IonDecodeError> |
| 617 | + where |
| 618 | + R: IonReader<Item = StreamItem, Symbol = Symbol>, |
| 619 | + { |
| 620 | + let err = || IonDecodeError::ConversionError("Invalid graph specified".into()); |
| 621 | + let mut id = None; |
| 622 | + let mut labels = None; |
| 623 | + let mut payload = None; |
| 624 | + reader.step_in()?; |
| 625 | + 'kv: loop { |
| 626 | + let item = reader.next()?; |
| 627 | + if item == StreamItem::Nothing { |
| 628 | + break 'kv; |
| 629 | + } |
| 630 | + let fname = reader.field_name()?; |
| 631 | + let fname = fname.text_or_error()?; |
| 632 | + match (fname, item) { |
| 633 | + ("id", StreamItem::Value(IonType::Symbol)) => { |
| 634 | + id = Some(reader.read_symbol()?.text_or_error()?.to_string()); |
| 635 | + } |
| 636 | + ("labels", StreamItem::Value(IonType::List)) => { |
| 637 | + let mut labelset = HashSet::new(); |
| 638 | + reader.step_in()?; |
| 639 | + #[allow(irrefutable_let_patterns)] |
| 640 | + while let item = reader.next()? { |
| 641 | + match item { |
| 642 | + StreamItem::Value(IonType::String) => { |
| 643 | + labelset.insert(reader.read_string()?.text().to_string()); |
| 644 | + } |
| 645 | + StreamItem::Nothing => break, |
| 646 | + _ => return Err(err()), |
| 647 | + } |
| 648 | + } |
| 649 | + reader.step_out()?; |
| 650 | + labels = Some(labelset); |
| 651 | + } |
| 652 | + ("payload", StreamItem::Value(typ)) => { |
| 653 | + payload = Some(self.decode_value(reader, typ)?); |
| 654 | + } |
| 655 | + _ => return Err(err()), |
| 656 | + } |
| 657 | + } |
| 658 | + reader.step_out()?; |
| 659 | + |
| 660 | + let id = id.ok_or_else(err)?; |
| 661 | + let labels = labels.unwrap_or_else(Default::default); |
| 662 | + Ok((id, labels, payload)) |
| 663 | + } |
| 664 | + |
| 665 | + fn decode_edges<R>(&self, reader: &mut R) -> Result<GEdges, IonDecodeError> |
| 666 | + where |
| 667 | + R: IonReader<Item = StreamItem, Symbol = Symbol>, |
| 668 | + { |
| 669 | + let err = || IonDecodeError::ConversionError("Invalid graph specified".into()); |
| 670 | + reader.step_in()?; |
| 671 | + let mut ids = vec![]; |
| 672 | + let mut labels = vec![]; |
| 673 | + let mut ends = vec![]; |
| 674 | + let mut payloads = vec![]; |
| 675 | + 'values: loop { |
| 676 | + let item = reader.next()?; |
| 677 | + match item { |
| 678 | + StreamItem::Nothing => break 'values, |
| 679 | + StreamItem::Value(IonType::Struct) => { |
| 680 | + let (id, labelset, end, payload) = self.decode_edge(reader)?; |
| 681 | + ids.push(id); |
| 682 | + labels.push(labelset); |
| 683 | + ends.push(end); |
| 684 | + payloads.push(payload); |
| 685 | + } |
| 686 | + _ => return Err(err()), |
| 687 | + } |
| 688 | + } |
| 689 | + reader.step_out()?; |
| 690 | + Ok((ids, labels, ends, payloads)) |
| 691 | + } |
| 692 | + |
| 693 | + fn decode_edge<R>(&self, reader: &mut R) -> Result<GEdge, IonDecodeError> |
| 694 | + where |
| 695 | + R: IonReader<Item = StreamItem, Symbol = Symbol>, |
| 696 | + { |
| 697 | + let err = || IonDecodeError::ConversionError("Invalid graph specified".into()); |
| 698 | + let mut id = None; |
| 699 | + let mut labels = None; |
| 700 | + let mut ends = None; |
| 701 | + let mut payload = None; |
| 702 | + reader.step_in()?; |
| 703 | + 'kv: loop { |
| 704 | + let item = reader.next()?; |
| 705 | + if item == StreamItem::Nothing { |
| 706 | + break 'kv; |
| 707 | + } |
| 708 | + let fname = reader.field_name()?; |
| 709 | + let fname = fname.text_or_error()?; |
| 710 | + match (fname, item) { |
| 711 | + ("id", StreamItem::Value(IonType::Symbol)) => { |
| 712 | + id = Some(reader.read_symbol()?.text_or_error()?.to_string()); |
| 713 | + } |
| 714 | + ("labels", StreamItem::Value(IonType::List)) => { |
| 715 | + let mut labelset = HashSet::new(); |
| 716 | + reader.step_in()?; |
| 717 | + #[allow(irrefutable_let_patterns)] |
| 718 | + while let item = reader.next()? { |
| 719 | + match item { |
| 720 | + StreamItem::Value(IonType::String) => { |
| 721 | + labelset.insert(reader.read_string()?.text().to_string()); |
| 722 | + } |
| 723 | + StreamItem::Nothing => break, |
| 724 | + _ => return Err(err()), |
| 725 | + } |
| 726 | + } |
| 727 | + reader.step_out()?; |
| 728 | + labels = Some(labelset); |
| 729 | + } |
| 730 | + ("ends", StreamItem::Value(IonType::SExp)) => { |
| 731 | + reader.step_in()?; |
| 732 | + reader.next()?; |
| 733 | + let l = reader.read_symbol()?.text_or_error()?.to_string(); |
| 734 | + reader.next()?; |
| 735 | + let dir = reader.read_symbol()?.text_or_error()?.to_string(); |
| 736 | + reader.next()?; |
| 737 | + let r = reader.read_symbol()?.text_or_error()?.to_string(); |
| 738 | + reader.step_out()?; |
| 739 | + ends = Some((l, dir, r)); |
| 740 | + } |
| 741 | + ("payload", StreamItem::Value(typ)) => { |
| 742 | + payload = Some(self.decode_value(reader, typ)?); |
| 743 | + } |
| 744 | + _ => return Err(err()), |
| 745 | + } |
| 746 | + } |
| 747 | + reader.step_out()?; |
| 748 | + |
| 749 | + let id = id.ok_or_else(err)?; |
| 750 | + let labels = labels.unwrap_or_else(Default::default); |
| 751 | + let ends = ends.ok_or_else(err)?; |
| 752 | + Ok((id, labels, ends, payload)) |
| 753 | + } |
530 | 754 | }
|
531 | 755 |
|
532 | 756 | impl<R> IonValueDecoder<R> for PartiqlEncodedIonValueDecoder
|
@@ -576,6 +800,8 @@ where
|
576 | 800 | fn decode_struct(&self, reader: &mut R) -> IonDecodeResult {
|
577 | 801 | if has_annotation(reader, TIME_ANNOT) {
|
578 | 802 | self.decode_time(reader)
|
| 803 | + } else if has_annotation(reader, GRAPH_ANNOT) { |
| 804 | + self.decode_graph(reader) |
579 | 805 | } else {
|
580 | 806 | decode_struct(self, reader)
|
581 | 807 | }
|
|
0 commit comments