@@ -16,6 +16,7 @@ mod database;
16
16
mod metrics;
17
17
mod produce_block;
18
18
mod proposer_duties;
19
+ mod publish_attestations;
19
20
mod publish_blocks;
20
21
mod standard_block_rewards;
21
22
mod state_id;
@@ -35,7 +36,7 @@ use beacon_chain::{
35
36
validator_monitor:: timestamp_now, AttestationError as AttnError , BeaconChain , BeaconChainError ,
36
37
BeaconChainTypes , WhenSlotSkipped ,
37
38
} ;
38
- use beacon_processor:: BeaconProcessorSend ;
39
+ use beacon_processor:: { work_reprocessing_queue :: ReprocessQueueMessage , BeaconProcessorSend } ;
39
40
pub use block_id:: BlockId ;
40
41
use builder_states:: get_next_withdrawals;
41
42
use bytes:: Bytes ;
@@ -129,6 +130,7 @@ pub struct Context<T: BeaconChainTypes> {
129
130
pub network_senders : Option < NetworkSenders < T :: EthSpec > > ,
130
131
pub network_globals : Option < Arc < NetworkGlobals < T :: EthSpec > > > ,
131
132
pub beacon_processor_send : Option < BeaconProcessorSend < T :: EthSpec > > ,
133
+ pub beacon_processor_reprocess_send : Option < Sender < ReprocessQueueMessage > > ,
132
134
pub eth1_service : Option < eth1:: Service > ,
133
135
pub sse_logging_components : Option < SSELoggingComponents > ,
134
136
pub log : Logger ,
@@ -534,6 +536,11 @@ pub fn serve<T: BeaconChainTypes>(
534
536
. filter ( |_| config. enable_beacon_processor ) ;
535
537
let task_spawner_filter =
536
538
warp:: any ( ) . map ( move || TaskSpawner :: new ( beacon_processor_send. clone ( ) ) ) ;
539
+ let beacon_processor_reprocess_send = ctx
540
+ . beacon_processor_reprocess_send
541
+ . clone ( )
542
+ . filter ( |_| config. enable_beacon_processor ) ;
543
+ let reprocess_send_filter = warp:: any ( ) . map ( move || beacon_processor_reprocess_send. clone ( ) ) ;
537
544
538
545
let duplicate_block_status_code = ctx. config . duplicate_block_status_code ;
539
546
@@ -1756,140 +1763,26 @@ pub fn serve<T: BeaconChainTypes>(
1756
1763
. and ( warp:: path:: end ( ) )
1757
1764
. and ( warp_utils:: json:: json ( ) )
1758
1765
. and ( network_tx_filter. clone ( ) )
1766
+ . and ( reprocess_send_filter)
1759
1767
. and ( log_filter. clone ( ) )
1760
1768
. then (
1761
1769
|task_spawner : TaskSpawner < T :: EthSpec > ,
1762
1770
chain : Arc < BeaconChain < T > > ,
1763
1771
attestations : Vec < Attestation < T :: EthSpec > > ,
1764
1772
network_tx : UnboundedSender < NetworkMessage < T :: EthSpec > > ,
1765
- log : Logger | {
1766
- task_spawner. blocking_json_task ( Priority :: P0 , move || {
1767
- let seen_timestamp = timestamp_now ( ) ;
1768
- let mut failures = Vec :: new ( ) ;
1769
- let mut num_already_known = 0 ;
1770
-
1771
- for ( index, attestation) in attestations. as_slice ( ) . iter ( ) . enumerate ( ) {
1772
- let attestation = match chain
1773
- . verify_unaggregated_attestation_for_gossip ( attestation, None )
1774
- {
1775
- Ok ( attestation) => attestation,
1776
- Err ( AttnError :: PriorAttestationKnown { .. } ) => {
1777
- num_already_known += 1 ;
1778
-
1779
- // Skip to the next attestation since an attestation for this
1780
- // validator is already known in this epoch.
1781
- //
1782
- // There's little value for the network in validating a second
1783
- // attestation for another validator since it is either:
1784
- //
1785
- // 1. A duplicate.
1786
- // 2. Slashable.
1787
- // 3. Invalid.
1788
- //
1789
- // We are likely to get duplicates in the case where a VC is using
1790
- // fallback BNs. If the first BN actually publishes some/all of a
1791
- // batch of attestations but fails to respond in a timely fashion,
1792
- // the VC is likely to try publishing the attestations on another
1793
- // BN. That second BN may have already seen the attestations from
1794
- // the first BN and therefore indicate that the attestations are
1795
- // "already seen". An attestation that has already been seen has
1796
- // been published on the network so there's no actual error from
1797
- // the perspective of the user.
1798
- //
1799
- // It's better to prevent slashable attestations from ever
1800
- // appearing on the network than trying to slash validators,
1801
- // especially those validators connected to the local API.
1802
- //
1803
- // There might be *some* value in determining that this attestation
1804
- // is invalid, but since a valid attestation already it exists it
1805
- // appears that this validator is capable of producing valid
1806
- // attestations and there's no immediate cause for concern.
1807
- continue ;
1808
- }
1809
- Err ( e) => {
1810
- error ! ( log,
1811
- "Failure verifying attestation for gossip" ;
1812
- "error" => ?e,
1813
- "request_index" => index,
1814
- "committee_index" => attestation. data. index,
1815
- "attestation_slot" => attestation. data. slot,
1816
- ) ;
1817
- failures. push ( api_types:: Failure :: new (
1818
- index,
1819
- format ! ( "Verification: {:?}" , e) ,
1820
- ) ) ;
1821
- // skip to the next attestation so we do not publish this one to gossip
1822
- continue ;
1823
- }
1824
- } ;
1825
-
1826
- // Notify the validator monitor.
1827
- chain
1828
- . validator_monitor
1829
- . read ( )
1830
- . register_api_unaggregated_attestation (
1831
- seen_timestamp,
1832
- attestation. indexed_attestation ( ) ,
1833
- & chain. slot_clock ,
1834
- ) ;
1835
-
1836
- publish_pubsub_message (
1837
- & network_tx,
1838
- PubsubMessage :: Attestation ( Box :: new ( (
1839
- attestation. subnet_id ( ) ,
1840
- attestation. attestation ( ) . clone ( ) ,
1841
- ) ) ) ,
1842
- ) ?;
1843
-
1844
- let committee_index = attestation. attestation ( ) . data . index ;
1845
- let slot = attestation. attestation ( ) . data . slot ;
1846
-
1847
- if let Err ( e) = chain. apply_attestation_to_fork_choice ( & attestation) {
1848
- error ! ( log,
1849
- "Failure applying verified attestation to fork choice" ;
1850
- "error" => ?e,
1851
- "request_index" => index,
1852
- "committee_index" => committee_index,
1853
- "slot" => slot,
1854
- ) ;
1855
- failures. push ( api_types:: Failure :: new (
1856
- index,
1857
- format ! ( "Fork choice: {:?}" , e) ,
1858
- ) ) ;
1859
- } ;
1860
-
1861
- if let Err ( e) = chain. add_to_naive_aggregation_pool ( & attestation) {
1862
- error ! ( log,
1863
- "Failure adding verified attestation to the naive aggregation pool" ;
1864
- "error" => ?e,
1865
- "request_index" => index,
1866
- "committee_index" => committee_index,
1867
- "slot" => slot,
1868
- ) ;
1869
- failures. push ( api_types:: Failure :: new (
1870
- index,
1871
- format ! ( "Naive aggregation pool: {:?}" , e) ,
1872
- ) ) ;
1873
- }
1874
- }
1875
-
1876
- if num_already_known > 0 {
1877
- debug ! (
1878
- log,
1879
- "Some unagg attestations already known" ;
1880
- "count" => num_already_known
1881
- ) ;
1882
- }
1883
-
1884
- if failures. is_empty ( ) {
1885
- Ok ( ( ) )
1886
- } else {
1887
- Err ( warp_utils:: reject:: indexed_bad_request (
1888
- "error processing attestations" . to_string ( ) ,
1889
- failures,
1890
- ) )
1891
- }
1892
- } )
1773
+ reprocess_tx : Option < Sender < ReprocessQueueMessage > > ,
1774
+ log : Logger | async move {
1775
+ let result = crate :: publish_attestations:: publish_attestations (
1776
+ task_spawner,
1777
+ chain,
1778
+ attestations,
1779
+ network_tx,
1780
+ reprocess_tx,
1781
+ log,
1782
+ )
1783
+ . await
1784
+ . map ( |( ) | warp:: reply:: json ( & ( ) ) ) ;
1785
+ task_spawner:: convert_rejection ( result) . await
1893
1786
} ,
1894
1787
) ;
1895
1788
0 commit comments