Skip to content

Commit

Permalink
Merge branch 'main' into table-headers
Browse files Browse the repository at this point in the history
  • Loading branch information
tdraier committed Sep 11, 2024
2 parents 8670e6d + 0c59a6e commit a066312
Show file tree
Hide file tree
Showing 143 changed files with 2,521 additions and 2,778 deletions.
14 changes: 6 additions & 8 deletions connectors/src/connectors/github/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ export class GithubConnectorManager extends BaseConnectorManager<null> {
return new Err(new Error("Connector state not found"));
}

if (!connectorState.webhooksEnabledAt) {
return new Err(new Error("Connector is already stopped"));
}

await connectorState.update({
webhooksEnabledAt: null,
});
Expand Down Expand Up @@ -651,6 +647,8 @@ export class GithubConnectorManager extends BaseConnectorManager<null> {
internalId: string;
memoizationKey?: string;
}): Promise<Result<string[], Error>> {
const baseParents: string[] = [internalId];

const connector = await ConnectorResource.fetchById(this.connectorId);
if (!connector) {
return new Err(
Expand All @@ -666,19 +664,19 @@ export class GithubConnectorManager extends BaseConnectorManager<null> {
internalId,
repoId
);
return new Ok(parents);
return new Ok([...baseParents, ...parents]);
} else {
return new Ok([`${repoId}`]);
return new Ok([...baseParents, `${repoId}`]);
}
} else {
const repoId = parseInt(internalId.split("-")[0] || "", 10);
if (
internalId.endsWith("-issues") ||
internalId.endsWith("-discussions")
) {
return new Ok([`${repoId}`]);
return new Ok([...baseParents, `${repoId}`]);
}
return new Ok([]);
return new Ok(baseParents);
}
}

Expand Down
4 changes: 4 additions & 0 deletions connectors/src/connectors/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ export abstract class BaseConnectorManager<T extends ConnectorConfiguration> {
viewType: ContentNodesViewType;
}): Promise<Result<ContentNode[], Error>>;

/**
* Retrieves the parent IDs of a content node in hierarchical order.
* The first ID is the internal ID of the content node itself.
*/
abstract retrieveContentNodeParents(params: {
internalId: string;
memoizationKey?: string;
Expand Down
15 changes: 6 additions & 9 deletions connectors/src/connectors/slack/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,18 @@ async function botAnswerMessage(
requestedGroups = await slackConfig.getBotGroupIds(botName);
}

const userEmailHeader =
slackChatBotMessage.slackEmail !== "unknown"
? slackChatBotMessage.slackEmail
: undefined;

const dustAPI = new DustAPI(
apiConfig.getDustAPIConfig(),
{
workspaceId: connector.workspaceId,
apiKey: connector.workspaceAPIKey,
groupIds: requestedGroups,
userEmail: userEmailHeader,
},
logger,
{
Expand Down Expand Up @@ -319,7 +325,6 @@ async function botAnswerMessage(
return new Err(new Error(agentConfigurationsRes.error.message));
}
const agentConfigurations = agentConfigurationsRes.value;

if (mentionCandidates.length === 1) {
for (const mc of mentionCandidates) {
let bestCandidate:
Expand Down Expand Up @@ -444,10 +449,6 @@ async function botAnswerMessage(
const mesasgeRes = await dustAPI.postUserMessage({
conversationId: lastSlackChatBotMessage.conversationId,
message: messageReqBody,
userEmailHeader:
slackChatBotMessage.slackEmail !== "unknown"
? slackChatBotMessage.slackEmail
: undefined,
});
if (mesasgeRes.isErr()) {
return new Err(new Error(mesasgeRes.error.message));
Expand All @@ -466,10 +467,6 @@ async function botAnswerMessage(
visibility: "unlisted",
message: messageReqBody,
contentFragment: buildContentFragmentRes.value || undefined,
userEmailHeader:
slackChatBotMessage.slackEmail !== "unknown"
? slackChatBotMessage.slackEmail
: undefined,
});
if (convRes.isErr()) {
return new Err(new Error(convRes.error.message));
Expand Down
9 changes: 6 additions & 3 deletions connectors/src/connectors/slack/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,12 @@ export class SlackConnectorManager extends BaseConnectorManager<SlackConfigurati
return new Ok(contentNodes);
}

async retrieveContentNodeParents(): Promise<Result<string[], Error>> {
// Slack is flat.
return new Ok([]);
async retrieveContentNodeParents({
internalId,
}: {
internalId: string;
}): Promise<Result<string[], Error>> {
return new Ok([internalId]);
}

async setConfigurationKey({
Expand Down
2 changes: 1 addition & 1 deletion connectors/src/connectors/webcrawler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ export class WebcrawlerConnectorManager extends BaseConnectorManager<WebCrawlerC
internalId: string;
memoizationKey?: string;
}): Promise<Result<string[], Error>> {
const parents: string[] = [];
const parents: string[] = [internalId];
let parentUrl: string | null = null;

// First we get the Page or Folder for which we want to retrieve the parents
Expand Down
1 change: 1 addition & 0 deletions connectors/src/lib/oauth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export async function getOAuthConnectionAccessTokenWithThrow({

if (
tokRes.error.code === "token_revoked_error" ||
tokRes.error.code === "connection_not_found" ||
isMicrosoftApplicationDisabledError(tokRes.error, provider)
) {
throw new ExternalOAuthTokenError(new Error(tokRes.error.message));
Expand Down
5 changes: 4 additions & 1 deletion connectors/src/lib/temporal_monitoring.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ export class ActivityInboundLogInterceptor
await connectorManager.stop();
} else {
this.logger.error(
`Connector manager not found for connector ${connector.id}`
{
connectorId: connector.id,
},
`Connector manager not found for connector`
);
}
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,10 +531,9 @@ impl App {
Some(ref skips) => skips[input_idx],
None => false,
};
let project_id = project.project_id();
tokio::spawn(async move {
match skip {
false => match b.execute(&name, &e, event_sender, project_id).await {
false => match b.execute(&name, &e, event_sender).await {
Ok(v) => {
let block_status = {
let mut block_status = block_status.lock();
Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ pub trait Block {
name: &str,
env: &Env,
event_sender: Option<UnboundedSender<Value>>,
project_id: i64,
) -> Result<BlockResult>;

fn clone_box(&self) -> Box<dyn Block + Sync + Send>;
Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ impl Block for Browser {
name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
let config = env.config.config_for_block(name);

Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ impl Block for Chat {
name: &str,
env: &Env,
event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
let config = env.config.config_for_block(name);

Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl Block for Code {
_name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
// Assumes there is a _fun function defined in `source`.
// TODO(spolu): revisit, not sure this is optimal.
Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/curl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ impl Block for Curl {
name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
let config = env.config.config_for_block(name);

Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ impl Block for Data {
_name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
match env
.store
Expand Down
5 changes: 1 addition & 4 deletions core/src/blocks/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,13 @@ impl DataSource {
top_k: usize,
filter: Option<SearchFilter>,
target_document_tokens: Option<usize>,
project_id: i64,
) -> Result<Vec<Document>> {
let (data_source_project, view_filter, data_source_id) =
get_data_source_project_and_view_filter(
&workspace_id,
&data_source_or_data_source_view_id,
env,
format!("data_source_project_id_{}", project_id).as_str(),
"data_source",
)
.await?;

Expand Down Expand Up @@ -186,7 +185,6 @@ impl Block for DataSource {
name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
project_id: i64,
) -> Result<BlockResult> {
let config = env.config.config_for_block(name);

Expand Down Expand Up @@ -332,7 +330,6 @@ impl Block for DataSource {
top_k,
filter.clone(),
target_document_tokens,
project_id,
));
}

Expand Down
3 changes: 1 addition & 2 deletions core/src/blocks/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ impl Block for Database {
name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
project_id: i64,
) -> Result<BlockResult> {
let config = env.config.config_for_block(name);

Expand Down Expand Up @@ -102,7 +101,7 @@ impl Block for Database {
};

let query = replace_variables_in_string(&self.query, "query", env)?;
let tables = load_tables_from_identifiers(&table_identifiers, env, project_id).await?;
let tables = load_tables_from_identifiers(&table_identifiers, env).await?;

match query_database(&tables, env.store.clone(), &query).await {
Ok((results, schema)) => Ok(BlockResult {
Expand Down
8 changes: 3 additions & 5 deletions core/src/blocks/database_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ impl Block for DatabaseSchema {
name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
project_id: i64,
) -> Result<BlockResult> {
let config = env.config.config_for_block(name);

Expand Down Expand Up @@ -72,7 +71,7 @@ impl Block for DatabaseSchema {
_ => Err(anyhow!(err_msg.clone()))?,
};

let mut tables = load_tables_from_identifiers(&table_identifiers, env, project_id).await?;
let mut tables = load_tables_from_identifiers(&table_identifiers, env).await?;

// Compute the unique table names for each table.
let unique_table_names = get_unique_table_names_for_database(&tables);
Expand Down Expand Up @@ -117,20 +116,19 @@ impl Block for DatabaseSchema {
pub async fn load_tables_from_identifiers(
table_identifiers: &Vec<(&String, &String, &String)>,
env: &Env,
project_id: i64,
) -> Result<Vec<Table>> {
// Get a vec of unique (workspace_id, data_source_id) pairs.
let data_source_identifiers = table_identifiers
.iter()
.map(|(workspace_id, data_source_id, _)| (*workspace_id, *data_source_id))
.unique()
.collect::<Vec<_>>();
let origin = format!("database_schema_project_id_{}", project_id);

// Get a vec of the corresponding project ids for each (workspace_id, data_source_id) pair.
let project_ids_view_filters = try_join_all(
data_source_identifiers
.iter()
.map(|(w, d)| get_data_source_project_and_view_filter(w, d, env, origin.as_str())),
.map(|(w, d)| get_data_source_project_and_view_filter(w, d, env, "database_schema")),
)
.await?;

Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ impl Block for End {
_name: &str,
_env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
// No-op the block outputs within the while|if/end are coallesced, the output of end is
// ignored and not stored in the environment as it has the same name as the while|if block.
Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ impl Block for Input {
_name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
match env.input.value.as_ref() {
Some(i) => Ok(BlockResult {
Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ impl Block for LLM {
name: &str,
env: &Env,
event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
let config = env.config.config_for_block(name);

Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ impl Block for Map {
_name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
match env.state.get(&self.from) {
None => Err(anyhow::anyhow!(
Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ impl Block for Reduce {
_name: &str,
_env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
// No-op the block outputs within the map/reduce will be coallesced, the output of reduce is
// ignored and not stored in the environment as it has the same name as the map block.
Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ impl Block for Search {
name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
let config = env.config.config_for_block(name);

Expand Down
1 change: 0 additions & 1 deletion core/src/blocks/while.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ impl Block for While {
_name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
_project_id: i64,
) -> Result<BlockResult> {
let e = env.clone();

Expand Down
Loading

0 comments on commit a066312

Please sign in to comment.