Skip to content

Commit

Permalink
[INLONG-11153][Manager] Fix the problem of HTTP sink does not automat…
Browse files Browse the repository at this point in the history
…ically allocate sort cluster
  • Loading branch information
fuweng11 committed Sep 20, 2024
1 parent f1ccde4 commit 9af47e1
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class SinkType extends StreamType {
public static final Set<String> SORT_STANDALONE_SINK = new HashSet<>();

static {
SINK_TO_CLUSTER.put(HTTP, ClusterType.SORT_HTTP);
SINK_TO_CLUSTER.put(CLS, ClusterType.SORT_CLS);
SINK_TO_CLUSTER.put(ES, ClusterType.SORT_ES);
SINK_TO_CLUSTER.put(PULSAR, ClusterType.SORT_PULSAR);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.service.resource.sink.http;

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
* Http resource operate for creating http resource
*/
@Service
public class HttpResourceOperator extends AbstractStandaloneSinkResourceOperator {

private static final Logger LOG = LoggerFactory.getLogger(HttpResourceOperator.class);

@Override
public Boolean accept(String sinkType) {
return SinkType.HTTP.equals(sinkType);
}

@Override
public void createSinkResource(SinkInfo sinkInfo) {
LOG.info("begin to create sink resources sinkId={}", sinkInfo.getId());
if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
LOG.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
return;
} else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
return;
}
this.checkTaskAndConsumerGroup(sinkInfo);
this.assignCluster(sinkInfo);
}

}

0 comments on commit 9af47e1

Please sign in to comment.