Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 113 additions & 164 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ ignore = "0.4"
iroh = "0.96"
minicbor = { version = "2.1", default-features = false, features = ["alloc", "derive", "std"] }
notify-debouncer-mini = "0.7"
sedimentree_core = { version = "0.6", features = ["std"] }
sedimentree_fs_storage = "0.6"
sedimentree_core = { version = "0.7.1", features = ["std"] }
sedimentree_fs_storage = "0.7.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
subduction_core = { version = "0.7", features = ["std", "test_utils"] }
subduction_crypto = { version = "0.2.1", features = ["std"] }
subduction_iroh = "0.4"
subduction_websocket = { version = "0.7", features = ["tokio_client_any"] }
subduction_core = { version = "0.9", features = ["system_time", "test_utils"] }
subduction_crypto = { version = "0.3.1", features = ["std"] }
subduction_iroh = "0.5.1"
subduction_websocket = { version = "0.8.1", features = ["tokio_client_any"] }
predicates = "3.1"
tempfile = "3.16"
testresult = "0.4.1"
Expand Down
28 changes: 11 additions & 17 deletions darn_cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ use darn_core::{
peer::{self, Peer, PeerAddress, PeerName},
sedimentree, signer,
staged_update::StagedUpdate,
subduction::{self, DarnConnection, DarnSubduction},
subduction::{self, DarnSubduction, DarnTransport},
sync_progress::SyncProgressEvent,
watcher::{WatchEvent, WatchEventProcessor, Watcher, WatcherConfig},
};
use futures::StreamExt as _;
use sedimentree_core::id::SedimentreeId;
use subduction_core::{peer::id::PeerId, storage::traits::Storage};
use subduction_websocket::tokio::{TimeoutTokio, client::TokioWebSocketClient};
use subduction_core::{
peer::id::PeerId, storage::traits::Storage, transport::message::MessageTransport,
};
use subduction_websocket::tokio::client::TokioWebSocketClient;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tungstenite::http::Uri;
Expand Down Expand Up @@ -2579,7 +2581,7 @@ fn info_human_workspace(dim: &Style) -> eyre::Result<()> {

/// Connect to all global peers for standalone document operations.
///
/// Returns the number of successfully connected peers.
/// Returns the peer IDs of successfully connected peers.
///
/// # Errors
///
Expand All @@ -2588,7 +2590,6 @@ async fn connect_global_peers(
subduction: &DarnSubduction,
signer_dir: &Path,
peers: &[Peer],
timeout: Duration,
) -> eyre::Result<Vec<PeerId>> {
let mut connected = Vec::new();

Expand All @@ -2597,21 +2598,14 @@ async fn connect_global_peers(
PeerAddress::WebSocket { url } => {
let uri: Uri = url.parse()?;
let peer_signer = signer::load(signer_dir)?;
match TokioWebSocketClient::new(
uri,
TimeoutTokio,
timeout,
peer_signer,
peer.audience,
)
.await
{
match TokioWebSocketClient::new(uri, peer_signer, peer.audience).await {
Ok((authenticated, listener_fut, sender_fut)) => {
tokio::spawn(async move { drop(listener_fut.await) });
tokio::spawn(async move { drop(sender_fut.await) });
let peer_id = authenticated.peer_id();
let authenticated =
authenticated.map(|c| DarnConnection::WebSocket(Box::new(c)));
let authenticated = authenticated
.map(|c| DarnTransport::WebSocket(Box::new(c)))
.map(MessageTransport::new);
if let Err(e) = subduction.add_connection(authenticated).await {
info!(%e, peer = %peer.name, "Failed to add connection");
continue;
Expand Down Expand Up @@ -2698,7 +2692,7 @@ pub(crate) async fn doc_edit(
}

let spinner = out.spinner("Connecting to peers...");
let peer_ids = connect_global_peers(&subduction, &signer_dir, &peers, timeout).await?;
let peer_ids = connect_global_peers(&subduction, &signer_dir, &peers).await?;
if peer_ids.is_empty() {
spinner.stop("Failed to connect");
eyre::bail!("Could not connect to any peers");
Expand Down
41 changes: 18 additions & 23 deletions darn_core/src/darn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ use futures::{StreamExt, stream};
use sedimentree_core::id::SedimentreeId;
use sedimentree_fs_storage::FsStorage;
#[cfg(feature = "iroh")]
use subduction_core::connection::nonce_cache::NonceCache;
use subduction_core::{connection::handshake::audience::Audience, peer::id::PeerId};
use subduction_core::nonce_cache::NonceCache;
use subduction_core::{
handshake::audience::Audience, peer::id::PeerId, transport::message::MessageTransport,
};
use subduction_crypto::signer::memory::MemorySigner;
use subduction_websocket::tokio::{TimeoutTokio, client::TokioWebSocketClient};
use subduction_websocket::tokio::client::TokioWebSocketClient;
use thiserror::Error;
use tungstenite::http::Uri;

Expand All @@ -42,7 +44,7 @@ use crate::{
staged_update::{StageError, StagedUpdate},
subduction::{
self, AuthenticatedDarnConnection, DarnAddConnectionError, DarnIoError, DarnSubduction,
SubductionInitError,
DarnTransport, SubductionInitError,
},
sync_progress::{ApplyResult, SyncProgressEvent, SyncSummary},
workspace::{id::WorkspaceId, layout::WorkspaceLayout},
Expand Down Expand Up @@ -1144,8 +1146,7 @@ impl Darn {
let signer = self.load_signer()?;

let (authenticated, listener_fut, sender_fut) =
TokioWebSocketClient::new(uri, TimeoutTokio, Self::DEFAULT_TIMEOUT, signer, audience)
.await?;
TokioWebSocketClient::new(uri, signer, audience).await?;

tokio::spawn(async move {
if let Err(e) = listener_fut.await {
Expand All @@ -1159,8 +1160,9 @@ impl Darn {
});

let actual_peer_id = authenticated.peer_id();
let authenticated =
authenticated.map(|c| crate::subduction::DarnConnection::WebSocket(Box::new(c)));
let authenticated = authenticated
.map(|c| DarnTransport::WebSocket(Box::new(c)))
.map(MessageTransport::new);

Ok((authenticated, actual_peer_id))
}
Expand All @@ -1183,15 +1185,8 @@ impl Darn {

let signer = self.load_signer()?;

let result = subduction_iroh::client::connect(
&self.iroh_endpoint,
addr,
Self::DEFAULT_TIMEOUT,
TimeoutTokio,
&signer,
audience,
)
.await?;
let result =
subduction_iroh::client::connect(&self.iroh_endpoint, addr, &signer, audience).await?;

tokio::spawn(async move {
if let Err(e) = result.listener_task.await {
Expand All @@ -1207,7 +1202,8 @@ impl Darn {
let actual_peer_id = result.authenticated.peer_id();
let authenticated = result
.authenticated
.map(crate::subduction::DarnConnection::Iroh);
.map(DarnTransport::Iroh)
.map(MessageTransport::new);

Ok((authenticated, actual_peer_id))
}
Expand Down Expand Up @@ -1252,8 +1248,6 @@ impl Darn {
}
result = subduction_iroh::server::accept_one(
&self.iroh_endpoint,
Self::DEFAULT_TIMEOUT,
TimeoutTokio,
&signer,
&nonce_cache,
our_peer_id,
Expand All @@ -1262,7 +1256,7 @@ impl Darn {
) => {
match result {
Ok(accept_result) => {
let peer_id = accept_result.authenticated.peer_id();
let peer_id = accept_result.peer_id;
tracing::info!(%peer_id, "Accepted incoming Iroh connection");

tokio::spawn(async move {
Expand All @@ -1278,7 +1272,8 @@ impl Darn {

let authenticated = accept_result
.authenticated
.map(crate::subduction::DarnConnection::Iroh);
.map(DarnTransport::Iroh)
.map(MessageTransport::new);

if let Err(e) = self.subduction.add_connection(authenticated).await {
tracing::error!(%peer_id, "Failed to add Iroh connection: {e:?}");
Expand Down Expand Up @@ -1843,7 +1838,7 @@ impl FileOutsideWorkspace {
}

/// Result of syncing with a peer.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
pub struct SyncResult {
/// The peer ID we connected to.
pub peer_id: PeerId,
Expand Down
2 changes: 1 addition & 1 deletion darn_core/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{collections::BTreeMap, fmt, path::Path, str::FromStr};

use sedimentree_core::{crypto::digest::Digest, id::SedimentreeId, sedimentree::Sedimentree};
use serde::{Deserialize, Serialize};
use subduction_core::{connection::handshake::audience::Audience, peer::id::PeerId};
use subduction_core::{handshake::audience::Audience, peer::id::PeerId};
use thiserror::Error;

use crate::{serde_base58, unix_timestamp::UnixTimestamp};
Expand Down
6 changes: 3 additions & 3 deletions darn_core/src/sedimentree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
//! └──────────────────────────────────────────────────────────────────────┘
//! ```

use std::{collections::BTreeSet, convert::Infallible, path::Path};
use std::{collections::BTreeSet, path::Path};

use automerge::{Automerge, Change, ChangeHash};
use future_form::Sendable;
use sedimentree_core::{
blob::Blob, crypto::digest::Digest, id::SedimentreeId, sedimentree::Sedimentree,
};
use sedimentree_fs_storage::FsStorage;
use subduction_core::subduction::error::WriteError;
use subduction_core::{connection::message::SyncMessage, subduction::error::WriteError};
use thiserror::Error;

use crate::{
Expand Down Expand Up @@ -401,7 +401,7 @@ fn generate_sedimentree_id() -> Result<SedimentreeId, SedimentreeError> {
pub enum SedimentreeError {
/// Subduction write error.
#[error("subduction write error: {0}")]
SubductionWrite(Box<WriteError<Sendable, FsStorage, DarnConnection, Infallible>>),
SubductionWrite(Box<WriteError<Sendable, FsStorage, DarnConnection, SyncMessage>>),

/// Storage read error.
#[error("storage read error: {0}")]
Expand Down
4 changes: 2 additions & 2 deletions darn_core/src/serde_base58.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ pub mod peer_id {
/// Serde module for `DiscoveryId` (wraps 32-byte array).
pub mod discovery_id {
use serde::{Deserializer, Serializer};
use subduction_core::connection::handshake::audience::DiscoveryId;
use subduction_core::handshake::audience::DiscoveryId;

/// Serialize `DiscoveryId` as base58.
///
Expand Down Expand Up @@ -333,7 +333,7 @@ mod tests {
/// Serde module for `Audience` enum (Known or Discover).
pub mod audience {
use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
use subduction_core::connection::handshake::audience::{Audience, DiscoveryId};
use subduction_core::handshake::audience::{Audience, DiscoveryId};
use subduction_core::peer::id::PeerId;

#[derive(Serialize, Deserialize)]
Expand Down
Loading
Loading