Skip to content

Commit

Permalink
fix(interactive): Fix the occasionally incorrect pegasus server routi…
Browse files Browse the repository at this point in the history
…ng configuration (#3673)

Fixes the `updatePeerView` may not be trigger at the proper time.

---------

Co-authored-by: nengli.ln <[email protected]>
  • Loading branch information
siyuan0322 and lnfjpt authored Mar 28, 2024
1 parent 579ab9f commit 049986a
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ public class ZkConfig {
Config.intConfig("zk.session.timeout.ms", 30000);

public static final Config<Integer> ZK_BASE_SLEEP_MS =
Config.intConfig("zk.base.sleep.ms", 1000);
Config.intConfig("zk.base.sleep.ms", 10000);

public static final Config<Integer> ZK_MAX_SLEEP_MS =
Config.intConfig("zk.max.sleep.ms", 45000);
Config.intConfig("zk.max.sleep.ms", 60000);

public static final Config<Integer> ZK_MAX_RETRY = Config.intConfig("zk.max.retry", 29);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,16 @@ pub fn listen_on<A: ToSocketAddrs>(
///
/// 如果参数中的`server_id` 大于等于当前服务的id,并不会发起连接,返回`Ok(())`;
///
pub fn connect<A: ToSocketAddrs>(
local_id: u64, remote_id: u64, params: ConnectionParams, addr: A,
pub fn connect(
local_id: u64, remote_id: u64, params: ConnectionParams, addr: SocketAddr,
) -> Result<(), NetError> {
// 连接请求可能会失败, 或许由于对端服务器未启动端口监听,调用方需要根据返回内容确定是否重试;
let mut conn = TcpStream::connect(addr)?;
info!("Try to connect to server {:?}", addr);
let timeout = std::time::Duration::from_secs(10);
let mut conn = TcpStream::connect_timeout(&addr, timeout)?;
// let mut conn = TcpStream::connect(addr)?;
let addr = conn.peer_addr()?;
debug!("connect to server {:?};", addr);
info!("connect to server {:?};", addr);
let hb_sec = params.get_hb_interval_sec();
super::setup_connection(local_id, hb_sec, &mut conn)?;
info!("setup connection to {:?} success;", addr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public NodeChangeListener(RoleType roleType, ServiceCache<GrootNode> serviceCach

@Override
public void cacheChanged() {
logger.debug("cacheChanged. roleType [" + roleType.getName() + "]");
logger.info("cacheChanged. roleType [" + roleType.getName() + "]");
synchronized (lock) {
Map<Integer, GrootNode> newRoleNodes = new HashMap<>();
for (ServiceInstance<GrootNode> instance : this.serviceCache.getInstances()) {
Expand Down Expand Up @@ -207,7 +207,7 @@ private void notifyRemoved(RoleType role, Map<Integer, GrootNode> removed) {
if (removed.isEmpty()) {
return;
}
logger.debug("role [{}] remove nodes [{}]", role.getName(), removed.values());
logger.info("role [{}] remove nodes [{}]", role.getName(), removed.values());
for (Listener listener : this.listeners) {
this.singleThreadExecutor.execute(
() -> {
Expand All @@ -225,7 +225,7 @@ private void notifyAdded(RoleType role, Map<Integer, GrootNode> added) {
if (added.isEmpty()) {
return;
}
logger.debug("role [{}] add nodes [{}]", role.getName(), added.values());
logger.info("role [{}] add nodes [{}]", role.getName(), added.values());
for (Listener listener : this.listeners) {
this.singleThreadExecutor.execute(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.alibaba.graphscope.groot.store.jna;

import com.alibaba.graphscope.groot.common.config.CommonConfig;
import com.alibaba.graphscope.groot.common.config.Configs;
import com.alibaba.graphscope.groot.common.config.StoreConfig;
import com.alibaba.graphscope.groot.operation.OperationBatch;
Expand Down Expand Up @@ -61,8 +62,10 @@ public JnaGraphStore(Configs configs, int partitionId) throws IOException {
Path walPath = Paths.get(walDir, "" + partitionId);
builder.put(StoreConfig.STORE_WAL_DIR.getKey(), walPath.toString());
}
if (!Files.isDirectory(secondPath)) {
Files.createDirectories(secondPath);
if (CommonConfig.SECONDARY_INSTANCE_ENABLED.get(configs)) {
if (!Files.isDirectory(secondPath)) {
Files.createDirectories(secondPath);
}
}
if (!Files.isDirectory(partitionPath)) {
Files.createDirectories(partitionPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private AdminClient createAdminWithRetry() throws InterruptedException {
try {
return AdminClient.create(adminConfig);
} catch (Exception e) {
logger.warn("Error creating Kafka AdminClient", e);
logger.warn("Error creating Kafka AdminClient, servers: {}", servers, e);
Thread.sleep(10000);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,17 @@ public void stop() {
@Override
public void nodesJoin(RoleType role, Map<Integer, GrootNode> nodes) {
if (role == RoleType.GAIA_ENGINE) {
this.engineNodes.putAll(nodes);
for (Map.Entry<Integer, GrootNode> entry : nodes.entrySet()) {
GrootNode node = entry.getValue();
if (node.getRoleName().equals(RoleType.GAIA_ENGINE.getName())) {
this.engineNodes.put(entry.getKey(), node);
} else {
logger.warn("Unexpected node joined: {}", node);
}
}
if (this.engineNodes.size() == this.nodeCount) {
String peerViewString =
nodes.values().stream()
engineNodes.values().stream()
.map(
n ->
String.format(
Expand Down

0 comments on commit 049986a

Please sign in to comment.