commit fec726b4ff944fe9c2dbfd29d58618d39fa379a1 parent 9ce1f169185b6f65cb75f6af9565c1334852084b Author: triesap <triesap@radroots.dev> Date: Mon, 3 Nov 2025 09:20:32 +0000 Migrate indexer to a top-level crate, inline utilities into a local module, rename the package, retarget workspace paths, and switch to direct dependencies for database, hashing, and Nostr. Update the app to the new index URL environment key with browser-only validation, conditionally prerender and fetch indexes, and tighten configuration loading and logging initialization. Diffstat:
75 files changed, 2317 insertions(+), 2709 deletions(-)
diff --git a/.gitignore b/.gitignore @@ -38,3 +38,5 @@ justfile # dev *dev*.toml *dev*.json +dev/data/ +dev/logs/ +\ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock @@ -856,22 +856,6 @@ dependencies = [ ] [[package]] -name = "indexer-utils" -version = "0.1.0" -dependencies = [ - "anyhow", - "bincode", - "nostr", - "rusqlite", - "serde", - "serde_json", - "sha2", - "sled", - "thiserror 1.0.69", - "tracing", -] - -[[package]] name = "inout" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1326,20 +1310,24 @@ dependencies = [ ] [[package]] -name = "radroots-market-relay-indexer" +name = "radroots-market-indexer" version = "0.1.0" dependencies = [ "anyhow", + "bincode", "clap", "config", - "indexer-utils", + "nostr", "once_cell", "radroots-core", "radroots-events", "radroots-events-indexed", "regex", + "rusqlite", "serde", "serde_json", + "sha2", + "sled", "thiserror 1.0.69", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] members = [ - "crates/*", + "indexer", ] resolver = "2" @@ -11,9 +11,9 @@ rust-version = "1.86.0" license = "AGPL-3.0" [workspace.dependencies] -radroots-core = { path = "../../crates/crates/core" } -radroots-events = { path = "../../crates/crates/events" } -radroots-events-indexed = { path = "../../crates/crates/events-indexed" } +radroots-core = { path = "../crates/core" } +radroots-events = { path = "../crates/events" } +radroots-events-indexed = { path = "../crates/events-indexed" } anyhow = { version = "1" } clap = { version = "4", features = ["derive"] } diff --git a/app/.env.example b/app/.env.example @@ -1,5 +1,5 @@ VITE_PUBLIC_RADROOTS_MARKET_RELAY_URL= -VITE_PUBLIC_RADROOTS_MARKET_RELAY_INDEXES_URL= +VITE_PUBLIC_RADROOTS_MARKET_INDEXES_URL= VITE_PUBLIC_IDB_NAME= VITE_PUBLIC_NDK_CACHE_NAME= VITE_PUBLIC_NDK_CLIENT_NAME= diff --git a/app/src/lib/utils/_env.ts b/app/src/lib/utils/_env.ts @@ -1,22 +1,22 @@ const RADROOTS_MARKET_RELAY_URL = import.meta.env.VITE_PUBLIC_RADROOTS_MARKET_RELAY_URL; -if (!RADROOTS_MARKET_RELAY_URL || typeof RADROOTS_MARKET_RELAY_URL !== 'string') throw new Error('Missing env var: VITE_PUBLIC_RADROOTS_MARKET_RELAY_URL'); - -const RADROOTS_MARKET_RELAY_INDEXES_URL = import.meta.env.VITE_PUBLIC_RADROOTS_MARKET_RELAY_INDEXES_URL; -if (!RADROOTS_MARKET_RELAY_INDEXES_URL || typeof RADROOTS_MARKET_RELAY_INDEXES_URL !== 'string') throw new Error('Missing env var: VITE_PUBLIC_RADROOTS_MARKET_RELAY_INDEXES_URL'); - +const RADROOTS_MARKET_INDEXES_URL = import.meta.env.VITE_PUBLIC_RADROOTS_MARKET_INDEXES_URL; const IDB_NAME = import.meta.env.VITE_PUBLIC_IDB_NAME; -if (!IDB_NAME || typeof IDB_NAME !== 'string') throw new Error('Missing env var: VITE_PUBLIC_IDB_NAME'); - const NDK_CACHE_NAME = import.meta.env.VITE_PUBLIC_NDK_CACHE_NAME; -if (!NDK_CACHE_NAME || typeof NDK_CACHE_NAME !== 'string') throw new Error('Missing env var: VITE_PUBLIC_NDK_CACHE_NAME'); - const NDK_CLIENT_NAME = import.meta.env.VITE_PUBLIC_NDK_CLIENT_NAME; -if (!NDK_CLIENT_NAME || typeof NDK_CLIENT_NAME !== 'string') throw new Error('Missing env var: VITE_PUBLIC_NDK_CLIENT_NAME'); + +// Only validate in browser context, not during build/analysis +if (typeof window !== 'undefined') { + if (!RADROOTS_MARKET_RELAY_URL || typeof RADROOTS_MARKET_RELAY_URL !== 'string') throw new Error('Missing env var: VITE_PUBLIC_RADROOTS_MARKET_RELAY_URL'); + if (!RADROOTS_MARKET_INDEXES_URL || typeof RADROOTS_MARKET_INDEXES_URL !== 'string') throw new Error('Missing env var: VITE_PUBLIC_RADROOTS_MARKET_INDEXES_URL'); + if (!IDB_NAME || typeof IDB_NAME !== 'string') throw new Error('Missing env var: VITE_PUBLIC_IDB_NAME'); + if (!NDK_CACHE_NAME || typeof NDK_CACHE_NAME !== 'string') throw new Error('Missing env var: VITE_PUBLIC_NDK_CACHE_NAME'); + if (!NDK_CLIENT_NAME || typeof NDK_CLIENT_NAME !== 'string') throw new Error('Missing env var: VITE_PUBLIC_NDK_CLIENT_NAME'); +} export const _env = { IDB_NAME, NDK_CACHE_NAME, NDK_CLIENT_NAME, - RADROOTS_MARKET_RELAY_INDEXES_URL, + RADROOTS_MARKET_INDEXES_URL, RADROOTS_MARKET_RELAY_URL, } as const; \ No newline at end of file diff --git a/app/src/lib/utils/listing/index.ts b/app/src/lib/utils/listing/index.ts @@ -5,7 +5,7 @@ import type { RadrootsEventsIndexedManifest } from "@radroots/events-indexed-bin export type ListingRoutesKind = "country" | "author" | "npub" | "nip05"; -const { RADROOTS_MARKET_RELAY_INDEXES_URL: idx_url } = _env; +const { RADROOTS_MARKET_INDEXES_URL: idx_url } = _env; export type ListingIndexedData = { manifest: RadrootsEventsIndexedManifest; diff --git a/app/src/lib/utils/profile/index.ts b/app/src/lib/utils/profile/index.ts @@ -7,7 +7,7 @@ import { lib_nostr_npub_encode } from "@radroots/utils-nostr"; type ProfileRoutesKind = "author" | "npub" | "nip05"; -const { RADROOTS_MARKET_RELAY_INDEXES_URL: idx_url } = _env; +const { RADROOTS_MARKET_INDEXES_URL: idx_url } = _env; async function fetch_listings(fetch_fn: HttpFetch, kind: ProfileRoutesKind, key: string): Promise<RadrootsListingEventMetadata[]> { const manifest = await fetch_json<radroots_events_indexed_manifest>( diff --git a/app/src/routes/(market)/(listing)/[0=country]/+page.ts b/app/src/routes/(market)/(listing)/[0=country]/+page.ts @@ -4,9 +4,10 @@ import type { RadrootsListingEventMetadata } from "@radroots/events-bindings"; import type { RadrootsEventsIndexedManifest } from "@radroots/events-indexed-bindings"; import type { EntryGenerator, PageLoad } from "./$types"; -const { RADROOTS_MARKET_RELAY_INDEXES_URL: idx_url } = _env; +const { RADROOTS_MARKET_INDEXES_URL: idx_url } = _env; export const entries: EntryGenerator = async () => { + if (!idx_url) return []; const indexes: string[] = await fetch(`${idx_url}/events/30402/country/indexes.json`).then((r) => r.json()); return indexes.map((i) => ({ 0: i })); }; @@ -28,4 +29,4 @@ export const load: PageLoad<PageLoadData> = async ({ fetch, params }) => { }; }; -export const prerender = true; +export const prerender = idx_url ? true : false; diff --git a/app/src/routes/(market)/(profile)/[0=nip05]/+page.ts b/app/src/routes/(market)/(profile)/[0=nip05]/+page.ts @@ -2,9 +2,10 @@ import { _env } from "$lib/utils/_env"; import { load_profile_indexed } from "$lib/utils/profile"; import type { EntryGenerator, PageLoad } from "./$types"; -const { RADROOTS_MARKET_RELAY_INDEXES_URL: idx_url } = _env; +const { RADROOTS_MARKET_INDEXES_URL: idx_url } = _env; export const entries: EntryGenerator = async () => { + if (!idx_url) return []; const indexes: string[] = await fetch(`${idx_url}/events/0/nip05/indexes.json`).then(r => r.json()); return indexes.map(i => ({ 0: i })); }; @@ -14,4 +15,4 @@ export const load: PageLoad = async ({ fetch, params }) => { return load_profile_indexed(fetch, "nip05", nip05); }; -export const prerender = true; +export const prerender = idx_url ? true : false; diff --git a/app/src/routes/(market)/(profile)/profile/[0=npub]/+page.ts b/app/src/routes/(market)/(profile)/profile/[0=npub]/+page.ts @@ -2,9 +2,10 @@ import { _env } from "$lib/utils/_env"; import { load_profile_indexed } from "$lib/utils/profile"; import type { EntryGenerator, PageLoad } from "./$types"; -const { RADROOTS_MARKET_RELAY_INDEXES_URL: idx_url } = _env; +const { RADROOTS_MARKET_INDEXES_URL: idx_url } = _env; export const entries: EntryGenerator = async () => { + if (!idx_url) return []; const indexes: string[] = await fetch(`${idx_url}/events/0/npub/indexes.json`).then(r => r.json()); return indexes.map(i => ({ 0: i })); }; @@ -14,4 +15,4 @@ export const load: PageLoad = async ({ fetch, params }) => { return load_profile_indexed(fetch, "npub", npub); }; -export const prerender = true; +export const prerender = idx_url ? true : false; diff --git a/app/src/routes/(market)/(profile)/profile/[0=public_key]/+page.ts b/app/src/routes/(market)/(profile)/profile/[0=public_key]/+page.ts @@ -2,9 +2,10 @@ import { _env } from "$lib/utils/_env"; import { load_profile_indexed } from "$lib/utils/profile"; import type { EntryGenerator, PageLoad } from "./$types"; -const { RADROOTS_MARKET_RELAY_INDEXES_URL: idx_url } = _env; +const { RADROOTS_MARKET_INDEXES_URL: idx_url } = _env; export const entries: EntryGenerator = async () => { + if (!idx_url) return []; const indexes: string[] = await fetch(`${idx_url}/events/0/author/indexes.json`).then(r => r.json()); return indexes.map(i => ({ 0: i })); }; @@ -14,4 +15,4 @@ export const load: PageLoad = async ({ fetch, params }) => { return load_profile_indexed(fetch, "author", public_key); }; -export const prerender = true; +export const prerender = idx_url ? true : false; diff --git a/app/src/routes/+page.ts b/app/src/routes/+page.ts @@ -2,7 +2,7 @@ import { _env } from "$lib/utils/_env"; import { error } from "@sveltejs/kit"; import type { PageLoad } from "./$types"; -const { RADROOTS_MARKET_RELAY_INDEXES_URL: idx_url } = _env; +const { RADROOTS_MARKET_INDEXES_URL: idx_url } = _env; type PageLoadData = { profiles: string[]; @@ -10,6 +10,13 @@ type PageLoadData = { }; export const load: PageLoad<PageLoadData> = async ({ fetch, params }) => { + if (!idx_url) { + return { + profiles: [], + countries: [], + }; + } + const [ res_nip05_indexes, res_country_indexes, @@ -31,4 +38,4 @@ export const load: PageLoad<PageLoadData> = async ({ fetch, params }) => { return data; } -export const prerender = true; +export const prerender = idx_url ? true : false; diff --git a/crates/indexer-utils/.gitignore b/crates/indexer-utils/.gitignore @@ -1,18 +0,0 @@ -/target - -.vscode -.tmp* - -.env -.env.* -!.env.example -!.env.test - -.DS_Store -*.pem - -notes*.txt -notes*.md -notes*.json -git-diff*.txt -justfile diff --git a/crates/indexer-utils/Cargo.toml b/crates/indexer-utils/Cargo.toml @@ -1,18 +0,0 @@ -[package] -name = "indexer-utils" -version = "0.1.0" -authors = ["Radroots Authors"] -license = "AGPLv3" -edition = "2021" - -[dependencies] -anyhow = "1.0" -bincode = { version = "2.0", features = ["derive", "serde"] } -nostr = "0.43.0" -rusqlite = { version = "0.32.1", features = ["bundled"] } -serde = "1.0" -serde_json = "1.0" -sha2 = "0.10.9" -sled = "0.34.7" -thiserror = "1.0" -tracing = "0.1" diff --git a/crates/indexer-utils/LICENSE b/crates/indexer-utils/LICENSE @@ -1,661 +0,0 @@ - GNU AFFERO GENERAL PUBLIC LICENSE - Version 3, 19 November 2007 - - Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/> - Everyone is permitted to copy and distribute verbatim copies - of this license document, but changing it is not allowed. - - Preamble - - The GNU Affero General Public License is a free, copyleft license for -software and other kinds of works, specifically designed to ensure -cooperation with the community in the case of network server software. - - The licenses for most software and other practical works are designed -to take away your freedom to share and change the works. By contrast, -our General Public Licenses are intended to guarantee your freedom to -share and change all versions of a program--to make sure it remains free -software for all its users. - - When we speak of free software, we are referring to freedom, not -price. Our General Public Licenses are designed to make sure that you -have the freedom to distribute copies of free software (and charge for -them if you wish), that you receive source code or can get it if you -want it, that you can change the software or use pieces of it in new -free programs, and that you know you can do these things. - - Developers that use our General Public Licenses protect your rights -with two steps: (1) assert copyright on the software, and (2) offer -you this License which gives you legal permission to copy, distribute -and/or modify the software. - - A secondary benefit of defending all users' freedom is that -improvements made in alternate versions of the program, if they -receive widespread use, become available for other developers to -incorporate. Many developers of free software are heartened and -encouraged by the resulting cooperation. However, in the case of -software used on network servers, this result may fail to come about. -The GNU General Public License permits making a modified version and -letting the public access it on a server without ever releasing its -source code to the public. - - The GNU Affero General Public License is designed specifically to -ensure that, in such cases, the modified source code becomes available -to the community. It requires the operator of a network server to -provide the source code of the modified version running there to the -users of that server. Therefore, public use of a modified version, on -a publicly accessible server, gives the public access to the source -code of the modified version. - - An older license, called the Affero General Public License and -published by Affero, was designed to accomplish similar goals. This is -a different license, not a version of the Affero GPL, but Affero has -released a new version of the Affero GPL which permits relicensing under -this license. - - The precise terms and conditions for copying, distribution and -modification follow. - - TERMS AND CONDITIONS - - 0. Definitions. - - "This License" refers to version 3 of the GNU Affero General Public License. - - "Copyright" also means copyright-like laws that apply to other kinds of -works, such as semiconductor masks. - - "The Program" refers to any copyrightable work licensed under this -License. Each licensee is addressed as "you". "Licensees" and -"recipients" may be individuals or organizations. - - To "modify" a work means to copy from or adapt all or part of the work -in a fashion requiring copyright permission, other than the making of an -exact copy. The resulting work is called a "modified version" of the -earlier work or a work "based on" the earlier work. - - A "covered work" means either the unmodified Program or a work based -on the Program. - - To "propagate" a work means to do anything with it that, without -permission, would make you directly or secondarily liable for -infringement under applicable copyright law, except executing it on a -computer or modifying a private copy. Propagation includes copying, -distribution (with or without modification), making available to the -public, and in some countries other activities as well. - - To "convey" a work means any kind of propagation that enables other -parties to make or receive copies. Mere interaction with a user through -a computer network, with no transfer of a copy, is not conveying. - - An interactive user interface displays "Appropriate Legal Notices" -to the extent that it includes a convenient and prominently visible -feature that (1) displays an appropriate copyright notice, and (2) -tells the user that there is no warranty for the work (except to the -extent that warranties are provided), that licensees may convey the -work under this License, and how to view a copy of this License. If -the interface presents a list of user commands or options, such as a -menu, a prominent item in the list meets this criterion. - - 1. Source Code. - - The "source code" for a work means the preferred form of the work -for making modifications to it. "Object code" means any non-source -form of a work. - - A "Standard Interface" means an interface that either is an official -standard defined by a recognized standards body, or, in the case of -interfaces specified for a particular programming language, one that -is widely used among developers working in that language. - - The "System Libraries" of an executable work include anything, other -than the work as a whole, that (a) is included in the normal form of -packaging a Major Component, but which is not part of that Major -Component, and (b) serves only to enable use of the work with that -Major Component, or to implement a Standard Interface for which an -implementation is available to the public in source code form. A -"Major Component", in this context, means a major essential component -(kernel, window system, and so on) of the specific operating system -(if any) on which the executable work runs, or a compiler used to -produce the work, or an object code interpreter used to run it. - - The "Corresponding Source" for a work in object code form means all -the source code needed to generate, install, and (for an executable -work) run the object code and to modify the work, including scripts to -control those activities. However, it does not include the work's -System Libraries, or general-purpose tools or generally available free -programs which are used unmodified in performing those activities but -which are not part of the work. For example, Corresponding Source -includes interface definition files associated with source files for -the work, and the source code for shared libraries and dynamically -linked subprograms that the work is specifically designed to require, -such as by intimate data communication or control flow between those -subprograms and other parts of the work. - - The Corresponding Source need not include anything that users -can regenerate automatically from other parts of the Corresponding -Source. - - The Corresponding Source for a work in source code form is that -same work. - - 2. Basic Permissions. - - All rights granted under this License are granted for the term of -copyright on the Program, and are irrevocable provided the stated -conditions are met. This License explicitly affirms your unlimited -permission to run the unmodified Program. The output from running a -covered work is covered by this License only if the output, given its -content, constitutes a covered work. This License acknowledges your -rights of fair use or other equivalent, as provided by copyright law. - - You may make, run and propagate covered works that you do not -convey, without conditions so long as your license otherwise remains -in force. You may convey covered works to others for the sole purpose -of having them make modifications exclusively for you, or provide you -with facilities for running those works, provided that you comply with -the terms of this License in conveying all material for which you do -not control copyright. Those thus making or running the covered works -for you must do so exclusively on your behalf, under your direction -and control, on terms that prohibit them from making any copies of -your copyrighted material outside their relationship with you. - - Conveying under any other circumstances is permitted solely under -the conditions stated below. Sublicensing is not allowed; section 10 -makes it unnecessary. - - 3. Protecting Users' Legal Rights From Anti-Circumvention Law. - - No covered work shall be deemed part of an effective technological -measure under any applicable law fulfilling obligations under article -11 of the WIPO copyright treaty adopted on 20 December 1996, or -similar laws prohibiting or restricting circumvention of such -measures. - - When you convey a covered work, you waive any legal power to forbid -circumvention of technological measures to the extent such circumvention -is effected by exercising rights under this License with respect to -the covered work, and you disclaim any intention to limit operation or -modification of the work as a means of enforcing, against the work's -users, your or third parties' legal rights to forbid circumvention of -technological measures. - - 4. Conveying Verbatim Copies. - - You may convey verbatim copies of the Program's source code as you -receive it, in any medium, provided that you conspicuously and -appropriately publish on each copy an appropriate copyright notice; -keep intact all notices stating that this License and any -non-permissive terms added in accord with section 7 apply to the code; -keep intact all notices of the absence of any warranty; and give all -recipients a copy of this License along with the Program. - - You may charge any price or no price for each copy that you convey, -and you may offer support or warranty protection for a fee. - - 5. Conveying Modified Source Versions. - - You may convey a work based on the Program, or the modifications to -produce it from the Program, in the form of source code under the -terms of section 4, provided that you also meet all of these conditions: - - a) The work must carry prominent notices stating that you modified - it, and giving a relevant date. - - b) The work must carry prominent notices stating that it is - released under this License and any conditions added under section - 7. This requirement modifies the requirement in section 4 to - "keep intact all notices". - - c) You must license the entire work, as a whole, under this - License to anyone who comes into possession of a copy. This - License will therefore apply, along with any applicable section 7 - additional terms, to the whole of the work, and all its parts, - regardless of how they are packaged. This License gives no - permission to license the work in any other way, but it does not - invalidate such permission if you have separately received it. - - d) If the work has interactive user interfaces, each must display - Appropriate Legal Notices; however, if the Program has interactive - interfaces that do not display Appropriate Legal Notices, your - work need not make them do so. - - A compilation of a covered work with other separate and independent -works, which are not by their nature extensions of the covered work, -and which are not combined with it such as to form a larger program, -in or on a volume of a storage or distribution medium, is called an -"aggregate" if the compilation and its resulting copyright are not -used to limit the access or legal rights of the compilation's users -beyond what the individual works permit. Inclusion of a covered work -in an aggregate does not cause this License to apply to the other -parts of the aggregate. - - 6. Conveying Non-Source Forms. - - You may convey a covered work in object code form under the terms -of sections 4 and 5, provided that you also convey the -machine-readable Corresponding Source under the terms of this License, -in one of these ways: - - a) Convey the object code in, or embodied in, a physical product - (including a physical distribution medium), accompanied by the - Corresponding Source fixed on a durable physical medium - customarily used for software interchange. - - b) Convey the object code in, or embodied in, a physical product - (including a physical distribution medium), accompanied by a - written offer, valid for at least three years and valid for as - long as you offer spare parts or customer support for that product - model, to give anyone who possesses the object code either (1) a - copy of the Corresponding Source for all the software in the - product that is covered by this License, on a durable physical - medium customarily used for software interchange, for a price no - more than your reasonable cost of physically performing this - conveying of source, or (2) access to copy the - Corresponding Source from a network server at no charge. - - c) Convey individual copies of the object code with a copy of the - written offer to provide the Corresponding Source. This - alternative is allowed only occasionally and noncommercially, and - only if you received the object code with such an offer, in accord - with subsection 6b. - - d) Convey the object code by offering access from a designated - place (gratis or for a charge), and offer equivalent access to the - Corresponding Source in the same way through the same place at no - further charge. You need not require recipients to copy the - Corresponding Source along with the object code. If the place to - copy the object code is a network server, the Corresponding Source - may be on a different server (operated by you or a third party) - that supports equivalent copying facilities, provided you maintain - clear directions next to the object code saying where to find the - Corresponding Source. Regardless of what server hosts the - Corresponding Source, you remain obligated to ensure that it is - available for as long as needed to satisfy these requirements. - - e) Convey the object code using peer-to-peer transmission, provided - you inform other peers where the object code and Corresponding - Source of the work are being offered to the general public at no - charge under subsection 6d. - - A separable portion of the object code, whose source code is excluded -from the Corresponding Source as a System Library, need not be -included in conveying the object code work. - - A "User Product" is either (1) a "consumer product", which means any -tangible personal property which is normally used for personal, family, -or household purposes, or (2) anything designed or sold for incorporation -into a dwelling. In determining whether a product is a consumer product, -doubtful cases shall be resolved in favor of coverage. For a particular -product received by a particular user, "normally used" refers to a -typical or common use of that class of product, regardless of the status -of the particular user or of the way in which the particular user -actually uses, or expects or is expected to use, the product. A product -is a consumer product regardless of whether the product has substantial -commercial, industrial or non-consumer uses, unless such uses represent -the only significant mode of use of the product. - - "Installation Information" for a User Product means any methods, -procedures, authorization keys, or other information required to install -and execute modified versions of a covered work in that User Product from -a modified version of its Corresponding Source. The information must -suffice to ensure that the continued functioning of the modified object -code is in no case prevented or interfered with solely because -modification has been made. - - If you convey an object code work under this section in, or with, or -specifically for use in, a User Product, and the conveying occurs as -part of a transaction in which the right of possession and use of the -User Product is transferred to the recipient in perpetuity or for a -fixed term (regardless of how the transaction is characterized), the -Corresponding Source conveyed under this section must be accompanied -by the Installation Information. But this requirement does not apply -if neither you nor any third party retains the ability to install -modified object code on the User Product (for example, the work has -been installed in ROM). - - The requirement to provide Installation Information does not include a -requirement to continue to provide support service, warranty, or updates -for a work that has been modified or installed by the recipient, or for -the User Product in which it has been modified or installed. Access to a -network may be denied when the modification itself materially and -adversely affects the operation of the network or violates the rules and -protocols for communication across the network. - - Corresponding Source conveyed, and Installation Information provided, -in accord with this section must be in a format that is publicly -documented (and with an implementation available to the public in -source code form), and must require no special password or key for -unpacking, reading or copying. - - 7. Additional Terms. - - "Additional permissions" are terms that supplement the terms of this -License by making exceptions from one or more of its conditions. -Additional permissions that are applicable to the entire Program shall -be treated as though they were included in this License, to the extent -that they are valid under applicable law. If additional permissions -apply only to part of the Program, that part may be used separately -under those permissions, but the entire Program remains governed by -this License without regard to the additional permissions. - - When you convey a copy of a covered work, you may at your option -remove any additional permissions from that copy, or from any part of -it. (Additional permissions may be written to require their own -removal in certain cases when you modify the work.) You may place -additional permissions on material, added by you to a covered work, -for which you have or can give appropriate copyright permission. - - Notwithstanding any other provision of this License, for material you -add to a covered work, you may (if authorized by the copyright holders of -that material) supplement the terms of this License with terms: - - a) Disclaiming warranty or limiting liability differently from the - terms of sections 15 and 16 of this License; or - - b) Requiring preservation of specified reasonable legal notices or - author attributions in that material or in the Appropriate Legal - Notices displayed by works containing it; or - - c) Prohibiting misrepresentation of the origin of that material, or - requiring that modified versions of such material be marked in - reasonable ways as different from the original version; or - - d) Limiting the use for publicity purposes of names of licensors or - authors of the material; or - - e) Declining to grant rights under trademark law for use of some - trade names, trademarks, or service marks; or - - f) Requiring indemnification of licensors and authors of that - material by anyone who conveys the material (or modified versions of - it) with contractual assumptions of liability to the recipient, for - any liability that these contractual assumptions directly impose on - those licensors and authors. - - All other non-permissive additional terms are considered "further -restrictions" within the meaning of section 10. If the Program as you -received it, or any part of it, contains a notice stating that it is -governed by this License along with a term that is a further -restriction, you may remove that term. If a license document contains -a further restriction but permits relicensing or conveying under this -License, you may add to a covered work material governed by the terms -of that license document, provided that the further restriction does -not survive such relicensing or conveying. - - If you add terms to a covered work in accord with this section, you -must place, in the relevant source files, a statement of the -additional terms that apply to those files, or a notice indicating -where to find the applicable terms. - - Additional terms, permissive or non-permissive, may be stated in the -form of a separately written license, or stated as exceptions; -the above requirements apply either way. - - 8. Termination. - - You may not propagate or modify a covered work except as expressly -provided under this License. Any attempt otherwise to propagate or -modify it is void, and will automatically terminate your rights under -this License (including any patent licenses granted under the third -paragraph of section 11). - - However, if you cease all violation of this License, then your -license from a particular copyright holder is reinstated (a) -provisionally, unless and until the copyright holder explicitly and -finally terminates your license, and (b) permanently, if the copyright -holder fails to notify you of the violation by some reasonable means -prior to 60 days after the cessation. - - Moreover, your license from a particular copyright holder is -reinstated permanently if the copyright holder notifies you of the -violation by some reasonable means, this is the first time you have -received notice of violation of this License (for any work) from that -copyright holder, and you cure the violation prior to 30 days after -your receipt of the notice. - - Termination of your rights under this section does not terminate the -licenses of parties who have received copies or rights from you under -this License. If your rights have been terminated and not permanently -reinstated, you do not qualify to receive new licenses for the same -material under section 10. - - 9. Acceptance Not Required for Having Copies. - - You are not required to accept this License in order to receive or -run a copy of the Program. Ancillary propagation of a covered work -occurring solely as a consequence of using peer-to-peer transmission -to receive a copy likewise does not require acceptance. However, -nothing other than this License grants you permission to propagate or -modify any covered work. These actions infringe copyright if you do -not accept this License. Therefore, by modifying or propagating a -covered work, you indicate your acceptance of this License to do so. - - 10. Automatic Licensing of Downstream Recipients. - - Each time you convey a covered work, the recipient automatically -receives a license from the original licensors, to run, modify and -propagate that work, subject to this License. You are not responsible -for enforcing compliance by third parties with this License. - - An "entity transaction" is a transaction transferring control of an -organization, or substantially all assets of one, or subdividing an -organization, or merging organizations. If propagation of a covered -work results from an entity transaction, each party to that -transaction who receives a copy of the work also receives whatever -licenses to the work the party's predecessor in interest had or could -give under the previous paragraph, plus a right to possession of the -Corresponding Source of the work from the predecessor in interest, if -the predecessor has it or can get it with reasonable efforts. - - You may not impose any further restrictions on the exercise of the -rights granted or affirmed under this License. For example, you may -not impose a license fee, royalty, or other charge for exercise of -rights granted under this License, and you may not initiate litigation -(including a cross-claim or counterclaim in a lawsuit) alleging that -any patent claim is infringed by making, using, selling, offering for -sale, or importing the Program or any portion of it. - - 11. Patents. - - A "contributor" is a copyright holder who authorizes use under this -License of the Program or a work on which the Program is based. The -work thus licensed is called the contributor's "contributor version". - - A contributor's "essential patent claims" are all patent claims -owned or controlled by the contributor, whether already acquired or -hereafter acquired, that would be infringed by some manner, permitted -by this License, of making, using, or selling its contributor version, -but do not include claims that would be infringed only as a -consequence of further modification of the contributor version. For -purposes of this definition, "control" includes the right to grant -patent sublicenses in a manner consistent with the requirements of -this License. - - Each contributor grants you a non-exclusive, worldwide, royalty-free -patent license under the contributor's essential patent claims, to -make, use, sell, offer for sale, import and otherwise run, modify and -propagate the contents of its contributor version. - - In the following three paragraphs, a "patent license" is any express -agreement or commitment, however denominated, not to enforce a patent -(such as an express permission to practice a patent or covenant not to -sue for patent infringement). To "grant" such a patent license to a -party means to make such an agreement or commitment not to enforce a -patent against the party. - - If you convey a covered work, knowingly relying on a patent license, -and the Corresponding Source of the work is not available for anyone -to copy, free of charge and under the terms of this License, through a -publicly available network server or other readily accessible means, -then you must either (1) cause the Corresponding Source to be so -available, or (2) arrange to deprive yourself of the benefit of the -patent license for this particular work, or (3) arrange, in a manner -consistent with the requirements of this License, to extend the patent -license to downstream recipients. "Knowingly relying" means you have -actual knowledge that, but for the patent license, your conveying the -covered work in a country, or your recipient's use of the covered work -in a country, would infringe one or more identifiable patents in that -country that you have reason to believe are valid. - - If, pursuant to or in connection with a single transaction or -arrangement, you convey, or propagate by procuring conveyance of, a -covered work, and grant a patent license to some of the parties -receiving the covered work authorizing them to use, propagate, modify -or convey a specific copy of the covered work, then the patent license -you grant is automatically extended to all recipients of the covered -work and works based on it. - - A patent license is "discriminatory" if it does not include within -the scope of its coverage, prohibits the exercise of, or is -conditioned on the non-exercise of one or more of the rights that are -specifically granted under this License. You may not convey a covered -work if you are a party to an arrangement with a third party that is -in the business of distributing software, under which you make payment -to the third party based on the extent of your activity of conveying -the work, and under which the third party grants, to any of the -parties who would receive the covered work from you, a discriminatory -patent license (a) in connection with copies of the covered work -conveyed by you (or copies made from those copies), or (b) primarily -for and in connection with specific products or compilations that -contain the covered work, unless you entered into that arrangement, -or that patent license was granted, prior to 28 March 2007. - - Nothing in this License shall be construed as excluding or limiting -any implied license or other defenses to infringement that may -otherwise be available to you under applicable patent law. - - 12. No Surrender of Others' Freedom. - - If conditions are imposed on you (whether by court order, agreement or -otherwise) that contradict the conditions of this License, they do not -excuse you from the conditions of this License. If you cannot convey a -covered work so as to satisfy simultaneously your obligations under this -License and any other pertinent obligations, then as a consequence you may -not convey it at all. For example, if you agree to terms that obligate you -to collect a royalty for further conveying from those to whom you convey -the Program, the only way you could satisfy both those terms and this -License would be to refrain entirely from conveying the Program. - - 13. Remote Network Interaction; Use with the GNU General Public License. - - Notwithstanding any other provision of this License, if you modify the -Program, your modified version must prominently offer all users -interacting with it remotely through a computer network (if your version -supports such interaction) an opportunity to receive the Corresponding -Source of your version by providing access to the Corresponding Source -from a network server at no charge, through some standard or customary -means of facilitating copying of software. This Corresponding Source -shall include the Corresponding Source for any work covered by version 3 -of the GNU General Public License that is incorporated pursuant to the -following paragraph. - - Notwithstanding any other provision of this License, you have -permission to link or combine any covered work with a work licensed -under version 3 of the GNU General Public License into a single -combined work, and to convey the resulting work. The terms of this -License will continue to apply to the part which is the covered work, -but the work with which it is combined will remain governed by version -3 of the GNU General Public License. - - 14. Revised Versions of this License. - - The Free Software Foundation may publish revised and/or new versions of -the GNU Affero General Public License from time to time. Such new versions -will be similar in spirit to the present version, but may differ in detail to -address new problems or concerns. - - Each version is given a distinguishing version number. If the -Program specifies that a certain numbered version of the GNU Affero General -Public License "or any later version" applies to it, you have the -option of following the terms and conditions either of that numbered -version or of any later version published by the Free Software -Foundation. If the Program does not specify a version number of the -GNU Affero General Public License, you may choose any version ever published -by the Free Software Foundation. - - If the Program specifies that a proxy can decide which future -versions of the GNU Affero General Public License can be used, that proxy's -public statement of acceptance of a version permanently authorizes you -to choose that version for the Program. - - Later license versions may give you additional or different -permissions. However, no additional obligations are imposed on any -author or copyright holder as a result of your choosing to follow a -later version. - - 15. Disclaimer of Warranty. - - THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY -APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT -HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY -OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, -THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM -IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF -ALL NECESSARY SERVICING, REPAIR OR CORRECTION. - - 16. Limitation of Liability. - - IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING -WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS -THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY -GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE -USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF -DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD -PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), -EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF -SUCH DAMAGES. - - 17. Interpretation of Sections 15 and 16. - - If the disclaimer of warranty and limitation of liability provided -above cannot be given local legal effect according to their terms, -reviewing courts shall apply local law that most closely approximates -an absolute waiver of all civil liability in connection with the -Program, unless a warranty or assumption of liability accompanies a -copy of the Program in return for a fee. - - END OF TERMS AND CONDITIONS - - How to Apply These Terms to Your New Programs - - If you develop a new program, and you want it to be of the greatest -possible use to the public, the best way to achieve this is to make it -free software which everyone can redistribute and change under these terms. - - To do so, attach the following notices to the program. It is safest -to attach them to the start of each source file to most effectively -state the exclusion of warranty; and each file should have at least -the "copyright" line and a pointer to where the full notice is found. - - <one line to give the program's name and a brief idea of what it does.> - Copyright (C) <year> <name of author> - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published - by the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see <https://www.gnu.org/licenses/>. - -Also add information on how to contact you by electronic and paper mail. - - If your software can interact with users remotely through a computer -network, you should also make sure that it provides a way for users to -get its source. For example, if your program is a web application, its -interface could display a "Source" link that leads users to an archive -of the code. There are many ways you could offer source, and different -solutions will be better for different programs; see section 13 for the -specific requirements. - - You should also get your employer (if you work as a programmer) or school, -if any, to sign a "copyright disclaimer" for the program, if necessary. -For more information on this, and how to apply and follow the GNU AGPL, see -<https://www.gnu.org/licenses/>. -\ No newline at end of file diff --git a/crates/indexer-utils/src/crypto.rs b/crates/indexer-utils/src/crypto.rs @@ -1,7 +0,0 @@ -use sha2::{Digest, Sha256}; - -pub fn sha256_hex(data: &[u8]) -> String { - let mut hasher = Sha256::new(); - hasher.update(data); - format!("{:x}", hasher.finalize()) -} diff --git a/crates/indexer-utils/src/database.rs b/crates/indexer-utils/src/database.rs @@ -1,59 +0,0 @@ -use anyhow::Result; -use serde::de::DeserializeOwned; -use serde::Serialize; -use sled::{Config as SledConfig, Db, IVec}; - -use crate::serialization::{deserialize, serialize}; - -pub struct IndexerDb { - pub db: Db, -} - -impl IndexerDb { - pub fn open(path: &str) -> Result<Self> { - let db = SledConfig::new().path(path).open()?; - Ok(Self { db }) - } - - pub fn insert<T: Serialize>(&self, tree: &str, key: &str, v: &T) -> Result<()> { - let t = self.db.open_tree(tree)?; - let blob: Vec<u8> = serialize(v)?; - t.insert(key, blob)?; - Ok(()) - } - - pub fn get<T: DeserializeOwned>(&self, tree: &str, key: &str) -> Result<Option<T>> { - let t = self.db.open_tree(tree)?; - if let Some(bytes) = t.get(key)? { - let v: T = deserialize(&bytes)?; - Ok(Some(v)) - } else { - Ok(None) - } - } - - pub fn get_all<T: DeserializeOwned>(&self, tree: &str) -> Result<Vec<T>> { - let t = self.db.open_tree(tree)?; - let mut out = Vec::new(); - for res in t.iter().values() { - let bytes = res?; - let v: T = crate::serialization::deserialize(&bytes)?; - out.push(v); - } - Ok(out) - } - - pub fn insert_raw(&self, tree: &str, key: &str, bytes: &[u8]) -> Result<()> { - self.db.open_tree(tree)?.insert(key, bytes)?; - Ok(()) - } - - pub fn get_raw(&self, tree: &str, key: &str) -> Result<Option<IVec>> { - Ok(self.db.open_tree(tree)?.get(key)?) - } - - pub fn flush(&self) -> Result<()> { - self.db.flush()?; - Ok(()) - } -} diff --git a/crates/indexer-utils/src/file.rs b/crates/indexer-utils/src/file.rs @@ -1,50 +0,0 @@ -use anyhow::Result; -use std::fs::{self, File}; -use std::io::Write; -use std::path::Path; -use thiserror::Error; -use tracing::debug; - -use crate::paths::{paths_join, PathsError}; - -#[derive(Error, Debug)] -pub enum FileError { - #[error("Failed to create directory `{path}`: {source}")] - CreateDirError { - path: String, - #[source] - source: std::io::Error, - }, - - #[error("Invalid path construction: {0}")] - PathJoinError(#[from] PathsError), - - #[error("Failed to write RSS file: {0}")] - Io(#[from] std::io::Error), -} - -pub fn fs_mkdir<S, I>(segments: I) -> Result<(), FileError> -where - I: IntoIterator<Item = S>, - S: AsRef<Path>, -{ - let dir_path = paths_join(segments)?; - - if !dir_path.exists() { - fs::create_dir_all(&dir_path).map_err(|e| FileError::CreateDirError { - path: dir_path.display().to_string(), - source: e, - })?; - debug!("Created directory: {}", dir_path.display()); - } else { - debug!("Directory already exists: {}", dir_path.display()); - } - - Ok(()) -} - -pub fn fs_write_rss(path: &Path, content: &str) -> Result<(), FileError> { - let mut file = File::create(path)?; - file.write_all(content.as_bytes())?; - Ok(()) -} diff --git a/crates/indexer-utils/src/hash.rs b/crates/indexer-utils/src/hash.rs @@ -1,30 +0,0 @@ -use sha2::{Digest, Sha256}; -use std::{ - fs::File, - io::{self, Read}, - path::Path, -}; - -pub fn compute_hash_of_bytes<R: Read>(mut rdr: R) -> io::Result<String> { - let mut hasher = Sha256::new(); - let mut buf = [0u8; 8 * 1024]; - loop { - let n = rdr.read(&mut buf)?; - if n == 0 { - break; - } - hasher.update(&buf[..n]); - } - Ok(format!("{:x}", hasher.finalize())) -} - -pub fn is_file_hash_valid(path: &Path) -> io::Result<bool> { - let hash_path = path.with_extension("sha256.txt"); - if !path.exists() || !hash_path.exists() { - return Ok(false); - } - let file = File::open(path)?; - let computed = compute_hash_of_bytes(file)?; - let stored = std::fs::read_to_string(hash_path)?; - Ok(stored.trim() == computed) -} diff --git a/crates/indexer-utils/src/lib.rs b/crates/indexer-utils/src/lib.rs @@ -1,10 +0,0 @@ -pub mod crypto; -pub mod database; -pub mod file; -pub mod hash; -pub mod logs; -pub mod nostr; -pub mod paths; -pub mod serialization; -pub mod sqlite; -pub mod write; diff --git a/crates/indexer-utils/src/nostr.rs b/crates/indexer-utils/src/nostr.rs @@ -1,33 +0,0 @@ -use std::collections::HashMap; - -use nostr::key::{Error as PublicKeyError, PublicKey}; -use nostr::prelude::ToBech32; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum NostrUtilsError { - #[error("Invalid hex for public key: {0}")] - InvalidPublicKey(#[from] PublicKeyError), - - #[error("Tag parsing error: {0}")] - TagParseError(String), -} - -pub fn public_key_to_npub(public_key_hex: &str) -> Result<String, NostrUtilsError> { - let pubkey = PublicKey::from_hex(public_key_hex)?; - Ok(pubkey.to_bech32().expect("to_bech32 is infallible")) -} - -pub fn get_tag_value<'a>( - tags_map: &'a HashMap<String, Vec<String>>, - key: &str, - idx: usize, -) -> Result<Option<String>, NostrUtilsError> { - match tags_map.get(key) { - Some(values) => Ok(values.get(idx).cloned()), - None => Err(NostrUtilsError::TagParseError(format!( - "Tag '{}' not found", - key - ))), - } -} diff --git a/crates/indexer-utils/src/paths.rs b/crates/indexer-utils/src/paths.rs @@ -1,31 +0,0 @@ -use anyhow::Result; -use std::path::{Path, PathBuf}; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum PathsError { - #[error("Invalid path segment at index {index}: `{segment}`")] - InvalidSegment { index: usize, segment: String }, -} -pub fn paths_join<I, S>(segments: I) -> Result<PathBuf, PathsError> -where - I: IntoIterator<Item = S>, - S: AsRef<Path>, -{ - let mut path = PathBuf::new(); - - for (i, segment) in segments.into_iter().enumerate() { - let seg_ref = segment.as_ref(); - - if seg_ref.as_os_str().is_empty() { - return Err(PathsError::InvalidSegment { - index: i, - segment: seg_ref.display().to_string(), - }); - } - - path.push(seg_ref); - } - - Ok(path) -} diff --git a/crates/indexer-utils/src/sqlite.rs b/crates/indexer-utils/src/sqlite.rs @@ -1,57 +0,0 @@ -use rusqlite::{Connection, Row, Statement}; -use thiserror::Error; - -pub use rusqlite::{ - types::Type as SqliteType, Error as RustqliteError, Result as SqliteResult, Row as SqliteRow, -}; - -#[derive(Debug, Error)] -pub enum SqliteError { - #[error("Failed to open SQLite DB at `{path}`: {source}")] - ConnectionError { - path: String, - #[source] - source: rusqlite::Error, - }, - - #[error("Failed to prepare SQL statement: {0}")] - PrepareError(#[source] rusqlite::Error), - - #[error("Failed to run SQL query: {0}")] - QueryError(#[source] rusqlite::Error), - - #[error("Failed to collect query results: {0}")] - CollectError(#[source] rusqlite::Error), -} - -pub fn sqlite_conn(db_path: &str) -> Result<Connection, SqliteError> { - Connection::open(db_path).map_err(|e| SqliteError::ConnectionError { - path: db_path.to_string(), - source: e, - }) -} - -pub fn sqlite_stmt<'a>(conn: &'a Connection, stmt: &'a str) -> Result<Statement<'a>, SqliteError> { - conn.prepare(stmt).map_err(SqliteError::PrepareError) -} - -pub fn sqlite_stmt_querymap<'a, T, F>( - stmt: &'a mut Statement<'a>, - map_fn: F, -) -> Result<Vec<T>, SqliteError> -where - F: Fn(&Row) -> rusqlite::Result<T>, - T: 'a, -{ - let mapped = stmt - .query_map([], map_fn) - .map_err(SqliteError::QueryError)?; - mapped - .collect::<Result<Vec<_>, _>>() - .map_err(SqliteError::CollectError) -} - -pub fn row_u64_at(row: &SqliteRow, idx: usize) -> SqliteResult<u64> { - let v: u32 = row.get(idx)?; - Ok(v as u64) -} diff --git a/crates/indexer-utils/src/write.rs b/crates/indexer-utils/src/write.rs @@ -1,44 +0,0 @@ -use anyhow::Result; -use serde::Serialize; -use sha2::Digest; -use std::{ - fs::{write, File}, - io::{BufWriter, Write}, - path::Path, -}; -use tracing::debug; - -pub fn write_json<T: Serialize>(path: &Path, data: &T) -> Result<()> { - let file = File::create(path)?; - let mut buf = BufWriter::new(file); - serde_json::to_writer_pretty(&mut buf, data)?; - buf.flush()?; - Ok(()) -} - -pub fn write_hash(path: &Path, hash: &str) -> Result<()> { - let hash_path = path.with_extension("sha256.txt"); - write(&hash_path, format!("{hash}\n"))?; - debug!(hash_path = %hash_path.display(), "Wrote new hash file"); - Ok(()) -} - -struct HasherWriter<'a>(&'a mut sha2::Sha256); -impl<'a> std::io::Write for HasherWriter<'a> { - fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { - self.0.update(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -pub fn compute_hash<T: Serialize>(data: &T) -> anyhow::Result<String> { - let mut hasher = sha2::Sha256::new(); - { - let writer = HasherWriter(&mut hasher); - serde_json::to_writer_pretty(writer, data)?; - } - Ok(format!("{:x}", hasher.finalize())) -} diff --git a/crates/indexer/Cargo.toml b/crates/indexer/Cargo.toml @@ -1,30 +0,0 @@ -[package] -name = "radroots-market-relay-indexer" -version = "0.1.0" -authors = ["Radroots Authors"] -license = "AGPLv3" -edition = "2021" - -[features] -default = [] -audit = [] - -[dependencies] -indexer-utils = { path = "../indexer-utils" } -radroots-core = { workspace = true } -radroots-events = { workspace = true } -radroots-events-indexed = { workspace = true } - -anyhow = { workspace = true } -clap = { workspace = true } -config = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -thiserror = { workspace = true } -tokio = { workspace = true } -tracing = { workspace = true } -tracing-subscriber = { workspace = true } -tracing-appender = { workspace = true } -once_cell = { workspace = true } -regex = { workspace = true } - diff --git a/crates/indexer/config.toml b/crates/indexer/config.toml @@ -1,12 +0,0 @@ -[indexer] -data_dir = "data/radroots-market-relay-indexer" -logs_dir = "logs/radroots-market-relay-indexer" -flush_interval = 20 - -[relay] -url = "ws://127.0.0.1:8080" -database_path = "data/nostr-rs-relay/nostr.db" - -[listings] -country_shard_size = 1000 -profile_shard_size = 200 diff --git a/crates/indexer/src/audit.rs b/crates/indexer/src/audit.rs @@ -1,388 +0,0 @@ -#![cfg(feature = "audit")] - -use std::collections::HashSet; -use std::sync::RwLock; - -use indexer_utils::nostr::public_key_to_npub; -use once_cell::sync::Lazy; -use regex::Regex; -use tracing::info; - -use crate::domain::resolvers::profile::ProfileResolver; -use crate::relay::event::RelayIndexerEvent; -use radroots_events::listing::models::RadrootsListingEventIndex; -use radroots_events::profile::models::RadrootsProfileEventIndex; - -#[derive(Clone, Debug)] -pub struct AuditFilter { - pub enabled: bool, - pub kinds: Option<HashSet<u64>>, - pub authors: HashSet<String>, - pub npubs: HashSet<String>, - pub nip05_full: HashSet<String>, - pub nip05_local: HashSet<String>, - pub content_re: Option<Regex>, - pub created_at_min: Option<u32>, - pub created_at_max: Option<u32>, -} - -impl Default for AuditFilter { - fn default() -> Self { - Self { - enabled: false, - kinds: None, - authors: HashSet::new(), - npubs: HashSet::new(), - nip05_full: HashSet::new(), - nip05_local: HashSet::new(), - content_re: None, - created_at_min: None, - created_at_max: None, - } - } -} - -impl AuditFilter { - pub fn from_env() -> Self { - let mut f = Self::default(); - - f.enabled = std::env::var("AUDIT_ENABLED") - .map(|v| v.eq_ignore_ascii_case("true") || v == "1") - .unwrap_or(false); - - if let Ok(v) = std::env::var("AUDIT_KINDS") { - let set = v - .split(',') - .filter_map(|s| s.trim().parse::<u64>().ok()) - .collect::<HashSet<_>>(); - if !set.is_empty() { - f.kinds = Some(set); - } - } - - let parse_set = |key: &str| -> HashSet<String> { - std::env::var(key) - .ok() - .map(|s| { - s.split(',') - .map(|x| x.trim().to_lowercase()) - .filter(|x| !x.is_empty()) - .collect() - }) - .unwrap_or_default() - }; - - f.authors = parse_set("AUDIT_AUTHORS"); - f.npubs = parse_set("AUDIT_NPUBS"); - f.nip05_full = parse_set("AUDIT_NIP05"); - f.nip05_local = parse_set("AUDIT_NIP05_LOCAL"); - - if let Ok(rx) = std::env::var("AUDIT_CONTENT_RE") { - if !rx.trim().is_empty() { - if let Ok(re) = Regex::new(&format!("(?i){}", rx)) { - f.content_re = Some(re); - } - } - } - - f.created_at_min = std::env::var("AUDIT_CREATED_AT_MIN") - .ok() - .and_then(|s| s.parse().ok()); - f.created_at_max = std::env::var("AUDIT_CREATED_AT_MAX") - .ok() - .and_then(|s| s.parse().ok()); - - f - } -} - -#[derive(Clone)] -struct AuditState { - filter: AuditFilter, - resolver: Option<ProfileResolver>, -} - -static STATE: Lazy<RwLock<AuditState>> = Lazy::new(|| { - RwLock::new(AuditState { - filter: AuditFilter::from_env(), - resolver: None, - }) -}); - -pub fn reload_from_env() { - if let Ok(mut w) = STATE.write() { - w.filter = AuditFilter::from_env(); - } -} - -pub fn set_profile_resolver(resolver: ProfileResolver) { - if let Ok(mut w) = STATE.write() { - w.resolver = Some(resolver); - } -} - -fn nip05_parts_from_metadata(nip05: &str) -> (String, String) { - let lower = nip05.to_lowercase(); - if let Some((name, domain)) = lower.split_once('@') { - (format!("{name}@{domain}"), name.to_string()) - } else { - (lower.clone(), lower) - } -} - -fn should_log( - author_hex: &str, - kind_u64: u64, - created_at: u32, - content: &str, - npub_opt: Option<String>, - nip05_full_opt: Option<String>, - nip05_local_opt: Option<String>, -) -> bool { - let filter = STATE.read().ok().map(|s| s.filter.clone()); - let Some(f) = filter else { - return false; - }; - if !f.enabled { - return false; - } - - if let Some(kinds) = &f.kinds { - if !kinds.contains(&kind_u64) { - return false; - } - } - - if !f.authors.is_empty() && !f.authors.contains(&author_hex.to_lowercase()) { - return false; - } - - if !f.npubs.is_empty() { - let pass = npub_opt - .as_ref() - .map(|n| f.npubs.contains(&n.to_lowercase())) - .unwrap_or(false); - if !pass { - return false; - } - } - - if !f.nip05_full.is_empty() { - let pass = nip05_full_opt - .as_ref() - .map(|n| f.nip05_full.contains(&n.to_lowercase())) - .unwrap_or(false); - if !pass { - return false; - } - } - - if !f.nip05_local.is_empty() { - let pass = nip05_local_opt - .as_ref() - .map(|n| f.nip05_local.contains(&n.to_lowercase())) - .unwrap_or(false); - if !pass { - return false; - } - } - - if let Some(min) = f.created_at_min { - if created_at < min { - return false; - } - } - if let Some(max) = f.created_at_max { - if created_at > max { - return false; - } - } - - if let Some(re) = &f.content_re { - if !re.is_match(content) { - return false; - } - } - - true -} - -#[inline] -pub fn log_indexer_event(idx: &RelayIndexerEvent) { - let need_npub = STATE - .read() - .ok() - .map(|s| !s.filter.npubs.is_empty()) - .unwrap_or(false); - let npub_opt = if need_npub { - public_key_to_npub(&idx.author).ok() - } else { - None - }; - - let (need_full, need_local) = STATE - .read() - .ok() - .map(|s| { - ( - !s.filter.nip05_full.is_empty(), - !s.filter.nip05_local.is_empty(), - ) - }) - .unwrap_or((false, false)); - - let (nip05_full_opt, nip05_local_opt) = if need_full || need_local { - if let Ok(s) = STATE.read() { - if let Some(res) = s.resolver.as_ref() { - let local = res.nip05_for_author(&idx.author).map(|s| s.to_string()); - (None, local) - } else { - (None, None) - } - } else { - (None, None) - } - } else { - (None, None) - }; - - if !should_log( - &idx.author, - idx.kind.as_u64(), - idx.created_at, - &idx.content, - npub_opt, - nip05_full_opt, - nip05_local_opt, - ) { - return; - } - - let tags_json = - serde_json::to_string(&idx.tags).unwrap_or_else(|_| "Error serializing tags".into()); - info!( - target: "audit", - kind = idx.kind.as_u64(), - id = %idx.id, - author = %idx.author, - created_at = idx.created_at, - tags = %tags_json, - content = %idx.content, - "AUDIT: relay indexer event" - ); -} - -#[inline] -pub fn log_profile_event(evt: &RadrootsProfileEventIndex) { - let (nip05_full_opt, nip05_local_opt) = evt - .metadata - .profile - .nip05 - .as_ref() - .map(|n| { - let (full, local) = nip05_parts_from_metadata(n); - (Some(full), Some(local)) - }) - .unwrap_or((None, None)); - - let need_npub = STATE - .read() - .ok() - .map(|s| !s.filter.npubs.is_empty()) - .unwrap_or(false); - let npub_opt = if need_npub { - public_key_to_npub(&evt.event.author).ok() - } else { - None - }; - - if !should_log( - &evt.event.author, - evt.event.kind.try_into().unwrap(), - evt.event.created_at, - &evt.event.content, - npub_opt, - nip05_full_opt, - nip05_local_opt, - ) { - return; - } - - if let Ok(json) = serde_json::to_string(evt) { - info!( - target = "audit", - kind = evt.event.kind, - id = %evt.event.id, - author = %evt.event.author, - created_at = evt.event.created_at, - processed_json = %json, - "AUDIT: processed metadata" - ); - } -} - -#[inline] -pub fn log_listing_event(evt: &RadrootsListingEventIndex) { - let need_npub = STATE - .read() - .ok() - .map(|s| !s.filter.npubs.is_empty()) - .unwrap_or(false); - let npub_opt = if need_npub { - public_key_to_npub(&evt.event.author).ok() - } else { - None - }; - - let (need_full, need_local) = STATE - .read() - .ok() - .map(|s| { - ( - !s.filter.nip05_full.is_empty(), - !s.filter.nip05_local.is_empty(), - ) - }) - .unwrap_or((false, false)); - - let (nip05_full_opt, nip05_local_opt) = if need_full || need_local { - if let Ok(s) = STATE.read() { - if let Some(res) = s.resolver.as_ref() { - let local = res - .nip05_for_author(&evt.event.author) - .map(|s| s.to_string()); - (None, local) - } else { - (None, None) - } - } else { - (None, None) - } - } else { - (None, None) - }; - - if !should_log( - &evt.event.author, - evt.event.kind as u64, - evt.event.created_at, - &evt.event.content, - npub_opt, - nip05_full_opt, - nip05_local_opt, - ) { - return; - } - - if let Ok(json) = serde_json::to_string(evt) { - info!( - target = "audit", - kind = evt.event.kind, - id = %evt.event.id, - author = %evt.event.author, - created_at = evt.event.created_at, - processed_json = %json, - "AUDIT: processed listing" - ); - } -} diff --git a/crates/indexer/src/config.rs b/crates/indexer/src/config.rs @@ -1,63 +0,0 @@ -use anyhow::Result; -use config::{Config, ConfigError, File}; -use serde::{Deserialize, Serialize}; -use std::path::Path; -use thiserror::Error; -use tracing::error; - -#[derive(Debug, Error)] -pub enum SettingsError { - #[error("Configuration loading failed: {0}")] - Load(#[from] ConfigError), -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Indexer { - pub data_dir: String, - pub logs_dir: String, - pub flush_interval: u64, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Relay { - pub url: String, - pub database_path: String, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Listings { - pub country_shard_size: usize, - pub profile_shard_size: usize, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Settings { - pub indexer: Indexer, - pub relay: Relay, - pub listings: Listings, -} - -impl Settings { - pub fn load(config_path: &Option<String>) -> Result<Self, SettingsError> { - let path = config_path.as_deref().unwrap_or("config.toml"); - let builder = if config_path.is_some() { - Config::builder().add_source(File::from(Path::new(path)).required(true)) - } else { - Config::builder().add_source(File::with_name("config").required(true)) - }; - - match builder.build() { - Ok(cfg) => match cfg.try_deserialize::<Settings>() { - Ok(settings) => Ok(settings), - Err(err) => { - error!("Failed to deserialize configuration: {err}"); - Err(SettingsError::Load(err)) - } - }, - Err(err) => { - error!("Failed to load configuration from '{}': {err}", path); - Err(SettingsError::Load(err)) - } - } - } -} diff --git a/crates/indexer/src/domain/indexer/kind.rs b/crates/indexer/src/domain/indexer/kind.rs @@ -1,83 +0,0 @@ -use indexer_utils::paths::{paths_join, PathsError}; -use serde::de::Error as DeError; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::fmt; -use std::path::PathBuf; - -use crate::domain::indexer::key::LISTING_INDEX_DIRECTORY; -use crate::domain::indexer::{IndexerKey, PROFILE_INDEX_DIRECTORY}; - -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -pub enum IndexerEventKind { - Profile, - Listing, -} - -impl IndexerEventKind { - pub const ALL: [IndexerEventKind; 2] = [IndexerEventKind::Profile, IndexerEventKind::Listing]; - - pub const fn as_u64(self) -> u64 { - match self { - IndexerEventKind::Profile => 0, - IndexerEventKind::Listing => 30402, - } - } - - pub const fn paths(self) -> &'static [IndexerKey] { - match self { - IndexerEventKind::Profile => &PROFILE_INDEX_DIRECTORY, - IndexerEventKind::Listing => &LISTING_INDEX_DIRECTORY, - } - } - - pub fn base_path<P: AsRef<std::path::Path>>(self, data_dir: P) -> Result<PathBuf, PathsError> { - paths_join(&[ - data_dir.as_ref().to_str().unwrap(), - "static", - "events", - &self.as_u64().to_string(), - ]) - } -} - -impl fmt::Display for IndexerEventKind { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.as_u64()) - } -} - -impl Serialize for IndexerEventKind { - fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> - where - S: Serializer, - { - serializer.serialize_u64(self.as_u64()) - } -} - -impl<'de> Deserialize<'de> for IndexerEventKind { - fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> - where - D: Deserializer<'de>, - { - let v = u64::deserialize(deserializer)?; - IndexerEventKind::try_from(v) - .map_err(|_| DeError::custom(format!("invalid event kind: {}", v))) - } -} - -#[derive(thiserror::Error, Debug)] -#[error("unknown event kind: {0}")] -pub struct IndexerEventKindParseError(pub u64); - -impl TryFrom<u64> for IndexerEventKind { - type Error = IndexerEventKindParseError; - - fn try_from(val: u64) -> Result<Self, Self::Error> { - match val { - 0 => Ok(IndexerEventKind::Profile), - 30402 => Ok(IndexerEventKind::Listing), - other => Err(IndexerEventKindParseError(other)), - } - } -} diff --git a/crates/indexer/src/domain/indexer/models/listing.rs b/crates/indexer/src/domain/indexer/models/listing.rs @@ -1,572 +0,0 @@ -use indexer_utils::{ - file::fs_mkdir, - logs::truncate_log, - nostr::public_key_to_npub, - write::{compute_hash, write_hash, write_json}, -}; -use radroots_events::listing::models::{RadrootsListingEventIndex, RadrootsListingEventMetadata}; -use radroots_events_indexed::{RadrootsEventsIndexedManifest, RadrootsEventsIndexedShardMetadata}; -use std::{collections::BTreeMap, fs, path::PathBuf}; -use tracing::{instrument, warn}; - -use crate::{ - audit, - domain::{ - events::ToRadrootsListingEventIndex, - indexer::{ - key::LISTING_INDEX_DIRECTORY, - kind::IndexerEventKind, - models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes}, - }, - resolvers::profile::ProfileResolver, - }, - relay::event::RelayIndexerEvent, - Settings, -}; - -macro_rules! write_if_stale { - ($path:expr, $data:expr, $updated:expr) => {{ - let hash = compute_hash(&$data)?; - let hash_path = $path.with_extension("sha256.txt"); - - let needs_write = if $path.exists() && hash_path.exists() { - let stored = fs::read_to_string(&hash_path)?; - stored.trim() != hash - } else { - true - }; - - if needs_write { - write_json(&$path, &$data)?; - write_hash(&$path, &hash)?; - $updated.push($path.clone()); - } - - hash - }}; -} - -#[derive(Debug)] -pub struct EventListingIndexes { - events: Vec<RadrootsListingEventIndex>, - events_id: BTreeMap<String, RadrootsListingEventIndex>, - country_ids: BTreeMap<String, Vec<String>>, - author_ids: BTreeMap<String, Vec<String>>, - npub_ids: BTreeMap<String, Vec<String>>, - nip05_ids: BTreeMap<String, Vec<String>>, -} - -impl EventListingIndexes { - pub fn build_with_profiles( - raw_events: &[RelayIndexerEvent], - profiles: &ProfileResolver, - ) -> Result<Self, NostrEventsStaticError> { - let mut events: Vec<RadrootsListingEventIndex> = Vec::with_capacity(raw_events.len()); - let mut events_id: BTreeMap<String, RadrootsListingEventIndex> = BTreeMap::new(); - let mut country_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); - let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); - let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); - let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); - - for raw in raw_events { - match raw.clone().to_radroots_listing_event() { - Ok(evt) => { - audit::log_listing_event(&evt); - - let id = evt.metadata.id.clone(); - let author_hex = evt.metadata.author.to_lowercase(); - - let npub = public_key_to_npub(&author_hex) - .map(|s| s.to_lowercase()) - .ok(); - let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned); - - let country_opt = evt - .metadata - .listing - .location - .as_ref() - .and_then(|loc| loc.country.as_ref()) - .map(|c| c.to_lowercase()); - - events_id.insert(id.clone(), evt.clone()); - events.push(evt.clone()); - - if let Some(country) = country_opt { - country_ids.entry(country).or_default().push(id.clone()); - } - - author_ids.entry(author_hex).or_default().push(id.clone()); - - if let Some(n) = npub { - npub_ids.entry(n).or_default().push(id.clone()); - } - if let Some(n05) = author_nip05 { - let n05 = n05.to_lowercase(); - nip05_ids.entry(n05).or_default().push(id.clone()); - } - } - Err(err) => { - warn!( - kind = raw.kind.as_u64(), - id = %raw.id, - author = %raw.author, - content = %truncate_log(&raw.content, 1000), - tags = ?raw.tags, - error = %err, - "Skipping malformed listing event" - ); - } - } - } - - let sort_ids = |ids: &mut Vec<String>, - map: &BTreeMap<String, RadrootsListingEventIndex>| { - ids.sort_unstable_by(|a, b| { - let pa = map - .get(a) - .map(|e| e.metadata.published_at) - .unwrap_or_default(); - let pb = map - .get(b) - .map(|e| e.metadata.published_at) - .unwrap_or_default(); - pb.cmp(&pa).then(a.cmp(b)) - }); - }; - - for ids in country_ids.values_mut() { - sort_ids(ids, &events_id); - } - for ids in author_ids.values_mut() { - sort_ids(ids, &events_id); - } - for ids in npub_ids.values_mut() { - sort_ids(ids, &events_id); - } - for ids in nip05_ids.values_mut() { - ids.sort_unstable_by(|a, b| { - let pa = events_id - .get(a) - .map(|e| e.metadata.published_at) - .unwrap_or_default(); - let pb = events_id - .get(b) - .map(|e| e.metadata.published_at) - .unwrap_or_default(); - pb.cmp(&pa).then(a.cmp(b)) - }); - } - - Ok(EventListingIndexes { - events, - events_id, - country_ids, - author_ids, - npub_ids, - nip05_ids, - }) - } -} - -impl EventIndexes for EventListingIndexes { - type Event = RelayIndexerEvent; - - fn subdirs() -> &'static [crate::domain::indexer::IndexerKey] { - &LISTING_INDEX_DIRECTORY - } - - #[instrument(skip(raw_events), fields(event_count = raw_events.len()))] - fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> { - let empty = ProfileResolver::default(); - Self::build_with_profiles(raw_events, &empty) - } -} - -impl EventListingIndexes { - fn format_shard_filename(ix: usize) -> String { - format!("shards/{:06}.json", ix) - } - - fn shard_vec<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> { - if size == 0 { - return vec![items.to_vec()]; - } - let mut out = Vec::with_capacity((items.len() + size - 1) / size); - let mut i = 0; - while i < items.len() { - let end = (i + size).min(items.len()); - out.push(items[i..end].to_vec()); - i = end; - } - out - } -} - -impl WriteEventIndexes for EventListingIndexes { - fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> { - let base: PathBuf = IndexerEventKind::Listing.base_path(&settings.indexer.data_dir)?; - fs_mkdir(&[&base])?; - - { - let idxs_root = base.join("events.json"); - let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect(); - write_if_stale!(idxs_root, ids, updated); - } - - { - let sub = base.join("id"); - fs_mkdir(&[&sub])?; - let keys: Vec<String> = self.events_id.keys().cloned().collect(); - write_if_stale!(sub.join("indexes.json"), keys, updated); - - for (id, evt) in &self.events_id { - let dir = sub.join(id.to_lowercase()); - fs_mkdir(&[&dir])?; - write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); - write_if_stale!(dir.join("data.json"), evt.metadata.clone(), updated); - } - } - - { - let sub_country = base.join(crate::domain::indexer::IndexerKey::Country.as_str()); - fs_mkdir(&[&sub_country])?; - let country_codes: Vec<String> = self.country_ids.keys().cloned().collect(); - write_if_stale!(sub_country.join("indexes.json"), country_codes, updated); - - for (cc, ids) in &self.country_ids { - let cc_dir = sub_country.join(cc); - let shards_dir = cc_dir.join("shards"); - fs_mkdir(&[&cc_dir])?; - fs_mkdir(&[&shards_dir])?; - - let mut data_items: Vec<RadrootsListingEventMetadata> = - Vec::with_capacity(ids.len()); - for id in ids { - if let Some(evt) = self.events_id.get(id) { - data_items.push(evt.metadata.clone()); - } - } - - let shard_size = settings.listings.country_shard_size; - - let shards = Self::shard_vec(&data_items, shard_size); - - let (country_first_pub, country_last_pub) = - if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { - (f.published_at, l.published_at) - } else { - (0, 0) - }; - - let mut manifest = RadrootsEventsIndexedManifest { - country: cc.clone(), - total: u32::try_from(data_items.len()).expect("too many data items for u32"), - shard_size: u32::try_from(shard_size).expect("shard_size too large for u32"), - first_published_at: country_first_pub, - last_published_at: country_last_pub, - shards: Vec::with_capacity(shards.len()), - }; - - for (ix, chunk) in shards.into_iter().enumerate() { - let file_rel = Self::format_shard_filename(ix); - let file_abs = cc_dir.join(&file_rel); - if let Some(parent) = file_abs.parent() { - fs_mkdir(&[&parent])?; - } - - let sha = write_if_stale!(file_abs, chunk, updated); - - let (first_id, first_pub, last_id, last_pub) = if let (Some(f), Some(l)) = ( - data_items.get(ix * shard_size), - data_items.get(((ix + 1) * shard_size).saturating_sub(1)), - ) { - (f.id.clone(), f.published_at, l.id.clone(), l.published_at) - } else { - let fp = chunk - .first() - .map(|x| (x.id.clone(), x.published_at)) - .unwrap_or_default(); - let lp = chunk - .last() - .map(|x| (x.id.clone(), x.published_at)) - .unwrap_or_default(); - (fp.0, fp.1, lp.0, lp.1) - }; - - manifest.shards.push(RadrootsEventsIndexedShardMetadata { - file: file_rel, - count: u32::try_from(chunk.len()).expect("chunk length too large for u32"), - first_id, - last_id, - first_published_at: first_pub, - last_published_at: last_pub, - sha256: sha, - }); - } - - write_if_stale!(cc_dir.join("manifest.json"), manifest, updated); - } - } - - { - let sub_author = base.join(crate::domain::indexer::IndexerKey::Author.as_str()); - fs_mkdir(&[&sub_author])?; - let authors: Vec<String> = self.author_ids.keys().cloned().collect(); - write_if_stale!(sub_author.join("indexes.json"), authors, updated); - - for (author, ids) in &self.author_ids { - let dir = sub_author.join(author); - let shards_dir = dir.join("shards"); - fs_mkdir(&[&dir, &shards_dir])?; - - let mut data_items: Vec<RadrootsListingEventMetadata> = - Vec::with_capacity(ids.len()); - for id in ids { - if let Some(evt) = self.events_id.get(id) { - data_items.push(evt.metadata.clone()); - } - } - - let shard_size = settings.listings.profile_shard_size; - let shards = Self::shard_vec(&data_items, shard_size); - - let (first_pub, last_pub) = - if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { - (f.published_at, l.published_at) - } else { - (0, 0) - }; - - let mut manifest = RadrootsEventsIndexedManifest { - country: author.clone(), - total: u32::try_from(data_items.len()).expect("too many data items for u32"), - shard_size: u32::try_from(shard_size).expect("shard_size too large for u32"), - first_published_at: first_pub, - last_published_at: last_pub, - shards: Vec::with_capacity(shards.len()), - }; - - for (ix, chunk) in shards.into_iter().enumerate() { - let file_rel = Self::format_shard_filename(ix); - let file_abs = dir.join(&file_rel); - if let Some(parent) = file_abs.parent() { - fs_mkdir(&[&parent])?; - } - - let sha = write_if_stale!(file_abs, chunk, updated); - - let (first_id, first_published_at, last_id, last_published_at) = - if let (Some(f), Some(l)) = ( - data_items.get(ix * shard_size), - data_items.get(((ix + 1) * shard_size).saturating_sub(1)), - ) { - (f.id.clone(), f.published_at, l.id.clone(), l.published_at) - } else { - let fp = data_items - .get(ix * shard_size) - .map(|x| (x.id.clone(), x.published_at)) - .or_else(|| chunk.first().map(|x| (x.id.clone(), x.published_at))) - .unwrap_or_default(); - - let lp = data_items - .get(((ix + 1) * shard_size).saturating_sub(1)) - .map(|x| (x.id.clone(), x.published_at)) - .or_else(|| chunk.last().map(|x| (x.id.clone(), x.published_at))) - .unwrap_or_default(); - - (fp.0, fp.1, lp.0, lp.1) - }; - - manifest.shards.push(RadrootsEventsIndexedShardMetadata { - file: file_rel, - count: u32::try_from(std::cmp::min( - shard_size, - data_items.len().saturating_sub(ix * shard_size), - )) - .expect("chunk length too large for u32"), - first_id, - last_id, - first_published_at, - last_published_at, - sha256: sha, - }); - } - - write_if_stale!(dir.join("manifest.json"), manifest, updated); - } - } - - { - let sub_npub = base.join(crate::domain::indexer::IndexerKey::Npub.as_str()); - fs_mkdir(&[&sub_npub])?; - let npubs: Vec<String> = self.npub_ids.keys().cloned().collect(); - write_if_stale!(sub_npub.join("indexes.json"), npubs, updated); - - for (npub, ids) in &self.npub_ids { - let dir = sub_npub.join(npub); - let shards_dir = dir.join("shards"); - fs_mkdir(&[&dir, &shards_dir])?; - - let mut data_items: Vec<RadrootsListingEventMetadata> = - Vec::with_capacity(ids.len()); - for id in ids { - if let Some(evt) = self.events_id.get(id) { - data_items.push(evt.metadata.clone()); - } - } - - let shard_size = settings.listings.profile_shard_size; - let shards = Self::shard_vec(&data_items, shard_size); - - let (first_pub, last_pub) = - if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { - (f.published_at, l.published_at) - } else { - (0, 0) - }; - - let mut manifest = RadrootsEventsIndexedManifest { - country: npub.clone(), - total: u32::try_from(data_items.len()).expect("too many data items for u32"), - shard_size: u32::try_from(shard_size).expect("shard_size too large for u32"), - first_published_at: first_pub, - last_published_at: last_pub, - shards: Vec::with_capacity(shards.len()), - }; - - for (ix, chunk) in shards.into_iter().enumerate() { - let file_rel = Self::format_shard_filename(ix); - let file_abs = dir.join(&file_rel); - if let Some(parent) = file_abs.parent() { - fs_mkdir(&[&parent])?; - } - - let sha = write_if_stale!(file_abs, chunk, updated); - - let (first_id, first_published_at, last_id, last_published_at) = - if let (Some(f), Some(l)) = ( - data_items.get(ix * shard_size), - data_items.get(((ix + 1) * shard_size).saturating_sub(1)), - ) { - (f.id.clone(), f.published_at, l.id.clone(), l.published_at) - } else { - let fp = data_items - .get(ix * shard_size) - .map(|x| (x.id.clone(), x.published_at)) - .or_else(|| chunk.first().map(|x| (x.id.clone(), x.published_at))) - .unwrap_or_default(); - - let lp = data_items - .get(((ix + 1) * shard_size).saturating_sub(1)) - .map(|x| (x.id.clone(), x.published_at)) - .or_else(|| chunk.last().map(|x| (x.id.clone(), x.published_at))) - .unwrap_or_default(); - - (fp.0, fp.1, lp.0, lp.1) - }; - - manifest.shards.push(RadrootsEventsIndexedShardMetadata { - file: file_rel, - count: u32::try_from(std::cmp::min( - shard_size, - data_items.len().saturating_sub(ix * shard_size), - )) - .expect("chunk length too large for u32"), - first_id, - last_id, - first_published_at, - last_published_at, - sha256: sha, - }); - } - - write_if_stale!(dir.join("manifest.json"), manifest, updated); - } - - { - let sub_nip05 = base.join(crate::domain::indexer::IndexerKey::Nip05.as_str()); - fs_mkdir(&[&sub_nip05])?; - let names: Vec<String> = self.nip05_ids.keys().cloned().collect(); - write_if_stale!(sub_nip05.join("indexes.json"), names, updated); - - for (name, ids) in &self.nip05_ids { - let dir = sub_nip05.join(name); - let shards_dir = dir.join("shards"); - fs_mkdir(&[&dir, &shards_dir])?; - - let mut data_items = Vec::with_capacity(ids.len()); - for id in ids { - if let Some(evt) = self.events_id.get(id) { - data_items.push(evt.metadata.clone()); - } - } - - let shard_size = settings.listings.profile_shard_size; - let shards = Self::shard_vec(&data_items, shard_size); - - let (first_pub, last_pub) = - if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { - (f.published_at, l.published_at) - } else { - (0, 0) - }; - - let mut manifest = RadrootsEventsIndexedManifest { - country: name.clone(), - total: u32::try_from(data_items.len()).expect("u32 overflow"), - shard_size: u32::try_from(shard_size).expect("u32 overflow"), - first_published_at: first_pub, - last_published_at: last_pub, - shards: Vec::with_capacity(shards.len()), - }; - - for (ix, chunk) in shards.into_iter().enumerate() { - let file_rel = Self::format_shard_filename(ix); - let file_abs = dir.join(&file_rel); - if let Some(parent) = file_abs.parent() { - fs_mkdir(&[&parent])?; - } - - let sha = write_if_stale!(file_abs, chunk, updated); - - let (first_id, first_pub, last_id, last_pub) = if let (Some(f), Some(l)) = ( - data_items.get(ix * shard_size), - data_items.get(((ix + 1) * shard_size).saturating_sub(1)), - ) { - (f.id.clone(), f.published_at, l.id.clone(), l.published_at) - } else { - let fp = chunk - .first() - .map(|x| (x.id.clone(), x.published_at)) - .unwrap_or_default(); - let lp = chunk - .last() - .map(|x| (x.id.clone(), x.published_at)) - .unwrap_or_default(); - (fp.0, fp.1, lp.0, lp.1) - }; - - manifest.shards.push(RadrootsEventsIndexedShardMetadata { - file: file_rel, - count: u32::try_from(std::cmp::min( - shard_size, - data_items.len().saturating_sub(ix * shard_size), - )) - .expect("u32 overflow"), - first_id, - last_id, - first_published_at: first_pub, - last_published_at: last_pub, - sha256: sha, - }); - } - - write_if_stale!(dir.join("manifest.json"), manifest, updated); - } - } - } - - Ok(()) - } -} diff --git a/crates/indexer/src/domain/indexer/models/profile.rs b/crates/indexer/src/domain/indexer/models/profile.rs @@ -1,177 +0,0 @@ -use indexer_utils::{ - file::fs_mkdir, - logs::truncate_log, - nostr::public_key_to_npub, - write::{compute_hash, write_hash, write_json}, -}; -use radroots_events::profile::models::RadrootsProfileEventIndex; -use std::{collections::BTreeMap, fs, path::PathBuf}; -use tracing::{instrument, warn}; - -use crate::{ - audit, - domain::{ - events::ToRadrootsProfileEventIndex, - indexer::{ - kind::IndexerEventKind, - models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes}, - IndexerKey, PROFILE_INDEX_DIRECTORY, - }, - }, - relay::event::RelayIndexerEvent, - Settings, -}; - -macro_rules! write_if_stale { - ($path:expr, $data:expr, $updated:expr) => {{ - let hash = compute_hash(&$data)?; - let hash_path = $path.with_extension("sha256.txt"); - - let needs_write = if $path.exists() && hash_path.exists() { - let stored = fs::read_to_string(&hash_path)?; - stored.trim() != hash - } else { - true - }; - - if needs_write { - write_json(&$path, &$data)?; - write_hash(&$path, &hash)?; - $updated.push($path.clone()); - } - }}; -} - -#[derive(Debug)] -pub struct EventProfileIndexes { - events: Vec<RadrootsProfileEventIndex>, - events_id: BTreeMap<String, RadrootsProfileEventIndex>, - events_author: BTreeMap<String, RadrootsProfileEventIndex>, - events_nip05: BTreeMap<String, RadrootsProfileEventIndex>, - events_npub: BTreeMap<String, RadrootsProfileEventIndex>, -} - -impl EventIndexes for EventProfileIndexes { - type Event = RelayIndexerEvent; - - fn subdirs() -> &'static [IndexerKey] { - &PROFILE_INDEX_DIRECTORY - } - - #[instrument(skip(raw_events), fields(event_count = raw_events.len()))] - fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> { - let mut events = Vec::with_capacity(raw_events.len()); - let mut events_id = BTreeMap::new(); - let mut events_author = BTreeMap::new(); - let mut events_nip05 = BTreeMap::new(); - let mut events_npub = BTreeMap::new(); - - for raw in raw_events { - match raw.clone().to_radroots_profile_event() { - Ok(evt) => { - audit::log_profile_event(&evt); - let id = evt.event.id.clone(); - let author = evt.event.author.clone(); - events.push(evt.clone()); - events_id.insert(id.clone(), evt.clone()); - events_author.insert(author.clone(), evt.clone()); - - if let Ok(npub) = public_key_to_npub(&author) { - events_npub.insert(npub.to_lowercase(), evt.clone()); - } - if let Some(nip05) = &evt.metadata.profile.nip05 { - let normalized = nip05.replace("@radroots.market", ""); - events_nip05.insert(normalized, evt.clone()); - } - } - Err(err) => { - warn!( - kind = raw.kind.as_u64(), - id = %raw.id, - author = %raw.author, - content = %truncate_log(&raw.content, 1000), - tags = ?raw.tags, - error = %err, - "Skipping malformed metadata event" - ); - } - } - } - - Ok(EventProfileIndexes { - events, - events_id, - events_author, - events_nip05, - events_npub, - }) - } -} - -impl WriteEventIndexes for EventProfileIndexes { - fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> { - let base: PathBuf = IndexerEventKind::Profile.base_path(&settings.indexer.data_dir)?; - fs_mkdir(&[&base])?; - - let idxs_root = base.join("events.json"); - let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect(); - write_if_stale!(idxs_root, ids, updated); - - for &subdir in Self::subdirs().iter() { - let sub_base = base.join(subdir.as_str()); - fs_mkdir(&[sub_base.to_str().unwrap()])?; - - let keys_lower: Vec<String> = match subdir { - IndexerKey::Id => self.events_id.keys().map(|k| k.to_lowercase()).collect(), - IndexerKey::Author => self - .events_author - .keys() - .map(|k| k.to_lowercase()) - .collect(), - IndexerKey::Nip05 => self.events_nip05.keys().map(|k| k.to_lowercase()).collect(), - IndexerKey::Npub => self.events_npub.keys().map(|k| k.to_lowercase()).collect(), - _ => Vec::new(), - }; - let idxs_subdir = sub_base.join("indexes.json"); - write_if_stale!(idxs_subdir, keys_lower, updated); - - match subdir { - IndexerKey::Id => { - for (key, evt) in &self.events_id { - let dir = sub_base.join(key.to_lowercase()); - fs_mkdir(&[dir.to_str().unwrap()])?; - write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); - write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated); - } - } - IndexerKey::Author => { - for (key, evt) in &self.events_author { - let dir = sub_base.join(key.to_lowercase()); - fs_mkdir(&[dir.to_str().unwrap()])?; - write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); - write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated); - } - } - IndexerKey::Nip05 => { - for (key, evt) in &self.events_nip05 { - let dir = sub_base.join(key.to_lowercase()); - fs_mkdir(&[dir.to_str().unwrap()])?; - write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); - write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated); - } - } - IndexerKey::Npub => { - for (key, evt) in &self.events_npub { - let dir = sub_base.join(key.to_lowercase()); - fs_mkdir(&[dir.to_str().unwrap()])?; - write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); - write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated); - } - } - _ => {} - } - } - - Ok(()) - } -} diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs @@ -1,213 +0,0 @@ -use anyhow::{Context, Result}; -use indexer_utils::{ - database::IndexerDb, - sqlite::{sqlite_conn, sqlite_stmt}, -}; -use std::{ - collections::HashMap, - time::{Duration, Instant}, -}; -use tracing::info; - -pub mod cli; -pub mod config; -pub mod telemetry; -pub mod domain { - pub mod events; - pub mod indexer; - pub mod resolvers; -} -pub mod relay { - pub mod event; - pub mod record; -} - -#[cfg(feature = "audit")] -pub mod audit; - -#[cfg(not(feature = "audit"))] -pub mod audit { - use radroots_events::{ - listing::models::RadrootsListingEventIndex, profile::models::RadrootsProfileEventIndex, - }; - - pub fn log_indexer_event(_: &crate::relay::event::RelayIndexerEvent) {} - pub fn log_profile_event(_: &RadrootsProfileEventIndex) {} - pub fn log_listing_event(_: &RadrootsListingEventIndex) {} -} - -use crate::{ - domain::{ - indexer::{ - kind::IndexerEventKind, - models::{EventIndexes, EventListingIndexes, EventProfileIndexes, WriteEventIndexes}, - }, - resolvers::profile::ProfileResolver, - }, - relay::event::RelayIndexerEvent, -}; -pub use config::Settings; -pub use relay::record::RelayEventRecord; - -pub async fn run(settings: Settings) -> Result<()> { - let db_idx = IndexerDb::open(&format!("{}/indexer_db", settings.indexer.data_dir))?; - let tree_raw = "hashes"; - let tree_events_profile = "profile_events"; - let tree_events_listing = "listing_events"; - let tree_stats = "stats"; - - let last_created_at_db: u32 = db_idx - .get_raw(tree_stats, "last_created_at")? - .map(|ivec| { - let arr: [u8; 4] = ivec.as_ref().try_into().unwrap(); - u32::from_be_bytes(arr) - }) - .unwrap_or(0); - let mut last_created_at = last_created_at_db; - - let event_kinds = IndexerEventKind::ALL - .iter() - .map(|k| k.as_u64().to_string()) - .collect::<Vec<_>>() - .join(", "); - - let relay_query = format!( - "SELECT hex(event_hash), hex(author), created_at, kind, content \ - FROM event WHERE kind IN ({}) AND created_at > ?", - event_kinds - ); - - loop { - let iteration_start = Instant::now(); - let relay_db = sqlite_conn(&settings.relay.database_path).with_context(|| { - format!( - "Could not open relay DB at {}", - settings.relay.database_path - ) - })?; - let mut stmt = - sqlite_stmt(&relay_db, &relay_query).context("Could not prepare event query")?; - - let records: Vec<RelayEventRecord> = stmt - .query_map([last_created_at], RelayEventRecord::from_row)? - .collect::<Result<_, _>>() - .context("collecting RelayEventRecord rows")?; - info!(record_count = records.len(), "Loaded relay records"); - - let mut records_kind: HashMap<IndexerEventKind, Vec<RelayIndexerEvent>> = HashMap::new(); - for rec in records.into_iter() { - let iev = RelayIndexerEvent::try_from(rec)?; - audit::log_indexer_event(&iev); - records_kind.entry(iev.kind).or_default().push(iev); - } - - let mut need_rebuild_listing = false; - - if let Some(profile_events) = records_kind.remove(&IndexerEventKind::Profile) { - if !profile_events.is_empty() { - for ev in &profile_events { - last_created_at = last_created_at.max(ev.created_at); - let id = &ev.id; - let hash = &ev.hash; - let skip = if let Some(old) = db_idx.get_raw(tree_raw, id)? { - old.as_ref() == hash.as_bytes() - } else { - false - }; - if skip { - continue; - } - - db_idx.insert(tree_events_profile, id, ev)?; - db_idx.insert_raw(tree_raw, id, hash.as_bytes())?; - } - - db_idx.insert_raw( - tree_stats, - "last_created_at", - &last_created_at.to_be_bytes(), - )?; - db_idx.flush()?; - - let raw_profile_events: Vec<RelayIndexerEvent> = - db_idx.get_all(tree_events_profile)?; - let indexed_profile_events = EventProfileIndexes::build(&raw_profile_events)?; - let mut updated_indexes = Vec::new(); - indexed_profile_events.write(&settings, &mut updated_indexes)?; - info!( - written = updated_indexes.len(), - "Written {} index files", - updated_indexes.len() - ); - - need_rebuild_listing = true; - } - } - - let raw_profile_events: Vec<RelayIndexerEvent> = db_idx.get_all(tree_events_profile)?; - let profiles = ProfileResolver::from_metadata(&raw_profile_events); - - if let Some(listing_events) = records_kind.remove(&IndexerEventKind::Listing) { - if !listing_events.is_empty() { - for ev in &listing_events { - last_created_at = last_created_at.max(ev.created_at); - let id = &ev.id; - let hash = &ev.hash; - let skip = if let Some(old) = db_idx.get_raw(tree_raw, id)? { - old.as_ref() == hash.as_bytes() - } else { - false - }; - if skip { - continue; - } - db_idx.insert(tree_events_listing, id, ev)?; - db_idx.insert_raw(tree_raw, id, hash.as_bytes())?; - } - - db_idx.insert_raw( - tree_stats, - "last_created_at", - &last_created_at.to_be_bytes(), - )?; - db_idx.flush()?; - - let raw_listing_events: Vec<RelayIndexerEvent> = - db_idx.get_all(tree_events_listing)?; - let listing_indexes = EventListingIndexes::build(&raw_listing_events)?; - let mut updated_listing = Vec::new(); - listing_indexes.write(&settings, &mut updated_listing)?; - info!( - written = updated_listing.len(), - "Written {} listing index files", - updated_listing.len() - ); - - need_rebuild_listing = true; - } - } - - if need_rebuild_listing { - let raw_listing_events: Vec<RelayIndexerEvent> = db_idx.get_all(tree_events_listing)?; - let listing_indexes = - EventListingIndexes::build_with_profiles(&raw_listing_events, &profiles)?; - let mut updated_listing = Vec::new(); - listing_indexes.write(&settings, &mut updated_listing)?; - info!( - written = updated_listing.len(), - "Written {} listing index files", - updated_listing.len() - ); - } - - let elapsed = iteration_start.elapsed(); - let interval = Duration::from_secs(settings.indexer.flush_interval); - let delay = interval.saturating_sub(elapsed); - info!( - elapsed_ms = elapsed.as_millis(), - sleeping_ms = delay.as_millis(), - "Iteration complete" - ); - tokio::time::sleep(delay).await; - } -} diff --git a/crates/indexer/src/main.rs b/crates/indexer/src/main.rs @@ -1,24 +0,0 @@ -use anyhow::Result; -use clap::Parser; -use tracing::{error, info}; - -use radroots_market_relay_indexer::{cli, config::Settings, run, telemetry}; - -#[tokio::main] -async fn main() { - if let Err(err) = setup().await { - error!("Fatal error: {err:#?}"); - std::process::exit(1); - } -} - -async fn setup() -> Result<()> { - let args = cli::Args::parse(); - - let settings = Settings::load(&args.config)?; - - telemetry::init(&settings.indexer.logs_dir); - info!("Service starting"); - - run(settings).await -} diff --git a/crates/indexer/src/relay/record.rs b/crates/indexer/src/relay/record.rs @@ -1,37 +0,0 @@ -use indexer_utils::sqlite::{RustqliteError, SqliteResult, SqliteRow, SqliteType}; -use serde::Serialize; - -use crate::domain::indexer::kind::{IndexerEventKind, IndexerEventKindParseError}; - -#[derive(Clone, Debug, Serialize)] -pub struct RelayEventRecord { - pub event_hash: String, - pub author: String, - pub created_at: u32, - pub kind: IndexerEventKind, - pub content: String, -} - -impl RelayEventRecord { - pub fn from_row(row: &SqliteRow) -> SqliteResult<Self> { - let event_hash: String = row.get(0)?; - let author: String = row.get(1)?; - let created_at: u32 = row.get(2)?; - let kind_num: u32 = row.get(3)?; - - let kind = IndexerEventKind::try_from(kind_num as u64).map_err( - |e: IndexerEventKindParseError| { - RustqliteError::FromSqlConversionFailure(3, SqliteType::Integer, Box::new(e)) - }, - )?; - - let content: String = row.get(4)?; - Ok(RelayEventRecord { - event_hash, - author, - created_at, - kind, - content, - }) - } -} diff --git a/crates/indexer/src/telemetry.rs b/crates/indexer/src/telemetry.rs @@ -1,41 +0,0 @@ -use std::path::Path; -use tracing_appender::rolling; -use tracing_subscriber::{fmt, prelude::*, EnvFilter, Registry}; - -#[cfg(feature = "audit")] -use tracing_subscriber::filter::Targets; - -pub fn init(logs_dir: impl AsRef<Path>) { - let file_appender = rolling::daily(&logs_dir, concat!(env!("CARGO_PKG_NAME"), ".log")); - let (file_writer, guard) = tracing_appender::non_blocking(file_appender); - std::mem::forget(guard); - - let stdout_layer = fmt::layer().with_writer(std::io::stdout).with_target(false); - - let file_layer = fmt::layer() - .with_writer(file_writer) - .with_ansi(false) - .with_target(false); - - let subscriber = Registry::default() - .with(EnvFilter::from_default_env()) - .with(stdout_layer) - .with(file_layer); - - #[cfg(feature = "audit")] - let subscriber = { - let audit_app = rolling::daily(&logs_dir, "audit.log"); - let (audit_writer, audit_guard) = tracing_appender::non_blocking(audit_app); - std::mem::forget(audit_guard); - - let audit_layer = fmt::layer() - .with_writer(audit_writer) - .with_ansi(false) - .with_target(true) - .with_filter(Targets::new().with_target("audit", tracing::Level::INFO)); - - subscriber.with(audit_layer) - }; - - subscriber.init(); -} diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml @@ -0,0 +1,24 @@ +services: + rad-roots-dev-relay: + platform: linux/amd64 + image: scsibug/nostr-rs-relay:latest + container_name: rad-roots-dev-relay + ports: + - "21648:8080" + volumes: + - ./data/relay:/usr/src/app/db + - ./relay-config.toml:/usr/src/app/config.toml + environment: + - RUST_LOG=info,nostr_rs_relay=debug + restart: unless-stopped + + rad-roots-dev-indexes: + platform: linux/amd64 + image: nginx:alpine + container_name: rad-roots-dev-indexes + ports: + - "21649:80" + volumes: + - ./data/indexer/static:/usr/share/nginx/html/static:ro + - ./indexer-nginx.conf:/etc/nginx/conf.d/default.conf:ro + restart: unless-stopped diff --git a/dev/indexer-nginx.conf b/dev/indexer-nginx.conf @@ -0,0 +1,68 @@ +server { + listen 80; + server_name _; + + root /usr/share/nginx/html/static; + + location ^~ /indexer_db/ { + deny all; + return 404; + } + + location ~ ^/events/\d+/country/[^/]+/shards/.*\.json$ { + # CORS + add_header Access-Control-Allow-Origin "*" always; + add_header Access-Control-Allow-Methods "GET, OPTIONS" always; + add_header Access-Control-Allow-Headers "Origin, Content-Type, Accept" always; + + if ($request_method = OPTIONS) { + add_header Access-Control-Max-Age 1728000 always; + return 204; + } + + expires -1; + add_header Cache-Control "no-store, no-cache, must-revalidate, proxy-revalidate" always; + etag off; + + try_files $uri =404; + } + + location ~ ^/events/\d+/country/[^/]+/manifest\.json$ { + add_header Access-Control-Allow-Origin "*" always; + add_header Access-Control-Allow-Methods "GET, OPTIONS" always; + add_header Access-Control-Allow-Headers "Origin, Content-Type, Accept" always; + + if ($request_method = OPTIONS) { + add_header Access-Control-Max-Age 1728000 always; + return 204; + } + + expires -1; + add_header Cache-Control "no-store, no-cache, must-revalidate, proxy-revalidate" always; + etag off; + + try_files $uri =404; + } + + location / { + index index.html; + autoindex on; + autoindex_exact_size off; + autoindex_localtime on; + + add_header Access-Control-Allow-Origin "*" always; + add_header Access-Control-Allow-Methods "GET, OPTIONS" always; + add_header Access-Control-Allow-Headers "Origin, Content-Type, Accept" always; + + if ($request_method = OPTIONS) { + add_header Access-Control-Max-Age 1728000 always; + return 204; + } + + expires -1; + add_header Cache-Control "no-store, no-cache, must-revalidate, proxy-revalidate" always; + etag off; + + try_files $uri $uri/ =404; + } +} diff --git a/dev/relay-config.toml b/dev/relay-config.toml @@ -0,0 +1,253 @@ +# Nostr-rs-relay configuration + +[info] +# The advertised URL for the Nostr websocket. +relay_url = "ws://localhost:21648" + +# Relay information for clients. Put your unique server name here. +name = "rad-roots-dev-relay" + +# Description +description = "rad-roots-dev-relay" + +# Administrative contact pubkey (32-byte hex, not npub) +#pubkey = "0c2d168a4ae8ca58c9f1ab237b5df682599c6c7ab74307ea8b05684b60405d41" + +# Administrative contact URI +#contact = "mailto:contact@example.com" + +# Favicon location. Relative to the current directory. Assumes an +# ICO format. +#favicon = "favicon.ico" + +# URL of Relay's icon. +#relay_icon = "https://example.test/img.png" + +# Path to custom relay html page +#relay_page = "index.html" + +[diagnostics] +# Enable tokio tracing (for use with tokio-console) +#tracing = false + +[database] +# Database engine (sqlite/postgres). Defaults to sqlite. +# Support for postgres is currently experimental. +#engine = "sqlite" + +# Directory for SQLite files. Defaults to the current directory. Can +# also be specified (and overriden) with the "--db dirname" command +# line option. +#data_directory = "." + +# Use an in-memory database instead of 'nostr.db'. +# Requires sqlite engine. +# Caution; this will not survive a process restart! +#in_memory = false + +# Database connection pool settings for subscribers: + +# Minimum number of SQLite reader connections +#min_conn = 0 + +# Maximum number of SQLite reader connections. Recommend setting this +# to approx the number of cores. +#max_conn = 8 + +# Database connection string. Required for postgres; not used for +# sqlite. +#connection = "postgresql://postgres:nostr@localhost:7500/nostr" + +# Optional database connection string for writing. Use this for +# postgres clusters where you want to separate reads and writes to +# different nodes. Ignore for single-database instances. +#connection_write = "postgresql://postgres:nostr@localhost:7500/nostr" + +[logging] +# Directory to store log files. Log files roll over daily. +folder_path = "./logs/relay" +file_prefix = "nostr-relay" + +[grpc] +# gRPC interfaces for externalized decisions and other extensions to +# functionality. +# +# Events can be authorized through an external service, by providing +# the URL below. In the event the server is not accessible, events +# will be permitted. The protobuf3 schema used is available in +# `proto/nauthz.proto`. +# event_admission_server = "http://[::1]:50051" + +# If the event admission server denies writes +# in any case (excluding spam filtering). +# This is reflected in the relay information document. +# restricts_write = true + +[network] +# Bind to this network address +address = "0.0.0.0" + +# Listen on this port +port = 8080 + +# If present, read this HTTP header for logging client IP addresses. +# Examples for common proxies, cloudflare: +#remote_ip_header = "x-forwarded-for" +#remote_ip_header = "cf-connecting-ip" + +# Websocket ping interval in seconds, defaults to 5 minutes +#ping_interval = 300 + +[options] +# Reject events that have timestamps greater than this many seconds in +# the future. Recommended to reject anything greater than 30 minutes +# from the current time, but the default is to allow any date. +reject_future_seconds = 1800 + +[limits] +# Limit events created per second, averaged over one minute. Must be +# an integer. If not set (or set to 0), there is no limit. Note: +# this is for the server as a whole, not per-connection. +# +# Limiting event creation is highly recommended if your relay is +# public! +# +#messages_per_sec = 5 + +# Limit client subscriptions created, averaged over one minute. Must +# be an integer. If not set (or set to 0), defaults to unlimited. +# Strongly recommended to set this to a low value such as 10 to ensure +# fair service. +#subscriptions_per_min = 0 + +# UNIMPLEMENTED... +# Limit how many concurrent database connections a client can have. +# This prevents a single client from starting too many expensive +# database queries. Must be an integer. If not set (or set to 0), +# defaults to unlimited (subject to subscription limits). +#db_conns_per_client = 0 + +# Limit blocking threads used for database connections. Defaults to 16. +#max_blocking_threads = 16 + +# Limit the maximum size of an EVENT message. Defaults to 128 KB. +# Set to 0 for unlimited. +#max_event_bytes = 131072 + +# Maximum WebSocket message in bytes. Defaults to 128 KB. +#max_ws_message_bytes = 131072 + +# Maximum WebSocket frame size in bytes. Defaults to 128 KB. +#max_ws_frame_bytes = 131072 + +# Broadcast buffer size, in number of events. This prevents slow +# readers from consuming memory. +#broadcast_buffer = 16384 + +# Event persistence buffer size, in number of events. This provides +# backpressure to senders if writes are slow. +#event_persist_buffer = 4096 + +# Event kind blacklist. Events with these kinds will be discarded. +#event_kind_blacklist = [ +# 70202, +#] + +# Event kind allowlist. Events other than these kinds will be discarded. +#event_kind_allowlist = [ +# 0, 1, 2, 3, 7, 40, 41, 42, 43, 44, 30023, +#] + +# Rejects imprecise requests (kind only and author only etc) +# This is a temperary measure to improve the adoption of outbox model +# Its recommended to have this enabled +limit_scrapers = false + +[authorization] +# Pubkey addresses in this array are whitelisted for event publishing. +# Only valid events by these authors will be accepted, if the variable +# is set. +#pubkey_whitelist = [ +# "35d26e4690cbe1a898af61cc3515661eb5fa763b57bd0b42e45099c8b32fd50f", +# "887645fef0ce0c3c1218d2f5d8e6132a19304cdc57cd20281d082f38cfea0072", +#] +# Enable NIP-42 authentication +#nip42_auth = false +# Send DMs (kind 4 and 44) and gift wraps (kind 1059) only to their authenticated recipients +#nip42_dms = false + +[verified_users] +# NIP-05 verification of users. Can be "enabled" to require NIP-05 +# metadata for event authors, "passive" to perform validation but +# never block publishing, or "disabled" to do nothing. +#mode = "disabled" + +# Domain names that will be prevented from publishing events. +#domain_blacklist = ["wellorder.net"] + +# Domain names that are allowed to publish events. If defined, only +# events NIP-05 verified authors at these domains are persisted. +#domain_whitelist = ["example.com"] + +# Consider an pubkey "verified" if we have a successful validation +# from the NIP-05 domain within this amount of time. Note, if the +# domain provides a successful response that omits the account, +# verification is immediately revoked. +#verify_expiration = "1 week" + +# How long to wait between verification attempts for a specific author. +#verify_update_frequency = "24 hours" + +# How many consecutive failed checks before we give up on verifying +# this author. +#max_consecutive_failures = 20 + +[pay_to_relay] +# Enable pay to relay +#enabled = false + +# Node interface to use +#processor = "ClnRest/LNBits" + +# The cost to be admitted to relay +#admission_cost = 4200 + +# The cost in sats per post +#cost_per_event = 0 + +# Url of node api +#node_url = "<node url>" + +# LNBits api secret +#api_secret = "<ln bits api>" + +# Path to CLN rune +#rune_path = "<rune path>" + +# Nostr direct message on signup +#direct_message=false + +# Terms of service +#terms_message = """ +#This service (and supporting services) are provided "as is", without warranty of any kind, express or implied. +# +#By using this service, you agree: +#* Not to engage in spam or abuse the relay service +#* Not to disseminate illegal content +#* That requests to delete content cannot be guaranteed +#* To use the service in compliance with all applicable laws +#* To grant necessary rights to your content for unlimited time +#* To be of legal age and have capacity to use this service +#* That the service may be terminated at any time without notice +#* That the content you publish may be removed at any time without notice +#* To have your IP address collected to detect abuse or misuse +#* To cooperate with the relay to combat abuse or misuse +#* You may be exposed to content that you might find triggering or distasteful +#* The relay operator is not liable for content produced by users of the relay +#""" + +# Whether or not new sign ups should be allowed +#sign_ups = false + +# optional if `direct_message=false` +#secret_key = "<nostr nsec>" diff --git a/indexer-config.toml b/indexer-config.toml @@ -0,0 +1,12 @@ +[indexer] +data_dir = "./dev/data/indexer" +logs_dir = "./dev/logs/indexer" +flush_interval = 20 + +[relay] +url = "ws://127.0.0.1:21648" +database_path = "./dev/data/relay/nostr.db" + +[listings] +country_shard_size = 1000 +profile_shard_size = 200 diff --git a/crates/indexer/.gitignore b/indexer/.gitignore diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "radroots-market-indexer" +version = "0.1.0" +authors = ["Radroots Authors"] +description = "Radroots market event indexer" +license = "AGPLv3" +edition = "2021" + +[features] +default = [] +audit = [] + +[dependencies] +radroots-core = { workspace = true } +radroots-events = { workspace = true } +radroots-events-indexed = { workspace = true } + +anyhow = { workspace = true } +clap = { workspace = true } +config = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +tracing-appender = { workspace = true } +once_cell = { workspace = true } +regex = { workspace = true } + +bincode = { version = "2.0", features = ["derive", "serde"] } +rusqlite = { version = "0.32.1", features = ["bundled"] } +sled = "0.34.7" +sha2 = "0.10.9" +nostr = "0.43.0" diff --git a/crates/indexer/LICENSE b/indexer/LICENSE diff --git a/indexer/data/radroots-market-indexer/indexer_db/conf b/indexer/data/radroots-market-indexer/indexer_db/conf @@ -0,0 +1,4 @@ +segment_size: 524288 +use_compression: false +version: 0.34 +vQÁ +\ No newline at end of file diff --git a/indexer/data/radroots-market-indexer/indexer_db/db b/indexer/data/radroots-market-indexer/indexer_db/db Binary files differ. diff --git a/crates/indexer/rust-toolchain.toml b/indexer/rust-toolchain.toml diff --git a/indexer/src/audit.rs b/indexer/src/audit.rs @@ -0,0 +1,388 @@ +#![cfg(feature = "audit")] + +use std::collections::HashSet; +use std::sync::RwLock; + +use crate::utils::nostr::public_key_to_npub; +use once_cell::sync::Lazy; +use regex::Regex; +use tracing::info; + +use crate::domain::resolvers::profile::ProfileResolver; +use crate::relay::event::RelayIndexerEvent; +use radroots_events::listing::models::RadrootsListingEventIndex; +use radroots_events::profile::models::RadrootsProfileEventIndex; + +#[derive(Clone, Debug)] +pub struct AuditFilter { + pub enabled: bool, + pub kinds: Option<HashSet<u64>>, + pub authors: HashSet<String>, + pub npubs: HashSet<String>, + pub nip05_full: HashSet<String>, + pub nip05_local: HashSet<String>, + pub content_re: Option<Regex>, + pub created_at_min: Option<u32>, + pub created_at_max: Option<u32>, +} + +impl Default for AuditFilter { + fn default() -> Self { + Self { + enabled: false, + kinds: None, + authors: HashSet::new(), + npubs: HashSet::new(), + nip05_full: HashSet::new(), + nip05_local: HashSet::new(), + content_re: None, + created_at_min: None, + created_at_max: None, + } + } +} + +impl AuditFilter { + pub fn from_env() -> Self { + let mut f = Self::default(); + + f.enabled = std::env::var("AUDIT_ENABLED") + .map(|v| v.eq_ignore_ascii_case("true") || v == "1") + .unwrap_or(false); + + if let Ok(v) = std::env::var("AUDIT_KINDS") { + let set = v + .split(',') + .filter_map(|s| s.trim().parse::<u64>().ok()) + .collect::<HashSet<_>>(); + if !set.is_empty() { + f.kinds = Some(set); + } + } + + let parse_set = |key: &str| -> HashSet<String> { + std::env::var(key) + .ok() + .map(|s| { + s.split(',') + .map(|x| x.trim().to_lowercase()) + .filter(|x| !x.is_empty()) + .collect() + }) + .unwrap_or_default() + }; + + f.authors = parse_set("AUDIT_AUTHORS"); + f.npubs = parse_set("AUDIT_NPUBS"); + f.nip05_full = parse_set("AUDIT_NIP05"); + f.nip05_local = parse_set("AUDIT_NIP05_LOCAL"); + + if let Ok(rx) = std::env::var("AUDIT_CONTENT_RE") { + if !rx.trim().is_empty() { + if let Ok(re) = Regex::new(&format!("(?i){}", rx)) { + f.content_re = Some(re); + } + } + } + + f.created_at_min = std::env::var("AUDIT_CREATED_AT_MIN") + .ok() + .and_then(|s| s.parse().ok()); + f.created_at_max = std::env::var("AUDIT_CREATED_AT_MAX") + .ok() + .and_then(|s| s.parse().ok()); + + f + } +} + +#[derive(Clone)] +struct AuditState { + filter: AuditFilter, + resolver: Option<ProfileResolver>, +} + +static STATE: Lazy<RwLock<AuditState>> = Lazy::new(|| { + RwLock::new(AuditState { + filter: AuditFilter::from_env(), + resolver: None, + }) +}); + +pub fn reload_from_env() { + if let Ok(mut w) = STATE.write() { + w.filter = AuditFilter::from_env(); + } +} + +pub fn set_profile_resolver(resolver: ProfileResolver) { + if let Ok(mut w) = STATE.write() { + w.resolver = Some(resolver); + } +} + +fn nip05_parts_from_metadata(nip05: &str) -> (String, String) { + let lower = nip05.to_lowercase(); + if let Some((name, domain)) = lower.split_once('@') { + (format!("{name}@{domain}"), name.to_string()) + } else { + (lower.clone(), lower) + } +} + +fn should_log( + author_hex: &str, + kind_u64: u64, + created_at: u32, + content: &str, + npub_opt: Option<String>, + nip05_full_opt: Option<String>, + nip05_local_opt: Option<String>, +) -> bool { + let filter = STATE.read().ok().map(|s| s.filter.clone()); + let Some(f) = filter else { + return false; + }; + if !f.enabled { + return false; + } + + if let Some(kinds) = &f.kinds { + if !kinds.contains(&kind_u64) { + return false; + } + } + + if !f.authors.is_empty() && !f.authors.contains(&author_hex.to_lowercase()) { + return false; + } + + if !f.npubs.is_empty() { + let pass = npub_opt + .as_ref() + .map(|n| f.npubs.contains(&n.to_lowercase())) + .unwrap_or(false); + if !pass { + return false; + } + } + + if !f.nip05_full.is_empty() { + let pass = nip05_full_opt + .as_ref() + .map(|n| f.nip05_full.contains(&n.to_lowercase())) + .unwrap_or(false); + if !pass { + return false; + } + } + + if !f.nip05_local.is_empty() { + let pass = nip05_local_opt + .as_ref() + .map(|n| f.nip05_local.contains(&n.to_lowercase())) + .unwrap_or(false); + if !pass { + return false; + } + } + + if let Some(min) = f.created_at_min { + if created_at < min { + return false; + } + } + if let Some(max) = f.created_at_max { + if created_at > max { + return false; + } + } + + if let Some(re) = &f.content_re { + if !re.is_match(content) { + return false; + } + } + + true +} + +#[inline] +pub fn log_indexer_event(idx: &RelayIndexerEvent) { + let need_npub = STATE + .read() + .ok() + .map(|s| !s.filter.npubs.is_empty()) + .unwrap_or(false); + let npub_opt = if need_npub { + public_key_to_npub(&idx.author).ok() + } else { + None + }; + + let (need_full, need_local) = STATE + .read() + .ok() + .map(|s| { + ( + !s.filter.nip05_full.is_empty(), + !s.filter.nip05_local.is_empty(), + ) + }) + .unwrap_or((false, false)); + + let (nip05_full_opt, nip05_local_opt) = if need_full || need_local { + if let Ok(s) = STATE.read() { + if let Some(res) = s.resolver.as_ref() { + let local = res.nip05_for_author(&idx.author).map(|s| s.to_string()); + (None, local) + } else { + (None, None) + } + } else { + (None, None) + } + } else { + (None, None) + }; + + if !should_log( + &idx.author, + idx.kind.as_u64(), + idx.created_at, + &idx.content, + npub_opt, + nip05_full_opt, + nip05_local_opt, + ) { + return; + } + + let tags_json = + serde_json::to_string(&idx.tags).unwrap_or_else(|_| "Error serializing tags".into()); + info!( + target: "audit", + kind = idx.kind.as_u64(), + id = %idx.id, + author = %idx.author, + created_at = idx.created_at, + tags = %tags_json, + content = %idx.content, + "AUDIT: relay indexer event" + ); +} + +#[inline] +pub fn log_profile_event(evt: &RadrootsProfileEventIndex) { + let (nip05_full_opt, nip05_local_opt) = evt + .metadata + .profile + .nip05 + .as_ref() + .map(|n| { + let (full, local) = nip05_parts_from_metadata(n); + (Some(full), Some(local)) + }) + .unwrap_or((None, None)); + + let need_npub = STATE + .read() + .ok() + .map(|s| !s.filter.npubs.is_empty()) + .unwrap_or(false); + let npub_opt = if need_npub { + public_key_to_npub(&evt.event.author).ok() + } else { + None + }; + + if !should_log( + &evt.event.author, + evt.event.kind.try_into().unwrap(), + evt.event.created_at, + &evt.event.content, + npub_opt, + nip05_full_opt, + nip05_local_opt, + ) { + return; + } + + if let Ok(json) = serde_json::to_string(evt) { + info!( + target = "audit", + kind = evt.event.kind, + id = %evt.event.id, + author = %evt.event.author, + created_at = evt.event.created_at, + processed_json = %json, + "AUDIT: processed metadata" + ); + } +} + +#[inline] +pub fn log_listing_event(evt: &RadrootsListingEventIndex) { + let need_npub = STATE + .read() + .ok() + .map(|s| !s.filter.npubs.is_empty()) + .unwrap_or(false); + let npub_opt = if need_npub { + public_key_to_npub(&evt.event.author).ok() + } else { + None + }; + + let (need_full, need_local) = STATE + .read() + .ok() + .map(|s| { + ( + !s.filter.nip05_full.is_empty(), + !s.filter.nip05_local.is_empty(), + ) + }) + .unwrap_or((false, false)); + + let (nip05_full_opt, nip05_local_opt) = if need_full || need_local { + if let Ok(s) = STATE.read() { + if let Some(res) = s.resolver.as_ref() { + let local = res + .nip05_for_author(&evt.event.author) + .map(|s| s.to_string()); + (None, local) + } else { + (None, None) + } + } else { + (None, None) + } + } else { + (None, None) + }; + + if !should_log( + &evt.event.author, + evt.event.kind as u64, + evt.event.created_at, + &evt.event.content, + npub_opt, + nip05_full_opt, + nip05_local_opt, + ) { + return; + } + + if let Ok(json) = serde_json::to_string(evt) { + info!( + target = "audit", + kind = evt.event.kind, + id = %evt.event.id, + author = %evt.event.author, + created_at = evt.event.created_at, + processed_json = %json, + "AUDIT: processed listing" + ); + } +} diff --git a/crates/indexer/src/cli/args.rs b/indexer/src/cli/args.rs diff --git a/crates/indexer/src/cli/mod.rs b/indexer/src/cli/mod.rs diff --git a/indexer/src/config.rs b/indexer/src/config.rs @@ -0,0 +1,59 @@ +use anyhow::Result; +use config::{Config, ConfigError, File}; +use serde::{Deserialize, Serialize}; +use std::path::Path; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum SettingsError { + #[error("Configuration loading failed: {0}")] + Load(#[from] ConfigError), +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Indexer { + pub data_dir: String, + pub logs_dir: String, + pub flush_interval: u64, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Relay { + pub url: String, + pub database_path: String, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Listings { + pub country_shard_size: usize, + pub profile_shard_size: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Settings { + pub indexer: Indexer, + pub relay: Relay, + pub listings: Listings, +} + +impl Settings { + pub fn load(config_path: &Option<String>) -> Result<Self, SettingsError> { + let path = config_path.as_deref().unwrap_or("config.toml"); + let builder = Config::builder() + .add_source(File::from(Path::new(path)).required(true)); + + match builder.build() { + Ok(cfg) => match cfg.try_deserialize::<Settings>() { + Ok(settings) => Ok(settings), + Err(err) => { + eprintln!("Failed to deserialize configuration: {err}"); + Err(SettingsError::Load(err)) + } + }, + Err(err) => { + eprintln!("Failed to load configuration from '{}': {err}", path); + Err(SettingsError::Load(err)) + } + } + } +} diff --git a/crates/indexer/src/domain/events/listing.rs b/indexer/src/domain/events/listing.rs diff --git a/crates/indexer/src/domain/events/mod.rs b/indexer/src/domain/events/mod.rs diff --git a/crates/indexer/src/domain/events/profile.rs b/indexer/src/domain/events/profile.rs diff --git a/crates/indexer/src/domain/indexer/key.rs b/indexer/src/domain/indexer/key.rs diff --git a/indexer/src/domain/indexer/kind.rs b/indexer/src/domain/indexer/kind.rs @@ -0,0 +1,83 @@ +use serde::de::Error as DeError; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::fmt; +use std::path::PathBuf; + +use crate::domain::indexer::key::LISTING_INDEX_DIRECTORY; +use crate::domain::indexer::{IndexerKey, PROFILE_INDEX_DIRECTORY}; +use crate::utils::io::{paths_join, PathsError}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum IndexerEventKind { + Profile, + Listing, +} + +impl IndexerEventKind { + pub const ALL: [IndexerEventKind; 2] = [IndexerEventKind::Profile, IndexerEventKind::Listing]; + + pub const fn as_u64(self) -> u64 { + match self { + IndexerEventKind::Profile => 0, + IndexerEventKind::Listing => 30402, + } + } + + pub const fn paths(self) -> &'static [IndexerKey] { + match self { + IndexerEventKind::Profile => &PROFILE_INDEX_DIRECTORY, + IndexerEventKind::Listing => &LISTING_INDEX_DIRECTORY, + } + } + + pub fn base_path<P: AsRef<std::path::Path>>(self, data_dir: P) -> Result<PathBuf, PathsError> { + paths_join(&[ + data_dir.as_ref().to_str().unwrap(), + "static", + "events", + &self.as_u64().to_string(), + ]) + } +} + +impl fmt::Display for IndexerEventKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_u64()) + } +} + +impl Serialize for IndexerEventKind { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + serializer.serialize_u64(self.as_u64()) + } +} + +impl<'de> Deserialize<'de> for IndexerEventKind { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + let v = u64::deserialize(deserializer)?; + IndexerEventKind::try_from(v) + .map_err(|_| DeError::custom(format!("invalid event kind: {}", v))) + } +} + +#[derive(thiserror::Error, Debug)] +#[error("unknown event kind: {0}")] +pub struct IndexerEventKindParseError(pub u64); + +impl TryFrom<u64> for IndexerEventKind { + type Error = IndexerEventKindParseError; + + fn try_from(val: u64) -> Result<Self, Self::Error> { + match val { + 0 => Ok(IndexerEventKind::Profile), + 30402 => Ok(IndexerEventKind::Listing), + other => Err(IndexerEventKindParseError(other)), + } + } +} diff --git a/crates/indexer/src/domain/indexer/mod.rs b/indexer/src/domain/indexer/mod.rs diff --git a/indexer/src/domain/indexer/models/listing.rs b/indexer/src/domain/indexer/models/listing.rs @@ -0,0 +1,571 @@ +use crate::utils::crypto::compute_hash; +use crate::utils::io::fs_mkdir; +use crate::utils::io::{write_hash, write_json}; +use crate::utils::nostr::public_key_to_npub; +use crate::utils::strings::truncate_log; +use radroots_events::listing::models::{RadrootsListingEventIndex, RadrootsListingEventMetadata}; +use radroots_events_indexed::{RadrootsEventsIndexedManifest, RadrootsEventsIndexedShardMetadata}; +use std::{collections::BTreeMap, fs, path::PathBuf}; +use tracing::{instrument, warn}; + +use crate::{ + audit, + domain::{ + events::ToRadrootsListingEventIndex, + indexer::{ + key::LISTING_INDEX_DIRECTORY, + kind::IndexerEventKind, + models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes}, + }, + resolvers::profile::ProfileResolver, + }, + relay::event::RelayIndexerEvent, + Settings, +}; + +macro_rules! write_if_stale { + ($path:expr, $data:expr, $updated:expr) => {{ + let hash = compute_hash(&$data)?; + let hash_path = $path.with_extension("sha256.txt"); + + let needs_write = if $path.exists() && hash_path.exists() { + let stored = fs::read_to_string(&hash_path)?; + stored.trim() != hash + } else { + true + }; + + if needs_write { + write_json(&$path, &$data)?; + write_hash(&$path, &hash)?; + $updated.push($path.clone()); + } + + hash + }}; +} + +#[derive(Debug)] +pub struct EventListingIndexes { + events: Vec<RadrootsListingEventIndex>, + events_id: BTreeMap<String, RadrootsListingEventIndex>, + country_ids: BTreeMap<String, Vec<String>>, + author_ids: BTreeMap<String, Vec<String>>, + npub_ids: BTreeMap<String, Vec<String>>, + nip05_ids: BTreeMap<String, Vec<String>>, +} + +impl EventListingIndexes { + pub fn build_with_profiles( + raw_events: &[RelayIndexerEvent], + profiles: &ProfileResolver, + ) -> Result<Self, NostrEventsStaticError> { + let mut events: Vec<RadrootsListingEventIndex> = Vec::with_capacity(raw_events.len()); + let mut events_id: BTreeMap<String, RadrootsListingEventIndex> = BTreeMap::new(); + let mut country_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); + let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); + let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); + let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); + + for raw in raw_events { + match raw.clone().to_radroots_listing_event() { + Ok(evt) => { + audit::log_listing_event(&evt); + + let id = evt.metadata.id.clone(); + let author_hex = evt.metadata.author.to_lowercase(); + + let npub = public_key_to_npub(&author_hex) + .map(|s| s.to_lowercase()) + .ok(); + let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned); + + let country_opt = evt + .metadata + .listing + .location + .as_ref() + .and_then(|loc| loc.country.as_ref()) + .map(|c| c.to_lowercase()); + + events_id.insert(id.clone(), evt.clone()); + events.push(evt.clone()); + + if let Some(country) = country_opt { + country_ids.entry(country).or_default().push(id.clone()); + } + + author_ids.entry(author_hex).or_default().push(id.clone()); + + if let Some(n) = npub { + npub_ids.entry(n).or_default().push(id.clone()); + } + if let Some(n05) = author_nip05 { + let n05 = n05.to_lowercase(); + nip05_ids.entry(n05).or_default().push(id.clone()); + } + } + Err(err) => { + warn!( + kind = raw.kind.as_u64(), + id = %raw.id, + author = %raw.author, + content = %truncate_log(&raw.content, 1000), + tags = ?raw.tags, + error = %err, + "Skipping malformed listing event" + ); + } + } + } + + let sort_ids = |ids: &mut Vec<String>, + map: &BTreeMap<String, RadrootsListingEventIndex>| { + ids.sort_unstable_by(|a, b| { + let pa = map + .get(a) + .map(|e| e.metadata.published_at) + .unwrap_or_default(); + let pb = map + .get(b) + .map(|e| e.metadata.published_at) + .unwrap_or_default(); + pb.cmp(&pa).then(a.cmp(b)) + }); + }; + + for ids in country_ids.values_mut() { + sort_ids(ids, &events_id); + } + for ids in author_ids.values_mut() { + sort_ids(ids, &events_id); + } + for ids in npub_ids.values_mut() { + sort_ids(ids, &events_id); + } + for ids in nip05_ids.values_mut() { + ids.sort_unstable_by(|a, b| { + let pa = events_id + .get(a) + .map(|e| e.metadata.published_at) + .unwrap_or_default(); + let pb = events_id + .get(b) + .map(|e| e.metadata.published_at) + .unwrap_or_default(); + pb.cmp(&pa).then(a.cmp(b)) + }); + } + + Ok(EventListingIndexes { + events, + events_id, + country_ids, + author_ids, + npub_ids, + nip05_ids, + }) + } +} + +impl EventIndexes for EventListingIndexes { + type Event = RelayIndexerEvent; + + fn subdirs() -> &'static [crate::domain::indexer::IndexerKey] { + &LISTING_INDEX_DIRECTORY + } + + #[instrument(skip(raw_events), fields(event_count = raw_events.len()))] + fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> { + let empty = ProfileResolver::default(); + Self::build_with_profiles(raw_events, &empty) + } +} + +impl EventListingIndexes { + fn format_shard_filename(ix: usize) -> String { + format!("shards/{:06}.json", ix) + } + + fn shard_vec<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> { + if size == 0 { + return vec![items.to_vec()]; + } + let mut out = Vec::with_capacity((items.len() + size - 1) / size); + let mut i = 0; + while i < items.len() { + let end = (i + size).min(items.len()); + out.push(items[i..end].to_vec()); + i = end; + } + out + } +} + +impl WriteEventIndexes for EventListingIndexes { + fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> { + let base: PathBuf = IndexerEventKind::Listing.base_path(&settings.indexer.data_dir)?; + fs_mkdir(&[&base])?; + + { + let idxs_root = base.join("events.json"); + let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect(); + write_if_stale!(idxs_root, ids, updated); + } + + { + let sub = base.join("id"); + fs_mkdir(&[&sub])?; + let keys: Vec<String> = self.events_id.keys().cloned().collect(); + write_if_stale!(sub.join("indexes.json"), keys, updated); + + for (id, evt) in &self.events_id { + let dir = sub.join(id.to_lowercase()); + fs_mkdir(&[&dir])?; + write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); + write_if_stale!(dir.join("data.json"), evt.metadata.clone(), updated); + } + } + + { + let sub_country = base.join(crate::domain::indexer::IndexerKey::Country.as_str()); + fs_mkdir(&[&sub_country])?; + let country_codes: Vec<String> = self.country_ids.keys().cloned().collect(); + write_if_stale!(sub_country.join("indexes.json"), country_codes, updated); + + for (cc, ids) in &self.country_ids { + let cc_dir = sub_country.join(cc); + let shards_dir = cc_dir.join("shards"); + fs_mkdir(&[&cc_dir])?; + fs_mkdir(&[&shards_dir])?; + + let mut data_items: Vec<RadrootsListingEventMetadata> = + Vec::with_capacity(ids.len()); + for id in ids { + if let Some(evt) = self.events_id.get(id) { + data_items.push(evt.metadata.clone()); + } + } + + let shard_size = settings.listings.country_shard_size; + + let shards = Self::shard_vec(&data_items, shard_size); + + let (country_first_pub, country_last_pub) = + if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { + (f.published_at, l.published_at) + } else { + (0, 0) + }; + + let mut manifest = RadrootsEventsIndexedManifest { + country: cc.clone(), + total: u32::try_from(data_items.len()).expect("too many data items for u32"), + shard_size: u32::try_from(shard_size).expect("shard_size too large for u32"), + first_published_at: country_first_pub, + last_published_at: country_last_pub, + shards: Vec::with_capacity(shards.len()), + }; + + for (ix, chunk) in shards.into_iter().enumerate() { + let file_rel = Self::format_shard_filename(ix); + let file_abs = cc_dir.join(&file_rel); + if let Some(parent) = file_abs.parent() { + fs_mkdir(&[&parent])?; + } + + let sha = write_if_stale!(file_abs, chunk, updated); + + let (first_id, first_pub, last_id, last_pub) = if let (Some(f), Some(l)) = ( + data_items.get(ix * shard_size), + data_items.get(((ix + 1) * shard_size).saturating_sub(1)), + ) { + (f.id.clone(), f.published_at, l.id.clone(), l.published_at) + } else { + let fp = chunk + .first() + .map(|x| (x.id.clone(), x.published_at)) + .unwrap_or_default(); + let lp = chunk + .last() + .map(|x| (x.id.clone(), x.published_at)) + .unwrap_or_default(); + (fp.0, fp.1, lp.0, lp.1) + }; + + manifest.shards.push(RadrootsEventsIndexedShardMetadata { + file: file_rel, + count: u32::try_from(chunk.len()).expect("chunk length too large for u32"), + first_id, + last_id, + first_published_at: first_pub, + last_published_at: last_pub, + sha256: sha, + }); + } + + write_if_stale!(cc_dir.join("manifest.json"), manifest, updated); + } + } + + { + let sub_author = base.join(crate::domain::indexer::IndexerKey::Author.as_str()); + fs_mkdir(&[&sub_author])?; + let authors: Vec<String> = self.author_ids.keys().cloned().collect(); + write_if_stale!(sub_author.join("indexes.json"), authors, updated); + + for (author, ids) in &self.author_ids { + let dir = sub_author.join(author); + let shards_dir = dir.join("shards"); + fs_mkdir(&[&dir, &shards_dir])?; + + let mut data_items: Vec<RadrootsListingEventMetadata> = + Vec::with_capacity(ids.len()); + for id in ids { + if let Some(evt) = self.events_id.get(id) { + data_items.push(evt.metadata.clone()); + } + } + + let shard_size = settings.listings.profile_shard_size; + let shards = Self::shard_vec(&data_items, shard_size); + + let (first_pub, last_pub) = + if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { + (f.published_at, l.published_at) + } else { + (0, 0) + }; + + let mut manifest = RadrootsEventsIndexedManifest { + country: author.clone(), + total: u32::try_from(data_items.len()).expect("too many data items for u32"), + shard_size: u32::try_from(shard_size).expect("shard_size too large for u32"), + first_published_at: first_pub, + last_published_at: last_pub, + shards: Vec::with_capacity(shards.len()), + }; + + for (ix, chunk) in shards.into_iter().enumerate() { + let file_rel = Self::format_shard_filename(ix); + let file_abs = dir.join(&file_rel); + if let Some(parent) = file_abs.parent() { + fs_mkdir(&[&parent])?; + } + + let sha = write_if_stale!(file_abs, chunk, updated); + + let (first_id, first_published_at, last_id, last_published_at) = + if let (Some(f), Some(l)) = ( + data_items.get(ix * shard_size), + data_items.get(((ix + 1) * shard_size).saturating_sub(1)), + ) { + (f.id.clone(), f.published_at, l.id.clone(), l.published_at) + } else { + let fp = data_items + .get(ix * shard_size) + .map(|x| (x.id.clone(), x.published_at)) + .or_else(|| chunk.first().map(|x| (x.id.clone(), x.published_at))) + .unwrap_or_default(); + + let lp = data_items + .get(((ix + 1) * shard_size).saturating_sub(1)) + .map(|x| (x.id.clone(), x.published_at)) + .or_else(|| chunk.last().map(|x| (x.id.clone(), x.published_at))) + .unwrap_or_default(); + + (fp.0, fp.1, lp.0, lp.1) + }; + + manifest.shards.push(RadrootsEventsIndexedShardMetadata { + file: file_rel, + count: u32::try_from(std::cmp::min( + shard_size, + data_items.len().saturating_sub(ix * shard_size), + )) + .expect("chunk length too large for u32"), + first_id, + last_id, + first_published_at, + last_published_at, + sha256: sha, + }); + } + + write_if_stale!(dir.join("manifest.json"), manifest, updated); + } + } + + { + let sub_npub = base.join(crate::domain::indexer::IndexerKey::Npub.as_str()); + fs_mkdir(&[&sub_npub])?; + let npubs: Vec<String> = self.npub_ids.keys().cloned().collect(); + write_if_stale!(sub_npub.join("indexes.json"), npubs, updated); + + for (npub, ids) in &self.npub_ids { + let dir = sub_npub.join(npub); + let shards_dir = dir.join("shards"); + fs_mkdir(&[&dir, &shards_dir])?; + + let mut data_items: Vec<RadrootsListingEventMetadata> = + Vec::with_capacity(ids.len()); + for id in ids { + if let Some(evt) = self.events_id.get(id) { + data_items.push(evt.metadata.clone()); + } + } + + let shard_size = settings.listings.profile_shard_size; + let shards = Self::shard_vec(&data_items, shard_size); + + let (first_pub, last_pub) = + if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { + (f.published_at, l.published_at) + } else { + (0, 0) + }; + + let mut manifest = RadrootsEventsIndexedManifest { + country: npub.clone(), + total: u32::try_from(data_items.len()).expect("too many data items for u32"), + shard_size: u32::try_from(shard_size).expect("shard_size too large for u32"), + first_published_at: first_pub, + last_published_at: last_pub, + shards: Vec::with_capacity(shards.len()), + }; + + for (ix, chunk) in shards.into_iter().enumerate() { + let file_rel = Self::format_shard_filename(ix); + let file_abs = dir.join(&file_rel); + if let Some(parent) = file_abs.parent() { + fs_mkdir(&[&parent])?; + } + + let sha = write_if_stale!(file_abs, chunk, updated); + + let (first_id, first_published_at, last_id, last_published_at) = + if let (Some(f), Some(l)) = ( + data_items.get(ix * shard_size), + data_items.get(((ix + 1) * shard_size).saturating_sub(1)), + ) { + (f.id.clone(), f.published_at, l.id.clone(), l.published_at) + } else { + let fp = data_items + .get(ix * shard_size) + .map(|x| (x.id.clone(), x.published_at)) + .or_else(|| chunk.first().map(|x| (x.id.clone(), x.published_at))) + .unwrap_or_default(); + + let lp = data_items + .get(((ix + 1) * shard_size).saturating_sub(1)) + .map(|x| (x.id.clone(), x.published_at)) + .or_else(|| chunk.last().map(|x| (x.id.clone(), x.published_at))) + .unwrap_or_default(); + + (fp.0, fp.1, lp.0, lp.1) + }; + + manifest.shards.push(RadrootsEventsIndexedShardMetadata { + file: file_rel, + count: u32::try_from(std::cmp::min( + shard_size, + data_items.len().saturating_sub(ix * shard_size), + )) + .expect("chunk length too large for u32"), + first_id, + last_id, + first_published_at, + last_published_at, + sha256: sha, + }); + } + + write_if_stale!(dir.join("manifest.json"), manifest, updated); + } + + { + let sub_nip05 = base.join(crate::domain::indexer::IndexerKey::Nip05.as_str()); + fs_mkdir(&[&sub_nip05])?; + let names: Vec<String> = self.nip05_ids.keys().cloned().collect(); + write_if_stale!(sub_nip05.join("indexes.json"), names, updated); + + for (name, ids) in &self.nip05_ids { + let dir = sub_nip05.join(name); + let shards_dir = dir.join("shards"); + fs_mkdir(&[&dir, &shards_dir])?; + + let mut data_items = Vec::with_capacity(ids.len()); + for id in ids { + if let Some(evt) = self.events_id.get(id) { + data_items.push(evt.metadata.clone()); + } + } + + let shard_size = settings.listings.profile_shard_size; + let shards = Self::shard_vec(&data_items, shard_size); + + let (first_pub, last_pub) = + if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { + (f.published_at, l.published_at) + } else { + (0, 0) + }; + + let mut manifest = RadrootsEventsIndexedManifest { + country: name.clone(), + total: u32::try_from(data_items.len()).expect("u32 overflow"), + shard_size: u32::try_from(shard_size).expect("u32 overflow"), + first_published_at: first_pub, + last_published_at: last_pub, + shards: Vec::with_capacity(shards.len()), + }; + + for (ix, chunk) in shards.into_iter().enumerate() { + let file_rel = Self::format_shard_filename(ix); + let file_abs = dir.join(&file_rel); + if let Some(parent) = file_abs.parent() { + fs_mkdir(&[&parent])?; + } + + let sha = write_if_stale!(file_abs, chunk, updated); + + let (first_id, first_pub, last_id, last_pub) = if let (Some(f), Some(l)) = ( + data_items.get(ix * shard_size), + data_items.get(((ix + 1) * shard_size).saturating_sub(1)), + ) { + (f.id.clone(), f.published_at, l.id.clone(), l.published_at) + } else { + let fp = chunk + .first() + .map(|x| (x.id.clone(), x.published_at)) + .unwrap_or_default(); + let lp = chunk + .last() + .map(|x| (x.id.clone(), x.published_at)) + .unwrap_or_default(); + (fp.0, fp.1, lp.0, lp.1) + }; + + manifest.shards.push(RadrootsEventsIndexedShardMetadata { + file: file_rel, + count: u32::try_from(std::cmp::min( + shard_size, + data_items.len().saturating_sub(ix * shard_size), + )) + .expect("u32 overflow"), + first_id, + last_id, + first_published_at: first_pub, + last_published_at: last_pub, + sha256: sha, + }); + } + + write_if_stale!(dir.join("manifest.json"), manifest, updated); + } + } + } + + Ok(()) + } +} diff --git a/crates/indexer/src/domain/indexer/models/mod.rs b/indexer/src/domain/indexer/models/mod.rs diff --git a/indexer/src/domain/indexer/models/profile.rs b/indexer/src/domain/indexer/models/profile.rs @@ -0,0 +1,175 @@ +use crate::utils::crypto::compute_hash; +use crate::utils::io::fs_mkdir; +use crate::utils::io::{write_hash, write_json}; +use crate::utils::nostr::public_key_to_npub; +use crate::utils::strings::truncate_log; +use crate::{ + audit, + domain::{ + events::ToRadrootsProfileEventIndex, + indexer::{ + kind::IndexerEventKind, + models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes}, + IndexerKey, PROFILE_INDEX_DIRECTORY, + }, + }, + relay::event::RelayIndexerEvent, + Settings, +}; +use radroots_events::profile::models::RadrootsProfileEventIndex; +use std::{collections::BTreeMap, fs, path::PathBuf}; +use tracing::{instrument, warn}; + +macro_rules! write_if_stale { + ($path:expr, $data:expr, $updated:expr) => {{ + let hash = compute_hash(&$data)?; + let hash_path = $path.with_extension("sha256.txt"); + + let needs_write = if $path.exists() && hash_path.exists() { + let stored = fs::read_to_string(&hash_path)?; + stored.trim() != hash + } else { + true + }; + + if needs_write { + write_json(&$path, &$data)?; + write_hash(&$path, &hash)?; + $updated.push($path.clone()); + } + }}; +} + +#[derive(Debug)] +pub struct EventProfileIndexes { + events: Vec<RadrootsProfileEventIndex>, + events_id: BTreeMap<String, RadrootsProfileEventIndex>, + events_author: BTreeMap<String, RadrootsProfileEventIndex>, + events_nip05: BTreeMap<String, RadrootsProfileEventIndex>, + events_npub: BTreeMap<String, RadrootsProfileEventIndex>, +} + +impl EventIndexes for EventProfileIndexes { + type Event = RelayIndexerEvent; + + fn subdirs() -> &'static [IndexerKey] { + &PROFILE_INDEX_DIRECTORY + } + + #[instrument(skip(raw_events), fields(event_count = raw_events.len()))] + fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> { + let mut events = Vec::with_capacity(raw_events.len()); + let mut events_id = BTreeMap::new(); + let mut events_author = BTreeMap::new(); + let mut events_nip05 = BTreeMap::new(); + let mut events_npub = BTreeMap::new(); + + for raw in raw_events { + match raw.clone().to_radroots_profile_event() { + Ok(evt) => { + audit::log_profile_event(&evt); + let id = evt.event.id.clone(); + let author = evt.event.author.clone(); + events.push(evt.clone()); + events_id.insert(id.clone(), evt.clone()); + events_author.insert(author.clone(), evt.clone()); + + if let Ok(npub) = public_key_to_npub(&author) { + events_npub.insert(npub.to_lowercase(), evt.clone()); + } + if let Some(nip05) = &evt.metadata.profile.nip05 { + let normalized = nip05.replace("@radroots.market", ""); + events_nip05.insert(normalized, evt.clone()); + } + } + Err(err) => { + warn!( + kind = raw.kind.as_u64(), + id = %raw.id, + author = %raw.author, + content = %truncate_log(&raw.content, 1000), + tags = ?raw.tags, + error = %err, + "Skipping malformed metadata event" + ); + } + } + } + + Ok(EventProfileIndexes { + events, + events_id, + events_author, + events_nip05, + events_npub, + }) + } +} + +impl WriteEventIndexes for EventProfileIndexes { + fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> { + let base: PathBuf = IndexerEventKind::Profile.base_path(&settings.indexer.data_dir)?; + fs_mkdir(&[&base])?; + + let idxs_root = base.join("events.json"); + let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect(); + write_if_stale!(idxs_root, ids, updated); + + for &subdir in Self::subdirs().iter() { + let sub_base = base.join(subdir.as_str()); + fs_mkdir(&[sub_base.to_str().unwrap()])?; + + let keys_lower: Vec<String> = match subdir { + IndexerKey::Id => self.events_id.keys().map(|k| k.to_lowercase()).collect(), + IndexerKey::Author => self + .events_author + .keys() + .map(|k| k.to_lowercase()) + .collect(), + IndexerKey::Nip05 => self.events_nip05.keys().map(|k| k.to_lowercase()).collect(), + IndexerKey::Npub => self.events_npub.keys().map(|k| k.to_lowercase()).collect(), + _ => Vec::new(), + }; + let idxs_subdir = sub_base.join("indexes.json"); + write_if_stale!(idxs_subdir, keys_lower, updated); + + match subdir { + IndexerKey::Id => { + for (key, evt) in &self.events_id { + let dir = sub_base.join(key.to_lowercase()); + fs_mkdir(&[dir.to_str().unwrap()])?; + write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); + write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated); + } + } + IndexerKey::Author => { + for (key, evt) in &self.events_author { + let dir = sub_base.join(key.to_lowercase()); + fs_mkdir(&[dir.to_str().unwrap()])?; + write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); + write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated); + } + } + IndexerKey::Nip05 => { + for (key, evt) in &self.events_nip05 { + let dir = sub_base.join(key.to_lowercase()); + fs_mkdir(&[dir.to_str().unwrap()])?; + write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); + write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated); + } + } + IndexerKey::Npub => { + for (key, evt) in &self.events_npub { + let dir = sub_base.join(key.to_lowercase()); + fs_mkdir(&[dir.to_str().unwrap()])?; + write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); + write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated); + } + } + _ => {} + } + } + + Ok(()) + } +} diff --git a/crates/indexer/src/domain/resolvers/mod.rs b/indexer/src/domain/resolvers/mod.rs diff --git a/crates/indexer/src/domain/resolvers/profile.rs b/indexer/src/domain/resolvers/profile.rs diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs @@ -0,0 +1,210 @@ +use anyhow::{Context, Result}; +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; +use tracing::info; + +pub mod cli; +pub mod config; +pub mod telemetry; +pub mod domain { + pub mod events; + pub mod indexer; + pub mod resolvers; +} +pub mod relay { + pub mod event; + pub mod record; +} +pub mod utils; + +#[cfg(feature = "audit")] +pub mod audit; + +#[cfg(not(feature = "audit"))] +pub mod audit { + use radroots_events::{ + listing::models::RadrootsListingEventIndex, profile::models::RadrootsProfileEventIndex, + }; + + pub fn log_indexer_event(_: &crate::relay::event::RelayIndexerEvent) {} + pub fn log_profile_event(_: &RadrootsProfileEventIndex) {} + pub fn log_listing_event(_: &RadrootsListingEventIndex) {} +} + +use crate::{ + domain::{ + indexer::{ + kind::IndexerEventKind, + models::{EventIndexes, EventListingIndexes, EventProfileIndexes, WriteEventIndexes}, + }, + resolvers::profile::ProfileResolver, + }, + relay::event::RelayIndexerEvent, utils::{db::IndexerDb, sqlite::{sqlite_conn, sqlite_stmt}}, +}; +pub use config::Settings; +pub use relay::record::RelayEventRecord; + +pub async fn run(settings: Settings) -> Result<()> { + let db_idx = IndexerDb::open(&format!("{}/indexer_db", settings.indexer.data_dir))?; + let tree_raw = "hashes"; + let tree_events_profile = "profile_events"; + let tree_events_listing = "listing_events"; + let tree_stats = "stats"; + + let last_created_at_db: u32 = db_idx + .get_raw(tree_stats, "last_created_at")? + .map(|ivec| { + let arr: [u8; 4] = ivec.as_ref().try_into().unwrap(); + u32::from_be_bytes(arr) + }) + .unwrap_or(0); + let mut last_created_at = last_created_at_db; + + let event_kinds = IndexerEventKind::ALL + .iter() + .map(|k| k.as_u64().to_string()) + .collect::<Vec<_>>() + .join(", "); + + let relay_query = format!( + "SELECT hex(event_hash), hex(author), created_at, kind, content \ + FROM event WHERE kind IN ({}) AND created_at > ?", + event_kinds + ); + + loop { + let iteration_start = Instant::now(); + let relay_db = sqlite_conn(&settings.relay.database_path).with_context(|| { + format!( + "Could not open relay DB at {}", + settings.relay.database_path + ) + })?; + let mut stmt = + sqlite_stmt(&relay_db, &relay_query).context("Could not prepare event query")?; + + let records: Vec<RelayEventRecord> = stmt + .query_map([last_created_at], RelayEventRecord::from_row)? + .collect::<Result<_, _>>() + .context("collecting RelayEventRecord rows")?; + info!(record_count = records.len(), "Loaded relay records"); + + let mut records_kind: HashMap<IndexerEventKind, Vec<RelayIndexerEvent>> = HashMap::new(); + for rec in records.into_iter() { + let iev = RelayIndexerEvent::try_from(rec)?; + audit::log_indexer_event(&iev); + records_kind.entry(iev.kind).or_default().push(iev); + } + + let mut need_rebuild_listing = false; + + if let Some(profile_events) = records_kind.remove(&IndexerEventKind::Profile) { + if !profile_events.is_empty() { + for ev in &profile_events { + last_created_at = last_created_at.max(ev.created_at); + let id = &ev.id; + let hash = &ev.hash; + let skip = if let Some(old) = db_idx.get_raw(tree_raw, id)? { + old.as_ref() == hash.as_bytes() + } else { + false + }; + if skip { + continue; + } + + db_idx.insert(tree_events_profile, id, ev)?; + db_idx.insert_raw(tree_raw, id, hash.as_bytes())?; + } + + db_idx.insert_raw( + tree_stats, + "last_created_at", + &last_created_at.to_be_bytes(), + )?; + db_idx.flush()?; + + let raw_profile_events: Vec<RelayIndexerEvent> = + db_idx.get_all(tree_events_profile)?; + let indexed_profile_events = EventProfileIndexes::build(&raw_profile_events)?; + let mut updated_indexes = Vec::new(); + indexed_profile_events.write(&settings, &mut updated_indexes)?; + info!( + written = updated_indexes.len(), + "Written {} index files", + updated_indexes.len() + ); + + need_rebuild_listing = true; + } + } + + let raw_profile_events: Vec<RelayIndexerEvent> = db_idx.get_all(tree_events_profile)?; + let profiles = ProfileResolver::from_metadata(&raw_profile_events); + + if let Some(listing_events) = records_kind.remove(&IndexerEventKind::Listing) { + if !listing_events.is_empty() { + for ev in &listing_events { + last_created_at = last_created_at.max(ev.created_at); + let id = &ev.id; + let hash = &ev.hash; + let skip = if let Some(old) = db_idx.get_raw(tree_raw, id)? { + old.as_ref() == hash.as_bytes() + } else { + false + }; + if skip { + continue; + } + db_idx.insert(tree_events_listing, id, ev)?; + db_idx.insert_raw(tree_raw, id, hash.as_bytes())?; + } + + db_idx.insert_raw( + tree_stats, + "last_created_at", + &last_created_at.to_be_bytes(), + )?; + db_idx.flush()?; + + let raw_listing_events: Vec<RelayIndexerEvent> = + db_idx.get_all(tree_events_listing)?; + let listing_indexes = EventListingIndexes::build(&raw_listing_events)?; + let mut updated_listing = Vec::new(); + listing_indexes.write(&settings, &mut updated_listing)?; + info!( + written = updated_listing.len(), + "Written {} listing index files", + updated_listing.len() + ); + + need_rebuild_listing = true; + } + } + + if need_rebuild_listing { + let raw_listing_events: Vec<RelayIndexerEvent> = db_idx.get_all(tree_events_listing)?; + let listing_indexes = + EventListingIndexes::build_with_profiles(&raw_listing_events, &profiles)?; + let mut updated_listing = Vec::new(); + listing_indexes.write(&settings, &mut updated_listing)?; + info!( + written = updated_listing.len(), + "Written {} listing index files", + updated_listing.len() + ); + } + + let elapsed = iteration_start.elapsed(); + let interval = Duration::from_secs(settings.indexer.flush_interval); + let delay = interval.saturating_sub(elapsed); + info!( + elapsed_ms = elapsed.as_millis(), + sleeping_ms = delay.as_millis(), + "Iteration complete" + ); + tokio::time::sleep(delay).await; + } +} diff --git a/indexer/src/main.rs b/indexer/src/main.rs @@ -0,0 +1,23 @@ +use anyhow::Result; +use clap::Parser; +use radroots_market_indexer::{cli, run, telemetry, Settings}; +use tracing::info; + +#[tokio::main] +async fn main() { + if let Err(err) = setup().await { + eprintln!("Fatal error: {err:#?}"); + std::process::exit(1); + } +} + +async fn setup() -> Result<()> { + let args = cli::Args::parse(); + + let settings = Settings::load(&args.config)?; + + telemetry::init(&settings.indexer.logs_dir); + info!("Service starting"); + + run(settings).await +} diff --git a/crates/indexer/src/relay/event.rs b/indexer/src/relay/event.rs diff --git a/indexer/src/relay/record.rs b/indexer/src/relay/record.rs @@ -0,0 +1,37 @@ +use crate::utils::sqlite::{RustqliteError, SqliteResult, SqliteRow, SqliteType}; +use serde::Serialize; + +use crate::domain::indexer::kind::{IndexerEventKind, IndexerEventKindParseError}; + +#[derive(Clone, Debug, Serialize)] +pub struct RelayEventRecord { + pub event_hash: String, + pub author: String, + pub created_at: u32, + pub kind: IndexerEventKind, + pub content: String, +} + +impl RelayEventRecord { + pub fn from_row(row: &SqliteRow) -> SqliteResult<Self> { + let event_hash: String = row.get(0)?; + let author: String = row.get(1)?; + let created_at: u32 = row.get(2)?; + let kind_num: u32 = row.get(3)?; + + let kind = IndexerEventKind::try_from(kind_num as u64).map_err( + |e: IndexerEventKindParseError| { + RustqliteError::FromSqlConversionFailure(3, SqliteType::Integer, Box::new(e)) + }, + )?; + + let content: String = row.get(4)?; + Ok(RelayEventRecord { + event_hash, + author, + created_at, + kind, + content, + }) + } +} diff --git a/indexer/src/telemetry.rs b/indexer/src/telemetry.rs @@ -0,0 +1,46 @@ +use std::path::Path; +use tracing_appender::rolling; +use tracing_subscriber::{fmt, prelude::*, EnvFilter, Registry}; + +#[cfg(feature = "audit")] +use tracing_subscriber::filter::Targets; + +pub fn init(logs_dir: impl AsRef<Path>) { + let logs_path = logs_dir.as_ref(); + if let Err(e) = std::fs::create_dir_all(logs_path) { + eprintln!("Failed to create logs directory {}: {}", logs_path.display(), e); + } + + let file_appender = rolling::daily(logs_path, concat!(env!("CARGO_PKG_NAME"), ".log")); + let (file_writer, guard) = tracing_appender::non_blocking(file_appender); + std::mem::forget(guard); + + let stdout_layer = fmt::layer().with_writer(std::io::stdout).with_target(false); + + let file_layer = fmt::layer() + .with_writer(file_writer) + .with_ansi(false) + .with_target(false); + + let subscriber = Registry::default() + .with(EnvFilter::from_default_env()) + .with(stdout_layer) + .with(file_layer); + + #[cfg(feature = "audit")] + let subscriber = { + let audit_app = rolling::daily(&logs_dir, "audit.log"); + let (audit_writer, audit_guard) = tracing_appender::non_blocking(audit_app); + std::mem::forget(audit_guard); + + let audit_layer = fmt::layer() + .with_writer(audit_writer) + .with_ansi(false) + .with_target(true) + .with_filter(Targets::new().with_target("audit", tracing::Level::INFO)); + + subscriber.with(audit_layer) + }; + + subscriber.init(); +} diff --git a/indexer/src/utils/crypto.rs b/indexer/src/utils/crypto.rs @@ -0,0 +1,42 @@ +use serde::Serialize; +use sha2::{Digest, Sha256}; +use std::io::{Read, Result as IoResult}; + +pub fn sha256_hex(data: &[u8]) -> String { + let mut hasher = Sha256::new(); + hasher.update(data); + format!("{:x}", hasher.finalize()) +} + +pub fn compute_hash_of_reader<R: Read>(mut reader: R) -> IoResult<String> { + let mut hasher = Sha256::new(); + let mut buf = [0u8; 8192]; + loop { + let n = reader.read(&mut buf)?; + if n == 0 { + break; + } + hasher.update(&buf[..n]); + } + Ok(format!("{:x}", hasher.finalize())) +} + +pub fn compute_hash<T: Serialize>(value: &T) -> anyhow::Result<String> { + struct HasherWriter<'a>(&'a mut Sha256); + impl<'a> std::io::Write for HasherWriter<'a> { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + self.0.update(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + let mut hasher = Sha256::new(); + { + let writer = HasherWriter(&mut hasher); + serde_json::to_writer_pretty(writer, value)?; + } + Ok(format!("{:x}", hasher.finalize())) +} diff --git a/indexer/src/utils/db.rs b/indexer/src/utils/db.rs @@ -0,0 +1,58 @@ +use anyhow::Result; +use serde::{de::DeserializeOwned, Serialize}; +use sled::{Config as SledConfig, Db, IVec}; + +use crate::utils::serde_utils::{deserialize, serialize}; + +pub struct IndexerDb { + pub db: Db, +} + +impl IndexerDb { + pub fn open(path: &str) -> Result<Self> { + let db = SledConfig::new().path(path).open()?; + Ok(Self { db }) + } + + pub fn insert<T: Serialize>(&self, tree: &str, key: &str, value: &T) -> Result<()> { + let t = self.db.open_tree(tree)?; + let blob: Vec<u8> = serialize(value)?; + t.insert(key, blob)?; + Ok(()) + } + + pub fn get<T: DeserializeOwned>(&self, tree: &str, key: &str) -> Result<Option<T>> { + let t = self.db.open_tree(tree)?; + if let Some(bytes) = t.get(key)? { + let v: T = deserialize(&bytes)?; + Ok(Some(v)) + } else { + Ok(None) + } + } + + pub fn get_all<T: DeserializeOwned>(&self, tree: &str) -> Result<Vec<T>> { + let t = self.db.open_tree(tree)?; + let mut out = Vec::new(); + for res in t.iter().values() { + let bytes = res?; + let v: T = deserialize(&bytes)?; + out.push(v); + } + Ok(out) + } + + pub fn insert_raw(&self, tree: &str, key: &str, bytes: &[u8]) -> Result<()> { + self.db.open_tree(tree)?.insert(key, bytes)?; + Ok(()) + } + + pub fn get_raw(&self, tree: &str, key: &str) -> Result<Option<IVec>> { + Ok(self.db.open_tree(tree)?.get(key)?) + } + + pub fn flush(&self) -> Result<()> { + self.db.flush()?; + Ok(()) + } +} diff --git a/indexer/src/utils/io.rs b/indexer/src/utils/io.rs @@ -0,0 +1,84 @@ +use anyhow::Result; +use std::fs::{self, File}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use thiserror::Error; +use tracing::debug; + +#[derive(Debug, Error)] +pub enum PathsError { + #[error("Invalid path segment at index {index}: `{segment}`")] + InvalidSegment { index: usize, segment: String }, +} + +pub fn paths_join<I, S>(segments: I) -> Result<PathBuf, PathsError> +where + I: IntoIterator<Item = S>, + S: AsRef<Path>, +{ + let mut path = PathBuf::new(); + for (i, segment) in segments.into_iter().enumerate() { + let seg = segment.as_ref(); + if seg.as_os_str().is_empty() { + return Err(PathsError::InvalidSegment { + index: i, + segment: seg.display().to_string(), + }); + } + path.push(seg); + } + Ok(path) +} + +#[derive(thiserror::Error, Debug)] +pub enum FileError { + #[error("Failed to create directory `{path}`: {source}")] + CreateDirError { + path: String, + #[source] + source: std::io::Error, + }, + #[error("Path join error: {0}")] + PathJoinError(#[from] PathsError), + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), +} + +pub fn fs_mkdir<S, I>(segments: I) -> Result<(), FileError> +where + I: IntoIterator<Item = S>, + S: AsRef<Path>, +{ + let dir_path = paths_join(segments)?; + if !dir_path.exists() { + fs::create_dir_all(&dir_path).map_err(|e| FileError::CreateDirError { + path: dir_path.display().to_string(), + source: e, + })?; + debug!("Created directory: {}", dir_path.display()); + } else { + debug!("Directory already exists: {}", dir_path.display()); + } + Ok(()) +} + +pub fn write_json<T: serde::Serialize>(path: &Path, data: &T) -> Result<()> { + let file = File::create(path)?; + let mut buf = std::io::BufWriter::new(file); + serde_json::to_writer_pretty(&mut buf, data)?; + buf.flush()?; + Ok(()) +} + +pub fn write_hash(path: &Path, hash: &str) -> Result<()> { + let hash_path = path.with_extension("sha256.txt"); + fs::write(&hash_path, format!("{hash}\n"))?; + debug!(hash_path = %hash_path.display(), "Wrote new hash file"); + Ok(()) +} + +pub fn fs_write_rss(path: &Path, content: &str) -> Result<()> { + let mut file = File::create(path)?; + file.write_all(content.as_bytes())?; + Ok(()) +} diff --git a/indexer/src/utils/mod.rs b/indexer/src/utils/mod.rs @@ -0,0 +1,7 @@ +pub mod crypto; +pub mod db; +pub mod io; +pub mod nostr; +pub mod serde_utils; +pub mod sqlite; +pub mod strings; diff --git a/indexer/src/utils/nostr.rs b/indexer/src/utils/nostr.rs @@ -0,0 +1,32 @@ +use std::collections::HashMap; + +use nostr::key::{Error as PublicKeyError, PublicKey}; +use nostr::prelude::ToBech32; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum NostrUtilsError { + #[error("Invalid hex for public key: {0}")] + InvalidPublicKey(#[from] PublicKeyError), + #[error("Tag parsing error: {0}")] + TagParseError(String), +} + +pub fn public_key_to_npub(public_key_hex: &str) -> Result<String, NostrUtilsError> { + let pubkey = PublicKey::from_hex(public_key_hex)?; + Ok(pubkey.to_bech32().expect("to_bech32 is infallible")) +} + +pub fn get_tag_value<'a>( + tags_map: &'a HashMap<String, Vec<String>>, + key: &str, + idx: usize, +) -> Result<Option<String>, NostrUtilsError> { + match tags_map.get(key) { + Some(values) => Ok(values.get(idx).cloned()), + None => Err(NostrUtilsError::TagParseError(format!( + "Tag '{}' not found", + key + ))), + } +} diff --git a/crates/indexer-utils/src/serialization.rs b/indexer/src/utils/serde_utils.rs diff --git a/indexer/src/utils/sqlite.rs b/indexer/src/utils/sqlite.rs @@ -0,0 +1,54 @@ +use rusqlite::{Connection, Row, Statement}; +use thiserror::Error; + +pub use rusqlite::{ + types::Type as SqliteType, Error as RustqliteError, Result as SqliteResult, Row as SqliteRow, +}; + +#[derive(Debug, Error)] +pub enum SqliteError { + #[error("Failed to open SQLite DB at `{path}`: {source}")] + ConnectionError { + path: String, + #[source] + source: rusqlite::Error, + }, + #[error("Failed to prepare SQL statement: {0}")] + PrepareError(#[source] rusqlite::Error), + #[error("Failed to run SQL query: {0}")] + QueryError(#[source] rusqlite::Error), + #[error("Failed to collect query results: {0}")] + CollectError(#[source] rusqlite::Error), +} + +pub fn sqlite_conn(db_path: &str) -> Result<Connection, SqliteError> { + Connection::open(db_path).map_err(|e| SqliteError::ConnectionError { + path: db_path.to_string(), + source: e, + }) +} + +pub fn sqlite_stmt<'a>(conn: &'a Connection, stmt: &'a str) -> Result<Statement<'a>, SqliteError> { + conn.prepare(stmt).map_err(SqliteError::PrepareError) +} + +pub fn sqlite_stmt_querymap<'a, T, F>( + stmt: &'a mut Statement<'a>, + map_fn: F, +) -> Result<Vec<T>, SqliteError> +where + F: Fn(&Row) -> rusqlite::Result<T>, + T: 'a, +{ + let mapped = stmt + .query_map([], map_fn) + .map_err(SqliteError::QueryError)?; + mapped + .collect::<Result<Vec<_>, _>>() + .map_err(SqliteError::CollectError) +} + +pub fn row_u64_at(row: &SqliteRow, idx: usize) -> SqliteResult<u64> { + let v: u32 = row.get(idx)?; + Ok(v as u64) +} diff --git a/crates/indexer-utils/src/logs.rs b/indexer/src/utils/strings.rs diff --git a/package.json b/package.json @@ -18,9 +18,8 @@ "workspaces": { "packages": [ "app", - "packages/*", - "../../crates/packages/bindings/*", - "../../../global/packages/*" + "../packages/*", + "../crates/*/bindings/ts" ], "nohoist": [] }, diff --git a/yarn.lock b/yarn.lock @@ -1395,7 +1395,7 @@ external-editor@^3.0.3: iconv-lite "^0.4.24" tmp "^0.0.33" -fast-glob@^3.2.11: +fast-glob@^3.2.11, fast-glob@^3.3.3: version "3.3.3" resolved "https://registry.yarnpkg.com/fast-glob/-/fast-glob-3.3.3.tgz#d06d585ce8dba90a16b0505c543c3ccfb3aeb818" integrity sha512-7MptL8U0cqcFdzIzwOTHoilX9x5BrNqye7Z/LuC7kCMRio1EMSyqRK3BEAUD7sXRq4iT4AzTVuZdhgQ2TCvYLg==