diff --git a/.gitignore b/.gitignore index 0b6277f..508cb11 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ ./replit-ebpf ./result .direnv/ +integration_tests/disk.img diff --git a/bdwatch_x86_bpfel.go b/bdwatch_x86_bpfel.go deleted file mode 100644 index 09c40ef..0000000 --- a/bdwatch_x86_bpfel.go +++ /dev/null @@ -1,135 +0,0 @@ -// Code generated by bpf2go; DO NOT EDIT. -//go:build 386 || amd64 - -package main - -import ( - "bytes" - _ "embed" - "fmt" - "io" - - "github.com/cilium/ebpf" -) - -type bdwatchEvent struct { - Fsid [16]uint8 - Label [256]int8 - DevId uint32 - Ret int32 -} - -// loadBdwatch returns the embedded CollectionSpec for bdwatch. -func loadBdwatch() (*ebpf.CollectionSpec, error) { - reader := bytes.NewReader(_BdwatchBytes) - spec, err := ebpf.LoadCollectionSpecFromReader(reader) - if err != nil { - return nil, fmt.Errorf("can't load bdwatch: %w", err) - } - - return spec, err -} - -// loadBdwatchObjects loads bdwatch and converts it into a struct. -// -// The following types are suitable as obj argument: -// -// *bdwatchObjects -// *bdwatchPrograms -// *bdwatchMaps -// -// See ebpf.CollectionSpec.LoadAndAssign documentation for details. -func loadBdwatchObjects(obj interface{}, opts *ebpf.CollectionOptions) error { - spec, err := loadBdwatch() - if err != nil { - return err - } - - return spec.LoadAndAssign(obj, opts) -} - -// bdwatchSpecs contains maps and programs before they are loaded into the kernel. -// -// It can be passed ebpf.CollectionSpec.Assign. -type bdwatchSpecs struct { - bdwatchProgramSpecs - bdwatchMapSpecs -} - -// bdwatchSpecs contains programs before they are loaded into the kernel. -// -// It can be passed ebpf.CollectionSpec.Assign. -type bdwatchProgramSpecs struct { - BtrfsRecoverLogTrees *ebpf.ProgramSpec `ebpf:"btrfs_recover_log_trees"` - BtrfsRecoverLogTreesExit *ebpf.ProgramSpec `ebpf:"btrfs_recover_log_trees_exit"` -} - -// bdwatchMapSpecs contains maps before they are loaded into the kernel. 
-// -// It can be passed ebpf.CollectionSpec.Assign. -type bdwatchMapSpecs struct { - BtrfsRecoverLogTreesErrors *ebpf.MapSpec `ebpf:"btrfs_recover_log_trees_errors"` - PendingCalls *ebpf.MapSpec `ebpf:"pending_calls"` - RegisteredDevices *ebpf.MapSpec `ebpf:"registered_devices"` -} - -// bdwatchObjects contains all objects after they have been loaded into the kernel. -// -// It can be passed to loadBdwatchObjects or ebpf.CollectionSpec.LoadAndAssign. -type bdwatchObjects struct { - bdwatchPrograms - bdwatchMaps -} - -func (o *bdwatchObjects) Close() error { - return _BdwatchClose( - &o.bdwatchPrograms, - &o.bdwatchMaps, - ) -} - -// bdwatchMaps contains all maps after they have been loaded into the kernel. -// -// It can be passed to loadBdwatchObjects or ebpf.CollectionSpec.LoadAndAssign. -type bdwatchMaps struct { - BtrfsRecoverLogTreesErrors *ebpf.Map `ebpf:"btrfs_recover_log_trees_errors"` - PendingCalls *ebpf.Map `ebpf:"pending_calls"` - RegisteredDevices *ebpf.Map `ebpf:"registered_devices"` -} - -func (m *bdwatchMaps) Close() error { - return _BdwatchClose( - m.BtrfsRecoverLogTreesErrors, - m.PendingCalls, - m.RegisteredDevices, - ) -} - -// bdwatchPrograms contains all programs after they have been loaded into the kernel. -// -// It can be passed to loadBdwatchObjects or ebpf.CollectionSpec.LoadAndAssign. -type bdwatchPrograms struct { - BtrfsRecoverLogTrees *ebpf.Program `ebpf:"btrfs_recover_log_trees"` - BtrfsRecoverLogTreesExit *ebpf.Program `ebpf:"btrfs_recover_log_trees_exit"` -} - -func (p *bdwatchPrograms) Close() error { - return _BdwatchClose( - p.BtrfsRecoverLogTrees, - p.BtrfsRecoverLogTreesExit, - ) -} - -func _BdwatchClose(closers ...io.Closer) error { - for _, closer := range closers { - if err := closer.Close(); err != nil { - return err - } - } - return nil -} - -// Do not access this directly. 
-//
-//go:embed bdwatch_x86_bpfel.o
-var _BdwatchBytes []byte
diff --git a/btrfs.h b/btrfswatch/btrfs.h
similarity index 100%
rename from btrfs.h
rename to btrfswatch/btrfs.h
diff --git a/btrfswatch/btrfsmon.go b/btrfswatch/btrfsmon.go
new file mode 100644
index 0000000..af804a0
--- /dev/null
+++ b/btrfswatch/btrfsmon.go
@@ -0,0 +1,282 @@
+// Package btrfswatch is a go wrapper for interfacing with the eBPF program in btrfswatch.c
+package btrfswatch
+
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"slices"
+	"sync"
+
+	"github.com/cilium/ebpf/link"
+	"github.com/cilium/ebpf/ringbuf"
+	"github.com/cilium/ebpf/rlimit"
+	log "github.com/sirupsen/logrus"
+)
+
+//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target amd64 btrfswatch btrfswatch.c
+
+// Manager owns the loaded eBPF objects, the attached probes and the demux that
+// fans ring buffer records out to the readers of each registered device.
+type Manager struct {
+	objs     btrfswatchObjects
+	cleanups []func() error
+	demux    *eventDemux
+
+	// regLock serializes RegisterDevice and UnregisterDevice so the
+	// RegisteredDevices map and the demux table cannot drift apart.
+	regLock sync.Mutex
+}
+
+// Event is the decoded form of one ring buffer record.
+type Event btrfswatchEvent
+
+// eventDemux reads records from the ring buffer and offers each one to every
+// channel registered for the record's device.
+type eventDemux struct {
+	rd *ringbuf.Reader
+
+	devMapLock sync.RWMutex
+	devMap     map[uint32]map[chan Event]struct{}
+}
+
+// EventReader receives the events of one registered device.
+type EventReader struct {
+	dev       uint32
+	eventChan chan Event
+	mgr       *Manager
+
+	// closeOnce guards Close; closeErr keeps the result of the first call.
+	closeOnce sync.Once
+	closeErr  error
+}
+
+// NewManager loads the eBPF objects, attaches the kprobe and kretprobe on
+// btrfs_recover_log_trees and starts the goroutine that demultiplexes ring
+// buffer records. When a step fails, everything set up before it is released
+// again before the error is returned.
+func NewManager() (*Manager, error) {
+	mgr := &Manager{}
+
+	// fail undoes the setup done so far, so that a failed start does not leak
+	// loaded objects or attached probes.
+	fail := func(err error) (*Manager, error) {
+		return nil, errors.Join(err, mgr.Close())
+	}
+
+	// Remove resource limits for kernels <5.11.
+	if err := rlimit.RemoveMemlock(); err != nil {
+		return fail(fmt.Errorf("Removing memlock: %w", err))
+	}
+
+	// Load the compiled eBPF ELF and load it into the kernel.
+	if err := loadBtrfswatchObjects(&mgr.objs, nil); err != nil {
+		return fail(fmt.Errorf("Loading eBPF objects: %w", err))
+	}
+	mgr.cleanups = append(mgr.cleanups, mgr.objs.Close)
+
+	kprobe, err := link.Kprobe("btrfs_recover_log_trees", mgr.objs.BtrfsRecoverLogTrees, &link.KprobeOptions{})
+	if err != nil {
+		return fail(fmt.Errorf("Attaching kprobe: %w", err))
+	}
+	mgr.cleanups = append(mgr.cleanups, kprobe.Close)
+
+	kretprobe, err := link.Kretprobe("btrfs_recover_log_trees", mgr.objs.BtrfsRecoverLogTreesExit, &link.KprobeOptions{})
+	if err != nil {
+		return fail(fmt.Errorf("Attaching kretprobe: %w", err))
+	}
+	mgr.cleanups = append(mgr.cleanups, kretprobe.Close)
+
+	rd, err := ringbuf.NewReader(mgr.objs.BtrfsRecoverLogTreesErrors)
+	if err != nil {
+		return fail(fmt.Errorf("creating ringbuf reader: %w", err))
+	}
+	mgr.demux = newEventDemux(rd)
+	mgr.cleanups = append(mgr.cleanups, mgr.demux.close)
+	go func() {
+		// Once run has returned no reader receives events any more, so an
+		// unexpected stop must at least be visible in the logs.
+		if err := mgr.demux.run(); err != nil {
+			log.WithError(err).Error("btrfswatch event loop stopped")
+		}
+	}()
+
+	return mgr, nil
+}
+
+// RegisterDevice marks dev as watched in the RegisteredDevices eBPF map and
+// returns a reader that receives the events demultiplexed for that device.
+// Close the reader to unregister it again.
+func (mgr *Manager) RegisterDevice(dev uint32) (*EventReader, error) {
+	mgr.regLock.Lock()
+	defer mgr.regLock.Unlock()
+
+	err := mgr.objs.btrfswatchMaps.RegisteredDevices.Put(dev, true)
+	if err != nil {
+		return nil, fmt.Errorf("RegisteredDevices.Put: %w", err)
+	}
+
+	eventChan := make(chan Event, 1)
+	mgr.demux.addDevice(dev, eventChan)
+
+	return &EventReader{
+		dev:       dev,
+		eventChan: eventChan,
+		mgr:       mgr,
+	}, nil
+}
+
+// UnregisterDevice detaches eventChan from dev. The device is deleted from the
+// RegisteredDevices eBPF map only when no other channel is attached to it any
+// more, so closing one reader does not silence the others on the same device.
+func (mgr *Manager) UnregisterDevice(dev uint32, eventChan chan Event) error {
+	mgr.regLock.Lock()
+	defer mgr.regLock.Unlock()
+
+	// Detach the channel first: from here on the demux no longer sends on it,
+	// so the caller may close it even if the map update below fails.
+	if remaining := mgr.demux.removeDevice(dev, eventChan); remaining > 0 {
+		return nil
+	}
+
+	err := mgr.objs.btrfswatchMaps.RegisteredDevices.Delete(dev)
+	if err != nil {
+		return fmt.Errorf("RegisteredDevices.Delete: %w", err)
+	}
+	return nil
+}
+
+// Close runs the registered cleanups in reverse order of registration and
+// returns their errors joined. A repeated call finds nothing left to do.
+func (mgr *Manager) Close() error {
+	// Take ownership of the list so that cleanups never run twice and the
+	// in-place reversal cannot flip the order back on a second call.
+	cleanups := mgr.cleanups
+	mgr.cleanups = nil
+	slices.Reverse(cleanups)
+
+	var errs []error
+	for _, f := range cleanups {
+		errs = append(errs, f())
+	}
+
+	return errors.Join(errs...)
+}
+
+// newEventDemux returns a demux that reads from rd and has no devices yet.
+func newEventDemux(rd *ringbuf.Reader) *eventDemux {
+	return &eventDemux{
+		rd:     rd,
+		devMap: make(map[uint32]map[chan Event]struct{}),
+	}
+}
+
+// addDevice attaches eventChan to dev, creating the device entry on first use.
+func (demux *eventDemux) addDevice(dev uint32, eventChan chan Event) {
+	demux.devMapLock.Lock()
+	defer demux.devMapLock.Unlock()
+
+	if demux.devMap[dev] == nil {
+		demux.devMap[dev] = make(map[chan Event]struct{})
+	}
+
+	demux.devMap[dev][eventChan] = struct{}{}
+}
+
+// removeDevice detaches eventChan from dev and returns how many channels are
+// still attached to dev afterwards.
+func (demux *eventDemux) removeDevice(dev uint32, eventChan chan Event) int {
+	demux.devMapLock.Lock()
+	defer demux.devMapLock.Unlock()
+
+	chans := demux.devMap[dev]
+	delete(chans, eventChan)
+
+	// delete leaves an empty, non-nil map behind, so the length has to be
+	// checked to drop the device entry together with its last channel.
+	if len(chans) == 0 {
+		delete(demux.devMap, dev)
+	}
+	return len(chans)
+}
+
+// run forwards ring buffer records until the reader is closed, in which case
+// it returns nil, or until reading or decoding a record fails.
+func (demux *eventDemux) run() error {
+	for {
+		record, err := demux.rd.Read()
+		if err != nil {
+			if errors.Is(err, ringbuf.ErrClosed) {
+				return nil
+			}
+			return fmt.Errorf("reading ringbuf: %w", err)
+		}
+
+		var entry Event
+		err = binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &entry)
+		if err != nil {
+			return fmt.Errorf("decoding btrfswatch event: %w", err)
+		}
+
+		demux.dispatch(entry)
+	}
+}
+
+// dispatch offers entry to every channel attached to its device without
+// blocking; a channel whose buffer is full misses the event.
+func (demux *eventDemux) dispatch(entry Event) {
+	// The deferred unlock covers every return path, including the one for a
+	// record whose device is not registered.
+	demux.devMapLock.RLock()
+	defer demux.devMapLock.RUnlock()
+
+	chans, ok := demux.devMap[entry.DevId]
+	if !ok {
+		// A record may still be queued for a device whose last reader has just
+		// unregistered; that must not stop delivery for every other device.
+		log.
+			WithField("device", entry.DevId).
+			Warn("dropping message for unregistered device")
+		return
+	}
+
+	for c := range chans {
+		// don't block other streams while waiting for one
+		select {
+		case c <- entry:
+		default:
+			log.
+				WithField("device", entry.DevId).
+				Warn("dropping message")
+		}
+	}
+}
+
+// close closes the ring buffer reader that run reads from.
+func (demux *eventDemux) close() error {
+	return demux.rd.Close()
+}
+
+// Read blocks until an event for the device arrives and returns it. It fails
+// once the reader has been closed and no buffered event is left.
+func (evtrdr *EventReader) Read() (*Event, error) {
+	event, ok := <-evtrdr.eventChan
+	if !ok {
+		return nil, errors.New("reader is closed")
+	}
+
+	return &event, nil
+}
+
+// Close unregisters the reader and closes its channel, which unblocks Read.
+// Only the first call has an effect; later calls return the same result.
+func (evtrdr *EventReader) Close() error {
+	evtrdr.closeOnce.Do(func() {
+		evtrdr.closeErr = evtrdr.mgr.UnregisterDevice(evtrdr.dev, evtrdr.eventChan)
+		// UnregisterDevice detaches the channel from the demux before anything
+		// else, so no send can race with this close.
+		close(evtrdr.eventChan)
+	})
+	return evtrdr.closeErr
+}
diff --git a/bdwatch.c b/btrfswatch/btrfswatch.c
similarity index 100%
rename from bdwatch.c
rename to btrfswatch/btrfswatch.c
diff --git a/btrfswatch/btrfswatch_x86_bpfel.go b/btrfswatch/btrfswatch_x86_bpfel.go
new file mode 100644
index 0000000..826f549
--- /dev/null
+++ b/btrfswatch/btrfswatch_x86_bpfel.go
@@ -0,0 +1,135 @@
+// Code generated by bpf2go; DO NOT EDIT.
+//go:build 386 || amd64
+
+package btrfswatch
+
+import (
+	"bytes"
+	_ "embed"
+	"fmt"
+	"io"
+
+	"github.com/cilium/ebpf"
+)
+
+type btrfswatchEvent struct {
+	Fsid  [16]uint8
+	Label [256]int8
+	DevId uint32
+	Ret   int32
+}
+
+// loadBtrfswatch returns the embedded CollectionSpec for btrfswatch.
+func loadBtrfswatch() (*ebpf.CollectionSpec, error) {
+	reader := bytes.NewReader(_BtrfswatchBytes)
+	spec, err := ebpf.LoadCollectionSpecFromReader(reader)
+	if err != nil {
+		return nil, fmt.Errorf("can't load btrfswatch: %w", err)
+	}
+
+	return spec, err
+}
+
+// loadBtrfswatchObjects loads btrfswatch and converts it into a struct.
+//
+// The following types are suitable as obj argument:
+//
+//	*btrfswatchObjects
+//	*btrfswatchPrograms
+//	*btrfswatchMaps
+//
+// See ebpf.CollectionSpec.LoadAndAssign documentation for details.
+func loadBtrfswatchObjects(obj interface{}, opts *ebpf.CollectionOptions) error {
+	spec, err := loadBtrfswatch()
+	if err != nil {
+		return err
+	}
+
+	return spec.LoadAndAssign(obj, opts)
+}
+
+// btrfswatchSpecs contains maps and programs before they are loaded into the kernel.
+// +// It can be passed ebpf.CollectionSpec.Assign. +type btrfswatchSpecs struct { + btrfswatchProgramSpecs + btrfswatchMapSpecs +} + +// btrfswatchSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type btrfswatchProgramSpecs struct { + BtrfsRecoverLogTrees *ebpf.ProgramSpec `ebpf:"btrfs_recover_log_trees"` + BtrfsRecoverLogTreesExit *ebpf.ProgramSpec `ebpf:"btrfs_recover_log_trees_exit"` +} + +// btrfswatchMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type btrfswatchMapSpecs struct { + BtrfsRecoverLogTreesErrors *ebpf.MapSpec `ebpf:"btrfs_recover_log_trees_errors"` + PendingCalls *ebpf.MapSpec `ebpf:"pending_calls"` + RegisteredDevices *ebpf.MapSpec `ebpf:"registered_devices"` +} + +// btrfswatchObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBtrfswatchObjects or ebpf.CollectionSpec.LoadAndAssign. +type btrfswatchObjects struct { + btrfswatchPrograms + btrfswatchMaps +} + +func (o *btrfswatchObjects) Close() error { + return _BtrfswatchClose( + &o.btrfswatchPrograms, + &o.btrfswatchMaps, + ) +} + +// btrfswatchMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBtrfswatchObjects or ebpf.CollectionSpec.LoadAndAssign. +type btrfswatchMaps struct { + BtrfsRecoverLogTreesErrors *ebpf.Map `ebpf:"btrfs_recover_log_trees_errors"` + PendingCalls *ebpf.Map `ebpf:"pending_calls"` + RegisteredDevices *ebpf.Map `ebpf:"registered_devices"` +} + +func (m *btrfswatchMaps) Close() error { + return _BtrfswatchClose( + m.BtrfsRecoverLogTreesErrors, + m.PendingCalls, + m.RegisteredDevices, + ) +} + +// btrfswatchPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBtrfswatchObjects or ebpf.CollectionSpec.LoadAndAssign. 
+type btrfswatchPrograms struct { + BtrfsRecoverLogTrees *ebpf.Program `ebpf:"btrfs_recover_log_trees"` + BtrfsRecoverLogTreesExit *ebpf.Program `ebpf:"btrfs_recover_log_trees_exit"` +} + +func (p *btrfswatchPrograms) Close() error { + return _BtrfswatchClose( + p.BtrfsRecoverLogTrees, + p.BtrfsRecoverLogTreesExit, + ) +} + +func _BtrfswatchClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed btrfswatch_x86_bpfel.o +var _BtrfswatchBytes []byte diff --git a/bdwatch_x86_bpfel.o b/btrfswatch/btrfswatch_x86_bpfel.o similarity index 56% rename from bdwatch_x86_bpfel.o rename to btrfswatch/btrfswatch_x86_bpfel.o index 9bfa762..10b200c 100644 Binary files a/bdwatch_x86_bpfel.o and b/btrfswatch/btrfswatch_x86_bpfel.o differ diff --git a/ebpf/ebpf.pb.go b/ebpf/ebpf.pb.go new file mode 100644 index 0000000..a036da9 --- /dev/null +++ b/ebpf/ebpf.pb.go @@ -0,0 +1,252 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.1 +// protoc v4.24.4 +// source: ebpf/ebpf.proto + +package ebpf + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type MonitorBtrfsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Major uint32 `protobuf:"varint,1,opt,name=major,proto3" json:"major,omitempty"` + Minor uint32 `protobuf:"varint,2,opt,name=minor,proto3" json:"minor,omitempty"` +} + +func (x *MonitorBtrfsRequest) Reset() { + *x = MonitorBtrfsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_ebpf_ebpf_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MonitorBtrfsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MonitorBtrfsRequest) ProtoMessage() {} + +func (x *MonitorBtrfsRequest) ProtoReflect() protoreflect.Message { + mi := &file_ebpf_ebpf_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MonitorBtrfsRequest.ProtoReflect.Descriptor instead. 
+func (*MonitorBtrfsRequest) Descriptor() ([]byte, []int) { + return file_ebpf_ebpf_proto_rawDescGZIP(), []int{0} +} + +func (x *MonitorBtrfsRequest) GetMajor() uint32 { + if x != nil { + return x.Major + } + return 0 +} + +func (x *MonitorBtrfsRequest) GetMinor() uint32 { + if x != nil { + return x.Minor + } + return 0 +} + +type MonitorBtrfsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Major uint32 `protobuf:"varint,1,opt,name=major,proto3" json:"major,omitempty"` + Minor uint32 `protobuf:"varint,2,opt,name=minor,proto3" json:"minor,omitempty"` + Uuid string `protobuf:"bytes,3,opt,name=uuid,proto3" json:"uuid,omitempty"` + Ret int32 `protobuf:"varint,4,opt,name=ret,proto3" json:"ret,omitempty"` +} + +func (x *MonitorBtrfsResponse) Reset() { + *x = MonitorBtrfsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_ebpf_ebpf_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MonitorBtrfsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MonitorBtrfsResponse) ProtoMessage() {} + +func (x *MonitorBtrfsResponse) ProtoReflect() protoreflect.Message { + mi := &file_ebpf_ebpf_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MonitorBtrfsResponse.ProtoReflect.Descriptor instead. 
+func (*MonitorBtrfsResponse) Descriptor() ([]byte, []int) { + return file_ebpf_ebpf_proto_rawDescGZIP(), []int{1} +} + +func (x *MonitorBtrfsResponse) GetMajor() uint32 { + if x != nil { + return x.Major + } + return 0 +} + +func (x *MonitorBtrfsResponse) GetMinor() uint32 { + if x != nil { + return x.Minor + } + return 0 +} + +func (x *MonitorBtrfsResponse) GetUuid() string { + if x != nil { + return x.Uuid + } + return "" +} + +func (x *MonitorBtrfsResponse) GetRet() int32 { + if x != nil { + return x.Ret + } + return 0 +} + +var File_ebpf_ebpf_proto protoreflect.FileDescriptor + +var file_ebpf_ebpf_proto_rawDesc = []byte{ + 0x0a, 0x0f, 0x65, 0x62, 0x70, 0x66, 0x2f, 0x65, 0x62, 0x70, 0x66, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x0b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x74, 0x2e, 0x65, 0x62, 0x70, 0x66, 0x22, 0x41, + 0x0a, 0x13, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x42, 0x74, 0x72, 0x66, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6d, 0x61, 0x6a, 0x6f, 0x72, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6d, 0x61, 0x6a, 0x6f, 0x72, 0x12, 0x14, 0x0a, 0x05, 0x6d, + 0x69, 0x6e, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6d, 0x69, 0x6e, 0x6f, + 0x72, 0x22, 0x68, 0x0a, 0x14, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x42, 0x74, 0x72, 0x66, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6d, 0x61, 0x6a, + 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6d, 0x61, 0x6a, 0x6f, 0x72, 0x12, + 0x14, 0x0a, 0x05, 0x6d, 0x69, 0x6e, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, + 0x6d, 0x69, 0x6e, 0x6f, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x75, 0x75, 0x69, 0x64, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x75, 0x69, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x72, 0x65, 0x74, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x72, 0x65, 0x74, 0x32, 0x5d, 0x0a, 0x04, 0x45, + 0x62, 0x70, 0x66, 0x12, 0x55, 0x0a, 0x0c, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x42, 
0x74, + 0x72, 0x66, 0x73, 0x12, 0x20, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x74, 0x2e, 0x65, 0x62, 0x70, + 0x66, 0x2e, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x42, 0x74, 0x72, 0x66, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x74, 0x2e, 0x65, + 0x62, 0x70, 0x66, 0x2e, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x42, 0x74, 0x72, 0x66, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x24, 0x5a, 0x22, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x74, 0x2f, + 0x72, 0x65, 0x70, 0x6c, 0x69, 0x74, 0x2d, 0x65, 0x62, 0x70, 0x66, 0x2f, 0x65, 0x62, 0x70, 0x66, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_ebpf_ebpf_proto_rawDescOnce sync.Once + file_ebpf_ebpf_proto_rawDescData = file_ebpf_ebpf_proto_rawDesc +) + +func file_ebpf_ebpf_proto_rawDescGZIP() []byte { + file_ebpf_ebpf_proto_rawDescOnce.Do(func() { + file_ebpf_ebpf_proto_rawDescData = protoimpl.X.CompressGZIP(file_ebpf_ebpf_proto_rawDescData) + }) + return file_ebpf_ebpf_proto_rawDescData +} + +var file_ebpf_ebpf_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_ebpf_ebpf_proto_goTypes = []interface{}{ + (*MonitorBtrfsRequest)(nil), // 0: replit.ebpf.MonitorBtrfsRequest + (*MonitorBtrfsResponse)(nil), // 1: replit.ebpf.MonitorBtrfsResponse +} +var file_ebpf_ebpf_proto_depIdxs = []int32{ + 0, // 0: replit.ebpf.Ebpf.MonitorBtrfs:input_type -> replit.ebpf.MonitorBtrfsRequest + 1, // 1: replit.ebpf.Ebpf.MonitorBtrfs:output_type -> replit.ebpf.MonitorBtrfsResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_ebpf_ebpf_proto_init() } +func file_ebpf_ebpf_proto_init() { + if File_ebpf_ebpf_proto != nil { + return + } + 
if !protoimpl.UnsafeEnabled { + file_ebpf_ebpf_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MonitorBtrfsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_ebpf_ebpf_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MonitorBtrfsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_ebpf_ebpf_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_ebpf_ebpf_proto_goTypes, + DependencyIndexes: file_ebpf_ebpf_proto_depIdxs, + MessageInfos: file_ebpf_ebpf_proto_msgTypes, + }.Build() + File_ebpf_ebpf_proto = out.File + file_ebpf_ebpf_proto_rawDesc = nil + file_ebpf_ebpf_proto_goTypes = nil + file_ebpf_ebpf_proto_depIdxs = nil +} diff --git a/ebpf/ebpf.proto b/ebpf/ebpf.proto new file mode 100644 index 0000000..f5db31f --- /dev/null +++ b/ebpf/ebpf.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package replit.ebpf; +option go_package = "github.com/replit/replit-ebpf/ebpf"; + +message MonitorBtrfsRequest { + uint32 major = 1; + uint32 minor = 2; +} + +message MonitorBtrfsResponse { + uint32 major = 1; + uint32 minor = 2; + string uuid = 3; + int32 ret = 4; +} + +// The Ebpf service allows interacting with running eBPF programs. +service Ebpf { + // MonitorBtrfs watches the given device for failed mounts due to corruption in the btrfs log tree. 
+ rpc MonitorBtrfs(MonitorBtrfsRequest) returns (stream MonitorBtrfsResponse); +} diff --git a/ebpf/ebpf_grpc.pb.go b/ebpf/ebpf_grpc.pb.go new file mode 100644 index 0000000..5fd753f --- /dev/null +++ b/ebpf/ebpf_grpc.pb.go @@ -0,0 +1,138 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.24.4 +// source: ebpf/ebpf.proto + +package ebpf + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + Ebpf_MonitorBtrfs_FullMethodName = "/replit.ebpf.Ebpf/MonitorBtrfs" +) + +// EbpfClient is the client API for Ebpf service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type EbpfClient interface { + // MonitorBtrfs watches the given device for failed mounts due to corruption in the btrfs log tree. + MonitorBtrfs(ctx context.Context, in *MonitorBtrfsRequest, opts ...grpc.CallOption) (Ebpf_MonitorBtrfsClient, error) +} + +type ebpfClient struct { + cc grpc.ClientConnInterface +} + +func NewEbpfClient(cc grpc.ClientConnInterface) EbpfClient { + return &ebpfClient{cc} +} + +func (c *ebpfClient) MonitorBtrfs(ctx context.Context, in *MonitorBtrfsRequest, opts ...grpc.CallOption) (Ebpf_MonitorBtrfsClient, error) { + stream, err := c.cc.NewStream(ctx, &Ebpf_ServiceDesc.Streams[0], Ebpf_MonitorBtrfs_FullMethodName, opts...) 
+ if err != nil { + return nil, err + } + x := &ebpfMonitorBtrfsClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Ebpf_MonitorBtrfsClient interface { + Recv() (*MonitorBtrfsResponse, error) + grpc.ClientStream +} + +type ebpfMonitorBtrfsClient struct { + grpc.ClientStream +} + +func (x *ebpfMonitorBtrfsClient) Recv() (*MonitorBtrfsResponse, error) { + m := new(MonitorBtrfsResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// EbpfServer is the server API for Ebpf service. +// All implementations must embed UnimplementedEbpfServer +// for forward compatibility +type EbpfServer interface { + // MonitorBtrfs watches the given device for failed mounts due to corruption in the btrfs log tree. + MonitorBtrfs(*MonitorBtrfsRequest, Ebpf_MonitorBtrfsServer) error + mustEmbedUnimplementedEbpfServer() +} + +// UnimplementedEbpfServer must be embedded to have forward compatible implementations. +type UnimplementedEbpfServer struct { +} + +func (UnimplementedEbpfServer) MonitorBtrfs(*MonitorBtrfsRequest, Ebpf_MonitorBtrfsServer) error { + return status.Errorf(codes.Unimplemented, "method MonitorBtrfs not implemented") +} +func (UnimplementedEbpfServer) mustEmbedUnimplementedEbpfServer() {} + +// UnsafeEbpfServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to EbpfServer will +// result in compilation errors. 
+type UnsafeEbpfServer interface {
+	mustEmbedUnimplementedEbpfServer()
+}
+
+func RegisterEbpfServer(s grpc.ServiceRegistrar, srv EbpfServer) {
+	s.RegisterService(&Ebpf_ServiceDesc, srv)
+}
+
+func _Ebpf_MonitorBtrfs_Handler(srv interface{}, stream grpc.ServerStream) error {
+	m := new(MonitorBtrfsRequest)
+	if err := stream.RecvMsg(m); err != nil {
+		return err
+	}
+	return srv.(EbpfServer).MonitorBtrfs(m, &ebpfMonitorBtrfsServer{stream})
+}
+
+type Ebpf_MonitorBtrfsServer interface {
+	Send(*MonitorBtrfsResponse) error
+	grpc.ServerStream
+}
+
+type ebpfMonitorBtrfsServer struct {
+	grpc.ServerStream
+}
+
+func (x *ebpfMonitorBtrfsServer) Send(m *MonitorBtrfsResponse) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+// Ebpf_ServiceDesc is the grpc.ServiceDesc for Ebpf service.
+// It's only intended for direct use with grpc.RegisterService,
+// and not to be introspected or modified (even as a copy)
+var Ebpf_ServiceDesc = grpc.ServiceDesc{
+	ServiceName: "replit.ebpf.Ebpf",
+	HandlerType: (*EbpfServer)(nil),
+	Methods:     []grpc.MethodDesc{},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "MonitorBtrfs",
+			Handler:       _Ebpf_MonitorBtrfs_Handler,
+			ServerStreams: true,
+		},
+	},
+	Metadata: "ebpf/ebpf.proto",
+}
diff --git a/ebpf/service.go b/ebpf/service.go
new file mode 100644
index 0000000..58d246b
--- /dev/null
+++ b/ebpf/service.go
@@ -0,0 +1,116 @@
+// Package ebpf defines a gRPC service that exposes eBPF functionality defined in other packages.
+package ebpf
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/google/uuid"
+
+	"github.com/replit/replit-ebpf/btrfswatch"
+)
+
+const (
+	// minorBits is the number of low bits of a packed device number that hold
+	// the minor number; the major number sits in the bits above them.
+	minorBits = 20
+	// majorBits is what is left of the 32-bit device number for the major.
+	majorBits = 32 - minorBits
+)
+
+// Service is an implementation of ebpfpb.EbpfServer.
+type Service struct {
+	UnimplementedEbpfServer
+
+	btrfswatchMgr *btrfswatch.Manager
+}
+
+// ServiceOpts are the options that can be provided to NewService.
+type ServiceOpts struct {
+	BtrfswatchMgr *btrfswatch.Manager
+}
+
+// NewService returns a new instance of an Ebpf service. It fails when
+// opts.BtrfswatchMgr is nil, because MonitorBtrfs cannot work without it.
+func NewService(opts ServiceOpts) (*Service, error) {
+	if opts.BtrfswatchMgr == nil {
+		return nil, errors.New("btrfswatch manager is required")
+	}
+
+	return &Service{
+		btrfswatchMgr: opts.BtrfswatchMgr,
+	}, nil
+}
+
+// MonitorBtrfs registers the requested device with the btrfswatch manager and
+// sends one response per event read for it. It returns when the stream context
+// is done or when registering, reading, decoding or sending fails.
+func (s *Service) MonitorBtrfs(
+	r *MonitorBtrfsRequest,
+	stream Ebpf_MonitorBtrfsServer,
+) error {
+	// Reject numbers that do not fit their bit field: packing them anyway
+	// would silently select a different device.
+	if r.GetMajor() >= 1<<majorBits || r.GetMinor() >= 1<<minorBits {
+		return fmt.Errorf("device %d:%d is out of range", r.GetMajor(), r.GetMinor())
+	}
+
+	dev := (r.GetMajor() << minorBits) | r.GetMinor()
+	rd, err := s.btrfswatchMgr.RegisterDevice(dev)
+	if err != nil {
+		return fmt.Errorf("registering device: %w", err)
+	}
+
+	ctx := stream.Context()
+
+	// Closing the reader is what unblocks rd.Read below once the stream ends.
+	go func() {
+		<-ctx.Done()
+		// The stream is already finished here, so there is nobody left to
+		// report a close error to.
+		_ = rd.Close()
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("stream context canceled: %w", ctx.Err())
+		default:
+		}
+
+		entry, err := rd.Read()
+		if err != nil {
+			// The reader is closed as soon as the context is done, so a read
+			// that fails after that point reports the cancellation.
+			if ctx.Err() != nil {
+				return fmt.Errorf("stream context canceled: %w", ctx.Err())
+			}
+			return fmt.Errorf("reading event: %w", err)
+		}
+
+		id, err := uuid.FromBytes(entry.Fsid[:])
+		if err != nil {
+			return fmt.Errorf("decoding fsid: %w", err)
+		}
+
+		err = stream.Send(&MonitorBtrfsResponse{
+			Major: major(entry.DevId),
+			Minor: minor(entry.DevId),
+			Uuid:  id.String(),
+			Ret:   entry.Ret,
+		})
+		if err != nil {
+			return fmt.Errorf("stream send: %w", err)
+		}
+	}
+}
+
+// major extracts the major number from a packed device number.
+func major(dev uint32) uint32 {
+	return dev >> minorBits
+}
+
+// minor extracts the minor number from a packed device number.
+func minor(dev uint32) uint32 {
+	return dev & ((1 << minorBits) - 1)
+}
diff --git a/flake.nix b/flake.nix
index f522fb5..51713e8 100644
--- a/flake.nix
+++ b/flake.nix
@@ -15,12 +15,15 @@
           pname = "replit-ebpf";
           version = "0.0.01";
           src = ./.;
-          vendorHash = "sha256-tKRFrGu0rI6UdMIZ1mBjpLP1mZtASleIcllKcvc/gPE=";
+          vendorHash = "sha256-bq2FEysBTvn+SHw+tUksUCXu+AMGJ93g+L8L7v6Kdjo=";
           buildInputs = [ pkgs.makeWrapper ];
+
+          # integration tests require a local corrupted disk
+          doCheck = false;
         };
 
         devShell = pkgs.mkShell {
-          buildInputs = with pkgs; [ go gopls llvm libbpf ];
+          buildInputs = with pkgs; [ go gopls llvm libbpf protobuf protoc-gen-go protoc-gen-go-grpc ];
         };
       });
 }
diff --git a/go.mod b/go.mod
index 1a3b0de..cf71ced 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,19 @@ go 1.22.6
 require (
 	github.com/cilium/ebpf v0.16.0
 	github.com/google/uuid v1.6.0
+	github.com/sirupsen/logrus v1.9.3
+	github.com/stretchr/testify v1.7.0
golang.org/x/sys v0.25.0 + google.golang.org/grpc v1.66.1 + google.golang.org/protobuf v1.34.1 ) -require golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 // indirect +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/go.sum b/go.sum index 6f99afc..9a1acd8 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,53 @@ github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok= github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= +github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= +github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= +github.com/jsimonetti/rtnetlink/v2 v2.0.1 h1:xda7qaHDSVOsADNouv7ukSuicKZO7GgVUCXxpaIEIlM= +github.com/jsimonetti/rtnetlink/v2 v2.0.1/go.mod h1:7MoNYNbb3UaDHtF8udiJo/RH6VsTKP1pqKLUTVCvToE= 
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g= +github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw= +github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U= +github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 h1:Jvc7gsqn21cJHCmAWx0LiimpP18LZmUxkT5Mp7EZ1mI= golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= 
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 h1:1GBuWVLM/KMVUv1t1En5Gs+gFZCNd360GGb4sSxtrhU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/grpc v1.66.1 h1:hO5qAXR19+/Z44hmvIM4dQFMSYX9XcWsByfoxutBpAM= +google.golang.org/grpc v1.66.1/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/integration_tests/ebpf_test.go b/integration_tests/ebpf_test.go new file mode 100644 index 0000000..d064030 --- /dev/null +++ b/integration_tests/ebpf_test.go @@ -0,0 +1,376 @@ +package main + +import ( + "context" + "fmt" + "log" + "net" + "os" + "os/exec" + "path" + "strings" + "sync" + "syscall" + "testing" + + "github.com/replit/replit-ebpf/btrfswatch" + ebpfpb "github.com/replit/replit-ebpf/ebpf" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sys/unix" + "google.golang.org/grpc" + 
"google.golang.org/grpc/credentials/insecure" +) + +const ( + socketName = "ebpf_test.sock" + diskPath = "disk.img" +) + +var ( + client ebpfpb.EbpfClient + + tmpDir string + + devPath1 string + devPath2 string + dev1 uint32 + dev2 uint32 +) + +func TestMain(m *testing.M) { + // os.Exit does not run defers, so we put TestMain's implementation in + // a different function so defers are ran before we exit. + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) int { + var err error + tmpDir, err = os.MkdirTemp("", "ebpf-test") + if err != nil { + log.Fatal("Making temp dir:", err) + } + defer os.RemoveAll(tmpDir) + + mgr, err := btrfswatch.NewManager() + if err != nil { + log.Fatal("Initializing btrfswatch:", err) + } + + ebpfService, err := ebpfpb.NewService(ebpfpb.ServiceOpts{ + BtrfswatchMgr: mgr, + }) + if err != nil { + log.Fatal("Starting eBPF gRPC service:", err) + } + + socketPath := path.Join(tmpDir, socketName) + listener, err := net.Listen("unix", socketPath) + if err != nil { + log.Fatal("Listen on unix socket:", err) + } + defer listener.Close() + + grpcS := grpc.NewServer() + ebpfpb.RegisterEbpfServer(grpcS, ebpfService) + + go grpcS.Serve(listener) + + conn, err := grpc.Dial( + socketPath, + grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + var dialer net.Dialer + return dialer.DialContext(ctx, "unix", addr) + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + log.Fatal("Creating gRPC client:", err) + } + defer conn.Close() + + client = ebpfpb.NewEbpfClient(conn) + + { + cleanup, err := setupDevice(&devPath1, &dev1) + if err != nil { + log.Fatal("setupDevice:", err) + } + defer cleanup() + } + { + cleanup, err := setupDevice(&devPath2, &dev2) + if err != nil { + log.Fatal("setupDevice:", err) + } + defer cleanup() + } + + return m.Run() +} + +func setupDevice(path *string, dev *uint32) (func() error, error) { + out, err := exec. 
+ Command("sudo", "losetup", "-f", "--show", diskPath). + CombinedOutput() + if err != nil { + return nil, fmt.Errorf("losetup %s: %w", string(out), err) + } + + *path = strings.TrimSpace(string(out)) + + stat, err := os.Stat(*path) + if err != nil { + return nil, fmt.Errorf("stat: %w", err) + } + rdev := stat.Sys().(*syscall.Stat_t).Rdev + *dev = convertDevice(rdev) + + return func() error { + return exec.Command("sudo", "losetup", "-d", *path).Run() + }, nil +} + +func TestBasic(t *testing.T) { + ctx := context.Background() + + devMajor := major(dev1) + devMinor := minor(dev1) + + stream, err := client.MonitorBtrfs(ctx, &ebpfpb.MonitorBtrfsRequest{ + Major: devMajor, + Minor: devMinor, + }) + require.NoError(t, err) + + dir, err := os.MkdirTemp("", t.Name()) + require.NoError(t, err) + defer os.RemoveAll(dir) + + // this should error since the disk is corrupted + err = exec.Command("mount", devPath1, dir).Run() + require.Error(t, err) + + response, err := stream.Recv() + require.NoError(t, err) + + assert.Equal(t, response.Major, devMajor) + assert.Equal(t, response.Minor, devMinor) +} + +func TestDemux(t *testing.T) { + ctx := context.Background() + + devMajor1 := major(dev1) + devMinor1 := minor(dev1) + + devMajor2 := major(dev2) + devMinor2 := minor(dev2) + + stream1, err := client.MonitorBtrfs(ctx, &ebpfpb.MonitorBtrfsRequest{ + Major: devMajor1, + Minor: devMinor1, + }) + require.NoError(t, err) + + stream2, err := client.MonitorBtrfs(ctx, &ebpfpb.MonitorBtrfsRequest{ + Major: devMajor2, + Minor: devMinor2, + }) + + dir, err := os.MkdirTemp("", t.Name()) + require.NoError(t, err) + defer os.RemoveAll(dir) + + // this should error since the disk is corrupted + err = exec.Command("mount", devPath2, dir).Run() + require.Error(t, err) + + // this should error since the disk is corrupted + err = exec.Command("mount", devPath1, dir).Run() + require.Error(t, err) + + { + response, err := stream1.Recv() + require.NoError(t, err) + + assert.Equal(t, response.Major, 
devMajor1) + assert.Equal(t, response.Minor, devMinor1) + } + + { + response, err := stream2.Recv() + require.NoError(t, err) + + assert.Equal(t, response.Major, devMajor2) + assert.Equal(t, response.Minor, devMinor2) + } +} + +func TestMultipleSubsPerDevice(t *testing.T) { + ctx := context.Background() + + devMajor := major(dev1) + devMinor := minor(dev1) + + stream1, err := client.MonitorBtrfs(ctx, &ebpfpb.MonitorBtrfsRequest{ + Major: devMajor, + Minor: devMinor, + }) + require.NoError(t, err) + + stream2, err := client.MonitorBtrfs(ctx, &ebpfpb.MonitorBtrfsRequest{ + Major: devMajor, + Minor: devMinor, + }) + + dir, err := os.MkdirTemp("", t.Name()) + require.NoError(t, err) + defer os.RemoveAll(dir) + + // this should error since the disk is corrupted + err = exec.Command("mount", devPath1, dir).Run() + require.Error(t, err) + + // both streams should receive despite only generating one message + { + response, err := stream1.Recv() + require.NoError(t, err) + + assert.Equal(t, response.Major, devMajor) + assert.Equal(t, response.Minor, devMinor) + } + + { + response, err := stream2.Recv() + require.NoError(t, err) + + assert.Equal(t, response.Major, devMajor) + assert.Equal(t, response.Minor, devMinor) + } +} + +func TestMultipleMessages(t *testing.T) { + ctx := context.Background() + + devMajor := major(dev1) + devMinor := minor(dev1) + + stream, err := client.MonitorBtrfs(ctx, &ebpfpb.MonitorBtrfsRequest{ + Major: devMajor, + Minor: devMinor, + }) + require.NoError(t, err) + + dir, err := os.MkdirTemp("", t.Name()) + require.NoError(t, err) + defer os.RemoveAll(dir) + + iters := 10 + doneChan := make(chan struct{}) + go func() { + defer close(doneChan) + + for range iters { + // this should error since the disk is corrupted + err := exec.Command("mount", devPath1, dir).Run() + require.Error(t, err) + } + }() + + for range iters { + response, err := stream.Recv() + require.NoError(t, err) + + assert.Equal(t, response.Major, devMajor) + assert.Equal(t, 
response.Minor, devMinor) + } + + <-doneChan +} + +func TestMultipleConcurrentStreams(t *testing.T) { + ctx := context.Background() + + devMajor1 := major(dev1) + devMinor1 := minor(dev1) + + devMajor2 := major(dev2) + devMinor2 := minor(dev2) + + stream1, err := client.MonitorBtrfs(ctx, &ebpfpb.MonitorBtrfsRequest{ + Major: devMajor1, + Minor: devMinor1, + }) + require.NoError(t, err) + + stream2, err := client.MonitorBtrfs(ctx, &ebpfpb.MonitorBtrfsRequest{ + Major: devMajor2, + Minor: devMinor2, + }) + require.NoError(t, err) + + var wg sync.WaitGroup + + dir, err := os.MkdirTemp("", t.Name()) + require.NoError(t, err) + defer os.RemoveAll(dir) + + iters := 10 + wg.Add(1) + go func() { + defer wg.Done() + + for range iters { + // this should error since the disk is corrupted + err := exec.Command("mount", devPath1, dir).Run() + require.Error(t, err) + + // this should error since the disk is corrupted + err = exec.Command("mount", devPath2, dir).Run() + require.Error(t, err) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + for range iters { + response, err := stream1.Recv() + require.NoError(t, err) + + assert.Equal(t, response.Major, devMajor1) + assert.Equal(t, response.Minor, devMinor1) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + for range iters { + response, err := stream2.Recv() + require.NoError(t, err) + + assert.Equal(t, response.Major, devMajor2) + assert.Equal(t, response.Minor, devMinor2) + } + }() + + wg.Wait() +} + +func major(dev uint32) uint32 { + return dev >> 20 +} + +func minor(dev uint32) uint32 { + return dev & ((1 << 20) - 1) +} + +func convertDevice(dev uint64) uint32 { + minor := uint32(unix.Minor(dev)) + major := uint32(unix.Major(dev)) + return (major << 20) | minor +} diff --git a/main.go b/main.go index 331eacd..0117f9b 100644 --- a/main.go +++ b/main.go @@ -1,127 +1,61 @@ package main -//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target amd64 bdwatch bdwatch.c - import ( - "bytes" - 
"encoding/binary" - "errors" "flag" - "fmt" - "log" + "net" "os" "os/signal" - "syscall" - "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/ringbuf" - "github.com/cilium/ebpf/rlimit" - "github.com/google/uuid" - "golang.org/x/sys/unix" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + + "github.com/replit/replit-ebpf/btrfswatch" + ebpfpb "github.com/replit/replit-ebpf/ebpf" ) func main() { - device := flag.String("device", "", "device to watch") + socketName := flag.String("socket-name", "/run/conman/conkid/ebpf.sock", "unix socket to listen on") + logJSON := flag.Bool("log-json", false, "format log messages as JSON") flag.Parse() - // Remove resource limits for kernels <5.11. - if err := rlimit.RemoveMemlock(); err != nil { - log.Fatal("Removing memlock:", err) - } - - // Load the compiled eBPF ELF and load it into the kernel. - var objs bdwatchObjects - - if err := loadBdwatchObjects(&objs, nil); err != nil { - log.Fatal("Loading eBPF objects:", err) - } - defer objs.Close() - - // Attach count_packets to the network interface. - { - link, err := link.Kprobe("btrfs_recover_log_trees", objs.BtrfsRecoverLogTrees, &link.KprobeOptions{}) - if err != nil { - log.Fatal("Attaching kprobe:", err) - } - defer link.Close() + if *logJSON { + log.SetFormatter(&log.JSONFormatter{}) + } else { + log.SetFormatter(&log.TextFormatter{ForceColors: true}) } - { - link, err := link.Kretprobe("btrfs_recover_log_trees", objs.BtrfsRecoverLogTreesExit, &link.KprobeOptions{}) - if err != nil { - log.Fatal("Attaching kretprobe:", err) - } - defer link.Close() + mgr, err := btrfswatch.NewManager() + if err != nil { + log.WithError(err).Fatal("Initializing btrfswatch") } - log.Println("Waiting...") + log.Infof("Listening at %s...", *socketName) // exit the program when interrupted. 
stop := make(chan os.Signal, 5) signal.Notify(stop, os.Interrupt) - stat, err := os.Stat(*device) + ebpfService, err := ebpfpb.NewService(ebpfpb.ServiceOpts{ + BtrfswatchMgr: mgr, + }) if err != nil { - panic(err) - } - dev := stat.Sys().(*syscall.Stat_t).Rdev - fmt.Printf("dev %d %d:%d\n", dev, unix.Major(dev), unix.Minor(dev)) - err = objs.bdwatchMaps.RegisteredDevices.Put(convertDevice(stat.Sys().(*syscall.Stat_t).Rdev), true) - if err != nil { - panic(err) + log.WithError(err).Fatal("Starting eBPF gRPC service") } - rd, err := ringbuf.NewReader(objs.BtrfsRecoverLogTreesErrors) + listener, err := net.Listen("unix", *socketName) if err != nil { - panic(err) + log.WithError(err).Fatal("Listen on unix socket") } + defer listener.Close() - go func() { - <-stop - rd.Close() - }() + grpcS := grpc.NewServer() + ebpfpb.RegisterEbpfServer(grpcS, ebpfService) - var entry bdwatchEvent - for { - select { - case <-stop: - return - default: - } - record, err := rd.Read() - if err != nil { - if errors.Is(err, ringbuf.ErrClosed) { - return - } - panic(err) - } + go grpcS.Serve(listener) - err = binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &entry) - if err != nil { - panic(err) - } + <-stop - id, err := uuid.FromBytes(entry.Fsid[:]) - if err != nil { - panic(err) - } - log.Printf("Major:%d Minor:%d UUID:%s Ret: %d", - major(entry.DevId), - minor(entry.DevId), - id, - entry.Ret, - ) - } -} - -func major(dev uint32) uint32 { - return dev >> 20 -} -func minor(dev uint32) uint32 { - return dev & ((1 << 20) - 1) -} -func convertDevice(dev uint64) uint32 { - minor := uint32(unix.Minor(dev)) - major := uint32(unix.Major(dev)) - return (major << 20) | minor + log.Infoln("Shutting down...") + grpcS.GracefulStop() + mgr.Close() }