Skip to content

Commit

Permalink
Add bi-stream call support for triple
Browse files Browse the repository at this point in the history
  • Loading branch information
namelessssssssssss authored and liujianjun.ljj committed Mar 27, 2024
1 parent e67ea54 commit df27427
Show file tree
Hide file tree
Showing 32 changed files with 1,522 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected void decorateRequest(SofaRequest request) {

if (!consumerConfig.isGeneric()) {
// 找到调用类型, generic的时候类型在filter里进行判断
request.setInvokeType(consumerConfig.getMethodInvokeType(request.getMethodName()));
request.setInvokeType(consumerConfig.getMethodInvokeType(request));
}

RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,10 @@ else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
// 放入线程上下文
RpcInternalContext.getContext().setFuture(future);
response = buildEmptyResponse(request);
} else if (RpcConstants.INVOKER_TYPE_CLIENT_STREAMING.equals(invokeType)
|| RpcConstants.INVOKER_TYPE_BI_STREAMING.equals(invokeType)
|| RpcConstants.INVOKER_TYPE_SERVER_STREAMING.equals(invokeType)) {
response = transport.syncSend(request, Integer.MAX_VALUE);
} else {
throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,22 @@ public class RpcConstants {
* 调用方式:future
*/
public static final String INVOKER_TYPE_FUTURE = "future";
/**
* 调用方式:一元调用
*/
public static final String INVOKER_TYPE_UNARY = "unary";
/**
* 调用方式:客户端流
*/
public static final String INVOKER_TYPE_CLIENT_STREAMING = "clientStream";
/**
* 调用方式:服务端流
*/
public static final String INVOKER_TYPE_SERVER_STREAMING = "serverStream";
/**
* 调用方式:双向流
*/
public static final String INVOKER_TYPE_BI_STREAMING = "bidirectionalStream";

/**
* Hessian序列化 [不推荐]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ public abstract class AbstractInterfaceConfig<T, S extends AbstractInterfaceConf
*/
protected transient volatile Map<String, Object> configValueCache = null;

/**
* 方法调用类型,(方法全名 - 调用类型)
*/
protected transient volatile Map<String, String> methodCallType = null;

/**
* 代理接口类,和T对应,主要针对泛化调用
*/
Expand Down Expand Up @@ -247,6 +252,21 @@ public S setProxyClass(Class proxyClass) {
return castThis();
}

/**
* Cache the call type of interface methods
*/
protected void loadMethodCallType(Class<?> interfaceClass){
Method[] methods = interfaceClass.getDeclaredMethods();
this.methodCallType = new ConcurrentHashMap<>();
for(Method method :methods) {
methodCallType.put(method.getName(),MethodConfig.mapStreamType(method,RpcConstants.INVOKER_TYPE_SYNC));
}
}

public String getMethodCallType(String methodName) {
return methodCallType.get(methodName);
}

/**
* Gets application.
*
Expand Down Expand Up @@ -1015,7 +1035,7 @@ public Object getMethodConfigValue(String methodName, String configKey) {
* @param key the key
* @return the string
*/
private String buildmkey(String methodName, String key) {
protected String buildmkey(String methodName, String key) {
return RpcConstants.HIDE_KEY_PREFIX + methodName + RpcConstants.HIDE_KEY_PREFIX + key;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.alipay.sofa.rpc.common.utils.ExceptionUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.listener.ChannelListener;
import com.alipay.sofa.rpc.listener.ConsumerStateListener;
import com.alipay.sofa.rpc.listener.ProviderInfoListener;
Expand Down Expand Up @@ -935,12 +936,62 @@ public SofaResponseCallback getMethodOnreturn(String methodName) {
/**
* Gets the call type corresponding to the method name
*
* @param methodName the method name
* @param sofaRequest the request
* @return the call type
*/
public String getMethodInvokeType(String methodName) {
return (String) getMethodConfigValue(methodName, RpcConstants.CONFIG_KEY_INVOKE_TYPE,
getInvokeType());
public String getMethodInvokeType(SofaRequest sofaRequest) {
String methodName = sofaRequest.getMethodName();

String invokeType = (String) getMethodConfigValue(methodName, RpcConstants.CONFIG_KEY_INVOKE_TYPE, null);

if (invokeType == null) {
invokeType = getAndCacheCallType(sofaRequest);
}

return invokeType;
}

/**
* Get and cache the call type of certain method
* @param request RPC request
* @return request call type
*/
public String getAndCacheCallType(SofaRequest request) {
Method method = request.getMethod();
String callType = MethodConfig
.mapStreamType(
method,
(String) getMethodConfigValue(request.getMethodName(), RpcConstants.CONFIG_KEY_INVOKE_TYPE,
getInvokeType())
);
//Method level config
updateAttribute(buildMethodConfigKey(request, RpcConstants.CONFIG_KEY_INVOKE_TYPE), callType, true);
return callType;
}

/**
* 通过请求的目标方法构建方法配置key。该key使用内部配置格式。(以'.' 开头)
* @param request RPC请求
* @return 方法配置名称,带方法参数列表
*/
public String buildMethodConfigKey(SofaRequest request, String propertyKey) {
return "." + getMethodSignature(request.getMethod()) + "." + propertyKey;
}

public static String getMethodSignature(Method method) {
Class<?>[] parameterTypes = method.getParameterTypes();
StringBuilder methodSignature = new StringBuilder();
methodSignature.append(method.getName()).append("(");

for (int i = 0; i < parameterTypes.length; i++) {
methodSignature.append(parameterTypes[i].getSimpleName());
if (i < parameterTypes.length - 1) {
methodSignature.append(", ");
}
}

methodSignature.append(")");
return methodSignature.toString();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@
*/
package com.alipay.sofa.rpc.config;

import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.transport.StreamHandler;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -43,7 +49,8 @@ public class MethodConfig implements Serializable {
protected Map<String, String> parameters;

/**
* The Timeout. 远程调用超时时间(毫秒)
* The Timeout. 远程调用超时时间(毫秒)。
* 对于Stream调用,这个时间为整个调用的超时时长,而非stream内单个调用的时长。未在这个时长内完成(调用{@link StreamHandler#onFinish()})的Stream调用会认为超时并抛出异常。
*/
protected Integer timeout;

Expand Down Expand Up @@ -326,4 +333,38 @@ public MethodConfig setParameter(String key, String value) {
public String getParameter(String key) {
return parameters == null ? null : parameters.get(key);
}

/**
* Gets the stream call type of certain method
* @param method the method
* @return call type,server/client/bidirectional stream or default value. If not mapped to any stream call type, use the default value
*/
public static String mapStreamType(Method method, String defaultValue){
Class<?>[] paramClasses = method.getParameterTypes();
Class<?> returnClass = method.getReturnType();

int paramLen = paramClasses.length;
String callType;

//BidirectionalStream & ClientStream
if(paramLen > 0 && StreamHandler.class.isAssignableFrom(paramClasses[0]) && StreamHandler.class.isAssignableFrom(returnClass)){
if(paramLen > 1){
throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE,"Bidirectional/Client stream method parameters can be only one StreamHandler.");
}
callType = RpcConstants.INVOKER_TYPE_BI_STREAMING;
}
//ServerStream
else if (paramLen > 1 && StreamHandler.class.isAssignableFrom(paramClasses[0]) && void.class == returnClass){
callType = RpcConstants.INVOKER_TYPE_SERVER_STREAMING;
}
else if (StreamHandler.class.isAssignableFrom(returnClass) || Arrays.stream(paramClasses).anyMatch(StreamHandler.class::isAssignableFrom)) {
throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "StreamHandler can only at the specified location of parameter. Please check related docs.");
}
//Other call types
else {
callType = defaultValue;
}

return callType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public T getRef() {
*/
public ProviderConfig<T> setRef(T ref) {
this.ref = ref;
loadMethodCallType(ref.getClass());
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ public static SofaRequest buildSofaRequest(Class<?> clazz, Method method, Class[
return request;
}

/**
* 根据一个请求的属性复制一个不包含具体方法实参的请求。
* 复制以下属性:请求接口名、请求方法名、请求方法、方法参数类型
*
* @param sofaRequest 被复制的请求实例
*/
public static SofaRequest copyEmptyRequest(SofaRequest sofaRequest) {
SofaRequest request = new SofaRequest();
request.setInterfaceName(sofaRequest.getInterfaceName());
request.setMethodName(sofaRequest.getMethodName());
request.setMethod(sofaRequest.getMethod());
request.setMethodArgSigs(sofaRequest.getMethodArgSigs());
return request;
}

/**
* 构建rpc错误结果
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.transport;

/**
* StreamHandler, works just like gRPC StreamObserver.
*/
public interface StreamHandler<T> {

/**
* Sends a message, or defines the behavior when a message is received.
* <p>This method should never be called after {@link StreamHandler#onFinish()} has been invoked.
*/
void onMessage(T message);

/**
* Note: This method MUST be invoked after the transport is complete.
* Failure to do so may result in unexpected errors.
* <p>
* Signals that all messages have been sent/received normally, and closes this stream.
*/
void onFinish();

/**
* Signals an exception to terminate this stream, or defines the behavior when an error occurs.
* <p></p>
* Once this method is invoked by one side, it can't send more messages, and the corresponding method on the other side will be triggered.
* Depending on the protocol implementation, it's possible that the other side can still call {@link StreamHandler#onMessage(Object)} after this method has been invoked, although this is not recommended.
* <p></p>
* As a best practice, it is advised not to send any more information once this method is called.
*
*/
void onException(Throwable throwable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public class RpcErrorType {
*/
public static final int CLIENT_NETWORK = 250;

/**
* 不支持的RPC调用方式异常
*/
public static final int CLIENT_CALL_TYPE = 260;

/**
* 客户端过滤器异常
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws So

// 修正类型
ConsumerConfig consumerConfig = (ConsumerConfig) invoker.getConfig();
String invokeType = consumerConfig.getMethodInvokeType(methodName);
String invokeType = consumerConfig.getMethodInvokeType(request);
request.setInvokeType(invokeType);
request.addRequestProp(RemotingConstants.HEAD_INVOKE_TYPE, invokeType);
request.addRequestProp(REVISE_KEY, REVISE_VALUE);
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit df27427

Please sign in to comment.