feat: support profile data compressed by agent
taloric committed Sep 11, 2024
1 parent 4b502df commit d173351
Showing 3 changed files with 121 additions and 45 deletions.
20 changes: 11 additions & 9 deletions server/ingester/profile/dbwriter/profile.go
@@ -23,7 +23,6 @@ import (
"time"

"github.com/google/gopacket/layers"
"github.com/pyroscope-io/pyroscope/pkg/storage"

basecommon "github.com/deepflowio/deepflow/server/ingester/common"
"github.com/deepflowio/deepflow/server/ingester/flow_tag"
@@ -40,6 +39,7 @@ const (
LabelAppService = "app_service"
LabelAppInstance = "app_instance"
LabelLanguageType = "profile_language_type"
LabelProfileID = "profile_id"
)

var InProcessCounter uint32
@@ -299,7 +299,9 @@ func (p *InProcessProfile) Clone() *InProcessProfile {
return c
}

func (p *InProcessProfile) FillProfile(input *storage.PutInput,
func (p *InProcessProfile) FillProfile(createTime time.Time,
profileUnit string,
profileLabels map[string]string,
platformData *grpc.PlatformInfoTable,
vtapID, orgId, teamId uint16,
podID uint32,
@@ -315,22 +317,22 @@ func (p *InProcessProfile) FillProfile(input *storage.PutInput,
tagNames []string,
tagValues []string) {

p.Time = uint32(input.StartTime.Unix())
p._id = genID(uint32(input.StartTime.UnixNano()/int64(time.Second)), &InProcessCounter, vtapID)
p.Time = uint32(createTime.Unix())
p._id = genID(uint32(createTime.UnixNano()/int64(time.Second)), &InProcessCounter, vtapID)
p.VtapID = vtapID
p.PodID = podID
p.AppService = profileName
p.ProfileLocationStr = location
p.CompressionAlgo = compressionAlgo
p.ProfileEventType = eventType
p.ProfileValue = self
p.ProfileValueUnit = input.Units.String()
p.ProfileCreateTimestamp = input.StartTime.UnixMicro()
p.ProfileValueUnit = profileUnit
p.ProfileCreateTimestamp = createTime.UnixMicro()
p.ProfileInTimestamp = inTimeStamp.UnixMicro()
p.ProfileLanguageType = languageType
p.ProfileID, _ = input.Key.ProfileID()
if input.Key.Labels() != nil {
p.SpanName = input.Key.Labels()[LabelSpanName]
if profileLabels != nil {
p.ProfileID = profileLabels[LabelProfileID]
p.SpanName = profileLabels[LabelSpanName]
}
// app_instance should be uploaded by the user via a label; if empty, fall back to app_service
if p.AppInstance == "" {
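In profile.go the writer stops depending on pyroscope's `*storage.PutInput`: `FillProfile` now takes the creation time, value unit, and agent-supplied label map directly, and reads `profile_id` / `span_name` out of that map, which is also why the `github.com/pyroscope-io/pyroscope/pkg/storage` import disappears above. A tiny stand-in sketch of the label lookup (the struct and the `span_name` key value are illustrative, not from this commit; only `LabelProfileID = "profile_id"` appears in the diff):

```go
package main

import "fmt"

// Label keys the writer reads from the agent-supplied label map.
// LabelProfileID matches the constant added in this commit; the value of
// LabelSpanName is assumed here for illustration.
const (
	LabelProfileID = "profile_id"
	LabelSpanName  = "span_name"
)

// inProcessProfile is a stand-in for dbwriter.InProcessProfile, reduced to
// the two fields that are filled from labels.
type inProcessProfile struct {
	ProfileID string
	SpanName  string
}

func (p *inProcessProfile) fillFromLabels(labels map[string]string) {
	if labels == nil {
		return
	}
	p.ProfileID = labels[LabelProfileID]
	p.SpanName = labels[LabelSpanName]
}

func main() {
	p := &inProcessProfile{}
	p.fillFromLabels(map[string]string{
		LabelProfileID: "c0ffee",
		LabelSpanName:  "GET /api/v1/orders",
	})
	fmt.Println(p.ProfileID, p.SpanName)
}
```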
75 changes: 61 additions & 14 deletions server/ingester/profile/decoder/decoder.go
@@ -41,7 +41,6 @@ import (
logging "github.com/op/go-logging"
"github.com/pyroscope-io/pyroscope/pkg/convert/jfr"
"github.com/pyroscope-io/pyroscope/pkg/convert/pprof"
pprofile "github.com/pyroscope-io/pyroscope/pkg/convert/profile"
"github.com/pyroscope-io/pyroscope/pkg/ingestion"
"github.com/pyroscope-io/pyroscope/pkg/storage/metadata"
"github.com/pyroscope-io/pyroscope/pkg/storage/segment"
@@ -91,6 +90,13 @@ var eBPFEventType = []string{
pb.ProfileEventType_EbpfMemInUse: "mem-inuse",
}

const (
// when the agent (sender) compresses data per the `profile.DataCompress` flag, _ZSTD_COMPRESS_FLAG must be checked to decompress
_ZSTD_COMPRESS_FLAG uint8 = 1 // 0x1
// when the profiler (java-sdk) sends gzip-compressed data to the agent, _GZIP_COMPRESS_FLAG must be checked to decompress
_GZIP_COMPRESS_FLAG uint8 = 1 << 1 // 0x2
)

type Decoder struct {
index int
msgType datatype.MessageType
@@ -102,7 +108,8 @@ type Decoder struct {

offCpuSplittingGranularity int

orgId, teamId uint16
orgId, teamId uint16
decompressBuffer []byte

counter *Counter
utils.Closable
@@ -204,19 +211,20 @@ func (d *Decoder) handleProfileData(vtapID uint16, decoder *codec.SimpleDecoder)
}
copy(parser.IP, profile.Ip[:len(profile.Ip)])

// for the jfr/pprof formats, the data must be decompressed before the profile can be parsed, whether or not it was additionally compressed
switch profile.Format {
case "jfr":
atomic.AddInt64(&d.counter.JavaProfileCount, 1)
metadata := d.buildMetaData(profile)
parser.profileName = metadata.Key.AppName()
decompressJfr, err := profile_common.GzipDecompress(profile.Data)
if err != nil {
log.Errorf("decompress java profile data failed, offset=%d, len=%d, err=%s", decoder.Offset(), len(decoder.Bytes()), err)
return
compressFlag := _GZIP_COMPRESS_FLAG
if profile.DataCompressed {
compressFlag |= _ZSTD_COMPRESS_FLAG
}
err = d.sendProfileData(&jfr.RawProfile{
log.Debugf("decode java profile data, compression: %d, data: %v", compressFlag, profile.Data)
err := d.sendProfileData(&jfr.RawProfile{
FormDataContentType: string(profile.ContentType),
RawData: decompressJfr,
RawData: d.decompressData(profile.Data, compressFlag),
}, profile.Format, parser, metadata)

if err != nil {
@@ -227,9 +235,14 @@ func (d *Decoder) handleProfileData(vtapID uint16, decoder *codec.SimpleDecoder)
atomic.AddInt64(&d.counter.GolangProfileCount, 1)
metadata := d.buildMetaData(profile)
parser.profileName = metadata.Key.AppName()
var compressFlag uint8 = 0
if profile.DataCompressed {
compressFlag = _ZSTD_COMPRESS_FLAG
}
log.Debugf("decode golang profile data, compression: %d, data: %v", compressFlag, profile.Data)
err := d.sendProfileData(&pprof.RawProfile{
FormDataContentType: string(profile.ContentType),
RawData: profile.Data,
RawData: d.decompressData(profile.Data, compressFlag),
}, profile.Format, parser, metadata)
if err != nil {
log.Errorf("decode golang profile data failed, offset=%d, len=%d, err=%s", decoder.Offset(), len(decoder.Bytes()), err)
@@ -242,9 +255,14 @@ func (d *Decoder) handleProfileData(vtapID uint16, decoder *codec.SimpleDecoder)
atomic.AddInt64(&d.counter.GolangProfileCount, 1)
metadata := d.buildMetaData(profile)
parser.profileName = metadata.Key.AppName()
var compressFlag uint8 = 0
if profile.DataCompressed {
compressFlag = _ZSTD_COMPRESS_FLAG
}
log.Debugf("decode golang profile data, compression: %d, data: %v", compressFlag, profile.Data)
err := d.sendProfileData(&pprof.RawProfile{
FormDataContentType: string(profile.ContentType),
RawData: profile.Data,
RawData: d.decompressData(profile.Data, compressFlag),
StreamingParser: true,
PoolStreamingParser: true,
}, profile.Format, parser, metadata)
@@ -262,10 +280,17 @@ func (d *Decoder) handleProfileData(vtapID uint16, decoder *codec.SimpleDecoder)
// adapt agent version before v6.6
parser.processTracer.value = uint64(profile.Count)
}
err := d.sendProfileData(&pprofile.RawProfile{
Format: ingestion.FormatLines,
RawData: profile.Data,
}, profile.Format, parser, metadata)
// eBPF profiling data is written directly; no parsing is needed
log.Debugf("decode ebpf profile data, compression: %t, data: %v", profile.DataCompressed, profile.Data)
err := parser.rawStackToInProcess(
profile.Data,
parser.value,
metadata.StartTime,
metadata.Units.String(),
metadata.SpyName,
metadata.Key.Labels(),
profile.DataCompressed,
)
if err != nil {
log.Errorf("decode ebpf profile data failed, offset=%d, len=%d, err=%s", decoder.Offset(), len(decoder.Bytes()), err)
return
@@ -329,6 +354,28 @@ func (d *Decoder) buildMetaData(profile *pb.Profile) ingestion.Metadata {
}
}

func (d *Decoder) decompressData(data []byte, compressFlag uint8) []byte {
// the zstd layer is added by the agent (sender)
if compressFlag&_ZSTD_COMPRESS_FLAG == _ZSTD_COMPRESS_FLAG {
var err error
data, err = profile_common.ZstdDecompress(d.decompressBuffer[:0], data)
if err != nil {
log.Errorf("decompress profile data failed, decompressType=%d, len=%d, err=%s", _ZSTD_COMPRESS_FLAG, len(data), err)
return data
}
}
// the gzip layer is added by the application profiler (e.g. java-sdk)
if compressFlag&_GZIP_COMPRESS_FLAG == _GZIP_COMPRESS_FLAG {
var err error
data, err = profile_common.GzipDecompress(data)
if err != nil {
log.Errorf("decompress profile data failed, decompressType=%d, len=%d, err=%s", _GZIP_COMPRESS_FLAG, len(data), err)
return data
}
}
return data
}

func (d *Decoder) sendProfileData(profile ingestion.RawProfile, format string, parser *Parser, metadata ingestion.Metadata) error {
input := &ingestion.IngestInput{
Format: ingestion.Format(format),
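The new `decompressData` above peels off at most two layers in a fixed order: the zstd layer the agent (sender) adds when `profile.DataCompress` is enabled is removed first, then the gzip layer added by the application profiler (e.g. the Java SDK). Below is a standalone sketch of the same flag handling, assuming the klauspost/compress zstd package and the standard-library gzip reader; the real code goes through deepflow's `profile_common` helpers and reuses the decoder's `decompressBuffer`.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"

	"github.com/klauspost/compress/zstd"
)

const (
	zstdFlag uint8 = 1      // outer layer, added by the agent (sender)
	gzipFlag uint8 = 1 << 1 // inner layer, added by the profiler (e.g. java-sdk)
)

// decompress undoes the layers in the same order as decompressData:
// the agent's zstd layer first, then the profiler's gzip layer.
func decompress(data []byte, flags uint8) ([]byte, error) {
	if flags&zstdFlag != 0 {
		zr, err := zstd.NewReader(bytes.NewReader(data))
		if err != nil {
			return nil, err
		}
		data, err = io.ReadAll(zr)
		zr.Close()
		if err != nil {
			return nil, err
		}
	}
	if flags&gzipFlag != 0 {
		gr, err := gzip.NewReader(bytes.NewReader(data))
		if err != nil {
			return nil, err
		}
		data, err = io.ReadAll(gr)
		gr.Close()
		if err != nil {
			return nil, err
		}
	}
	return data, nil
}

// compressBoth builds a payload the way a Java profiler plus the agent would:
// gzip first (profiler side), then zstd on top (agent side).
func compressBoth(raw []byte) ([]byte, error) {
	var gz bytes.Buffer
	gw := gzip.NewWriter(&gz)
	if _, err := gw.Write(raw); err != nil {
		return nil, err
	}
	if err := gw.Close(); err != nil {
		return nil, err
	}
	zw, err := zstd.NewWriter(nil)
	if err != nil {
		return nil, err
	}
	defer zw.Close()
	return zw.EncodeAll(gz.Bytes(), nil), nil
}

func main() {
	payload := []byte("jfr or pprof bytes")
	wire, _ := compressBoth(payload)
	restored, _ := decompress(wire, zstdFlag|gzipFlag)
	fmt.Println(string(restored) == string(payload)) // prints true
}
```

The order matters because the agent compresses whatever bytes it received, so its zstd layer is always the outermost one and has to be removed first.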
71 changes: 49 additions & 22 deletions server/ingester/profile/decoder/decoder_parser.go
@@ -36,6 +36,7 @@ import (
const (
// the maximum duration of off-cpu profile is 1h + <1s
MAX_OFF_CPU_PROFILE_SPLIT_COUNT = 4000
DEFAULT_COMPRESSION_ALGO = "zstd"
)

type Parser struct {
@@ -68,11 +69,23 @@ type processTracer struct {
// implement storage.Putter
// triggered by input.Profile.Parse
func (p *Parser) Put(ctx context.Context, i *storage.PutInput) error {
// for application profiling, appName has the form <appName>.<eventType>, e.g. application.cpu
eventType := strings.TrimPrefix(i.Key.AppName(), fmt.Sprintf("%s.", p.profileName))
if p.processTracer != nil {
// for ebpf profiling event type
eventType = p.processTracer.eventType
}
log.Debugf("put profile data, from: %d, spy: %s, event type: %s", i.StartTime.Unix(), i.SpyName, eventType)
i.Val.IterateStacks(func(name string, self uint64, stack []string) {
for i, j := 0, len(stack)-1; i < j; i, j = i+1, j-1 {
stack[i], stack[j] = stack[j], stack[i]
}
inProcesses := p.stackToInProcess(i, stack, self)
onelineStack := strings.Join(stack, ";")
atomic.AddInt64(&p.Counter.UncompressSize, int64(len(stack)))
location := compress([]byte(onelineStack), p.compressionAlgo)
atomic.AddInt64(&p.Counter.CompressedSize, int64(len(location)))

inProcesses := p.stackToInProcess(location, self, i.StartTime, i.Units.String(), eventType, i.SpyName, i.Key.Labels())
p.profileWriterCallback(inProcesses)
// in the same batch, app_service is the same and only needs to be written once.
p.appServiceTagWriterCallback(inProcesses[0].(*dbwriter.InProcessProfile))
@@ -86,8 +99,28 @@ func (p *Parser) Evaluate(i *storage.PutInput) (storage.SampleObserver, bool) {
return p.observer, true
}

func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value uint64) []interface{} {
labels := input.Key.Labels()
// eBPF profiling data is written directly into the db
func (p *Parser) rawStackToInProcess(stack []byte, value uint64, startTime time.Time, units, spyName string, labels map[string]string, srcCompressed bool) error {
data := string(stack)
// the sender compressed the data but the ingester stores it uncompressed: decompress
if srcCompressed && p.compressionAlgo == "" {
data = deCompress(stack, DEFAULT_COMPRESSION_ALGO)
}
// the sender did not compress the data but the ingester stores it zstd-compressed: compress
if !srcCompressed && p.compressionAlgo == DEFAULT_COMPRESSION_ALGO {
data = compress(stack, p.compressionAlgo)
}
if len(data) == 0 {
return fmt.Errorf("stack parsing failed, startTime: %d, spy: %s", startTime.Unix(), spyName)
}
// otherwise the data is already in the desired form; write it into the db as-is
inProcess := p.stackToInProcess(data, value, startTime, units, p.processTracer.eventType, spyName, labels)
p.profileWriterCallback(inProcess)
p.appServiceTagWriterCallback(inProcess[0].(*dbwriter.InProcessProfile))
return nil
}

func (p *Parser) stackToInProcess(location string, value uint64, startTime time.Time, units, eventType, spyName string, labels map[string]string) []interface{} {
tagNames := make([]string, 0, len(labels))
tagValues := make([]string, 0, len(labels))
for k, v := range labels {
@@ -108,24 +141,18 @@ func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value

var pid uint32
var stime int64
var eventType string

onelineStack := strings.Join(stack, ";")
atomic.AddInt64(&p.Counter.UncompressSize, int64(len(onelineStack)))

location := compress(onelineStack, p.compressionAlgo)
atomic.AddInt64(&p.Counter.CompressedSize, int64(len(location)))

profileValueUs := int64(value)
if p.processTracer != nil {
// only for eBPF profiling
profileValueUs = int64(p.value)
pid = p.processTracer.pid
stime = p.processTracer.stime
eventType = p.processTracer.eventType
} else {
eventType = strings.TrimPrefix(input.Key.AppName(), fmt.Sprintf("%s.", p.profileName))
}
ret.FillProfile(input,

ret.FillProfile(startTime,
units,
labels,
p.platformData,
p.vtapID,
p.orgId, p.teamId,
Expand All @@ -136,7 +163,7 @@ func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value
p.compressionAlgo,
profileValueUs,
p.inTimestamp,
spyMap[input.SpyName],
spyMap[spyName],
pid,
stime,
tagNames,
@@ -190,29 +217,29 @@ func (s *observer) Observe(k []byte, v int) {
// e.g.: convert profile application & profile labels to prometheus series
}

func compress(src string, algo string) string {
func compress(src []byte, algo string) string {
switch algo {
case "":
return src
case "zstd":
return string(src)
case DEFAULT_COMPRESSION_ALGO:
dst := make([]byte, 0, len(src))
result, err := common.ZstdCompress(dst, []byte(src), zstd.SpeedDefault)
result, err := common.ZstdCompress(dst, src, zstd.SpeedDefault)
if err != nil {
log.Errorf("compress error: %v", err)
return src
return string(src)
}
// return the compressed bytes as a string
return string(result)
default:
return src
return string(src)
}
}

func deCompress(str []byte, algo string) string {
switch algo {
case "":
return string(str)
case "zstd":
case DEFAULT_COMPRESSION_ALGO:
dst := make([]byte, 0, len(str))
result, err := common.ZstdDecompress(dst, str)
if err != nil {
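In decoder_parser.go, `rawStackToInProcess` reconciles the agent's choice with the ingester's own `compressionAlgo`: stacks that arrive zstd-compressed are decompressed when the ingester stores plain text, stacks that arrive raw are compressed when the ingester stores zstd, and anything already in the right form is written as-is. A round-trip sketch of that decision and of the zstd path, assuming the klauspost/compress package that deepflow's `common.ZstdCompress`/`ZstdDecompress` helpers wrap (the helper names below are illustrative):

```go
package main

import (
	"fmt"

	"github.com/klauspost/compress/zstd"
)

// compressStack mirrors the parser's compress() for the "zstd" algo.
func compressStack(src []byte) ([]byte, error) {
	enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
	if err != nil {
		return nil, err
	}
	defer enc.Close()
	return enc.EncodeAll(src, make([]byte, 0, len(src))), nil
}

// decompressStack mirrors deCompress() for the "zstd" algo.
func decompressStack(src []byte) ([]byte, error) {
	dec, err := zstd.NewReader(nil)
	if err != nil {
		return nil, err
	}
	defer dec.Close()
	return dec.DecodeAll(src, nil)
}

// normalize applies the same decision as rawStackToInProcess: match the
// on-wire form of the stack to the ingester's configured compressionAlgo.
func normalize(stack []byte, srcCompressed bool, compressionAlgo string) ([]byte, error) {
	switch {
	case srcCompressed && compressionAlgo == "":
		return decompressStack(stack)
	case !srcCompressed && compressionAlgo == "zstd":
		return compressStack(stack)
	default:
		return stack, nil
	}
}

func main() {
	stack := []byte("main;runtime.main;app.handleRequest;app.query")
	compressed, _ := compressStack(stack)
	// The agent sent compressed data but this ingester stores plain text.
	plain, _ := normalize(compressed, true, "")
	fmt.Println(string(plain) == string(stack)) // prints true
}
```

Either way, the stored `profile_location_str` ends up in the form the ingester is configured for, independent of whether the agent compressed the stack before sending it.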
