Skip to content

Commit

Permalink
Merge pull request #661 from sandwichfarm/next-nocapd-squashed
Browse files Browse the repository at this point in the history
next-nocapd
  • Loading branch information
dskvr committed Dec 16, 2023
2 parents 0f17fed + 041fa0d commit 716b20b
Show file tree
Hide file tree
Showing 180 changed files with 6,381 additions and 770 deletions.
10 changes: 0 additions & 10 deletions .env.sample
Original file line number Diff line number Diff line change
@@ -1,13 +1,3 @@
#general
ENABLE_KIND_2=true #relay submissions
ENABLE_KIND_3=true #contact list
ENABLE_KIND_10002=true #relay list
ENABLE_ONION=true
ENABLE_CLEARNET=true

ENABLE_BLOCKLIST=true
BLOCKLIST=""

POSTGRES_HOST="localhost"
POSTGRES_USER="postgres"
POSTGRES_PASS="postgres"
Expand Down
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ packages/nostrwatch-history-relay/strfry-db/data.mdb
packages/relaydb-cli
dist/js/chunk-vendors.cd206719.js.map
node_modules
ours.sh
ours.sh
.pg
packages/synx
packages/kinds
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,8 @@ Testing suite is not yet implemented.

## Legacy Participants
A special thank you to the 300+ individuals who submitted their relay and/or oppenned issues/pull requests to nostrwatch legacy and to the 1M+ unique visitors over the last year. A huge thank you to OpenSats for giving my vision a new life.

<a align="center" href="https://github.com/dskvr/nostr-watch/graphs/contributors">
<img src="https://contrib.rocks/image?repo=dskvr/nostr-watch" />
</a>

4 changes: 1 addition & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
"clean": "lerna clean",
"test": "lerna run test"
},
"devDependencies": {
"lerna": "^4.0.0"
},
"devDependencies": {},
"workspaces": [ "packages/*", "packages/nocap/adapters/**/*" ],
"main": "index.js",
"repository": "[email protected]:sandwichfarm/nostr-watch.git",
Expand Down
14 changes: 3 additions & 11 deletions packages/controlflow/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
import { Trawler, Nocapd, RestApi } from './queues.js'
import { Scheduler as NWScheduler } from './scheduler.js'

export default {
NWQueues: {
Trawler,
Nocapd,
RestApi
},
NWScheduler
}
export { SyncQueue, TrawlQueue, NocapdQueue, RestApiQueue, QueueInit, BullMQ } from './src/queues.js'
export { Scheduler } from './src/scheduler.js'
export { RetryManager } from './src/retry.js'
4 changes: 3 additions & 1 deletion packages/controlflow/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
"version": "0.0.1",
"description": "Provides exports for application control flow",
"main": "index.js",
"type": "module",
"license": "MIT",
"dependencies": {
"bullmq": "4.14.2",
"dotenv": "16.3.1"
"dotenv": "16.3.1",
"node-schedule": "2.1.1"
}
}
29 changes: 0 additions & 29 deletions packages/controlflow/queues.js

This file was deleted.

11 changes: 0 additions & 11 deletions packages/controlflow/scheduler.js

This file was deleted.

47 changes: 47 additions & 0 deletions packages/controlflow/src/queues.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import dotenv from 'dotenv'
import { Queue, QueueEvents, Worker } from 'bullmq';
import { RedisConnectionDetails } from '@nostrwatch/utils'

dotenv.config()

const $ = {}

export const TrawlQueue = (qopts={}) => {
return QueueInit('TrawlQueue', qopts)
}

export const NocapdQueue = (qopts={}) => {
return QueueInit('NocapdQueue', qopts)
}

export const SyncQueue = (qopts={}) => {
return QueueInit('SyncQueue', qopts)
}

export const RestApiQueue = (qopts={}) => {
return QueueInit('RestApiQueue', qopts)
}

export const QueueInit = (key, qopts={}) => {
if($?.[key]) return $[key]
qopts = { connection: RedisConnectionDetails(), ...qopts }
const $Queue = new Queue(key, qopts)
const $QueueEvents = new QueueEvents($Queue.name, { connection: RedisConnectionDetails() } )
$[key] = { $Queue, $QueueEvents, Worker }
return $[key]
}

export const BullMQ = {
Queue,
QueueEvents,
Worker
}

export default {
SyncQueue,
TrawlQueue,
NocapdQueue,
RestApiQueue,
QueueInit,
BullMQ
}
82 changes: 82 additions & 0 deletions packages/controlflow/src/retry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import relaycache from '@nostrwatch/relaycache'
import { capitalize, loadConfig } from "@nostrwatch/utils"

const rcache = relaycache(process.env.NWCACHE_PATH)

const config = await loadConfig()

export class RetryManager {

constructor(caller, action, relays) {
if(!caller) throw new Error('caller is required')
if(!action) throw new Error('action is required')
this.caller = caller
this.action = action
this.relays = relays? relays : []
this.retries = []
this.config = config?.[caller]?.[action]
}


cacheId(url){
return `${capitalize(this.caller)}:${url}`
}

async init(){
const relays = this.relays.length? this.relays: await rcache.relays.get.all()
const persisted = []
for await(const relay of relays) {
const url = relay.url
const retries = rcache.retry.get( this.cacheId(url) )
if(retries === null)
persisted.push(await rcache.retry.set(this.cacheId(url), 0))
}
return persisted
}

expiry(retries){
if(retries === null) return 0
let map
if(this.config?.expiry && this.config.expiry instanceof Array )
map = this.config.expiry.map( entry => { return { max: entry.max, delay: parseInt(eval(entry.delay)) } } )
else
map = [
{ max: 3, delay: 1000 * 60 * 60 },
{ max: 6, delay: 1000 * 60 * 60 * 24 },
{ max: 13, delay: 1000 * 60 * 60 * 24 * 7 },
{ max: 17, delay: 1000 * 60 * 60 * 24 * 28 },
{ max: 29, delay: 1000 * 60 * 60 * 24 * 90 }
];
const found = map.find(entry => retries <= entry.max);
return found ? found.delay : map[map.length - 1].delay;
};

async getExpiredRelays(lastCheckedFn, relays=[]){
relays = relays?.length? relays: this.relays?.length? this.relays: await rcache.relays.get.all()
if(!(lastCheckedFn instanceof Function)) throw new Error('lastCheckedFn (arg[1]) must be a function')
const relayStatuses = await Promise.all(relays.map(async relay => {
const url = relay.url;
const lastChecked = rcache.cachetime.get.one(lastCheckedFn(url))
if (!lastChecked) return { relay, isExpired: true };
const retries = await rcache.retry.get(this.cacheId(url));
const isExpired = lastChecked < Date.now() - this.expiry(retries);
return { relay, isExpired };
}));
return relayStatuses.filter(r => r.isExpired).map(r => r.relay);
}

async getRetries( url ){
return await rcache.retry.get(this.cacheId(url))
}

async setRetries( url, success ){
let id
if(success) {
this.log?.debug(`${url} did not require a retry`)
id = await rcache.retry.set(this.cacheId(url), 0)
} else {
this.log?.debug(`${url} required a retry`)
id = await rcache.retry.increment(this.cacheId(url))
}
}
}
92 changes: 92 additions & 0 deletions packages/controlflow/src/scheduler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import schedule from 'node-schedule'

export class Scheduler {
constructor(workers) {
this.workers = workers;
this.analysis = {};
this.schedules = {};
this.analyzeAndCacheWorkers();
this.createSchedules();
}

analyzeAndCacheWorkers() {
this.workers.sort((a, b) => a.interval - b.interval);
const totalInterval = this.workers.reduce((sum, worker) => sum + worker.interval, 0);
let cumulativeOffset = 0;
this.workers.forEach(worker => {
// console.log(worker)
this.analysis[worker.name] = {
interval: worker.interval,
offset: this.calculateBestOffset(worker, cumulativeOffset, totalInterval),
handler: worker.handler
};
cumulativeOffset += this.analysis[worker.name].offset;
});
}

calculateBestOffset(worker, currentOffset, totalInterval) {
// Calculate an ideal gap between tasks
const idealGap = totalInterval / this.workers.length;
// Start by proposing an offset that spaces out the tasks evenly
let proposedOffset = currentOffset + idealGap;
// Adjust the proposed offset to avoid as much overlap as possible
// This loop tries to find a spot where the current task is least likely to collide with others
while (this.isCollision(proposedOffset, worker.interval, totalInterval)) {
proposedOffset = (proposedOffset + worker.interval) % totalInterval;
}
return proposedOffset % totalInterval;
}

isCollision(proposedOffset, interval, totalInterval) {
// Check if the proposed offset collides with other tasks
for (let otherWorkerName in this.analysis) {
const otherWorker = this.analysis[otherWorkerName];
if (this.doIntervalsOverlap(proposedOffset, interval, otherWorker.offset, otherWorker.interval, totalInterval)) {
return true;
}
}
return false;
}

doIntervalsOverlap(start1, length1, start2, length2, totalLength) {
// Simplified check for overlap between two intervals on a circular timeline
const end1 = (start1 + length1) % totalLength;
const end2 = (start2 + length2) % totalLength;
if (start1 <= end1) {
// Case 1: Interval 1 does not wrap around
return (start2 < end1 && end2 > start1);
} else {
// Case 2: Interval 1 wraps around
return (start2 < end1 || end2 > start1);
}
}


createSchedules() {
Object.keys(this.analysis).forEach(name => {
const worker = this.analysis[name];
// Calculate the initial start time based on the current time and the offset
const startTime = new Date(Date.now() + worker.offset);
// Define the rule for scheduling
const rule = new schedule.RecurrenceRule();
rule.start = startTime; // Set the start time
rule.rule = `*/${Math.round(worker.interval / 1000)} * * * * *`; // Set the interval in seconds
// Schedule the job
this.schedules[name] = schedule.scheduleJob(rule, this.analysis[name].handler);
});
}

getAll() {
return this.schedules;
}

get(name) {
return this.schedules[name];
}

gracefulShutdown() {
Object.values(this.schedules).forEach(job => {
schedule.gracefulShutdown(job);
});
}
}
2 changes: 1 addition & 1 deletion packages/logger/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export default class Logger {

constructor(name, log_level="INFO", split_logs=false) {
this.logger = createLogger?.default? createLogger.default(name): createLogger(name)
this.log_level = new String(config.log_level? config.log_level : log_level).toUpperCase();
this.log_level = new String(config?.log_level? config.log_level : log_level).toUpperCase();
this.split_logs = split_logs || false
}

Expand Down
10 changes: 6 additions & 4 deletions packages/nocap/adapters/default/DnsAdapterDefault/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ class DnsAdapterDefault {
async check_dns(){
let result, data = {}
if(this.$.results.get('network') !== 'clearnet')
return this.$.logger.warn('DNS check skipped for url not accessible over clearnet')
return this.$.logger.debug('DNS check skipped for url not accessible over clearnet')
let err = false
let url = this.$.url.replace('wss://', '').replace('ws://', '')
let url = this.$.url.replace('wss://', '').replace('ws://', '').replace(/\/+$/, '');
const query = `https://1.1.1.1/dns-query?name=${url}`
const headers = { accept: 'application/dns-json' }
const response = await fetch( query, { headers } ).catch((e) => { result = { status: "error", message: e.message, data } })
const response = await fetch( query, { headers } ).catch((e) => { result = { status: "error", message: e.message, data } })
data = await response.json()
if(!result)
if(!data?.Answer || data.Answer.length === 0 || Object.keys(data.Answer[0]) === 0)
result = { status: "error", message: "No DNS Answer" }
else
result = { status: "success", data }
this.$.finish('dns', result)
}
Expand Down
9 changes: 6 additions & 3 deletions packages/nocap/adapters/default/GeoAdapterDefault/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import { fetch } from 'cross-fetch'
let endpoint
const ips = this.$.results.getIps('ipv4')
const ip = ips[ips?.length-1]
const apiKey = process.env?.IP_API_KEY
//todo, enable override via options
const fields = 'continent,continentCode,countryCode,regionName,city,lat,lon,isp,as,asname,query'
if(typeof ip !== 'string')
return this.$.finish('geo', { status: "error", message: 'No IP address. Run `dns` check first.', data: {} })
if(this.config?.auth?.ip_api_key)
endpoint = `https://pro.ip-api.com/json/${ip}?key=${this.config.auth.ip_api_key}`
if(apiKey)
endpoint = `https://pro.ip-api.com/json/${ip}?key=${apiKey}&fields=${fields}`
else
endpoint = `http://ip-api.com/json/${ip}`
endpoint = `http://ip-api.com/json/${ip}?fields=${fields}`
const headers = { 'accept': 'application/json' }
const response = await fetch(endpoint, { headers }).catch(e => err=e)
delete response.query
Expand Down
Loading

0 comments on commit 716b20b

Please sign in to comment.