Skip to content

Commit

Permalink
Adding disk buffering, part 4 (#221)
Browse files Browse the repository at this point in the history
* Creating and initializing SignalDiskExporter

* Renaming test functions

* Verifying rum set up with disk buffering enabled

* Exporting all available signals from disk in DefaultExportScheduler

* Setting up disk buffering export scheduler

* Adding javadocs

* Reorganizing fields

* Update instrumentation/src/main/java/io/opentelemetry/android/features/diskbuffering/SignalDiskExporter.kt

Co-authored-by: Manoel Aranda Neto <[email protected]>

* Update instrumentation/src/main/java/io/opentelemetry/android/features/diskbuffering/scheduler/DefaultExportScheduler.kt

Co-authored-by: jason plumb <[email protected]>

* Inverting "if" as suggested in the reviews.

* Renaming function to scheduleDiskTelemetryReader

* Fixing compilation

* Adapting disk buffering to new APIs

---------

Co-authored-by: Manoel Aranda Neto <[email protected]>
Co-authored-by: jason plumb <[email protected]>
  • Loading branch information
3 people committed Mar 4, 2024
1 parent 16b450c commit 3632f6a
Show file tree
Hide file tree
Showing 10 changed files with 519 additions and 61 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
opentelemetry = "1.35.0"
opentelemetry-alpha = "1.32.1-alpha"
opentelemetry-semconv = "1.21.0-alpha"
opentelemetry-contrib = "1.31.0-alpha"
opentelemetry-contrib = "1.33.0-alpha"
mockito = "5.11.0"
junit = "5.10.2"
byteBuddy = "1.14.12"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import android.util.Log;
import io.opentelemetry.android.config.OtelRumConfig;
import io.opentelemetry.android.features.diskbuffering.DiskBufferingConfiguration;
import io.opentelemetry.android.features.diskbuffering.SignalFromDiskExporter;
import io.opentelemetry.android.features.diskbuffering.scheduler.ExportScheduleHandler;
import io.opentelemetry.android.instrumentation.InstrumentedApplication;
import io.opentelemetry.android.instrumentation.activity.VisibleScreenTracker;
import io.opentelemetry.android.instrumentation.anr.AnrDetector;
Expand All @@ -31,8 +33,9 @@
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.contrib.disk.buffering.SpanDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.SpanFromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.SpanToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.StorageConfiguration;
import io.opentelemetry.exporter.logging.LoggingSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
Expand Down Expand Up @@ -289,24 +292,80 @@ public OpenTelemetryRum build() {

applyConfiguration();

DiskBufferingConfiguration diskBufferingConfiguration =
config.getDiskBufferingConfiguration();
SpanExporter spanExporter = buildSpanExporter();
SignalFromDiskExporter signalFromDiskExporter = null;
if (diskBufferingConfiguration.isEnabled()) {
try {
StorageConfiguration storageConfiguration = createStorageConfiguration();
final SpanExporter originalSpanExporter = spanExporter;
spanExporter =
SpanToDiskExporter.create(originalSpanExporter, storageConfiguration);

signalFromDiskExporter =
new SignalFromDiskExporter(
SpanFromDiskExporter.create(
originalSpanExporter, storageConfiguration),
null,
null);
} catch (IOException e) {
Log.e(RumConstants.OTEL_RUM_LOG_TAG, "Could not initialize disk exporters.", e);
}
}

OpenTelemetrySdk sdk =
OpenTelemetrySdk.builder()
.setTracerProvider(buildTracerProvider(sessionId, application))
.setTracerProvider(
buildTracerProvider(sessionId, application, spanExporter))
.setMeterProvider(buildMeterProvider(application))
.setLoggerProvider(buildLoggerProvider(application))
.setPropagators(buildFinalPropagators())
.build();

scheduleDiskTelemetryReader(signalFromDiskExporter, diskBufferingConfiguration);

SdkPreconfiguredRumBuilder delegate =
new SdkPreconfiguredRumBuilder(application, sdk, sessionId);
instrumentationInstallers.forEach(delegate::addInstrumentation);
return delegate.build();
}

private StorageConfiguration createStorageConfiguration() throws IOException {
DiskManager diskManager = DiskManager.create(config.getDiskBufferingConfiguration());
return StorageConfiguration.builder()
.setMaxFileSize(diskManager.getMaxCacheFileSize())
.setMaxFolderSize(diskManager.getMaxFolderSize())
.setRootDir(diskManager.getSignalsBufferDir())
.setTemporaryFileProvider(
new SimpleTemporaryFileProvider(diskManager.getTemporaryDir()))
.build();
}

private void scheduleDiskTelemetryReader(
@Nullable SignalFromDiskExporter signalExporter,
DiskBufferingConfiguration diskBufferingConfiguration) {
ExportScheduleHandler exportScheduleHandler =
diskBufferingConfiguration.getExportScheduleHandler();
if (signalExporter == null) {
// Disabling here allows to cancel previously scheduled exports using tools that
// can run even after the app has been terminated (such as WorkManager).
// But for in-memory only schedulers, nothing should need to be disabled.
exportScheduleHandler.disable();
} else {
// Not null means that disk buffering is enabled and disk exporters are successfully
// initialized.
SignalFromDiskExporter.set(signalExporter);
exportScheduleHandler.enable();
}
}

/** Leverage the configuration to wire up various instrumentation components. */
private void applyConfiguration() {
if (config.shouldGenerateSdkInitializationEvents()) {
initializationEvents = new SdkInitializationEvents();
if (initializationEvents == InitializationEvents.NO_OP) {
initializationEvents = new SdkInitializationEvents();
}
initializationEvents.recordConfiguration(config);
}
initializationEvents.sdkInitializationStarted();
Expand Down Expand Up @@ -406,13 +465,14 @@ private CurrentNetworkProvider getOrCreateCurrentNetworkProvider() {
return currentNetworkProvider;
}

private SdkTracerProvider buildTracerProvider(SessionId sessionId, Application application) {
private SdkTracerProvider buildTracerProvider(
SessionId sessionId, Application application, SpanExporter spanExporter) {
SdkTracerProviderBuilder tracerProviderBuilder =
SdkTracerProvider.builder()
.setResource(resource)
.addSpanProcessor(new SessionIdSpanAppender(sessionId));

SpanExporter spanExporter = buildSpanExporter();
initializationEvents.spanExporterInitialized(spanExporter);
BatchSpanProcessor batchSpanProcessor = BatchSpanProcessor.builder(spanExporter).build();
tracerProviderBuilder.addSpanProcessor(batchSpanProcessor);

Expand All @@ -426,32 +486,7 @@ private SdkTracerProvider buildTracerProvider(SessionId sessionId, Application a
private SpanExporter buildSpanExporter() {
// TODO: Default to otlp...but how can we make endpoint and auth mandatory?
SpanExporter defaultExporter = LoggingSpanExporter.create();
SpanExporter spanExporter = defaultExporter;
DiskBufferingConfiguration diskBufferingConfiguration =
config.getDiskBufferingConfiguration();
if (diskBufferingConfiguration.isEnabled()) {
try {
spanExporter = createDiskExporter(defaultExporter, diskBufferingConfiguration);
} catch (IOException e) {
Log.w(RumConstants.OTEL_RUM_LOG_TAG, "Could not create span disk exporter.", e);
}
}
return spanExporterCustomizer.apply(spanExporter);
}

private static SpanExporter createDiskExporter(
SpanExporter defaultExporter, DiskBufferingConfiguration diskBufferingConfiguration)
throws IOException {
DiskManager diskManager = DiskManager.create(diskBufferingConfiguration);
StorageConfiguration storageConfiguration =
StorageConfiguration.builder()
.setMaxFileSize(diskManager.getMaxCacheFileSize())
.setMaxFolderSize(diskManager.getMaxFolderSize())
.setTemporaryFileProvider(
new SimpleTemporaryFileProvider(diskManager.getTemporaryDir()))
.build();
return SpanDiskExporter.create(
defaultExporter, diskManager.getSignalsBufferDir(), storageConfiguration);
return spanExporterCustomizer.apply(defaultExporter);
}

private SdkMeterProvider buildMeterProvider(Application application) {
Expand All @@ -478,4 +513,9 @@ private ContextPropagators buildFinalPropagators() {
TextMapPropagator defaultPropagator = buildDefaultPropagator();
return ContextPropagators.create(propagatorCustomizer.apply(defaultPropagator));
}

OpenTelemetryRumBuilder setInitializationEvents(InitializationEvents initializationEvents) {
this.initializationEvents = initializationEvents;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@

package io.opentelemetry.android.features.diskbuffering;

import io.opentelemetry.android.features.diskbuffering.scheduler.DefaultExportScheduleHandler;
import io.opentelemetry.android.features.diskbuffering.scheduler.DefaultExportScheduler;
import io.opentelemetry.android.features.diskbuffering.scheduler.ExportScheduleHandler;

/** Configuration for disk buffering. */
public final class DiskBufferingConfiguration {
private final boolean enabled;
private final int maxCacheSize;
private final ExportScheduleHandler exportScheduleHandler;
private static final int DEFAULT_MAX_CACHE_SIZE = 60 * 1024 * 1024;
private static final int MAX_FILE_SIZE = 1024 * 1024;

private DiskBufferingConfiguration(Builder builder) {
this.enabled = builder.enabled;
this.maxCacheSize = builder.maxCacheSize;
this.exportScheduleHandler = builder.exportScheduleHandler;
}

public static Builder builder() {
Expand All @@ -33,9 +39,15 @@ public int getMaxCacheFileSize() {
return MAX_FILE_SIZE;
}

public ExportScheduleHandler getExportScheduleHandler() {
return exportScheduleHandler;
}

public static final class Builder {
private boolean enabled;
private int maxCacheSize;
private ExportScheduleHandler exportScheduleHandler =
new DefaultExportScheduleHandler(new DefaultExportScheduler());

private Builder(boolean enabled, int maxCacheSize) {
this.enabled = enabled;
Expand All @@ -58,6 +70,15 @@ public Builder setMaxCacheSize(int maxCacheSize) {
return this;
}

/**
* Sets a scheduler that will take care of periodically read data stored in disk and export
* it.
*/
public Builder setExportScheduleHandler(ExportScheduleHandler exportScheduleHandler) {
this.exportScheduleHandler = exportScheduleHandler;
return this;
}

public DiskBufferingConfiguration build() {
return new DiskBufferingConfiguration(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.features.diskbuffering

import androidx.annotation.WorkerThread
import io.opentelemetry.contrib.disk.buffering.LogRecordFromDiskExporter
import io.opentelemetry.contrib.disk.buffering.MetricFromDiskExporter
import io.opentelemetry.contrib.disk.buffering.SpanFromDiskExporter
import java.io.IOException
import java.util.concurrent.TimeUnit

/**
* Entrypoint to read and export previously cached signals.
*/
class SignalFromDiskExporter
@JvmOverloads
internal constructor(
private val spanFromDiskExporter: SpanFromDiskExporter?,
private val metricFromDiskExporter: MetricFromDiskExporter?,
private val logRecordFromDiskExporter: LogRecordFromDiskExporter?,
private val exportTimeoutInMillis: Long = TimeUnit.SECONDS.toMillis(5),
) {
/**
* A batch contains all the signals that arrived in one call to [SpanDiskExporter.export]. So if
* that function is called 5 times, then there will be 5 batches in disk. This function reads
* and exports ONE batch every time is called.
*
* @return TRUE if it found data in disk and the exporter succeeded. FALSE if any of those conditions were
* not met.
*/
@WorkerThread
@Throws(IOException::class)
fun exportBatchOfSpans(): Boolean {
return spanFromDiskExporter?.exportStoredBatch(
exportTimeoutInMillis,
TimeUnit.MILLISECONDS,
) ?: false
}

/**
* A batch contains all the signals that arrived in one call to [MetricDiskExporter.export]. So if
* that function is called 5 times, then there will be 5 batches in disk. This function reads
* and exports ONE batch every time is called.
*
* @return TRUE if it found data in disk and the exporter succeeded. FALSE if any of those conditions were
* not met.
*/
@WorkerThread
@Throws(IOException::class)
fun exportBatchOfMetrics(): Boolean {
return metricFromDiskExporter?.exportStoredBatch(
exportTimeoutInMillis,
TimeUnit.MILLISECONDS,
) ?: false
}

/**
* A batch contains all the signals that arrived in one call to [LogRecordDiskExporter.export]. So if
* that function is called 5 times, then there will be 5 batches in disk. This function reads
* and exports ONE batch every time is called.
*
* @return TRUE if it found data in disk and the exporter succeeded. FALSE if any of those conditions were
* not met.
*/
@WorkerThread
@Throws(IOException::class)
fun exportBatchOfLogs(): Boolean {
return logRecordFromDiskExporter?.exportStoredBatch(
exportTimeoutInMillis,
TimeUnit.MILLISECONDS,
) ?: false
}

/**
* Convenience method that attempts to export all kinds of signals from disk.
*
* @return TRUE if at least one of the signals were successfully exported, FALSE if no signal
* of any kind was exported.
*/
@WorkerThread
@Throws(IOException::class)
fun exportBatchOfEach(): Boolean {
var atLeastOneWorked = exportBatchOfSpans()
if (exportBatchOfMetrics()) {
atLeastOneWorked = true
}
if (exportBatchOfLogs()) {
atLeastOneWorked = true
}
return atLeastOneWorked
}

companion object {
private var instance: SignalFromDiskExporter? = null

@JvmStatic
fun get(): SignalFromDiskExporter? {
return instance
}

@JvmStatic
fun set(signalFromDiskExporter: SignalFromDiskExporter) {
check(instance == null) { "An instance is already set. You can only set it once." }
instance = signalFromDiskExporter
}

@JvmStatic
fun resetForTesting() {
instance = null
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

package io.opentelemetry.android.features.diskbuffering.scheduler

import android.util.Log
import io.opentelemetry.android.RumConstants
import io.opentelemetry.android.features.diskbuffering.SignalFromDiskExporter
import io.opentelemetry.android.internal.services.periodicwork.PeriodicRunnable
import java.io.IOException
import java.util.concurrent.TimeUnit

class DefaultExportScheduler : PeriodicRunnable() {
Expand All @@ -14,11 +18,19 @@ class DefaultExportScheduler : PeriodicRunnable() {
}

override fun onRun() {
// TODO for next PR.
val exporter = SignalFromDiskExporter.get() ?: return

try {
do {
val didExport = exporter.exportBatchOfEach()
} while (didExport)
} catch (e: IOException) {
Log.e(RumConstants.OTEL_RUM_LOG_TAG, "Error while exporting signals from disk.", e)
}
}

override fun shouldStopRunning(): Boolean {
return false
return SignalFromDiskExporter.get() == null
}

override fun minimumDelayUntilNextRunInMillis(): Long {
Expand Down
Loading

0 comments on commit 3632f6a

Please sign in to comment.