Skip to content

Commit

Permalink
Adjust broadcast tx policy and add parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
ManfredKarrer committed Jan 9, 2018
1 parent 58de2b5 commit 63c9a91
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 39 deletions.
62 changes: 35 additions & 27 deletions core/src/main/java/org/bitcoinj/core/PeerGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,22 @@
/**
* <p>Runs a set of connections to the P2P network, brings up connections to replace disconnected nodes and manages
* the interaction between them all. Most applications will want to use one of these.</p>
*
*
* <p>PeerGroup tries to maintain a constant number of connections to a set of distinct peers.
* Each peer runs a network listener in its own thread. When a connection is lost, a new peer
* will be tried after a delay as long as the number of connections less than the maximum.</p>
*
*
* <p>Connections are made to addresses from a provided list. When that list is exhausted,
* we start again from the head of the list.</p>
*
*
* <p>The PeerGroup can broadcast a transaction to the currently connected set of peers. It can
* also handle download of the blockchain from peers, restarting the process when peers die.</p>
*
* <p>A PeerGroup won't do anything until you call the {@link PeerGroup#start()} method
* which will block until peer discovery is completed and some outbound connections
* have been initiated (it will return before handshaking is done, however).
* <p>A PeerGroup won't do anything until you call the {@link PeerGroup#start()} method
* which will block until peer discovery is completed and some outbound connections
* have been initiated (it will return before handshaking is done, however).
* You should call {@link PeerGroup#stop()} when finished. Note that not all methods
* of PeerGroup are safe to call from a UI thread as some may do network IO,
* of PeerGroup are safe to call from a UI thread as some may do network IO,
* but starting and stopping the service should be fine.</p>
*/
public class PeerGroup implements TransactionBroadcaster {
Expand Down Expand Up @@ -151,7 +151,7 @@ public class PeerGroup implements TransactionBroadcaster {
// if true, we will listen to "addr" network messages and add nodes discovered this way.
// if false, only nodes found by discovery process are used/added.
@GuardedBy("lock")
private boolean addPeersFromAddressMessage = true;
private boolean addPeersFromAddressMessage = true;
// Minimum protocol version we will allow ourselves to connect to: require Bloom filtering.
private volatile int vMinRequiredProtocolVersion;

Expand All @@ -168,7 +168,7 @@ public class PeerGroup implements TransactionBroadcaster {

// Give the client the option to disable HttpSeeds
private static boolean ignoreHttpSeeds;

public static void setIgnoreHttpSeeds(boolean ignoreHttpSeeds) {
PeerGroup.ignoreHttpSeeds = ignoreHttpSeeds;
}
Expand Down Expand Up @@ -306,7 +306,7 @@ public Message onPreMessageReceived(Peer peer, Message m) {
/** The default timeout between when a connection attempt begins and version message exchange completes */
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 5000;
private volatile int vConnectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT_MILLIS;

/** Whether bloom filter support is enabled when using a non FullPrunedBlockchain*/
private volatile boolean vBloomFilteringEnabled = true;

Expand Down Expand Up @@ -523,7 +523,7 @@ public void setAddPeersFromAddressMessage(boolean addPeersFromAddressMessage) {
lock.unlock();
}
}

/**
* Configure download of pending transaction dependencies. A change of values only takes effect for newly connected
* peers.
Expand Down Expand Up @@ -624,7 +624,7 @@ public void go() {
return;
}

// BitcoinJ gets too many connections after a connection loss and reconnect as it adds up a lot of
// BitcoinJ gets too many connections after a connection loss and reconnect as it adds up a lot of
// potential candidates and then try to connect to all of those when getting connection again.
// A check for maxConnections is required to not exceed connections.
if(pendingPeers.size() + peers.size() < maxConnections)
Expand Down Expand Up @@ -710,7 +710,7 @@ public VersionMessage getVersionMessage() {
}

/**
* Sets information that identifies this software to remote nodes. This is a convenience wrapper for creating
* Sets information that identifies this software to remote nodes. This is a convenience wrapper for creating
* a new {@link VersionMessage}, calling {@link VersionMessage#appendToSubVer(String, String, String)} on it,
* and then calling {@link PeerGroup#setVersionMessage(VersionMessage)} on the result of that. See the docs for
* {@link VersionMessage#appendToSubVer(String, String, String)} for information on what the fields should contain.
Expand All @@ -724,7 +724,7 @@ public void setUserAgent(String name, String version, @Nullable String comments)
ver.appendToSubVer(name, version, comments);
setVersionMessage(ver);
}

// Updates the relayTxesBeforeFilter flag of ver
private void updateVersionMessageRelayTxesBeforeFilter(VersionMessage ver) {
// We will provide the remote node with a bloom filter (ie they shouldn't relay yet)
Expand Down Expand Up @@ -1097,7 +1097,7 @@ protected int discoverPeers() throws PeerDiscoveryException {
log.warn(e.getMessage());
continue;
}

for (InetSocketAddress address : addresses) addressList.add(new PeerAddress(address));
if (addressList.size() >= maxPeersToDiscoverCount) break;
}
Expand Down Expand Up @@ -1151,10 +1151,10 @@ private boolean maybeCheckForLocalhostPeer() {
socket = new Socket();
socket.connect(new InetSocketAddress(InetAddresses.forString("127.0.0.1"), params.getPort()), vConnectTimeoutMillis);
localhostCheckState = LocalhostCheckState.FOUND;

// If we are connected to localhost we don't want to get other peers added from AddressMessage calls.
setAddPeersFromAddressMessage(false);

return true;
} catch (IOException e) {
log.info("Localhost peer not detected.");
Expand Down Expand Up @@ -1374,7 +1374,7 @@ public void removeWallet(Wallet wallet) {
wallet.setTransactionBroadcaster(null);
for (Peer peer : peers) {
peer.removeWallet(wallet);
}
}
}

public enum FilterRecalculateMode {
Expand Down Expand Up @@ -1458,13 +1458,13 @@ public void go() {
}
return future;
}

/**
* <p>Sets the false positive rate of bloom filters given to peers. The default is {@link #DEFAULT_BLOOM_FILTER_FP_RATE}.</p>
*
* <p>Be careful regenerating the bloom filter too often, as it decreases anonymity because remote nodes can
* compare transactions against both the new and old filters to significantly decrease the false positive rate.</p>
*
*
* <p>See the docs for {@link BloomFilter#BloomFilter(int, double, long, BloomFilter.BloomUpdate)} for a brief
* explanation of anonymity when using bloom filters.</p>
*/
Expand All @@ -1479,7 +1479,7 @@ public void setBloomFilterFalsePositiveRate(double bloomFilterFPRate) {
}

/**
* Returns the number of currently connected peers. To be informed when this count changes, register a
* Returns the number of currently connected peers. To be informed when this count changes, register a
* {@link org.bitcoinj.core.listeners.PeerConnectionEventListener} and use the onPeerConnected/onPeerDisconnected methods.
*/
public int numConnectedPeers() {
Expand All @@ -1489,7 +1489,7 @@ public int numConnectedPeers() {
/**
* Connect to a peer by creating a channel to the destination address. This should not be
* used normally - let the PeerGroup manage connections through {@link #start()}
*
*
* @param address destination IP and port.
* @return The newly created Peer object or null if the peer could not be connected.
* Use {@link org.bitcoinj.core.Peer#getConnectionOpenFuture()} if you
Expand Down Expand Up @@ -1637,7 +1637,7 @@ private static void removeDataEventListenerFromPeer(Peer peer, PeerDataEventList

/**
* Download the blockchain from peers. Convenience that uses a {@link DownloadProgressTracker} for you.<p>
*
*
* This method waits until the download is complete. "Complete" is defined as downloading
* from at least one peer all the blocks that are in that peer's inventory.
*/
Expand Down Expand Up @@ -1830,13 +1830,13 @@ protected void handlePeerDeath(final Peer peer, @Nullable Throwable exception) {
final Peer newDownloadPeer = selectDownloadPeer(peers);
if (newDownloadPeer != null) {
setDownloadPeer(newDownloadPeer);
// When using BlockingClient we get errors at shutdown caused by

// When using BlockingClient we get errors at shutdown caused by
// startBlockChainDownloadFromPeer()
// We add another check to terminate here if we have been shut down already
if (!isRunning())
if (!isRunning())
return;

if (downloadListener != null) {
startBlockChainDownloadFromPeer(newDownloadPeer);
}
Expand Down Expand Up @@ -2201,6 +2201,12 @@ public TransactionBroadcast broadcastTransaction(final Transaction tx) {
return broadcastTransaction(tx, Math.max(1, getMinBroadcastConnections()));
}

private boolean broadcastToAllPeers;

public void setBroadcastToAllPeers(boolean broadcastToAllPeers) {
this.broadcastToAllPeers = broadcastToAllPeers;
}

/**
* <p>Given a transaction, sends it un-announced to one peer and then waits for it to be received back from other
* peers. Once all connected peers have announced the transaction, the future available via the
Expand Down Expand Up @@ -2228,6 +2234,8 @@ public TransactionBroadcast broadcastTransaction(final Transaction tx, final int
}
final TransactionBroadcast broadcast = new TransactionBroadcast(this, tx);
broadcast.setMinConnections(minConnections);
broadcast.setBroadcastToAllPeers(broadcastToAllPeers);

// Send the TX to the wallet once we have a successful broadcast.
Futures.addCallback(broadcast.future(), new FutureCallback<Transaction>() {
@Override
Expand Down
38 changes: 26 additions & 12 deletions core/src/main/java/org/bitcoinj/core/TransactionBroadcast.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@

package org.bitcoinj.core;

import com.google.common.annotations.*;
import com.google.common.base.*;
import com.google.common.util.concurrent.*;
import org.bitcoinj.utils.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.bitcoinj.core.listeners.PreMessageReceivedEventListener;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.wallet.Wallet;
import org.slf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.*;
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.Executor;

import static com.google.common.base.Preconditions.checkState;
import org.bitcoinj.core.listeners.PreMessageReceivedEventListener;

/**
* Represents a single transaction broadcast that we are performing. A broadcast occurs after a new transaction is created
Expand All @@ -46,10 +48,16 @@ public class TransactionBroadcast {
private int minConnections;
private int numWaitingFor;

private boolean broadcastToAllPeers;

public void setBroadcastToAllPeers(boolean broadcastToAllPeers) {
this.broadcastToAllPeers = broadcastToAllPeers;
}

/** Used for shuffling the peers before broadcast: unit tests can replace this to make themselves deterministic. */
@VisibleForTesting
public static Random random = new Random();

// Tracks which nodes sent us a reject message about this broadcast, if any. Useful for debugging.
private Map<Peer, RejectMessage> rejects = Collections.synchronizedMap(new HashMap<Peer, RejectMessage>());

Expand Down Expand Up @@ -141,10 +149,16 @@ public void run() {
// our version message, as SPV nodes cannot relay it doesn't give away any additional information
// to skip the inv here - we wouldn't send invs anyway.
int numConnected = peers.size();
int numToBroadcastTo = (int) Math.max(1, Math.round(Math.ceil(peers.size() / 2.0)));
numWaitingFor = (int) Math.ceil((peers.size() - numToBroadcastTo) / 2.0);

// We add the option ot broadcast to all peer but don't change the algorithm for how many nodes we want to hear back
int numToBroadcastTo = broadcastToAllPeers ? peers.size(): (int) Math.max(1, Math.round(Math.ceil(peers.size() / 2.0)));
if(!broadcastToAllPeers)
peers = peers.subList(0, numToBroadcastTo);

numWaitingFor = Math.min(1, (int) Math.floor(peers.size() / 4.0));
Collections.shuffle(peers, random);
peers = peers.subList(0, numToBroadcastTo);


log.info("broadcastTransaction: We have {} peers, adding {} to the memory pool", numConnected, tx.getHashAsString());
log.info("Sending to {} peers, will wait for {}, sending to: {}", numToBroadcastTo, numWaitingFor, Joiner.on(",").join(peers));
for (Peer peer : peers) {
Expand Down

1 comment on commit 63c9a91

@cbeams
Copy link
Member

@cbeams cbeams commented on 63c9a91 Jan 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.