Skip to content

Commit

Permalink
Filter
Browse files Browse the repository at this point in the history
  • Loading branch information
EmmaZhu committed Nov 29, 2023
1 parent 105a9cd commit e906560
Show file tree
Hide file tree
Showing 31 changed files with 9,202 additions and 15 deletions.
47 changes: 46 additions & 1 deletion src/blob/handlers/ContainerHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,52 @@ export default class ContainerHandler extends BaseHandler

public async filterBlobs(options: Models.ContainerFilterBlobsOptionalParams, context: Context
): Promise<Models.ContainerFilterBlobsResponse> {
throw new NotImplementedError(context.contextId!);
const blobCtx = new BlobStorageContext(context);
const accountName = blobCtx.account!;
const containerName = blobCtx.container!;
await this.metadataStore.checkContainerExist(
context,
accountName,
containerName
);

const request = context.request!;
const marker = options.marker;
options.marker = options.marker || "";
if (
options.maxresults === undefined ||
options.maxresults > DEFAULT_LIST_BLOBS_MAX_RESULTS
) {
options.maxresults = DEFAULT_LIST_BLOBS_MAX_RESULTS;
}

const [blobs, nextMarker] = await this.metadataStore.filterBlobs(
context,
accountName,
containerName,
options.where,
options.maxresults,
marker,
);

const serviceEndpoint = `${request.getEndpoint()}/${accountName}`;
const response: Models.ContainerFilterBlobsResponse = {
statusCode: 200,
requestId: context.contextId,
version: BLOB_API_VERSION,
date: context.startTime,
serviceEndpoint,
where: options.where!,
blobs: blobs.map(item => {
return {
...item
};
}),
clientRequestId: options.requestId,
nextMarker: `${nextMarker || ""}`
};

return response;
}

/**
Expand Down
128 changes: 128 additions & 0 deletions src/blob/persistence/FilterBlobPage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@

/**
* This implements a page of blob results.
* When maxResults is smaller than the number of prefixed items in the metadata source, multiple reads from
* the source may be necessary.
*
* @export
* @class FilterBlobPage
*/
export default class FilterBlobPage<FilterBlobType> {
readonly maxResults: number;

filterBlobItems: FilterBlobType[] = [];
latestMarker: string = "";

// isFull indicates we could only (maybe) add a prefix
private isFull: boolean = false;

// isExhausted indicates nothing more should be added
private isExhausted: boolean = false;

constructor(maxResults: number) {
this.maxResults = maxResults;
}

/**
* Empty the page (useful in unit tests)
*
*/
public reset() {
this.filterBlobItems.splice(0);
this.isFull = false;
this.isExhausted = false;
this.latestMarker = "";
}

private updateFull() {
this.isFull = (this.filterBlobItems.length === this.maxResults);
}

/**
* addItem will add to the blob list if possible and update the full/exhausted state of the page
*/
private addItem(item: FilterBlobType): boolean {
if (this.isExhausted) {
return false;
}
let added: boolean = false;
if (! this.isFull) {
this.filterBlobItems.push(item);
added = true;
}
this.updateFull();

// if a blob causes fullness the next item read cannot be squashed only duplicate prefixes can
this.isExhausted = this.isFull;
return added;
}

/**
* Add a BlobType item to the appropriate collection, update the marker
*
* When the page becomes full, items may still be added iff the item is existing prefix
*
* Return the number of items added
*/
private add(name: string, item: FilterBlobType): boolean {
if (this.isExhausted) {
return false;
}
if (name < this.latestMarker) {
throw new Error("add received unsorted item. add must be called on sorted data");
}
const marker = (name > this.latestMarker) ? name : this.latestMarker;
let added: boolean = false;
added = this.addItem(item);
if (added) {
this.latestMarker = marker;
}
return added;
}

/**
* Iterate over an array blobs read from a source and add them until the page cannot accept new items
*/
private processList(docs: FilterBlobType[], nameFn: (item: FilterBlobType) => string): number {
let added: number = 0;
for (const item of docs) {
if (this.add(nameFn(item), item)) {
added++;
}
if (this.isExhausted) break;
}
return added;
}

/**
* Fill the page if possible by using the provided reader function.
*
* For any BlobType, the name is used with delimiter to treat the item as a blob or
* a BlobPrefix for the list blobs result.
*
* This function will use the reader for BlobType to keep reading from a metadata
* data source until the source has no more items or the page cannot add any more items.
*
* Return the contents of the page, blobs, prefixes, and a continuation token if applicable
*/
public async fill(
reader: (offset: number) => Promise<FilterBlobType[]>,
namer: (item: FilterBlobType) => string,
): Promise<[FilterBlobType[], string]> {
let offset: number = 0;
let docs = await reader(offset);
let added: number = 0;
while (docs.length) {
added = this.processList(docs, namer);
offset += added;
if (added < this.maxResults) {
break;
}
docs = await reader(offset);
}
return [
this.filterBlobItems,
added < docs.length ? this.latestMarker : ""
];
}
}
14 changes: 13 additions & 1 deletion src/blob/persistence/IBlobMetadataStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import IDataStore from "../../common/IDataStore";
import IGCExtentProvider from "../../common/IGCExtentProvider";
import * as Models from "../generated/artifacts/models";
import Context from "../generated/Context";
import { FilterBlobItem } from "../generated/artifacts/models";

/**
* This model describes a chunk inside a persistency extent for a given extent ID.
Expand Down Expand Up @@ -153,6 +154,8 @@ interface IGetBlobPropertiesRes {
}
export type GetBlobPropertiesRes = IGetBlobPropertiesRes;

export type FilterBlobModel = FilterBlobItem;

// The response model for each lease-related request.
interface IBlobLeaseResponse {
properties: Models.BlobPropertiesInternal;
Expand Down Expand Up @@ -500,7 +503,16 @@ export interface IBlobMetadataStore
marker?: string,
includeSnapshots?: boolean,
includeUncommittedBlobs?: boolean
): Promise<[BlobModel[], string | undefined]>;
): Promise<[BlobModel[], string | undefined]>;

filterBlobs(
context: Context,
account: string,
container: string,
where?: string,
maxResults?: number,
marker?: string,
): Promise<[FilterBlobModel[], string | undefined]>;

/**
* Create blob item in persistency layer. Will replace if blob exists.
Expand Down
Loading

0 comments on commit e906560

Please sign in to comment.