Skip to content

Commit

Permalink
A naive implementation of the blockBlobClient stageBlockFromURL opera…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
radekg committed Jul 30, 2024
1 parent 2945f6b commit a5d0f7a
Show file tree
Hide file tree
Showing 2 changed files with 384 additions and 3 deletions.
310 changes: 307 additions & 3 deletions src/blob/handlers/BlockBlobHandler.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { URLBuilder } from "@azure/ms-rest-js";
import axios, { AxiosResponse } from "axios";

import { convertRawHeadersToMetadata } from "../../common/utils/utils";
import {
getMD5FromStream,
Expand All @@ -11,10 +14,15 @@ import * as Models from "../generated/artifacts/models";
import Context from "../generated/Context";
import IBlockBlobHandler from "../generated/handlers/IBlockBlobHandler";
import { parseXML } from "../generated/utils/xml";
import { extractStoragePartsFromPath } from "../middlewares/blobStorageContext.middleware";
import { BlobModel, BlockModel } from "../persistence/IBlobMetadataStore";
import { BLOB_API_VERSION } from "../utils/constants";
import { BLOB_API_VERSION, HeaderConstants } from "../utils/constants";
import BaseHandler from "./BaseHandler";
import { getTagsFromString } from "../utils/utils";
import {
getTagsFromString,
deserializeRangeHeader,
getBlobTagsCount,
} from "../utils/utils";

/**
* BlobHandler handles Azure Storage BlockBlob related requests.
Expand Down Expand Up @@ -270,7 +278,75 @@ export default class BlockBlobHandler
options: Models.BlockBlobStageBlockFromURLOptionalParams,
context: Context
): Promise<Models.BlockBlobStageBlockFromURLResponse> {
throw new NotImplementedError(context.contextId);

const blobCtx = new BlobStorageContext(context);

// TODO: Check dest Lease status, and set to available if it's expired, see sample in BlobHandler.setMetadata()
const url = this.NewUriFromCopySource(sourceUrl, context);
const [
sourceAccount,
sourceContainer,
sourceBlob
] = extractStoragePartsFromPath(url.hostname, url.pathname, blobCtx.disableProductStyleUrl);
const snapshot = url.searchParams.get("snapshot") || "";

if (
sourceAccount === undefined ||
sourceContainer === undefined ||
sourceBlob === undefined
) {
throw StorageErrorFactory.getBlobNotFound(context.contextId!);
}

const sig = url.searchParams.get("sig");
if ((sourceAccount !== blobCtx.account) || (sig !== null)) {
await this.validateCopySource(sourceUrl, sourceAccount, context);
}

const downloadBlobRes = await this.metadataStore.downloadBlob(
context,
sourceAccount,
sourceContainer,
sourceBlob,
snapshot,
options.leaseAccessConditions,
);

if (downloadBlobRes.properties.contentLength === undefined) {
throw StorageErrorFactory.getConditionNotMet(context.contextId!);
}

const downloadBlockBlobRes = await this.downloadBlockBlobOrAppendBlob(
{ snapshot: snapshot, leaseAccessConditions: options.leaseAccessConditions },
context,
downloadBlobRes,
);

if (downloadBlockBlobRes.body === undefined) {
throw StorageErrorFactory.getConditionNotMet(context.contextId!);
}

const stageBlockRes = await this.stageBlock(blockId,
downloadBlobRes.properties.contentLength,
downloadBlockBlobRes.body,
{ leaseAccessConditions: options.leaseAccessConditions },
context
);

const response: Models.BlockBlobStageBlockFromURLResponse = {
statusCode: stageBlockRes.statusCode,
contentMD5: stageBlockRes.contentMD5,
date: stageBlockRes.date,
encryptionKeySha256: stageBlockRes.encryptionKeySha256,
encryptionScope: stageBlockRes.encryptionScope,
errorCode: stageBlockRes.errorCode,
isServerEncrypted: stageBlockRes.isServerEncrypted,
requestId: stageBlockRes.requestId,
version: stageBlockRes.version,
xMsContentCrc64: stageBlockRes.xMsContentCrc64,
};

return response
}

public async commitBlockList(
Expand Down Expand Up @@ -501,4 +577,232 @@ export default class BlockBlobHandler
);
}
}

// from BlobHandler, surely there must be a better way

private async validateCopySource(copySource: string, sourceAccount: string, context: Context): Promise<void> {
// Currently the only cross-account copy support is from/to the same Azurite instance. In either case access
// is determined by performing a request to the copy source to see if the authentication is valid.
const blobCtx = new BlobStorageContext(context);

const currentServer = blobCtx.request!.getHeader("Host") || "";
const url = this.NewUriFromCopySource(copySource, context);
if (currentServer !== url.host) {
this.logger.error(
`BlobHandler:startCopyFromURL() Source account ${url} is not on the same Azurite instance as target account ${blobCtx.account}`,
context.contextId
);

throw StorageErrorFactory.getCannotVerifyCopySource(
context.contextId!,
404,
"The specified resource does not exist"
);
}

this.logger.debug(
`BlobHandler:startCopyFromURL() Validating access to the source account ${sourceAccount}`,
context.contextId
);

// In order to retrieve proper error details we make a metadata request to the copy source. If we instead issue
// a HEAD request then the error details are not returned and reporting authentication failures to the caller
// becomes a black box.
const metadataUrl = URLBuilder.parse(copySource);
metadataUrl.setQueryParameter("comp", "metadata");
const validationResponse: AxiosResponse = await axios.get(
metadataUrl.toString(),
{
// Instructs axios to not throw an error for non-2xx responses
validateStatus: () => true
}
);
if (validationResponse.status === 200) {
this.logger.debug(
`BlobHandler:startCopyFromURL() Successfully validated access to source account ${sourceAccount}`,
context.contextId
);
} else {
this.logger.debug(
`BlobHandler:startCopyFromURL() Access denied to source account ${sourceAccount} StatusCode=${validationResponse.status}, AuthenticationErrorDetail=${validationResponse.data}`,
context.contextId
);

if (validationResponse.status === 404) {
throw StorageErrorFactory.getCannotVerifyCopySource(
context.contextId!,
validationResponse.status,
"The specified resource does not exist"
);
} else {
// For non-successful responses attempt to unwrap the error message from the metadata call.
let message: string =
"Could not verify the copy source within the specified time.";
if (
validationResponse.headers[HeaderConstants.CONTENT_TYPE] ===
"application/xml"
) {
const authenticationError = await parseXML(validationResponse.data);
if (authenticationError.Message !== undefined) {
message = authenticationError.Message.replace(/\n+/gm, "");
}
}

throw StorageErrorFactory.getCannotVerifyCopySource(
context.contextId!,
validationResponse.status,
message
);
}
}
}

private NewUriFromCopySource(copySource: string, context: Context): URL {
try {
return new URL(copySource)
}
catch
{
throw StorageErrorFactory.getInvalidHeaderValue(
context.contextId,
{
HeaderName: "x-ms-copy-source",
HeaderValue: copySource
})
}
}

/**
* Download block blob or append blob.
*
* @private
* @param {Models.BlobDownloadOptionalParams} options
* @param {Context} context
* @param {BlobModel} blob
* @returns {Promise<Models.BlobDownloadResponse>}
* @memberof BlobHandler
*/
private async downloadBlockBlobOrAppendBlob(
options: Models.BlobDownloadOptionalParams,
context: Context,
blob: BlobModel
): Promise<Models.BlobDownloadResponse> {
if (blob.isCommitted === false) {
throw StorageErrorFactory.getBlobNotFound(context.contextId!);
}

// Deserializer doesn't handle range header currently, manually parse range headers here
const rangesParts = deserializeRangeHeader(
context.request!.getHeader("range"),
context.request!.getHeader("x-ms-range")
);
const rangeStart = rangesParts[0];
let rangeEnd = rangesParts[1];

// Start Range is bigger than blob length
if (rangeStart > blob.properties.contentLength!) {
throw StorageErrorFactory.getInvalidPageRange(context.contextId!);
}

// Will automatically shift request with longer data end than blob size to blob size
if (rangeEnd + 1 >= blob.properties.contentLength!) {
// report error is blob size is 0, and rangeEnd is specified but not 0
if (blob.properties.contentLength == 0 && rangeEnd !== 0 && rangeEnd !== Infinity) {
throw StorageErrorFactory.getInvalidPageRange2(context.contextId!);
}
else {
rangeEnd = blob.properties.contentLength! - 1;
}
}

const contentLength = rangeEnd - rangeStart + 1;
const partialRead = contentLength !== blob.properties.contentLength!;

this.logger.info(
// tslint:disable-next-line:max-line-length
`BlobHandler:downloadBlockBlobOrAppendBlob() NormalizedDownloadRange=bytes=${rangeStart}-${rangeEnd} RequiredContentLength=${contentLength}`,
context.contextId
);

let bodyGetter: () => Promise<NodeJS.ReadableStream | undefined>;
const blocks = blob.committedBlocksInOrder;
if (blocks === undefined || blocks.length === 0) {
bodyGetter = async () => {
if (blob.persistency === undefined) {
return this.extentStore.readExtent(undefined, context.contextId);
}
return this.extentStore.readExtent(
{
id: blob.persistency.id,
offset: blob.persistency.offset + rangeStart,
count: Math.min(blob.persistency.count, contentLength)
},
context.contextId
);
};
} else {
bodyGetter = async () => {
return this.extentStore.readExtents(
blocks.map((block) => block.persistency),
rangeStart,
rangeEnd + 1 - rangeStart,
context.contextId
);
};
}

let contentRange: string | undefined;
if (
context.request!.getHeader("range") ||
context.request!.getHeader("x-ms-range")
) {
contentRange = `bytes ${rangeStart}-${rangeEnd}/${blob.properties
.contentLength!}`;
}

let body: NodeJS.ReadableStream | undefined = await bodyGetter();
let contentMD5: Uint8Array | undefined;
if (!partialRead) {
contentMD5 = blob.properties.contentMD5;
}
if (
contentLength <= 4 * 1024 * 1024 &&
contentMD5 === undefined &&
body !== undefined
) {
contentMD5 = await getMD5FromStream(body);
body = await bodyGetter();
}

const response: Models.BlobDownloadResponse = {
statusCode: contentRange ? 206 : 200,
body,
metadata: blob.metadata,
eTag: blob.properties.etag,
requestId: context.contextId,
date: context.startTime!,
version: BLOB_API_VERSION,
...blob.properties,
cacheControl: context.request!.getQuery("rscc") ?? blob.properties.cacheControl,
contentDisposition: context.request!.getQuery("rscd") ?? blob.properties.contentDisposition,
contentEncoding: context.request!.getQuery("rsce") ?? blob.properties.contentEncoding,
contentLanguage: context.request!.getQuery("rscl") ?? blob.properties.contentLanguage,
contentType: context.request!.getQuery("rsct") ?? blob.properties.contentType,
blobContentMD5: blob.properties.contentMD5,
acceptRanges: "bytes",
contentLength,
contentRange,
contentMD5: contentRange ? (context.request!.getHeader("x-ms-range-get-content-md5") ? contentMD5: undefined) : contentMD5,
tagCount: getBlobTagsCount(blob.blobTags),
isServerEncrypted: true,
clientRequestId: options.requestId,
creationTime: blob.properties.creationTime,
blobCommittedBlockCount:
blob.properties.blobType === Models.BlobType.AppendBlob
? (blob.committedBlocksInOrder || []).length
: undefined,
};

return response;
}
}
Loading

0 comments on commit a5d0f7a

Please sign in to comment.